diff --git a/src/mirror.c b/src/mirror.c index 6f6de55..e25103d 100644 --- a/src/mirror.c +++ b/src/mirror.c @@ -407,7 +407,7 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl ) int mirror_exceeds_max_bps( struct mirror *mirror ) { uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started; - uint64_t mig_speed = mirror->all_dirty / ( (duration_ms / 1000 ) + 1 ); + uint64_t mig_speed = mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 ); debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second ); @@ -467,7 +467,7 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents ) // or we might clear a bit that had been set by another write. if ( !server_io_locked( ctrl->serve ) ) { server_lock_io( ctrl->serve ); - debug( "In block block" ); + debug( "In lock block" ); } } @@ -623,11 +623,16 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents ) m->this_pass_clean = 0; debug( "mirror start pass=%d", m->pass ); - /* This is the start of our next pass. If it happens to be the - * final pass, we need to lock server I/O so that other writes - * don't race with our call to mirror_setup_next_xfer() below */ if ( m->pass == mirror_last_pass ) { + /* This is the start of our next pass. If it happens to be the + * final pass, we need to wait for all the clients to exit before + * continuing */ debug( "In lock block for last pass" ); + /* FIXME: this could block */ + server_forbid_new_clients( ctrl->serve ); + server_close_clients( ctrl->serve ); + + // FIXME: In theory, we won't need this any more... 
server_lock_io( ctrl->serve ); } } @@ -708,7 +713,9 @@ void mirror_run( struct server *serve ) NULLCHECK( serve ); NULLCHECK( serve->mirror ); - serve->mirror->migration_started = monotonic_time_ms(); + struct mirror *m = serve->mirror; + + m->migration_started = monotonic_time_ms(); info("Starting mirror" ); /* mirror_setup_next_xfer won't be able to cope with this, so special-case @@ -730,15 +737,16 @@ void mirror_run( struct server *serve ) memset( &ctrl, 0, sizeof( struct mirror_ctrl ) ); ctrl.serve = serve; - ctrl.mirror = serve->mirror; + ctrl.mirror = m; + ctrl.ev_loop = EV_DEFAULT; /* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */ - ev_io_init( &ctrl.read_watcher, mirror_read_cb, ctrl.mirror->client, EV_READ ); + ev_io_init( &ctrl.read_watcher, mirror_read_cb, m->client, EV_READ ); ctrl.read_watcher.data = (void*) &ctrl; - ev_io_init( &ctrl.write_watcher, mirror_write_cb, ctrl.mirror->client, EV_WRITE ); + ev_io_init( &ctrl.write_watcher, mirror_write_cb, m->client, EV_WRITE ); ctrl.write_watcher.data = (void*) &ctrl; ev_init( &ctrl.timeout_watcher, mirror_timeout_cb ); @@ -749,7 +757,7 @@ void mirror_run( struct server *serve ) ctrl.limit_watcher.data = (void*) &ctrl; ev_init( &ctrl.abandon_watcher, mirror_abandon_cb ); - ev_io_set( &ctrl.abandon_watcher, ctrl.mirror->abandon_signal->read_fd, EV_READ ); + ev_io_set( &ctrl.abandon_watcher, m->abandon_signal->read_fd, EV_READ ); ctrl.abandon_watcher.data = (void*) &ctrl; ev_io_start( ctrl.ev_loop, &ctrl.abandon_watcher ); @@ -767,27 +775,33 @@ void mirror_run( struct server *serve ) /* Everything up to here is blocking. We switch to non-blocking so we * can handle rate-limiting and weird error conditions better. 
TODO: We * should expand the event loop upwards so we can do the same there too */ - sock_set_nonblock( ctrl.mirror->client, 1 ); + sock_set_nonblock( m->client, 1 ); info( "Entering event loop" ); ev_run( ctrl.ev_loop, 0 ); info( "Exited event loop" ); /* Parent code might expect a non-blocking socket */ - sock_set_nonblock( ctrl.mirror->client, 0 ); + sock_set_nonblock( m->client, 0 ); /* Errors in the event loop don't track I/O lock state or try to restore * it to something sane - they just terminate the event loop with state != - * MS_DONE. We unlock here if it's locked. + * MS_DONE. We unlock I/O and re-allow new clients here if necessary. */ if ( server_io_locked( serve ) ) { server_unlock_io( serve ); } - if ( serve->mirror->commit_state != MS_DONE ) { + if ( m->action_at_finish == ACTION_NOTHING || m->commit_state != MS_DONE ) { + server_allow_new_clients( serve ); + } + + /* Returning here says "mirroring complete" to the runner. The error + * call retries the migration from scratch. */ + + if ( m->commit_state != MS_DONE ) { error( "Event loop exited, but mirroring is not complete" ); } - /* returning here says "mirroring complete" to the runner */ return; } diff --git a/src/serve.c b/src/serve.c index 61e1541..394b048 100644 --- a/src/serve.c +++ b/src/serve.c @@ -40,6 +40,7 @@ struct server * server_create ( out->success = success; out->max_nbd_clients = max_nbd_clients; out->use_killswitch = use_killswitch; + out->allow_new_clients = 1; out->nbd_client = xmalloc( max_nbd_clients * sizeof( struct client_tbl_entry ) ); out->tcp_backlog = 10; /* does this need to be settable? */ @@ -499,7 +500,7 @@ void server_audit_clients( struct server * serve) * won't have been audited against the later acl. 
This isn't a * problem though, because in order to update the acl * server_replace_acl must have been called, so the - * server_accept ioop will see a second acl_updated signal as + * server_accept loop will see a second acl_updated signal as * soon as it hits select, and a second audit will be run. */ for( i = 0; i < serve->max_nbd_clients; i++ ) { @@ -664,8 +665,14 @@ int server_accept( struct server * params ) if ( FD_ISSET( params->server_fd, &fds ) ){ client_fd = accept( params->server_fd, &client_address.generic, &socklen ); - debug("Accepted nbd client socket fd %d", client_fd); - accept_nbd_client(params, client_fd, &client_address); + + if ( params->allow_new_clients ) { + debug("Accepted nbd client socket fd %d", client_fd); + accept_nbd_client(params, client_fd, &client_address); + } else { + debug( "New NBD client socket %d not allowed", client_fd ); + sock_try_close( client_fd ); + } } return should_continue; @@ -735,6 +742,44 @@ void serve_init_allocation_map(struct server* params) } +void server_forbid_new_clients( struct server * serve ) +{ + serve->allow_new_clients = 0; /* zero makes server_accept() close newly accepted sockets */ + return; +} + +void server_close_and_join_clients( struct server * serve ) +{ + server_close_clients( serve ); /* NOTE(review): only server_close_clients() is called; no server_join_clients() call here despite the name */ +} + +void server_allow_new_clients( struct server * serve ) +{ + serve->allow_new_clients = 1; /* non-zero lets server_accept() pass new sockets to accept_nbd_client() */ + return; +} + +void server_join_clients( struct server * serve ) { /* pthread_join() each non-zero thread slot; a slot is cleared only when its join succeeds */ + int i; + void* status; + + for (i=0; i < serve->max_nbd_clients; i++) { + pthread_t thread_id = serve->nbd_client[i].thread; + int err = 0; + + if (thread_id != 0) { + debug( "joining thread %p", thread_id ); + if ( 0 == (err = pthread_join( thread_id, &status ) ) ) { + serve->nbd_client[i].thread = 0; + } else { + warn( "Error %s (%i) joining thread %p", strerror( err ), err, thread_id ); + } + } + } + + return; +} + /* Tell the server to close all the things. 
*/ void serve_signal_close( struct server * serve ) { @@ -775,12 +820,10 @@ void serve_cleanup(struct server* params, int fatal __attribute__ ((unused)) ) { NULLCHECK( params ); + void* status; info("cleaning up"); - int i; - void* status; - if (params->server_fd){ close(params->server_fd); } /* need to stop background build if we're killed very early on */ @@ -802,14 +845,7 @@ void serve_cleanup(struct server* params, } if ( need_mirror_lock ) { server_unlock_start_mirror( params ); } - for (i=0; i < params->max_nbd_clients; i++) { - pthread_t thread_id = params->nbd_client[i].thread; - - if (thread_id != 0) { - debug("joining thread %p", thread_id); - pthread_join(thread_id, &status); - } - } + server_join_clients( params ); if ( server_start_mirror_locked( params ) ) { server_unlock_start_mirror( params ); diff --git a/src/serve.h b/src/serve.h index 97faa37..3c26e5e 100644 --- a/src/serve.h +++ b/src/serve.h @@ -87,9 +87,12 @@ struct server { int max_nbd_clients; struct client_tbl_entry *nbd_client; - /* Should clients use the killswitch? */ + /** Should clients use the killswitch? 
*/ int use_killswitch; + /** If this isn't set, newly accepted clients will be closed immediately */ + int allow_new_clients; + /* Marker for whether this server has control over the data in * the file, or if we're waiting to receive it from an inbound @@ -135,6 +138,15 @@ void server_prevent_mirror_start( struct server *serve ); void server_allow_mirror_start( struct server *serve ); int server_mirror_can_start( struct server *serve ); +/* These four functions are used by mirror around the final pass, to close + * existing clients and prevent new ones from being around + */ + +void server_forbid_new_clients( struct server *serve ); +void server_close_clients( struct server *serve ); +void server_join_clients( struct server *serve ); +void server_allow_new_clients( struct server *serve ); + void server_unlink( struct server * serve ); int do_serve( struct server *, struct self_pipe * );