Added the flexnbd break
command to stop mirroring
This commit is contained in:
15
README.txt
15
README.txt
@@ -133,6 +133,21 @@ Options
|
|||||||
The local address to bind to. You may need this if the remote server
|
The local address to bind to. You may need this if the remote server
|
||||||
is using an access control list.
|
is using an access control list.
|
||||||
|
|
||||||
|
break
|
||||||
|
~~~~~
|
||||||
|
|
||||||
|
$ flexnbd mirror --sock SOCK [global option]*
|
||||||
|
|
||||||
|
Stop a running migration.
|
||||||
|
|
||||||
|
Options
|
||||||
|
^^^^^^^
|
||||||
|
|
||||||
|
*--sock, -s SOCK*:
|
||||||
|
The control socket of the local server whose emigration to stop.
|
||||||
|
Required.
|
||||||
|
|
||||||
|
|
||||||
acl
|
acl
|
||||||
~~~
|
~~~
|
||||||
|
|
||||||
|
@@ -428,6 +428,51 @@ int control_acl(struct control_client* client, int linesc, char** lines)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int control_break(
|
||||||
|
struct control_client* client,
|
||||||
|
int linesc __attribute__ ((unused)),
|
||||||
|
char** lines __attribute__((unused))
|
||||||
|
)
|
||||||
|
{
|
||||||
|
NULLCHECK( client );
|
||||||
|
NULLCHECK( client->flexnbd );
|
||||||
|
|
||||||
|
int result = 0;
|
||||||
|
struct flexnbd* flexnbd = client->flexnbd;
|
||||||
|
|
||||||
|
flexnbd_lock_switch( flexnbd );
|
||||||
|
{
|
||||||
|
struct server * serve = flexnbd_server( flexnbd );
|
||||||
|
if ( server_is_mirroring( serve ) ) {
|
||||||
|
|
||||||
|
info( "Signaling to abandon mirror" );
|
||||||
|
server_abandon_mirror( serve );
|
||||||
|
debug( "Abandon signaled" );
|
||||||
|
|
||||||
|
if ( server_is_closed( serve ) ) {
|
||||||
|
info( "Mirror completed while canceling" );
|
||||||
|
write( client->socket,
|
||||||
|
"1: mirror completed\n", 20 );
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
info( "Mirror successfully stopped." );
|
||||||
|
write( client->socket,
|
||||||
|
"0: mirror stopped\n", 18 );
|
||||||
|
result = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
warn( "Not mirroring." );
|
||||||
|
write( client->socket, "1: not mirroring\n", 17 );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
flexnbd_unlock_switch( flexnbd );
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/** FIXME: add some useful statistics */
|
/** FIXME: add some useful statistics */
|
||||||
int control_status(
|
int control_status(
|
||||||
struct control_client* client,
|
struct control_client* client,
|
||||||
@@ -486,6 +531,12 @@ void control_respond(struct control_client * client)
|
|||||||
debug("mirror command failed");
|
debug("mirror command failed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (strcmp(lines[0], "break") == 0) {
|
||||||
|
info( "break command received" );
|
||||||
|
if ( control_break( client, linesc-1, lines+1) < 0) {
|
||||||
|
debug( "break command failed" );
|
||||||
|
}
|
||||||
|
}
|
||||||
else if (strcmp(lines[0], "status") == 0) {
|
else if (strcmp(lines[0], "status") == 0) {
|
||||||
info("status command received" );
|
info("status command received" );
|
||||||
if (control_status(client, linesc-1, lines+1) < 0) {
|
if (control_status(client, linesc-1, lines+1) < 0) {
|
||||||
|
63
src/mode.c
63
src/mode.c
@@ -140,6 +140,22 @@ static char mirror_help_text[] =
|
|||||||
VERBOSE_LINE
|
VERBOSE_LINE
|
||||||
QUIET_LINE;
|
QUIET_LINE;
|
||||||
|
|
||||||
|
static struct option break_options[] = {
|
||||||
|
GETOPT_HELP,
|
||||||
|
GETOPT_SOCK,
|
||||||
|
GETOPT_QUIET,
|
||||||
|
GETOPT_VERBOSE,
|
||||||
|
{0}
|
||||||
|
};
|
||||||
|
static char break_short_options[] = "hs:" SOPT_QUIET SOPT_VERBOSE;
|
||||||
|
static char break_help_text[] =
|
||||||
|
"Usage: flexnbd " CMD_BREAK " <options>\n\n"
|
||||||
|
"Stop mirroring from the server with control socket SOCK.\n\n"
|
||||||
|
HELP_LINE
|
||||||
|
SOCK_LINE
|
||||||
|
VERBOSE_LINE
|
||||||
|
QUIET_LINE;
|
||||||
|
|
||||||
|
|
||||||
static struct option status_options[] = {
|
static struct option status_options[] = {
|
||||||
GETOPT_HELP,
|
GETOPT_HELP,
|
||||||
@@ -355,6 +371,29 @@ void read_mirror_param( int c, char **sock, char **ip_addr, char **ip_port, char
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void read_break_param( int c, char **sock )
|
||||||
|
{
|
||||||
|
switch( c ) {
|
||||||
|
case 'h':
|
||||||
|
fprintf( stdout, "%s\n", break_help_text );
|
||||||
|
exit( 0 );
|
||||||
|
break;
|
||||||
|
case 's':
|
||||||
|
*sock = optarg;
|
||||||
|
break;
|
||||||
|
case 'q':
|
||||||
|
log_level = 4;
|
||||||
|
break;
|
||||||
|
case 'v':
|
||||||
|
log_level = VERBOSE_LOG_LEVEL;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
exit_err( break_help_text );
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void read_status_param( int c, char **sock )
|
void read_status_param( int c, char **sock )
|
||||||
{
|
{
|
||||||
read_sock_param( c, sock, status_help_text );
|
read_sock_param( c, sock, status_help_text );
|
||||||
@@ -649,6 +688,27 @@ int mode_mirror( int argc, char *argv[] )
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int mode_break( int argc, char *argv[] )
|
||||||
|
{
|
||||||
|
int c;
|
||||||
|
char *sock = NULL;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
c = getopt_long( argc, argv, break_short_options, break_options, NULL );
|
||||||
|
if ( -1 == c ) { break; }
|
||||||
|
read_break_param( c, &sock );
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( NULL == sock ){
|
||||||
|
fprintf( stderr, "--sock is required.\n" );
|
||||||
|
exit_err( acl_help_text );
|
||||||
|
}
|
||||||
|
|
||||||
|
do_remote_command( "break", sock, argc - optind, argv + optind );
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int mode_status( int argc, char *argv[] )
|
int mode_status( int argc, char *argv[] )
|
||||||
{
|
{
|
||||||
int c;
|
int c;
|
||||||
@@ -722,6 +782,9 @@ void mode(char* mode, int argc, char **argv)
|
|||||||
else if ( IS_CMD( CMD_MIRROR, mode ) ) {
|
else if ( IS_CMD( CMD_MIRROR, mode ) ) {
|
||||||
mode_mirror( argc, argv );
|
mode_mirror( argc, argv );
|
||||||
}
|
}
|
||||||
|
else if ( IS_CMD( CMD_BREAK, mode ) ) {
|
||||||
|
mode_break( argc, argv );
|
||||||
|
}
|
||||||
else if ( IS_CMD( CMD_STATUS, mode ) ) {
|
else if ( IS_CMD( CMD_STATUS, mode ) ) {
|
||||||
mode_status( argc, argv );
|
mode_status( argc, argv );
|
||||||
}
|
}
|
||||||
|
@@ -28,6 +28,7 @@ void mode(char* mode, int argc, char **argv);
|
|||||||
#define CMD_WRITE "write"
|
#define CMD_WRITE "write"
|
||||||
#define CMD_ACL "acl"
|
#define CMD_ACL "acl"
|
||||||
#define CMD_MIRROR "mirror"
|
#define CMD_MIRROR "mirror"
|
||||||
|
#define CMD_BREAK "break"
|
||||||
#define CMD_STATUS "status"
|
#define CMD_STATUS "status"
|
||||||
#define CMD_HELP "help"
|
#define CMD_HELP "help"
|
||||||
#define LEN_CMD_MAX 7
|
#define LEN_CMD_MAX 7
|
||||||
|
29
src/serve.c
29
src/serve.c
@@ -722,11 +722,8 @@ void serve_cleanup(struct server* params,
|
|||||||
free(params->allocation_map);
|
free(params->allocation_map);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (params->mirror_super) {
|
if ( server_is_mirroring( params ) ) {
|
||||||
/* AWOOGA! RACE! */
|
server_abandon_mirror( params );
|
||||||
pthread_t mirror_t = params->mirror_super->thread;
|
|
||||||
params->mirror->signal_abandon = 1;
|
|
||||||
pthread_join( mirror_t, NULL );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i=0; i < params->max_nbd_clients; i++) {
|
for (i=0; i < params->max_nbd_clients; i++) {
|
||||||
@@ -753,6 +750,28 @@ int server_is_in_control( struct server *serve )
|
|||||||
return serve->has_control;
|
return serve->has_control;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int server_is_mirroring( struct server * serve )
|
||||||
|
{
|
||||||
|
NULLCHECK( serve );
|
||||||
|
return !!serve->mirror_super;
|
||||||
|
}
|
||||||
|
|
||||||
|
void server_abandon_mirror( struct server * serve )
|
||||||
|
{
|
||||||
|
NULLCHECK( serve );
|
||||||
|
if ( serve->mirror_super ) {
|
||||||
|
/* FIXME: AWOOGA! RACE!
|
||||||
|
* We can set signal_abandon after mirror_super has
|
||||||
|
* checked it, but before the reset. This would lead to
|
||||||
|
* a hang. However, mirror_reset doesn't change the
|
||||||
|
* signal_abandon flag, so it'll just terminate early on
|
||||||
|
* the next pass.
|
||||||
|
* */
|
||||||
|
serve->mirror->signal_abandon = 1;
|
||||||
|
pthread_join( serve->mirror_super->thread, NULL );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int server_default_deny( struct server * serve )
|
int server_default_deny( struct server * serve )
|
||||||
{
|
{
|
||||||
NULLCHECK( serve );
|
NULLCHECK( serve );
|
||||||
|
@@ -96,6 +96,8 @@ int server_io_locked( struct server * serve );
|
|||||||
int server_acl_locked( struct server * serve );
|
int server_acl_locked( struct server * serve );
|
||||||
void server_lock_acl( struct server *serve );
|
void server_lock_acl( struct server *serve );
|
||||||
void server_unlock_acl( struct server *serve );
|
void server_unlock_acl( struct server *serve );
|
||||||
|
int server_is_mirroring( struct server * serve );
|
||||||
|
void server_abandon_mirror( struct server * serve );
|
||||||
|
|
||||||
|
|
||||||
int do_serve( struct server * );
|
int do_serve( struct server * );
|
||||||
|
@@ -42,6 +42,10 @@ class Environment
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def break1
|
||||||
|
@nbd1.break
|
||||||
|
end
|
||||||
|
|
||||||
def acl1( *acl )
|
def acl1( *acl )
|
||||||
@nbd1.acl( *acl )
|
@nbd1.acl( *acl )
|
||||||
end
|
end
|
||||||
@@ -111,7 +115,7 @@ class Environment
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
def run_fake( name, addr, port, rebind_addr = addr, rebind_port = port )
|
def run_fake( name, addr, port, rebind_addr = addr, rebind_port = port, sock=nil )
|
||||||
fakedir = File.join( File.dirname( __FILE__ ), "fakes" )
|
fakedir = File.join( File.dirname( __FILE__ ), "fakes" )
|
||||||
fake = Dir[File.join( fakedir, name ) + "*"].sort.find { |fn|
|
fake = Dir[File.join( fakedir, name ) + "*"].sort.find { |fn|
|
||||||
File.executable?( fn )
|
File.executable?( fn )
|
||||||
@@ -124,7 +128,7 @@ class Environment
|
|||||||
raise "no rebind_port" unless rebind_port
|
raise "no rebind_port" unless rebind_port
|
||||||
|
|
||||||
@fake_pid = fork do
|
@fake_pid = fork do
|
||||||
exec [fake, addr, port, @nbd1.pid, rebind_addr, rebind_port].map{|x| x.to_s}.join(" ")
|
exec [fake, addr, port, @nbd1.pid, rebind_addr, rebind_port, sock].map{|x| x.to_s}.join(" ")
|
||||||
end
|
end
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
end
|
end
|
||||||
|
35
tests/acceptance/fakes/dest/break_after_hello.rb
Executable file
35
tests/acceptance/fakes/dest/break_after_hello.rb
Executable file
@@ -0,0 +1,35 @@
|
|||||||
|
#!/usr/bin/env ruby
|
||||||
|
# encoding: utf-8
|
||||||
|
|
||||||
|
# Open a server, accept a client, then cancel the migration by issuing
|
||||||
|
# a break command.
|
||||||
|
|
||||||
|
require 'flexnbd/fake_dest'
|
||||||
|
include FlexNBD
|
||||||
|
|
||||||
|
addr, port, src_pid, _, _, sock = *ARGV
|
||||||
|
server = FakeDest.new( addr, port )
|
||||||
|
client = server.accept
|
||||||
|
|
||||||
|
ctrl = UNIXSocket.open( sock )
|
||||||
|
|
||||||
|
Process.kill("STOP", src_pid.to_i)
|
||||||
|
ctrl.write( "break\n" )
|
||||||
|
ctrl.close_write
|
||||||
|
client.write_hello
|
||||||
|
Process.kill("CONT", src_pid.to_i)
|
||||||
|
|
||||||
|
fail "Unexpected control response" unless
|
||||||
|
ctrl.read =~ /0: mirror stopped/
|
||||||
|
|
||||||
|
client2 = nil
|
||||||
|
begin
|
||||||
|
client2 = server.accept( "Expected timeout" )
|
||||||
|
fail "Unexpected reconnection"
|
||||||
|
rescue Timeout::Error
|
||||||
|
# expected
|
||||||
|
end
|
||||||
|
client.close
|
||||||
|
|
||||||
|
exit(0)
|
||||||
|
|
@@ -271,6 +271,11 @@ class FlexNBD
|
|||||||
"#{@debug} "
|
"#{@debug} "
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def break_cmd
|
||||||
|
"#{@bin} break "\
|
||||||
|
"--sock #{ctrl} "\
|
||||||
|
"#{@debug}"
|
||||||
|
end
|
||||||
|
|
||||||
def status_cmd
|
def status_cmd
|
||||||
"#{@bin} status "\
|
"#{@bin} status "\
|
||||||
@@ -416,6 +421,14 @@ class FlexNBD
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def break(timeout=nil)
|
||||||
|
cmd = break_cmd
|
||||||
|
debug( cmd )
|
||||||
|
|
||||||
|
maybe_timeout( cmd, timeout )
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
def acl(*acl)
|
def acl(*acl)
|
||||||
cmd = acl_cmd( *acl )
|
cmd = acl_cmd( *acl )
|
||||||
debug( cmd )
|
debug( cmd )
|
||||||
@@ -439,6 +452,14 @@ class FlexNBD
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def paused
|
||||||
|
Process.kill( "STOP", @pid )
|
||||||
|
yield
|
||||||
|
ensure
|
||||||
|
Process.kill( "CONT", @pid )
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
protected
|
protected
|
||||||
def control_command(*args)
|
def control_command(*args)
|
||||||
raise "Server not running" unless @pid
|
raise "Server not running" unless @pid
|
||||||
|
@@ -90,10 +90,14 @@ class TestSourceErrorHandling < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def test_cancel_migration
|
||||||
|
run_fake( "dest/break_after_hello" )
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
private
|
private
|
||||||
def run_fake(name, opts = {})
|
def run_fake(name, opts = {})
|
||||||
@env.run_fake( name, @env.ip, @env.port2 )
|
@env.run_fake( name, @env.ip, @env.port2, @env.ip, @env.port2, @env.nbd1.ctrl )
|
||||||
stdout, stderr = @env.mirror12_unchecked
|
stdout, stderr = @env.mirror12_unchecked
|
||||||
assert_success
|
assert_success
|
||||||
assert_match( opts[:err], stderr ) if opts[:err]
|
assert_match( opts[:err], stderr ) if opts[:err]
|
||||||
|
Reference in New Issue
Block a user