mirror: Use libev to provide an event loop inside the mirror thread

We're doing this so we can implement bandwidth controls sanely.
This commit is contained in:
nick
2013-08-09 17:02:10 +01:00
parent 754949d43f
commit c2df38c9d3
4 changed files with 434 additions and 119 deletions

View File

@@ -22,7 +22,7 @@ TEST_SOURCES = FileList['tests/unit/*.c']
TEST_OBJECTS = TEST_SOURCES.pathmap( "%{^tests/unit,build/tests}X.o" )
LIBS = %w( pthread )
LDFLAGS = ["-lrt"]
LDFLAGS = ["-lrt -lev"]
CCFLAGS = %w(
-D_GNU_SOURCE=1
-Wall

4
debian/control vendored
View File

@@ -2,13 +2,13 @@ Source: flexnbd
Section: unknown
Priority: extra
Maintainer: Alex Young <alex@bytemark.co.uk>
Build-Depends: cdbs, debhelper (>= 7.0.50), ruby, rake, gcc
Build-Depends: cdbs, debhelper (>= 7.0.50), ruby, rake, gcc, libev-dev
Standards-Version: 3.8.1
Homepage: http://bigv.io/
Package: flexnbd
Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}
Depends: ${shlibs:Depends}, ${misc:Depends}, libev3
Description: FlexNBD server
An NBD server offering push-mirroring and intelligent sparse file handling

View File

