Add the --unlink option to mirror

This deletes the local file before tearing down the mirror connection,
allowing us to avoid an ambiguous recovery situation.
This commit is contained in:
Alex Young
2012-07-23 13:39:27 +01:00
parent fd935ce4c9
commit 33f95e1986
9 changed files with 140 additions and 44 deletions

View File

@@ -89,7 +89,7 @@ mirror
~~~~~~ ~~~~~~
$ flexnbd mirror --addr <ADDR> --port <PORT> --sock SOCK $ flexnbd mirror --addr <ADDR> --port <PORT> --sock SOCK
[--bind <BIND-ADDR>] [global option]* [--unlink] [--bind <BIND-ADDR>] [global option]*
Start a migration from the server with control socket SOCK to the server Start a migration from the server with control socket SOCK to the server
listening at ADDR:PORT. listening at ADDR:PORT.
@@ -106,7 +106,15 @@ again. It is not safe to resume the migration from where it left off
because the source can't see that the backing store behind the because the source can't see that the backing store behind the
destination is intact, or even on the same machine. destination is intact, or even on the same machine.
Note: files smaller than 4096 bytes cannot be migrated. If the `--unlink` option is given, the local file will be deleted
immediately before the mirror connection is terminated. This allows
an otherwise-ambiguous situation to be resolved: if you don't unlink
the file and the flexnbd process at either end is terminated, it's not
possible to tell which copy of the data is canonical. Since the
unlink happens as soon as the sender knows that it has transmitted all
the data, there can be no ambiguity.
Note: files smaller than 4096 bytes cannot be mirrored.
Options Options
^^^^^^^ ^^^^^^^
@@ -120,6 +128,10 @@ Options
*--sock, -s SOCK*: *--sock, -s SOCK*:
The control socket of the local server to migrate from. Required. The control socket of the local server to migrate from. Required.
*--unlink, -u*:
Unlink the served file from the local filesystem after successfully
mirroring.
*--bind, -b BIND-ADDR*: *--bind, -b BIND-ADDR*:
The local address to bind to. You may need this if the remote server The local address to bind to. You may need this if the remote server
is using an access control list. is using an access control list.

View File

@@ -476,7 +476,7 @@ int client_serve_request(struct client* client)
} }
server_unlock_io( client->serve ); server_unlock_io( client->serve );
return failure; return stop;
} }

View File

@@ -324,22 +324,15 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
} }
connect_to->v4.sin_port = htobe16(raw_port); connect_to->v4.sin_port = htobe16(raw_port);
if (linesc > 2) {
connect_from = xmalloc( sizeof( union mysockaddr ) );
if (parse_ip_to_sockaddr(&connect_from->generic, lines[2]) == 0) {
write_socket("1: bad bind address");
return -1;
}
}
if (linesc > 3) { max_Bps = atoi(lines[2]); }
action_at_finish = ACTION_EXIT; action_at_finish = ACTION_EXIT;
if (linesc > 4) { if (linesc > 2) {
if (strcmp("exit", lines[3]) == 0) { if (strcmp("exit", lines[2]) == 0) {
action_at_finish = ACTION_EXIT; action_at_finish = ACTION_EXIT;
} }
else if (strcmp("nothing", lines[3]) == 0) { else if (strcmp( "unlink", lines[2]) == 0 ) {
action_at_finish = ACTION_UNLINK;
}
else if (strcmp("nothing", lines[2]) == 0) {
action_at_finish = ACTION_NOTHING; action_at_finish = ACTION_NOTHING;
} }
else { else {
@@ -347,6 +340,19 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
return -1; return -1;
} }
} }
if (linesc > 3) {
connect_from = xmalloc( sizeof( union mysockaddr ) );
if (parse_ip_to_sockaddr(&connect_from->generic, lines[2]) == 0) {
write_socket("1: bad bind address");
return -1;
}
}
if (linesc > 4) {
max_Bps = atoi(lines[2]);
}
if (linesc > 5) { if (linesc > 5) {
write_socket("1: unrecognised parameters to mirror"); write_socket("1: unrecognised parameters to mirror");
@@ -384,7 +390,7 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
control_write_mirror_response( state, client->socket ); control_write_mirror_response( state, client->socket );
} }
debug( "Control thread going away." ); debug( "Control thread going away." );
return 0; return 0;
} }

