flexnbd: Disconnect clients at the start of a mirror last pass

Currently, we prevent clients from processing requests by taking
the server I/O lock. This leads to requests hanging for a long
time before being terminated when the migration completes, which
is not ideal. With this change, at the start of the final pass,
existing clients are closed and any new connections will be closed
immediately (so no NBD server handshake will be seen).

This is part of the work required to remove remove the server I/O
lock completely.
This commit is contained in:
nick
2013-09-10 16:03:26 +01:00
parent 0494295705
commit 487bef1f40
3 changed files with 92 additions and 30 deletions

View File

@@ -407,7 +407,7 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
int mirror_exceeds_max_bps( struct mirror *mirror ) int mirror_exceeds_max_bps( struct mirror *mirror )
{ {
uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started; uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started;
uint64_t mig_speed = mirror->all_dirty / ( (duration_ms / 1000 ) + 1 ); uint64_t mig_speed = mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 );
debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second ); debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second );
@@ -467,7 +467,7 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
// or we might clear a bit that had been set by another write. // or we might clear a bit that had been set by another write.
if ( !server_io_locked( ctrl->serve ) ) { if ( !server_io_locked( ctrl->serve ) ) {
server_lock_io( ctrl->serve ); server_lock_io( ctrl->serve );
debug( "In block block" ); debug( "In lock block" );
} }
} }
@@ -623,11 +623,16 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
m->this_pass_clean = 0; m->this_pass_clean = 0;
debug( "mirror start pass=%d", m->pass ); debug( "mirror start pass=%d", m->pass );
/* This is the start of our next pass. If it happens to be the
* final pass, we need to lock server I/O so that other writes
* don't race with our call to mirror_setup_next_xfer() below */
if ( m->pass == mirror_last_pass ) { if ( m->pass == mirror_last_pass ) {
/* This is the start of our next pass. If it happens to be the
* final pass, we need to wait for all the clients to exit before
* continuing */
debug( "In lock block for last pass" ); debug( "In lock block for last pass" );
/* FIXME: this could block */
server_forbid_new_clients( ctrl->serve );
server_close_clients( ctrl->serve );
// FIXME: In theory, we won't need this any more...
server_lock_io( ctrl->serve ); server_lock_io( ctrl->serve );
} }
} }
@@ -708,7 +713,9 @@ void mirror_run( struct server *serve )
NULLCHECK( serve ); NULLCHECK( serve );
NULLCHECK( serve->mirror ); NULLCHECK( serve->mirror );
serve->mirror->migration_started = monotonic_time_ms(); struct mirror *m = serve->mirror;
m->migration_started = monotonic_time_ms();
info("Starting mirror" ); info("Starting mirror" );
/* mirror_setup_next_xfer won't be able to cope with this, so special-case /* mirror_setup_next_xfer won't be able to cope with this, so special-case
@@ -730,15 +737,16 @@ void mirror_run( struct server *serve )
memset( &ctrl, 0, sizeof( struct mirror_ctrl ) ); memset( &ctrl, 0, sizeof( struct mirror_ctrl ) );
ctrl.serve = serve; ctrl.serve = serve;
ctrl.mirror = serve->mirror; ctrl.mirror = m;
ctrl.ev_loop = EV_DEFAULT; ctrl.ev_loop = EV_DEFAULT;
/* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */ /* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */
ev_io_init( &ctrl.read_watcher, mirror_read_cb, ctrl.mirror->client, EV_READ ); ev_io_init( &ctrl.read_watcher, mirror_read_cb, m->client, EV_READ );
ctrl.read_watcher.data = (void*) &ctrl; ctrl.read_watcher.data = (void*) &ctrl;
ev_io_init( &ctrl.write_watcher, mirror_write_cb, ctrl.mirror->client, EV_WRITE ); ev_io_init( &ctrl.write_watcher, mirror_write_cb, m->client, EV_WRITE );
ctrl.write_watcher.data = (void*) &ctrl; ctrl.write_watcher.data = (void*) &ctrl;
ev_init( &ctrl.timeout_watcher, mirror_timeout_cb ); ev_init( &ctrl.timeout_watcher, mirror_timeout_cb );
@@ -749,7 +757,7 @@ void mirror_run( struct server *serve )
ctrl.limit_watcher.data = (void*) &ctrl; ctrl.limit_watcher.data = (void*) &ctrl;
ev_init( &ctrl.abandon_watcher, mirror_abandon_cb ); ev_init( &ctrl.abandon_watcher, mirror_abandon_cb );
ev_io_set( &ctrl.abandon_watcher, ctrl.mirror->abandon_signal->read_fd, EV_READ ); ev_io_set( &ctrl.abandon_watcher, m->abandon_signal->read_fd, EV_READ );
ctrl.abandon_watcher.data = (void*) &ctrl; ctrl.abandon_watcher.data = (void*) &ctrl;
ev_io_start( ctrl.ev_loop, &ctrl.abandon_watcher ); ev_io_start( ctrl.ev_loop, &ctrl.abandon_watcher );
@@ -767,27 +775,33 @@ void mirror_run( struct server *serve )
/* Everything up to here is blocking. We switch to non-blocking so we /* Everything up to here is blocking. We switch to non-blocking so we
* can handle rate-limiting and weird error conditions better. TODO: We * can handle rate-limiting and weird error conditions better. TODO: We
* should expand the event loop upwards so we can do the same there too */ * should expand the event loop upwards so we can do the same there too */
sock_set_nonblock( ctrl.mirror->client, 1 ); sock_set_nonblock( m->client, 1 );
info( "Entering event loop" ); info( "Entering event loop" );
ev_run( ctrl.ev_loop, 0 ); ev_run( ctrl.ev_loop, 0 );
info( "Exited event loop" ); info( "Exited event loop" );
/* Parent code might expect a non-blocking socket */ /* Parent code might expect a non-blocking socket */
sock_set_nonblock( ctrl.mirror->client, 0 ); sock_set_nonblock( m->client, 0 );
/* Errors in the event loop don't track I/O lock state or try to restore /* Errors in the event loop don't track I/O lock state or try to restore
* it to something sane - they just terminate the event loop with state != * it to something sane - they just terminate the event loop with state !=
* MS_DONE. We unlock here if it's locked. * MS_DONE. We unlock I/O and re-allow new clients here if necessary.
*/ */
if ( server_io_locked( serve ) ) { if ( server_io_locked( serve ) ) {
server_unlock_io( serve ); server_unlock_io( serve );
} }
if ( serve->mirror->commit_state != MS_DONE ) { if ( m->action_at_finish == ACTION_NOTHING || m->commit_state != MS_DONE ) {
server_allow_new_clients( serve );
}
/* Returning here says "mirroring complete" to the runner. The error
* call retries the migration from scratch. */
if ( m->commit_state != MS_DONE ) {
error( "Event loop exited, but mirroring is not complete" ); error( "Event loop exited, but mirroring is not complete" );
} }
/* returning here says "mirroring complete" to the runner */
return; return;
} }

