Quit with an error status on SIGTERM during migration
This prevents the supervisor from thinking that the migration completed successfully. In order to do this, I've introduced a new lock around the start (and finish) of the migration so that we avoid a race between the signal handler in the server_accept loop and the control thread mirror startup. Without that, we'd risk successfully starting a migration after the SIGTERM handler fired, which would be Bad.
This commit is contained in:
@@ -361,19 +361,35 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
|
||||
|
||||
struct server * serve = flexnbd_server(flexnbd);
|
||||
|
||||
if ( serve->mirror_super ) {
|
||||
warn( "Tried to start a second mirror run" );
|
||||
write_socket( "1: mirror already running" );
|
||||
} else {
|
||||
serve->mirror_super = mirror_super_create(
|
||||
serve->filename,
|
||||
connect_to,
|
||||
connect_from,
|
||||
max_Bps ,
|
||||
action_at_finish,
|
||||
client->mirror_state_mbox );
|
||||
serve->mirror = serve->mirror_super->mirror;
|
||||
server_lock_start_mirror( serve );
|
||||
{
|
||||
if ( server_mirror_can_start( serve ) ) {
|
||||
serve->mirror_super = mirror_super_create(
|
||||
serve->filename,
|
||||
connect_to,
|
||||
connect_from,
|
||||
max_Bps ,
|
||||
action_at_finish,
|
||||
client->mirror_state_mbox );
|
||||
serve->mirror = serve->mirror_super->mirror;
|
||||
server_prevent_mirror_start( serve );
|
||||
} else {
|
||||
if ( serve->mirror_super ) {
|
||||
warn( "Tried to start a second mirror run" );
|
||||
write_socket( "1: mirror already running" );
|
||||
} else {
|
||||
warn( "Cannot start mirroring, shutting down" );
|
||||
write_socket( "1: shutting down" );
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
server_unlock_start_mirror( serve );
|
||||
|
||||
/* Do this outside the lock to minimise the length of time the
|
||||
* sighandler can block the serve thread
|
||||
*/
|
||||
if ( serve->mirror_super ) {
|
||||
FATAL_IF( 0 != pthread_create(
|
||||
&serve->mirror_super->thread,
|
||||
NULL,
|
||||
@@ -389,6 +405,7 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
|
||||
debug("Control thread writing response");
|
||||
control_write_mirror_response( state, client->socket );
|
||||
}
|
||||
|
||||
debug( "Control thread going away." );
|
||||
|
||||
return 0;
|
||||
@@ -438,28 +455,33 @@ int control_break(
|
||||
struct flexnbd* flexnbd = client->flexnbd;
|
||||
|
||||
struct server * serve = flexnbd_server( flexnbd );
|
||||
if ( server_is_mirroring( serve ) ) {
|
||||
|
||||
info( "Signaling to abandon mirror" );
|
||||
server_abandon_mirror( serve );
|
||||
debug( "Abandon signaled" );
|
||||
server_lock_start_mirror( serve );
|
||||
{
|
||||
if ( server_is_mirroring( serve ) ) {
|
||||
|
||||
if ( server_is_closed( serve ) ) {
|
||||
info( "Mirror completed while canceling" );
|
||||
write( client->socket,
|
||||
"1: mirror completed\n", 20 );
|
||||
info( "Signaling to abandon mirror" );
|
||||
server_abandon_mirror( serve );
|
||||
debug( "Abandon signaled" );
|
||||
|
||||
if ( server_is_closed( serve ) ) {
|
||||
info( "Mirror completed while canceling" );
|
||||
write( client->socket,
|
||||
"1: mirror completed\n", 20 );
|
||||
}
|
||||
else {
|
||||
info( "Mirror successfully stopped." );
|
||||
write( client->socket,
|
||||
"0: mirror stopped\n", 18 );
|
||||
result = 1;
|
||||
}
|
||||
|
||||
} else {
|
||||
warn( "Not mirroring." );
|
||||
write( client->socket, "1: not mirroring\n", 17 );
|
||||
}
|
||||
else {
|
||||
info( "Mirror successfully stopped." );
|
||||
write( client->socket,
|
||||
"0: mirror stopped\n", 18 );
|
||||
result = 1;
|
||||
}
|
||||
|
||||
} else {
|
||||
warn( "Not mirroring." );
|
||||
write( client->socket, "1: not mirroring\n", 17 );
|
||||
}
|
||||
server_unlock_start_mirror( serve );
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@@ -568,6 +568,7 @@ void * mirror_super_runner( void * serve_uncast )
|
||||
|
||||
serve->mirror = NULL;
|
||||
serve->mirror_super = NULL;
|
||||
server_allow_mirror_start( serve );
|
||||
|
||||
mirror_super_destroy( super );
|
||||
debug( "Mirror supervisor done." );
|
||||
|
93
src/serve.c
93
src/serve.c
@@ -83,6 +83,9 @@ struct server * server_create (
|
||||
|
||||
out->l_io = flexthread_mutex_create();
|
||||
out->l_acl= flexthread_mutex_create();
|
||||
out->l_start_mirror = flexthread_mutex_create();
|
||||
|
||||
out->mirror_can_start = 1;
|
||||
|
||||
out->close_signal = self_pipe_create();
|
||||
out->acl_updated_signal = self_pipe_create();
|
||||
@@ -100,6 +103,7 @@ void server_destroy( struct server * serve )
|
||||
self_pipe_destroy( serve->close_signal );
|
||||
serve->close_signal = NULL;
|
||||
|
||||
flexthread_mutex_destroy( serve->l_start_mirror );
|
||||
flexthread_mutex_destroy( serve->l_acl );
|
||||
flexthread_mutex_destroy( serve->l_io );
|
||||
|
||||
@@ -177,6 +181,8 @@ void server_lock_acl( struct server *serve )
|
||||
|
||||
void server_unlock_acl( struct server *serve )
|
||||
{
|
||||
debug( "ACL unlocking" );
|
||||
|
||||
SERVER_UNLOCK( serve, l_acl, "Problem with ACL unlock" );
|
||||
}
|
||||
|
||||
@@ -188,6 +194,26 @@ int server_acl_locked( struct server * serve )
|
||||
}
|
||||
|
||||
|
||||
void server_lock_start_mirror( struct server *serve )
|
||||
{
|
||||
debug("Mirror start locking");
|
||||
|
||||
SERVER_LOCK( serve, l_start_mirror, "Problem with start mirror lock" );
|
||||
}
|
||||
|
||||
void server_unlock_start_mirror( struct server *serve )
|
||||
{
|
||||
debug("Mirror start unlocking");
|
||||
|
||||
SERVER_UNLOCK( serve, l_start_mirror, "Problem with start mirror unlock" );
|
||||
}
|
||||
|
||||
int server_start_mirror_locked( struct server * serve )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
return flexthread_mutex_held( serve->l_start_mirror );
|
||||
}
|
||||
|
||||
/** Return the actual port the server bound to. This is used because we
|
||||
* are allowed to pass "0" on the command-line.
|
||||
*/
|
||||
@@ -628,6 +654,49 @@ void server_replace_acl( struct server *serve, struct acl * new_acl )
|
||||
}
|
||||
|
||||
|
||||
void server_prevent_mirror_start( struct server *serve )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
|
||||
serve->mirror_can_start = 0;
|
||||
}
|
||||
|
||||
void server_allow_mirror_start( struct server *serve )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
|
||||
serve->mirror_can_start = 1;
|
||||
}
|
||||
|
||||
|
||||
/* Only call this with the mirror start lock held */
|
||||
int server_mirror_can_start( struct server *serve )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
|
||||
return serve->mirror_can_start;
|
||||
}
|
||||
|
||||
|
||||
void serve_handle_signal( struct server *params )
|
||||
{
|
||||
int should_die = 0;
|
||||
server_lock_start_mirror( params );
|
||||
{
|
||||
if ( server_is_mirroring( params ) ) {
|
||||
should_die = 1;
|
||||
server_prevent_mirror_start( params );
|
||||
}
|
||||
}
|
||||
server_unlock_start_mirror( params );
|
||||
|
||||
if ( should_die ){
|
||||
fatal( "Stop signal received while mirroring." );
|
||||
} else {
|
||||
server_close_clients( params );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/** Accept either an NBD or control socket connection, dispatch appropriately */
|
||||
int server_accept( struct server * params )
|
||||
@@ -658,7 +727,11 @@ int server_accept( struct server * params )
|
||||
|
||||
if ( 0 < signal_fd && FD_ISSET( signal_fd, &fds ) ){
|
||||
debug( "Stop signal received." );
|
||||
server_close_clients( params );
|
||||
serve_handle_signal( params );
|
||||
|
||||
/* serve_handle_signal will fatal() if it has to, so it
|
||||
* might not return at all.
|
||||
*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -752,9 +825,18 @@ void serve_cleanup(struct server* params,
|
||||
free(params->allocation_map);
|
||||
}
|
||||
|
||||
if ( server_is_mirroring( params ) ) {
|
||||
server_abandon_mirror( params );
|
||||
int need_mirror_lock;
|
||||
need_mirror_lock = !server_start_mirror_locked( params );
|
||||
|
||||
if ( need_mirror_lock ) { server_lock_start_mirror( params ); }
|
||||
{
|
||||
if ( server_is_mirroring( params ) ) {
|
||||
server_abandon_mirror( params );
|
||||
}
|
||||
server_prevent_mirror_start( params );
|
||||
}
|
||||
if ( need_mirror_lock ) { server_unlock_start_mirror( params ); }
|
||||
|
||||
|
||||
for (i=0; i < params->max_nbd_clients; i++) {
|
||||
void* status;
|
||||
@@ -766,6 +848,10 @@ void serve_cleanup(struct server* params,
|
||||
}
|
||||
}
|
||||
|
||||
if ( server_start_mirror_locked( params ) ) {
|
||||
server_unlock_start_mirror( params );
|
||||
}
|
||||
|
||||
if ( server_acl_locked( params ) ) {
|
||||
server_unlock_acl( params );
|
||||
}
|
||||
@@ -786,6 +872,7 @@ int server_is_mirroring( struct server * serve )
|
||||
return !!serve->mirror_super;
|
||||
}
|
||||
|
||||
/* This must only be called with the start_mirror lock held */
|
||||
void server_abandon_mirror( struct server * serve )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
|
13
src/serve.h
13
src/serve.h
@@ -53,8 +53,16 @@ struct server {
|
||||
/* Claimed around any updates to the ACL. */
|
||||
struct flexthread_mutex * l_acl;
|
||||
|
||||
/* Claimed around starting a mirror so that it doesn't race with
|
||||
* shutting down on a SIGTERM. */
|
||||
struct flexthread_mutex * l_start_mirror;
|
||||
|
||||
struct mirror* mirror;
|
||||
struct mirror_super * mirror_super;
|
||||
/* This is used to stop the mirror from starting after we
|
||||
* receive a SIGTERM */
|
||||
int mirror_can_start;
|
||||
|
||||
int server_fd;
|
||||
int control_fd;
|
||||
|
||||
@@ -96,8 +104,13 @@ int server_io_locked( struct server * serve );
|
||||
int server_acl_locked( struct server * serve );
|
||||
void server_lock_acl( struct server *serve );
|
||||
void server_unlock_acl( struct server *serve );
|
||||
void server_lock_start_mirror( struct server *serve );
|
||||
void server_unlock_start_mirror( struct server *serve );
|
||||
int server_is_mirroring( struct server * serve );
|
||||
void server_abandon_mirror( struct server * serve );
|
||||
void server_prevent_mirror_start( struct server *serve );
|
||||
void server_allow_mirror_start( struct server *serve );
|
||||
int server_mirror_can_start( struct server *serve );
|
||||
|
||||
void server_unlink( struct server * serve );
|
||||
|
||||
|
@@ -121,11 +121,12 @@ class Environment
|
||||
|
||||
def run_fake( name, addr, port, sock=nil )
|
||||
fakedir = File.join( File.dirname( __FILE__ ), "fakes" )
|
||||
fake = Dir[File.join( fakedir, name ) + "*"].sort.find { |fn|
|
||||
fakeglob = File.join( fakedir, name ) + "*"
|
||||
fake = Dir[fakeglob].sort.find { |fn|
|
||||
File.executable?( fn )
|
||||
}
|
||||
|
||||
raise "no fake executable" unless fake
|
||||
raise "no fake executable at #{fakeglob}" unless fake
|
||||
raise "no addr" unless addr
|
||||
raise "no port" unless port
|
||||
|
||||
|
19
tests/acceptance/fakes/dest/sigterm_after_hello.rb
Executable file
19
tests/acceptance/fakes/dest/sigterm_after_hello.rb
Executable file
@@ -0,0 +1,19 @@
|
||||
#!/usr/bin/env ruby
|
||||
|
||||
# Wait for a sender connection, send a correct hello, then sigterm the
|
||||
# sender. We expect the sender to exit with status of 6, which is
|
||||
# enforced in the test.
|
||||
|
||||
require 'flexnbd/fake_dest'
|
||||
include FlexNBD
|
||||
|
||||
addr, port, pid = *ARGV
|
||||
server = FakeDest.new( addr, port )
|
||||
client = server.accept( "Timed out waiting for a connection" )
|
||||
client.write_hello
|
||||
|
||||
Process.kill(15, pid.to_i)
|
||||
|
||||
client.close
|
||||
server.close
|
||||
exit 0
|
20
tests/acceptance/fakes/source/sigterm_after_hello.rb
Executable file
20
tests/acceptance/fakes/source/sigterm_after_hello.rb
Executable file
@@ -0,0 +1,20 @@
|
||||
#!/usr/bin/env ruby
|
||||
|
||||
# Connect to the listener, wait for the hello, then sigterm the
|
||||
# listener. We expect the listener to exit with a status of 6, which
|
||||
# is enforced in the test.
|
||||
|
||||
require 'flexnbd/fake_source'
|
||||
include FlexNBD
|
||||
|
||||
addr, port, pid = *ARGV
|
||||
|
||||
client = FakeSource.new( addr, port, "Timed out connecting." )
|
||||
client.read_hello
|
||||
|
||||
Process.kill( "TERM", pid.to_i )
|
||||
|
||||
sleep(0.2)
|
||||
client.close
|
||||
|
||||
exit(0)
|
@@ -136,11 +136,11 @@ class ValgrindKillingExecutor
|
||||
|
||||
|
||||
def call( err )
|
||||
Process.kill( "KILL", @pid )
|
||||
$stderr.puts "*"*72
|
||||
$stderr.puts "* Valgrind error spotted:"
|
||||
$stderr.puts err.to_s.split("\n").map{|s| " #{s}"}
|
||||
$stderr.puts "*"*72
|
||||
Process.kill( "KILL", @pid )
|
||||
exit(1)
|
||||
end
|
||||
|
||||
@@ -323,7 +323,8 @@ module FlexNBD
|
||||
|
||||
|
||||
def serve( file, *acl)
|
||||
run_serve_cmd( serve_cmd( file, acl ) )
|
||||
cmd = serve_cmd( file, acl )
|
||||
run_serve_cmd( cmd )
|
||||
end
|
||||
|
||||
def listen(file, *acl)
|
||||
|
@@ -28,6 +28,11 @@ class TestDestErrorHandling < Test::Unit::TestCase
|
||||
end
|
||||
|
||||
|
||||
def test_sigterm_has_bad_exit_status
|
||||
@env.nbd1.can_die(1)
|
||||
run_fake( "source/sigterm_after_hello" )
|
||||
end
|
||||
|
||||
def test_disconnect_after_hello_causes_error_not_fatal
|
||||
run_fake( "source/close_after_hello" )
|
||||
assert_no_control
|
||||
|
@@ -19,12 +19,24 @@ class TestSourceErrorHandling < Test::Unit::TestCase
|
||||
end
|
||||
|
||||
|
||||
def expect_term_during_migration
|
||||
@env.nbd1.can_die(6,9)
|
||||
end
|
||||
|
||||
|
||||
def test_failure_to_connect_reported_in_mirror_cmd_response
|
||||
stdout, stderr = @env.mirror12_unchecked
|
||||
expect_term_during_migration
|
||||
assert_match( /failed to connect/, stderr )
|
||||
end
|
||||
|
||||
|
||||
def test_sigterm_after_hello_quits_with_status_of_1
|
||||
expect_term_during_migration
|
||||
run_fake( "dest/sigterm_after_hello" )
|
||||
end
|
||||
|
||||
|
||||
def test_destination_hangs_after_connect_reports_error_at_source
|
||||
run_fake( "dest/hang_after_connect",
|
||||
:err => /Remote server failed to respond/ )
|
||||
@@ -36,6 +48,7 @@ class TestSourceErrorHandling < Test::Unit::TestCase
|
||||
:err => /Mirror was rejected/ )
|
||||
end
|
||||
|
||||
|
||||
def test_wrong_size_causes_disconnect
|
||||
run_fake( "dest/hello_wrong_size",
|
||||
:err => /Remote size does not match local size/ )
|
||||
@@ -43,38 +56,45 @@ class TestSourceErrorHandling < Test::Unit::TestCase
|
||||
|
||||
|
||||
def test_wrong_magic_causes_disconnect
|
||||
expect_term_during_migration
|
||||
run_fake( "dest/hello_wrong_magic",
|
||||
:err => /Mirror was rejected/ )
|
||||
end
|
||||
|
||||
|
||||
def test_disconnect_after_hello_causes_retry
|
||||
expect_term_during_migration
|
||||
run_fake( "dest/close_after_hello",
|
||||
:out => /Mirror started/ )
|
||||
end
|
||||
|
||||
|
||||
def test_write_times_out_causes_retry
|
||||
expect_term_during_migration
|
||||
run_fake( "dest/hang_after_write" )
|
||||
end
|
||||
|
||||
|
||||
def test_rejected_write_causes_retry
|
||||
expect_term_during_migration
|
||||
run_fake( "dest/error_on_write" )
|
||||
end
|
||||
|
||||
|
||||
def test_disconnect_before_write_reply_causes_retry
|
||||
expect_term_during_migration
|
||||
run_fake( "dest/close_after_write" )
|
||||
end
|
||||
|
||||
|
||||
def test_bad_write_reply_causes_retry
|
||||
expect_term_during_migration
|
||||
run_fake( "dest/write_wrong_magic" )
|
||||
end
|
||||
|
||||
|
||||
def test_pre_entrust_disconnect_causes_retry
|
||||
expect_term_during_migration
|
||||
run_fake( "dest/close_after_writes" )
|
||||
end
|
||||
|
||||
|
Reference in New Issue
Block a user