From c9ece5a63f1d9522c76264e5bce21147c3af89e2 Mon Sep 17 00:00:00 2001 From: Alex Young Date: Wed, 13 Jun 2012 15:45:59 +0100 Subject: [PATCH] Tidy mirror_runner somewhat --- src/control.c | 275 ++++++++++++++++++++++++++++---------------------- 1 file changed, 152 insertions(+), 123 deletions(-) diff --git a/src/control.c b/src/control.c index 0a1eb61..f6131fd 100644 --- a/src/control.c +++ b/src/control.c @@ -52,131 +52,180 @@ static const unsigned int mirror_last_pass_after_bytes_written = 100<<20; */ static const int mirror_maximum_passes = 7; +/* A single mirror pass over the disc, optionally locking IO around the + * transfer. + */ +int mirror_pass(struct server * serve, int should_lock, uint64_t *written) +{ + uint64_t current = 0; + int success = 1; + struct bitset_mapping *map = serve->mirror->dirty_map; + *written = 0; + + while (current < serve->size) { + int run = bitset_run_count(map, current, mirror_longest_write); + + debug("mirror current=%ld, run=%d", current, run); + + /* FIXME: we could avoid sending sparse areas of the + * disc here, and probably save a lot of bandwidth and + * time (if we know the destination starts off zeroed). + */ + if (bitset_is_set_at(map, current)) { + /* We've found a dirty area, send it */ + debug("^^^ writing"); + + /* We need to stop the main thread from working + * because it might corrupt the dirty map. This + * is likely to slow things down but will be + * safe. + */ + if (should_lock) { server_lock_io( serve ); } + { + /** FIXME: do something useful with bytes/second */ + + /** FIXME: error handling code here won't unlock */ + socket_nbd_write( serve->mirror->client, + current, + run, + 0, + serve->mirror->mapped + current); + + /* now mark it clean */ + bitset_clear_range(map, current, run); + } + if (should_lock) { server_unlock_io( serve ); } + + *written += run; + } + current += run; + + if (serve->mirror->signal_abandon) { + success = 0; + break; + } + } + + return success; +} + + +void mirror_on_exit( struct server * serve ) +{ + serve_signal_close( serve ); + /* We have to wait until the server is closed before unlocking + * IO. This is because the client threads check to see if the + * server is still open before reading or writing inside their + * own locks. If we don't wait for the close, there's no way to + * guarantee the server thread will win the race and we risk the + * clients seeing a "successful" write to a dead disc image. + */ + serve_wait_for_close( serve ); +} + /** Thread launched to drive mirror process */ void* mirror_runner(void* serve_params_uncast) { - const int last_pass = mirror_maximum_passes-1; int pass; struct server *serve = (struct server*) serve_params_uncast; - NULLCHECK( serve ); - debug("Starting mirror" ); + uint64_t written; - struct bitset_mapping *map = serve->mirror->dirty_map; + NULLCHECK( serve ); + NULLCHECK( serve->mirror ); + NULLCHECK( serve->mirror->dirty_map ); + + debug("Starting mirror" ); - for (pass=0; pass < mirror_maximum_passes; pass++) { - uint64_t current = 0; - uint64_t written = 0; - + for (pass=0; pass < mirror_maximum_passes-1; pass++) { debug("mirror start pass=%d", pass); - if (pass == last_pass) { - server_lock_io( serve ); - } - - while (current < serve->size) { - int run; - - run = bitset_run_count(map, current, mirror_longest_write); - - debug("mirror current=%ld, run=%d", current, run); - - /* FIXME: we could avoid sending sparse areas of the - * disc here, and probably save a lot of bandwidth and - * time (if we know the destination starts off zeroed). - */ - if (bitset_is_set_at(map, current)) { - /* We've found a dirty area, send it */ - debug("^^^ writing"); - - /* We need to stop the main thread from working - * because it might corrupt the dirty map. This - * is likely to slow things down but will be - * safe. - */ - if (pass < last_pass) { - server_lock_io( serve ); - } - - /** FIXME: do something useful with bytes/second */ - - /** FIXME: error handling code here won't unlock */ - socket_nbd_write( - serve->mirror->client, - current, - run, - 0, - serve->mirror->mapped + current - ); - - /* now mark it clean */ - bitset_clear_range(map, current, run); - - if (pass < last_pass) { - server_unlock_io( serve ); - } - - written += run; - } - current += run; - - if (serve->mirror->signal_abandon) { - if (pass == last_pass) { server_unlock_io( serve ); } - close(serve->mirror->client); - goto abandon_mirror; - } + if ( !mirror_pass( serve, 1, &written ) ){ + goto abandon_mirror; } /* if we've not written anything */ - if (written < mirror_last_pass_after_bytes_written) { - pass = last_pass; - } + if (written < mirror_last_pass_after_bytes_written) { break; } } - - /* a successful finish ends here */ - switch (serve->mirror->action_at_finish) + server_lock_io( serve ); { - case ACTION_EXIT: - debug("exit!"); - serve_signal_close( serve ); - /* We have to wait until the server is closed before - * unlocking IO. This is because the client threads - * check to see if the server is still open before - * reading or writing inside their own locks. If we - * don't wait for the close, there's no way to guarantee - * the server thread will win the race and we risk the - * clients seeing a "successful" write to a dead disc - * image. - */ - serve_wait_for_close( serve ); - info("Server closed, quitting after successful migration"); - /* fall through */ - case ACTION_NOTHING: - debug("nothing!"); - close(serve->mirror->client); + if ( mirror_pass( serve, 0, &written ) && + ACTION_EXIT == serve->mirror->action_at_finish) { + debug("exit!"); + mirror_on_exit( serve ); + info("Server closed, quitting " + "after successful migration"); + } } server_unlock_io( serve ); - + abandon_mirror: - free(serve->mirror->dirty_map); - free(serve->mirror); + mirror_status_destroy( serve->mirror ); serve->mirror = NULL; /* and we're gone */ return NULL; } +struct mirror_status * mirror_status_create( + struct server * serve, + int fd, + int max_Bps, + int action_at_finish) +{ + /* FIXME: shouldn't map_fd get closed? */ + int map_fd; + off64_t size; + struct mirror_status * mirror; + + NULLCHECK( serve ); + + mirror = xmalloc(sizeof(struct mirror_status)); + mirror->client = fd; + mirror->max_bytes_per_second = max_Bps; + mirror->action_at_finish = action_at_finish; + + FATAL_IF_NEGATIVE( + open_and_mmap( + serve->filename, + &map_fd, + &size, + (void**) &mirror->mapped + ), + "Failed to open and mmap %s", + serve->filename + ); + + mirror->dirty_map = bitset_alloc(size, 4096); + bitset_set_range(mirror->dirty_map, 0, size); + + return mirror; +} + + +void mirror_status_destroy( struct mirror_status *mirror ) +{ + NULLCHECK( mirror ); + close(mirror->client); + free(mirror->dirty_map); + free(mirror); +} + + #define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1) /** Command parser to start mirror process from socket input */ int control_mirror(struct control_params* client, int linesc, char** lines) { - off64_t size, remote_size; - int fd, map_fd; + NULLCHECK( client ); + + off64_t remote_size; + struct server * serve = client->serve; + int fd; struct mirror_status *mirror; union mysockaddr connect_to; union mysockaddr connect_from; int use_connect_from = 0; - uint64_t max_bytes_per_second; + uint64_t max_Bps; int action_at_finish; int raw_port; @@ -206,9 +255,9 @@ int control_mirror(struct control_params* client, int linesc, char** lines) use_connect_from = 1; } - max_bytes_per_second = 0; + max_Bps = 0; if (linesc > 3) { - max_bytes_per_second = atoi(lines[2]); + max_Bps = atoi(lines[2]); } action_at_finish = ACTION_EXIT; @@ -231,43 +280,23 @@ int control_mirror(struct control_params* client, int linesc, char** lines) } /** I don't like use_connect_from but socket_connect doesn't take *mysockaddr :( */ - if (use_connect_from) { - fd = socket_connect(&connect_to.generic, &connect_from.generic); - } - else { - fd = socket_connect(&connect_to.generic, NULL); - } - + struct sockaddr *afrom = use_connect_from ? &connect_from.generic : NULL; + fd = socket_connect(&connect_to.generic, afrom); remote_size = socket_nbd_read_hello(fd); - mirror = xmalloc(sizeof(struct mirror_status)); - mirror->client = fd; - mirror->max_bytes_per_second = max_bytes_per_second; - mirror->action_at_finish = action_at_finish; - - FATAL_IF_NEGATIVE( - open_and_mmap( - client->serve->filename, - &map_fd, - &size, - (void**) &mirror->mapped - ), - "Failed to open and mmap %s", - client->serve->filename - ); - - mirror->dirty_map = bitset_alloc(size, 4096); - bitset_set_range(mirror->dirty_map, 0, size); - - client->serve->mirror = mirror; + mirror = mirror_status_create( serve, + fd, + max_Bps , + action_at_finish ); + serve->mirror = mirror; FATAL_IF( /* FIXME should free mirror on error */ 0 != pthread_create( &mirror->thread, NULL, mirror_runner, - client->serve + serve ), "Failed to create mirror thread" );