2012-06-27 15:45:33 +01:00
|
|
|
/* FlexNBD server (C) Bytemark Hosting 2012
|
|
|
|
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
|
|
it under the terms of the GNU General Public License as published by
|
|
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
|
|
(at your option) any later version.
|
|
|
|
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
GNU General Public License for more details.
|
|
|
|
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
#include "mirror.h"
|
|
|
|
#include "serve.h"
|
|
|
|
#include "util.h"
|
|
|
|
#include "ioutil.h"
|
2013-08-09 17:02:10 +01:00
|
|
|
#include "sockutil.h"
|
2012-06-27 15:45:33 +01:00
|
|
|
#include "parse.h"
|
|
|
|
#include "readwrite.h"
|
|
|
|
#include "bitset.h"
|
|
|
|
#include "self_pipe.h"
|
|
|
|
#include "status.h"
|
|
|
|
|
|
|
|
#include <stdlib.h>
|
|
|
|
#include <string.h>
|
|
|
|
#include <sys/un.h>
|
|
|
|
#include <unistd.h>
|
2012-12-28 11:38:54 +00:00
|
|
|
#include <sys/mman.h>
|
2013-08-09 17:02:10 +01:00
|
|
|
#include <ev.h>
|
|
|
|
|
|
|
|
/* compat with older libev */
|
|
|
|
#ifndef EVBREAK_ONE
|
|
|
|
|
|
|
|
#define ev_run( loop, flags ) ev_loop( loop, flags )
|
|
|
|
|
|
|
|
#define ev_break(loop, how) ev_unloop( loop, how )
|
|
|
|
|
|
|
|
#define EVBREAK_ONE EVUNLOOP_ONE
|
|
|
|
#define EVBREAK_ALL EVUNLOOP_ALL
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
/* We use this to keep track of the socket request data we need to send */
|
|
|
|
struct xfer {
|
|
|
|
/* Store the bytes we need to send before the data, or receive back */
|
|
|
|
union {
|
|
|
|
struct nbd_request_raw req_raw;
|
|
|
|
struct nbd_reply_raw rsp_raw;
|
|
|
|
} hdr;
|
|
|
|
|
|
|
|
/* what in mirror->mapped we should write, and how much of it we've done */
|
|
|
|
uint64_t from;
|
|
|
|
uint64_t len;
|
|
|
|
uint64_t written;
|
|
|
|
|
|
|
|
/* number of bytes of response read */
|
|
|
|
uint64_t read;
|
|
|
|
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
struct mirror_ctrl {
|
|
|
|
struct server *serve;
|
|
|
|
struct mirror *mirror;
|
|
|
|
|
|
|
|
/* libev stuff */
|
|
|
|
struct ev_loop *ev_loop;
|
|
|
|
ev_io read_watcher;
|
|
|
|
ev_io write_watcher;
|
|
|
|
ev_timer timeout_watcher;
|
2013-08-14 16:24:50 +01:00
|
|
|
ev_timer limit_watcher;
|
2013-08-12 15:30:21 +01:00
|
|
|
ev_io abandon_watcher;
|
2013-08-09 17:02:10 +01:00
|
|
|
|
|
|
|
/* Use this to keep track of what we're copying at any moment */
|
|
|
|
struct xfer xfer;
|
|
|
|
};
|
2012-06-27 15:45:33 +01:00
|
|
|
|
2012-07-12 14:54:48 +01:00
|
|
|
struct mirror * mirror_alloc(
|
2012-06-27 15:45:33 +01:00
|
|
|
union mysockaddr * connect_to,
|
|
|
|
union mysockaddr * connect_from,
|
2013-08-13 12:30:18 +01:00
|
|
|
uint64_t max_Bps,
|
|
|
|
enum mirror_finish_action action_at_finish,
|
2012-07-15 19:46:35 +01:00
|
|
|
struct mbox * commit_signal)
|
2012-06-27 15:45:33 +01:00
|
|
|
{
|
2012-07-12 14:54:48 +01:00
|
|
|
struct mirror * mirror;
|
2012-06-27 15:45:33 +01:00
|
|
|
|
2012-07-12 14:54:48 +01:00
|
|
|
mirror = xmalloc(sizeof(struct mirror));
|
2012-06-27 15:45:33 +01:00
|
|
|
mirror->connect_to = connect_to;
|
|
|
|
mirror->connect_from = connect_from;
|
|
|
|
mirror->max_bytes_per_second = max_Bps;
|
|
|
|
mirror->action_at_finish = action_at_finish;
|
|
|
|
mirror->commit_signal = commit_signal;
|
|
|
|
mirror->commit_state = MS_UNKNOWN;
|
2013-08-12 15:30:21 +01:00
|
|
|
mirror->abandon_signal = self_pipe_create();
|
|
|
|
|
|
|
|
if ( mirror->abandon_signal == NULL ) {
|
|
|
|
warn( "Couldn't create mirror abandon signal" );
|
|
|
|
return NULL;
|
|
|
|
}
|
2012-06-27 15:45:33 +01:00
|
|
|
|
|
|
|
return mirror;
|
|
|
|
}
|
|
|
|
|
2012-07-12 14:54:48 +01:00
|
|
|
void mirror_set_state_f( struct mirror * mirror, enum mirror_state state )
|
2012-06-27 15:45:33 +01:00
|
|
|
{
|
|
|
|
NULLCHECK( mirror );
|
|
|
|
mirror->commit_state = state;
|
|
|
|
}
|
|
|
|
|
|
|
|
#define mirror_set_state( mirror, state ) do{\
|
|
|
|
debug( "Mirror state => " #state );\
|
|
|
|
mirror_set_state_f( mirror, state );\
|
|
|
|
} while(0)
|
|
|
|
|
2012-07-12 14:54:48 +01:00
|
|
|
enum mirror_state mirror_get_state( struct mirror * mirror )
|
2012-06-27 15:45:33 +01:00
|
|
|
{
|
|
|
|
NULLCHECK( mirror );
|
|
|
|
return mirror->commit_state;
|
|
|
|
}
|
|
|
|
|
2013-08-09 17:02:10 +01:00
|
|
|
#define mirror_state_is( mirror, state ) mirror_get_state( mirror ) == state
|
|
|
|
|
2012-06-27 15:45:33 +01:00
|
|
|
|
2012-07-12 14:54:48 +01:00
|
|
|
/* Map the image file we are migrating and allocate the dirty-block bitmap.
 * Any failure here is fatal: without the mapping there is nothing to mirror.
 * NOTE(review): map_fd is discarded after open_and_mmap - see the FIXME in
 * mirror_create about it never being closed. */
void mirror_init( struct mirror * mirror, const char * filename )
{
    int map_fd;
    off64_t size;

    NULLCHECK( mirror );
    NULLCHECK( filename );

    FATAL_IF_NEGATIVE(
        open_and_mmap(
            filename,
            &map_fd,
            &size,
            (void**) &mirror->mapped
        ),
        "Failed to open and mmap %s",
        filename
    );

    /* We copy the image broadly front-to-back, so hint the kernel to
     * read ahead sequentially. */
    FATAL_IF_NEGATIVE(
        madvise( mirror->mapped, size, MADV_SEQUENTIAL ),
        SHOW_ERRNO( "Failed to madvise() %s", filename )
    );

    /* One bit per 4096-byte block, tracking which blocks still need to
     * be copied to the listener. */
    mirror->dirty_map = bitset_alloc(size, 4096);
}
|
|
|
|
|
|
|
|
|
|
|
|
/* Call this before a mirror attempt. */
|
2012-07-12 14:54:48 +01:00
|
|
|
/* Call this before a mirror attempt.  Marks every block dirty and zeroes
 * all per-pass and whole-migration counters, so a retry starts from
 * scratch. */
void mirror_reset( struct mirror * mirror )
{
    NULLCHECK( mirror );
    NULLCHECK( mirror->dirty_map );
    mirror_set_state( mirror, MS_INIT );

    /* See the caveats in mirror_run if you change this! */
    bitset_set(mirror->dirty_map);

    /* Whole-migration byte counters */
    mirror->all_dirty = 0;
    mirror->all_clean = 0;
    /* Per-pass progress: pass number plus bytes copied (dirty) and
     * skipped (clean) so far within the current pass */
    mirror->pass = 0;
    mirror->this_pass_dirty = 0;
    mirror->this_pass_clean = 0;
    mirror->migration_started = 0;

    return;
}
|
|
|
|
|
|
|
|
|
2012-07-12 14:54:48 +01:00
|
|
|
/* Convenience constructor: allocate a mirror, map the image file, and
 * reset all state ready for a migration attempt.  Returns the new mirror.
 * Ownership of connect_to / connect_from passes to the mirror. */
struct mirror * mirror_create(
        const char * filename,
        union mysockaddr * connect_to,
        union mysockaddr * connect_from,
        uint64_t max_Bps,
        int action_at_finish,
        struct mbox * commit_signal)
{
    /* FIXME: shouldn't map_fd get closed? */
    struct mirror * mirror;

