/* FlexNBD server (C) Bytemark Hosting 2012 This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #include "mirror.h" #include "serve.h" #include "util.h" #include "ioutil.h" #include "sockutil.h" #include "parse.h" #include "readwrite.h" #include "bitset.h" #include "self_pipe.h" #include "status.h" #include #include #include #include #include #include /* compat with older libev */ #ifndef EVBREAK_ONE #define ev_run( loop, flags ) ev_loop( loop, flags ) #define ev_break(loop, how) ev_unloop( loop, how ) #define EVBREAK_ONE EVUNLOOP_ONE #define EVBREAK_ALL EVUNLOOP_ALL #endif /* We use this to keep track of the socket request data we need to send */ struct xfer { /* Store the bytes we need to send before the data, or receive back */ union { struct nbd_request_raw req_raw; struct nbd_reply_raw rsp_raw; } hdr; /* what in mirror->mapped we should write, and how much of it we've done */ uint64_t from; uint64_t len; uint64_t written; /* number of bytes of response read */ uint64_t read; }; struct mirror_ctrl { struct server *serve; struct mirror *mirror; /* libev stuff */ struct ev_loop *ev_loop; ev_timer begin_watcher; ev_io read_watcher; ev_io write_watcher; ev_timer timeout_watcher; ev_timer limit_watcher; ev_io abandon_watcher; /* We set this if the bitset stream is getting uncomfortably full, and unset * once it's emptier */ int clear_events; /* This is set once all clients have been closed, to let the mirror know * it's safe to finish once the queue is empty */ int clients_closed; /* Use this to keep track of what we're copying at any moment */ struct xfer xfer; }; struct mirror *mirror_alloc(union mysockaddr *connect_to, union mysockaddr *connect_from, uint64_t max_Bps, enum mirror_finish_action action_at_finish, struct mbox *commit_signal) { struct mirror *mirror; mirror = xmalloc(sizeof(struct mirror)); mirror->connect_to = connect_to; mirror->connect_from = connect_from; mirror->max_bytes_per_second = max_Bps; mirror->action_at_finish = action_at_finish; mirror->commit_signal = commit_signal; mirror->commit_state = MS_UNKNOWN; mirror->abandon_signal = self_pipe_create(); if (mirror->abandon_signal == NULL) { warn("Couldn't create mirror abandon signal"); return NULL; } return mirror; } void mirror_set_state_f(struct mirror *mirror, enum mirror_state state) { NULLCHECK(mirror); mirror->commit_state = state; } #define mirror_set_state( mirror, state ) do{\ debug( "Mirror state => " #state );\ mirror_set_state_f( mirror, state );\ } while(0) enum mirror_state mirror_get_state(struct mirror *mirror) { NULLCHECK(mirror); return mirror->commit_state; } #define mirror_state_is( mirror, state ) mirror_get_state( mirror ) == state void mirror_init(struct mirror *mirror, const char *filename) { int map_fd; uint64_t size; NULLCHECK(mirror); NULLCHECK(filename); FATAL_IF_NEGATIVE(open_and_mmap(filename, &map_fd, &size, (void **) &mirror->mapped), "Failed to open and mmap %s", filename); FATAL_IF_NEGATIVE(madvise(mirror->mapped, size, MADV_SEQUENTIAL), SHOW_ERRNO("Failed to madvise() %s", filename) ); } /* Call this before a mirror attempt. */ void mirror_reset(struct mirror *mirror) { NULLCHECK(mirror); mirror_set_state(mirror, MS_INIT); mirror->all_dirty = 0; mirror->migration_started = 0; mirror->offset = 0; return; } struct mirror *mirror_create(const char *filename, union mysockaddr *connect_to, union mysockaddr *connect_from, uint64_t max_Bps, int action_at_finish, struct mbox *commit_signal) { /* FIXME: shouldn't map_fd get closed? */ struct mirror *mirror; mirror = mirror_alloc(connect_to, connect_from, max_Bps, action_at_finish, commit_signal); mirror_init(mirror, filename); mirror_reset(mirror); return mirror; } void mirror_destroy(struct mirror *mirror) { NULLCHECK(mirror); self_pipe_destroy(mirror->abandon_signal); free(mirror->connect_to); free(mirror->connect_from); free(mirror); } /** The mirror code will split NBD writes, making them this long as a maximum */ static const int mirror_longest_write = 8 << 20; /* This must not be called if there's any chance of further I/O. Methods to * ensure this include: * - Ensure image size is 0 * - call server_forbid_new_clients() followed by a successful * server_close_clients() ; server_join_clients() */ void mirror_on_exit(struct server *serve) { /* If we're still here, we can shut the server down. * * */ debug("serve_signal_close"); serve_signal_close(serve); /* We have to wait until the server is closed before unlocking * IO. This is because the client threads check to see if the * server is still open before reading or writing inside their * own locks. If we don't wait for the close, there's no way to * guarantee the server thread will win the race and we risk the * clients seeing a "successful" write to a dead disc image. */ debug("serve_wait_for_close"); serve_wait_for_close(serve); if (ACTION_UNLINK == serve->mirror->action_at_finish) { debug("Unlinking %s", serve->filename); server_unlink(serve); } debug("Sending disconnect"); socket_nbd_disconnect(serve->mirror->client); info("Mirror sent."); } void mirror_cleanup(struct server *serve, int fatal __attribute__ ((unused))) { NULLCHECK(serve); struct mirror *mirror = serve->mirror; NULLCHECK(mirror); info("Cleaning up mirror thread"); if (mirror->mapped) { munmap(mirror->mapped, serve->size); } mirror->mapped = NULL; if (mirror->client && mirror->client > 0) { close(mirror->client); } mirror->client = -1; } int mirror_connect(struct mirror *mirror, uint64_t local_size) { struct sockaddr *connect_from = NULL; int connected = 0; if (mirror->connect_from) { connect_from = &mirror->connect_from->generic; } NULLCHECK(mirror->connect_to); mirror->client = socket_connect(&mirror->connect_to->generic, connect_from); if (0 < mirror->client) { fd_set fds; struct timeval tv = { MS_HELLO_TIME_SECS, 0 }; FD_ZERO(&fds); FD_SET(mirror->client, &fds); FATAL_UNLESS(0 <= select(FD_SETSIZE, &fds, NULL, NULL, &tv), "Select failed."); if (FD_ISSET(mirror->client, &fds)) { uint64_t remote_size; uint32_t remote_flags; if (socket_nbd_read_hello (mirror->client, &remote_size, &remote_flags)) { if (remote_size == local_size) { connected = 1; mirror_set_state(mirror, MS_GO); } else { warn("Remote size (%d) doesn't match local (%d)", remote_size, local_size); mirror_set_state(mirror, MS_FAIL_SIZE_MISMATCH); } } else { warn("Mirror attempt rejected."); mirror_set_state(mirror, MS_FAIL_REJECTED); } } else { warn("No NBD Hello received."); mirror_set_state(mirror, MS_FAIL_NO_HELLO); } if (!connected) { close(mirror->client); } } else { warn("Mirror failed to connect."); mirror_set_state(mirror, MS_FAIL_CONNECT); } return connected; } int mirror_should_quit(struct mirror *mirror) { switch (mirror->action_at_finish) { case ACTION_EXIT: case ACTION_UNLINK: return 1; default: return 0; } } /* Bandwidth limiting - we hang around if bps is too high, unless we need to * empty out the bitset stream a bit */ int mirror_should_wait(struct mirror_ctrl *ctrl) { int bps_over = server_mirror_bps(ctrl->serve) > ctrl->serve->mirror->max_bytes_per_second; int stream_full = bitset_stream_size(ctrl->serve->allocation_map) > (BITSET_STREAM_SIZE / 2); return bps_over && !stream_full; } /* * If there's an event in the bitset stream of the serve allocation map, we * use it to construct the next transfer request, covering precisely the area * that has changed. If there are no events, we take the next * TODO: should we detect short events and lengthen them to reduce overhead? * * iterates through the bitmap, finding a dirty run to form the basis of the * next transfer, then puts it together. */ int mirror_setup_next_xfer(struct mirror_ctrl *ctrl) { struct mirror *mirror = ctrl->mirror; struct server *serve = ctrl->serve; struct bitset_stream_entry e = {.event = BITSET_STREAM_UNSET }; uint64_t current = mirror->offset, run = 0, size = serve->size; /* Technically, we'd be interested in UNSET events too, but they are never * generated. TODO if that changes. * * We use ctrl->clear_events to start emptying the stream when it's half * full, and stop when it's a quarter full. This stops a busy client from * stalling a migration forever. FIXME: made-up numbers. */ if (mirror->offset < serve->size && bitset_stream_size(serve->allocation_map) > BITSET_STREAM_SIZE / 2) { ctrl->clear_events = 1; } while ((mirror->offset == serve->size || ctrl->clear_events) && e.event != BITSET_STREAM_SET) { uint64_t events = bitset_stream_size(serve->allocation_map); if (events == 0) { break; } debug("Dequeueing event"); bitset_stream_dequeue(ctrl->serve->allocation_map, &e); debug("Dequeued event %i, %zu, %zu", e.event, e.from, e.len); if (events < (BITSET_STREAM_SIZE / 4)) { ctrl->clear_events = 0; } } if (e.event == BITSET_STREAM_SET) { current = e.from; run = e.len; } else if (current < serve->size) { current = mirror->offset; run = mirror_longest_write; /* Adjust final block if necessary */ if (current + run > serve->size) { run = size - current; } mirror->offset += run; } else { return 0; } debug("Next transfer: current=%" PRIu64 ", run=%" PRIu64, current, run); struct nbd_request req = { .magic = REQUEST_MAGIC, .type = REQUEST_WRITE, .handle.b = ".MIRROR.", .from = current, .len = run }; nbd_h2r_request(&req, &ctrl->xfer.hdr.req_raw); ctrl->xfer.from = current; ctrl->xfer.len = run; ctrl->xfer.written = 0; ctrl->xfer.read = 0; return 1; } // ONLY CALL THIS AFTER CLOSING CLIENTS void mirror_complete(struct server *serve) { /* FIXME: Pretty sure this is broken, if action != !QUIT. Just moving code * around for now, can fix it later. Action is always quit in production */ if (mirror_should_quit(serve->mirror)) { debug("exit!"); /* FIXME: This depends on blocking I/O right now, so make sure we are */ sock_set_nonblock(serve->mirror->client, 0); mirror_on_exit(serve); info("Server closed, quitting after successful migration"); } mirror_set_state(serve->mirror, MS_DONE); return; } static void mirror_write_cb(struct ev_loop *loop, ev_io * w, int revents) { struct mirror_ctrl *ctrl = (struct mirror_ctrl *) w->data; NULLCHECK(ctrl); struct xfer *xfer = &ctrl->xfer; size_t to_write, hdr_size = sizeof(struct nbd_request_raw); char *data_loc; ssize_t count; if (!(revents & EV_WRITE)) { warn("No write event signalled in mirror write callback"); return; } debug("Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client); /* FIXME: We can end up corking multiple times in unusual circumstances; this * is annoying, but harmless */ if (xfer->written == 0) { sock_set_tcp_cork(ctrl->mirror->client, 1); } if (xfer->written < hdr_size) { data_loc = ((char *) &xfer->hdr.req_raw) + ctrl->xfer.written; to_write = hdr_size - xfer->written; } else { data_loc = ctrl->mirror->mapped + xfer->from + (xfer->written - hdr_size); to_write = xfer->len - (ctrl->xfer.written - hdr_size); } // Actually write some bytes if ((count = write(ctrl->mirror->client, data_loc, to_write)) < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { warn(SHOW_ERRNO("Couldn't write to listener")); ev_break(loop, EVBREAK_ONE); } return; } debug("Wrote %" PRIu64 " bytes", count); debug("to_write was %" PRIu64 ", xfer->written was %" PRIu64, to_write, xfer->written); // We wrote some bytes, so reset the timer and keep track for the next pass if (count > 0) { ctrl->xfer.written += count; ev_timer_again(ctrl->ev_loop, &ctrl->timeout_watcher); } // All bytes written, so now we need to read the NBD reply back. if (ctrl->xfer.written == ctrl->xfer.len + hdr_size) { sock_set_tcp_cork(ctrl->mirror->client, 0); ev_io_start(loop, &ctrl->read_watcher); ev_io_stop(loop, &ctrl->write_watcher); } return; } static void mirror_read_cb(struct ev_loop *loop, ev_io * w, int revents) { struct mirror_ctrl *ctrl = (struct mirror_ctrl *) w->data; NULLCHECK(ctrl); struct mirror *m = ctrl->mirror; NULLCHECK(m); struct xfer *xfer = &ctrl->xfer; NULLCHECK(xfer); if (!(revents & EV_READ)) { warn("No read event signalled in mirror read callback"); return; } struct nbd_reply rsp; ssize_t count; uint64_t left = sizeof(struct nbd_reply_raw) - xfer->read; debug("Mirror read callback invoked with events %d. fd:%i", revents, m->client); /* Start / continue reading the NBD response from the mirror. */ if ((count = read(m->client, ((void *) &xfer->hdr.rsp_raw) + xfer->read, left)) < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { warn(SHOW_ERRNO("Couldn't read from listener")); ev_break(loop, EVBREAK_ONE); } debug(SHOW_ERRNO("Couldn't read from listener (non-scary)")); return; } if (count == 0) { warn("EOF reading response from server!"); ev_break(loop, EVBREAK_ONE); return; } // We read some bytes, so reset the timer ev_timer_again(ctrl->ev_loop, &ctrl->timeout_watcher); debug("Read %i bytes", count); debug("left was %" PRIu64 ", xfer->read was %" PRIu64, left, xfer->read); xfer->read += count; if (xfer->read < sizeof(struct nbd_reply_raw)) { // Haven't read the whole response yet return; } nbd_r2h_reply(&xfer->hdr.rsp_raw, &rsp); // validate reply, break event loop if bad if (rsp.magic != REPLY_MAGIC) { warn("Bad reply magic from listener"); ev_break(loop, EVBREAK_ONE); return; } if (rsp.error != 0) { warn("Error returned from listener: %i", rsp.error); ev_break(loop, EVBREAK_ONE); return; } if (memcmp(".MIRROR.", rsp.handle.b, 8) != 0) { warn("Bad handle returned from listener"); ev_break(loop, EVBREAK_ONE); return; } /* transfer was completed, so now we need to either set up the next * transfer of this pass, set up the first transfer of the next pass, or * complete the migration */ xfer->read = 0; xfer->written = 0; /* We don't account for bytes written in this mode, to stop high-throughput * discs getting stuck in "drain the event queue!" mode forever */ if (!ctrl->clear_events) { m->all_dirty += xfer->len; } /* This next bit could take a little while, which is fine */ ev_timer_stop(ctrl->ev_loop, &ctrl->timeout_watcher); /* Set up the next transfer, which may be offset + mirror_longest_write * or an event from the bitset stream. When offset hits serve->size, * xfers will be constructed solely from the event stream. Once our estimate * of time left reaches a sensible number (or the event stream empties), * we stop new clients from connecting, disconnect existing ones, then * continue emptying the bitstream. Once it's empty again, we're finished. */ int next_xfer = mirror_setup_next_xfer(ctrl); debug("next_xfer: %d", next_xfer); /* Regardless of time estimates, if there's no waiting transfer, we can start closing clients down. */ if (!ctrl->clients_closed && (!next_xfer || server_mirror_eta(ctrl->serve) < MS_CONVERGE_TIME_SECS)) { info("Closing clients to allow mirroring to converge"); server_forbid_new_clients(ctrl->serve); server_close_clients(ctrl->serve); server_join_clients(ctrl->serve); ctrl->clients_closed = 1; /* One more try - a new event may have been pushed since our last check */ if (!next_xfer) { next_xfer = mirror_setup_next_xfer(ctrl); } } if (ctrl->clients_closed && !next_xfer) { mirror_complete(ctrl->serve); ev_break(loop, EVBREAK_ONE); return; } /* This is a guard Just In Case */ ERROR_IF(!next_xfer, "Unknown problem - no next transfer to do!"); ev_io_stop(loop, &ctrl->read_watcher); /* FIXME: Should we ignore the bwlimit after server_close_clients has been called? */ if (mirror_should_wait(ctrl)) { /* We're over the bandwidth limit, so don't move onto the next transfer * yet. Our limit_watcher will move us on once we're OK. timeout_watcher * was disabled further up, so don't need to stop it here too */ debug("max_bps exceeded, waiting"); ev_timer_again(loop, &ctrl->limit_watcher); } else { /* We're waiting for the socket to become writable again, so re-enable */ ev_timer_again(loop, &ctrl->timeout_watcher); ev_io_start(loop, &ctrl->write_watcher); } return; } static void mirror_timeout_cb(struct ev_loop *loop, ev_timer * w __attribute__ ((unused)), int revents) { if (!(revents & EV_TIMER)) { warn("Mirror timeout called but no timer event signalled"); return; } info("Mirror timeout signalled"); ev_break(loop, EVBREAK_ONE); return; } static void mirror_abandon_cb(struct ev_loop *loop, ev_io * w, int revents) { struct mirror_ctrl *ctrl = (struct mirror_ctrl *) w->data; NULLCHECK(ctrl); if (!(revents & EV_READ)) { warn("Mirror abandon called but no abandon event signalled"); return; } debug("Abandon message received"); mirror_set_state(ctrl->mirror, MS_ABANDONED); self_pipe_signal_clear(ctrl->mirror->abandon_signal); ev_break(loop, EVBREAK_ONE); return; } static void mirror_limit_cb(struct ev_loop *loop, ev_timer * w, int revents) { struct mirror_ctrl *ctrl = (struct mirror_ctrl *) w->data; NULLCHECK(ctrl); if (!(revents & EV_TIMER)) { warn("Mirror limit callback executed but no timer event signalled"); return; } if (mirror_should_wait(ctrl)) { debug("max_bps exceeded, waiting", ctrl->mirror->max_bytes_per_second); ev_timer_again(loop, w); } else { /* We're below the limit, so do the next request */ debug("max_bps not exceeded, performing next transfer"); ev_io_start(loop, &ctrl->write_watcher); ev_timer_stop(loop, &ctrl->limit_watcher); ev_timer_again(loop, &ctrl->timeout_watcher); } return; } /* We use this to periodically check whether the allocation map has built, and * if it has, start migrating. If it's not finished, then enabling the bitset * stream does not go well for us. */ static void mirror_begin_cb(struct ev_loop *loop, ev_timer * w, int revents) { struct mirror_ctrl *ctrl = (struct mirror_ctrl *) w->data; NULLCHECK(ctrl); if (!(revents & EV_TIMER)) { warn("Mirror limit callback executed but no timer event signalled"); return; } if (ctrl->serve->allocation_map_built || ctrl->serve->allocation_map_not_built) { info("allocation map builder is finished, beginning migration"); ev_timer_stop(loop, w); /* Start by writing xfer 0 to the listener */ ev_io_start(loop, &ctrl->write_watcher); /* We want to timeout during the first write as well as subsequent ones */ ev_timer_again(loop, &ctrl->timeout_watcher); /* We're now interested in events */ bitset_enable_stream(ctrl->serve->allocation_map); } else { /* not done yet, so wait another second */ ev_timer_again(loop, w); } return; } void mirror_run(struct server *serve) { NULLCHECK(serve); NULLCHECK(serve->mirror); struct mirror *m = serve->mirror; m->migration_started = monotonic_time_ms(); info("Starting mirror"); /* mirror_setup_next_xfer won't be able to cope with this, so special-case * it here. There can't be any writes going on, so don't bother locking * anything. * */ if (serve->size == 0) { info("0-byte image special case"); mirror_complete(serve); return; } struct mirror_ctrl ctrl; memset(&ctrl, 0, sizeof(struct mirror_ctrl)); ctrl.serve = serve; ctrl.mirror = m; ctrl.ev_loop = EV_DEFAULT; /* gcc warns with -Wstrict-aliasing on -O2. clang doesn't * implement this warning. Seems to be the fault of ev.h */ ev_init(&ctrl.begin_watcher, mirror_begin_cb); ctrl.begin_watcher.repeat = 1.0; // We check bps every second. seems sane. ctrl.begin_watcher.data = (void *) &ctrl; ev_io_init(&ctrl.read_watcher, mirror_read_cb, m->client, EV_READ); ctrl.read_watcher.data = (void *) &ctrl; ev_io_init(&ctrl.write_watcher, mirror_write_cb, m->client, EV_WRITE); ctrl.write_watcher.data = (void *) &ctrl; ev_init(&ctrl.timeout_watcher, mirror_timeout_cb); char *env_request_limit = getenv("FLEXNBD_MS_REQUEST_LIMIT_SECS"); double timeout_limit = MS_REQUEST_LIMIT_SECS_F; if (NULL != env_request_limit) { char *endptr = NULL; errno = 0; double limit = strtod(env_request_limit, &endptr); warn(SHOW_ERRNO("Got %f from strtod", limit)); if (errno == 0) { timeout_limit = limit; } } ctrl.timeout_watcher.repeat = timeout_limit; ev_init(&ctrl.limit_watcher, mirror_limit_cb); ctrl.limit_watcher.repeat = 1.0; // We check bps every second. seems sane. ctrl.limit_watcher.data = (void *) &ctrl; ev_init(&ctrl.abandon_watcher, mirror_abandon_cb); ev_io_set(&ctrl.abandon_watcher, m->abandon_signal->read_fd, EV_READ); ctrl.abandon_watcher.data = (void *) &ctrl; ev_io_start(ctrl.ev_loop, &ctrl.abandon_watcher); ERROR_UNLESS(mirror_setup_next_xfer(&ctrl), "Couldn't find first transfer for mirror!"); if (serve->allocation_map_built) { /* Start by writing xfer 0 to the listener */ ev_io_start(ctrl.ev_loop, &ctrl.write_watcher); /* We want to timeout during the first write as well as subsequent ones */ ev_timer_again(ctrl.ev_loop, &ctrl.timeout_watcher); bitset_enable_stream(serve->allocation_map); } else { debug("Waiting for allocation map to be built"); ev_timer_again(ctrl.ev_loop, &ctrl.begin_watcher); } /* Everything up to here is blocking. We switch to non-blocking so we * can handle rate-limiting and weird error conditions better. TODO: We * should expand the event loop upwards so we can do the same there too */ sock_set_nonblock(m->client, 1); info("Entering event loop"); ev_run(ctrl.ev_loop, 0); info("Exited event loop"); /* Parent code might expect a non-blocking socket */ sock_set_nonblock(m->client, 0); /* Errors in the event loop don't track I/O lock state or try to restore * it to something sane - they just terminate the event loop with state != * MS_DONE. We re-allow new clients here if necessary. */ if (m->action_at_finish == ACTION_NOTHING || m->commit_state != MS_DONE) { server_allow_new_clients(serve); } /* Returning here says "mirroring complete" to the runner. The error * call retries the migration from scratch. */ if (m->commit_state != MS_DONE) { /* mirror_reset will be called before a retry, so keeping hold of events * between now and our next mirroring attempt is not useful */ bitset_disable_stream(serve->allocation_map); error("Event loop exited, but mirroring is not complete"); } return; } void mbox_post_mirror_state(struct mbox *mbox, enum mirror_state st) { NULLCHECK(mbox); enum mirror_state *contents = xmalloc(sizeof(enum mirror_state)); *contents = st; mbox_post(mbox, contents); } void mirror_signal_commit(struct mirror *mirror) { NULLCHECK(mirror); mbox_post_mirror_state(mirror->commit_signal, mirror_get_state(mirror)); } /** Thread launched to drive mirror process * This is needed for two reasons: firstly, it decouples the mirroring * from the control thread (although that's less valid with mboxes * passing state back and forth) and to provide an error context so that * retries can be cleanly handled without a bespoke error handling * mechanism. * */ void *mirror_runner(void *serve_params_uncast) { /* The supervisor thread relies on there not being any ERROR * calls until after the mirror_signal_commit() call in this * function. * However, *after* that, we should call ERROR_* instead of * FATAL_* wherever possible. */ struct server *serve = (struct server *) serve_params_uncast; NULLCHECK(serve); NULLCHECK(serve->mirror); struct mirror *mirror = serve->mirror; error_set_handler((cleanup_handler *) mirror_cleanup, serve); info("Connecting to mirror"); time_t start_time = time(NULL); int connected = mirror_connect(mirror, serve->size); mirror_signal_commit(mirror); if (!connected) { goto abandon_mirror; } /* After this point, if we see a failure we need to disconnect * and retry everything from mirror_set_state(_, MS_INIT), but * *without* signaling the commit or abandoning the mirror. * */ if ((time(NULL) - start_time) > MS_CONNECT_TIME_SECS) { /* If we get here, then we managed to connect but the * control thread feeding status back to the user will * have gone away, leaving the user without meaningful * feedback. In this instance, they have to assume a * failure, so we can't afford to let the mirror happen. * We have to set the state to avoid a race. */ mirror_set_state(mirror, MS_FAIL_CONNECT); warn("Mirror connected, but too slowly"); goto abandon_mirror; } mirror_run(serve); /* On success, this is unnecessary, and harmless ( mirror_cleanup does it * for us ). But if we've failed and are going to retry on the next run, we * must close this socket here to have any chance of it succeeding. */ if (!(mirror->client < 0)) { sock_try_close(mirror->client); mirror->client = -1; } abandon_mirror: return NULL; } struct mirror_super *mirror_super_create(const char *filename, union mysockaddr *connect_to, union mysockaddr *connect_from, uint64_t max_Bps, enum mirror_finish_action action_at_finish, struct mbox *state_mbox) { struct mirror_super *super = xmalloc(sizeof(struct mirror_super)); super->mirror = mirror_create(filename, connect_to, connect_from, max_Bps, action_at_finish, mbox_create()); super->state_mbox = state_mbox; return super; } /* Post the current state of the mirror into super->state_mbox.*/ void mirror_super_signal_committed(struct mirror_super *super, enum mirror_state commit_state) { NULLCHECK(super); NULLCHECK(super->state_mbox); mbox_post_mirror_state(super->state_mbox, commit_state); } void mirror_super_destroy(struct mirror_super *super) { NULLCHECK(super); mbox_destroy(super->mirror->commit_signal); mirror_destroy(super->mirror); free(super); } /* The mirror supervisor thread. Responsible for kicking off retries if * the mirror thread fails. * The mirror and mirror_super objects are never freed, and the * mirror_super_runner thread is never joined. */ void *mirror_super_runner(void *serve_uncast) { struct server *serve = (struct server *) serve_uncast; NULLCHECK(serve); NULLCHECK(serve->mirror); NULLCHECK(serve->mirror_super); int first_pass = 1; int should_retry = 0; int success = 0, abandoned = 0; struct mirror *mirror = serve->mirror; struct mirror_super *super = serve->mirror_super; do { FATAL_IF(0 != pthread_create(&mirror->thread, NULL, mirror_runner, serve), "Failed to create mirror thread"); debug("Supervisor waiting for commit signal"); enum mirror_state *commit_state = mbox_receive(mirror->commit_signal); debug("Supervisor got commit signal"); if (first_pass) { /* Only retry if the connection attempt was successful. Otherwise * the user will see an error reported while we're still trying to * retry behind the scenes. This may race with migration completing * but since we "shouldn't retry" in that case either, that's fine */ should_retry = *commit_state == MS_GO; /* Only send this signal the first time */ mirror_super_signal_committed(super, *commit_state); debug("Mirror supervisor committed"); } /* We only care about the value of the commit signal on * the first pass, so this is ok */ free(commit_state); debug("Supervisor waiting for mirror thread"); pthread_join(mirror->thread, NULL); /* If we can't connect to the remote end, the watcher for the abandon * signal never gets installed at the moment, which is why we also check * it here. */ abandoned = mirror_get_state(mirror) == MS_ABANDONED || self_pipe_signal_clear(mirror->abandon_signal); success = MS_DONE == mirror_get_state(mirror); if (success) { info("Mirror supervisor success, exiting"); } else if (abandoned) { info("Mirror abandoned"); should_retry = 0; } else if (should_retry) { info("Mirror failed, retrying"); } else { info("Mirror failed before commit, giving up"); } first_pass = 0; if (should_retry) { /* We don't want to hammer the destination too * hard, so if this is a retry, insert a delay. */ sleep(MS_RETRY_DELAY_SECS); /* We also have to reset the bitmap to be sure * we transfer everything */ mirror_reset(mirror); } } while (should_retry && !success); return NULL; }