mirror: honour max_bytes_per_second - naive scheme

If we're above max_bytes_per_second once we've finished a transfer
(8MB chunks, worst-case) then we delay the next transfer until
all_dirty_bytes / duration < max_bytes_per_second - checking once
per second.

If this isn't good enough, we can improve it - leaky bucket is one
option. To begin with, though, we'll mostly be using this to set
max_bps to either 0 or 100MB/sec or so. So it should be fine.
This commit is contained in:
nick
2013-08-14 16:24:50 +01:00
parent efdd613968
commit e13d1d8fb4

View File

@@ -73,6 +73,7 @@ struct mirror_ctrl {
ev_io read_watcher;
ev_io write_watcher;
ev_timer timeout_watcher;
ev_timer limit_watcher;
ev_io abandon_watcher;
/* Use this to keep track of what we're copying at any moment */
@@ -403,6 +404,20 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
return 1;
}
int mirror_exceeds_max_bps( struct mirror *mirror )
{
uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started;
uint64_t mig_speed = mirror->all_dirty / ( (duration_ms / 1000 ) + 1 );
debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second );
if ( mig_speed > mirror->max_bytes_per_second ) {
return 1;
}
return 0;
}
// ONLY CALL THIS WHEN SERVER IO IS LOCKED
void mirror_complete( struct server *serve )
{
@@ -620,10 +635,20 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
}
} while ( !mirror_setup_next_xfer( ctrl ) );
/* We're waiting for the socket to become writable again, so re-enable */
ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
ev_io_start( loop, &ctrl->write_watcher );
ev_io_stop( loop, &ctrl->read_watcher );
ev_io_stop( loop, &ctrl->read_watcher );
if ( mirror_exceeds_max_bps( m ) ) {
/* We're over the bandwidth limit, so don't move onto the next transfer
* yet. Our limit_watcher will move us on once we're OK. timeout_watcher
* was disabled further up, so don't need to stop it here too */
debug( "max_bps exceeded, waiting" );
ev_timer_again( loop, &ctrl->limit_watcher );
} else {
/* We're waiting for the socket to become writable again, so re-enable */
ev_timer_again( loop, &ctrl->timeout_watcher );
ev_io_start( loop, &ctrl->write_watcher );
}
return;
}
@@ -656,6 +681,30 @@ void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
return;
}
void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
{
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
NULLCHECK( ctrl );
if ( !(revents & EV_TIMER ) ) {
warn( "Mirror limit callback executed but no timer event signalled" );
return;
}
if ( mirror_exceeds_max_bps( ctrl->mirror ) ) {
debug( "max_bps exceeded, waiting", ctrl->mirror->max_bytes_per_second );
ev_timer_again( loop, w );
} else {
/* We're below the limit, so do the next request */
debug("max_bps not exceeded, performing next transfer" );
ev_io_start( loop, &ctrl->write_watcher );
ev_timer_stop( loop, &ctrl->limit_watcher );
ev_timer_again( loop, &ctrl->timeout_watcher );
}
return;
}
void mirror_run( struct server *serve )
{
NULLCHECK( serve );
@@ -697,6 +746,10 @@ void mirror_run( struct server *serve )
ev_init( &ctrl.timeout_watcher, mirror_timeout_cb );
ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F ;
ev_init( &ctrl.limit_watcher, mirror_limit_cb );
ctrl.limit_watcher.repeat = 1.0; // We check bps every second. seems sane.
ctrl.limit_watcher.data = (void*) &ctrl;
ev_init( &ctrl.abandon_watcher, mirror_abandon_cb );
ev_io_set( &ctrl.abandon_watcher, ctrl.mirror->abandon_signal->read_fd, EV_READ );
ctrl.abandon_watcher.data = (void*) &ctrl;