From c6764b0de1417940dae6a98d76cfe3d29c7fbf4a Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 12 Aug 2013 15:30:21 +0100 Subject: [PATCH] mirror: abandon signals are now honoured outside of the remote end being readable / writable --- src/mirror.c | 63 +++++++++++++++++++++++++++++++++++----------------- src/mirror.h | 8 +++++-- src/serve.c | 15 +++++++------ 3 files changed, 57 insertions(+), 29 deletions(-) diff --git a/src/mirror.c b/src/mirror.c index 076cefa..561e4dc 100644 --- a/src/mirror.c +++ b/src/mirror.c @@ -73,6 +73,7 @@ struct mirror_ctrl { ev_io read_watcher; ev_io write_watcher; ev_timer timeout_watcher; + ev_io abandon_watcher; /* Use this to keep track of what we're copying at any moment */ struct xfer xfer; @@ -94,6 +95,12 @@ struct mirror * mirror_alloc( mirror->action_at_finish = action_at_finish; mirror->commit_signal = commit_signal; mirror->commit_state = MS_UNKNOWN; + mirror->abandon_signal = self_pipe_create(); + + if ( mirror->abandon_signal == NULL ) { + warn( "Couldn't create mirror abandon signal" ); + return NULL; + } return mirror; } @@ -195,6 +202,7 @@ struct mirror * mirror_create( void mirror_destroy( struct mirror *mirror ) { NULLCHECK( mirror ); + self_pipe_destroy( mirror->abandon_signal ); free(mirror->connect_to); free(mirror->connect_from); free(mirror->dirty_map); @@ -428,13 +436,6 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents ) return; } - /* TODO: This needs to be its own event, not checked here */ - if ( ctrl->mirror->signal_abandon ) { - debug("Abandon message received" ); - ev_break( loop, EVBREAK_ONE ); - return; - } - debug( "Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client ); if ( xfer->written < hdr_size ) { @@ -509,13 +510,6 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents ) return; } - /* TODO: This needs to be its own event, not checked here */ - if ( m->signal_abandon ) { - debug("Abandon message received" ); - ev_break( loop, EVBREAK_ONE ); - return; - } - struct nbd_reply rsp; ssize_t count; uint64_t left = sizeof( struct nbd_reply_raw ) - xfer->read; @@ -645,6 +639,22 @@ void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused) return; } +void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents ) +{ + struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data; + NULLCHECK( ctrl ); + + if ( !(revents & EV_READ ) ) { + warn( "Mirror abandon called but no abandon event signalled" ); + return; + } + + debug( "Abandon message received" ); + self_pipe_signal_clear( ctrl->mirror->abandon_signal ); + ev_break( loop, EVBREAK_ONE ); + return; +} + void mirror_run( struct server *serve ) { NULLCHECK( serve ); @@ -686,6 +696,11 @@ void mirror_run( struct server *serve ) ev_init( &ctrl.timeout_watcher, mirror_timeout_cb ); ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F ; + ev_init( &ctrl.abandon_watcher, mirror_abandon_cb ); + ev_io_set( &ctrl.abandon_watcher, ctrl.mirror->abandon_signal->read_fd, EV_READ ); + ctrl.write_watcher.data = (void*) &ctrl; + ev_io_start( ctrl.ev_loop, &ctrl.abandon_watcher ); + ERROR_UNLESS( mirror_setup_next_xfer( &ctrl ), "Couldn't find first transfer for mirror!" @@ -855,7 +870,7 @@ void * mirror_super_runner( void * serve_uncast ) int first_pass = 1; int should_retry = 0; - int success = 0; + int success = 0, abandoned = 0; struct mirror * mirror = serve->mirror; struct mirror_super * super = serve->mirror_super; @@ -895,18 +910,26 @@ void * mirror_super_runner( void * serve_uncast ) debug("Supervisor waiting for mirror thread" ); pthread_join( mirror->thread, NULL ); + /* If we can't connect to the remote end, the watcher for the abandon + * signal never gets installed at the moment, which is why we also check + * it here. */ + abandoned = + mirror_get_state( mirror ) == MS_ABANDONED || + self_pipe_signal_clear( mirror->abandon_signal ); + success = MS_DONE == mirror_get_state( mirror ); + if( success ){ - info( "Mirror supervisor success, exiting" ); } - else if ( mirror->signal_abandon ) { + info( "Mirror supervisor success, exiting" ); + } else if ( abandoned ) { info( "Mirror abandoned" ); should_retry = 0; - } - else if (should_retry){ + } else if ( should_retry ) { info( "Mirror failed, retrying" ); + } else { + info( "Mirror failed before commit, giving up" ); } - else { info( "Mirror failed before commit, giving up" ); } first_pass = 0; diff --git a/src/mirror.h b/src/mirror.h index 53e750d..8986623 100644 --- a/src/mirror.h +++ b/src/mirror.h @@ -52,6 +52,7 @@ enum mirror_state { MS_UNKNOWN, MS_INIT, MS_GO, + MS_ABANDONED, MS_DONE, MS_FAIL_CONNECT, MS_FAIL_REJECTED, @@ -61,8 +62,10 @@ enum mirror_state { struct mirror { pthread_t thread; - /* set to 1, then join thread to make mirror terminate early */ - int signal_abandon; + + /* Signal to this then join the thread if you want to abandon mirroring */ + struct self_pipe * abandon_signal; + union mysockaddr * connect_to; union mysockaddr * connect_from; int client; @@ -126,5 +129,6 @@ struct mirror_super * mirror_super_create( struct mbox * state_mbox ); void * mirror_super_runner( void * serve_uncast ); + #endif diff --git a/src/serve.c b/src/serve.c index 43e8cb2..61e1541 100644 --- a/src/serve.c +++ b/src/serve.c @@ -853,13 +853,14 @@ void server_abandon_mirror( struct server * serve ) NULLCHECK( serve ); if ( serve->mirror_super ) { /* FIXME: AWOOGA! RACE! - * We can set signal_abandon after mirror_super has - * checked it, but before the reset. This would lead to - * a hang. However, mirror_reset doesn't change the - * signal_abandon flag, so it'll just terminate early on - * the next pass. - * */ - serve->mirror->signal_abandon = 1; + * We can set abandon_signal after mirror_super has checked it, but + * before the reset. However, mirror_reset doesn't clear abandon_signal + * so it'll just terminate early on the next pass. */ + ERROR_UNLESS( + self_pipe_signal( serve->mirror->abandon_signal ), + "Failed to signal abandon to mirror" + ); + pthread_t tid = serve->mirror_super->thread; pthread_join( tid, NULL ); debug( "Mirror thread %p pthread_join returned", tid );