From 314c0c2a2a5d2fe4b0c4bfe7d7a2aea58e35d58a Mon Sep 17 00:00:00 2001 From: Alex Young Date: Tue, 17 Jul 2012 16:30:49 +0100 Subject: [PATCH] Added the `flexnbd break` command to stop mirroring --- README.txt | 15 +++++ src/control.c | 51 +++++++++++++++ src/mode.c | 63 +++++++++++++++++++ src/mode.h | 1 + src/serve.c | 29 +++++++-- src/serve.h | 2 + tests/acceptance/environment.rb | 8 ++- .../fakes/dest/break_after_hello.rb | 35 +++++++++++ tests/acceptance/flexnbd.rb | 21 +++++++ .../acceptance/test_source_error_handling.rb | 6 +- 10 files changed, 223 insertions(+), 8 deletions(-) create mode 100755 tests/acceptance/fakes/dest/break_after_hello.rb diff --git a/README.txt b/README.txt index c9241c5..434b054 100644 --- a/README.txt +++ b/README.txt @@ -133,6 +133,21 @@ Options The local address to bind to. You may need this if the remote server is using an access control list. +break +~~~~~ + + $ flexnbd mirror --sock SOCK [global option]* + +Stop a running migration. + +Options +^^^^^^^ + +*--sock, -s SOCK*: + The control socket of the local server whose emigration to stop. + Required. + + acl ~~~ diff --git a/src/control.c b/src/control.c index bf23637..d36a1dc 100644 --- a/src/control.c +++ b/src/control.c @@ -428,6 +428,51 @@ int control_acl(struct control_client* client, int linesc, char** lines) return 0; } + +int control_break( + struct control_client* client, + int linesc __attribute__ ((unused)), + char** lines __attribute__((unused)) + ) +{ + NULLCHECK( client ); + NULLCHECK( client->flexnbd ); + + int result = 0; + struct flexnbd* flexnbd = client->flexnbd; + + flexnbd_lock_switch( flexnbd ); + { + struct server * serve = flexnbd_server( flexnbd ); + if ( server_is_mirroring( serve ) ) { + + info( "Signaling to abandon mirror" ); + server_abandon_mirror( serve ); + debug( "Abandon signaled" ); + + if ( server_is_closed( serve ) ) { + info( "Mirror completed while canceling" ); + write( client->socket, + "1: mirror completed\n", 20 ); + } + else { + info( "Mirror successfully stopped." ); + write( client->socket, + "0: mirror stopped\n", 18 ); + result = 1; + } + + } else { + warn( "Not mirroring." ); + write( client->socket, "1: not mirroring\n", 17 ); + } + } + flexnbd_unlock_switch( flexnbd ); + + return result; +} + + /** FIXME: add some useful statistics */ int control_status( struct control_client* client, @@ -486,6 +531,12 @@ void control_respond(struct control_client * client) debug("mirror command failed"); } } + else if (strcmp(lines[0], "break") == 0) { + info( "break command received" ); + if ( control_break( client, linesc-1, lines+1) < 0) { + debug( "break command failed" ); + } + } else if (strcmp(lines[0], "status") == 0) { info("status command received" ); if (control_status(client, linesc-1, lines+1) < 0) { diff --git a/src/mode.c b/src/mode.c index efca788..7220548 100644 --- a/src/mode.c +++ b/src/mode.c @@ -140,6 +140,22 @@ static char mirror_help_text[] = VERBOSE_LINE QUIET_LINE; +static struct option break_options[] = { + GETOPT_HELP, + GETOPT_SOCK, + GETOPT_QUIET, + GETOPT_VERBOSE, + {0} +}; +static char break_short_options[] = "hs:" SOPT_QUIET SOPT_VERBOSE; +static char break_help_text[] = + "Usage: flexnbd " CMD_BREAK " \n\n" + "Stop mirroring from the server with control socket SOCK.\n\n" + HELP_LINE + SOCK_LINE + VERBOSE_LINE + QUIET_LINE; + static struct option status_options[] = { GETOPT_HELP, @@ -355,6 +371,29 @@ void read_mirror_param( int c, char **sock, char **ip_addr, char **ip_port, char } } +void read_break_param( int c, char **sock ) +{ + switch( c ) { + case 'h': + fprintf( stdout, "%s\n", break_help_text ); + exit( 0 ); + break; + case 's': + *sock = optarg; + break; + case 'q': + log_level = 4; + break; + case 'v': + log_level = VERBOSE_LOG_LEVEL; + break; + default: + exit_err( break_help_text ); + break; + } +} + + void read_status_param( int c, char **sock ) { read_sock_param( c, sock, status_help_text ); @@ -649,6 +688,27 @@ int mode_mirror( int argc, char *argv[] ) } +int mode_break( int argc, char *argv[] ) +{ + int c; + char *sock = NULL; + + while (1) { + c = getopt_long( argc, argv, break_short_options, break_options, NULL ); + if ( -1 == c ) { break; } + read_break_param( c, &sock ); + } + + if ( NULL == sock ){ + fprintf( stderr, "--sock is required.\n" ); + exit_err( acl_help_text ); + } + + do_remote_command( "break", sock, argc - optind, argv + optind ); + + return 0; +} + int mode_status( int argc, char *argv[] ) { int c; @@ -722,6 +782,9 @@ void mode(char* mode, int argc, char **argv) else if ( IS_CMD( CMD_MIRROR, mode ) ) { mode_mirror( argc, argv ); } + else if ( IS_CMD( CMD_BREAK, mode ) ) { + mode_break( argc, argv ); + } else if ( IS_CMD( CMD_STATUS, mode ) ) { mode_status( argc, argv ); } diff --git a/src/mode.h b/src/mode.h index fe981cd..ead2159 100644 --- a/src/mode.h +++ b/src/mode.h @@ -28,6 +28,7 @@ void mode(char* mode, int argc, char **argv); #define CMD_WRITE "write" #define CMD_ACL "acl" #define CMD_MIRROR "mirror" +#define CMD_BREAK "break" #define CMD_STATUS "status" #define CMD_HELP "help" #define LEN_CMD_MAX 7 diff --git a/src/serve.c b/src/serve.c index fff834a..b4f4b06 100644 --- a/src/serve.c +++ b/src/serve.c @@ -722,11 +722,8 @@ void serve_cleanup(struct server* params, free(params->allocation_map); } - if (params->mirror_super) { - /* AWOOGA! RACE! */ - pthread_t mirror_t = params->mirror_super->thread; - params->mirror->signal_abandon = 1; - pthread_join( mirror_t, NULL ); + if ( server_is_mirroring( params ) ) { + server_abandon_mirror( params ); } for (i=0; i < params->max_nbd_clients; i++) { @@ -753,6 +750,28 @@ int server_is_in_control( struct server *serve ) return serve->has_control; } +int server_is_mirroring( struct server * serve ) +{ + NULLCHECK( serve ); + return !!serve->mirror_super; +} + +void server_abandon_mirror( struct server * serve ) +{ + NULLCHECK( serve ); + if ( serve->mirror_super ) { + /* FIXME: AWOOGA! RACE! + * We can set signal_abandon after mirror_super has + * checked it, but before the reset. This would lead to + * a hang. However, mirror_reset doesn't change the + * signal_abandon flag, so it'll just terminate early on + * the next pass. + * */ + serve->mirror->signal_abandon = 1; + pthread_join( serve->mirror_super->thread, NULL ); + } +} + int server_default_deny( struct server * serve ) { NULLCHECK( serve ); diff --git a/src/serve.h b/src/serve.h index e4ee856..5cb9387 100644 --- a/src/serve.h +++ b/src/serve.h @@ -96,6 +96,8 @@ int server_io_locked( struct server * serve ); int server_acl_locked( struct server * serve ); void server_lock_acl( struct server *serve ); void server_unlock_acl( struct server *serve ); +int server_is_mirroring( struct server * serve ); +void server_abandon_mirror( struct server * serve ); int do_serve( struct server * ); diff --git a/tests/acceptance/environment.rb b/tests/acceptance/environment.rb index 7675f14..b71c31b 100644 --- a/tests/acceptance/environment.rb +++ b/tests/acceptance/environment.rb @@ -42,6 +42,10 @@ class Environment end + def break1 + @nbd1.break + end + def acl1( *acl ) @nbd1.acl( *acl ) end @@ -111,7 +115,7 @@ class Environment end - def run_fake( name, addr, port, rebind_addr = addr, rebind_port = port ) + def run_fake( name, addr, port, rebind_addr = addr, rebind_port = port, sock=nil ) fakedir = File.join( File.dirname( __FILE__ ), "fakes" ) fake = Dir[File.join( fakedir, name ) + "*"].sort.find { |fn| File.executable?( fn ) @@ -124,7 +128,7 @@ class Environment raise "no rebind_port" unless rebind_port @fake_pid = fork do - exec [fake, addr, port, @nbd1.pid, rebind_addr, rebind_port].map{|x| x.to_s}.join(" ") + exec [fake, addr, port, @nbd1.pid, rebind_addr, rebind_port, sock].map{|x| x.to_s}.join(" ") end sleep(0.5) end diff --git a/tests/acceptance/fakes/dest/break_after_hello.rb b/tests/acceptance/fakes/dest/break_after_hello.rb new file mode 100755 index 0000000..5bbe4a5 --- /dev/null +++ b/tests/acceptance/fakes/dest/break_after_hello.rb @@ -0,0 +1,35 @@ +#!/usr/bin/env ruby +# encoding: utf-8 + +# Open a server, accept a client, then cancel the migration by issuing +# a break command. + +require 'flexnbd/fake_dest' +include FlexNBD + +addr, port, src_pid, _, _, sock = *ARGV +server = FakeDest.new( addr, port ) +client = server.accept + +ctrl = UNIXSocket.open( sock ) + +Process.kill("STOP", src_pid.to_i) +ctrl.write( "break\n" ) +ctrl.close_write +client.write_hello +Process.kill("CONT", src_pid.to_i) + +fail "Unexpected control response" unless + ctrl.read =~ /0: mirror stopped/ + +client2 = nil +begin + client2 = server.accept( "Expected timeout" ) + fail "Unexpected reconnection" +rescue Timeout::Error + # expected +end +client.close + +exit(0) + diff --git a/tests/acceptance/flexnbd.rb b/tests/acceptance/flexnbd.rb index 5326bfe..19ec882 100644 --- a/tests/acceptance/flexnbd.rb +++ b/tests/acceptance/flexnbd.rb @@ -271,6 +271,11 @@ class FlexNBD "#{@debug} " end + def break_cmd + "#{@bin} break "\ + "--sock #{ctrl} "\ + "#{@debug}" + end def status_cmd "#{@bin} status "\ @@ -416,6 +421,14 @@ class FlexNBD end + def break(timeout=nil) + cmd = break_cmd + debug( cmd ) + + maybe_timeout( cmd, timeout ) + end + + def acl(*acl) cmd = acl_cmd( *acl ) debug( cmd ) @@ -439,6 +452,14 @@ class FlexNBD end + def paused + Process.kill( "STOP", @pid ) + yield + ensure + Process.kill( "CONT", @pid ) + end + + protected def control_command(*args) raise "Server not running" unless @pid diff --git a/tests/acceptance/test_source_error_handling.rb b/tests/acceptance/test_source_error_handling.rb index e048f24..688f4e0 100644 --- a/tests/acceptance/test_source_error_handling.rb +++ b/tests/acceptance/test_source_error_handling.rb @@ -90,10 +90,14 @@ class TestSourceErrorHandling < Test::Unit::TestCase end + def test_cancel_migration + run_fake( "dest/break_after_hello" ) + end + private def run_fake(name, opts = {}) - @env.run_fake( name, @env.ip, @env.port2 ) + @env.run_fake( name, @env.ip, @env.port2, @env.ip, @env.port2, @env.nbd1.ctrl ) stdout, stderr = @env.mirror12_unchecked assert_success assert_match( opts[:err], stderr ) if opts[:err]