diff --git a/src/control.c b/src/control.c index 7f56e60..d36c25b 100644 --- a/src/control.c +++ b/src/control.c @@ -44,7 +44,9 @@ #include -struct control * control_create(struct flexnbd * flexnbd, const char * csn) +struct control * control_create( + struct flexnbd * flexnbd, + const char * csn) { struct control * control = xmalloc( sizeof( struct control ) ); @@ -53,6 +55,7 @@ struct control * control_create(struct flexnbd * flexnbd, const char * csn) control->flexnbd = flexnbd; control->socket_name = csn; control->close_signal = self_pipe_create(); + control->mirror_state_mbox = mbox_create(); return control; } @@ -70,11 +73,15 @@ void control_destroy( struct control * control ) { NULLCHECK( control ); + mbox_destroy( control->mirror_state_mbox ); self_pipe_destroy( control->close_signal ); free( control ); } -struct control_client * control_client_create( struct flexnbd * flexnbd, int client_fd ) +struct control_client * control_client_create( + struct flexnbd * flexnbd, + int client_fd , + struct mbox * state_mbox ) { NULLCHECK( flexnbd ); @@ -83,6 +90,7 @@ struct control_client * control_client_create( struct flexnbd * flexnbd, int cli control_client->socket = client_fd; control_client->flexnbd = flexnbd; + control_client->mirror_state_mbox = state_mbox; return control_client; } @@ -102,7 +110,10 @@ void control_handle_client( struct control * control, int client_fd ) NULLCHECK( control ); NULLCHECK( control->flexnbd ); struct control_client * control_client = - control_client_create( control->flexnbd, client_fd ); + control_client_create( + control->flexnbd, + client_fd , + control->mirror_state_mbox); /* We intentionally don't spawn a thread for the client here. * This is to avoid having more than one thread potentially @@ -260,6 +271,28 @@ void control_write_mirror_response( enum mirror_state mirror_state, int client_f #undef write_socket +/* Call this in the thread where you want to receive the mirror state */ +enum mirror_state control_client_mirror_wait( + struct control_client* client) +{ + NULLCHECK( client ); + NULLCHECK( client->mirror_state_mbox ); + + struct mbox * mbox = client->mirror_state_mbox; + enum mirror_state mirror_state; + enum mirror_state * contents; + + contents = (enum mirror_state*)mbox_receive( mbox ); + NULLCHECK( contents ); + + mirror_state = *contents; + + free( contents ); + + return mirror_state; +} + + #define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1) /** Command parser to start mirror process from socket input */ int control_mirror(struct control_client* client, int linesc, char** lines) @@ -333,7 +366,8 @@ int control_mirror(struct control_client* client, int linesc, char** lines) connect_to, connect_from, max_Bps , - action_at_finish); + action_at_finish, + client->mirror_state_mbox ); serve->mirror = serve->mirror_super->mirror; FATAL_IF( 0 != pthread_create( @@ -346,7 +380,8 @@ int control_mirror(struct control_client* client, int linesc, char** lines) ); debug("Control thread mirror super waiting"); - enum mirror_state state = mirror_super_wait( serve->mirror_super ); + enum mirror_state state = + control_client_mirror_wait( client ); debug("Control thread writing response"); control_write_mirror_response( state, client->socket ); } diff --git a/src/control.h b/src/control.h index 8a8821c..dd40ba2 100644 --- a/src/control.h +++ b/src/control.h @@ -6,6 +6,7 @@ #include "mirror.h" #include "control.h" #include "flexnbd.h" +#include "mbox.h" struct control { struct flexnbd * flexnbd; @@ -15,14 +16,32 @@ struct control { pthread_t thread; struct self_pipe * close_signal; + + /* This is owned by the control object, and used by a + * mirror_super to communicate the state of a mirror attempt as + * early as feasible. It can't be owned by the mirror_super + * object because the mirror_super object can be freed at any + * time (including while the control_client is waiting on it), + * whereas the control object lasts for the lifetime of the + * process (and we can only have a mirror thread if the control + * thread has started it). + */ + struct mbox * mirror_state_mbox; }; struct control_client{ int socket; struct flexnbd * flexnbd; + + /* Passed in on creation. We know it's all right to do this + * because we know there's only ever one control_client. + */ + struct mbox * mirror_state_mbox; }; -struct control * control_create(struct flexnbd *, const char * control_socket_name); +struct control * control_create( + struct flexnbd *, + const char * control_socket_name ); void control_signal_close( struct control * ); void control_destroy( struct control * ); diff --git a/src/mbox.c b/src/mbox.c index a175e41..3e96785 100644 --- a/src/mbox.c +++ b/src/mbox.c @@ -55,11 +55,11 @@ void * mbox_receive( struct mbox * mbox ) mbox->full = 0; result = mbox->contents; mbox->contents = NULL; + while( 0 != pthread_cond_signal( &mbox->emptied_cond)); } pthread_mutex_unlock( &mbox->mutex ); - return result; } diff --git a/src/mirror.c b/src/mirror.c index fb7c70b..d7404a5 100644 --- a/src/mirror.c +++ b/src/mirror.c @@ -481,7 +481,8 @@ struct mirror_super * mirror_super_create( union mysockaddr * connect_to, union mysockaddr * connect_from, int max_Bps, - int action_at_finish) + int action_at_finish, + struct mbox * state_mbox) { struct mirror_super * super = xmalloc( sizeof( struct mirror_super) ); super->mirror = mirror_create( @@ -491,12 +492,12 @@ struct mirror_super * mirror_super_create( max_Bps, action_at_finish, mbox_create() ) ; - super->state_mbox = mbox_create(); + super->state_mbox = state_mbox; return super; } -/* Post the current state of the mirror into super->state_mbox */ +/* Post the current state of the mirror into super->state_mbox.*/ void mirror_super_signal_committed( struct mirror_super * super , enum mirror_state commit_state ) @@ -514,7 +515,6 @@ void mirror_super_destroy( struct mirror_super * super ) { NULLCHECK( super ); - mbox_destroy( super->state_mbox ); mbox_destroy( super->mirror->commit_signal ); mirror_destroy( super->mirror ); free( super ); @@ -612,25 +612,3 @@ void * mirror_super_runner( void * serve_uncast ) } -#define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1) - -/* Call this in the thread where you want to receive the mirror state */ -enum mirror_state mirror_super_wait( struct mirror_super * super ) -{ - NULLCHECK( super ); - NULLCHECK( super->state_mbox ); - - struct mbox * mbox = super->state_mbox; - enum mirror_state mirror_state; - enum mirror_state * contents; - - contents = (enum mirror_state*)mbox_receive( mbox ); - NULLCHECK( contents ); - - mirror_state = *contents; - - free(contents); - - return mirror_state; -} - diff --git a/src/mirror.h b/src/mirror.h index f6e8125..4ceb303 100644 --- a/src/mirror.h +++ b/src/mirror.h @@ -100,7 +100,8 @@ struct mirror_super * mirror_super_create( union mysockaddr * connect_to, union mysockaddr * connect_from, int max_Bps, - int action_at_finish); + int action_at_finish, + struct mbox * state_mbox + ); void * mirror_super_runner( void * serve_uncast ); -enum mirror_state mirror_super_wait( struct mirror_super * ); #endif diff --git a/tests/acceptance/fakes/dest/hang_after_connect.rb b/tests/acceptance/fakes/dest/hang_after_connect.rb index 2c3c54f..cb5bcba 100755 --- a/tests/acceptance/fakes/dest/hang_after_connect.rb +++ b/tests/acceptance/fakes/dest/hang_after_connect.rb @@ -4,6 +4,9 @@ # seconds. After that time, the client should have disconnected, # which we can can't effectively check. # +# We also expect the client *not* to reconnect, since it could feed back +# an error. +# # This allows the test runner to check that the command-line sees the # right error message after the timeout time. @@ -19,4 +22,14 @@ client = server.accept( "Client didn't make a connection" ) sleep(FlexNBD::MS_HELLO_TIME_SECS + 1) client.close + +# Invert the sense of the timeout exception, since we *don't* want a +# connection attempt +begin + server.accept( "Expected timeout" ) + fail "Unexpected reconnection" +rescue Timeout::Error + # expected +end + server.close