    mirror = mirror_alloc( connect_to,
            connect_from,
            max_Bps,
            action_at_finish,
            commit_signal);

    /* NOTE(review): mirror_alloc can return NULL on abandon-signal
     * failure; mirror_init's NULLCHECK will then abort - confirm that is
     * the intended failure mode here. */
    mirror_init( mirror, filename );
    mirror_reset( mirror );

    return mirror;
}
|
|
|
|
|
|
|
|
|
2012-07-12 14:54:48 +01:00
|
|
|
void mirror_destroy( struct mirror *mirror )
|
2012-06-27 15:45:33 +01:00
|
|
|
{
|
|
|
|
NULLCHECK( mirror );
|
2013-08-12 15:30:21 +01:00
|
|
|
self_pipe_destroy( mirror->abandon_signal );
|
2012-06-27 15:45:33 +01:00
|
|
|
free(mirror->connect_to);
|
|
|
|
free(mirror->connect_from);
|
2013-09-11 14:41:59 +01:00
|
|
|
bitset_free( mirror->dirty_map );
|
2012-06-27 15:45:33 +01:00
|
|
|
free(mirror);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/** The mirror code will split NBD writes, making them this long as a maximum */
|
|
|
|
static const int mirror_longest_write = 8<<20;
|
|
|
|
|
|
|
|
/** If, during a mirror pass, we have sent this number of bytes or fewer, we
|
|
|
|
* go to freeze the I/O and finish it off. This is just a guess.
|
|
|
|
*/
|
|
|
|
static const unsigned int mirror_last_pass_after_bytes_written = 100<<20;
|
|
|
|
|
2012-09-20 13:37:48 +01:00
|
|
|
/** The largest number of full passes we'll do - the last one will always
|
2012-06-27 15:45:33 +01:00
|
|
|
* cause the I/O to freeze, however many bytes are left to copy.
|
|
|
|
*/
|
|
|
|
static const int mirror_maximum_passes = 7;
|
2013-08-09 17:02:10 +01:00
|
|
|
#define mirror_last_pass (mirror_maximum_passes - 1)
|
2012-06-27 15:45:33 +01:00
|
|
|
|
|
|
|
|
|
|
|
/* THIS FUNCTION MUST ONLY BE CALLED WITH THE SERVER'S IO LOCKED. */
|
|
|
|
/* Shut the server down after a successful final pass, then tell the
 * listener we're done.
 * THIS FUNCTION MUST ONLY BE CALLED WITH THE SERVER'S IO LOCKED. */
void mirror_on_exit( struct server * serve )
{
    /* If we're still here, we can shut the server down.
     *
     * It doesn't matter if we get new client connections before
     * now, the IO lock will stop them from doing anything.
     */
    debug("serve_signal_close");
    serve_signal_close( serve );

    /* We have to wait until the server is closed before unlocking
     * IO. This is because the client threads check to see if the
     * server is still open before reading or writing inside their
     * own locks. If we don't wait for the close, there's no way to
     * guarantee the server thread will win the race and we risk the
     * clients seeing a "successful" write to a dead disc image.
     */
    debug("serve_wait_for_close");
    serve_wait_for_close( serve );

    /* ACTION_UNLINK additionally removes the local image file once the
     * migration has landed on the listener. */
    if ( ACTION_UNLINK == serve->mirror->action_at_finish ) {
        debug("Unlinking %s", serve->filename );
        server_unlink( serve );
    }

    debug("Sending disconnect");
    socket_nbd_disconnect( serve->mirror->client );
    info("Mirror sent.");
}
|
|
|
|
|
|
|
|
|
2012-07-11 09:43:16 +01:00
|
|
|
void mirror_cleanup( struct server * serve,
|
2012-06-27 15:45:33 +01:00
|
|
|
int fatal __attribute__((unused)))
|
|
|
|
{
|
2012-07-11 09:43:16 +01:00
|
|
|
NULLCHECK( serve );
|
2012-07-12 14:54:48 +01:00
|
|
|
struct mirror * mirror = serve->mirror;
|
2012-06-27 15:45:33 +01:00
|
|
|
NULLCHECK( mirror );
|
|
|
|
info( "Cleaning up mirror thread");
|
|
|
|
|
2013-05-30 11:09:24 +01:00
|
|
|
if ( mirror->mapped ) {
|
|
|
|
munmap( mirror->mapped, serve->size );
|
|
|
|
}
|
|
|
|
mirror->mapped = NULL;
|
|
|
|
|
2012-06-27 15:45:33 +01:00
|
|
|
if( mirror->client && mirror->client > 0 ){
|
|
|
|
close( mirror->client );
|
|
|
|
}
|
|
|
|
mirror->client = -1;
|
2012-07-11 09:43:16 +01:00
|
|
|
|
|
|
|
if( server_io_locked( serve ) ){ server_unlock_io( serve ); }
|
2012-06-27 15:45:33 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2012-07-12 14:54:48 +01:00
|
|
|
/* Connect to the listener and perform the NBD hello exchange.
 *
 * Returns 1 and sets state MS_GO on success; on any failure returns 0,
 * closes the socket, and records a specific MS_FAIL_* state so the
 * supervisor can report why (connect failure, no hello, rejection, or
 * size mismatch with the local image).
 */
int mirror_connect( struct mirror * mirror, off64_t local_size )
{
    struct sockaddr * connect_from = NULL;
    int connected = 0;

    if ( mirror->connect_from ) {
        connect_from = &mirror->connect_from->generic;
    }

    NULLCHECK( mirror->connect_to );

    mirror->client = socket_connect(&mirror->connect_to->generic, connect_from);
    if ( 0 < mirror->client ) {
        fd_set fds;
        struct timeval tv = { MS_HELLO_TIME_SECS, 0};
        FD_ZERO( &fds );
        FD_SET( mirror->client, &fds );

        /* Don't wait forever for the hello; give up after
         * MS_HELLO_TIME_SECS. */
        FATAL_UNLESS( 0 <= select( FD_SETSIZE, &fds, NULL, NULL, &tv ),
                "Select failed." );

        if( FD_ISSET( mirror->client, &fds ) ){
            off64_t remote_size;
            if ( socket_nbd_read_hello( mirror->client, &remote_size ) ) {
                if( remote_size == local_size ){
                    connected = 1;
                    mirror_set_state( mirror, MS_GO );
                }
                else {
                    /* Fixed: these are off64_t values; %d was undefined
                     * behaviour.  Print as 64-bit signed instead. */
                    warn("Remote size (%"PRId64") doesn't match local (%"PRId64")",
                            (int64_t) remote_size, (int64_t) local_size );
                    mirror_set_state( mirror, MS_FAIL_SIZE_MISMATCH );
                }
            }
            else {
                warn( "Mirror attempt rejected." );
                mirror_set_state( mirror, MS_FAIL_REJECTED );
            }
        }
        else {
            warn( "No NBD Hello received." );
            mirror_set_state( mirror, MS_FAIL_NO_HELLO );
        }

        if ( !connected ) { close( mirror->client ); }
    }
    else {
        warn( "Mirror failed to connect.");
        mirror_set_state( mirror, MS_FAIL_CONNECT );
    }

    return connected;
}
|
|
|
|
|
|
|
|
|
2012-07-23 10:22:25 +01:00
|
|
|
int mirror_should_quit( struct mirror * mirror )
|
|
|
|
{
|
|
|
|
switch( mirror->action_at_finish ) {
|
|
|
|
case ACTION_EXIT:
|
|
|
|
case ACTION_UNLINK:
|
|
|
|
return 1;
|
|
|
|
default:
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
}
|
2012-06-27 15:45:33 +01:00
|
|
|
|
2013-08-09 17:02:10 +01:00
|
|
|
/* Iterates through the bitmap, finding a dirty run to form the basis of the
|
|
|
|
* next transfer, then puts it together. */
|
|
|
|
/* Iterates through the bitmap, finding a dirty run to form the basis of the
 * next transfer, then puts it together.
 * Returns 1 with ctrl->xfer populated (NBD request header pre-built, write
 * and read progress zeroed), or 0 if no dirty run remains in this pass. */
int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
{
    struct mirror* mirror = ctrl->mirror;
    uint64_t current, run, size = ctrl->serve->size;
    int found = 0;

    do {
        int run_is_set = 0;
        /* Resume scanning from where this pass has got to so far */
        current = mirror->this_pass_dirty + mirror->this_pass_clean;

        /* Length of the homogeneous run starting at `current`, capped at
         * mirror_longest_write; run_is_set tells us whether it's dirty */
        run = bitset_run_count_ex(
            mirror->dirty_map, current, mirror_longest_write, &run_is_set
        );

        if ( current + run > size ) {
            debug(
                "Size not divisible by %i, adjusting final block",
                block_allocation_resolution
            );
            run = size - current;
        }

        /* FIXME: we could avoid sending sparse areas of the disc here, and
         * probably save a lot of bandwidth and time (if we know the destination
         * starts off zeroed). */
        if ( run_is_set ) {
            found = 1;
        } else {
            /* Clean run: account for it and keep scanning */
            mirror->this_pass_clean += run;
            mirror->all_clean += run;
        }
    } while ( !found && current + run < size );

    /* current and run specify our next transfer */
    if ( !found ) {
        return 0;
    }
    debug( "Next dirty block: current=%"PRIu64", run=%"PRIu64, current, run );
    struct nbd_request req = {
        .magic = REQUEST_MAGIC,
        .type = REQUEST_WRITE,
        .handle = ".MIRROR.",
        .from = current,
        .len = run
    };
    /* Serialise the request into the on-the-wire header the write
     * callback will send ahead of the data */
    nbd_h2r_request( &req, &ctrl->xfer.hdr.req_raw );

    ctrl->xfer.from = current;
    ctrl->xfer.len = run;

    /* Reset per-transfer progress counters */
    ctrl->xfer.written = 0;
    ctrl->xfer.read = 0;

    return 1;
}
|
|
|
|
|
2013-08-14 16:24:50 +01:00
|
|
|
int mirror_exceeds_max_bps( struct mirror *mirror )
|
|
|
|
{
|
|
|
|
uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started;
|
2013-09-10 16:03:26 +01:00
|
|
|
uint64_t mig_speed = mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 );
|
2013-08-14 16:24:50 +01:00
|
|
|
|
|
|
|
debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second );
|
|
|
|
|
|
|
|
if ( mig_speed > mirror->max_bytes_per_second ) {
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2013-08-09 17:02:10 +01:00
|
|
|
// ONLY CALL THIS WHEN SERVER IO IS LOCKED
|
|
|
|
// ONLY CALL THIS WHEN SERVER IO IS LOCKED
/* Finish a successful migration: optionally shut down / unlink the server
 * (per action_at_finish), then mark the mirror MS_DONE. */
void mirror_complete( struct server *serve )
{
    /* FIXME: Pretty sure this is broken, if action != !QUIT. Just moving code
     * around for now, can fix it later. Action is always quit in production */
    if ( mirror_should_quit( serve->mirror ) ) {
        debug("exit!");
        /* FIXME: This depends on blocking I/O right now, so make sure we are */
        sock_set_nonblock( serve->mirror->client, 0 );
        mirror_on_exit( serve );
        info("Server closed, quitting after successful migration");
    }

    mirror_set_state( serve->mirror, MS_DONE );
    return;
}
|
|
|
|
|
|
|
|
/* libev write callback: streams the current transfer (NBD request header
 * followed by block data straight out of the mmapped image) to the
 * listener.  Progress lives in ctrl->xfer.written.  Once the whole request
 * has gone out, we flip to watching for readability so mirror_read_cb can
 * collect the NBD reply. */
static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
{
    struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
    NULLCHECK( ctrl );

    struct xfer *xfer = &ctrl->xfer;

    size_t to_write, hdr_size = sizeof( struct nbd_request_raw );
    char *data_loc;
    ssize_t count;

    if ( !( revents & EV_WRITE ) ) {
        warn( "No write event signalled in mirror write callback" );
        return;
    }

    debug( "Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client );

    if ( xfer->written < hdr_size ) {
        /* Still part-way through the request header */
        data_loc = ( (char*) &xfer->hdr.req_raw ) + ctrl->xfer.written;
        to_write = hdr_size - xfer->written;
    } else {
        /* Header sent; now sending payload from the mapped image */
        data_loc = ctrl->mirror->mapped + xfer->from + ( xfer->written - hdr_size );
        to_write = xfer->len - ( ctrl->xfer.written - hdr_size );

        // If we're in the last pass, we'll be locked anyway. If we're not in
        // the last pass, we want to be locked for every write() call that
        // we issue, to avoid the blocks being updated while we work. In
        // particular, bitset_run_clear() must be called while the I/O is locked
        // or we might clear a bit that had been set by another write.
        if ( !server_io_locked( ctrl->serve ) ) {
            server_lock_io( ctrl->serve );
            debug( "In lock block" );
        }
    }

    // Actually write some bytes
    if ( ( count = write( ctrl->mirror->client, data_loc, to_write ) ) < 0 ) {
        if ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
            warn( SHOW_ERRNO( "Couldn't write to listener" ) );
            ev_break( loop, EVBREAK_ONE );
        }
        return;
    }
    /* Fixed: count is ssize_t and to_write is size_t, so use %zd / %zu
     * instead of PRIu64 (these differ on 32-bit platforms). */
    debug( "Wrote %zd bytes", count );
    debug( "to_write was %zu, xfer->written was %"PRIu64, to_write, xfer->written );
    ctrl->xfer.written += count;

    // We wrote some bytes, so reset the timeout
    ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );

    // All bytes written, so now we need to read the NBD reply back.
    if ( ctrl->xfer.written == ctrl->xfer.len + hdr_size ) {

        // We can, however, clear the run here. If it turns out that the
        // NBD request has been rejected, we're discarding it anyway, so the
        // wrong data won't get used. If the request is a success, any blocks
        // written to while waiting for the reply will be copied in the next
        // pass; if it's the final pass, I/O remains locked.
        /* Fixed: the format arguments were accidentally inside the string
         * literal, leaving two PRIu64 conversions with no arguments. */
        debug( "Clearing bitset from=%"PRIu64" run=%"PRIu64, ctrl->xfer.from, ctrl->xfer.len );
        bitset_clear_range( ctrl->mirror->dirty_map, ctrl->xfer.from, ctrl->xfer.len );

        if ( ctrl->mirror->pass != mirror_last_pass ) {
            debug( "Leaving lock block" );
            server_unlock_io( ctrl->serve );
        }
        ev_io_start( loop, &ctrl->read_watcher );
        ev_io_stop( loop, &ctrl->write_watcher );
    }

    return;
}
|
|
|
|
|
|
|
|
/* libev read callback: collects and validates the NBD reply for the
 * transfer we just sent, then decides what happens next - another transfer
 * in this pass, the start of a new pass (possibly the final, fully-locked
 * one), completion of the whole migration, or a bandwidth-limit pause. */
static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
{
    struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
    NULLCHECK( ctrl );

    struct mirror *m = ctrl->mirror;
    NULLCHECK( m );

    struct xfer *xfer = &ctrl->xfer;
    NULLCHECK( xfer );

    if ( !( revents & EV_READ ) ) {
        warn( "No read event signalled in mirror read callback" );
        return;
    }

    struct nbd_reply rsp;
    ssize_t count;
    uint64_t left = sizeof( struct nbd_reply_raw ) - xfer->read;

    debug( "Mirror read callback invoked with events %d. fd:%i", revents, m->client );

    /* Start / continue reading the NBD response from the mirror. */
    if ( ( count = read( m->client, ((void*) &xfer->hdr.rsp_raw) + xfer->read, left ) ) < 0 ) {
        if ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
            warn( SHOW_ERRNO( "Couldn't read from listener" ) );
            ev_break( loop, EVBREAK_ONE );
        }
        debug( SHOW_ERRNO( "Couldn't read from listener (non-scary)" ) );
        return;
    }

    if ( count == 0 ) {
        warn( "EOF reading response from server!" );
        ev_break( loop, EVBREAK_ONE );
        return;
    }

    // We read some bytes, so reset the timer
    ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );

    /* Fixed: count is ssize_t; printing it with PRIu64 is wrong on
     * 32-bit platforms, so use %zd. */
    debug( "Read %zd bytes", count );
    debug( "left was %"PRIu64", xfer->read was %"PRIu64, left, xfer->read );
    xfer->read += count;

    if ( xfer->read < sizeof( struct nbd_reply_raw ) ) {
        // Haven't read the whole response yet
        return;
    }

    nbd_r2h_reply( &xfer->hdr.rsp_raw, &rsp );

    // validate reply, break event loop if bad
    if ( rsp.magic != REPLY_MAGIC ) {
        warn( "Bad reply magic from listener" );
        ev_break( loop, EVBREAK_ONE );
        return;
    }

    if ( rsp.error != 0 ) {
        warn( "Error returned from listener: %i", rsp.error );
        ev_break( loop, EVBREAK_ONE );
        return;
    }

    if ( memcmp( ".MIRROR.", &rsp.handle[0], 8 ) != 0 ) {
        warn( "Bad handle returned from listener" );
        ev_break( loop, EVBREAK_ONE );
        return;
    }

    /* transfer was completed, so now we need to either set up the next
     * transfer of this pass, set up the first transfer of the next pass, or
     * complete the migration */
    m->this_pass_dirty += xfer->len;
    m->all_dirty += xfer->len;
    xfer->read = 0;
    xfer->written = 0;

    /* This next bit could take a little while, which is fine */
    ev_timer_stop( ctrl->ev_loop, &ctrl->timeout_watcher );

    do {
        // This pass complete
        if ( m->this_pass_dirty + m->this_pass_clean == ctrl->serve->size ) {
            debug( "Pass %d completed", m->pass );
            /* Set up the next transfer, which may be n+1 in the current pass,
             * or 0 in a new pass. If we can't find another transfer to do, that
             * means the pass is complete. Advance pass and re-run the end-of-
             * pass logic to complete migration ( pass == mirror_last_pass ), or
             * move onto the last pass ( pass < mirror_last_pass, by virtue of
             * this_pass_dirty being 0 ).
             */

            // last pass completed
            if ( m->pass >= mirror_last_pass ) {
                /* This was the last pass, so finish. */
                mirror_complete( ctrl->serve );
                ev_break( loop, EVBREAK_ONE );
                return;
            }

            // this was not the last pass - set up for the next run.
            if ( m->this_pass_dirty < mirror_last_pass_after_bytes_written ) {
                /* Quiet disc, so skip to the final pass */
                m->pass = mirror_last_pass;
            } else {
                m->pass++;
            }
            // FIXME: Can status race with us if it inspects state here?
            m->this_pass_dirty = 0;
            m->this_pass_clean = 0;

            debug( "mirror start pass=%d", m->pass );
            if ( m->pass == mirror_last_pass ) {
                /* This is the start of our next pass. If it happens to be the
                 * final pass, we need to wait for all the clients to exit before
                 * continuing */
                debug( "In lock block for last pass" );
                /* FIXME: this could block */
                server_forbid_new_clients( ctrl->serve );
                server_close_clients( ctrl->serve );

                // FIXME: In theory, we won't need this any more...
                server_lock_io( ctrl->serve );
            }
        }
    } while ( !mirror_setup_next_xfer( ctrl ) );

    ev_io_stop( loop, &ctrl->read_watcher );

    if ( mirror_exceeds_max_bps( m ) ) {
        /* We're over the bandwidth limit, so don't move onto the next transfer
         * yet. Our limit_watcher will move us on once we're OK. timeout_watcher
         * was disabled further up, so don't need to stop it here too */
        debug( "max_bps exceeded, waiting" );
        ev_timer_again( loop, &ctrl->limit_watcher );
    } else {
        /* We're waiting for the socket to become writable again, so re-enable */
        ev_timer_again( loop, &ctrl->timeout_watcher );
        ev_io_start( loop, &ctrl->write_watcher );
    }

    return;
}
|
|
|
|
|
|
|
|
/* Watchdog: if no bytes move for too long, abort the event loop so the
 * supervisor can retry the migration. */
void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)), int revents )
{
    if ( revents & EV_TIMER ) {
        info( "Mirror timeout signalled" );
        ev_break( loop, EVBREAK_ONE );
    } else {
        warn( "Mirror timeout called but no timer event signalled" );
    }
}
|
2012-12-28 11:38:54 +00:00
|
|
|
|
2013-08-12 15:30:21 +01:00
|
|
|
/* Triggered by the abandon self-pipe: mark the mirror abandoned, drain the
 * signal, and stop the event loop. */
