From a10adf007c22e5d2a6ebbffe89cd54b1aca29167 Mon Sep 17 00:00:00 2001 From: Alex Young Date: Sun, 15 Jul 2012 19:46:35 +0100 Subject: [PATCH] Switch the mirror commit_signal to an mbox At the moment, a first-pass failed migration will retry. This is wrong, it should abort. However, to make that happen the mirror supervisor needs to know the commit state of the mirror thread. With a self_pipe mirror commit signal that information wasn't there. --- src/mirror.c | 80 ++++++++++++++++++++++++++++------------------------ src/mirror.h | 5 ++-- 2 files changed, 46 insertions(+), 39 deletions(-) diff --git a/src/mirror.c b/src/mirror.c index 10547ca..43ec802 100644 --- a/src/mirror.c +++ b/src/mirror.c @@ -36,7 +36,7 @@ struct mirror * mirror_alloc( union mysockaddr * connect_from, int max_Bps, int action_at_finish, - struct self_pipe * commit_signal) + struct mbox * commit_signal) { struct mirror * mirror; @@ -109,7 +109,7 @@ struct mirror * mirror_create( union mysockaddr * connect_from, int max_Bps, int action_at_finish, - struct self_pipe * commit_signal) + struct mbox * commit_signal) { /* FIXME: shouldn't map_fd get closed? */ struct mirror * mirror; @@ -400,14 +400,32 @@ void mirror_run( struct server *serve ) } +void mbox_post_mirror_state( struct mbox * mbox, enum mirror_state st ) +{ + NULLCHECK( mbox ); + enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) ); + + *contents = st; + + mbox_post( mbox, contents ); +} + + void mirror_signal_commit( struct mirror * mirror ) { NULLCHECK( mirror ); - self_pipe_signal( mirror->commit_signal ); + mbox_post_mirror_state( mirror->commit_signal, + mirror_get_state( mirror ) ); } -/** Thread launched to drive mirror process */ +/** Thread launched to drive mirror process + * This is needed for two reasons: firstly, it decouples the mirroring + * from the control thread (although that's less valid with mboxes + * passing state back and forth) and to provide an error context so that + * retries can be cleanly handled without a bespoke error handling + * mechanism. + * */ void* mirror_runner(void* serve_params_uncast) { /* The supervisor thread relies on there not being any ERROR @@ -471,23 +489,24 @@ struct mirror_super * mirror_super_create( connect_to, connect_from, max_Bps, - action_at_finish, - self_pipe_create()) ; + action_at_finish, + mbox_create() ) ; super->state_mbox = mbox_create(); return super; } /* Post the current state of the mirror into super->state_mbox */ -void mirror_super_signal_committed( struct mirror_super * super ) +void mirror_super_signal_committed( + struct mirror_super * super , + enum mirror_state commit_state ) { NULLCHECK( super ); NULLCHECK( super->state_mbox ); - enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) ); - *contents = mirror_get_state( super->mirror ); - - mbox_post( super->state_mbox, contents ); + mbox_post_mirror_state( + super->state_mbox, + commit_state ); } @@ -496,7 +515,7 @@ void mirror_super_destroy( struct mirror_super * super ) NULLCHECK( super ); mbox_destroy( super->state_mbox ); - self_pipe_destroy( super->mirror->commit_signal ); + mbox_destroy( super->mirror->commit_signal ); mirror_destroy( super->mirror ); free( super ); } @@ -516,8 +535,6 @@ void * mirror_super_runner( void * serve_uncast ) int should_retry = 0; int success = 0; - fd_set fds; - int fd_count; struct mirror * mirror = serve->mirror; struct mirror_super * super = serve->mirror_super; @@ -540,34 +557,23 @@ void * mirror_super_runner( void * serve_uncast ) serve), "Failed to create mirror thread"); - debug("Supervisor waiting for commit signal"); - FD_ZERO( &fds ); - self_pipe_fd_set( mirror->commit_signal, &fds ); - /* There's no timeout on this select. This means that - * the mirror thread *must* signal then abort itself if - * it passes the timeout, and it *must* always signal, - * no matter what. - */ - fd_count = select( FD_SETSIZE, &fds, NULL, NULL, NULL ); - if ( 1 == fd_count ) { - debug( "Supervisor got commit signal" ); - if ( 0 == should_retry ) { - should_retry = 1; - /* Only send this signal the first time */ - mirror_super_signal_committed(super); - debug("Mirror supervisor committed"); - } + enum mirror_state * commit_state = + mbox_receive( mirror->commit_signal ); + + debug( "Supervisor got commit signal" ); + if ( 0 == should_retry ) { + should_retry = 1; + /* Only send this signal the first time */ + mirror_super_signal_committed( + super, + *commit_state); + debug("Mirror supervisor committed"); } - else { fatal( "Select failed." ); } - + free( commit_state ); debug("Supervisor waiting for mirror thread" ); pthread_join( mirror->thread, NULL ); - debug( "Clearing the commit signal. If this blocks," - " it's fatal but we can't check in advance." ); - self_pipe_signal_clear( mirror->commit_signal ); - debug( "Commit signal cleared." ); success = MS_DONE == mirror_get_state( mirror ); diff --git a/src/mirror.h b/src/mirror.h index 05da038..f6e8125 100644 --- a/src/mirror.h +++ b/src/mirror.h @@ -9,6 +9,7 @@ #include "self_pipe.h" enum mirror_state; #include "serve.h" +#include "mbox.h" /* MS_CONNECT_TIME_SECS @@ -71,12 +72,12 @@ struct mirror { char *mapped; struct bitset_mapping *dirty_map; - enum mirror_state commit_state; + enum mirror_state commit_state; /* commit_signal is sent immediately after attempting to connect * and checking the remote size, whether successful or not. */ - struct self_pipe * commit_signal; + struct mbox * commit_signal; };