mirror: abandon signals are now honoured outside of the remote end being readable / writable
This commit is contained in:
63
src/mirror.c
63
src/mirror.c
@@ -73,6 +73,7 @@ struct mirror_ctrl {
|
|||||||
ev_io read_watcher;
|
ev_io read_watcher;
|
||||||
ev_io write_watcher;
|
ev_io write_watcher;
|
||||||
ev_timer timeout_watcher;
|
ev_timer timeout_watcher;
|
||||||
|
ev_io abandon_watcher;
|
||||||
|
|
||||||
/* Use this to keep track of what we're copying at any moment */
|
/* Use this to keep track of what we're copying at any moment */
|
||||||
struct xfer xfer;
|
struct xfer xfer;
|
||||||
@@ -94,6 +95,12 @@ struct mirror * mirror_alloc(
|
|||||||
mirror->action_at_finish = action_at_finish;
|
mirror->action_at_finish = action_at_finish;
|
||||||
mirror->commit_signal = commit_signal;
|
mirror->commit_signal = commit_signal;
|
||||||
mirror->commit_state = MS_UNKNOWN;
|
mirror->commit_state = MS_UNKNOWN;
|
||||||
|
mirror->abandon_signal = self_pipe_create();
|
||||||
|
|
||||||
|
if ( mirror->abandon_signal == NULL ) {
|
||||||
|
warn( "Couldn't create mirror abandon signal" );
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
return mirror;
|
return mirror;
|
||||||
}
|
}
|
||||||
@@ -195,6 +202,7 @@ struct mirror * mirror_create(
|
|||||||
void mirror_destroy( struct mirror *mirror )
|
void mirror_destroy( struct mirror *mirror )
|
||||||
{
|
{
|
||||||
NULLCHECK( mirror );
|
NULLCHECK( mirror );
|
||||||
|
self_pipe_destroy( mirror->abandon_signal );
|
||||||
free(mirror->connect_to);
|
free(mirror->connect_to);
|
||||||
free(mirror->connect_from);
|
free(mirror->connect_from);
|
||||||
free(mirror->dirty_map);
|
free(mirror->dirty_map);
|
||||||
@@ -428,13 +436,6 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO: This needs to be its own event, not checked here */
|
|
||||||
if ( ctrl->mirror->signal_abandon ) {
|
|
||||||
debug("Abandon message received" );
|
|
||||||
ev_break( loop, EVBREAK_ONE );
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
debug( "Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client );
|
debug( "Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client );
|
||||||
|
|
||||||
if ( xfer->written < hdr_size ) {
|
if ( xfer->written < hdr_size ) {
|
||||||
@@ -509,13 +510,6 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO: This needs to be its own event, not checked here */
|
|
||||||
if ( m->signal_abandon ) {
|
|
||||||
debug("Abandon message received" );
|
|
||||||
ev_break( loop, EVBREAK_ONE );
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct nbd_reply rsp;
|
struct nbd_reply rsp;
|
||||||
ssize_t count;
|
ssize_t count;
|
||||||
uint64_t left = sizeof( struct nbd_reply_raw ) - xfer->read;
|
uint64_t left = sizeof( struct nbd_reply_raw ) - xfer->read;
|
||||||
@@ -645,6 +639,22 @@ void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
|
||||||
|
{
|
||||||
|
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
||||||
|
NULLCHECK( ctrl );
|
||||||
|
|
||||||
|
if ( !(revents & EV_READ ) ) {
|
||||||
|
warn( "Mirror abandon called but no abandon event signalled" );
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug( "Abandon message received" );
|
||||||
|
self_pipe_signal_clear( ctrl->mirror->abandon_signal );
|
||||||
|
ev_break( loop, EVBREAK_ONE );
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
void mirror_run( struct server *serve )
|
void mirror_run( struct server *serve )
|
||||||
{
|
{
|
||||||
NULLCHECK( serve );
|
NULLCHECK( serve );
|
||||||
@@ -686,6 +696,11 @@ void mirror_run( struct server *serve )
|
|||||||
ev_init( &ctrl.timeout_watcher, mirror_timeout_cb );
|
ev_init( &ctrl.timeout_watcher, mirror_timeout_cb );
|
||||||
ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F ;
|
ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F ;
|
||||||
|
|
||||||
|
ev_init( &ctrl.abandon_watcher, mirror_abandon_cb );
|
||||||
|
ev_io_set( &ctrl.abandon_watcher, ctrl.mirror->abandon_signal->read_fd, EV_READ );
|
||||||
|
ctrl.write_watcher.data = (void*) &ctrl;
|
||||||
|
ev_io_start( ctrl.ev_loop, &ctrl.abandon_watcher );
|
||||||
|
|
||||||
ERROR_UNLESS(
|
ERROR_UNLESS(
|
||||||
mirror_setup_next_xfer( &ctrl ),
|
mirror_setup_next_xfer( &ctrl ),
|
||||||
"Couldn't find first transfer for mirror!"
|
"Couldn't find first transfer for mirror!"
|
||||||
@@ -855,7 +870,7 @@ void * mirror_super_runner( void * serve_uncast )
|
|||||||
|
|
||||||
int first_pass = 1;
|
int first_pass = 1;
|
||||||
int should_retry = 0;
|
int should_retry = 0;
|
||||||
int success = 0;
|
int success = 0, abandoned = 0;
|
||||||
|
|
||||||
struct mirror * mirror = serve->mirror;
|
struct mirror * mirror = serve->mirror;
|
||||||
struct mirror_super * super = serve->mirror_super;
|
struct mirror_super * super = serve->mirror_super;
|
||||||
@@ -895,18 +910,26 @@ void * mirror_super_runner( void * serve_uncast )
|
|||||||
debug("Supervisor waiting for mirror thread" );
|
debug("Supervisor waiting for mirror thread" );
|
||||||
pthread_join( mirror->thread, NULL );
|
pthread_join( mirror->thread, NULL );
|
||||||
|
|
||||||
|
/* If we can't connect to the remote end, the watcher for the abandon
|
||||||
|
* signal never gets installed at the moment, which is why we also check
|
||||||
|
* it here. */
|
||||||
|
abandoned =
|
||||||
|
mirror_get_state( mirror ) == MS_ABANDONED ||
|
||||||
|
self_pipe_signal_clear( mirror->abandon_signal );
|
||||||
|
|
||||||
success = MS_DONE == mirror_get_state( mirror );
|
success = MS_DONE == mirror_get_state( mirror );
|
||||||
|
|
||||||
|
|
||||||
if( success ){
|
if( success ){
|
||||||
info( "Mirror supervisor success, exiting" ); }
|
info( "Mirror supervisor success, exiting" );
|
||||||
else if ( mirror->signal_abandon ) {
|
} else if ( abandoned ) {
|
||||||
info( "Mirror abandoned" );
|
info( "Mirror abandoned" );
|
||||||
should_retry = 0;
|
should_retry = 0;
|
||||||
}
|
} else if ( should_retry ) {
|
||||||
else if (should_retry){
|
|
||||||
info( "Mirror failed, retrying" );
|
info( "Mirror failed, retrying" );
|
||||||
|
} else {
|
||||||
|
info( "Mirror failed before commit, giving up" );
|
||||||
}
|
}
|
||||||
else { info( "Mirror failed before commit, giving up" ); }
|
|
||||||
|
|
||||||
first_pass = 0;
|
first_pass = 0;
|
||||||
|
|
||||||
|
@@ -52,6 +52,7 @@ enum mirror_state {
|
|||||||
MS_UNKNOWN,
|
MS_UNKNOWN,
|
||||||
MS_INIT,
|
MS_INIT,
|
||||||
MS_GO,
|
MS_GO,
|
||||||
|
MS_ABANDONED,
|
||||||
MS_DONE,
|
MS_DONE,
|
||||||
MS_FAIL_CONNECT,
|
MS_FAIL_CONNECT,
|
||||||
MS_FAIL_REJECTED,
|
MS_FAIL_REJECTED,
|
||||||
@@ -61,8 +62,10 @@ enum mirror_state {
|
|||||||
|
|
||||||
struct mirror {
|
struct mirror {
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
/* set to 1, then join thread to make mirror terminate early */
|
|
||||||
int signal_abandon;
|
/* Signal to this then join the thread if you want to abandon mirroring */
|
||||||
|
struct self_pipe * abandon_signal;
|
||||||
|
|
||||||
union mysockaddr * connect_to;
|
union mysockaddr * connect_to;
|
||||||
union mysockaddr * connect_from;
|
union mysockaddr * connect_from;
|
||||||
int client;
|
int client;
|
||||||
@@ -126,5 +129,6 @@ struct mirror_super * mirror_super_create(
|
|||||||
struct mbox * state_mbox
|
struct mbox * state_mbox
|
||||||
);
|
);
|
||||||
void * mirror_super_runner( void * serve_uncast );
|
void * mirror_super_runner( void * serve_uncast );
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
15
src/serve.c
15
src/serve.c
@@ -853,13 +853,14 @@ void server_abandon_mirror( struct server * serve )
|
|||||||
NULLCHECK( serve );
|
NULLCHECK( serve );
|
||||||
if ( serve->mirror_super ) {
|
if ( serve->mirror_super ) {
|
||||||
/* FIXME: AWOOGA! RACE!
|
/* FIXME: AWOOGA! RACE!
|
||||||
* We can set signal_abandon after mirror_super has
|
* We can set abandon_signal after mirror_super has checked it, but
|
||||||
* checked it, but before the reset. This would lead to
|
* before the reset. However, mirror_reset doesn't clear abandon_signal
|
||||||
* a hang. However, mirror_reset doesn't change the
|
* so it'll just terminate early on the next pass. */
|
||||||
* signal_abandon flag, so it'll just terminate early on
|
ERROR_UNLESS(
|
||||||
* the next pass.
|
self_pipe_signal( serve->mirror->abandon_signal ),
|
||||||
* */
|
"Failed to signal abandon to mirror"
|
||||||
serve->mirror->signal_abandon = 1;
|
);
|
||||||
|
|
||||||
pthread_t tid = serve->mirror_super->thread;
|
pthread_t tid = serve->mirror_super->thread;
|
||||||
pthread_join( tid, NULL );
|
pthread_join( tid, NULL );
|
||||||
debug( "Mirror thread %p pthread_join returned", tid );
|
debug( "Mirror thread %p pthread_join returned", tid );
|
||||||
|
Reference in New Issue
Block a user