void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
{
    struct mirror_ctrl *c = (struct mirror_ctrl *) w->data;
    NULLCHECK( c );

    if ( revents & EV_READ ) {
        debug( "Abandon message received" );
        mirror_set_state( c->mirror, MS_ABANDONED );
        self_pipe_signal_clear( c->mirror->abandon_signal );
        ev_break( loop, EVBREAK_ONE );
    } else {
        warn( "Mirror abandon called but no abandon event signalled" );
    }
}
|
|
|
|
|
2013-08-14 16:24:50 +01:00
|
|
|
/* Bandwidth-limit timer: fires once a second while we're over max_bps.
 * When the average rate drops back under the cap, resume the migration by
 * re-enabling the write watcher and the timeout watchdog. */
void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
{
    struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
    NULLCHECK( ctrl );

    if ( !(revents & EV_TIMER ) ) {
        warn( "Mirror limit callback executed but no timer event signalled" );
        return;
    }

    if ( mirror_exceeds_max_bps( ctrl->mirror ) ) {
        /* Fixed: the argument had no matching conversion specifier in the
         * format string; include it so the limit is actually logged. */
        debug( "max_bps %"PRIu64" exceeded, waiting", ctrl->mirror->max_bytes_per_second );
        ev_timer_again( loop, w );
    } else {
        /* We're below the limit, so do the next request */
        debug("max_bps not exceeded, performing next transfer" );
        ev_io_start( loop, &ctrl->write_watcher );
        ev_timer_stop( loop, &ctrl->limit_watcher );
        ev_timer_again( loop, &ctrl->timeout_watcher );
    }

    return;
}
|
|
|
|
|
2012-06-27 15:45:33 +01:00
|
|
|
/* Drive one migration attempt: set up the libev watchers, run the event
 * loop until the migration completes, times out, errors, or is abandoned,
 * then restore server state.  The supervisor retries on error. */
