From f3e0d613238ac4386f539021872aaea9285a9438 Mon Sep 17 00:00:00 2001 From: Alex Young Date: Thu, 4 Oct 2012 14:41:55 +0100 Subject: [PATCH] Quit with an error status on SIGTERM during migration This prevents the supervisor from thinking that the migration completed successfully. In order to do this, I've introduced a new lock around the start (and finish) of the migration so that we avoid a race between the signal handler in the server_accept loop and the control thread mirror startup. Without that, we'd risk successfully starting a migration after the SIGTERM handler fired, which would be Bad. --- src/control.c | 82 ++++++++++------ src/mirror.c | 1 + src/serve.c | 93 ++++++++++++++++++- src/serve.h | 13 +++ tests/acceptance/environment.rb | 5 +- .../fakes/dest/sigterm_after_hello.rb | 19 ++++ .../fakes/source/sigterm_after_hello.rb | 20 ++++ tests/acceptance/flexnbd.rb | 5 +- tests/acceptance/test_dest_error_handling.rb | 5 + .../acceptance/test_source_error_handling.rb | 20 ++++ 10 files changed, 226 insertions(+), 37 deletions(-) create mode 100755 tests/acceptance/fakes/dest/sigterm_after_hello.rb create mode 100755 tests/acceptance/fakes/source/sigterm_after_hello.rb diff --git a/src/control.c b/src/control.c index c3f459b..d81e4e8 100644 --- a/src/control.c +++ b/src/control.c @@ -361,19 +361,35 @@ int control_mirror(struct control_client* client, int linesc, char** lines) struct server * serve = flexnbd_server(flexnbd); - if ( serve->mirror_super ) { - warn( "Tried to start a second mirror run" ); - write_socket( "1: mirror already running" ); - } else { - serve->mirror_super = mirror_super_create( - serve->filename, - connect_to, - connect_from, - max_Bps , - action_at_finish, - client->mirror_state_mbox ); - serve->mirror = serve->mirror_super->mirror; + server_lock_start_mirror( serve ); + { + if ( server_mirror_can_start( serve ) ) { + serve->mirror_super = mirror_super_create( + serve->filename, + connect_to, + connect_from, + max_Bps , + action_at_finish, + client->mirror_state_mbox ); + serve->mirror = serve->mirror_super->mirror; + server_prevent_mirror_start( serve ); + } else { + if ( serve->mirror_super ) { + warn( "Tried to start a second mirror run" ); + write_socket( "1: mirror already running" ); + } else { + warn( "Cannot start mirroring, shutting down" ); + write_socket( "1: shutting down" ); + } + } + } + server_unlock_start_mirror( serve ); + + /* Do this outside the lock to minimise the length of time the + * sighandler can block the serve thread + */ + if ( serve->mirror_super ) { FATAL_IF( 0 != pthread_create( &serve->mirror_super->thread, NULL, @@ -389,6 +405,7 @@ int control_mirror(struct control_client* client, int linesc, char** lines) debug("Control thread writing response"); control_write_mirror_response( state, client->socket ); } + debug( "Control thread going away." ); return 0; @@ -438,28 +455,33 @@ int control_break( struct flexnbd* flexnbd = client->flexnbd; struct server * serve = flexnbd_server( flexnbd ); - if ( server_is_mirroring( serve ) ) { - info( "Signaling to abandon mirror" ); - server_abandon_mirror( serve ); - debug( "Abandon signaled" ); + server_lock_start_mirror( serve ); + { + if ( server_is_mirroring( serve ) ) { - if ( server_is_closed( serve ) ) { - info( "Mirror completed while canceling" ); - write( client->socket, - "1: mirror completed\n", 20 ); + info( "Signaling to abandon mirror" ); + server_abandon_mirror( serve ); + debug( "Abandon signaled" ); + + if ( server_is_closed( serve ) ) { + info( "Mirror completed while canceling" ); + write( client->socket, + "1: mirror completed\n", 20 ); + } + else { + info( "Mirror successfully stopped." ); + write( client->socket, + "0: mirror stopped\n", 18 ); + result = 1; + } + + } else { + warn( "Not mirroring." ); + write( client->socket, "1: not mirroring\n", 17 ); } - else { - info( "Mirror successfully stopped." ); - write( client->socket, - "0: mirror stopped\n", 18 ); - result = 1; - } - - } else { - warn( "Not mirroring." ); - write( client->socket, "1: not mirroring\n", 17 ); } + server_unlock_start_mirror( serve ); return result; } diff --git a/src/mirror.c b/src/mirror.c index d5fb9a9..13978eb 100644 --- a/src/mirror.c +++ b/src/mirror.c @@ -568,6 +568,7 @@ void * mirror_super_runner( void * serve_uncast ) serve->mirror = NULL; serve->mirror_super = NULL; + server_allow_mirror_start( serve ); mirror_super_destroy( super ); debug( "Mirror supervisor done." ); diff --git a/src/serve.c b/src/serve.c index 777a3f6..86ae7ce 100644 --- a/src/serve.c +++ b/src/serve.c @@ -83,6 +83,9 @@ struct server * server_create ( out->l_io = flexthread_mutex_create(); out->l_acl= flexthread_mutex_create(); + out->l_start_mirror = flexthread_mutex_create(); + + out->mirror_can_start = 1; out->close_signal = self_pipe_create(); out->acl_updated_signal = self_pipe_create(); @@ -100,6 +103,7 @@ void server_destroy( struct server * serve ) self_pipe_destroy( serve->close_signal ); serve->close_signal = NULL; + flexthread_mutex_destroy( serve->l_start_mirror ); flexthread_mutex_destroy( serve->l_acl ); flexthread_mutex_destroy( serve->l_io ); @@ -177,6 +181,8 @@ void server_lock_acl( struct server *serve ) void server_unlock_acl( struct server *serve ) { + debug( "ACL unlocking" ); + SERVER_UNLOCK( serve, l_acl, "Problem with ACL unlock" ); } @@ -188,6 +194,26 @@ int server_acl_locked( struct server * serve ) } +void server_lock_start_mirror( struct server *serve ) +{ + debug("Mirror start locking"); + + SERVER_LOCK( serve, l_start_mirror, "Problem with start mirror lock" ); +} + +void server_unlock_start_mirror( struct server *serve ) +{ + debug("Mirror start unlocking"); + + SERVER_UNLOCK( serve, l_start_mirror, "Problem with start mirror unlock" ); +} + +int server_start_mirror_locked( struct server * serve ) +{ + NULLCHECK( serve ); + return flexthread_mutex_held( serve->l_start_mirror ); +} + /** Return the actual port the server bound to. This is used because we * are allowed to pass "0" on the command-line. */ @@ -628,6 +654,49 @@ void server_replace_acl( struct server *serve, struct acl * new_acl ) } +void server_prevent_mirror_start( struct server *serve ) +{ + NULLCHECK( serve ); + + serve->mirror_can_start = 0; +} + +void server_allow_mirror_start( struct server *serve ) +{ + NULLCHECK( serve ); + + serve->mirror_can_start = 1; +} + + +/* Only call this with the mirror start lock held */ +int server_mirror_can_start( struct server *serve ) +{ + NULLCHECK( serve ); + + return serve->mirror_can_start; +} + + +void serve_handle_signal( struct server *params ) +{ + int should_die = 0; + server_lock_start_mirror( params ); + { + if ( server_is_mirroring( params ) ) { + should_die = 1; + server_prevent_mirror_start( params ); + } + } + server_unlock_start_mirror( params ); + + if ( should_die ){ + fatal( "Stop signal received while mirroring." ); + } else { + server_close_clients( params ); + } +} + /** Accept either an NBD or control socket connection, dispatch appropriately */ int server_accept( struct server * params ) @@ -658,7 +727,11 @@ int server_accept( struct server * params ) if ( 0 < signal_fd && FD_ISSET( signal_fd, &fds ) ){ debug( "Stop signal received." ); - server_close_clients( params ); + serve_handle_signal( params ); + + /* serve_handle_signal will fatal() if it has to, so it + * might not return at all. + */ return 0; } @@ -752,9 +825,18 @@ void serve_cleanup(struct server* params, free(params->allocation_map); } - if ( server_is_mirroring( params ) ) { - server_abandon_mirror( params ); + int need_mirror_lock; + need_mirror_lock = !server_start_mirror_locked( params ); + + if ( need_mirror_lock ) { server_lock_start_mirror( params ); } + { + if ( server_is_mirroring( params ) ) { + server_abandon_mirror( params ); + } + server_prevent_mirror_start( params ); } + if ( need_mirror_lock ) { server_unlock_start_mirror( params ); } + for (i=0; i < params->max_nbd_clients; i++) { void* status; @@ -766,6 +848,10 @@ void serve_cleanup(struct server* params, } } + if ( server_start_mirror_locked( params ) ) { + server_unlock_start_mirror( params ); + } + if ( server_acl_locked( params ) ) { server_unlock_acl( params ); } @@ -786,6 +872,7 @@ int server_is_mirroring( struct server * serve ) return !!serve->mirror_super; } +/* This must only be called with the start_mirror lock held */ void server_abandon_mirror( struct server * serve ) { NULLCHECK( serve ); diff --git a/src/serve.h b/src/serve.h index 706825d..02c7021 100644 --- a/src/serve.h +++ b/src/serve.h @@ -53,8 +53,16 @@ struct server { /* Claimed around any updates to the ACL. */ struct flexthread_mutex * l_acl; + /* Claimed around starting a mirror so that it doesn't race with + * shutting down on a SIGTERM. */ + struct flexthread_mutex * l_start_mirror; + struct mirror* mirror; struct mirror_super * mirror_super; + /* This is used to stop the mirror from starting after we + * receive a SIGTERM */ + int mirror_can_start; + int server_fd; int control_fd; @@ -96,8 +104,13 @@ int server_io_locked( struct server * serve ); int server_acl_locked( struct server * serve ); void server_lock_acl( struct server *serve ); void server_unlock_acl( struct server *serve ); +void server_lock_start_mirror( struct server *serve ); +void server_unlock_start_mirror( struct server *serve ); int server_is_mirroring( struct server * serve ); void server_abandon_mirror( struct server * serve ); +void server_prevent_mirror_start( struct server *serve ); +void server_allow_mirror_start( struct server *serve ); +int server_mirror_can_start( struct server *serve ); void server_unlink( struct server * serve ); diff --git a/tests/acceptance/environment.rb b/tests/acceptance/environment.rb index 5c1c1c8..5b80cd2 100644 --- a/tests/acceptance/environment.rb +++ b/tests/acceptance/environment.rb @@ -121,11 +121,12 @@ class Environment def run_fake( name, addr, port, sock=nil ) fakedir = File.join( File.dirname( __FILE__ ), "fakes" ) - fake = Dir[File.join( fakedir, name ) + "*"].sort.find { |fn| + fakeglob = File.join( fakedir, name ) + "*" + fake = Dir[fakeglob].sort.find { |fn| File.executable?( fn ) } - raise "no fake executable" unless fake + raise "no fake executable at #{fakeglob}" unless fake raise "no addr" unless addr raise "no port" unless port diff --git a/tests/acceptance/fakes/dest/sigterm_after_hello.rb b/tests/acceptance/fakes/dest/sigterm_after_hello.rb new file mode 100755 index 0000000..b4773f9 --- /dev/null +++ b/tests/acceptance/fakes/dest/sigterm_after_hello.rb @@ -0,0 +1,19 @@ +#!/usr/bin/env ruby + +# Wait for a sender connection, send a correct hello, then sigterm the +# sender. We expect the sender to exit with status of 6, which is +# enforced in the test. + +require 'flexnbd/fake_dest' +include FlexNBD + +addr, port, pid = *ARGV +server = FakeDest.new( addr, port ) +client = server.accept( "Timed out waiting for a connection" ) +client.write_hello + +Process.kill(15, pid.to_i) + +client.close +server.close +exit 0 diff --git a/tests/acceptance/fakes/source/sigterm_after_hello.rb b/tests/acceptance/fakes/source/sigterm_after_hello.rb new file mode 100755 index 0000000..6c7aaf9 --- /dev/null +++ b/tests/acceptance/fakes/source/sigterm_after_hello.rb @@ -0,0 +1,20 @@ +#!/usr/bin/env ruby + +# Connect to the listener, wait for the hello, then sigterm the +# listener. We expect the listener to exit with a status of 6, which +# is enforced in the test. + +require 'flexnbd/fake_source' +include FlexNBD + +addr, port, pid = *ARGV + +client = FakeSource.new( addr, port, "Timed out connecting." ) +client.read_hello + +Process.kill( "TERM", pid.to_i ) + +sleep(0.2) +client.close + +exit(0) diff --git a/tests/acceptance/flexnbd.rb b/tests/acceptance/flexnbd.rb index 1262982..c21e1d9 100644 --- a/tests/acceptance/flexnbd.rb +++ b/tests/acceptance/flexnbd.rb @@ -136,11 +136,11 @@ class ValgrindKillingExecutor def call( err ) - Process.kill( "KILL", @pid ) $stderr.puts "*"*72 $stderr.puts "* Valgrind error spotted:" $stderr.puts err.to_s.split("\n").map{|s| " #{s}"} $stderr.puts "*"*72 + Process.kill( "KILL", @pid ) exit(1) end @@ -323,7 +323,8 @@ module FlexNBD def serve( file, *acl) - run_serve_cmd( serve_cmd( file, acl ) ) + cmd = serve_cmd( file, acl ) + run_serve_cmd( cmd ) end def listen(file, *acl) diff --git a/tests/acceptance/test_dest_error_handling.rb b/tests/acceptance/test_dest_error_handling.rb index 73330ae..e358868 100644 --- a/tests/acceptance/test_dest_error_handling.rb +++ b/tests/acceptance/test_dest_error_handling.rb @@ -28,6 +28,11 @@ class TestDestErrorHandling < Test::Unit::TestCase end + def test_sigterm_has_bad_exit_status + @env.nbd1.can_die(1) + run_fake( "source/sigterm_after_hello" ) + end + def test_disconnect_after_hello_causes_error_not_fatal run_fake( "source/close_after_hello" ) assert_no_control diff --git a/tests/acceptance/test_source_error_handling.rb b/tests/acceptance/test_source_error_handling.rb index ad1bd21..3378f75 100644 --- a/tests/acceptance/test_source_error_handling.rb +++ b/tests/acceptance/test_source_error_handling.rb @@ -19,12 +19,24 @@ class TestSourceErrorHandling < Test::Unit::TestCase end + def expect_term_during_migration + @env.nbd1.can_die(6,9) + end + + def test_failure_to_connect_reported_in_mirror_cmd_response stdout, stderr = @env.mirror12_unchecked + expect_term_during_migration assert_match( /failed to connect/, stderr ) end + def test_sigterm_after_hello_quits_with_status_of_1 + expect_term_during_migration + run_fake( "dest/sigterm_after_hello" ) + end + + def test_destination_hangs_after_connect_reports_error_at_source run_fake( "dest/hang_after_connect", :err => /Remote server failed to respond/ ) @@ -36,6 +48,7 @@ class TestSourceErrorHandling < Test::Unit::TestCase :err => /Mirror was rejected/ ) end + def test_wrong_size_causes_disconnect run_fake( "dest/hello_wrong_size", :err => /Remote size does not match local size/ ) @@ -43,38 +56,45 @@ class TestSourceErrorHandling < Test::Unit::TestCase def test_wrong_magic_causes_disconnect + expect_term_during_migration run_fake( "dest/hello_wrong_magic", :err => /Mirror was rejected/ ) end def test_disconnect_after_hello_causes_retry + expect_term_during_migration run_fake( "dest/close_after_hello", :out => /Mirror started/ ) end def test_write_times_out_causes_retry + expect_term_during_migration run_fake( "dest/hang_after_write" ) end def test_rejected_write_causes_retry + expect_term_during_migration run_fake( "dest/error_on_write" ) end def test_disconnect_before_write_reply_causes_retry + expect_term_during_migration run_fake( "dest/close_after_write" ) end def test_bad_write_reply_causes_retry + expect_term_during_migration run_fake( "dest/write_wrong_magic" ) end def test_pre_entrust_disconnect_causes_retry + expect_term_during_migration run_fake( "dest/close_after_writes" ) end