diff --git a/Rakefile b/Rakefile index 588d284..497179a 100644 --- a/Rakefile +++ b/Rakefile @@ -24,8 +24,8 @@ CCFLAGS = %w( -Wno-missing-field-initializers ) + # Added -Wno-missing-field-initializers to shut GCC up over {0} struct initialisers [ENV['CFLAGS']] - -LIBCHECK = File.exists?("/usr/lib/libcheck.a") ? + +LIBCHECK = File.exists?("/usr/lib/libcheck.a") ? "/usr/lib/libcheck.a" : "/usr/local/lib/libcheck.a" @@ -210,11 +210,13 @@ file check("flexnbd") => gcc_link t.name, t.prerequisites + [LIBCHECK] end + file check("control") => %w{build/tests/check_control.o} + OBJECTS - ["build/main.o"] do |t| gcc_link t.name, t.prerequisites + [LIBCHECK] end + (TEST_MODULES- %w{control flexnbd acl client serve readwrite util}).each do |m| tgt = "build/tests/check_#{m}.o" maybe_obj_name = "build/#{m}.o" diff --git a/src/control.c b/src/control.c index 3956123..1712968 100644 --- a/src/control.c +++ b/src/control.c @@ -21,7 +21,7 @@ * by a blank line (i.e. double LF). The first line is taken to be the command * name to invoke, and the lines before the double LF are its arguments. * - * These commands can be invoked remotely from the command line, with the + * These commands can be invoked remotely from the command line, with the * client code to be found in remote.c */ @@ -45,7 +45,7 @@ struct control * control_create( - struct flexnbd * flexnbd, + struct flexnbd * flexnbd, const char * csn) { struct control * control = xmalloc( sizeof( struct control ) ); @@ -54,9 +54,10 @@ struct control * control_create( control->flexnbd = flexnbd; control->socket_name = csn; + control->open_signal = self_pipe_create(); control->close_signal = self_pipe_create(); control->mirror_state_mbox = mbox_create(); - + return control; } @@ -75,21 +76,22 @@ void control_destroy( struct control * control ) mbox_destroy( control->mirror_state_mbox ); self_pipe_destroy( control->close_signal ); + self_pipe_destroy( control->open_signal ); free( control ); } -struct control_client * control_client_create( - struct flexnbd * flexnbd, +struct control_client * control_client_create( + struct flexnbd * flexnbd, int client_fd , struct mbox * state_mbox ) { NULLCHECK( flexnbd ); - struct control_client * control_client = + struct control_client * control_client = xmalloc( sizeof( struct control_client ) ); control_client->socket = client_fd; - control_client->flexnbd = flexnbd; + control_client->flexnbd = flexnbd; control_client->mirror_state_mbox = state_mbox; return control_client; } @@ -110,7 +112,7 @@ void control_handle_client( struct control * control, int client_fd ) NULLCHECK( control ); NULLCHECK( control->flexnbd ); struct control_client * control_client = - control_client_create( + control_client_create( control->flexnbd, client_fd , control->mirror_state_mbox); @@ -152,7 +154,7 @@ int control_accept( struct control * control ) if ( self_pipe_fd_isset( control->close_signal, &fds ) ){ return 0; } - + if ( FD_ISSET( control->control_fd, &fds ) ) { control_accept_client( control ); } @@ -170,27 +172,27 @@ int open_control_socket( const char * socket_name ) { struct sockaddr_un bind_address; int control_fd; - - if (!socket_name) { + + if (!socket_name) { fatal( "Tried to open a control socket without a socket name" ); } control_fd = socket(AF_UNIX, SOCK_STREAM, 0); FATAL_IF_NEGATIVE(control_fd , "Couldn't create control socket"); - + memset(&bind_address, 0, sizeof(struct sockaddr_un)); bind_address.sun_family = AF_UNIX; strncpy(bind_address.sun_path, socket_name, sizeof(bind_address.sun_path)-1); - + //unlink(socket_name); /* ignore failure */ - + FATAL_IF_NEGATIVE( bind(control_fd , &bind_address, sizeof(bind_address)), "Couldn't bind control socket to %s: %s", socket_name, strerror( errno ) ); - + FATAL_IF_NEGATIVE( listen(control_fd , 5), "Couldn't listen on control socket" @@ -205,17 +207,30 @@ void control_listen(struct control* control) control->control_fd = open_control_socket( control->socket_name ); } +void control_wait_for_open_signal( struct control * control ) +{ + fd_set fds; + FD_ZERO( &fds ); + self_pipe_fd_set( control->open_signal, &fds ); + FATAL_IF_NEGATIVE( select( FD_SETSIZE, &fds, NULL, NULL, NULL ), + "select() failed" ); + + self_pipe_signal_clear( control->open_signal ); +} + void control_serve( struct control * control ) { NULLCHECK( control ); + + control_wait_for_open_signal( control ); control_listen( control ); while( control_accept( control ) ); } -void control_cleanup( - struct control * control, +void control_cleanup( + struct control * control, int fatal __attribute__((unused)) ) { NULLCHECK( control ); @@ -272,7 +287,7 @@ void control_write_mirror_response( enum mirror_state mirror_state, int client_f /* Call this in the thread where you want to receive the mirror state */ -enum mirror_state control_client_mirror_wait( +enum mirror_state control_client_mirror_wait( struct control_client* client) { NULLCHECK( client ); @@ -281,14 +296,14 @@ enum mirror_state control_client_mirror_wait( struct mbox * mbox = client->mirror_state_mbox; enum mirror_state mirror_state; enum mirror_state * contents; - + contents = (enum mirror_state*)mbox_receive( mbox ); NULLCHECK( contents ); - + mirror_state = *contents; - + free( contents ); - + return mirror_state; } @@ -305,8 +320,8 @@ int control_mirror(struct control_client* client, int linesc, char** lines) uint64_t max_Bps = 0; int action_at_finish; int raw_port; - - + + if (linesc < 2) { write_socket("1: mirror takes at least two parameters"); return -1; @@ -316,7 +331,7 @@ int control_mirror(struct control_client* client, int linesc, char** lines) write_socket("1: bad IP address"); return -1; } - + raw_port = atoi(lines[1]); if (raw_port < 0 || raw_port > 65535) { write_socket("1: bad IP port number"); @@ -349,11 +364,11 @@ int control_mirror(struct control_client* client, int linesc, char** lines) } } - if (linesc > 4) { - max_Bps = atoi(lines[4]); + if (linesc > 4) { + max_Bps = atoi(lines[4]); } - - + + if (linesc > 5) { write_socket("1: unrecognised parameters to mirror"); return -1; @@ -364,7 +379,7 @@ int control_mirror(struct control_client* client, int linesc, char** lines) server_lock_start_mirror( serve ); { if ( server_mirror_can_start( serve ) ) { - serve->mirror_super = mirror_super_create( + serve->mirror_super = mirror_super_create( serve->filename, connect_to, connect_from, @@ -391,23 +406,23 @@ int control_mirror(struct control_client* client, int linesc, char** lines) */ if ( serve->mirror_super ) { FATAL_IF( 0 != pthread_create( - &serve->mirror_super->thread, - NULL, - mirror_super_runner, + &serve->mirror_super->thread, + NULL, + mirror_super_runner, serve ), "Failed to create mirror thread" ); debug("Control thread mirror super waiting"); - enum mirror_state state = + enum mirror_state state = control_client_mirror_wait( client ); debug("Control thread writing response"); control_write_mirror_response( state, client->socket ); } debug( "Control thread going away." ); - + return 0; } @@ -427,7 +442,7 @@ int control_acl(struct control_client* client, int linesc, char** lines) if (new_acl->len != linesc) { warn("Bad ACL spec: %s", lines[new_acl->len] ); write(client->socket, "1: bad spec: ", 13); - write(client->socket, lines[new_acl->len], + write(client->socket, lines[new_acl->len], strlen(lines[new_acl->len])); write(client->socket, "\n", 1); acl_destroy( new_acl ); @@ -443,8 +458,8 @@ int control_acl(struct control_client* client, int linesc, char** lines) int control_break( - struct control_client* client, - int linesc __attribute__ ((unused)), + struct control_client* client, + int linesc __attribute__ ((unused)), char** lines __attribute__((unused)) ) { @@ -466,7 +481,7 @@ int control_break( if ( server_is_closed( serve ) ) { info( "Mirror completed while canceling" ); - write( client->socket, + write( client->socket, "1: mirror completed\n", 20 ); } else { @@ -489,15 +504,15 @@ int control_break( /** FIXME: add some useful statistics */ int control_status( - struct control_client* client, - int linesc __attribute__ ((unused)), + struct control_client* client, + int linesc __attribute__ ((unused)), char** lines __attribute__((unused)) ) { NULLCHECK( client ); NULLCHECK( client->flexnbd ); struct status * status = flexnbd_status_create( client->flexnbd ); - + write( client->socket, "0: ", 3 ); status_write( status, client->socket ); status_destroy( status ); @@ -505,7 +520,7 @@ int control_status( return 0; } -void control_client_cleanup(struct control_client* client, +void control_client_cleanup(struct control_client* client, int fatal __attribute__ ((unused)) ) { if (client->socket) { close(client->socket); } @@ -524,7 +539,7 @@ void control_respond(struct control_client * client) error_set_handler((cleanup_handler*) control_client_cleanup, client); - int i, linesc; + int i, linesc; linesc = read_lines_until_blankline(client->socket, 256, &lines); if (linesc < 1) diff --git a/src/control.h b/src/control.h index cfd301f..977c565 100644 --- a/src/control.h +++ b/src/control.h @@ -18,7 +18,8 @@ struct control { const char * socket_name; pthread_t thread; - + + struct self_pipe * open_signal; struct self_pipe * close_signal; /* This is owned by the control object, and used by a @@ -36,7 +37,7 @@ struct control { struct control_client{ int socket; struct flexnbd * flexnbd; - + /* Passed in on creation. We know it's all right to do this * because we know there's only ever one control_client. */ @@ -44,7 +45,7 @@ struct control_client{ }; struct control * control_create( - struct flexnbd *, + struct flexnbd *, const char * control_socket_name ); void control_signal_close( struct control * ); void control_destroy( struct control * ); diff --git a/src/flexnbd.c b/src/flexnbd.c index f1ce5e0..9c34f5a 100644 --- a/src/flexnbd.c +++ b/src/flexnbd.c @@ -235,13 +235,15 @@ int flexnbd_serve( struct flexnbd * flexnbd ) { NULLCHECK( flexnbd ); int success; + struct self_pipe * open_signal = NULL; if ( flexnbd->control ){ debug( "Spawning control thread" ); flexnbd_spawn_control( flexnbd ); + open_signal = flexnbd->control->open_signal; } - success = do_serve( flexnbd->serve ); + success = do_serve( flexnbd->serve, open_signal ); debug("do_serve success is %d", success ); if ( flexnbd->control ) { diff --git a/src/mirror.c b/src/mirror.c index eb25318..e86505a 100644 --- a/src/mirror.c +++ b/src/mirror.c @@ -566,13 +566,6 @@ void * mirror_super_runner( void * serve_uncast ) } while ( should_retry && !success ); - serve->mirror = NULL; - serve->mirror_super = NULL; - server_allow_mirror_start( serve ); - - mirror_super_destroy( super ); - debug( "Mirror supervisor done." ); - return NULL; } diff --git a/src/mode.c b/src/mode.c index 808eac7..37a1215 100644 --- a/src/mode.c +++ b/src/mode.c @@ -187,7 +187,6 @@ char * help_help_text = help_help_text_arr; -int do_serve(struct server* params); void do_read(struct mode_readwrite_params* params); void do_write(struct mode_readwrite_params* params); void do_remote_command(char* command, char* mode, int argc, char** argv); diff --git a/src/serve.c b/src/serve.c index fcc4221..ca3022b 100644 --- a/src/serve.c +++ b/src/serve.c @@ -731,7 +731,7 @@ int server_accept( struct server * params ) if ( 0 < signal_fd && FD_ISSET( signal_fd, &fds ) ){ debug( "Stop signal received." ); server_close_clients( params ); - params->success = serve_shutdown_is_graceful( params ); + params->success = params->success && serve_shutdown_is_graceful( params ); should_continue = 0; } @@ -823,6 +823,7 @@ void serve_wait_for_close( struct server * serve ) */ void server_control_arrived( struct server *serve ) { + debug( "server_control_arrived" ); NULLCHECK( serve ); if ( !serve->success ) { @@ -908,6 +909,9 @@ int server_is_mirroring( struct server * serve ) return !!serve->mirror_super; } + +void mirror_super_destroy( struct mirror_super * super ); + /* This must only be called with the start_mirror lock held */ void server_abandon_mirror( struct server * serve ) { @@ -924,6 +928,14 @@ void server_abandon_mirror( struct server * serve ) pthread_t tid = serve->mirror_super->thread; pthread_join( tid, NULL ); debug( "Mirror thread %p pthread_join returned", tid ); + + server_allow_mirror_start( serve ); + mirror_super_destroy( serve->mirror_super ); + + serve->mirror = NULL; + serve->mirror_super = NULL; + + debug( "Mirror supervisor done." ); } } @@ -934,7 +946,7 @@ int server_default_deny( struct server * serve ) } /** Full lifecycle of the server */ -int do_serve(struct server* params) +int do_serve( struct server* params, struct self_pipe * open_signal ) { NULLCHECK( params ); @@ -942,6 +954,11 @@ int do_serve(struct server* params) error_set_handler((cleanup_handler*) serve_cleanup, params); serve_open_server_socket(params); + + /* Only signal that we are open for business once the server + socket is open */ + if ( NULL != open_signal ) { self_pipe_signal( open_signal ); } + serve_init_allocation_map(params); serve_accept_loop(params); success = params->success; diff --git a/src/serve.h b/src/serve.h index 332807b..2d44408 100644 --- a/src/serve.h +++ b/src/serve.h @@ -133,7 +133,7 @@ int server_mirror_can_start( struct server *serve ); void server_unlink( struct server * serve ); -int do_serve( struct server * ); +int do_serve( struct server *, struct self_pipe * ); struct mode_readwrite_params { union mysockaddr connect_to; diff --git a/src/status.c b/src/status.c index 0f53edc..dba9629 100644 --- a/src/status.c +++ b/src/status.c @@ -9,7 +9,7 @@ struct status * status_create( struct server * serve ) status = xmalloc( sizeof( struct status ) ); status->pid = getpid(); - status->has_control = server_is_in_control( serve ); + status->has_control = serve->success; status->is_mirroring = NULL != serve->mirror; return status; diff --git a/tests/acceptance/flexnbd.rb b/tests/acceptance/flexnbd.rb index 19e0188..e66ae75 100644 --- a/tests/acceptance/flexnbd.rb +++ b/tests/acceptance/flexnbd.rb @@ -97,7 +97,9 @@ class ValgrindKillingExecutor when "line" @error.add_line( @text ) if @found when "error", "stack" - @killer.call( @error ) + if @found + @killer.call( @error ) + end when "pid" @error.pid=@text end @@ -326,8 +328,10 @@ module FlexNBD def serve( file, *acl) cmd = serve_cmd( file, acl ) run_serve_cmd( cmd ) + sleep( 0.2 ) until File.exists?( ctrl ) end + def listen(file, *acl) run_serve_cmd( listen_cmd( file, acl ) ) end diff --git a/tests/acceptance/flexnbd/fake_source.rb b/tests/acceptance/flexnbd/fake_source.rb index 1f14719..930c56a 100644 --- a/tests/acceptance/flexnbd/fake_source.rb +++ b/tests/acceptance/flexnbd/fake_source.rb @@ -9,10 +9,16 @@ module FlexNBD def initialize( addr, port, err_msg, source_addr=nil, source_port=0 ) timing_out( 2, err_msg ) do - @sock = if source_addr - TCPSocket.new( addr, port, source_addr, source_port ) - else - TCPSocket.new( addr, port ) + begin + @sock = if source_addr + TCPSocket.new( addr, port, source_addr, source_port ) + else + TCPSocket.new( addr, port ) + end + rescue Errno::ECONNREFUSED + $stderr.puts "Connection refused, retrying" + sleep(0.2) + retry end end end diff --git a/tests/unit/check_status.c b/tests/unit/check_status.c index 3b72c4b..518c630 100644 --- a/tests/unit/check_status.c +++ b/tests/unit/check_status.c @@ -22,7 +22,7 @@ START_TEST( test_gets_has_control ) struct server server; struct status * status; - server.has_control = 1; + server.success = 1; status = status_create( &server ); fail_unless( status->has_control == 1, "has_control wasn't copied" ); @@ -162,7 +162,7 @@ Suite *status_suite(void) int main(void) { int number_failed; - + Suite *s = status_suite(); SRunner *sr = srunner_create(s); srunner_run_all(sr, CK_NORMAL);