Handle a failed disconnect correctly
If the sender disconnects its socket before sending the disconnect message, the destination should restart the migration process. This patch makes sure that happens.
This commit is contained in:
24
src/client.c
24
src/client.c
@@ -166,7 +166,7 @@ int fd_read_request( int fd, struct nbd_request_raw *out_request)
|
|||||||
|
|
||||||
/* Returns 1 if *request was filled with a valid request which we should
|
/* Returns 1 if *request was filled with a valid request which we should
|
||||||
* try to honour. 0 otherwise. */
|
* try to honour. 0 otherwise. */
|
||||||
int client_read_request( struct client * client , struct nbd_request *out_request )
|
int client_read_request( struct client * client , struct nbd_request *out_request, int * disconnected )
|
||||||
{
|
{
|
||||||
NULLCHECK( client );
|
NULLCHECK( client );
|
||||||
NULLCHECK( out_request );
|
NULLCHECK( out_request );
|
||||||
@@ -199,6 +199,7 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (fd_read_request(client->socket, &request_raw) == -1) {
|
if (fd_read_request(client->socket, &request_raw) == -1) {
|
||||||
|
*disconnected = 1;
|
||||||
switch( errno ){
|
switch( errno ){
|
||||||
case 0:
|
case 0:
|
||||||
debug( "EOF while reading request" );
|
debug( "EOF while reading request" );
|
||||||
@@ -296,8 +297,7 @@ void client_write_init( struct client * client, uint64_t size )
|
|||||||
* DISCONNECT and any bad request.
|
* DISCONNECT and any bad request.
|
||||||
*/
|
*/
|
||||||
int client_request_needs_reply( struct client * client,
|
int client_request_needs_reply( struct client * client,
|
||||||
struct nbd_request request,
|
struct nbd_request request )
|
||||||
int *should_disconnect )
|
|
||||||
{
|
{
|
||||||
debug("request type %d", request.type);
|
debug("request type %d", request.type);
|
||||||
|
|
||||||
@@ -323,7 +323,7 @@ int client_request_needs_reply( struct client * client,
|
|||||||
request.len
|
request.len
|
||||||
);
|
);
|
||||||
client_write_reply( client, &request, 1 );
|
client_write_reply( client, &request, 1 );
|
||||||
*should_disconnect = 0;
|
client->disconnect = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@@ -335,7 +335,7 @@ int client_request_needs_reply( struct client * client,
|
|||||||
break;
|
break;
|
||||||
case REQUEST_DISCONNECT:
|
case REQUEST_DISCONNECT:
|
||||||
debug("request disconnect");
|
debug("request disconnect");
|
||||||
*should_disconnect = 1;
|
client->disconnect = 1;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@@ -434,12 +434,13 @@ void client_reply( struct client* client, struct nbd_request request )
|
|||||||
int client_serve_request(struct client* client)
|
int client_serve_request(struct client* client)
|
||||||
{
|
{
|
||||||
struct nbd_request request;
|
struct nbd_request request;
|
||||||
int request_err;
|
|
||||||
int failure = 1;
|
int failure = 1;
|
||||||
|
int disconnected = 0;
|
||||||
|
|
||||||
if ( !client_read_request( client, &request ) ) { return failure; }
|
if ( !client_read_request( client, &request, &disconnected ) ) { return failure; }
|
||||||
if ( !client_request_needs_reply( client, request, &request_err ) ) {
|
if ( disconnected ) { return failure; }
|
||||||
return request_err;
|
if ( !client_request_needs_reply( client, request ) ) {
|
||||||
|
return client->disconnect;
|
||||||
}
|
}
|
||||||
|
|
||||||
server_lock_io( client->serve );
|
server_lock_io( client->serve );
|
||||||
@@ -502,9 +503,14 @@ void* client_serve(void* client_uncast)
|
|||||||
client->stopped = 1;
|
client->stopped = 1;
|
||||||
|
|
||||||
if ( client->entrusted ) {
|
if ( client->entrusted ) {
|
||||||
|
if ( client->disconnect ){
|
||||||
debug("client: control arrived" );
|
debug("client: control arrived" );
|
||||||
server_control_arrived( client->serve );
|
server_control_arrived( client->serve );
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
warn( "client: control transfer failed." );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
FATAL_IF_NEGATIVE(
|
FATAL_IF_NEGATIVE(
|
||||||
close(client->socket),
|
close(client->socket),
|
||||||
|
@@ -30,6 +30,9 @@ struct client {
|
|||||||
|
|
||||||
/* Have we seen a REQUEST_ENTRUST message? */
|
/* Have we seen a REQUEST_ENTRUST message? */
|
||||||
int entrusted;
|
int entrusted;
|
||||||
|
|
||||||
|
/* Have we seen a REQUEST_DISCONNECT message? */
|
||||||
|
int disconnect;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
36
tests/acceptance/fakes/dest/close_after_entrust_reply.rb
Executable file
36
tests/acceptance/fakes/dest/close_after_entrust_reply.rb
Executable file
@@ -0,0 +1,36 @@
|
|||||||
|
#!/usr/bin/env ruby
|
||||||
|
# encoding: utf-8
|
||||||
|
|
||||||
|
# Receive a mirror, and disconnect after sending the entrust reply but
|
||||||
|
# before it can send the disconnect signal.
|
||||||
|
#
|
||||||
|
# This test is currently unused: the sender can't detect that the
|
||||||
|
# write failed.
|
||||||
|
|
||||||
|
require 'flexnbd/fake_dest'
|
||||||
|
include FlexNBD
|
||||||
|
|
||||||
|
addr, port, src_pid = *ARGV
|
||||||
|
server = FakeDest.new( addr, port )
|
||||||
|
client = server.accept
|
||||||
|
|
||||||
|
client.write_hello
|
||||||
|
while (req = client.read_request; req[:type] == 1)
|
||||||
|
client.read_data( req[:len] )
|
||||||
|
client.write_reply( req[:handle] )
|
||||||
|
end
|
||||||
|
|
||||||
|
system "kill -STOP #{src_pid}"
|
||||||
|
client.write_reply( req[:handle] )
|
||||||
|
client.close
|
||||||
|
system "kill -CONT #{src_pid}"
|
||||||
|
|
||||||
|
sleep( 0.25 )
|
||||||
|
client2 = server.accept( "Timed out waiting for a reconnection" )
|
||||||
|
|
||||||
|
client2.close
|
||||||
|
server.close
|
||||||
|
|
||||||
|
$stderr.puts "done"
|
||||||
|
exit(0)
|
||||||
|
|
34
tests/acceptance/fakes/dest/error_on_entrust.rb
Executable file
34
tests/acceptance/fakes/dest/error_on_entrust.rb
Executable file
@@ -0,0 +1,34 @@
|
|||||||
|
#!/usr/bin/env ruby
|
||||||
|
# encoding: utf-8
|
||||||
|
|
||||||
|
# Receive a mirror, but respond to the entrust with an error. There's
|
||||||
|
# currently no code path in flexnbd which can do this, but we could
|
||||||
|
# add one.
|
||||||
|
|
||||||
|
require 'flexnbd/fake_dest'
|
||||||
|
include FlexNBD
|
||||||
|
|
||||||
|
addr, port = *ARGV
|
||||||
|
server = FakeDest.new( addr, port )
|
||||||
|
client = server.accept
|
||||||
|
|
||||||
|
client.write_hello
|
||||||
|
loop do
|
||||||
|
req = client.read_request
|
||||||
|
if req[:type] == 1
|
||||||
|
client.read_data( req[:len] )
|
||||||
|
client.write_reply( req[:handle] )
|
||||||
|
else
|
||||||
|
client.write_reply( req[:handle], 1 )
|
||||||
|
break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
client.close
|
||||||
|
|
||||||
|
client2 = server.accept( "Timed out waiting for a reconnection" )
|
||||||
|
|
||||||
|
client2.close
|
||||||
|
server.close
|
||||||
|
|
||||||
|
exit(0)
|
32
tests/acceptance/fakes/source/close_after_entrust_reply.rb
Executable file
32
tests/acceptance/fakes/source/close_after_entrust_reply.rb
Executable file
@@ -0,0 +1,32 @@
|
|||||||
|
#!/usr/bin/env ruby
|
||||||
|
|
||||||
|
# Connect, send a migration, entrust then *immediately* disconnect.
|
||||||
|
# This simulates a client which fails while the client is blocked.
|
||||||
|
#
|
||||||
|
# We attempt to reconnect immediately afterwards to prove that we can
|
||||||
|
# retry the mirroring.
|
||||||
|
|
||||||
|
require 'flexnbd/fake_source'
|
||||||
|
include FlexNBD
|
||||||
|
|
||||||
|
addr, port, srv_pid = *ARGV
|
||||||
|
|
||||||
|
client = FakeSource.new( addr, port, "Timed out connecting" )
|
||||||
|
client.read_hello
|
||||||
|
client.write_write_request( 0, 8 )
|
||||||
|
client.write_data( "12345678" )
|
||||||
|
|
||||||
|
client.write_entrust_request
|
||||||
|
client.read_response
|
||||||
|
client.close
|
||||||
|
|
||||||
|
sleep(0.25)
|
||||||
|
|
||||||
|
client2 = FakeSource.new( addr, port, "Timed out reconnecting to mirror" )
|
||||||
|
client2.send_mirror
|
||||||
|
|
||||||
|
sleep(0.25)
|
||||||
|
client3 = FakeSource.new( addr, port, "Timed out reconnecting to read" )
|
||||||
|
client3.close
|
||||||
|
|
||||||
|
exit(0)
|
@@ -94,7 +94,7 @@ module FlexNBD
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
def receive_mirror
|
def receive_mirror( opts = {} )
|
||||||
write_hello()
|
write_hello()
|
||||||
loop do
|
loop do
|
||||||
req = read_request
|
req = read_request
|
||||||
@@ -103,7 +103,7 @@ module FlexNBD
|
|||||||
read_data( req[:len] )
|
read_data( req[:len] )
|
||||||
write_reply( req[:handle] )
|
write_reply( req[:handle] )
|
||||||
when 65536
|
when 65536
|
||||||
write_reply( req[:handle] )
|
write_reply( req[:handle], opts[:err] == :entrust ? 1 : 0 )
|
||||||
break
|
break
|
||||||
else
|
else
|
||||||
raise "Unexpected request: #{req.inspect}"
|
raise "Unexpected request: #{req.inspect}"
|
||||||
|
@@ -50,12 +50,27 @@ module FlexNBD
|
|||||||
send_request( 65536, handle )
|
send_request( 65536, handle )
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def write_disconnect_request( handle="myhandle" )
|
||||||
|
send_request( 2, handle )
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
def write_data( data )
|
def write_data( data )
|
||||||
@sock.write( data )
|
@sock.write( data )
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def send_mirror
|
||||||
|
read_hello()
|
||||||
|
write_write_request( 0, 8 )
|
||||||
|
write_data( "12345678" )
|
||||||
|
read_response()
|
||||||
|
write_entrust_request()
|
||||||
|
read_response()
|
||||||
|
write_disconnect_request()
|
||||||
|
close()
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
def read_response
|
def read_response
|
||||||
magic = @sock.read(4)
|
magic = @sock.read(4)
|
||||||
|
@@ -66,6 +66,14 @@ class TestDestErrorHandling < Test::Unit::TestCase
|
|||||||
run_fake( "source/close_after_write_data" )
|
run_fake( "source/close_after_write_data" )
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def test_disconnect_after_entrust_reply_causes_error
|
||||||
|
@env.nbd1.can_die(0)
|
||||||
|
# This fake runs a failed migration then a succeeding one, so we
|
||||||
|
# expect the destination to take control.
|
||||||
|
run_fake( "source/close_after_entrust_reply" )
|
||||||
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
def run_fake( name )
|
def run_fake( name )
|
||||||
@env.run_fake( name, @env.ip, @env.port1 )
|
@env.run_fake( name, @env.ip, @env.port1 )
|
||||||
|
@@ -85,6 +85,12 @@ class TestSourceErrorHandling < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def test_entrust_error_causes_retry
|
||||||
|
run_fake( "dest/error_on_entrust" )
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private
|
private
|
||||||
def run_fake(name, opts = {})
|
def run_fake(name, opts = {})
|
||||||
@env.run_fake( name, @env.ip, @env.port2 )
|
@env.run_fake( name, @env.ip, @env.port2 )
|
||||||
|
Reference in New Issue
Block a user