From c2df38c9d3693cdb01ccc1533ae1c5300a251c57 Mon Sep 17 00:00:00 2001
From: nick
Date: Fri, 9 Aug 2013 17:02:10 +0100
Subject: [PATCH] mirror: Use libev to provide an event loop inside the mirror
 thread

We're doing this so we can implement bandwidth controls sanely.
---
 Rakefile       |   2 +-
 debian/control |   4 +-
 src/mirror.c   | 545 ++++++++++++++++++++++++++++++++++++++-----------
 src/mirror.h   |   2 +-
 4 files changed, 434 insertions(+), 119 deletions(-)

diff --git a/Rakefile b/Rakefile
index 0a35243..2b90f0d 100644
--- a/Rakefile
+++ b/Rakefile
@@ -22,7 +22,7 @@ TEST_SOURCES = FileList['tests/unit/*.c']
 TEST_OBJECTS = TEST_SOURCES.pathmap( "%{^tests/unit,build/tests}X.o" )
 
 LIBS = %w( pthread )
-LDFLAGS = ["-lrt"]
+LDFLAGS = ["-lrt -lev"]
 CCFLAGS = %w(
   -D_GNU_SOURCE=1
   -Wall
diff --git a/debian/control b/debian/control
index 969fa73..2578469 100644
--- a/debian/control
+++ b/debian/control
@@ -2,13 +2,13 @@ Source: flexnbd
 Section: unknown
 Priority: extra
 Maintainer: Alex Young
-Build-Depends: cdbs, debhelper (>= 7.0.50), ruby, rake, gcc
+Build-Depends: cdbs, debhelper (>= 7.0.50), ruby, rake, gcc, libev-dev
 Standards-Version: 3.8.1
 Homepage: http://bigv.io/
 
 Package: flexnbd
 Architecture: any
-Depends: ${shlibs:Depends}, ${misc:Depends}
+Depends: ${shlibs:Depends}, ${misc:Depends}, libev3
 Description: FlexNBD server
  An NBD server offering push-mirroring and intelligent sparse file handling
diff --git a/src/mirror.c b/src/mirror.c
index 9bee6e2..076cefa 100644
--- a/src/mirror.c
+++ b/src/mirror.c
@@ -19,18 +19,64 @@
 #include "serve.h"
 #include "util.h"
 #include "ioutil.h"
+#include "sockutil.h"
 #include "parse.h"
 #include "readwrite.h"
 #include "bitset.h"
 #include "self_pipe.h"
 #include "status.h"
-
 #include
 #include
 #include
 #include
 #include
+#include <ev.h>
+
+/* compat with older libev */
+#ifndef EVBREAK_ONE
+
+#define ev_run( loop, flags ) ev_loop( loop, flags )
+#define ev_break( loop, how ) ev_unloop( loop, how )
+
+#define EVBREAK_ONE EVUNLOOP_ONE
+#define EVBREAK_ALL EVUNLOOP_ALL
+
+#endif
+
+/* We use this to keep track of the socket request data we need to send */
+struct xfer {
+    /* Store the bytes we need to send before the data, or receive back */
+    union {
+        struct nbd_request_raw req_raw;
+        struct nbd_reply_raw   rsp_raw;
+    } hdr;
+
+    /* What in mirror->mapped we should write, and how much of it we've done */
+    uint64_t from;
+    uint64_t len;
+    uint64_t written;
+
+    /* Number of bytes of the response read back so far */
+    uint64_t read;
+};
+
+struct mirror_ctrl {
+    struct server *serve;
+    struct mirror *mirror;
+
+    /* libev stuff */
+    struct ev_loop *ev_loop;
+    ev_io    read_watcher;
+    ev_io    write_watcher;
+    ev_timer timeout_watcher;
+
+    /* Use this to keep track of what we're copying at any moment */
+    struct xfer xfer;
+};
 
 struct mirror * mirror_alloc(
     union mysockaddr * connect_to,
@@ -69,6 +115,8 @@ enum mirror_state mirror_get_state( struct mirror * mirror )
     return mirror->commit_state;
 }
 
+#define mirror_state_is( mirror, state ) mirror_get_state( mirror ) == state
+
 void mirror_init( struct mirror * mirror, const char * filename )
 {
@@ -95,7 +143,6 @@ void mirror_init( struct mirror * mirror, const char * filename )
     );
 
     mirror->dirty_map = bitset_alloc(size, 4096);
-
 }
 
@@ -105,13 +152,18 @@ void mirror_reset( struct mirror * mirror )
     NULLCHECK( mirror );
     NULLCHECK( mirror->dirty_map );
     mirror_set_state( mirror, MS_INIT );
+
+    /* See the caveats in mirror_run if you change this! */
     bitset_set(mirror->dirty_map);
+
     mirror->all_dirty = 0;
     mirror->all_clean = 0;
     mirror->pass = 0;
     mirror->this_pass_dirty = 0;
     mirror->this_pass_clean = 0;
     mirror->migration_started = 0;
+
+    return;
 }
 
@@ -162,83 +214,7 @@ static const unsigned int mirror_last_pass_after_bytes_written = 100<<20;
  * cause the I/O to freeze, however many bytes are left to copy.
  */
 static const int mirror_maximum_passes = 7;
-
-
-/* A single mirror pass over the disc, optionally locking IO around the
- * transfer.
- */
-int mirror_pass(struct server * serve, int is_last_pass, uint64_t *written)
-{
-    uint64_t current = 0;
-    int success = 1;
-    struct bitset_mapping *map = serve->mirror->dirty_map;
-    struct mirror * m = serve->mirror;
-    *written = 0;
-
-
-    while (current < serve->size) {
-        uint64_t run = bitset_run_count(map, current, mirror_longest_write);
-
-        if ( current + run > serve->size ) {
-            debug(
-                "Size not divisible by %i, adjusting final block",
-                block_allocation_resolution
-            );
-            run = serve->size - current;
-        }
-
-        debug("mirror current=%"PRIu64", run=%"PRIu64, current, run);
-
-        /* FIXME: we could avoid sending sparse areas of the
-         * disc here, and probably save a lot of bandwidth and
-         * time (if we know the destination starts off zeroed).
-         */
-        if (bitset_is_set_at(map, current)) {
-            /* We've found a dirty area, send it */
-            debug("^^^ writing");
-
-            /* We need to stop the main thread from working
-             * because it might corrupt the dirty map. This
-             * is likely to slow things down but will be
-             * safe.
-             */
-            if (!is_last_pass) { server_lock_io( serve ); }
-            {
-                debug("in lock block");
-                /** FIXME: do something useful with bytes/second */
-
-                /** FIXME: error handling code here won't unlock */
-                socket_nbd_write( serve->mirror->client,
-                        current,
-                        run,
-                        0,
-                        serve->mirror->mapped + current,
-                        MS_REQUEST_LIMIT_SECS);
-
-                /* now mark it clean */
-                bitset_clear_range(map, current, run);
-                debug("leaving lock block");
-            }
-            if (!is_last_pass) { server_unlock_io( serve ); }
-
-            m->this_pass_dirty += run;
-            m->all_dirty += run;
-            *written += run;
-        } else {
-            m->this_pass_clean += run;
-            m->all_clean += run;
-        }
-        current += run;
-
-        if (serve->mirror->signal_abandon) {
-            debug("Abandon message received" );
-            success = 0;
-            break;
-        }
-    }
-
-    return success;
-}
+#define mirror_last_pass (mirror_maximum_passes - 1)
 
 
 /* THIS FUNCTION MUST ONLY BE CALLED WITH THE SERVER'S IO LOCKED. */
@@ -295,7 +271,6 @@ void mirror_cleanup( struct server * serve,
 }
 
-
 int mirror_connect( struct mirror * mirror, off64_t local_size )
 {
     struct sockaddr * connect_from = NULL;
@@ -362,47 +337,388 @@ int mirror_should_quit( struct mirror * mirror )
     }
 }
 
+/* Iterates through the bitmap, finding a dirty run to form the basis of the
+ * next transfer, then puts it together. */
+int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
+{
+    struct mirror* mirror = ctrl->mirror;
+    uint64_t current, run, size = ctrl->serve->size;
+    int found = 0;
+
+    do {
+        int run_is_set = 0;
+        current = mirror->this_pass_dirty + mirror->this_pass_clean;
+
+        run = bitset_run_count_ex(
+            mirror->dirty_map, current, mirror_longest_write, &run_is_set
+        );
+
+        if ( current + run > size ) {
+            debug(
+                "Size not divisible by %i, adjusting final block",
+                block_allocation_resolution
+            );
+            run = size - current;
+        }
+
+        /* FIXME: we could avoid sending sparse areas of the disc here, and
+         * probably save a lot of bandwidth and time (if we know the
+         * destination starts off zeroed). */
+        if ( run_is_set ) {
+            found = 1;
+        } else {
+            mirror->this_pass_clean += run;
+            mirror->all_clean += run;
+        }
+    } while ( !found && current + run < size );
+
+    /* current and run specify our next transfer */
+    if ( !found ) {
+        return 0;
+    }
+    debug( "Next dirty block: current=%"PRIu64", run=%"PRIu64, current, run );
+    struct nbd_request req = {
+        .magic  = REQUEST_MAGIC,
+        .type   = REQUEST_WRITE,
+        .handle = ".MIRROR.",
+        .from   = current,
+        .len    = run
+    };
+    nbd_h2r_request( &req, &ctrl->xfer.hdr.req_raw );
+
+    ctrl->xfer.from = current;
+    ctrl->xfer.len  = run;
+
+    ctrl->xfer.written = 0;
+    ctrl->xfer.read    = 0;
+
+    return 1;
+}
+
+// ONLY CALL THIS WHEN SERVER IO IS LOCKED
+void mirror_complete( struct server *serve )
+{
+    /* FIXME: Pretty sure this is broken if action != QUIT. Just moving code
+     * around for now, can fix it later. Action is always quit in production */
+    if ( mirror_should_quit( serve->mirror ) ) {
+        debug("exit!");
+        /* FIXME: This depends on blocking I/O right now, so make sure we are */
+        sock_set_nonblock( serve->mirror->client, 0 );
+        mirror_on_exit( serve );
+        info("Server closed, quitting after successful migration");
+    }
+
+    mirror_set_state( serve->mirror, MS_DONE );
+    return;
+}
+
+static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
+{
+    struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
+    NULLCHECK( ctrl );
+
+    struct xfer *xfer = &ctrl->xfer;
+
+    size_t to_write, hdr_size = sizeof( struct nbd_request_raw );
+    char *data_loc;
+    ssize_t count;
+
+    if ( !( revents & EV_WRITE ) ) {
+        warn( "No write event signalled in mirror write callback" );
+        return;
+    }
+
+    /* TODO: This needs to be its own event, not checked here */
+    if ( ctrl->mirror->signal_abandon ) {
+        debug("Abandon message received" );
+        ev_break( loop, EVBREAK_ONE );
+        return;
+    }
+
+    debug( "Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client );
+
+    if ( xfer->written < hdr_size ) {
+        data_loc = ( (char*) &xfer->hdr.req_raw ) + ctrl->xfer.written;
+        to_write = hdr_size - xfer->written;
+    } else {
+        data_loc = ctrl->mirror->mapped + xfer->from + ( xfer->written - hdr_size );
+        to_write = xfer->len - ( ctrl->xfer.written - hdr_size );
+
+        // If we're in the last pass, we'll be locked anyway. If we're not in
+        // the last pass, we want to be locked for every write() call that
+        // we issue, to avoid the blocks being updated while we work. In
+        // particular, bitset_clear_range() must be called while the I/O is
+        // locked, or we might clear a bit that had been set by another write.
+        if ( !server_io_locked( ctrl->serve ) ) {
+            server_lock_io( ctrl->serve );
+            debug( "In lock block" );
+        }
+
+    }
+
+    // Actually write some bytes
+    if ( ( count = write( ctrl->mirror->client, data_loc, to_write ) ) < 0 ) {
+        if ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
+            warn( SHOW_ERRNO( "Couldn't write to listener" ) );
+            ev_break( loop, EVBREAK_ONE );
+        }
+        return;
+    }
+    debug( "Wrote %zd bytes", count );
+    debug( "to_write was %zu, xfer->written was %"PRIu64, to_write, xfer->written );
+    ctrl->xfer.written += count;
+
+    // We wrote some bytes, so reset the timer
+    ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
+
+    // All bytes written, so now we need to read the NBD reply back.
+    if ( ctrl->xfer.written == ctrl->xfer.len + hdr_size ) {
+
+        // We can, however, clear the run here. If it turns out that the
+        // NBD request has been rejected, we're discarding it anyway, so the
+        // wrong data won't get used.
+        // If the request is a success, any blocks written to while waiting
+        // for the reply will be copied in the next pass; if it's the final
+        // pass, I/O remains locked.
+        debug( "Clearing bitset from=%"PRIu64" run=%"PRIu64, ctrl->xfer.from, ctrl->xfer.len );
+        bitset_clear_range( ctrl->mirror->dirty_map, ctrl->xfer.from, ctrl->xfer.len );
+
+        if ( ctrl->mirror->pass != mirror_last_pass ) {
+            debug( "Leaving lock block" );
+            server_unlock_io( ctrl->serve );
+        }
+        ev_io_start( loop, &ctrl->read_watcher );
+        ev_io_stop( loop, &ctrl->write_watcher );
+    }
+
+    return;
+}
+
+static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
+{
+    struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
+    NULLCHECK( ctrl );
+
+    struct mirror *m = ctrl->mirror;
+    NULLCHECK( m );
+
+    struct xfer *xfer = &ctrl->xfer;
+    NULLCHECK( xfer );
+
+    if ( !( revents & EV_READ ) ) {
+        warn( "No read event signalled in mirror read callback" );
+        return;
+    }
+
+    /* TODO: This needs to be its own event, not checked here */
+    if ( m->signal_abandon ) {
+        debug("Abandon message received" );
+        ev_break( loop, EVBREAK_ONE );
+        return;
+    }
+
+    struct nbd_reply rsp;
+    ssize_t count;
+    uint64_t left = sizeof( struct nbd_reply_raw ) - xfer->read;
+
+    debug( "Mirror read callback invoked with events %d. fd: %i", revents, m->client );
+
+    /* Start / continue reading the NBD response from the listener. */
+    if ( ( count = read( m->client, ((char*) &xfer->hdr.rsp_raw) + xfer->read, left ) ) < 0 ) {
+        if ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
+            warn( SHOW_ERRNO( "Couldn't read from listener" ) );
+            ev_break( loop, EVBREAK_ONE );
+        }
+        debug( SHOW_ERRNO( "Couldn't read from listener (non-scary)" ) );
+        return;
+    }
+
+    info( "count is %li, left was %"PRIu64, count, left );
+
+    if ( count == 0 ) {
+        warn( "EOF reading response from server!" );
+        ev_break( loop, EVBREAK_ONE );
+        return;
+    }
+
+    // We read some bytes, so reset the timer
+    ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
+
+    debug( "Read %zd bytes", count );
+    debug( "left was %"PRIu64", xfer->read was %"PRIu64, left, xfer->read );
+    xfer->read += count;
+
+    if ( xfer->read < sizeof( struct nbd_reply_raw ) ) {
+        // Haven't read the whole response yet
+        return;
+    }
+
+    nbd_r2h_reply( &xfer->hdr.rsp_raw, &rsp );
+
+    // validate reply, break event loop if bad
+    if ( rsp.magic != REPLY_MAGIC ) {
+        warn( "Bad reply magic from listener" );
+        ev_break( loop, EVBREAK_ONE );
+        return;
+    }
+
+    if ( rsp.error != 0 ) {
+        warn( "Error returned from listener: %i", rsp.error );
+        ev_break( loop, EVBREAK_ONE );
+        return;
+    }
+
+    if ( memcmp( ".MIRROR.", &rsp.handle[0], 8 ) != 0 ) {
+        warn( "Bad handle returned from listener" );
+        ev_break( loop, EVBREAK_ONE );
+        return;
+    }
+
+    /* The transfer completed, so now we need to either set up the next
+     * transfer of this pass, set up the first transfer of the next pass, or
+     * complete the migration. */
+    m->this_pass_dirty += xfer->len;
+    m->all_dirty += xfer->len;
+    xfer->read = 0;
+    xfer->written = 0;
+
+    /* This next bit could take a little while, which is fine */
+    ev_timer_stop( ctrl->ev_loop, &ctrl->timeout_watcher );
+
+    do {
+        // This pass is complete
+        if ( m->this_pass_dirty + m->this_pass_clean == ctrl->serve->size ) {
+            debug( "Pass %d completed", m->pass );
+            /* Set up the next transfer, which may be n+1 in the current pass,
+             * or 0 in a new pass. If we can't find another transfer to do,
+             * the pass is complete.
+             * Advance the pass and re-run the end-of-pass logic to either
+             * complete the migration ( pass == mirror_last_pass ) or move
+             * onto the last pass ( pass < mirror_last_pass, by virtue of
+             * this_pass_dirty being 0 ).
+             */
+
+            // last pass completed
+            if ( m->pass >= mirror_last_pass ) {
+                /* This was the last pass, so finish. */
+                mirror_complete( ctrl->serve );
+                ev_break( loop, EVBREAK_ONE );
+                return;
+            }
+
+            // this was not the last pass - set up for the next one.
+            if ( m->this_pass_dirty < mirror_last_pass_after_bytes_written ) {
+                /* Quiet disc, so skip to the final pass */
+                m->pass = mirror_last_pass;
+            } else {
+                m->pass++;
+            }
+            // FIXME: Can status race with us if it inspects state here?
+            m->this_pass_dirty = 0;
+            m->this_pass_clean = 0;
+
+            debug( "mirror start pass=%d", m->pass );
+            /* This is the start of our next pass. If it happens to be the
+             * final pass, we need to lock server I/O so that other writes
+             * don't race with our call to mirror_setup_next_xfer() below. */
+            if ( m->pass == mirror_last_pass ) {
+                debug( "In lock block for last pass" );
+                server_lock_io( ctrl->serve );
+            }
+        }
+    } while ( !mirror_setup_next_xfer( ctrl ) );
+
+    /* We're waiting for the socket to become writable again, so re-enable
+     * the write watcher and the timeout. */
+    ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
+    ev_io_start( loop, &ctrl->write_watcher );
+    ev_io_stop( loop, &ctrl->read_watcher );
+    return;
+}
+
+void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)), int revents )
+{
+    if ( !( revents & EV_TIMER ) ) {
+        warn( "Mirror timeout called but no timer event signalled" );
+        return;
+    }
+
+    info( "Mirror timeout signalled" );
+    ev_break( loop, EVBREAK_ONE );
+    return;
+}
 
 void mirror_run( struct server *serve )
 {
     NULLCHECK( serve );
     NULLCHECK( serve->mirror );
-    struct mirror* m = serve->mirror;
-
-    uint64_t written;
-
+    serve->mirror->migration_started = monotonic_time_ms();
     info("Starting mirror" );
-    m->migration_started = monotonic_time_ms();
-    for (m->pass=0; m->pass < mirror_maximum_passes-1; m->pass++) {
-        m->this_pass_clean = 0;
-        m->this_pass_dirty = 0;
-
-        debug("mirror start pass=%d", m->pass);
-        if ( !mirror_pass( serve, 0, &written ) ){
-            debug("Failed mirror pass state is %d", mirror_get_state( serve->mirror ) );
-            debug("pass failed, giving up");
-            return; }
-
-        /* if we've not written anything */
-        if (written < mirror_last_pass_after_bytes_written) { break; }
+    /* mirror_setup_next_xfer won't be able to cope with a zero-sized image,
+     * so special-case it here.
+     * TODO: Another case we won't be able to handle is a non-zero-sized image
+     * where none of the blocks are set in the first pass. As it happens, we
+     * start with all blocks set and then pare them down, so it doesn't happen
+     * in the current codebase - but watch out for the future!
+     */
+    if ( serve->size == 0 ) {
+        info( "0-byte image special case" );
+        server_lock_io( serve );
+        mirror_complete( serve );
+        server_unlock_io( serve );
+        return;
     }
-    server_lock_io( serve );
-    {
-        m->this_pass_clean = 0;
-        m->this_pass_dirty = 0;
+    struct mirror_ctrl ctrl;
+    memset( &ctrl, 0, sizeof( struct mirror_ctrl ) );
-        if ( mirror_pass( serve, 1, &written ) &&
-                mirror_should_quit( serve->mirror ) ) {
-            debug("exit!");
-            mirror_on_exit( serve );
-            info("Server closed, quitting "
-                "after successful migration");
-        }
+    ctrl.serve = serve;
+    ctrl.mirror = serve->mirror;
+
+    ctrl.ev_loop = EV_DEFAULT;
+
+    /* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */
+    ev_io_init( &ctrl.read_watcher, mirror_read_cb, ctrl.mirror->client, EV_READ );
+    ctrl.read_watcher.data = (void*) &ctrl;
+
+    ev_io_init( &ctrl.write_watcher, mirror_write_cb, ctrl.mirror->client, EV_WRITE );
+    ctrl.write_watcher.data = (void*) &ctrl;
+
+    ev_init( &ctrl.timeout_watcher, mirror_timeout_cb );
+    ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F;
+
+    ERROR_UNLESS(
+        mirror_setup_next_xfer( &ctrl ),
+        "Couldn't find first transfer for mirror!"
+    );
+
+    /* Start by writing xfer 0 to the listener */
+    ev_io_start( ctrl.ev_loop, &ctrl.write_watcher );
+
+    /* Everything up to here is blocking. We switch to non-blocking so we
+     * can handle rate-limiting and weird error conditions better. TODO: We
+     * should expand the event loop upwards so we can do the same there too */
+    sock_set_nonblock( ctrl.mirror->client, 1 );
+    info( "Entering event loop" );
+    ev_run( ctrl.ev_loop, 0 );
+    info( "Exited event loop" );
+    /* Parent code might expect a blocking socket */
+    sock_set_nonblock( ctrl.mirror->client, 0 );
+
+    /* Errors in the event loop don't track I/O lock state or try to restore
+     * it to something sane - they just terminate the event loop with state !=
+     * MS_DONE. We unlock here if it's locked.
+     */
+    if ( server_io_locked( serve ) ) {
+        server_unlock_io( serve );
     }
-    server_unlock_io( serve );
+
+    if ( serve->mirror->commit_state != MS_DONE ) {
+        error( "Event loop exited, but mirroring is not complete" );
+    }
+
+    /* returning here says "mirroring complete" to the runner */
+    return;
 }
@@ -475,8 +791,6 @@ void* mirror_runner(void* serve_params_uncast)
     }
 
     mirror_run( serve );
-
-    mirror_set_state( mirror, MS_DONE );
 abandon_mirror:
     return NULL;
 }
@@ -560,12 +874,13 @@ void * mirror_super_runner( void * serve_uncast )
             debug( "Supervisor got commit signal" );
 
             if ( first_pass ) {
-                /* Only retry if the connection attempt was
-                 * successful. Otherwise the user will see an
-                 * error reported while we're still trying to
-                 * retry behind the scenes.
+                /* Only retry if the connection attempt was successful. Otherwise
+                 * the user will see an error reported while we're still trying to
+                 * retry behind the scenes. This may race with migration completing,
+                 * but since we "shouldn't retry" in that case either, that's fine.
                  */
                 should_retry = *commit_state == MS_GO;
+
                 /* Only send this signal the first time */
                 mirror_super_signal_committed(
                     super,
diff --git a/src/mirror.h b/src/mirror.h
index ec246f9..53e750d 100644
--- a/src/mirror.h
+++ b/src/mirror.h
@@ -40,7 +40,7 @@ enum mirror_state;
  * between the end of the written data and the start of the NBD reply.
  */
 #define MS_REQUEST_LIMIT_SECS 4
-
+#define MS_REQUEST_LIMIT_SECS_F 4.0
 
 enum mirror_finish_action {
     ACTION_EXIT,
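
A possible shape for the bandwidth controls mentioned in the commit message: with the copy driven by an ev_io write watcher, a cap can be imposed by stopping that watcher whenever a byte budget runs out and restarting it from a repeating timer. The sketch below is not part of this patch; the rate_limit struct, its field names, the helper functions and the 100ms refill interval are all invented for illustration, and it assumes access to the mirror's event loop and write watcher.

#include <ev.h>
#include <stdint.h>

/* Hypothetical token-bucket limiter; none of these names exist in flexnbd. */
struct rate_limit {
    uint64_t  budget_bytes;        /* bytes we may still send this interval */
    uint64_t  bytes_per_interval;  /* refill amount, e.g. max_bytes_per_sec / 10 */
    ev_io    *write_watcher;       /* the mirror's write watcher, to pause/resume */
    ev_timer  refill_timer;        /* fires every interval to top the bucket up */
};

/* Call from the write callback after a successful write() of `sent` bytes.
 * Returns 0 if the writer is now paused until the next refill. */
static int rate_limit_spend( struct ev_loop *loop, struct rate_limit *rl,
                             uint64_t sent )
{
    rl->budget_bytes = sent >= rl->budget_bytes ? 0 : rl->budget_bytes - sent;

    if ( rl->budget_bytes == 0 ) {
        /* Budget exhausted: stop asking for EV_WRITE until the timer refills it */
        ev_io_stop( loop, rl->write_watcher );
        return 0;
    }
    return 1;
}

/* Refill timer callback: restore the budget and wake the writer again. */
static void rate_limit_refill_cb( struct ev_loop *loop, ev_timer *w, int revents )
{
    struct rate_limit *rl = (struct rate_limit *) w->data;
    (void) revents;

    rl->budget_bytes = rl->bytes_per_interval;
    if ( !ev_is_active( rl->write_watcher ) ) {
        ev_io_start( loop, rl->write_watcher );
    }
}

/* Wiring, e.g. from mirror_run(): refill ten times a second. */
static void rate_limit_attach( struct ev_loop *loop, struct rate_limit *rl,
                               ev_io *write_watcher, uint64_t max_bytes_per_sec )
{
    rl->write_watcher      = write_watcher;
    rl->bytes_per_interval = max_bytes_per_sec / 10;
    rl->budget_bytes       = rl->bytes_per_interval;

    ev_timer_init( &rl->refill_timer, rate_limit_refill_cb, 0.1, 0.1 );
    rl->refill_timer.data = rl;
    ev_timer_start( loop, &rl->refill_timer );
}

One caveat if something like this is bolted onto mirror_write_cb as it stands: the data portion of a request is written with the server I/O lock held, so pausing mid-request would also stall guest writes; a real implementation would probably only throttle between requests.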