void mirror_run( struct server *serve )
{
    NULLCHECK( serve );
    NULLCHECK( serve->mirror );

    struct mirror *m = serve->mirror;

    m->migration_started = monotonic_time_ms();
    info("Starting mirror" );

    /* mirror_setup_next_xfer won't be able to cope with this, so special-case
     * it here.
     * TODO: Another case we won't be able to handle is a non-zero-sized image
     * where none of the blocks are set in the first pass. As it happens, we
     * start with all blocks set and then pare them down, so it doesn't happen
     * in the current codebase - but watch out for the future!
     */
    if ( serve->size == 0 ) {
        info( "0-byte image special case" );
        server_lock_io( serve );
        mirror_complete( serve );
        server_unlock_io( serve );
        return;
    }

    struct mirror_ctrl ctrl;
    memset( &ctrl, 0, sizeof( struct mirror_ctrl ) );

    ctrl.serve = serve;
    ctrl.mirror = m;

    ctrl.ev_loop = EV_DEFAULT;

    /* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */
    ev_io_init( &ctrl.read_watcher, mirror_read_cb, m->client, EV_READ );
    ctrl.read_watcher.data = (void*) &ctrl;

    ev_io_init( &ctrl.write_watcher, mirror_write_cb, m->client, EV_WRITE );
    ctrl.write_watcher.data = (void*) &ctrl;

    /* Watchdog: aborts the loop if no bytes move for too long */
    ev_init( &ctrl.timeout_watcher, mirror_timeout_cb );
    ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F ;

    ev_init( &ctrl.limit_watcher, mirror_limit_cb );
    ctrl.limit_watcher.repeat = 1.0; // We check bps every second. seems sane.
    ctrl.limit_watcher.data = (void*) &ctrl;

    /* Lets the control thread abort a migration via the self-pipe */
    ev_init( &ctrl.abandon_watcher, mirror_abandon_cb );
    ev_io_set( &ctrl.abandon_watcher, m->abandon_signal->read_fd, EV_READ );
    ctrl.abandon_watcher.data = (void*) &ctrl;
    ev_io_start( ctrl.ev_loop, &ctrl.abandon_watcher );

    ERROR_UNLESS(
        mirror_setup_next_xfer( &ctrl ),
        "Couldn't find first transfer for mirror!"
    );

    /* Start by writing xfer 0 to the listener */
    ev_io_start( ctrl.ev_loop, &ctrl.write_watcher );

    /* We want to timeout during the first write as well as subsequent ones */
    ev_timer_again( ctrl.ev_loop, &ctrl.timeout_watcher );

    /* Everything up to here is blocking. We switch to non-blocking so we
     * can handle rate-limiting and weird error conditions better. TODO: We
     * should expand the event loop upwards so we can do the same there too */
    sock_set_nonblock( m->client, 1 );
    info( "Entering event loop" );
    ev_run( ctrl.ev_loop, 0 );
    info( "Exited event loop" );
    /* Parent code might expect a non-blocking socket */
    sock_set_nonblock( m->client, 0 );

    /* Errors in the event loop don't track I/O lock state or try to restore
     * it to something sane - they just terminate the event loop with state !=
     * MS_DONE. We unlock I/O and re-allow new clients here if necessary.
     */
    if ( server_io_locked( serve ) ) {
        server_unlock_io( serve );
    }

    if ( m->action_at_finish == ACTION_NOTHING || m->commit_state != MS_DONE ) {
        server_allow_new_clients( serve );
    }

    /* Returning here says "mirroring complete" to the runner. The error
     * call retries the migration from scratch. */

    if ( m->commit_state != MS_DONE ) {
        error( "Event loop exited, but mirroring is not complete" );
    }

    return;
}
|
|
|
|
|
|
|
|
|
2012-07-15 19:46:35 +01:00
|
|
|
void mbox_post_mirror_state( struct mbox * mbox, enum mirror_state st )
|
|
|
|
{
|
|
|
|
NULLCHECK( mbox );
|
|
|
|
enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) );
|
|
|
|
|
|
|
|
*contents = st;
|
|
|
|
|
|
|
|
mbox_post( mbox, contents );
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2012-07-12 14:54:48 +01:00
|
|
|
void mirror_signal_commit( struct mirror * mirror )
|
2012-06-27 15:45:33 +01:00
|
|
|
{
|
|
|
|
NULLCHECK( mirror );
|
|
|
|
|
2012-09-20 13:37:48 +01:00
|
|
|
mbox_post_mirror_state( mirror->commit_signal,
|
2012-07-15 19:46:35 +01:00
|
|
|
mirror_get_state( mirror ) );
|
2012-06-27 15:45:33 +01:00
|
|
|
}
|
|
|
|
|
2012-09-20 13:37:48 +01:00
|
|
|
/** Thread launched to drive mirror process
|
2012-07-15 19:46:35 +01:00
|
|
|
* This is needed for two reasons: firstly, it decouples the mirroring
|
|
|
|
* from the control thread (although that's less valid with mboxes
|
|
|
|
* passing state back and forth) and to provide an error context so that
|
|
|
|
* retries can be cleanly handled without a bespoke error handling
|
|
|
|
* mechanism.
|
|
|
|
* */
|
2012-06-27 15:45:33 +01:00
|
|
|
void* mirror_runner(void* serve_params_uncast)
|
|
|
|
{
|
|
|
|
/* The supervisor thread relies on there not being any ERROR
|
|
|
|
* calls until after the mirror_signal_commit() call in this
|
|
|
|
* function.
|
|
|
|
* However, *after* that, we should call ERROR_* instead of
|
|
|
|
* FATAL_* wherever possible.
|
|
|
|
*/
|
|
|
|
struct server *serve = (struct server*) serve_params_uncast;
|
|
|
|
|
|
|
|
NULLCHECK( serve );
|
|
|
|
NULLCHECK( serve->mirror );
|
2012-07-12 14:54:48 +01:00
|
|
|
struct mirror * mirror = serve->mirror;
|
2012-06-27 15:45:33 +01:00
|
|
|
NULLCHECK( mirror->dirty_map );
|
|
|
|
|
2012-07-11 09:43:16 +01:00
|
|
|
error_set_handler( (cleanup_handler *) mirror_cleanup, serve );
|
2012-06-27 15:45:33 +01:00
|
|
|
|
|
|
|
info( "Connecting to mirror" );
|
2012-09-20 13:37:48 +01:00
|
|
|
|
2012-06-27 15:45:33 +01:00
|
|
|
time_t start_time = time(NULL);
|
2012-07-12 14:54:48 +01:00
|
|
|
int connected = mirror_connect( mirror, serve->size );
|
2012-06-27 15:45:33 +01:00
|
|
|
mirror_signal_commit( mirror );
|
|
|
|
if ( !connected ) { goto abandon_mirror; }
|
|
|
|
|
|
|
|
/* After this point, if we see a failure we need to disconnect
|
|
|
|
* and retry everything from mirror_set_state(_, MS_INIT), but
|
|
|
|
* *without* signaling the commit or abandoning the mirror.
|
|
|
|
* */
|
2012-09-20 13:37:48 +01:00
|
|
|
|
|
|
|
if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){
|
2012-06-27 15:45:33 +01:00
|
|
|
/* If we get here, then we managed to connect but the
|
|
|
|
* control thread feeding status back to the user will
|
|
|
|
* have gone away, leaving the user without meaningful
|
|
|
|
* feedback. In this instance, they have to assume a
|
|
|
|
* failure, so we can't afford to let the mirror happen.
|
|
|
|
* We have to set the state to avoid a race.
|
|
|
|
*/
|
|
|
|
mirror_set_state( mirror, MS_FAIL_CONNECT );
|
|
|
|
warn( "Mirror connected, but too slowly" );
|
|
|
|
goto abandon_mirror;
|
|
|
|
}
|
|
|
|
|
|
|
|
mirror_run( serve );
|
2013-08-27 15:54:59 +01:00
|
|
|
|
|
|
|
/* On success, this is unnecessary, and harmless ( mirror_cleanup does it
|
|
|
|
* for us ). But if we've failed and are going to retry on the next run, we
|
|
|
|
* must close this socket here to have any chance of it succeeding.
|
|
|
|
*/
|
|
|
|
if ( !mirror->client < 0 ) {
|
|
|
|
sock_try_close( mirror->client );
|
|
|
|
mirror->client = -1;
|
|
|
|
}
|
|
|
|
|
2012-06-27 15:45:33 +01:00
|
|
|
abandon_mirror:
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
struct mirror_super * mirror_super_create(
|
|
|
|
const char * filename,
|
|
|
|
union mysockaddr * connect_to,
|
|
|
|
union mysockaddr * connect_from,
|
2013-08-13 12:30:18 +01:00
|
|
|
uint64_t max_Bps,
|
|
|
|
enum mirror_finish_action action_at_finish,
|
2012-07-15 21:57:36 +01:00
|
|
|
struct mbox * state_mbox)
|
2012-06-27 15:45:33 +01:00
|
|
|
{
|
|
|
|
struct mirror_super * super = xmalloc( sizeof( struct mirror_super) );
|
2012-09-20 13:37:48 +01:00
|
|
|
super->mirror = mirror_create(
|
|
|
|
filename,
|
|
|
|
connect_to,
|
|
|
|
connect_from,
|
|
|
|
max_Bps,
|
2012-07-15 19:46:35 +01:00
|
|
|
action_at_finish,
|
|
|
|
mbox_create() ) ;
|
2012-07-15 21:57:36 +01:00
|
|
|
super->state_mbox = state_mbox;
|
2012-06-27 15:45:33 +01:00
|
|
|
return super;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2012-07-15 21:57:36 +01:00
|
|
|
/* Post the current state of the mirror into super->state_mbox.*/
|
2012-09-20 13:37:48 +01:00
|
|
|
void mirror_super_signal_committed(
|
2012-07-15 19:46:35 +01:00
|
|
|
struct mirror_super * super ,
|
|
|
|
enum mirror_state commit_state )
|
2012-06-27 15:45:33 +01:00
|
|
|
{
|
|
|
|
NULLCHECK( super );
|
|
|
|
NULLCHECK( super->state_mbox );
|
|
|
|
|
2012-09-20 13:37:48 +01:00
|
|
|
mbox_post_mirror_state(
|
|
|
|
super->state_mbox,
|
2012-07-15 19:46:35 +01:00
|
|
|
commit_state );
|
2012-06-27 15:45:33 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void mirror_super_destroy( struct mirror_super * super )
|
|
|
|
{
|
|
|
|
NULLCHECK( super );
|
|
|
|
|
2012-07-15 19:46:35 +01:00
|
|
|
mbox_destroy( super->mirror->commit_signal );
|
2012-07-12 14:54:48 +01:00
|
|
|
mirror_destroy( super->mirror );
|
2012-06-27 15:45:33 +01:00
|
|
|
free( super );
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/* The mirror supervisor thread. Responsible for kicking off retries if
 * the mirror thread fails.
 * The mirror and mirror_super objects are never freed, and the
 * mirror_super_runner thread is never joined.
 *
 * Argument is the owning struct server (cast to void* for pthreads);
 * always returns NULL.
 */
void * mirror_super_runner( void * serve_uncast )
{
	struct server * serve = (struct server *) serve_uncast;
	NULLCHECK( serve );
	NULLCHECK( serve->mirror );
	NULLCHECK( serve->mirror_super );

	int first_pass = 1;
	int should_retry = 0;
	int success = 0, abandoned = 0;

	struct mirror * mirror = serve->mirror;
	struct mirror_super * super = serve->mirror_super;

	do {
		/* (Re)launch the worker thread for this attempt */
		FATAL_IF( 0 != pthread_create(
					&mirror->thread,
					NULL,
					mirror_runner,
					serve),
				"Failed to create mirror thread");

		debug("Supervisor waiting for commit signal");
		/* Blocks until mirror_runner posts the post-connect state.
		 * We own the returned allocation and must free it. */
		enum mirror_state * commit_state =
			mbox_receive( mirror->commit_signal );

		debug( "Supervisor got commit signal" );
		if ( first_pass ) {
			/* Only retry if the connection attempt was successful. Otherwise
			 * the user will see an error reported while we're still trying to
			 * retry behind the scenes. This may race with migration completing
			 * but since we "shouldn't retry" in that case either, that's fine
			 */
			should_retry = *commit_state == MS_GO;

			/* Only send this signal the first time */
			mirror_super_signal_committed(
					super,
					*commit_state);
			debug("Mirror supervisor committed");
		}
		/* We only care about the value of the commit signal on
		 * the first pass, so this is ok
		 */
		free( commit_state );

		debug("Supervisor waiting for mirror thread" );
		pthread_join( mirror->thread, NULL );

		/* If we can't connect to the remote end, the watcher for the abandon
		 * signal never gets installed at the moment, which is why we also check
		 * it here. */
		abandoned =
			mirror_get_state( mirror ) == MS_ABANDONED ||
			self_pipe_signal_clear( mirror->abandon_signal );

		/* MS_DONE is only set on a fully completed migration */
		success = MS_DONE == mirror_get_state( mirror );

		if( success ){
			info( "Mirror supervisor success, exiting" );
		} else if ( abandoned ) {
			info( "Mirror abandoned" );
			should_retry = 0;
		} else if ( should_retry ) {
			info( "Mirror failed, retrying" );
		} else {
			info( "Mirror failed before commit, giving up" );
		}

		first_pass = 0;

		if ( should_retry ) {
			/* We don't want to hammer the destination too
			 * hard, so if this is a retry, insert a delay. */
			sleep( MS_RETRY_DELAY_SECS );

			/* We also have to reset the bitmap to be sure
			 * we transfer everything */
			mirror_reset( mirror );
		}

	}
	while ( should_retry && !success );

	return NULL;
}
|
|
|
|
|