Switch the mirror commit_signal to an mbox

At the moment, a migration that fails on its first pass will retry. This
is wrong: it should abort.  However, to make that happen, the mirror
supervisor needs to know the commit state of the mirror thread.  With a
self_pipe as the mirror commit signal, that information wasn't available.
This commit is contained in:
Alex Young
2012-07-15 19:46:35 +01:00
parent 5794913fdf
commit a10adf007c
2 changed files with 46 additions and 39 deletions

View File

@@ -36,7 +36,7 @@ struct mirror * mirror_alloc(
union mysockaddr * connect_from, union mysockaddr * connect_from,
int max_Bps, int max_Bps,
int action_at_finish, int action_at_finish,
struct self_pipe * commit_signal) struct mbox * commit_signal)
{ {
struct mirror * mirror; struct mirror * mirror;
@@ -109,7 +109,7 @@ struct mirror * mirror_create(
union mysockaddr * connect_from, union mysockaddr * connect_from,
int max_Bps, int max_Bps,
int action_at_finish, int action_at_finish,
struct self_pipe * commit_signal) struct mbox * commit_signal)
{ {
/* FIXME: shouldn't map_fd get closed? */ /* FIXME: shouldn't map_fd get closed? */
struct mirror * mirror; struct mirror * mirror;
@@ -400,14 +400,32 @@ void mirror_run( struct server *serve )
} }
/* Post a mirror state value to an mbox.
 *
 * The state is copied into a freshly xmalloc'd enum mirror_state and the
 * pointer to that copy is handed to mbox_post(); this function does not
 * free it.
 *
 * NOTE(review): the receiver is presumably responsible for freeing the
 * contents (mirror_super_runner does free() what it receives from the
 * commit signal) - confirm for any other consumers.
 */
void mbox_post_mirror_state( struct mbox * mbox, enum mirror_state st )
{
NULLCHECK( mbox );
enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) );
*contents = st;
mbox_post( mbox, contents );
}
void mirror_signal_commit( struct mirror * mirror ) void mirror_signal_commit( struct mirror * mirror )
{ {
NULLCHECK( mirror ); NULLCHECK( mirror );
self_pipe_signal( mirror->commit_signal ); mbox_post_mirror_state( mirror->commit_signal,
mirror_get_state( mirror ) );
} }
/** Thread launched to drive mirror process */ /** Thread launched to drive mirror process
* This is needed for two reasons: firstly, it decouples the mirroring
* from the control thread (although that's less valid with mboxes
* passing state back and forth); secondly, it provides an error context
* so that retries can be cleanly handled without a bespoke error
* handling mechanism.
* */
void* mirror_runner(void* serve_params_uncast) void* mirror_runner(void* serve_params_uncast)
{ {
/* The supervisor thread relies on there not being any ERROR /* The supervisor thread relies on there not being any ERROR
@@ -472,22 +490,23 @@ struct mirror_super * mirror_super_create(
connect_from, connect_from,
max_Bps, max_Bps,
action_at_finish, action_at_finish,
self_pipe_create()) ; mbox_create() ) ;
super->state_mbox = mbox_create(); super->state_mbox = mbox_create();
return super; return super;
} }
/* Post the current state of the mirror into super->state_mbox */ /* Post the current state of the mirror into super->state_mbox */
void mirror_super_signal_committed( struct mirror_super * super ) void mirror_super_signal_committed(
struct mirror_super * super ,
enum mirror_state commit_state )
{ {
NULLCHECK( super ); NULLCHECK( super );
NULLCHECK( super->state_mbox ); NULLCHECK( super->state_mbox );
enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) );
*contents = mirror_get_state( super->mirror ); mbox_post_mirror_state(
super->state_mbox,
mbox_post( super->state_mbox, contents ); commit_state );
} }
@@ -496,7 +515,7 @@ void mirror_super_destroy( struct mirror_super * super )
NULLCHECK( super ); NULLCHECK( super );
mbox_destroy( super->state_mbox ); mbox_destroy( super->state_mbox );
self_pipe_destroy( super->mirror->commit_signal ); mbox_destroy( super->mirror->commit_signal );
mirror_destroy( super->mirror ); mirror_destroy( super->mirror );
free( super ); free( super );
} }
@@ -516,8 +535,6 @@ void * mirror_super_runner( void * serve_uncast )
int should_retry = 0; int should_retry = 0;
int success = 0; int success = 0;
fd_set fds;
int fd_count;
struct mirror * mirror = serve->mirror; struct mirror * mirror = serve->mirror;
struct mirror_super * super = serve->mirror_super; struct mirror_super * super = serve->mirror_super;
@@ -540,34 +557,23 @@ void * mirror_super_runner( void * serve_uncast )
serve), serve),
"Failed to create mirror thread"); "Failed to create mirror thread");
debug("Supervisor waiting for commit signal"); debug("Supervisor waiting for commit signal");
FD_ZERO( &fds ); enum mirror_state * commit_state =
self_pipe_fd_set( mirror->commit_signal, &fds ); mbox_receive( mirror->commit_signal );
/* There's no timeout on this select. This means that
* the mirror thread *must* signal then abort itself if
* it passes the timeout, and it *must* always signal,
* no matter what.
*/
fd_count = select( FD_SETSIZE, &fds, NULL, NULL, NULL );
if ( 1 == fd_count ) {
debug( "Supervisor got commit signal" ); debug( "Supervisor got commit signal" );
if ( 0 == should_retry ) { if ( 0 == should_retry ) {
should_retry = 1; should_retry = 1;
/* Only send this signal the first time */ /* Only send this signal the first time */
mirror_super_signal_committed(super); mirror_super_signal_committed(
super,
*commit_state);
debug("Mirror supervisor committed"); debug("Mirror supervisor committed");
} }
} free( commit_state );
else { fatal( "Select failed." ); }
debug("Supervisor waiting for mirror thread" ); debug("Supervisor waiting for mirror thread" );
pthread_join( mirror->thread, NULL ); pthread_join( mirror->thread, NULL );
debug( "Clearing the commit signal. If this blocks,"
" it's fatal but we can't check in advance." );
self_pipe_signal_clear( mirror->commit_signal );
debug( "Commit signal cleared." );
success = MS_DONE == mirror_get_state( mirror ); success = MS_DONE == mirror_get_state( mirror );

View File

@@ -9,6 +9,7 @@
#include "self_pipe.h" #include "self_pipe.h"
enum mirror_state; enum mirror_state;
#include "serve.h" #include "serve.h"
#include "mbox.h"
/* MS_CONNECT_TIME_SECS /* MS_CONNECT_TIME_SECS
@@ -76,7 +77,7 @@ struct mirror {
/* commit_signal is sent immediately after attempting to connect /* commit_signal is sent immediately after attempting to connect
* and checking the remote size, whether successful or not. * and checking the remote size, whether successful or not.
*/ */
struct self_pipe * commit_signal; struct mbox * commit_signal;
}; };