diff --git a/src/mirror.c b/src/mirror.c index 422b48e..974cade 100644 --- a/src/mirror.c +++ b/src/mirror.c @@ -73,6 +73,7 @@ struct mirror_ctrl { ev_io read_watcher; ev_io write_watcher; ev_timer timeout_watcher; + ev_timer limit_watcher; ev_io abandon_watcher; /* Use this to keep track of what we're copying at any moment */ @@ -403,6 +404,20 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl ) return 1; } +int mirror_exceeds_max_bps( struct mirror *mirror ) +{ + uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started; + uint64_t mig_speed = mirror->all_dirty / ( (duration_ms / 1000 ) + 1 ); + + debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second ); + + if ( mig_speed > mirror->max_bytes_per_second ) { + return 1; + } + + return 0; +} + // ONLY CALL THIS WHEN SERVER IO IS LOCKED void mirror_complete( struct server *serve ) { @@ -620,10 +635,20 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents ) } } while ( !mirror_setup_next_xfer( ctrl ) ); - /* We're waiting for the socket to become writable again, so re-enable */ - ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher ); - ev_io_start( loop, &ctrl->write_watcher ); - ev_io_stop( loop, &ctrl->read_watcher ); + ev_io_stop( loop, &ctrl->read_watcher ); + + if ( mirror_exceeds_max_bps( m ) ) { + /* We're over the bandwidth limit, so don't move onto the next transfer + * yet. Our limit_watcher will move us on once we're OK. timeout_watcher + * was disabled further up, so don't need to stop it here too */ + debug( "max_bps exceeded, waiting" ); + ev_timer_again( loop, &ctrl->limit_watcher ); + } else { + /* We're waiting for the socket to become writable again, so re-enable */ + ev_timer_again( loop, &ctrl->timeout_watcher ); + ev_io_start( loop, &ctrl->write_watcher ); + } + return; } @@ -656,6 +681,30 @@ void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents ) return; } +void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents ) +{ + struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data; + NULLCHECK( ctrl ); + + if ( !(revents & EV_TIMER ) ) { + warn( "Mirror limit callback executed but no timer event signalled" ); + return; + } + + if ( mirror_exceeds_max_bps( ctrl->mirror ) ) { + debug( "max_bps exceeded, waiting", ctrl->mirror->max_bytes_per_second ); + ev_timer_again( loop, w ); + } else { + /* We're below the limit, so do the next request */ + debug("max_bps not exceeded, performing next transfer" ); + ev_io_start( loop, &ctrl->write_watcher ); + ev_timer_stop( loop, &ctrl->limit_watcher ); + ev_timer_again( loop, &ctrl->timeout_watcher ); + } + + return; +} + void mirror_run( struct server *serve ) { NULLCHECK( serve ); @@ -697,6 +746,10 @@ void mirror_run( struct server *serve ) ev_init( &ctrl.timeout_watcher, mirror_timeout_cb ); ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F ; + ev_init( &ctrl.limit_watcher, mirror_limit_cb ); + ctrl.limit_watcher.repeat = 1.0; // We check bps every second. seems sane. + ctrl.limit_watcher.data = (void*) &ctrl; + ev_init( &ctrl.abandon_watcher, mirror_abandon_cb ); ev_io_set( &ctrl.abandon_watcher, ctrl.mirror->abandon_signal->read_fd, EV_READ ); ctrl.abandon_watcher.data = (void*) &ctrl;