diff --git a/src/control.c b/src/control.c index 3d381ad..0a1eb61 100644 --- a/src/control.c +++ b/src/control.c @@ -58,6 +58,9 @@ void* mirror_runner(void* serve_params_uncast) const int last_pass = mirror_maximum_passes-1; int pass; struct server *serve = (struct server*) serve_params_uncast; + NULLCHECK( serve ); + debug("Starting mirror" ); + struct bitset_mapping *map = serve->mirror->dirty_map; for (pass=0; pass < mirror_maximum_passes; pass++) { @@ -135,8 +138,18 @@ void* mirror_runner(void* serve_params_uncast) { case ACTION_EXIT: debug("exit!"); - close(serve->mirror->client); serve_signal_close( serve ); + /* We have to wait until the server is closed before + * unlocking IO. This is because the client threads + * check to see if the server is still open before + * reading or writing inside their own locks. If we + * don't wait for the close, there's no way to guarantee + * the server thread will win the race and we risk the + * clients seeing a "successful" write to a dead disc + * image. + */ + serve_wait_for_close( serve ); + info("Server closed, quitting after successful migration"); /* fall through */ case ACTION_NOTHING: debug("nothing!"); diff --git a/src/remote.c b/src/remote.c index 5eb9df6..7ffbe48 100644 --- a/src/remote.c +++ b/src/remote.c @@ -30,7 +30,9 @@ void do_remote_command(char* command, char* socket_name, int argc, char** argv) write(remote, command, strlen(command)); write(remote, &newline, 1); for (i=0; iclose_signal ); } +/* Block until the server closes the server_fd. + */ +void serve_wait_for_close( struct server * serve ) +{ + while( !fd_is_closed( serve->server_fd ) ){ + usleep(10000); + } +} + /** Closes sockets, frees memory and waits for all client threads to finish */ void serve_cleanup(struct server* params, diff --git a/src/serve.h b/src/serve.h index bc750d9..72f6c2c 100644 --- a/src/serve.h +++ b/src/serve.h @@ -86,6 +86,7 @@ void server_dirty(struct server *serve, off64_t from, int len); void server_lock_io( struct server * serve); void server_unlock_io( struct server* serve ); void serve_signal_close( struct server *serve ); +void serve_wait_for_close( struct server * serve ); void server_replace_acl( struct server *serve, struct acl * acl); diff --git a/tests/flexnbd.rb b/tests/flexnbd.rb index 94a0ebd..f1a6d6a 100644 --- a/tests/flexnbd.rb +++ b/tests/flexnbd.rb @@ -21,7 +21,7 @@ class FlexNBD end def debug? - !@debug.empty? + !@debug.empty? || ENV['DEBUG'] end def debug( msg ) @@ -60,6 +60,14 @@ class FlexNBD end + def mirror_cmd(dest_ip, dest_port) + "#{@bin} mirror "\ + "--addr #{dest_ip} "\ + "--port #{dest_port} "\ + "--sock #{ctrl} "\ + "#{@debug} " + end + def serve(file, *acl) File.unlink(ctrl) if File.exists?(ctrl) cmd =serve_cmd( file, acl ) @@ -79,7 +87,10 @@ class FlexNBD def start_wait_thread( pid ) Thread.start do Process.waitpid2( pid ) - unless @kill + if @kill + fail "flexnbd quit with a bad status #{$?.exitstatus}" unless + $?.exitstatus == @kill + else $stderr.puts "flexnbd quit" fail "flexnbd quit early" end @@ -87,9 +98,18 @@ class FlexNBD end + def can_die(status=0) + @kill = status + end + def kill - @kill = true - Process.kill("INT", @pid) + can_die() + begin + Process.kill("INT", @pid) + rescue Errno::ESRCH => e + # already dead. Presumably this means it went away after a + # can_die() call. + end end def read(offset, length) @@ -114,8 +134,12 @@ class FlexNBD nil end - def mirror(bandwidth=nil, action=nil) - control_command("mirror", ip, port, ip, bandwidth, action) + def mirror(dest_ip, dest_port, bandwidth=nil, action=nil) + cmd = mirror_cmd( dest_ip, dest_port) + debug( cmd ) + system cmd + raise IOError.new( "Migrate command failed") unless $?.success? + nil end def acl(*acl) diff --git a/tests/nbd_scenarios b/tests/nbd_scenarios index be58cfe..8e44bf5 100644 --- a/tests/nbd_scenarios +++ b/tests/nbd_scenarios @@ -14,6 +14,7 @@ class NBDScenarios < Test::Unit::TestCase @port1 = @available_ports.shift @port2 = @available_ports.shift @nbd1 = FlexNBD.new("../build/flexnbd", @ip, @port1) + @nbd2 = FlexNBD.new("../build/flexnbd", @ip, @port2) end def teardown @@ -70,15 +71,43 @@ class NBDScenarios < Test::Unit::TestCase end end + + def test_mirror + writefile1( "f"*4 ) + serve1 + + writefile2( "0"*4 ) + serve2 + + @nbd1.can_die + mirror12 + assert_equal(@file1.read_original( 0, @blocksize ), + @file2.read( 0, @blocksize ) ) + end + protected def serve1(*acl) @nbd1.serve(@filename1, *acl) end + def serve2(*acl) + @nbd2.serve(@filename2, *acl) + end + + def mirror12 + @nbd1.mirror( @nbd2.ip, @nbd2.port ) + end + def writefile1(data) @file1 = TestFileWriter.new(@filename1, @blocksize).write(data) end + def writefile2(data) + @file2 = TestFileWriter.new(@filename2, @blocksize).write(data) + end + + + def listening_ports `netstat -ltn`. split("\n"). diff --git a/tests/test_file_writer.rb b/tests/test_file_writer.rb index 0ff6f5d..025e340 100644 --- a/tests/test_file_writer.rb +++ b/tests/test_file_writer.rb @@ -27,20 +27,15 @@ class TestFileWriter self end + # Returns what the data ought to be at the given offset and length # - def read_original(off, len) - r="" - current = 0 - @pattern.split("").each do |block| - if off >= current && (off+len) < current + blocksize - current += data(block, current)[ - current-off..(current+blocksize)-(off+len) - ] - end - current += @blocksize - end - r + def read_original( off, len ) + patterns = @pattern.split( "" ) + patterns.zip( (0...patterns.length).to_a ). + map { |blk, blk_off| + data(blk, blk_off) + }.join[off...(off+len)] end # Read what's actually in the file @@ -51,7 +46,7 @@ class TestFileWriter end def untouched?(offset, len) - read(off, len) == read_original(off, len) + read(offset, len) == read_original(offset, len) end def close @@ -81,3 +76,48 @@ class TestFileWriter end +if __FILE__==$0 + require 'tempfile' + require 'test/unit' + + class TestFileWriterTest < Test::Unit::TestCase + def test_read_original_zeros + Tempfile.open("test_read_original_zeros") do |tempfile| + tempfile.close + file = TestFileWriter.new( tempfile.path, 4096 ) + file.write( "0" ) + assert_equal file.read( 0, 4096 ), file.read_original( 0, 4096 ) + assert( file.untouched?(0,4096) , "Untouched file was touched." ) + end + end + + def test_read_original_offsets + Tempfile.open("test_read_original_offsets") do |tempfile| + tempfile.close + file = TestFileWriter.new( tempfile.path, 4096 ) + file.write( "f" ) + assert_equal file.read( 0, 4096 ), file.read_original( 0, 4096 ) + assert( file.untouched?(0,4096) , "Untouched file was touched." ) + end + end + + def test_file_size + Tempfile.open("test_file_size") do |tempfile| + tempfile.close + file = TestFileWriter.new( tempfile.path, 4096 ) + file.write( "f" ) + assert_equal 4096, File.stat( tempfile.path ).size + end + end + + def test_read_original_size + Tempfile.open("test_read_original_offsets") do |tempfile| + tempfile.close + file = TestFileWriter.new( tempfile.path, 4) + file.write( "f"*4 ) + assert_equal 4, file.read_original(0, 4).length + end + end + end +end +