From 161d2fccf1fdfb044fb835fb6408ca61f5b79f46 Mon Sep 17 00:00:00 2001 From: Alex Young Date: Tue, 9 Oct 2012 17:20:39 +0100 Subject: [PATCH] Rename serve->has_control to serve->success. This makes the use of this variable to signal an unexpected SIGTERM while migrating less confusing. --- src/control.c | 2 +- src/flexnbd.c | 43 +++++------ src/ioutil.c | 31 ++++---- src/mode.c | 48 +++++++------ src/serve.c | 72 +++++++++++-------- src/serve.h | 27 +++---- src/status.c | 4 +- tests/acceptance/flexnbd.rb | 3 +- .../acceptance/test_source_error_handling.rb | 2 +- 9 files changed, 128 insertions(+), 104 deletions(-) diff --git a/src/control.c b/src/control.c index d81e4e8..3956123 100644 --- a/src/control.c +++ b/src/control.c @@ -235,7 +235,7 @@ void * control_runner( void * control_uncast ) control_serve( control ); control_cleanup( control, 0 ); - return NULL; + pthread_exit( NULL ); } diff --git a/src/flexnbd.c b/src/flexnbd.c index 5124694..f1ce5e0 100644 --- a/src/flexnbd.c +++ b/src/flexnbd.c @@ -51,7 +51,7 @@ int flexnbd_build_signal_fd(void) sigaddset( &mask, SIGQUIT ); sigaddset( &mask, SIGINT ); - FATAL_UNLESS( 0 == pthread_sigmask( SIG_BLOCK, &mask, NULL ), + FATAL_UNLESS( 0 == pthread_sigmask( SIG_BLOCK, &mask, NULL ), "Signal blocking failed" ); sfd = signalfd( -1, &mask, 0 ); @@ -62,12 +62,12 @@ int flexnbd_build_signal_fd(void) void flexnbd_create_shared( - struct flexnbd * flexnbd, + struct flexnbd * flexnbd, const char * s_ctrl_sock) { NULLCHECK( flexnbd ); if ( s_ctrl_sock ){ - flexnbd->control = + flexnbd->control = control_create( flexnbd, s_ctrl_sock ); } else { @@ -89,37 +89,37 @@ struct flexnbd * flexnbd_create_serving( int max_nbd_clients) { struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) ); - flexnbd->serve = server_create( + flexnbd->serve = server_create( flexnbd, s_ip_address, s_port, - s_file, + s_file, default_deny, acl_entries, s_acl_entries, - max_nbd_clients, + max_nbd_clients, 1); - flexnbd_create_shared( flexnbd, + 
flexnbd_create_shared( flexnbd, s_ctrl_sock ); return flexnbd; } struct flexnbd * flexnbd_create_listening( - char* s_ip_address, - char* s_port, + char* s_ip_address, + char* s_port, char* s_file, - char *s_ctrl_sock, + char *s_ctrl_sock, int default_deny, - int acl_entries, + int acl_entries, char** s_acl_entries ) { struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) ); - flexnbd->serve = server_create( + flexnbd->serve = server_create( flexnbd, s_ip_address, s_port, - s_file, + s_file, default_deny, acl_entries, s_acl_entries, @@ -136,10 +136,10 @@ void flexnbd_spawn_control(struct flexnbd * flexnbd ) pthread_t * control_thread = &flexnbd->control->thread; - FATAL_UNLESS( 0 == pthread_create( - control_thread, - NULL, - control_runner, + FATAL_UNLESS( 0 == pthread_create( + control_thread, + NULL, + control_runner, flexnbd->control ), "Couldn't create the control thread" ); } @@ -150,8 +150,10 @@ void flexnbd_stop_control( struct flexnbd * flexnbd ) NULLCHECK( flexnbd->control ); control_signal_close( flexnbd->control ); - FATAL_UNLESS( 0 == pthread_join( flexnbd->control->thread, NULL ), - "Failed joining the control thread" ); + pthread_t tid = flexnbd->control->thread; + FATAL_UNLESS( 0 == pthread_join( tid, NULL ), + "Failed joining the control thread" ); + debug( "Control thread %p pthread_join returned", tid ); } @@ -191,7 +193,7 @@ struct status * flexnbd_status_create( struct flexnbd * flexnbd ) { NULLCHECK( flexnbd ); struct status * status; - + status = status_create( flexnbd_server( flexnbd ) ); return status; } @@ -240,6 +242,7 @@ int flexnbd_serve( struct flexnbd * flexnbd ) } success = do_serve( flexnbd->serve ); + debug("do_serve success is %d", success ); if ( flexnbd->control ) { debug( "Stopping control thread" ); diff --git a/src/ioutil.c b/src/ioutil.c index 3c92c3b..93f977d 100644 --- a/src/ioutil.c +++ b/src/ioutil.c @@ -11,6 +11,7 @@ #include "util.h" #include "bitset.h" + int build_allocation_map(struct bitset_mapping* 
allocation_map, int fd) { /* break blocking ioctls down */ @@ -19,8 +20,8 @@ int build_allocation_map(struct bitset_mapping* allocation_map, int fd) unsigned long offset = 0; - struct { - struct fiemap fiemap; + struct { + struct fiemap fiemap; struct fiemap_extent extents[max_extents]; } fiemap_static; struct fiemap* fiemap = (struct fiemap*) &fiemap_static; @@ -31,31 +32,33 @@ int build_allocation_map(struct bitset_mapping* allocation_map, int fd) unsigned int i; fiemap->fm_start = offset; + fiemap->fm_length = max_length; - if (offset + max_length > allocation_map->size) + if ( offset + max_length > allocation_map->size ) { fiemap->fm_length = allocation_map->size-offset; + } + fiemap->fm_flags = FIEMAP_FLAG_SYNC; fiemap->fm_extent_count = max_extents; fiemap->fm_mapped_extents = 0; - if (ioctl(fd, FS_IOC_FIEMAP, fiemap) < 0) { + if ( ioctl( fd, FS_IOC_FIEMAP, fiemap ) < 0 ) { debug( "Couldn't get fiemap, returning no allocation_map" ); free(allocation_map); - return NULL; + allocation_map = NULL; + break; } - - for (i=0;i<fiemap->fm_mapped_extents;i++) { - bitset_set_range( - allocation_map, - fiemap->fm_extents[i].fe_logical, - fiemap->fm_extents[i].fe_length - ); - //debug("range from %ld + %ld", fiemap->fm_extents[i].fe_logical, fiemap->fm_extents[i].fe_length); + else { + for ( i = 0; i < fiemap->fm_mapped_extents; i++ ) { + bitset_set_range( allocation_map, + fiemap->fm_extents[i].fe_logical, + fiemap->fm_extents[i].fe_length ); + } } } debug("Successfully built allocation map"); - return allocation_map; + return NULL != allocation_map; } diff --git a/src/mode.c b/src/mode.c index b2a0142..808eac7 100644 --- a/src/mode.c +++ b/src/mode.c @@ -144,7 +144,7 @@ static struct option break_options[] = { {0} }; static char break_short_options[] = "hs:" SOPT_QUIET SOPT_VERBOSE; -static char break_help_text[] = +static char break_help_text[] = "Usage: flexnbd " CMD_BREAK " \n\n" "Stop mirroring from the server with control socket SOCK.\n\n" HELP_LINE @@ -228,11 +228,11 @@ 
void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char } -void read_listen_param( int c, - char **ip_addr, - char **ip_port, - char **file, - char **sock, +void read_listen_param( int c, + char **ip_addr, + char **ip_port, + char **file, + char **sock, int *default_deny ) { switch(c){ @@ -328,8 +328,8 @@ void read_acl_param( int c, char **sock ) read_sock_param( c, sock, acl_help_text ); } -void read_mirror_param( - int c, +void read_mirror_param( + int c, char **sock, char **ip_addr, char **ip_port, @@ -406,6 +406,8 @@ int mode_serve( int argc, char *argv[] ) int default_deny = 0; // not on by default int err = 0; + int success; + struct flexnbd * flexnbd; while (1) { @@ -427,10 +429,10 @@ int mode_serve( int argc, char *argv[] ) flexnbd = flexnbd_create_serving( ip_addr, ip_port, file, sock, default_deny, argc - optind, argv + optind, MAX_NBD_CLIENTS ); info( "Serving file %s", file ); - flexnbd_serve( flexnbd ); + success = flexnbd_serve( flexnbd ); flexnbd_destroy( flexnbd ); - return 0; + return success ? 
0 : 1; } @@ -452,7 +454,7 @@ int mode_listen( int argc, char *argv[] ) c = getopt_long(argc, argv, listen_short_options, listen_options, NULL); if ( c == -1 ) { break; } - read_listen_param( c, &ip_addr, &ip_port, + read_listen_param( c, &ip_addr, &ip_port, &file, &sock, &default_deny ); } @@ -466,13 +468,13 @@ int mode_listen( int argc, char *argv[] ) } if ( err ) { exit_err( listen_help_text ); } - flexnbd = flexnbd_create_listening( - ip_addr, - ip_port, + flexnbd = flexnbd_create_listening( + ip_addr, + ip_port, file, sock, - default_deny, - argc - optind, + default_deny, + argc - optind, argv + optind); success = flexnbd_serve( flexnbd ); flexnbd_destroy( flexnbd ); @@ -516,7 +518,7 @@ void params_readwrite( s_ip_address ); - if (s_bind_address != NULL && + if (s_bind_address != NULL && parse_ip_to_sockaddr(&out->connect_from.generic, s_bind_address) == 0) { fatal("Couldn't parse bind address '%s'", s_bind_address); } @@ -660,11 +662,11 @@ int mode_mirror( int argc, char *argv[] ) while (1) { c = getopt_long( argc, argv, mirror_short_options, mirror_options, NULL); if ( -1 == c ) { break; } - read_mirror_param( c, - &sock, - &remote_argv[0], - &remote_argv[1], - &unlink, + read_mirror_param( c, + &sock, + &remote_argv[0], + &remote_argv[1], + &unlink, &remote_argv[3] ); } @@ -678,7 +680,7 @@ int mode_mirror( int argc, char *argv[] ) } if ( err ) { exit_err( mirror_help_text ); } if ( unlink ) { remote_argv[2] = "unlink"; } - + if (remote_argv[3] == NULL) { do_remote_command( "mirror", sock, 3, remote_argv ); } diff --git a/src/serve.c b/src/serve.c index 4f2bdfd..fcc4221 100644 --- a/src/serve.c +++ b/src/serve.c @@ -45,13 +45,13 @@ struct server * server_create ( int acl_entries, char** s_acl_entries, int max_nbd_clients, - int has_control) + int success) { NULLCHECK( flexnbd ); struct server * out; out = xmalloc( sizeof( struct server ) ); out->flexnbd = flexnbd; - out->has_control = has_control; + out->success = success; out->max_nbd_clients = 
max_nbd_clients; out->nbd_client = xmalloc( max_nbd_clients * sizeof( struct client_tbl_entry ) ); @@ -678,23 +678,24 @@ int server_mirror_can_start( struct server *serve ) } -void serve_handle_signal( struct server *params ) +/* Queries to see if we are currently mirroring. If we are, we need + * to communicate that via the process exit status, because otherwise + * the supervisor will assume the migration completed. + */ +int serve_shutdown_is_graceful( struct server *params ) { - int should_die = 0; + int is_mirroring = 0; server_lock_start_mirror( params ); { if ( server_is_mirroring( params ) ) { - should_die = 1; + is_mirroring = 1; + warn( "Stop signal received while mirroring." ); server_prevent_mirror_start( params ); } } server_unlock_start_mirror( params ); - if ( should_die ){ - fatal( "Stop signal received while mirroring." ); - } else { - server_close_clients( params ); - } + return !is_mirroring; } @@ -710,6 +711,7 @@ int server_accept( struct server * params ) /* We select on this fd to receive OS signals (only a few of * which we're interested in, see flexnbd.c */ int signal_fd = flexnbd_signal_fd( params->flexnbd ); + int should_continue = 1; FD_ZERO(&fds); FD_SET(params->server_fd, &fds); @@ -722,17 +724,15 @@ int server_accept( struct server * params ) if ( self_pipe_fd_isset( params->close_signal, &fds ) ){ server_close_clients( params ); - return 0; + should_continue = 0; } + if ( 0 < signal_fd && FD_ISSET( signal_fd, &fds ) ){ debug( "Stop signal received." ); - serve_handle_signal( params ); - - /* serve_handle_signal will fatal() if it has to, so it - * might not return at all. 
- */ - return 0; + server_close_clients( params ); + params->success = serve_shutdown_is_graceful( params ); + should_continue = 0; } @@ -747,7 +747,7 @@ int server_accept( struct server * params ) accept_nbd_client(params, client_fd, &client_address); } - return 1; + return should_continue; } @@ -765,7 +765,7 @@ void* build_allocation_map_thread(void* serve_uncast) int fd = open(serve->filename, O_RDONLY); FATAL_IF_NEGATIVE(fd, "Couldn't open %s", serve->filename); - serve->allocation_map = + serve->allocation_map = bitset_alloc(serve->size, block_allocation_resolution); if (build_allocation_map(serve->allocation_map, fd)) { @@ -794,8 +794,8 @@ void serve_init_allocation_map(struct server* params) params->size = size; FATAL_IF_NEGATIVE(size, "Couldn't find size of %s", params->filename); - FATAL_IF_NEGATIVE(pthread_create(&params->allocation_map_builder_thread, - NULL, build_allocation_map_thread, params), + FATAL_IF_NEGATIVE(pthread_create(&params->allocation_map_builder_thread, + NULL, build_allocation_map_thread, params), "Couldn't create thread"); } @@ -825,13 +825,15 @@ void server_control_arrived( struct server *serve ) { NULLCHECK( serve ); - if ( !serve->has_control ) { - serve->has_control = 1; + if ( !serve->success ) { + serve->success = 1; serve_signal_close( serve ); } } +void flexnbd_stop_control( struct flexnbd * flexnbd ); + /** Closes sockets, frees memory and waits for all client threads to finish */ void serve_cleanup(struct server* params, int fatal __attribute__ ((unused)) ) @@ -851,7 +853,7 @@ void serve_cleanup(struct server* params, if (params->allocation_map) { free(params->allocation_map); } - + int need_mirror_lock; need_mirror_lock = !server_start_mirror_locked( params ); @@ -864,7 +866,6 @@ void serve_cleanup(struct server* params, } if ( need_mirror_lock ) { server_unlock_start_mirror( params ); } - for (i=0; i < params->max_nbd_clients; i++) { pthread_t thread_id = params->nbd_client[i].thread; @@ -882,6 +883,15 @@ void serve_cleanup(struct 
server* params, server_unlock_acl( params ); } + /* if( params->flexnbd ) { */ + /* if ( params->flexnbd->control ) { */ + /* flexnbd_stop_control( params->flexnbd ); */ + /* } */ + /* flexnbd_destroy( params->flexnbd ); */ + /* } */ + + /* server_destroy( params ); */ + debug( "Cleanup done"); } @@ -889,7 +899,7 @@ void serve_cleanup(struct server* params, int server_is_in_control( struct server *serve ) { NULLCHECK( serve ); - return serve->has_control; + return serve->success; } int server_is_mirroring( struct server * serve ) @@ -911,7 +921,9 @@ void server_abandon_mirror( struct server * serve ) * the next pass. * */ serve->mirror->signal_abandon = 1; - pthread_join( serve->mirror_super->thread, NULL ); + pthread_t tid = serve->mirror_super->thread; + pthread_join( tid, NULL ); + debug( "Mirror thread %p pthread_join returned", tid ); } } @@ -926,15 +938,15 @@ int do_serve(struct server* params) { NULLCHECK( params ); - int has_control; + int success; error_set_handler((cleanup_handler*) serve_cleanup, params); serve_open_server_socket(params); serve_init_allocation_map(params); serve_accept_loop(params); - has_control = params->has_control; + success = params->success; serve_cleanup(params, 0); - return has_control; + return success; } diff --git a/src/serve.h b/src/serve.h index 1132f99..332807b 100644 --- a/src/serve.h +++ b/src/serve.h @@ -40,7 +40,7 @@ struct server { /** Claims around any I/O to this file */ struct flexthread_mutex * l_io; - + /** to interrupt accept loop and clients, write() to close_signal[1] */ struct self_pipe * close_signal; @@ -59,20 +59,20 @@ struct server { struct flexthread_mutex * l_start_mirror; struct mirror* mirror; - struct mirror_super * mirror_super; + struct mirror_super * mirror_super; /* This is used to stop the mirror from starting after we * receive a SIGTERM */ int mirror_can_start; int server_fd; int control_fd; - - /* the allocation_map keeps track of which blocks in the backing file + + /* the allocation_map 
keeps track of which blocks in the backing file * have been allocated, or part-allocated on disc, with unallocated * blocks presumed to contain zeroes (i.e. represented as sparse files * by the filesystem). We can use this information when receiving * incoming writes, and avoid writing zeroes to unallocated sections - * of the file which would needlessly increase disc usage. This + * of the file which would needlessly increase disc usage. This * bitmap will start at all-zeroes for an empty file, and tend towards * all-ones as the file is written to (i.e. we assume that allocated * blocks can never become unallocated again, as is the case with ext3 @@ -83,7 +83,7 @@ struct server { pthread_t allocation_map_builder_thread; /* when the thread has finished, it sets this to 1 */ volatile sig_atomic_t allocation_map_built; - + int max_nbd_clients; struct client_tbl_entry *nbd_client; @@ -91,20 +91,23 @@ struct server { /* Marker for whether this server has control over the data in * the file, or if we're waiting to receive it from an inbound * migration which hasn't yet finished. + * + * It's the value which controls the exit status of a serve or + * listen process. 
*/ - int has_control; + int success; }; -struct server * server_create( +struct server * server_create( struct flexnbd * flexnbd, - char* s_ip_address, - char* s_port, + char* s_ip_address, + char* s_port, char* s_file, int default_deny, - int acl_entries, + int acl_entries, char** s_acl_entries, int max_nbd_clients, - int has_control ); + int success ); void server_destroy( struct server * ); int server_is_closed(struct server* serve); void server_dirty(struct server *serve, off64_t from, int len); diff --git a/src/status.c b/src/status.c index a80a30c..0f53edc 100644 --- a/src/status.c +++ b/src/status.c @@ -6,10 +6,10 @@ struct status * status_create( struct server * serve ) { NULLCHECK( serve ); struct status * status; - + status = xmalloc( sizeof( struct status ) ); status->pid = getpid(); - status->has_control = serve->has_control; + status->has_control = server_is_in_control( serve ); status->is_mirroring = NULL != serve->mirror; return status; diff --git a/tests/acceptance/flexnbd.rb b/tests/acceptance/flexnbd.rb index c21e1d9..19e0188 100644 --- a/tests/acceptance/flexnbd.rb +++ b/tests/acceptance/flexnbd.rb @@ -310,13 +310,14 @@ module FlexNBD debug( cmd ) @pid = @executor.run( cmd ) - start_wait_thread( @pid ) while !File.socket?(ctrl) pid, status = Process.wait2(@pid, Process::WNOHANG) raise "server did not start (#{cmd})" if pid sleep 0.1 end + + start_wait_thread( @pid ) at_exit { kill } end private :run_serve_cmd diff --git a/tests/acceptance/test_source_error_handling.rb b/tests/acceptance/test_source_error_handling.rb index 3378f75..9dd3f60 100644 --- a/tests/acceptance/test_source_error_handling.rb +++ b/tests/acceptance/test_source_error_handling.rb @@ -20,7 +20,7 @@ class TestSourceErrorHandling < Test::Unit::TestCase def expect_term_during_migration - @env.nbd1.can_die(6,9) + @env.nbd1.can_die(1,9) end