@@ -19,18 +19,64 @@
#include "serve.h"
#include "util.h"
#include "ioutil.h"
#include "sockutil.h"
#include "parse.h"
#include "readwrite.h"
#include "bitset.h"
#include "self_pipe.h"
#include "status.h"
#include <stdlib.h>
#include <string.h>
#include <sys/un.h>
#include <unistd.h>
#include <sys/mman.h>
#include <ev.h>
/* compat with older libev */
#ifndef EVBREAK_ONE
#define ev_run( loop, flags ) ev_loop( loop, flags )
#define ev_break(loop, how) ev_unloop( loop, how )
#define EVBREAK_ONE EVUNLOOP_ONE
#define EVBREAK_ALL EVUNLOOP_ALL
#endif
/* We use this to keep track of the socket request data we need to send */
struct xfer {
/* Store the bytes we need to send before the data, or receive back */
union {
struct nbd_request_raw req_raw;
struct nbd_reply_raw rsp_raw;
} hdr;
/* what in mirror->mapped we should write, and how much of it we've done */
uint64_t from;
uint64_t len;
uint64_t written;
/* number of bytes of response read */
uint64_t read;
};
struct mirror_ctrl {
struct server *serve;
struct mirror *mirror;
/* libev stuff */
struct ev_loop *ev_loop;
ev_io read_watcher;
ev_io write_watcher;
ev_timer timeout_watcher;
/* Use this to keep track of what we're copying at any moment */
struct xfer xfer;
};
struct mirror * mirror_alloc(
union mysockaddr * connect_to,
@@ -69,6 +115,8 @@ enum mirror_state mirror_get_state( struct mirror * mirror )
return mirror->commit_state;
}
#define mirror_state_is( mirror, state ) mirror_get_state( mirror ) == state
void mirror_init( struct mirror * mirror, const char * filename )
{
@@ -95,7 +143,6 @@ void mirror_init( struct mirror * mirror, const char * filename )
);
mirror->dirty_map = bitset_alloc(size, 4096);
}
@@ -105,13 +152,18 @@ void mirror_reset( struct mirror * mirror )
NULLCHECK( mirror );
NULLCHECK( mirror->dirty_map );
mirror_set_state( mirror, MS_INIT );
/* See the caveats in mirror_run if you change this! */
bitset_set(mirror->dirty_map);
mirror->all_dirty = 0;
mirror->all_clean = 0;
mirror->pass = 0;
mirror->this_pass_dirty = 0;
mirror->this_pass_clean = 0;
mirror->migration_started = 0;
return;
}
@@ -162,83 +214,7 @@ static const unsigned int mirror_last_pass_after_bytes_written = 100<<20;
* cause the I/O to freeze, however many bytes are left to copy.
*/
static const int mirror_maximum_passes = 7;
/* A single mirror pass over the disc, optionally locking IO around the
* transfer.
*/
int mirror_pass(struct server * serve, int is_last_pass, uint64_t *written)
{
uint64_t current = 0;
int success = 1;
struct bitset_mapping *map = serve->mirror->dirty_map;
struct mirror * m = serve->mirror;
*written = 0;
while (current < serve->size) {
uint64_t run = bitset_run_count(map, current, mirror_longest_write);
if ( current + run > serve->size ) {
debug(
"Size not divisible by %i, adjusting final block",
block_allocation_resolution
);
run = serve->size - current;
}
debug("mirror current=%"PRIu64", run=%"PRIu64, current, run);
/* FIXME: we could avoid sending sparse areas of the
* disc here, and probably save a lot of bandwidth and
* time (if we know the destination starts off zeroed).
*/
if (bitset_is_set_at(map, current)) {
/* We've found a dirty area, send it */
debug("^^^ writing");
/* We need to stop the main thread from working
* because it might corrupt the dirty map. This
* is likely to slow things down but will be
* safe.
*/
if (!is_last_pass) { server_lock_io( serve ); }
{
debug("in lock block");
/** FIXME: do something useful with bytes/second */
/** FIXME: error handling code here won't unlock */
socket_nbd_write( serve->mirror->client,
current,
run,
0,
serve->mirror->mapped + current,
MS_REQUEST_LIMIT_SECS);
/* now mark it clean */
bitset_clear_range(map, current, run);
debug("leaving lock block");
}
if (!is_last_pass) { server_unlock_io( serve ); }
m->this_pass_dirty += run;
m->all_dirty += run;
*written += run;
} else {
m->this_pass_clean += run;
m->all_clean += run;
}
current += run;
if (serve->mirror->signal_abandon) {
debug("Abandon message received" );
success = 0;
break;
}
}
return success;
}
#define mirror_last_pass (mirror_maximum_passes - 1)
/* THIS FUNCTION MUST ONLY BE CALLED WITH THE SERVER'S IO LOCKED. */
@@ -295,7 +271,6 @@ void mirror_cleanup( struct server * serve,
}
int mirror_connect( struct mirror * mirror, off64_t local_size )
{
struct sockaddr * connect_from = NULL;
@@ -362,47 +337,388 @@ int mirror_should_quit( struct mirror * mirror )
}
}
/* Iterates through the bitmap, finding a dirty run to form the basis of the
* next transfer, then puts it together. */
int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
{
struct mirror* mirror = ctrl->mirror;
uint64_t current, run, size = ctrl->serve->size;
int found = 0;
do {
int run_is_set = 0;
current = mirror->this_pass_dirty + mirror->this_pass_clean;
run = bitset_run_count_ex(
mirror->dirty_map, current, mirror_longest_write, &run_is_set
);
if ( current + run > size ) {
debug(
"Size not divisible by %i, adjusting final block",
block_allocation_resolution
);
run = size - current;
}
/* FIXME: we could avoid sending sparse areas of the disc here, and
* probably save a lot of bandwidth and time (if we know the destination
* starts off zeroed). */
if ( run_is_set ) {
found = 1;
} else {
mirror->this_pass_clean += run;
mirror->all_clean += run;
}
} while ( !found && current + run < size );
/* current and run specify our next transfer */
if ( !found ) {
return 0;
}
debug( "Next dirty block: current=%"PRIu64", run=%"PRIu64, current, run );
struct nbd_request req = {
.magic = REQUEST_MAGIC,
.type = REQUEST_WRITE,
.handle = ".MIRROR.",
.from = current,
.len = run
};
nbd_h2r_request( &req, &ctrl->xfer.hdr.req_raw );
ctrl->xfer.from = current;
ctrl->xfer.len = run;
ctrl->xfer.written = 0;
ctrl->xfer.read = 0;
return 1;
}
// ONLY CALL THIS WHEN SERVER IO IS LOCKED
void mirror_complete( struct server *serve )
{
/* FIXME: Pretty sure this is broken, if action != !QUIT. Just moving code
* around for now, can fix it later. Action is always quit in production */
if ( mirror_should_quit( serve->mirror ) ) {
debug("exit!");
/* FIXME: This depends on blocking I/O right now, so make sure we are */
sock_set_nonblock( serve->mirror->client, 0 );
mirror_on_exit( serve );
info("Server closed, quitting after successful migration");
}
mirror_set_state( serve->mirror, MS_DONE );
return;
}
static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
{
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
NULLCHECK( ctrl );
struct xfer *xfer = &ctrl->xfer;
size_t to_write, hdr_size = sizeof( struct nbd_request_raw );
char *data_loc;
ssize_t count;
if ( !( revents & EV_WRITE ) ) {
warn( "No write event signalled in mirror write callback" );
return;
}
/* TODO: This needs to be its own event, not checked here */
if ( ctrl->mirror->signal_abandon ) {
debug("Abandon message received" );
ev_break( loop, EVBREAK_ONE );
return;
}
debug( "Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client );
if ( xfer->written < hdr_size ) {
data_loc = ( (char*) &xfer->hdr.req_raw ) + ctrl->xfer.written;
to_write = hdr_size - xfer->written;
} else {
data_loc = ctrl->mirror->mapped + xfer->from + ( xfer->written - hdr_size );
to_write = xfer->len - ( ctrl->xfer.written - hdr_size );
// If we're in the last pass, we'll be locked anyway. If we're not in
// the last pass, we want to be locked for every write() call that
// we issue, to avoid the blocks being updated while we work. In
// particular, bitset_run_clear() must be called while the I/O is locked
// or we might clear a bit that had been set by another write.
if ( !server_io_locked( ctrl->serve ) ) {
server_lock_io( ctrl->serve );
debug( "In block block" );
}
}
// Actually read some bytes
if ( ( count = write( ctrl->mirror->client, data_loc, to_write ) ) < 0 ) {
if ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
warn( SHOW_ERRNO( "Couldn't write to listener" ) );
ev_break( loop, EVBREAK_ONE );
}
return;
}
debug( "Wrote %"PRIu64" bytes", count );
debug( "to_write was %"PRIu64", xfer->written was %"PRIu64, to_write, xfer->written );
ctrl->xfer.written += count;
// We write some bytes, so reset the timer
ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
// All bytes written, so now we need to read the NBD reply back.
if ( ctrl->xfer.written == ctrl->xfer.len + hdr_size ) {
// We can, however, clear the run here. If it turns out that the
// NBD request has been rejected, we're discarding it anyway, so the
// wrong data won't get used. If the request is a success, any blocks
// written to while waiting for the reply will be copied in the next
// pass; if it's the final pass, I/O remains locked.
debug( "Clearing bitset from=%"PRIu64" run=%"PRIu64", ctr->xfer.from, ctrl->xfer.len" );
bitset_clear_range( ctrl->mirror->dirty_map, ctrl->xfer.from, ctrl->xfer.len );
if ( ctrl->mirror->pass != mirror_last_pass ) {
debug( "Leaving lock block" );
server_unlock_io( ctrl->serve );
}
ev_io_start( loop, &ctrl->read_watcher );
ev_io_stop( loop, &ctrl->write_watcher );
}
return;
}
static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
{
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
NULLCHECK( ctrl );
struct mirror *m = ctrl->mirror;
NULLCHECK( m );
struct xfer *xfer = &ctrl->xfer;
NULLCHECK( xfer );
if ( !( revents & EV_READ ) ) {
warn( "No read event signalled in mirror read callback" );
return;
}
/* TODO: This needs to be its own event, not checked here */
if ( m->signal_abandon ) {
debug("Abandon message received" );
ev_break( loop, EVBREAK_ONE );
return;
}
struct nbd_reply rsp;
ssize_t count;
uint64_t left = sizeof( struct nbd_reply_raw ) - xfer->read;
debug( "Mirror read callback invoked with events %d. fd:%i", revents, m->client );
/* Start / continue reading the NBD response from the mirror. */
if ( ( count = read( m->client, ((void*) &xfer->hdr.rsp_raw) + xfer->read, left ) ) < 0 ) {
if ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
warn( SHOW_ERRNO( "Couldn't read from listener" ) );
ev_break( loop, EVBREAK_ONE );
}
debug( SHOW_ERRNO( "Couldn't read from listener (non-scary)" ) );
return;
}
info( "count is %li, left was %"PRIu64, count, left );
if ( count == 0 ) {
warn( "EOF reading response from server!" );
ev_break( loop, EVBREAK_ONE );
return;
}
// We read some bytes, so reset the timer
ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
debug( "Read %"PRIu64" bytes", count );
debug( "left was %"PRIu64", xfer->read was %"PRIu64, left, xfer->read );
xfer->read += count;
if ( xfer->read < sizeof( struct nbd_reply_raw ) ) {
// Haven't read the whole response yet
return;
}
nbd_r2h_reply( &xfer->hdr.rsp_raw, &rsp );
// validate reply, break event loop if bad
if ( rsp.magic != REPLY_MAGIC ) {
warn( "Bad reply magic from listener" );
ev_break( loop, EVBREAK_ONE );
return;
}
if ( rsp.error != 0 ) {
warn( "Error returned from listener: %i", rsp.error );
ev_break( loop, EVBREAK_ONE );
return;
}
if ( memcmp( ".MIRROR.", &rsp.handle[0], 8 ) != 0 ) {
warn( "Bad handle returned from listener" );
ev_break( loop, EVBREAK_ONE );
return;
}
/* transfer was completed, so now we need to either set up the next
* transfer of this pass, set up the first transfer of the next pass, or
* complete the migration */
m->this_pass_dirty += xfer->len;
m->all_dirty += xfer->len;
xfer->read = 0;
xfer->written = 0;
/* This next bit could take a little while, which is fine */
ev_timer_stop( ctrl->ev_loop, &ctrl->timeout_watcher );
do {
// This pass complete
if ( m->this_pass_dirty + m->this_pass_clean == ctrl->serve->size ) {
debug( "Pass %d completed", m->pass );
/* Set up the next transfer, which may be n+1 in the current pass,
* or 0 in a new pass. If we can't find another transfer to do, that
* means the pass is complete. Advance pass and re-run the end-of-
* pass logic to complete migration ( pass == mirror_last_pass ), or
* move onto the last pass ( pass < mirror_last_pass, by virtue of
* this_pass_dirty being 0 ).
*/
// last pass completed
if ( m->pass >= mirror_last_pass ) {
/* This was the last pass, so finish. */
mirror_complete( ctrl->serve );
ev_break( loop, EVBREAK_ONE );
return;
}
// this was not the last pass - set up for the next run.
if ( m->this_pass_dirty < mirror_last_pass_after_bytes_written ) {
/* Quiet disc, so skip to the final pass */
m->pass = mirror_last_pass;
} else {
m->pass++;
}
// FIXME: Can status race with us if it inspects state here?
m->this_pass_dirty = 0;
m->this_pass_clean = 0;
debug( "mirror start pass=%d", m->pass );
/* This is the start of our next pass. If it happens to be the
* final pass, we need to lock server I/O so that other writes
* don't race with our call to mirror_setup_next_xfer() below */
if ( m->pass == mirror_last_pass ) {
debug( "In lock block for last pass" );
server_lock_io( ctrl->serve );
}
}
} while ( !mirror_setup_next_xfer( ctrl ) );
/* We're waiting for the socket to become writable again, so re-enable */
ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
ev_io_start( loop, &ctrl->write_watcher );
ev_io_stop( loop, &ctrl->read_watcher );
return;
}
void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)), int revents )
{
if ( !(revents & EV_TIMER ) ) {
warn( "Mirror timeout called but no timer event signalled" );
return;
}
info( "Mirror timeout signalled" );
ev_break( loop, EVBREAK_ONE );
return;
}
void mirror_run( struct server *serve )
{
NULLCHECK( serve );
NULLCHECK( serve->mirror );
struct mirror* m = serve->mirror;
uint64_t written;
serve->mirror->migration_started = monotonic_time_ms();
info("Starting mirror" );
m->migration_started = monotonic_time_ms();
for (m->pass=0; m->pass < mirror_maximum_passes-1; m->pass++) {
m->this_pass_clean = 0;
m->this_pass_dirty = 0;
debug("mirror start pass=%d", m->pass);
if ( !mirror_pass( serve, 0, &written ) ){
debug("Failed mirror pass state is %d", mirror_get_state( serve->mirror ) );
debug("pass failed, giving up");
return; }
/* if we've not written anything */
if (written < mirror_last_pass_after_bytes_written) { break; }
}
/* mirror_setup_next_xfer won't be able to cope with this, so special-case
* it here.
* TODO: Another case we won't be able to handle is a non-zero-sized image
* where none of the blocks are set in the first pass. As it happens, we
* start with all blocks set and then pare them down, so it doesn't happen
* in the current codebase - but watch out for the future!
*/
if ( serve->size == 0 ) {
info( "0-byte image special case" );
server_lock_io( serve );
{
m->this_pass_clean = 0;
m->this_pass_dirty = 0;
if ( mirror_pass( serve, 1, &written ) &&
mirror_should_quit( serve->mirror ) ) {
debug("exit!");
mirror_on_exit( serve );
info("Server closed, quitting "
"after successful migration");
}
}
mirror_complete( serve );
server_unlock_io( serve );
return;
}
struct mirror_ctrl ctrl;
memset( &ctrl, 0, sizeof( struct mirror_ctrl ) );
ctrl.serve = serve;
ctrl.mirror = serve->mirror;
ctrl.ev_loop = EV_DEFAULT;
/* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */
ev_io_init( &ctrl.read_watcher, mirror_read_cb, ctrl.mirror->client, EV_READ );
ctrl.read_watcher.data = (void*) &ctrl;
ev_io_init( &ctrl.write_watcher, mirror_write_cb, ctrl.mirror->client, EV_WRITE );
ctrl.write_watcher.data = (void*) &ctrl;
ev_init( &ctrl.timeout_watcher, mirror_timeout_cb );
ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F ;
ERROR_UNLESS(
mirror_setup_next_xfer( &ctrl ),
"Couldn't find first transfer for mirror!"
);
/* Start by writing xfer 0 to the listener */
ev_io_start( ctrl.ev_loop, &ctrl.write_watcher );
/* Everything up to here is blocking. We switch to non-blocking so we
* can handle rate-limiting and weird error conditions better. TODO: We
* should expand the event loop upwards so we can do the same there too */
sock_set_nonblock( ctrl.mirror->client, 1 );
info( "Entering event loop" );
ev_run( ctrl.ev_loop, 0 );
info( "Exited event loop" );
/* Parent code might expect a non-blocking socket */
sock_set_nonblock( ctrl.mirror->client, 0 );
/* Errors in the event loop don't track I/O lock state or try to restore
* it to something sane - they just terminate the event loop with state !=
* MS_DONE. We unlock here if it's locked.
*/
if ( server_io_locked( serve ) ) {
server_unlock_io( serve );
}
if ( serve->mirror->commit_state != MS_DONE ) {
error( "Event loop exited, but mirroring is not complete" );
}
/* returning here says "mirroring complete" to the runner */
return;
}
@@ -475,8 +791,6 @@ void* mirror_runner(void* serve_params_uncast)
}
mirror_run( serve );
mirror_set_state( mirror, MS_DONE );
abandon_mirror:
return NULL;
}
@@ -560,12 +874,13 @@ void * mirror_super_runner( void * serve_uncast )
debug( "Supervisor got commit signal" );
if ( first_pass ) {
/* Only retry if the connection attempt was
* successful. Otherwise the user will see an
* error reported while we're still trying to
* retry behind the scenes.
/* Only retry if the connection attempt was successful. Otherwise
* the user will see an error reported while we're still trying to
* retry behind the scenes. This may race with migration completing
* but since we "shouldn't retry" in that case either, that's fine
*/
should_retry = *commit_state == MS_GO;
/* Only send this signal the first time */
mirror_super_signal_committed(
super,

View File

@@ -40,7 +40,7 @@ enum mirror_state;
* between the end of the written data and the start of the NBD reply.
*/
#define MS_REQUEST_LIMIT_SECS 4
#define MS_REQUEST_LIMIT_SECS_F 4.0
enum mirror_finish_action {
ACTION_EXIT,