View File

@@ -40,6 +40,7 @@ struct server * server_create (
out->success = success; out->success = success;
out->max_nbd_clients = max_nbd_clients; out->max_nbd_clients = max_nbd_clients;
out->use_killswitch = use_killswitch; out->use_killswitch = use_killswitch;
out->allow_new_clients = 1;
out->nbd_client = xmalloc( max_nbd_clients * sizeof( struct client_tbl_entry ) ); out->nbd_client = xmalloc( max_nbd_clients * sizeof( struct client_tbl_entry ) );
out->tcp_backlog = 10; /* does this need to be settable? */ out->tcp_backlog = 10; /* does this need to be settable? */
@@ -499,7 +500,7 @@ void server_audit_clients( struct server * serve)
* won't have been audited against the later acl. This isn't a * won't have been audited against the later acl. This isn't a
* problem though, because in order to update the acl * problem though, because in order to update the acl
* server_replace_acl must have been called, so the * server_replace_acl must have been called, so the
* server_accept ioop will see a second acl_updated signal as * server_accept loop will see a second acl_updated signal as
* soon as it hits select, and a second audit will be run. * soon as it hits select, and a second audit will be run.
*/ */
for( i = 0; i < serve->max_nbd_clients; i++ ) { for( i = 0; i < serve->max_nbd_clients; i++ ) {
@@ -664,8 +665,14 @@ int server_accept( struct server * params )
if ( FD_ISSET( params->server_fd, &fds ) ){ if ( FD_ISSET( params->server_fd, &fds ) ){
client_fd = accept( params->server_fd, &client_address.generic, &socklen ); client_fd = accept( params->server_fd, &client_address.generic, &socklen );
debug("Accepted nbd client socket fd %d", client_fd);
accept_nbd_client(params, client_fd, &client_address); if ( params->allow_new_clients ) {
debug("Accepted nbd client socket fd %d", client_fd);
accept_nbd_client(params, client_fd, &client_address);
} else {
debug( "New NBD client socket %d not allowed", client_fd );
sock_try_close( client_fd );
}
} }
return should_continue; return should_continue;
@@ -735,6 +742,44 @@ void serve_init_allocation_map(struct server* params)
} }
void server_forbid_new_clients( struct server * serve )
{
serve->allow_new_clients = 1;
return;
}
void server_close_and_join_clients( struct server * serve )
{
server_close_clients( serve );
}
void server_allow_new_clients( struct server * serve )
{
serve->allow_new_clients = 0;
return;
}
void server_join_clients( struct server * serve ) {
int i;
void* status;
for (i=0; i < serve->max_nbd_clients; i++) {
pthread_t thread_id = serve->nbd_client[i].thread;
int err = 0;
if (thread_id != 0) {
debug( "joining thread %p", thread_id );
if ( 0 == (err = pthread_join( thread_id, &status ) ) ) {
serve->nbd_client[i].thread = 0;
} else {
warn( "Error %s (%i) joining thread %p", strerror( err ), err, thread_id );
}
}
}
return;
}
/* Tell the server to close all the things. */ /* Tell the server to close all the things. */
void serve_signal_close( struct server * serve ) void serve_signal_close( struct server * serve )
{ {
@@ -775,12 +820,10 @@ void serve_cleanup(struct server* params,
int fatal __attribute__ ((unused)) ) int fatal __attribute__ ((unused)) )
{ {
NULLCHECK( params ); NULLCHECK( params );
void* status;
info("cleaning up"); info("cleaning up");
int i;
void* status;
if (params->server_fd){ close(params->server_fd); } if (params->server_fd){ close(params->server_fd); }
/* need to stop background build if we're killed very early on */ /* need to stop background build if we're killed very early on */
@@ -802,14 +845,7 @@ void serve_cleanup(struct server* params,
} }
if ( need_mirror_lock ) { server_unlock_start_mirror( params ); } if ( need_mirror_lock ) { server_unlock_start_mirror( params ); }
for (i=0; i < params->max_nbd_clients; i++) { server_join_clients( params );
pthread_t thread_id = params->nbd_client[i].thread;
if (thread_id != 0) {
debug("joining thread %p", thread_id);
pthread_join(thread_id, &status);
}
}
if ( server_start_mirror_locked( params ) ) { if ( server_start_mirror_locked( params ) ) {
server_unlock_start_mirror( params ); server_unlock_start_mirror( params );

View File

@@ -87,9 +87,12 @@ struct server {
int max_nbd_clients; int max_nbd_clients;
struct client_tbl_entry *nbd_client; struct client_tbl_entry *nbd_client;
/* Should clients use the killswitch? */ /** Should clients use the killswitch? */
int use_killswitch; int use_killswitch;
/** If this isn't set, newly accepted clients will be closed immediately */
int allow_new_clients;
/* Marker for whether this server has control over the data in /* Marker for whether this server has control over the data in
* the file, or if we're waiting to receive it from an inbound * the file, or if we're waiting to receive it from an inbound
@@ -135,6 +138,15 @@ void server_prevent_mirror_start( struct server *serve );
void server_allow_mirror_start( struct server *serve ); void server_allow_mirror_start( struct server *serve );
int server_mirror_can_start( struct server *serve ); int server_mirror_can_start( struct server *serve );
/* These three functions are used by mirror around the final pass, to close
* existing clients and prevent new ones from being around
*/
void server_forbid_new_clients( struct server *serve );
void server_close_clients( struct server *serve );
void server_join_clients( struct server *serve );
void server_allow_new_clients( struct server *serve );
void server_unlink( struct server * serve ); void server_unlink( struct server * serve );
int do_serve( struct server *, struct self_pipe * ); int do_serve( struct server *, struct self_pipe * );