View File

@@ -236,7 +236,7 @@ void mirror_on_exit( struct server * serve )
debug("Unlinking %s", serve->filename ); debug("Unlinking %s", serve->filename );
if ( ACTION_UNLINK == serve->mirror->action_at_finish ) { if ( ACTION_UNLINK == serve->mirror->action_at_finish ) {
server_unlink( serve->mirror ); server_unlink( serve );
} }
debug("Sending disconnect"); debug("Sending disconnect");

View File

@@ -32,15 +32,25 @@ static char serve_help_text[] =
QUIET_LINE; QUIET_LINE;
static struct option * listen_options = serve_options; static struct option listen_options[] = {
static char * listen_short_options = serve_short_options; GETOPT_HELP,
GETOPT_ADDR,
GETOPT_PORT,
GETOPT_FILE,
GETOPT_SOCK,
GETOPT_DENY,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
};
static char listen_short_options[] = "hl:p:f:s:d" SOPT_QUIET SOPT_VERBOSE;
static char listen_help_text[] = static char listen_help_text[] =
"Usage: flexnbd " CMD_LISTEN " <options> [<acl_address>*]\n\n" "Usage: flexnbd " CMD_LISTEN " <options> [<acl_address>*]\n\n"
"Listen for an incoming migration on ADDR:PORT.\n\n" "Listen for an incoming migration on ADDR:PORT."
HELP_LINE HELP_LINE
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to listen on.\n" "\t--" OPT_ADDR ",-l <ADDR>\tThe address to listen on.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to listen on.\n" "\t--" OPT_PORT ",-p <PORT>\tThe port to listen on.\n"
"\t--" OPT_FILE ",-f <FILE>\tThe file to write to.\n" "\t--" OPT_FILE ",-f <FILE>\tThe file to serve.\n"
"\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n" "\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n"
SOCK_LINE SOCK_LINE
VERBOSE_LINE VERBOSE_LINE
@@ -107,12 +117,13 @@ static struct option mirror_options[] = {
GETOPT_SOCK, GETOPT_SOCK,
GETOPT_ADDR, GETOPT_ADDR,
GETOPT_PORT, GETOPT_PORT,
GETOPT_UNLINK,
GETOPT_BIND, GETOPT_BIND,
GETOPT_QUIET, GETOPT_QUIET,
GETOPT_VERBOSE, GETOPT_VERBOSE,
{0} {0}
}; };
static char mirror_short_options[] = "hs:l:p:b:" SOPT_QUIET SOPT_VERBOSE; static char mirror_short_options[] = "hs:l:p:ub:" SOPT_QUIET SOPT_VERBOSE;
static char mirror_help_text[] = static char mirror_help_text[] =
"Usage: flexnbd " CMD_MIRROR " <options>\n\n" "Usage: flexnbd " CMD_MIRROR " <options>\n\n"
"Start mirroring from the server with control socket SOCK to one at ADDR:PORT.\n\n" "Start mirroring from the server with control socket SOCK to one at ADDR:PORT.\n\n"
@@ -120,6 +131,7 @@ static char mirror_help_text[] =
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to mirror to.\n" "\t--" OPT_ADDR ",-l <ADDR>\tThe address to mirror to.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to mirror to.\n" "\t--" OPT_PORT ",-p <PORT>\tThe port to mirror to.\n"
SOCK_LINE SOCK_LINE
"\t--" OPT_UNLINK ",-u\tUnlink the local file when done.\n"
BIND_LINE BIND_LINE
VERBOSE_LINE VERBOSE_LINE
QUIET_LINE; QUIET_LINE;
@@ -316,7 +328,13 @@ void read_acl_param( int c, char **sock )
read_sock_param( c, sock, acl_help_text ); read_sock_param( c, sock, acl_help_text );
} }
void read_mirror_param( int c, char **sock, char **ip_addr, char **ip_port, char **bind_addr ) void read_mirror_param(
int c,
char **sock,
char **ip_addr,
char **ip_port,
int *unlink,
char **bind_addr )
{ {
switch( c ){ switch( c ){
case 'h': case 'h':
@@ -332,6 +350,9 @@ void read_mirror_param( int c, char **sock, char **ip_addr, char **ip_port, char
case 'p': case 'p':
*ip_port = optarg; *ip_port = optarg;
break; break;
case 'u':
*unlink = 1;
break;
case 'b': case 'b':
*bind_addr = optarg; *bind_addr = optarg;
break; break;
@@ -404,15 +425,7 @@ int mode_serve( int argc, char *argv[] )
} }
if ( err ) { exit_err( serve_help_text ); } if ( err ) { exit_err( serve_help_text ); }
flexnbd = flexnbd_create_serving( flexnbd = flexnbd_create_serving( ip_addr, ip_port, file, sock, default_deny, argc - optind, argv + optind, MAX_NBD_CLIENTS );
ip_addr,
ip_port,
file,
sock,
default_deny,
argc - optind,
argv + optind,
MAX_NBD_CLIENTS );
flexnbd_serve( flexnbd ); flexnbd_serve( flexnbd );
flexnbd_destroy( flexnbd ); flexnbd_destroy( flexnbd );
@@ -459,7 +472,7 @@ int mode_listen( int argc, char *argv[] )
sock, sock,
default_deny, default_deny,
argc - optind, argc - optind,
argv + optind ); argv + optind);
success = flexnbd_serve( flexnbd ); success = flexnbd_serve( flexnbd );
flexnbd_destroy( flexnbd ); flexnbd_destroy( flexnbd );
@@ -639,11 +652,19 @@ int mode_mirror( int argc, char *argv[] )
char *sock = NULL; char *sock = NULL;
char *remote_argv[4] = {0}; char *remote_argv[4] = {0};
int err = 0; int err = 0;
int unlink = 0;
remote_argv[2] = "exit";
while (1) { while (1) {
c = getopt_long( argc, argv, mirror_short_options, mirror_options, NULL); c = getopt_long( argc, argv, mirror_short_options, mirror_options, NULL);
if ( -1 == c ) { break; } if ( -1 == c ) { break; }
read_mirror_param( c, &sock, &remote_argv[0], &remote_argv[1], &remote_argv[2] ); read_mirror_param( c,
&sock,
&remote_argv[0],
&remote_argv[1],
&unlink,
&remote_argv[3] );
} }
if ( NULL == sock ){ if ( NULL == sock ){
@@ -655,12 +676,13 @@ int mode_mirror( int argc, char *argv[] )
err = 1; err = 1;
} }
if ( err ) { exit_err( mirror_help_text ); } if ( err ) { exit_err( mirror_help_text ); }
if ( unlink ) { remote_argv[2] = "unlink"; }
if (remote_argv[2] == NULL) { if (remote_argv[3] == NULL) {
do_remote_command( "mirror", sock, 2, remote_argv ); do_remote_command( "mirror", sock, 3, remote_argv );
} }
else { else {
do_remote_command( "mirror", sock, 3, remote_argv ); do_remote_command( "mirror", sock, 4, remote_argv );
} }
return 0; return 0;

View File

@@ -21,6 +21,7 @@ void mode(char* mode, int argc, char **argv);
#define OPT_FROM "from" #define OPT_FROM "from"
#define OPT_SIZE "size" #define OPT_SIZE "size"
#define OPT_DENY "default-deny" #define OPT_DENY "default-deny"
#define OPT_UNLINK "unlink"
#define CMD_SERVE "serve" #define CMD_SERVE "serve"
#define CMD_LISTEN "listen" #define CMD_LISTEN "listen"
@@ -51,6 +52,7 @@ void mode(char* mode, int argc, char **argv);
#define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' ) #define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' )
#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' ) #define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' )
#define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' ) #define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' )
#define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' )
#define OPT_VERBOSE "verbose" #define OPT_VERBOSE "verbose"
#define SOPT_VERBOSE "v" #define SOPT_VERBOSE "v"

View File

@@ -73,6 +73,10 @@ class Environment
@nbd1.mirror_unchecked( @nbd2.ip, @nbd2.port, nil, nil, 10 ) @nbd1.mirror_unchecked( @nbd2.ip, @nbd2.port, nil, nil, 10 )
end end
def mirror12_unlink
@nbd1.mirror_unlink( @nbd2.ip, @nbd2.port, 2 )
end
def writefile1(data) def writefile1(data)
@file1 = FileWriter.new(@filename1, @blocksize).write(data) @file1 = FileWriter.new(@filename1, @blocksize).write(data)

View File

@@ -195,7 +195,7 @@ class FlexNBD
end end
end end
def initialize(bin, ip, port) def initialize( bin, ip, port )
@bin = bin @bin = bin
@do_debug = ENV['DEBUG'] @do_debug = ENV['DEBUG']
@debug = build_debug_opt @debug = build_debug_opt
@@ -259,12 +259,29 @@ class FlexNBD
end end
def mirror_cmd(dest_ip, dest_port) def base_mirror_opts( dest_ip, dest_port )
"#{@bin} mirror "\ "--addr #{dest_ip} "\
"--addr #{dest_ip} "\
"--port #{dest_port} "\ "--port #{dest_port} "\
"--sock #{ctrl} "\ "--sock #{ctrl} "\
"#{@debug} " end
def unlink_mirror_opts( dest_ip, dest_port )
"#{base_mirror_opts( dest_ip, dest_port )} "\
"--unlink "
end
def base_mirror_cmd( opts )
"#{@bin} mirror "\
"#{opts} "\
"#{@debug}"
end
def mirror_cmd(dest_ip, dest_port)
base_mirror_cmd( base_mirror_opts( dest_ip, dest_port ) )
end
def mirror_unlink_cmd( dest_ip, dest_port )
base_mirror_cmd( unlink_mirror_opts( dest_ip, dest_port ) )
end end
def break_cmd def break_cmd
@@ -389,6 +406,14 @@ class FlexNBD
end end
def mirror_unlink( dest_ip, dest_port, timeout=nil )
cmd = mirror_unlink_cmd( dest_ip, dest_port )
debug( cmd )
maybe_timeout( cmd, timeout )
end
def maybe_timeout(cmd, timeout=nil ) def maybe_timeout(cmd, timeout=nil )
stdout, stderr = "","" stdout, stderr = "",""
run = Proc.new do run = Proc.new do
@@ -417,6 +442,7 @@ class FlexNBD
end end
def break(timeout=nil) def break(timeout=nil)
cmd = break_cmd cmd = break_cmd
debug( cmd ) debug( cmd )

View File

@@ -64,12 +64,17 @@ class TestHappyPath < Test::Unit::TestCase
end end
def test_mirror def setup_to_mirror
@env.writefile1( "f"*4 ) @env.writefile1( "f"*4 )
@env.serve1 @env.serve1
@env.writefile2( "0"*4 ) @env.writefile2( "0"*4 )
@env.listen2 @env.listen2
end
def test_mirror
setup_to_mirror()
@env.nbd1.can_die @env.nbd1.can_die
@env.nbd2.can_die(0) @env.nbd2.can_die(0)
@@ -78,11 +83,30 @@ class TestHappyPath < Test::Unit::TestCase
@env.nbd1.join @env.nbd1.join
@env.nbd2.join @env.nbd2.join
assert( File.file?( @env.filename1 ),
"The source file was incorrectly deleted")
assert_equal(@env.file1.read_original( 0, @env.blocksize ), assert_equal(@env.file1.read_original( 0, @env.blocksize ),
@env.file2.read( 0, @env.blocksize ) ) @env.file2.read( 0, @env.blocksize ) )
end end
def test_mirror_unlink
setup_to_mirror()
assert File.file?( @env.filename1 )
stdout, stderr = @env.mirror12_unlink
assert_no_match( /unrecognized/, stderr )
@env.nbd1.can_die(0)
@env.nbd2.can_die(0)
Timeout.timeout(2) do @env.nbd1.join end
assert !File.file?( @env.filename1 )
end
def test_write_to_high_block def test_write_to_high_block
# Create a large file, then try to write to somewhere after the 2G boundary # Create a large file, then try to write to somewhere after the 2G boundary
@env.truncate1 "4G" @env.truncate1 "4G"