Retry failed rebind attempts
When we receive a migration, if rebinding to the new listen address and port fails for a reason which might be fixable, rather than killing the server we retry once a second. Also in this patch: non-overlapping log messages and a fix for the client going away halfway through a sendfile loop.
This commit is contained in:
@@ -366,7 +366,11 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
|
|||||||
client_write_reply( client, &request, 0);
|
client_write_reply( client, &request, 0);
|
||||||
|
|
||||||
offset = request.from;
|
offset = request.from;
|
||||||
FATAL_IF_NEGATIVE(
|
|
||||||
|
/* If we get cut off partway through this sendfile, we don't
|
||||||
|
* want to kill the server. This should be an error.
|
||||||
|
*/
|
||||||
|
ERROR_IF_NEGATIVE(
|
||||||
sendfileloop(
|
sendfileloop(
|
||||||
client->socket,
|
client->socket,
|
||||||
client->fileno,
|
client->fileno,
|
||||||
|
77
src/serve.c
77
src/serve.c
@@ -192,6 +192,62 @@ int server_port( struct server * server )
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* Try to bind to our serving socket, retrying until it works or gives a
|
||||||
|
* fatal error. */
|
||||||
|
void serve_bind( struct server * serve )
|
||||||
|
{
|
||||||
|
int bind_result;
|
||||||
|
|
||||||
|
char s_address[64];
|
||||||
|
memset( s_address, 0, 64 );
|
||||||
|
strcpy( s_address, "???" );
|
||||||
|
inet_ntop( serve->bind_to.generic.sa_family,
|
||||||
|
sockaddr_address_data( &serve->bind_to.generic),
|
||||||
|
s_address, 64 );
|
||||||
|
|
||||||
|
do {
|
||||||
|
bind_result = bind(
|
||||||
|
serve->server_fd,
|
||||||
|
&serve->bind_to.generic,
|
||||||
|
sizeof(serve->bind_to));
|
||||||
|
|
||||||
|
if ( 0 == bind_result ) {
|
||||||
|
info( "Bound to %s port %d",
|
||||||
|
s_address,
|
||||||
|
ntohs(serve->bind_to.v4.sin_port));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
|
||||||
|
warn( "Couldn't bind to %s port %d: %s",
|
||||||
|
s_address,
|
||||||
|
ntohs(serve->bind_to.v4.sin_port),
|
||||||
|
strerror( errno ) );
|
||||||
|
|
||||||
|
switch (errno){
|
||||||
|
/* bind() can give us EACCES,
|
||||||
|
* EADDRINUSE, EADDRNOTAVAIL, EBADF,
|
||||||
|
* EINVAL or ENOTSOCK.
|
||||||
|
*
|
||||||
|
* Any of these other than EACCES,
|
||||||
|
* EADDRINUSE or EADDRNOTAVAIL signify
|
||||||
|
* that there's a logic error somewhere.
|
||||||
|
*/
|
||||||
|
case EACCES:
|
||||||
|
case EADDRINUSE:
|
||||||
|
case EADDRNOTAVAIL:
|
||||||
|
debug("retrying");
|
||||||
|
sleep(1);
|
||||||
|
continue;
|
||||||
|
default:
|
||||||
|
fatal( "Giving up" );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while ( 1 );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/** Prepares a listening socket for the NBD server, binding etc. */
|
/** Prepares a listening socket for the NBD server, binding etc. */
|
||||||
void serve_open_server_socket(struct server* params)
|
void serve_open_server_socket(struct server* params)
|
||||||
{
|
{
|
||||||
@@ -205,21 +261,32 @@ void serve_open_server_socket(struct server* params)
|
|||||||
FATAL_IF_NEGATIVE(params->server_fd,
|
FATAL_IF_NEGATIVE(params->server_fd,
|
||||||
"Couldn't create server socket");
|
"Couldn't create server socket");
|
||||||
|
|
||||||
|
/* We need SO_REUSEADDR so that when we switch from listening to
|
||||||
|
* serving we don't have to change address if we don't want to.
|
||||||
|
*
|
||||||
|
* If this fails, it's not necessarily bad in principle, but at
|
||||||
|
* this point in the code we can't tell if it's going to be a
|
||||||
|
* problem. It's also indicative of something odd going on, so
|
||||||
|
* we barf.
|
||||||
|
*/
|
||||||
FATAL_IF_NEGATIVE(
|
FATAL_IF_NEGATIVE(
|
||||||
setsockopt(params->server_fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)),
|
setsockopt(params->server_fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)),
|
||||||
"Couldn't set SO_REUSEADDR"
|
"Couldn't set SO_REUSEADDR"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/* TCP_NODELAY makes everything not be slow. If we can't set
|
||||||
|
* this, again, there's something odd going on which we don't
|
||||||
|
* understand.
|
||||||
|
*/
|
||||||
FATAL_IF_NEGATIVE(
|
FATAL_IF_NEGATIVE(
|
||||||
setsockopt(params->server_fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)),
|
setsockopt(params->server_fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)),
|
||||||
"Couldn't set TCP_NODELAY"
|
"Couldn't set TCP_NODELAY"
|
||||||
);
|
);
|
||||||
|
|
||||||
FATAL_IF_NEGATIVE(
|
/* If we can't bind, presumably that's because someone else is
|
||||||
bind(params->server_fd, ¶ms->bind_to.generic,
|
* squatting on our ip/port combo, or the ip isn't yet
|
||||||
sizeof(params->bind_to)),
|
* configured. Ideally we want to retry this. */
|
||||||
"Couldn't bind server to IP address"
|
serve_bind(params);
|
||||||
);
|
|
||||||
|
|
||||||
FATAL_IF_NEGATIVE(
|
FATAL_IF_NEGATIVE(
|
||||||
listen(params->server_fd, params->tcp_backlog),
|
listen(params->server_fd, params->tcp_backlog),
|
||||||
|
15
src/util.c
15
src/util.c
@@ -45,12 +45,23 @@ void mylog(int line_level, const char* format, ...)
|
|||||||
{
|
{
|
||||||
va_list argptr;
|
va_list argptr;
|
||||||
|
|
||||||
|
|
||||||
if (line_level < log_level) { return; }
|
if (line_level < log_level) { return; }
|
||||||
|
|
||||||
|
/* Copy the format sideways so that we can append a "\n" to it
|
||||||
|
* and avoid a second fprintf. This stops log lines from getting
|
||||||
|
* interleaved.
|
||||||
|
*/
|
||||||
|
int format_len = strlen( format );
|
||||||
|
char *format_n = xmalloc( format_len + 2 );
|
||||||
|
memcpy( format_n, format, format_len );
|
||||||
|
format_n[format_len] = '\n';
|
||||||
|
|
||||||
va_start(argptr, format);
|
va_start(argptr, format);
|
||||||
vfprintf(stderr, format, argptr);
|
vfprintf(stderr, format_n, argptr);
|
||||||
va_end(argptr);
|
va_end(argptr);
|
||||||
fprintf(stderr, "\n");
|
|
||||||
|
free( format_n );
|
||||||
}
|
}
|
||||||
|
|
||||||
void* xrealloc(void* ptr, size_t size)
|
void* xrealloc(void* ptr, size_t size)
|
||||||
|
@@ -5,7 +5,7 @@ require 'file_writer'
|
|||||||
|
|
||||||
class Environment
|
class Environment
|
||||||
attr_reader( :blocksize, :filename1, :filename2, :ip,
|
attr_reader( :blocksize, :filename1, :filename2, :ip,
|
||||||
:port1, :port2, :nbd1, :nbd2, :file1, :file2 )
|
:port1, :port2, :nbd1, :nbd2, :file1, :file2, :rebind_port1 )
|
||||||
|
|
||||||
def initialize
|
def initialize
|
||||||
@blocksize = 1024
|
@blocksize = 1024
|
||||||
@@ -14,9 +14,11 @@ class Environment
|
|||||||
@ip = "127.0.0.1"
|
@ip = "127.0.0.1"
|
||||||
@available_ports = [*40000..41000] - listening_ports
|
@available_ports = [*40000..41000] - listening_ports
|
||||||
@port1 = @available_ports.shift
|
@port1 = @available_ports.shift
|
||||||
|
@rebind_port1 = @available_ports.shift
|
||||||
@port2 = @available_ports.shift
|
@port2 = @available_ports.shift
|
||||||
@nbd1 = FlexNBD.new("../../build/flexnbd", @ip, @port1)
|
@rebind_port2 = @available_ports.shift
|
||||||
@nbd2 = FlexNBD.new("../../build/flexnbd", @ip, @port2)
|
@nbd1 = FlexNBD.new("../../build/flexnbd", @ip, @port1, @ip, @rebind_port1)
|
||||||
|
@nbd2 = FlexNBD.new("../../build/flexnbd", @ip, @port2, @ip, @rebind_port2)
|
||||||
|
|
||||||
@fake_pid = nil
|
@fake_pid = nil
|
||||||
end
|
end
|
||||||
@@ -95,6 +97,7 @@ class Environment
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
@nbd1.can_die(0)
|
||||||
@nbd1.kill
|
@nbd1.kill
|
||||||
@nbd2.kill
|
@nbd2.kill
|
||||||
|
|
||||||
@@ -104,7 +107,7 @@ class Environment
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
def run_fake( name, addr, port )
|
def run_fake( name, addr, port, rebind_addr = addr, rebind_port = port )
|
||||||
fakedir = File.join( File.dirname( __FILE__ ), "fakes" )
|
fakedir = File.join( File.dirname( __FILE__ ), "fakes" )
|
||||||
fake = Dir[File.join( fakedir, name ) + "*"].sort.find { |fn|
|
fake = Dir[File.join( fakedir, name ) + "*"].sort.find { |fn|
|
||||||
File.executable?( fn )
|
File.executable?( fn )
|
||||||
@@ -113,8 +116,11 @@ class Environment
|
|||||||
raise "no fake executable" unless fake
|
raise "no fake executable" unless fake
|
||||||
raise "no addr" unless addr
|
raise "no addr" unless addr
|
||||||
raise "no port" unless port
|
raise "no port" unless port
|
||||||
|
raise "no rebind_addr" unless rebind_addr
|
||||||
|
raise "no rebind_port" unless rebind_port
|
||||||
|
|
||||||
@fake_pid = fork do
|
@fake_pid = fork do
|
||||||
exec fake + " " + addr.to_s + " " + port.to_s + " " + @nbd1.pid.to_s
|
exec [fake, addr, port, @nbd1.pid, rebind_addr, rebind_port].map{|x| x.to_s}.join(" ")
|
||||||
end
|
end
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
end
|
end
|
||||||
|
@@ -24,5 +24,6 @@ client.close
|
|||||||
client2 = server.accept
|
client2 = server.accept
|
||||||
client2.receive_mirror
|
client2.receive_mirror
|
||||||
|
|
||||||
|
|
||||||
exit(0)
|
exit(0)
|
||||||
|
|
||||||
|
@@ -9,7 +9,7 @@
|
|||||||
require 'flexnbd/fake_source'
|
require 'flexnbd/fake_source'
|
||||||
include FlexNBD
|
include FlexNBD
|
||||||
|
|
||||||
addr, port, srv_pid = *ARGV
|
addr, port, srv_pid, rebind_addr, rebind_port = *ARGV
|
||||||
|
|
||||||
client = FakeSource.new( addr, port, "Timed out connecting" )
|
client = FakeSource.new( addr, port, "Timed out connecting" )
|
||||||
client.read_hello
|
client.read_hello
|
||||||
@@ -25,8 +25,8 @@ sleep(0.25)
|
|||||||
client2 = FakeSource.new( addr, port, "Timed out reconnecting to mirror" )
|
client2 = FakeSource.new( addr, port, "Timed out reconnecting to mirror" )
|
||||||
client2.send_mirror
|
client2.send_mirror
|
||||||
|
|
||||||
sleep(0.25)
|
sleep(1)
|
||||||
client3 = FakeSource.new( addr, port, "Timed out reconnecting to read" )
|
client3 = FakeSource.new( rebind_addr, rebind_port, "Timed out reconnecting to read" )
|
||||||
client3.close
|
client3.close
|
||||||
|
|
||||||
exit(0)
|
exit(0)
|
||||||
|
17
tests/acceptance/fakes/source/close_mid_read.rb
Executable file
17
tests/acceptance/fakes/source/close_mid_read.rb
Executable file
@@ -0,0 +1,17 @@
|
|||||||
|
#!/usr/bin/env ruby
|
||||||
|
|
||||||
|
# Connect, but get the protocol wrong: don't read the hello, so we
|
||||||
|
# close and break the sendfile.
|
||||||
|
|
||||||
|
require 'flexnbd/fake_source'
|
||||||
|
include FlexNBD
|
||||||
|
|
||||||
|
addr, port, srv_pid, newaddr, newport = *ARGV
|
||||||
|
|
||||||
|
client = FakeSource.new( addr, port, "Timed out connecting" )
|
||||||
|
client.write_read_request( 0, 8 )
|
||||||
|
client.read_raw( 4 )
|
||||||
|
client.close
|
||||||
|
|
||||||
|
|
||||||
|
exit(0)
|
29
tests/acceptance/fakes/source/successful_transfer.rb
Executable file
29
tests/acceptance/fakes/source/successful_transfer.rb
Executable file
@@ -0,0 +1,29 @@
|
|||||||
|
#!/usr/bin/env ruby
|
||||||
|
|
||||||
|
# Successfully send a migration, but squat on the IP and port which
|
||||||
|
# the destination wants to rebind to. The destination should retry
|
||||||
|
# every second, so we give it up then attempt to connect to the new
|
||||||
|
# server.
|
||||||
|
|
||||||
|
require 'flexnbd/fake_source'
|
||||||
|
include FlexNBD
|
||||||
|
|
||||||
|
addr, port, srv_pid, newaddr, newport = *ARGV
|
||||||
|
|
||||||
|
squatter = TCPServer.open( newaddr, newport.to_i )
|
||||||
|
|
||||||
|
client = FakeSource.new( addr, port, "Timed out connecting" )
|
||||||
|
client.send_mirror()
|
||||||
|
|
||||||
|
sleep(1)
|
||||||
|
|
||||||
|
squatter.close()
|
||||||
|
|
||||||
|
sleep(1)
|
||||||
|
|
||||||
|
client2 = FakeSource.new( newaddr, newport.to_i, "Timed out reconnecting" )
|
||||||
|
client2.read_hello
|
||||||
|
client2.read( 0, 8 )
|
||||||
|
client2.close
|
||||||
|
|
||||||
|
exit( 0 )
|
@@ -166,7 +166,7 @@ end # class ValgrindExecutor
|
|||||||
# Noddy test class to exercise FlexNBD from the outside for testing.
|
# Noddy test class to exercise FlexNBD from the outside for testing.
|
||||||
#
|
#
|
||||||
class FlexNBD
|
class FlexNBD
|
||||||
attr_reader :bin, :ctrl, :pid, :ip, :port
|
attr_reader :bin, :ctrl, :pid, :ip, :port, :rebind_ip, :rebind_port
|
||||||
|
|
||||||
class << self
|
class << self
|
||||||
def counter
|
def counter
|
||||||
@@ -187,7 +187,7 @@ class FlexNBD
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
def initialize(bin, ip, port)
|
def initialize(bin, ip, port, rebind_ip = ip, rebind_port = port)
|
||||||
@bin = bin
|
@bin = bin
|
||||||
@debug = (ENV['DEBUG'] && `#{@bin} serve --help` =~ /--verbose/) ? "--verbose" : ""
|
@debug = (ENV['DEBUG'] && `#{@bin} serve --help` =~ /--verbose/) ? "--verbose" : ""
|
||||||
raise "#{bin} not executable" unless File.executable?(bin)
|
raise "#{bin} not executable" unless File.executable?(bin)
|
||||||
@@ -195,6 +195,8 @@ class FlexNBD
|
|||||||
@ctrl = "/tmp/.flexnbd.ctrl.#{Time.now.to_i}.#{rand}"
|
@ctrl = "/tmp/.flexnbd.ctrl.#{Time.now.to_i}.#{rand}"
|
||||||
@ip = ip
|
@ip = ip
|
||||||
@port = port
|
@port = port
|
||||||
|
@rebind_ip = rebind_ip
|
||||||
|
@rebind_port = rebind_port
|
||||||
@kill = []
|
@kill = []
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -224,6 +226,8 @@ class FlexNBD
|
|||||||
"--addr #{ip} "\
|
"--addr #{ip} "\
|
||||||
"--port #{port} "\
|
"--port #{port} "\
|
||||||
"--file #{file} "\
|
"--file #{file} "\
|
||||||
|
"--rebind-addr #{rebind_ip} " \
|
||||||
|
"--rebind-port #{rebind_port} " \
|
||||||
"--sock #{ctrl} "\
|
"--sock #{ctrl} "\
|
||||||
"#{@debug} "\
|
"#{@debug} "\
|
||||||
"#{acl.join(' ')}"
|
"#{acl.join(' ')}"
|
||||||
|
@@ -54,12 +54,28 @@ module FlexNBD
|
|||||||
send_request( 2, handle )
|
send_request( 2, handle )
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def write_read_request( from, len, handle="myhandle" )
|
||||||
|
send_request( 0, "myhandle", from, len )
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
def write_data( data )
|
def write_data( data )
|
||||||
@sock.write( data )
|
@sock.write( data )
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
# Handy utility
|
||||||
|
def read( from, len )
|
||||||
|
timing_out( 2, "Timed out reading" ) do
|
||||||
|
send_request( 0, "myhandle", from, len )
|
||||||
|
read_raw( len )
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def read_raw( len )
|
||||||
|
@sock.read( len )
|
||||||
|
end
|
||||||
|
|
||||||
def send_mirror
|
def send_mirror
|
||||||
read_hello()
|
read_hello()
|
||||||
write_write_request( 0, 8 )
|
write_write_request( 0, 8 )
|
||||||
|
@@ -34,6 +34,10 @@ class TestDestErrorHandling < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def test_partial_read_causes_error
|
||||||
|
run_fake( "source/close_mid_read" )
|
||||||
|
end
|
||||||
|
|
||||||
def test_double_connect_during_hello
|
def test_double_connect_during_hello
|
||||||
run_fake( "source/connect_during_hello" )
|
run_fake( "source/connect_during_hello" )
|
||||||
end
|
end
|
||||||
@@ -72,18 +76,32 @@ class TestDestErrorHandling < Test::Unit::TestCase
|
|||||||
# This fake runs a failed migration then a succeeding one, so we
|
# This fake runs a failed migration then a succeeding one, so we
|
||||||
# expect the destination to take control.
|
# expect the destination to take control.
|
||||||
run_fake( "source/close_after_entrust_reply" )
|
run_fake( "source/close_after_entrust_reply" )
|
||||||
|
assert_control
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def test_cant_rebind_retries
|
||||||
|
run_fake( "source/successful_transfer" )
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
private
|
private
|
||||||
def run_fake( name )
|
def run_fake( name )
|
||||||
@env.run_fake( name, @env.ip, @env.port1 )
|
@env.run_fake( name, @env.ip, @env.port1, @env.ip, @env.rebind_port1 )
|
||||||
assert @env.fake_reports_success, "#{name} failed."
|
assert @env.fake_reports_success, "#{name} failed."
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def status
|
||||||
|
stat, _ = @env.status1
|
||||||
|
stat
|
||||||
|
end
|
||||||
|
|
||||||
def assert_no_control
|
def assert_no_control
|
||||||
status, stderr = @env.status1
|
|
||||||
assert !status['has_control'], "Thought it had control"
|
assert !status['has_control'], "Thought it had control"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def assert_control
|
||||||
|
assert status['has_control'], "Didn't think it had control"
|
||||||
|
end
|
||||||
|
|
||||||
end # class TestDestErrorHandling
|
end # class TestDestErrorHandling
|
||||||
|
@@ -80,6 +80,7 @@ class TestSourceErrorHandling < Test::Unit::TestCase
|
|||||||
|
|
||||||
|
|
||||||
def test_post_entrust_disconnect_causes_retry
|
def test_post_entrust_disconnect_causes_retry
|
||||||
|
@env.nbd1.can_die(0)
|
||||||
run_fake( "dest/close_after_entrust" )
|
run_fake( "dest/close_after_entrust" )
|
||||||
end
|
end
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user