mirror: Fix mirroring, break status

This removes the concept of 'passes' completely from mirror.c,
although it leaves the relevant bits in mirror.h to keep status from
failing - although its current code is now Wrong. FIXME.

We also now get the previous test passing, meaning mirroring works
again.
This commit is contained in:
nick
2013-09-20 17:08:14 +01:00
parent 9770bbe42b
commit 6553907972
2 changed files with 78 additions and 61 deletions

View File

@@ -82,7 +82,7 @@ struct mirror_ctrl {
/* This is set once all clients have been closed, to let the mirror know /* This is set once all clients have been closed, to let the mirror know
* it's safe to finish once the queue is empty */ * it's safe to finish once the queue is empty */
int last_pass; int clients_closed;
/* Use this to keep track of what we're copying at any moment */ /* Use this to keep track of what we're copying at any moment */
struct xfer xfer; struct xfer xfer;
@@ -173,7 +173,7 @@ void mirror_reset( struct mirror * mirror )
mirror->this_pass_dirty = 0; mirror->this_pass_dirty = 0;
mirror->this_pass_clean = 0; mirror->this_pass_clean = 0;
mirror->migration_started = 0; mirror->migration_started = 0;
mirror->pass_offset = 0; mirror->offset = 0;
return; return;
} }
@@ -365,7 +365,7 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
struct mirror* mirror = ctrl->mirror; struct mirror* mirror = ctrl->mirror;
struct server* serve = ctrl->serve; struct server* serve = ctrl->serve;
struct bitset_stream_entry e = { .event = BITSET_STREAM_UNSET }; struct bitset_stream_entry e = { .event = BITSET_STREAM_UNSET };
uint64_t current = mirror->pass_offset, run = 0, size = serve->size; uint64_t current = mirror->offset, run = 0, size = serve->size;
/* Technically, we'd be interested in UNSET events too, but they are never /* Technically, we'd be interested in UNSET events too, but they are never
* generated. TODO if that changes. * generated. TODO if that changes.
@@ -378,12 +378,19 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
ctrl->clear_events = 1; ctrl->clear_events = 1;
} }
while ( ( ctrl->last_pass || ctrl->clear_events ) && e.event != BITSET_STREAM_SET ) {
while ( ( mirror->offset == serve->size || ctrl->clear_events ) && e.event != BITSET_STREAM_SET ) {
uint64_t events = bitset_stream_size( serve->allocation_map );
if ( events == 0 ) {
break;
}
debug("Dequeueing event"); debug("Dequeueing event");
bitset_stream_dequeue( ctrl->serve->allocation_map, &e ); bitset_stream_dequeue( ctrl->serve->allocation_map, &e );
debug("Dequeued event %i, %zu, %zu", e.event, e.from, e.len); debug("Dequeued event %i, %zu, %zu", e.event, e.from, e.len);
if ( bitset_stream_size( serve->allocation_map ) < BITSET_STREAM_SIZE / 4 ) { if ( events < ( BITSET_STREAM_SIZE / 4 ) ) {
ctrl->clear_events = 0; ctrl->clear_events = 0;
} }
} }
@@ -391,18 +398,16 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
if ( e.event == BITSET_STREAM_SET ) { if ( e.event == BITSET_STREAM_SET ) {
current = e.from; current = e.from;
run = e.len; run = e.len;
} else if ( !ctrl->last_pass && current < serve->size ) { } else if ( current < serve->size ) {
current = mirror->pass_offset; current = mirror->offset;
run = mirror_longest_write; run = mirror_longest_write;
/* Adjust final block if necessary */ /* Adjust final block if necessary */
if ( current + run > serve->size ) { if ( current + run > serve->size ) {
run = size - current; run = size - current;
} }
mirror->pass_offset += run; mirror->offset += run;
} } else {
if ( run == 0 ) {
return 0; return 0;
} }
@@ -425,11 +430,15 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
return 1; return 1;
} }
int mirror_exceeds_max_bps( struct mirror *mirror ) uint64_t mirror_current_bps( struct mirror * mirror )
{ {
uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started; uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started;
uint64_t mig_speed = mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 ); return mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 );
}
int mirror_exceeds_max_bps( struct mirror * mirror )
{
uint64_t mig_speed = mirror_current_bps( mirror );
debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second ); debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second );
if ( mig_speed > mirror->max_bytes_per_second ) { if ( mig_speed > mirror->max_bytes_per_second ) {
@@ -439,6 +448,20 @@ int mirror_exceeds_max_bps( struct mirror *mirror )
return 0; return 0;
} }
/* Given historic bps measurements and number of bytes left to transfer, give
* an estimate of how many seconds are remaining before the migration is
* complete, assuming no new bytes are written.
*/
uint64_t mirror_estimate_time( struct server * serve )
{
uint64_t bytes_to_xfer =
bitset_stream_size( serve->allocation_map ) +
( serve->size - serve->mirror->offset );
debug("Bytes left to transfer: %"PRIu64, bytes_to_xfer );
return bytes_to_xfer / ( mirror_current_bps( serve->mirror ) + 1 );
}
// ONLY CALL THIS AFTER CLOSING CLIENTS // ONLY CALL THIS AFTER CLOSING CLIENTS
void mirror_complete( struct server *serve ) void mirror_complete( struct server *serve )
{ {
@@ -588,46 +611,40 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
/* This next bit could take a little while, which is fine */ /* This next bit could take a little while, which is fine */
ev_timer_stop( ctrl->ev_loop, &ctrl->timeout_watcher ); ev_timer_stop( ctrl->ev_loop, &ctrl->timeout_watcher );
int finished_setup = 0; /* Set up the next transfer, which may be offset + mirror_longest_write
* or an event from the bitset stream. When offset hits serve->size,
/* Set up the next transfer, which may be pass_offset + mirror_longest_write * xfers will be constructed solely from the event stream. Once our estimate
* or an event from the bitset stream. When pass_offset hits serve->size, * of time left reaches a sensible number (or the event stream empties),
* we disable new clients from connecting, disconnect existing clients, and * we stop new clients from connecting, disconnect existing ones, then
* wait for the bitset stream to empty. At that point, we know that source * continue emptying the bitstream. Once it's empty again, we're finished.
* and destination have the same data, so can finish.
*/ */
do { int next_xfer = mirror_setup_next_xfer( ctrl );
if ( m->pass_offset == ctrl->serve->size ) { debug( "next_xfer: %d", next_xfer );
debug( "Pass %d completed", m->pass );
if ( ctrl->last_pass ) { /* Regardless of time estimates, if there's no waiting transfer, we can
mirror_complete( ctrl->serve ); * */
ev_break( loop, EVBREAK_ONE ); if ( !ctrl->clients_closed && ( !next_xfer || mirror_estimate_time( ctrl->serve ) < 60 ) ) {
return; info( "Closing clients to allow mirroring to converge" );
} else { server_forbid_new_clients( ctrl->serve );
// FIXME: Can status race with us if it inspects state here? server_close_clients( ctrl->serve );
m->this_pass_dirty = 0; server_join_clients( ctrl->serve );
m->this_pass_clean = 0; ctrl->clients_closed = 1;
ctrl->last_pass++;
m->pass++; // TODO: remove this in favour of last_pass?
/* Prevent further I/O from happening. /* One more try - a new event may have been pushed since our last check
* FIXME: We should actually only do this if the amount of data */
* left to transfer is under a certain threshold. As-is, we have if ( !next_xfer ) {
* no control over length of I/O freeze - it's entirely next_xfer = mirror_setup_next_xfer( ctrl );
* dependent on amount of traffic from the guest. More traffic =
* longer downtime.
*/
server_forbid_new_clients( ctrl->serve );
server_close_clients( ctrl->serve );
server_join_clients( ctrl->serve );
}
} else {
/* Use mirror_setup_next_xfer to find out what to read next */
finished_setup = mirror_setup_next_xfer( ctrl );
} }
} while ( !finished_setup ); }
if ( ctrl->clients_closed && !next_xfer ) {
mirror_complete( ctrl->serve );
ev_break( loop, EVBREAK_ONE );
return;
}
/* This is a guard Just In Case */
ERROR_IF( !next_xfer, "Unknown problem - no next transfer to do!" );
ev_io_stop( loop, &ctrl->read_watcher ); ev_io_stop( loop, &ctrl->read_watcher );

View File

@@ -79,8 +79,8 @@ struct mirror {
char *mapped; char *mapped;
/* Keep an eye on where in the current pass we are */ /* We need to send every byte at least once; we do so by */
uint64_t pass_offset; uint64_t offset;
enum mirror_state commit_state; enum mirror_state commit_state;
@@ -89,20 +89,20 @@ struct mirror {
*/ */
struct mbox * commit_signal; struct mbox * commit_signal;
/* The current mirror pass. We put this here so status can query it */
int pass;
/* Number of dirty and clean bytes for the entire migration */
uint64_t all_dirty;
uint64_t all_clean;
/* The time (from monotonic_time_ms()) the migration was started. Can be /* The time (from monotonic_time_ms()) the migration was started. Can be
* used to calculate bps, etc. */ * used to calculate bps, etc. */
uint64_t migration_started; uint64_t migration_started;
/* The current mirror pass. We put this here so status can query it. FIXME: remove */
int pass;
/* Number of dirty and clean bytes for the entire migration. FIXME: Remove */
uint64_t all_dirty;
uint64_t all_clean;
/* The number of dirty (had to send to dest) and clean (could skip) bytes /* The number of dirty (had to send to dest) and clean (could skip) bytes
* for this pass. Add them together and subtract from size to get remaining * for this pass. FIXME: No longer used, so need removing */
* bytes. */
uint64_t this_pass_dirty; uint64_t this_pass_dirty;
uint64_t this_pass_clean; uint64_t this_pass_clean;
}; };