diff --git a/tests/acceptance/active_migration_test/check.rb b/tests/acceptance/active_migration_test/check.rb deleted file mode 100755 index ca61445..0000000 --- a/tests/acceptance/active_migration_test/check.rb +++ /dev/null @@ -1,124 +0,0 @@ -#!/usr/bin/env ruby -require 'rubygems' -require 'flexnbd/fake_source' -require 'socket' -require 'fileutils' - - -FLEXNBD = ARGV.shift -raise "No binary!" unless File.executable?( FLEXNBD ) -raise "No trickle!" unless system "which trickle > /dev/null" - -Thread.abort_on_exception = true - - -#BREAKIT = false -BREAKIT = true - -MB = 1024*1024 -GB = 1024*MB -SIZE = 20*MB -WRITE_DATA = "foo!" * 2048 # 8K write -SOURCE_PORT = 9990 -DEST_PORT = 9991 -SOURCE_SOCK = "src.sock" -DEST_SOCK = "dst.sock" -SOURCE_FILE = "src.file" -DEST_FILE = "dst.file" - -puts "Cleaning up from old runs..." -FileUtils.rm_f "src*" -FileUtils.rm_f "dst*" - -puts "Creating source & destination files..." -FileUtils.touch(SOURCE_FILE) -File.truncate(SOURCE_FILE, SIZE) -FileUtils.touch(DEST_FILE) -File.truncate(DEST_FILE, SIZE) - -puts "Making source data" -File.open(SOURCE_FILE, "wb"){|f| f.write "a"*SIZE } -#Kernel.system "dd if=/dev/urandom of=#{SOURCE_FILE} bs=#{SIZE} count=1" - -puts "Starting destination process..." -dst_proc = fork() { - cmd = "#{FLEXNBD} listen -l 127.0.0.1 -p #{DEST_PORT} -f #{DEST_FILE} -s #{DEST_SOCK} -v" - exec cmd -} - -puts "Starting source process..." -src_proc = fork() { - cmd = "#{FLEXNBD} serve -l 127.0.0.1 -p #{SOURCE_PORT} -f #{SOURCE_FILE} -s #{SOURCE_SOCK} -v" - #exec "trickle -t 1 -u 5120 -s #{cmd}" - exec cmd -} - -puts "dst_proc is #{dst_proc}" -puts "src_proc is #{src_proc}" - - -at_exit { - [dst_proc, src_proc].each do |pid| - Process.kill( "KILL", pid ) rescue nil - end - [SOURCE_FILE, DEST_FILE, SOURCE_SOCK, DEST_SOCK].each do |filename| - FileUtils.rm_f filename - end -} - - - -puts "Sleeping to let flexnbds run..." -Timeout.timeout(10) do - sleep 0.1 while !File.exists?( SOURCE_SOCK ) - puts "Got source sock" - sleep 0.1 while !File.exists?( DEST_SOCK ) - puts "Got dest sock" -end - - -if BREAKIT - # Start writing to the source - puts "Creating thread to write to the source..." - - src_writer = Thread.new do - client = FlexNBD::FakeSource.new( "127.0.0.1", SOURCE_PORT, "Timed out connecting" ) - loop do - begin - client.write(0, WRITE_DATA) - rescue => err - puts "Writer encountered #{err.inspect} writing to source" - break - end - end - end -end - -puts "Starting mirroring process..." -UNIXSocket.open(SOURCE_SOCK) {|sock| - sock.write(["mirror", "127.0.0.1", DEST_PORT.to_s, "unlink"].join("\x0A") + "\x0A\x0A") - sock.flush - rsp = sock.readline - puts "Response: #{rsp}" -} - -# tell serve to migrate to the listen one - -Timeout.timeout( 10 ) do - start_time = Time.now - puts "Waiting for destination #{dst_proc} to exit..." - dst_result = Process::waitpid2(dst_proc) - puts "destination exited: #{dst_result.inspect}" - - puts "Waiting for source to exit..." - src_result = Process::waitpid2(src_proc) - puts "Source exited after #{Time.now - start_time} seconds: #{src_result.inspect}" -end - - -if BREAKIT - puts "Waiting for writer to die..." - src_writer.join - puts "Writer has died" -end - diff --git a/tests/acceptance/environment.rb b/tests/acceptance/environment.rb index 4607fe4..5c1c1c8 100644 --- a/tests/acceptance/environment.rb +++ b/tests/acceptance/environment.rb @@ -17,8 +17,8 @@ class Environment @rebind_port1 = @available_ports.shift @port2 = @available_ports.shift @rebind_port2 = @available_ports.shift - @nbd1 = FlexNBD.new("../../build/flexnbd", @ip, @port1) - @nbd2 = FlexNBD.new("../../build/flexnbd", @ip, @port2) + @nbd1 = FlexNBD::FlexNBD.new("../../build/flexnbd", @ip, @port1) + @nbd2 = FlexNBD::FlexNBD.new("../../build/flexnbd", @ip, @port2) @fake_pid = nil end diff --git a/tests/acceptance/flexnbd.rb b/tests/acceptance/flexnbd.rb index 409dcf5..1262982 100644 --- a/tests/acceptance/flexnbd.rb +++ b/tests/acceptance/flexnbd.rb @@ -163,353 +163,355 @@ class ValgrindKillingExecutor end # class ValgrindExecutor -# Noddy test class to exercise FlexNBD from the outside for testing. -# -class FlexNBD - attr_reader :bin, :ctrl, :pid, :ip, :port +module FlexNBD + # Noddy test class to exercise FlexNBD from the outside for testing. + # + class FlexNBD + attr_reader :bin, :ctrl, :pid, :ip, :port - class << self - def counter - Dir['tmp/*'].select{|f| File.file?(f)}.length + 1 - end - end - - def pick_executor - kls = if ENV['VALGRIND'] - if ENV['VALGRIND'] =~ /kill/ - ValgrindKillingExecutor - else - ValgrindExecutor + class << self + def counter + Dir['tmp/*'].select{|f| File.file?(f)}.length + 1 end - else - Executor end - end - - def build_debug_opt - if @do_debug - "--verbose" - else - "--quiet" + def pick_executor + kls = if ENV['VALGRIND'] + if ENV['VALGRIND'] =~ /kill/ + ValgrindKillingExecutor + else + ValgrindExecutor + end + else + Executor + end end - end - - def initialize( bin, ip, port ) - @bin = bin - @do_debug = ENV['DEBUG'] - @debug = build_debug_opt - raise "#{bin} not executable" unless File.executable?(bin) - @executor = pick_executor.new - @ctrl = "/tmp/.flexnbd.ctrl.#{Time.now.to_i}.#{rand}" - @ip = ip - @port = port - @kill = [] - end - def debug? - !!@do_debug - end - - def debug( msg ) - $stderr.puts msg if debug? - end - - - def serve_cmd( file, acl ) - "#{bin} serve "\ - "--addr #{ip} "\ - "--port #{port} "\ - "--file #{file} "\ - "--sock #{ctrl} "\ - "#{@debug} "\ - "#{acl.join(' ')}" - end - - - def listen_cmd( file, acl ) - "#{bin} listen "\ - "--addr #{ip} "\ - "--port #{port} "\ - "--file #{file} "\ - "--sock #{ctrl} "\ - "#{@debug} "\ - "#{acl.join(' ')}" - end - - - def read_cmd( offset, length ) - "#{bin} read "\ - "--addr #{ip} "\ - "--port #{port} "\ - "--from #{offset} "\ - "#{@debug} "\ - "--size #{length}" - end - - - def write_cmd( offset, data ) - "#{bin} write "\ - "--addr #{ip} "\ - "--port #{port} "\ - "--from #{offset} "\ - "#{@debug} "\ - "--size #{data.length}" - end - - - def base_mirror_opts( dest_ip, dest_port ) - "--addr #{dest_ip} "\ - "--port #{dest_port} "\ - "--sock #{ctrl} "\ - end - - def unlink_mirror_opts( dest_ip, dest_port ) - "#{base_mirror_opts( dest_ip, dest_port )} "\ - "--unlink " - end - - def base_mirror_cmd( opts ) - "#{@bin} mirror "\ - "#{opts} "\ - "#{@debug}" - end - - def mirror_cmd(dest_ip, dest_port) - base_mirror_cmd( base_mirror_opts( dest_ip, dest_port ) ) - end - - def mirror_unlink_cmd( dest_ip, dest_port ) - base_mirror_cmd( unlink_mirror_opts( dest_ip, dest_port ) ) - end - - def break_cmd - "#{@bin} break "\ - "--sock #{ctrl} "\ - "#{@debug}" - end - - def status_cmd - "#{@bin} status "\ - "--sock #{ctrl} "\ - "#{@debug}" - end - - def acl_cmd( *acl ) - "#{@bin} acl " \ - "--sock #{ctrl} "\ - "#{@debug} "\ - "#{acl.join " "}" - end - - - def run_serve_cmd(cmd) - File.unlink(ctrl) if File.exists?(ctrl) - debug( cmd ) - - @pid = @executor.run( cmd ) - start_wait_thread( @pid ) - - while !File.socket?(ctrl) - pid, status = Process.wait2(@pid, Process::WNOHANG) - raise "server did not start (#{cmd})" if pid - sleep 0.1 + def build_debug_opt + if @do_debug + "--verbose" + else + "--quiet" + end + end + + def initialize( bin, ip, port ) + @bin = bin + @do_debug = ENV['DEBUG'] + @debug = build_debug_opt + raise "#{bin} not executable" unless File.executable?(bin) + @executor = pick_executor.new + @ctrl = "/tmp/.flexnbd.ctrl.#{Time.now.to_i}.#{rand}" + @ip = ip + @port = port + @kill = [] end - at_exit { kill } - end - private :run_serve_cmd - def serve( file, *acl) - run_serve_cmd( serve_cmd( file, acl ) ) - end + def debug? + !!@do_debug + end - def listen(file, *acl) - run_serve_cmd( listen_cmd( file, acl ) ) - end + def debug( msg ) + $stderr.puts msg if debug? + end - def start_wait_thread( pid ) - @wait_thread = Thread.start do - _, status = Process.waitpid2( pid ) + def serve_cmd( file, acl ) + "#{bin} serve "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--file #{file} "\ + "--sock #{ctrl} "\ + "#{@debug} "\ + "#{acl.join(' ')}" + end - if @kill - if status.signaled? - fail "flexnbd quit with a bad signal: #{status.inspect}" unless + + def listen_cmd( file, acl ) + "#{bin} listen "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--file #{file} "\ + "--sock #{ctrl} "\ + "#{@debug} "\ + "#{acl.join(' ')}" + end + + + def read_cmd( offset, length ) + "#{bin} read "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--from #{offset} "\ + "#{@debug} "\ + "--size #{length}" + end + + + def write_cmd( offset, data ) + "#{bin} write "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--from #{offset} "\ + "#{@debug} "\ + "--size #{data.length}" + end + + + def base_mirror_opts( dest_ip, dest_port ) + "--addr #{dest_ip} "\ + "--port #{dest_port} "\ + "--sock #{ctrl} "\ + end + + def unlink_mirror_opts( dest_ip, dest_port ) + "#{base_mirror_opts( dest_ip, dest_port )} "\ + "--unlink " + end + + def base_mirror_cmd( opts ) + "#{@bin} mirror "\ + "#{opts} "\ + "#{@debug}" + end + + def mirror_cmd(dest_ip, dest_port) + base_mirror_cmd( base_mirror_opts( dest_ip, dest_port ) ) + end + + def mirror_unlink_cmd( dest_ip, dest_port ) + base_mirror_cmd( unlink_mirror_opts( dest_ip, dest_port ) ) + end + + def break_cmd + "#{@bin} break "\ + "--sock #{ctrl} "\ + "#{@debug}" + end + + def status_cmd + "#{@bin} status "\ + "--sock #{ctrl} "\ + "#{@debug}" + end + + def acl_cmd( *acl ) + "#{@bin} acl " \ + "--sock #{ctrl} "\ + "#{@debug} "\ + "#{acl.join " "}" + end + + + def run_serve_cmd(cmd) + File.unlink(ctrl) if File.exists?(ctrl) + debug( cmd ) + + @pid = @executor.run( cmd ) + start_wait_thread( @pid ) + + while !File.socket?(ctrl) + pid, status = Process.wait2(@pid, Process::WNOHANG) + raise "server did not start (#{cmd})" if pid + sleep 0.1 + end + at_exit { kill } + end + private :run_serve_cmd + + + def serve( file, *acl) + run_serve_cmd( serve_cmd( file, acl ) ) + end + + def listen(file, *acl) + run_serve_cmd( listen_cmd( file, acl ) ) + end + + + def start_wait_thread( pid ) + @wait_thread = Thread.start do + _, status = Process.waitpid2( pid ) + + if @kill + if status.signaled? + fail "flexnbd quit with a bad signal: #{status.inspect}" unless @kill.include? status.termsig - else - fail "flexnbd quit with a bad status: #{status.inspect}" unless + else + fail "flexnbd quit with a bad status: #{status.inspect}" unless @kill.include? status.exitstatus + end + else + $stderr.puts "flexnbd #{self.pid} quit" + fail "flexnbd #{self.pid} quit early with status #{status.to_i}" end + end + end + + + def can_die(*status) + status = [0] if status.empty? + @kill += status + end + + def kill + # At this point, to a certain degree we don't care what the exit + # status is + can_die(1) + if @pid + begin + Process.kill("INT", @pid) + rescue Errno::ESRCH => e + # already dead. Presumably this means it went away after a + # can_die() call. + end + end + @wait_thread.join if @wait_thread + end + + def read(offset, length) + cmd = read_cmd( offset, length ) + debug( cmd ) + + IO.popen(cmd) do |fh| + return fh.read + end + raise IOError.new "NBD read failed" unless $?.success? + out + end + + def write(offset, data) + cmd = write_cmd( offset, data ) + debug( cmd ) + + IO.popen(cmd, "w") do |fh| + fh.write(data) + end + raise IOError.new "NBD write failed" unless $?.success? + nil + end + + + def join + @wait_thread.join + end + + + def mirror_unchecked( dest_ip, dest_port, bandwidth=nil, action=nil, timeout=nil ) + cmd = mirror_cmd( dest_ip, dest_port) + debug( cmd ) + + maybe_timeout( cmd, timeout ) + end + + + def mirror_unlink( dest_ip, dest_port, timeout=nil ) + cmd = mirror_unlink_cmd( dest_ip, dest_port ) + debug( cmd ) + + maybe_timeout( cmd, timeout ) + end + + + def maybe_timeout(cmd, timeout=nil ) + stdout, stderr = "","" + run = Proc.new do + Open3.popen3( cmd ) do |io_in, io_out, io_err| + io_in.close + stdout.replace io_out.read + stderr.replace io_err.read + end + end + + if timeout + Timeout.timeout(timeout, &run) else - $stderr.puts "flexnbd #{self.pid} quit" - fail "flexnbd #{self.pid} quit early with status #{status.to_i}" + run.call end + + [stdout, stderr] end - end - def can_die(*status) - status = [0] if status.empty? - @kill += status - end + def mirror(dest_ip, dest_port, bandwidth=nil, action=nil) + stdout, stderr = mirror_unchecked( dest_ip, dest_port, bandwidth, action ) + raise IOError.new( "Migrate command failed\n" + stderr) unless $?.success? - def kill - # At this point, to a certain degree we don't care what the exit - # status is - can_die(1) - if @pid - begin - Process.kill("INT", @pid) - rescue Errno::ESRCH => e - # already dead. Presumably this means it went away after a - # can_die() call. - end + stdout end - @wait_thread.join if @wait_thread - end - def read(offset, length) - cmd = read_cmd( offset, length ) - debug( cmd ) - IO.popen(cmd) do |fh| - return fh.read + + def break(timeout=nil) + cmd = break_cmd + debug( cmd ) + + maybe_timeout( cmd, timeout ) end - raise IOError.new "NBD read failed" unless $?.success? - out - end - def write(offset, data) - cmd = write_cmd( offset, data ) - debug( cmd ) - IO.popen(cmd, "w") do |fh| - fh.write(data) + def acl(*acl) + cmd = acl_cmd( *acl ) + debug( cmd ) + + maybe_timeout( cmd, 2 ) end - raise IOError.new "NBD write failed" unless $?.success? - nil - end - def join - @wait_thread.join - end + def status( timeout = nil ) + cmd = status_cmd() + debug( cmd ) + + o,e = maybe_timeout( cmd, timeout ) + + [parse_status(o), e] + end - def mirror_unchecked( dest_ip, dest_port, bandwidth=nil, action=nil, timeout=nil ) - cmd = mirror_cmd( dest_ip, dest_port) - debug( cmd ) - - maybe_timeout( cmd, timeout ) - end + def launched? + !!@pid + end - def mirror_unlink( dest_ip, dest_port, timeout=nil ) - cmd = mirror_unlink_cmd( dest_ip, dest_port ) - debug( cmd ) - - maybe_timeout( cmd, timeout ) - end + def paused + Process.kill( "STOP", @pid ) + yield + ensure + Process.kill( "CONT", @pid ) + end - def maybe_timeout(cmd, timeout=nil ) - stdout, stderr = "","" - run = Proc.new do - Open3.popen3( cmd ) do |io_in, io_out, io_err| - io_in.close - stdout.replace io_out.read - stderr.replace io_err.read + protected + def control_command(*args) + raise "Server not running" unless @pid + args = args.compact + UNIXSocket.open(@ctrl) do |u| + u.write(args.join("\n") + "\n") + code, message = u.readline.split(": ", 2) + return [code, message] end end - if timeout - Timeout.timeout(timeout, &run) - else - run.call + + def parse_status( status ) + hsh = {} + + status.split(" ").each do |part| + next if part.strip.empty? + a,b = part.split("=") + b.strip! + b = true if b == "true" + b = false if b == "false" + + hsh[a.strip] = b + end + + hsh end - [stdout, stderr] + end - - def mirror(dest_ip, dest_port, bandwidth=nil, action=nil) - stdout, stderr = mirror_unchecked( dest_ip, dest_port, bandwidth, action ) - raise IOError.new( "Migrate command failed\n" + stderr) unless $?.success? - - stdout - end - - - - def break(timeout=nil) - cmd = break_cmd - debug( cmd ) - - maybe_timeout( cmd, timeout ) - end - - - def acl(*acl) - cmd = acl_cmd( *acl ) - debug( cmd ) - - maybe_timeout( cmd, 2 ) - end - - - def status( timeout = nil ) - cmd = status_cmd() - debug( cmd ) - - o,e = maybe_timeout( cmd, timeout ) - - [parse_status(o), e] - end - - - def launched? - !!@pid - end - - - def paused - Process.kill( "STOP", @pid ) - yield - ensure - Process.kill( "CONT", @pid ) - end - - - protected - def control_command(*args) - raise "Server not running" unless @pid - args = args.compact - UNIXSocket.open(@ctrl) do |u| - u.write(args.join("\n") + "\n") - code, message = u.readline.split(": ", 2) - return [code, message] - end - end - - - def parse_status( status ) - hsh = {} - - status.split(" ").each do |part| - next if part.strip.empty? - a,b = part.split("=") - b.strip! - b = true if b == "true" - b = false if b == "false" - - hsh[a.strip] = b - end - - hsh - end - - end - diff --git a/tests/acceptance/test_write_during_migration.rb b/tests/acceptance/test_write_during_migration.rb new file mode 100755 index 0000000..4bad9c2 --- /dev/null +++ b/tests/acceptance/test_write_during_migration.rb @@ -0,0 +1,128 @@ +#!/usr/bin/env ruby + +require 'test/unit' +require 'flexnbd/fake_source' +require 'socket' +require 'fileutils' +require 'tmpdir' + +Thread.abort_on_exception = true + +class TestWriteDuringMigration < Test::Unit::TestCase + + def setup + @flexnbd = File.expand_path("../../build/flexnbd") + + raise "No binary!" unless File.executable?( @flexnbd ) + raise "No trickle!" unless system "which trickle > /dev/null" + + + @size = 20*1024*1024 # 20MB + @write_data = "foo!" * 2048 # 8K write + @source_port = 9990 + @dest_port = 9991 + @source_sock = "src.sock" + @dest_sock = "dst.sock" + @source_file = "src.file" + @dest_file = "dst.file" + end + + + def teardown + [@dst_proc, @src_proc].each do |pid| + if pid + Process.kill( "KILL", pid ) rescue nil + end + end + end + + + def launch_servers + @dst_proc = fork() { + cmd = "#{@flexnbd} listen -l 127.0.0.1 -p #{@dest_port} -f #{@dest_file} -s #{@dest_sock}" + exec cmd + } + + @src_proc = fork() { + cmd = "#{@flexnbd} serve -l 127.0.0.1 -p #{@source_port} -f #{@source_file} -s #{@source_sock}" + exec cmd + } + begin + awaiting = nil + Timeout.timeout(10) do + awaiting = :source + sleep 0.1 while !File.exists?( @source_sock ) + awaiting = :dest + sleep 0.1 while !File.exists?( @dest_sock ) + end + rescue Timeout::Error + case awaiting + when :source + fail "Couldn't get a source socket." + when :dest + fail "Couldn't get a destination socket." + else + fail "Something went wrong I don't understand." + end + end + end + + + def make_files() + FileUtils.touch(@source_file) + File.truncate(@source_file, @size) + FileUtils.touch(@dest_file) + File.truncate(@dest_file, @size) + + File.open(@source_file, "wb"){|f| f.write "a"*@size } + end + + + def start_mirror + UNIXSocket.open(@source_sock) {|sock| + sock.write(["mirror", "127.0.0.1", @dest_port.to_s, "unlink"].join("\x0A") + "\x0A\x0A") + sock.flush + rsp = sock.readline + } + end + + + def wait_for_quit() + Timeout.timeout( 10 ) do + start_time = Time.now + dst_result = Process::waitpid2(@dst_proc) + src_result = Process::waitpid2(@src_proc) + end + + end + + + def test_write_during_migration + + Dir.mktmpdir() do |tmpdir| + Dir.chdir( tmpdir ) do + make_files() + + launch_servers() + + src_writer = Thread.new do + client = FlexNBD::FakeSource.new( "127.0.0.1", @source_port, "Timed out connecting" ) + loop do + begin + client.write(0, @write_data) + rescue => err + # We expect a broken write at some point, so ignore it + break + end + end + end + + start_mirror() + wait_for_quit() + src_writer.join + end + end + end + + +end diff --git a/tests/fuzz b/tests/fuzz index 7d136c5..9739bed 100644 --- a/tests/fuzz +++ b/tests/fuzz @@ -29,7 +29,7 @@ end @local = File.open(testname_local, "r+") -@serve = FlexNBD.new(binary, "127.0.0.1", 41234) +@serve = FlexNBD::FlexNBD.new(binary, "127.0.0.1", 41234) @serve.serve(testname_serve) $record = []