Fix two bugs in mirroring.
First, Leaving off the source address caused a segfault in the command-sending process because there was no NULL check on the ARGV entry. Second, while the migration thread sent a signal to the server to close on successful completion, it didn't wait until the close actually happened before releasing the IO lock. This meant that any client thread waiting on that IO lock could have a read or a write queued up which could succeed despite the server shutdown. This would have meant dataloss as the guest would see a successful write to the wrong instance of the file. This patch adds a noddy serve_wait_for_close() function which the mirror_runner calls to ensure that any clients will reject operations they're waiting to complete. This patch also adds a simple scenario test for migration, and fixes TempFileWriter#read_original.
This commit is contained in:
@@ -58,6 +58,9 @@ void* mirror_runner(void* serve_params_uncast)
|
||||
const int last_pass = mirror_maximum_passes-1;
|
||||
int pass;
|
||||
struct server *serve = (struct server*) serve_params_uncast;
|
||||
NULLCHECK( serve );
|
||||
debug("Starting mirror" );
|
||||
|
||||
struct bitset_mapping *map = serve->mirror->dirty_map;
|
||||
|
||||
for (pass=0; pass < mirror_maximum_passes; pass++) {
|
||||
@@ -135,8 +138,18 @@ void* mirror_runner(void* serve_params_uncast)
|
||||
{
|
||||
case ACTION_EXIT:
|
||||
debug("exit!");
|
||||
close(serve->mirror->client);
|
||||
serve_signal_close( serve );
|
||||
/* We have to wait until the server is closed before
|
||||
* unlocking IO. This is because the client threads
|
||||
* check to see if the server is still open before
|
||||
* reading or writing inside their own locks. If we
|
||||
* don't wait for the close, there's no way to guarantee
|
||||
* the server thread will win the race and we risk the
|
||||
* clients seeing a "successful" write to a dead disc
|
||||
* image.
|
||||
*/
|
||||
serve_wait_for_close( serve );
|
||||
info("Server closed, quitting after successful migration");
|
||||
/* fall through */
|
||||
case ACTION_NOTHING:
|
||||
debug("nothing!");
|
||||
|
@@ -30,7 +30,9 @@ void do_remote_command(char* command, char* socket_name, int argc, char** argv)
|
||||
write(remote, command, strlen(command));
|
||||
write(remote, &newline, 1);
|
||||
for (i=0; i<argc; i++) {
|
||||
write(remote, argv[i], strlen(argv[i]));
|
||||
if ( NULL != argv[i] ) {
|
||||
write(remote, argv[i], strlen(argv[i]));
|
||||
}
|
||||
write(remote, &newline, 1);
|
||||
}
|
||||
write(remote, &newline, 1);
|
||||
|
@@ -559,6 +559,15 @@ void serve_signal_close( struct server * serve )
|
||||
self_pipe_signal( serve->close_signal );
|
||||
}
|
||||
|
||||
/* Block until the server closes the server_fd.
|
||||
*/
|
||||
void serve_wait_for_close( struct server * serve )
|
||||
{
|
||||
while( !fd_is_closed( serve->server_fd ) ){
|
||||
usleep(10000);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/** Closes sockets, frees memory and waits for all client threads to finish */
|
||||
void serve_cleanup(struct server* params,
|
||||
|
@@ -86,6 +86,7 @@ void server_dirty(struct server *serve, off64_t from, int len);
|
||||
void server_lock_io( struct server * serve);
|
||||
void server_unlock_io( struct server* serve );
|
||||
void serve_signal_close( struct server *serve );
|
||||
void serve_wait_for_close( struct server * serve );
|
||||
void server_replace_acl( struct server *serve, struct acl * acl);
|
||||
|
||||
|
||||
|
@@ -21,7 +21,7 @@ class FlexNBD
|
||||
end
|
||||
|
||||
def debug?
|
||||
!@debug.empty?
|
||||
!@debug.empty? || ENV['DEBUG']
|
||||
end
|
||||
|
||||
def debug( msg )
|
||||
@@ -60,6 +60,14 @@ class FlexNBD
|
||||
end
|
||||
|
||||
|
||||
def mirror_cmd(dest_ip, dest_port)
|
||||
"#{@bin} mirror "\
|
||||
"--addr #{dest_ip} "\
|
||||
"--port #{dest_port} "\
|
||||
"--sock #{ctrl} "\
|
||||
"#{@debug} "
|
||||
end
|
||||
|
||||
def serve(file, *acl)
|
||||
File.unlink(ctrl) if File.exists?(ctrl)
|
||||
cmd =serve_cmd( file, acl )
|
||||
@@ -79,7 +87,10 @@ class FlexNBD
|
||||
def start_wait_thread( pid )
|
||||
Thread.start do
|
||||
Process.waitpid2( pid )
|
||||
unless @kill
|
||||
if @kill
|
||||
fail "flexnbd quit with a bad status #{$?.exitstatus}" unless
|
||||
$?.exitstatus == @kill
|
||||
else
|
||||
$stderr.puts "flexnbd quit"
|
||||
fail "flexnbd quit early"
|
||||
end
|
||||
@@ -87,9 +98,18 @@ class FlexNBD
|
||||
end
|
||||
|
||||
|
||||
def can_die(status=0)
|
||||
@kill = status
|
||||
end
|
||||
|
||||
def kill
|
||||
@kill = true
|
||||
Process.kill("INT", @pid)
|
||||
can_die()
|
||||
begin
|
||||
Process.kill("INT", @pid)
|
||||
rescue Errno::ESRCH => e
|
||||
# already dead. Presumably this means it went away after a
|
||||
# can_die() call.
|
||||
end
|
||||
end
|
||||
|
||||
def read(offset, length)
|
||||
@@ -114,8 +134,12 @@ class FlexNBD
|
||||
nil
|
||||
end
|
||||
|
||||
def mirror(bandwidth=nil, action=nil)
|
||||
control_command("mirror", ip, port, ip, bandwidth, action)
|
||||
def mirror(dest_ip, dest_port, bandwidth=nil, action=nil)
|
||||
cmd = mirror_cmd( dest_ip, dest_port)
|
||||
debug( cmd )
|
||||
system cmd
|
||||
raise IOError.new( "Migrate command failed") unless $?.success?
|
||||
nil
|
||||
end
|
||||
|
||||
def acl(*acl)
|
||||
|
@@ -14,6 +14,7 @@ class NBDScenarios < Test::Unit::TestCase
|
||||
@port1 = @available_ports.shift
|
||||
@port2 = @available_ports.shift
|
||||
@nbd1 = FlexNBD.new("../build/flexnbd", @ip, @port1)
|
||||
@nbd2 = FlexNBD.new("../build/flexnbd", @ip, @port2)
|
||||
end
|
||||
|
||||
def teardown
|
||||
@@ -70,15 +71,43 @@ class NBDScenarios < Test::Unit::TestCase
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def test_mirror
|
||||
writefile1( "f"*4 )
|
||||
serve1
|
||||
|
||||
writefile2( "0"*4 )
|
||||
serve2
|
||||
|
||||
@nbd1.can_die
|
||||
mirror12
|
||||
assert_equal(@file1.read_original( 0, @blocksize ),
|
||||
@file2.read( 0, @blocksize ) )
|
||||
end
|
||||
|
||||
protected
|
||||
def serve1(*acl)
|
||||
@nbd1.serve(@filename1, *acl)
|
||||
end
|
||||
|
||||
def serve2(*acl)
|
||||
@nbd2.serve(@filename2, *acl)
|
||||
end
|
||||
|
||||
def mirror12
|
||||
@nbd1.mirror( @nbd2.ip, @nbd2.port )
|
||||
end
|
||||
|
||||
def writefile1(data)
|
||||
@file1 = TestFileWriter.new(@filename1, @blocksize).write(data)
|
||||
end
|
||||
|
||||
def writefile2(data)
|
||||
@file2 = TestFileWriter.new(@filename2, @blocksize).write(data)
|
||||
end
|
||||
|
||||
|
||||
|
||||
def listening_ports
|
||||
`netstat -ltn`.
|
||||
split("\n").
|
||||
|
@@ -27,20 +27,15 @@ class TestFileWriter
|
||||
self
|
||||
end
|
||||
|
||||
|
||||
# Returns what the data ought to be at the given offset and length
|
||||
#
|
||||
def read_original(off, len)
|
||||
r=""
|
||||
current = 0
|
||||
@pattern.split("").each do |block|
|
||||
if off >= current && (off+len) < current + blocksize
|
||||
current += data(block, current)[
|
||||
current-off..(current+blocksize)-(off+len)
|
||||
]
|
||||
end
|
||||
current += @blocksize
|
||||
end
|
||||
r
|
||||
def read_original( off, len )
|
||||
patterns = @pattern.split( "" )
|
||||
patterns.zip( (0...patterns.length).to_a ).
|
||||
map { |blk, blk_off|
|
||||
data(blk, blk_off)
|
||||
}.join[off...(off+len)]
|
||||
end
|
||||
|
||||
# Read what's actually in the file
|
||||
@@ -51,7 +46,7 @@ class TestFileWriter
|
||||
end
|
||||
|
||||
def untouched?(offset, len)
|
||||
read(off, len) == read_original(off, len)
|
||||
read(offset, len) == read_original(offset, len)
|
||||
end
|
||||
|
||||
def close
|
||||
@@ -81,3 +76,48 @@ class TestFileWriter
|
||||
|
||||
end
|
||||
|
||||
if __FILE__==$0
|
||||
require 'tempfile'
|
||||
require 'test/unit'
|
||||
|
||||
class TestFileWriterTest < Test::Unit::TestCase
|
||||
def test_read_original_zeros
|
||||
Tempfile.open("test_read_original_zeros") do |tempfile|
|
||||
tempfile.close
|
||||
file = TestFileWriter.new( tempfile.path, 4096 )
|
||||
file.write( "0" )
|
||||
assert_equal file.read( 0, 4096 ), file.read_original( 0, 4096 )
|
||||
assert( file.untouched?(0,4096) , "Untouched file was touched." )
|
||||
end
|
||||
end
|
||||
|
||||
def test_read_original_offsets
|
||||
Tempfile.open("test_read_original_offsets") do |tempfile|
|
||||
tempfile.close
|
||||
file = TestFileWriter.new( tempfile.path, 4096 )
|
||||
file.write( "f" )
|
||||
assert_equal file.read( 0, 4096 ), file.read_original( 0, 4096 )
|
||||
assert( file.untouched?(0,4096) , "Untouched file was touched." )
|
||||
end
|
||||
end
|
||||
|
||||
def test_file_size
|
||||
Tempfile.open("test_file_size") do |tempfile|
|
||||
tempfile.close
|
||||
file = TestFileWriter.new( tempfile.path, 4096 )
|
||||
file.write( "f" )
|
||||
assert_equal 4096, File.stat( tempfile.path ).size
|
||||
end
|
||||
end
|
||||
|
||||
def test_read_original_size
|
||||
Tempfile.open("test_read_original_offsets") do |tempfile|
|
||||
tempfile.close
|
||||
file = TestFileWriter.new( tempfile.path, 4)
|
||||
file.write( "f"*4 )
|
||||
assert_equal 4, file.read_original(0, 4).length
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
Reference in New Issue
Block a user