This commit is contained in:
Patrick J Cherry
2018-02-02 21:34:14 +00:00
parent 1b7b688f7a
commit 9c48da82cc
40 changed files with 666 additions and 858 deletions

View File

@@ -1,22 +1,20 @@
# encoding: utf-8
require 'flexnbd' require 'flexnbd'
require 'file_writer' require 'file_writer'
class Environment class Environment
attr_reader( :blocksize, :filename1, :filename2, :ip, attr_reader(:blocksize, :filename1, :filename2, :ip,
:port1, :port2, :nbd1, :nbd2, :file1, :file2 ) :port1, :port2, :nbd1, :nbd2, :file1, :file2)
def initialize def initialize
@blocksize = 1024 @blocksize = 1024
@filename1 = "/tmp/.flexnbd.test.#{$$}.#{Time.now.to_i}.1" @filename1 = "/tmp/.flexnbd.test.#{$PROCESS_ID}.#{Time.now.to_i}.1"
@filename2 = "/tmp/.flexnbd.test.#{$$}.#{Time.now.to_i}.2" @filename2 = "/tmp/.flexnbd.test.#{$PROCESS_ID}.#{Time.now.to_i}.2"
@ip = "127.0.0.1" @ip = '127.0.0.1'
@available_ports = [*40000..41000] - listening_ports @available_ports = [*40_000..41_000] - listening_ports
@port1 = @available_ports.shift @port1 = @available_ports.shift
@port2 = @available_ports.shift @port2 = @available_ports.shift
@nbd1 = FlexNBD::FlexNBD.new("../../build/flexnbd", @ip, @port1) @nbd1 = FlexNBD::FlexNBD.new('../../build/flexnbd', @ip, @port1)
@nbd2 = FlexNBD::FlexNBD.new("../../build/flexnbd", @ip, @port2) @nbd2 = FlexNBD::FlexNBD.new('../../build/flexnbd', @ip, @port2)
@fake_pid = nil @fake_pid = nil
end end
@@ -26,14 +24,14 @@ class Environment
@nbd2.prefetch_proxy = true @nbd2.prefetch_proxy = true
end end
def proxy1(port=@port2) def proxy1(port = @port2)
@nbd1.proxy(@ip, port) @nbd1.proxy(@ip, port)
end end
def proxy2(port=@port1)
def proxy2(port = @port1)
@nbd2.proxy(@ip, port) @nbd2.proxy(@ip, port)
end end
def serve1(*acl) def serve1(*acl)
@nbd1.serve(@filename1, *acl) @nbd1.serve(@filename1, *acl)
end end
@@ -42,29 +40,26 @@ class Environment
@nbd2.serve(@filename2, *acl) @nbd2.serve(@filename2, *acl)
end end
def listen1(*acl)
def listen1( *acl ) @nbd1.listen(@filename1, *(acl.empty? ? @acl1 : acl))
@nbd1.listen( @filename1, *(acl.empty? ? @acl1: acl) )
end end
def listen2( *acl ) def listen2(*acl)
@nbd2.listen( @filename2, *acl ) @nbd2.listen(@filename2, *acl)
end end
def break1 def break1
@nbd1.break @nbd1.break
end end
def acl1( *acl ) def acl1(*acl)
@nbd1.acl( *acl ) @nbd1.acl(*acl)
end end
def acl2( *acl ) def acl2(*acl)
@nbd2.acl( *acl ) @nbd2.acl(*acl)
end end
def status1 def status1
@nbd1.status.first @nbd1.status.first
end end
@@ -73,23 +68,20 @@ class Environment
@nbd2.status.first @nbd2.status.first
end end
def mirror12 def mirror12
@nbd1.mirror( @nbd2.ip, @nbd2.port ) @nbd1.mirror(@nbd2.ip, @nbd2.port)
end end
def mirror12_unchecked def mirror12_unchecked
@nbd1.mirror_unchecked( @nbd2.ip, @nbd2.port, nil, nil, 10 ) @nbd1.mirror_unchecked(@nbd2.ip, @nbd2.port, nil, nil, 10)
end end
def mirror12_unlink def mirror12_unlink
@nbd1.mirror_unlink( @nbd2.ip, @nbd2.port, 2 ) @nbd1.mirror_unlink(@nbd2.ip, @nbd2.port, 2)
end end
def write1(data)
def write1( data ) @nbd1.write(0, data)
@nbd1.write( 0, data )
end end
def writefile1(data) def writefile1(data)
@@ -100,63 +92,54 @@ class Environment
@file2 = FileWriter.new(@filename2, @blocksize).write(data) @file2 = FileWriter.new(@filename2, @blocksize).write(data)
end end
def truncate1(size)
def truncate1( size )
system "truncate -s #{size} #{@filename1}" system "truncate -s #{size} #{@filename1}"
end end
def listening_ports def listening_ports
`netstat -ltn`. `netstat -ltn`
split("\n"). .split("\n")
map { |x| x.split(/\s+/) }[2..-1]. .map { |x| x.split(/\s+/) }[2..-1]
map { |l| l[3].split(":")[-1].to_i } .map { |l| l[3].split(':')[-1].to_i }
end end
def cleanup def cleanup
if @fake_pid if @fake_pid
begin begin
Process.waitpid2( @fake_pid ) Process.waitpid2(@fake_pid)
rescue Errno::ESRCH rescue Errno::ESRCH
end end
end end
@nbd1.can_die(0) @nbd1.can_die(0)
@nbd1.kill @nbd1.kill
@nbd2.kill @nbd2.kill
[@filename1, @filename2].each do |f| [@filename1, @filename2].each do |f|
File.unlink(f) if File.exists?(f) File.unlink(f) if File.exist?(f)
end end
end end
def run_fake(name, addr, port, sock = nil)
def run_fake( name, addr, port, sock=nil ) fakedir = File.join(File.dirname(__FILE__), 'fakes')
fakedir = File.join( File.dirname( __FILE__ ), "fakes" ) fakeglob = File.join(fakedir, name) + '*'
fakeglob = File.join( fakedir, name ) + "*" fake = Dir[fakeglob].sort.find do |fn|
fake = Dir[fakeglob].sort.find { |fn| File.executable?(fn)
File.executable?( fn ) end
}
raise "no fake executable at #{fakeglob}" unless fake raise "no fake executable at #{fakeglob}" unless fake
raise "no addr" unless addr raise 'no addr' unless addr
raise "no port" unless port raise 'no port' unless port
@fake_pid = fork do @fake_pid = fork do
exec [fake, addr, port, @nbd1.pid, sock].map{|x| x.to_s}.join(" ") exec [fake, addr, port, @nbd1.pid, sock].map(&:to_s).join(' ')
end end
sleep(0.5) sleep(0.5)
end end
def fake_reports_success def fake_reports_success
_,status = Process.waitpid2( @fake_pid ) _, status = Process.waitpid2(@fake_pid)
@fake_pid = nil @fake_pid = nil
status.success? status.success?
end end
end # class Environment end # class Environment

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
# Open a server, accept a client, then cancel the migration by issuing # Open a server, accept a client, then cancel the migration by issuing
# a break command. # a break command.
@@ -8,28 +6,27 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port, src_pid, sock = *ARGV addr, port, src_pid, sock = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept client = server.accept
ctrl = UNIXSocket.open( sock ) ctrl = UNIXSocket.open(sock)
Process.kill("STOP", src_pid.to_i) Process.kill('STOP', src_pid.to_i)
ctrl.write( "break\n" ) ctrl.write("break\n")
ctrl.close_write ctrl.close_write
client.write_hello client.write_hello
Process.kill("CONT", src_pid.to_i) Process.kill('CONT', src_pid.to_i)
fail "Unexpected control response" unless raise 'Unexpected control response' unless
ctrl.read =~ /0: mirror stopped/ ctrl.read =~ /0: mirror stopped/
client2 = nil client2 = nil
begin begin
client2 = server.accept( "Expected timeout" ) client2 = server.accept('Expected timeout')
fail "Unexpected reconnection" raise 'Unexpected reconnection'
rescue Timeout::Error rescue Timeout::Error
# expected # expected
end end
client.close client.close
exit(0) exit(0)

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
# Receive a mirror, and disconnect after sending the entrust reply but # Receive a mirror, and disconnect after sending the entrust reply but
# before it can send the disconnect signal. # before it can send the disconnect signal.
# #
@@ -11,26 +9,25 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port, src_pid = *ARGV addr, port, src_pid = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept client = server.accept
client.write_hello client.write_hello
while (req = client.read_request; req[:type] == 1) while req = client.read_request; req[:type] == 1
client.read_data( req[:len] ) client.read_data(req[:len])
client.write_reply( req[:handle] ) client.write_reply(req[:handle])
end end
system "kill -STOP #{src_pid}" system "kill -STOP #{src_pid}"
client.write_reply( req[:handle] ) client.write_reply(req[:handle])
client.close client.close
system "kill -CONT #{src_pid}" system "kill -CONT #{src_pid}"
sleep( 0.25 ) sleep(0.25)
client2 = server.accept( "Timed out waiting for a reconnection" ) client2 = server.accept('Timed out waiting for a reconnection')
client2.close client2.close
server.close server.close
$stderr.puts "done" warn 'done'
exit(0) exit(0)

View File

@@ -10,12 +10,12 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port = *ARGV addr, port = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept( "Timed out waiting for a connection" ) client = server.accept('Timed out waiting for a connection')
client.write_hello client.write_hello
client.close client.close
new_client = server.accept( "Timed out waiting for a reconnection" ) new_client = server.accept('Timed out waiting for a reconnection')
new_client.close new_client.close
server.close server.close

View File

@@ -11,13 +11,13 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port = *ARGV addr, port = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept( "Timed out waiting for a connection" ) client = server.accept('Timed out waiting for a connection')
client.write_hello client.write_hello
client.read_request client.read_request
client.close client.close
new_client = server.accept( "Timed out waiting for a reconnection" ) new_client = server.accept('Timed out waiting for a reconnection')
new_client.close new_client.close
server.close server.close

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
# Open a server, accept a client, then we expect a single write # Open a server, accept a client, then we expect a single write
# followed by an entrust. However, we disconnect after the write so # followed by an entrust. However, we disconnect after the write so
# the entrust will fail. We don't expect a reconnection: the sender # the entrust will fail. We don't expect a reconnection: the sender
@@ -10,16 +8,16 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port, src_pid = *ARGV addr, port, src_pid = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept client = server.accept
client.write_hello client.write_hello
req = client.read_request req = client.read_request
data = client.read_data( req[:len] ) data = client.read_data(req[:len])
Process.kill("STOP", src_pid.to_i) Process.kill('STOP', src_pid.to_i)
client.write_reply( req[:handle], 0 ) client.write_reply(req[:handle], 0)
client.close client.close
Process.kill("CONT", src_pid.to_i) Process.kill('CONT', src_pid.to_i)
exit(0) exit(0)

View File

@@ -1,19 +1,16 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
require 'flexnbd/fake_dest' require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port = *ARGV addr, port = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept client = server.accept
client.write_hello client.write_hello
handle = client.read_request[:handle] handle = client.read_request[:handle]
client.write_error( handle ) client.write_error(handle)
client2 = server.accept('Timed out waiting for a reconnection')
client2 = server.accept( "Timed out waiting for a reconnection" )
client.close client.close
client2.close client2.close

View File

@@ -14,8 +14,8 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port = *ARGV addr, port = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept( "Client didn't make a connection" ) client = server.accept("Client didn't make a connection")
# Sleep for one second past the timeout (a bit of slop in case ruby # Sleep for one second past the timeout (a bit of slop in case ruby
# doesn't launch things quickly) # doesn't launch things quickly)
@@ -26,10 +26,10 @@ client.close
# Invert the sense of the timeout exception, since we *don't* want a # Invert the sense of the timeout exception, since we *don't* want a
# connection attempt # connection attempt
begin begin
server.accept( "Expected timeout" ) server.accept('Expected timeout')
fail "Unexpected reconnection" raise 'Unexpected reconnection'
rescue Timeout::Error rescue Timeout::Error
# expected # expected
end end
server.close server.close

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
# Open a socket, say hello, receive a write, then sleep for > # Open a socket, say hello, receive a write, then sleep for >
# MS_REQUEST_LIMIT_SECS seconds. This should tell the source that the # MS_REQUEST_LIMIT_SECS seconds. This should tell the source that the
# write has gone MIA, and we expect a reconnect. # write has gone MIA, and we expect a reconnect.
@@ -9,24 +7,24 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port = *ARGV addr, port = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client1 = server.accept( server ) client1 = server.accept(server)
client1.write_hello client1.write_hello
client1.read_request client1.read_request
t = Thread.start do t = Thread.start do
client2 = server.accept( "Timed out waiting for a reconnection", client2 = server.accept('Timed out waiting for a reconnection',
FlexNBD::MS_REQUEST_LIMIT_SECS + 2 ) FlexNBD::MS_REQUEST_LIMIT_SECS + 2)
client2.close client2.close
end end
sleep_time = if ENV.has_key?('FLEXNBD_MS_REQUEST_LIMIT_SECS') sleep_time = if ENV.key?('FLEXNBD_MS_REQUEST_LIMIT_SECS')
ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'].to_f ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'].to_f
else else
FlexNBD::MS_REQUEST_LIMIT_SECS FlexNBD::MS_REQUEST_LIMIT_SECS
end end
sleep( sleep_time + 2.0 ) sleep(sleep_time + 2.0)
client1.close client1.close
t.join t.join

View File

@@ -7,21 +7,21 @@ include FlexNBD
Thread.abort_on_exception Thread.abort_on_exception
addr, port = *ARGV addr, port = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client1 = server.accept client1 = server.accept
# We don't expect a reconnection attempt. # We don't expect a reconnection attempt.
t = Thread.new do t = Thread.new do
begin begin
client2 = server.accept( "Timed out waiting for a reconnection", client2 = server.accept('Timed out waiting for a reconnection',
FlexNBD::MS_RETRY_DELAY_SECS + 1 ) FlexNBD::MS_RETRY_DELAY_SECS + 1)
fail "Unexpected reconnection" raise 'Unexpected reconnection'
rescue Timeout::Error rescue Timeout::Error
#expected # expected
end end
end end
client1.write_hello( :magic => :wrong ) client1.write_hello(magic: :wrong)
t.join t.join

View File

@@ -9,7 +9,7 @@ include FlexNBD
Thread.abort_on_exception = true Thread.abort_on_exception = true
addr, port = *ARGV addr, port = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept client = server.accept
t = Thread.new do t = Thread.new do
@@ -18,21 +18,21 @@ t = Thread.new do
# so it makes no sense to continue. This means we have to invert the # so it makes no sense to continue. This means we have to invert the
# sense of the exception. # sense of the exception.
begin begin
client2 = server.accept( "Timed out waiting for a reconnection", client2 = server.accept('Timed out waiting for a reconnection',
FlexNBD::MS_RETRY_DELAY_SECS + 1 ) FlexNBD::MS_RETRY_DELAY_SECS + 1)
client2.close client2.close
fail "Unexpected reconnection." raise 'Unexpected reconnection.'
rescue Timeout::Error rescue Timeout::Error
end end
end end
client.write_hello( :size => :wrong ) client.write_hello(size: :wrong)
t.join t.join
# Now check that the source closed the first socket (yes, this was an # Now check that the source closed the first socket (yes, this was an
# actual bug) # actual bug)
fail "Didn't close socket" unless client.disconnected? raise "Didn't close socket" unless client.disconnected?
exit 0 exit 0

View File

@@ -7,18 +7,16 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port = *ARGV addr, port = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
server.accept.close server.accept.close
begin begin
server.accept server.accept
fail "Unexpected reconnection" raise 'Unexpected reconnection'
rescue Timeout::Error rescue Timeout::Error
# expected # expected
end end
server.close server.close
exit(0) exit(0)

View File

@@ -8,8 +8,8 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port, pid = *ARGV addr, port, pid = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept( "Timed out waiting for a connection" ) client = server.accept('Timed out waiting for a connection')
client.write_hello client.write_hello
Process.kill(15, pid.to_i) Process.kill(15, pid.to_i)

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
# Accept a connection, write hello, wait for a write request, read the # Accept a connection, write hello, wait for a write request, read the
# data, then write back a reply with a bad magic field. We then # data, then write back a reply with a bad magic field. We then
# expect a reconnect. # expect a reconnect.
@@ -9,13 +7,13 @@ require 'flexnbd/fake_dest'
include FlexNBD include FlexNBD
addr, port = *ARGV addr, port = *ARGV
server = FakeDest.new( addr, port ) server = FakeDest.new(addr, port)
client = server.accept client = server.accept
client.write_hello client.write_hello
req = client.read_request req = client.read_request
client.read_data( req[:len] ) client.read_data(req[:len])
client.write_reply( req[:handle], 0, :magic => :wrong ) client.write_reply(req[:handle], 0, magic: :wrong)
client2 = server.accept client2 = server.accept
client.close client.close

View File

@@ -11,13 +11,13 @@ include FlexNBD
addr, port = *ARGV addr, port = *ARGV
FakeSource.new( addr, port, "Failed to connect" ).close FakeSource.new(addr, port, 'Failed to connect').close
# Sleep to be sure we don't try to connect too soon. That wouldn't # Sleep to be sure we don't try to connect too soon. That wouldn't
# be a problem for the destination, but it would prevent us from # be a problem for the destination, but it would prevent us from
# determining success or failure here in the case where we try to # determining success or failure here in the case where we try to
# reconnect before the destination has tidied up after the first # reconnect before the destination has tidied up after the first
# thread went away. # thread went away.
sleep(0.5) sleep(0.5)
FakeSource.new( addr, port, "Failed to reconnect" ).close FakeSource.new(addr, port, 'Failed to reconnect').close
exit 0 exit 0

View File

@@ -11,10 +11,10 @@ include FlexNBD
addr, port, srv_pid = *ARGV addr, port, srv_pid = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting" ) client = FakeSource.new(addr, port, 'Timed out connecting')
client.read_hello client.read_hello
client.write_write_request( 0, 8 ) client.write_write_request(0, 8)
client.write_data( "12345678" ) client.write_data('12345678')
# Use system "kill" rather than Process.kill because Process.kill # Use system "kill" rather than Process.kill because Process.kill
# doesn't seem to work # doesn't seem to work
@@ -25,12 +25,11 @@ client.close
system "kill -CONT #{srv_pid}" system "kill -CONT #{srv_pid}"
sleep(0.25) sleep(0.25)
begin begin
client2 = FakeSource.new( addr, port, "Expected timeout" ) client2 = FakeSource.new(addr, port, 'Expected timeout')
fail "Unexpected reconnection" raise 'Unexpected reconnection'
rescue Timeout::Error rescue Timeout::Error
# expected # expected
end end

View File

@@ -10,10 +10,10 @@ include FlexNBD
addr, port, srv_pid = *ARGV addr, port, srv_pid = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting" ) client = FakeSource.new(addr, port, 'Timed out connecting')
client.read_hello client.read_hello
client.write_write_request( 0, 8 ) client.write_write_request(0, 8)
client.write_data( "12345678" ) client.write_data('12345678')
client.write_entrust_request client.write_entrust_request
client.read_response client.read_response
@@ -21,13 +21,11 @@ client.close
sleep(0.25) sleep(0.25)
begin begin
client2 = FakeSource.new( addr, port, "Expected timeout" ) client2 = FakeSource.new(addr, port, 'Expected timeout')
fail "Unexpected reconnection" raise 'Unexpected reconnection'
rescue Timeout::Error rescue Timeout::Error
# expected # expected
end end
exit(0) exit(0)

View File

@@ -12,13 +12,12 @@ include FlexNBD
addr, port = *ARGV addr, port = *ARGV
client = FakeSource.new(addr, port, 'Timed out connecting.')
client = FakeSource.new( addr, port, "Timed out connecting." )
client.read_hello client.read_hello
client.close client.close
sleep(0.2) sleep(0.2)
FakeSource.new( addr, port, "Timed out reconnecting." ) FakeSource.new(addr, port, 'Timed out reconnecting.')
exit(0) exit(0)

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
# We connect, pause the server, issue a write request, disconnect, # We connect, pause the server, issue a write request, disconnect,
# then cont the server. This ensures that our disconnect happens # then cont the server. This ensures that our disconnect happens
# while the server is trying to read the write data. # while the server is trying to read the write data.
@@ -10,11 +8,11 @@ include FlexNBD
addr, port, srv_pid = *ARGV addr, port, srv_pid = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting" ) client = FakeSource.new(addr, port, 'Timed out connecting')
client.read_hello client.read_hello
system "kill -STOP #{srv_pid}" system "kill -STOP #{srv_pid}"
client.write_write_request( 0, 8 ) client.write_write_request(0, 8)
client.close client.close
system "kill -CONT #{srv_pid}" system "kill -CONT #{srv_pid}"
@@ -24,7 +22,7 @@ system "kill -CONT #{srv_pid}"
sleep(0.25) sleep(0.25)
# ...and can we reconnect? # ...and can we reconnect?
client2 = FakeSource.new( addr, port, "Timed out connecting" ) client2 = FakeSource.new(addr, port, 'Timed out connecting')
client2.close client2.close
exit(0) exit(0)

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
# We connect, pause the server, issue a write request, send data, # We connect, pause the server, issue a write request, send data,
# disconnect, then cont the server. This ensures that our disconnect # disconnect, then cont the server. This ensures that our disconnect
# happens before the server can try to write the reply. # happens before the server can try to write the reply.
@@ -10,13 +8,13 @@ include FlexNBD
addr, port, srv_pid = *ARGV addr, port, srv_pid = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting" ) client = FakeSource.new(addr, port, 'Timed out connecting')
client.read_hello client.read_hello
system "kill -STOP #{srv_pid}" system "kill -STOP #{srv_pid}"
client.write_write_request( 0, 8 ) client.write_write_request(0, 8)
client.write_data( "12345678" ) client.write_data('12345678')
client.close client.close
system "kill -CONT #{srv_pid}" system "kill -CONT #{srv_pid}"
@@ -27,7 +25,7 @@ system "kill -CONT #{srv_pid}"
sleep(0.25) sleep(0.25)
# ...and can we reconnect? # ...and can we reconnect?
client2 = FakeSource.new( addr, port, "Timed out reconnecting" ) client2 = FakeSource.new(addr, port, 'Timed out reconnecting')
client2.close client2.close
exit(0) exit(0)

View File

@@ -8,10 +8,9 @@ include FlexNBD
addr, port, srv_pid, newaddr, newport = *ARGV addr, port, srv_pid, newaddr, newport = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting" ) client = FakeSource.new(addr, port, 'Timed out connecting')
client.write_read_request( 0, 8 ) client.write_read_request(0, 8)
client.read_raw( 4 ) client.read_raw(4)
client.close client.close
exit(0) exit(0)

View File

@@ -10,9 +10,9 @@ include FlexNBD
addr, port = *ARGV addr, port = *ARGV
client1 = FakeSource.new( addr, port, "Timed out connecting" ) client1 = FakeSource.new(addr, port, 'Timed out connecting')
sleep(0.25) sleep(0.25)
client2 = FakeSource.new( addr, port, "Timed out connecting a second time" ) client2 = FakeSource.new(addr, port, 'Timed out connecting a second time')
# This is the expected source crashing after connect # This is the expected source crashing after connect
client1.close client1.close

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
# We connect from a local address which should be blocked, sleep for a # We connect from a local address which should be blocked, sleep for a
# bit, then try to read from the socket. We should get an instant EOF # bit, then try to read from the socket. We should get an instant EOF
# as we've been cut off by the destination. # as we've been cut off by the destination.
@@ -11,10 +9,9 @@ include FlexNBD
addr, port = *ARGV addr, port = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting", "127.0.0.6" ) client = FakeSource.new(addr, port, 'Timed out connecting', '127.0.0.6')
sleep( 0.25 ) sleep(0.25)
rsp = client.disconnected? ? 0 : 1 rsp = client.disconnected? ? 0 : 1
client.close client.close
exit(rsp) exit(rsp)

View File

@@ -7,10 +7,10 @@
# listening for an incoming migration. # listening for an incoming migration.
addr, port = *ARGV addr, port = *ARGV
require "flexnbd/fake_source" require 'flexnbd/fake_source'
include FlexNBD include FlexNBD
client = FakeSource.new( addr, port, "Timed out connecting" ) client = FakeSource.new(addr, port, 'Timed out connecting')
client.read_hello client.read_hello
# Now we do two things: # Now we do two things:
@@ -24,16 +24,16 @@ client.read_hello
kidpid = fork do kidpid = fork do
client.close client.close
new_client = nil new_client = nil
sleep( FlexNBD::CLIENT_MAX_WAIT_SECS + 1 ) sleep(FlexNBD::CLIENT_MAX_WAIT_SECS + 1)
new_client = FakeSource.new( addr, port, "Timed out reconnecting." ) new_client = FakeSource.new(addr, port, 'Timed out reconnecting.')
new_client.read_hello new_client.read_hello
exit 0 exit 0
end end
# Sleep for longer than the child, to give the flexnbd process a bit # Sleep for longer than the child, to give the flexnbd process a bit
# of slop # of slop
sleep( FlexNBD::CLIENT_MAX_WAIT_SECS + 3 ) sleep(FlexNBD::CLIENT_MAX_WAIT_SECS + 3)
client.close client.close
_,status = Process.waitpid2( kidpid ) _, status = Process.waitpid2(kidpid)
exit status.exitstatus exit status.exitstatus

View File

@@ -9,10 +9,10 @@ include FlexNBD
addr, port, pid = *ARGV addr, port, pid = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting." ) client = FakeSource.new(addr, port, 'Timed out connecting.')
client.read_hello client.read_hello
Process.kill( "TERM", pid.to_i ) Process.kill('TERM', pid.to_i)
sleep(0.2) sleep(0.2)
client.close client.close

View File

@@ -9,10 +9,9 @@ include FlexNBD
addr, port, srv_pid, newaddr, newport = *ARGV addr, port, srv_pid, newaddr, newport = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting" ) client = FakeSource.new(addr, port, 'Timed out connecting')
client.send_mirror() client.send_mirror
sleep(1) sleep(1)
exit(0)
exit( 0 )

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# encoding: utf-8
# Connect, read the hello then make a write request with an impossible # Connect, read the hello then make a write request with an impossible
# (from,len) pair. We expect an error response, and not to be # (from,len) pair. We expect an error response, and not to be
# disconnected. # disconnected.
@@ -13,20 +11,20 @@ include FlexNBD
addr, port = *ARGV addr, port = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting" ) client = FakeSource.new(addr, port, 'Timed out connecting')
hello = client.read_hello hello = client.read_hello
client.write_write_request( hello[:size]+1, 32, "myhandle" ) client.write_write_request(hello[:size] + 1, 32, 'myhandle')
client.write_data("1"*32) client.write_data('1' * 32)
response = client.read_response response = client.read_response
fail "Not an error" if response[:error] == 0 raise 'Not an error' if response[:error] == 0
fail "Wrong handle" unless "myhandle" == response[:handle] raise 'Wrong handle' unless response[:handle] == 'myhandle'
client.write_write_request( 0, 32 ) client.write_write_request(0, 32)
client.write_data( "2"*32 ) client.write_data('2' * 32)
success_response = client.read_response success_response = client.read_response
fail "Second write failed" unless success_response[:error] == 0 raise 'Second write failed' unless success_response[:error] == 0
client.close client.close
exit(0) exit(0)

View File

@@ -3,13 +3,13 @@
# #
class FileWriter class FileWriter
def initialize(filename, blocksize) def initialize(filename, blocksize)
@fh = File.open(filename, "w+") @fh = File.open(filename, 'w+')
@blocksize = blocksize @blocksize = blocksize
@pattern = "" @pattern = ''
end end
def size def size
@blocksize * @pattern.split("").size @blocksize * @pattern.split('').size
end end
# We write in fixed block sizes, given by "blocksize" # We write in fixed block sizes, given by "blocksize"
@@ -20,8 +20,8 @@ class FileWriter
def write(data) def write(data)
@pattern += data @pattern += data
data.split("").each do |code| data.split('').each do |code|
if code == "_" if code == '_'
@fh.seek(@blocksize, IO::SEEK_CUR) @fh.seek(@blocksize, IO::SEEK_CUR)
else else
@fh.write(data(code)) @fh.write(data(code))
@@ -31,15 +31,14 @@ class FileWriter
self self
end end
# Returns what the data ought to be at the given offset and length # Returns what the data ought to be at the given offset and length
# #
def read_original( off, len ) def read_original(off, len)
patterns = @pattern.split( "" ) patterns = @pattern.split('')
patterns.zip( (0...patterns.length).to_a ). patterns.zip((0...patterns.length).to_a)
map { |blk, blk_off| .map do |blk, blk_off|
data(blk, blk_off) data(blk, blk_off)
}.join[off...(off+len)] end.join[off...(off + len)]
end end
# Read what's actually in the file # Read what's actually in the file
@@ -60,68 +59,66 @@ class FileWriter
protected protected
def data(code, at=@fh.tell) def data(code, at = @fh.tell)
case code case code
when "0", "_" when '0', '_'
"\0" * @blocksize "\0" * @blocksize
when "X" when 'X'
"X" * @blocksize 'X' * @blocksize
when "f" when 'f'
r = "" r = ''
(@blocksize/4).times do (@blocksize / 4).times do
r += [at].pack("I") r += [at].pack('I')
at += 4 at += 4
end end
r r
else else
raise "Unknown character '#{block}'" raise "Unknown character '#{block}'"
end end
end end
end end
if __FILE__==$0 if $PROGRAM_NAME == __FILE__
require 'tempfile' require 'tempfile'
require 'test/unit' require 'test/unit'
class FileWriterTest < Test::Unit::TestCase class FileWriterTest < Test::Unit::TestCase
def test_read_original_zeros def test_read_original_zeros
Tempfile.open("test_read_original_zeros") do |tempfile| Tempfile.open('test_read_original_zeros') do |tempfile|
tempfile.close tempfile.close
file = FileWriter.new( tempfile.path, 4096 ) file = FileWriter.new(tempfile.path, 4096)
file.write( "0" ) file.write('0')
assert_equal file.read( 0, 4096 ), file.read_original( 0, 4096 ) assert_equal file.read(0, 4096), file.read_original(0, 4096)
assert( file.untouched?(0,4096) , "Untouched file was touched." ) assert(file.untouched?(0, 4096), 'Untouched file was touched.')
end end
end end
def test_read_original_offsets def test_read_original_offsets
Tempfile.open("test_read_original_offsets") do |tempfile| Tempfile.open('test_read_original_offsets') do |tempfile|
tempfile.close tempfile.close
file = FileWriter.new( tempfile.path, 4096 ) file = FileWriter.new(tempfile.path, 4096)
file.write( "f" ) file.write('f')
assert_equal file.read( 0, 4096 ), file.read_original( 0, 4096 ) assert_equal file.read(0, 4096), file.read_original(0, 4096)
assert( file.untouched?(0,4096) , "Untouched file was touched." ) assert(file.untouched?(0, 4096), 'Untouched file was touched.')
end end
end end
def test_file_size def test_file_size
Tempfile.open("test_file_size") do |tempfile| Tempfile.open('test_file_size') do |tempfile|
tempfile.close tempfile.close
file = FileWriter.new( tempfile.path, 4096 ) file = FileWriter.new(tempfile.path, 4096)
file.write( "f" ) file.write('f')
assert_equal 4096, File.stat( tempfile.path ).size assert_equal 4096, File.stat(tempfile.path).size
end end
end end
def test_read_original_size def test_read_original_size
Tempfile.open("test_read_original_offsets") do |tempfile| Tempfile.open('test_read_original_offsets') do |tempfile|
tempfile.close tempfile.close
file = FileWriter.new( tempfile.path, 4) file = FileWriter.new(tempfile.path, 4)
file.write( "f"*4 ) file.write('f' * 4)
assert_equal 4, file.read_original(0, 4).length assert_equal 4, file.read_original(0, 4).length
end end
end end
end end
end end

View File

@@ -7,25 +7,22 @@ require 'rexml/streamlistener'
Thread.abort_on_exception = true Thread.abort_on_exception = true
class Executor class Executor
attr_reader :pid attr_reader :pid
def run( cmd ) def run(cmd)
@pid = fork do exec cmd end @pid = fork { exec cmd }
end end
end # class Executor end # class Executor
class ValgrindExecutor class ValgrindExecutor
attr_reader :pid attr_reader :pid
def run( cmd ) def run(cmd)
@pid = fork do exec "valgrind --track-origins=yes --suppressions=custom.supp #{cmd}" end @pid = fork { exec "valgrind --track-origins=yes --suppressions=custom.supp #{cmd}" }
end end
end # class ValgrindExecutor end # class ValgrindExecutor
class ValgrindKillingExecutor class ValgrindKillingExecutor
attr_reader :pid attr_reader :pid
@@ -33,10 +30,10 @@ class ValgrindKillingExecutor
attr_accessor :what, :kind, :pid attr_accessor :what, :kind, :pid
attr_reader :backtrace attr_reader :backtrace
def initialize def initialize
@backtrace=[] @backtrace = []
@what = "" @what = ''
@kind = "" @kind = ''
@pid = "" @pid = ''
end end
def add_frame def add_frame
@@ -56,115 +53,104 @@ class ValgrindKillingExecutor
end end
def to_s def to_s
([@what + " (#{@kind}) in #{@pid}"] + @backtrace.map{|h| "#{h[:file]}:#{h[:line]} #{h[:fn]}" }).join("\n") ([@what + " (#{@kind}) in #{@pid}"] + @backtrace.map { |h| "#{h[:file]}:#{h[:line]} #{h[:fn]}" }).join("\n")
end end
end # class Error end # class Error
class ErrorListener class ErrorListener
include REXML::StreamListener include REXML::StreamListener
def initialize( killer ) def initialize(killer)
@killer = killer @killer = killer
@error = Error.new @error = Error.new
@found = false @found = false
end end
def text( text ) def text(text)
@text = text @text = text
end end
def tag_start(tag, attrs) def tag_start(tag, _attrs)
case tag.to_s case tag.to_s
when "error" when 'error'
@found = true @found = true
when "frame" when 'frame'
@error.add_frame @error.add_frame
end end
end end
def tag_end(tag) def tag_end(tag)
case tag.to_s case tag.to_s
when "what" when 'what'
@error.what = @text if @found @error.what = @text if @found
@text = "" @text = ''
when "kind" when 'kind'
@error.kind = @text if @found @error.kind = @text if @found
when "file" when 'file'
@error.add_file( @text ) if @found @error.add_file(@text) if @found
when "fn" when 'fn'
@error.add_fn( @text ) if @found @error.add_fn(@text) if @found
when "line" when 'line'
@error.add_line( @text ) if @found @error.add_line(@text) if @found
when "error", "stack" when 'error', 'stack'
if @found @killer.call(@error) if @found
@killer.call( @error ) when 'pid'
end @error.pid = @text
when "pid"
@error.pid=@text
end end
end end
end # class ErrorListener end # class ErrorListener
class DebugErrorListener < ErrorListener class DebugErrorListener < ErrorListener
def text( txt ) def text(txt)
print txt print txt
super( txt ) super(txt)
end end
def tag_start( tag, attrs ) def tag_start(tag, attrs)
print "<#{tag}>" print "<#{tag}>"
super( tag, attrs ) super(tag, attrs)
end end
def tag_end( tag ) def tag_end(tag)
print "</#{tag}>" print "</#{tag}>"
super( tag ) super(tag)
end end
end end
def initialize def initialize
@pid = nil @pid = nil
end end
def run( cmd ) def run(cmd)
@io_r, io_w = IO.pipe @io_r, io_w = IO.pipe
@pid = fork do exec( "valgrind --suppressions=custom.supp --xml=yes --xml-fd=#{io_w.fileno} " + cmd ) end @pid = fork { exec("valgrind --suppressions=custom.supp --xml=yes --xml-fd=#{io_w.fileno} " + cmd) }
launch_watch_thread( @pid, @io_r ) launch_watch_thread(@pid, @io_r)
@pid @pid
end end
def call(err)
def call( err ) warn '*' * 72
$stderr.puts "*"*72 warn '* Valgrind error spotted:'
$stderr.puts "* Valgrind error spotted:" warn err.to_s.split("\n").map { |s| " #{s}" }
$stderr.puts err.to_s.split("\n").map{|s| " #{s}"} warn '*' * 72
$stderr.puts "*"*72 Process.kill('KILL', @pid)
Process.kill( "KILL", @pid )
exit(1) exit(1)
end end
private private
def pick_listener def pick_listener
ENV['DEBUG'] ? DebugErrorListener : ErrorListener ENV['DEBUG'] ? DebugErrorListener : ErrorListener
end end
def launch_watch_thread(pid, io_r) def launch_watch_thread(_pid, io_r)
Thread.start do Thread.start do
io_source = REXML::IOSource.new( io_r ) io_source = REXML::IOSource.new(io_r)
listener = pick_listener.new( self ) listener = pick_listener.new(self)
REXML::Document.parse_stream( io_source, listener ) REXML::Document.parse_stream(io_source, listener)
end end
end end
end # class ValgrindExecutor end # class ValgrindExecutor
module FlexNBD module FlexNBD
# Noddy test class to exercise FlexNBD from the outside for testing. # Noddy test class to exercise FlexNBD from the outside for testing.
# #
@@ -173,7 +159,7 @@ module FlexNBD
class << self class << self
def counter def counter
Dir['tmp/*'].select{|f| File.file?(f)}.length + 1 Dir['tmp/*'].select { |f| File.file?(f) }.length + 1
end end
end end
@@ -189,19 +175,18 @@ module FlexNBD
end end
end end
def build_debug_opt def build_debug_opt
if @do_debug if @do_debug
"--verbose" '--verbose'
else else
"--quiet" '--quiet'
end end
end end
attr_accessor :prefetch_proxy attr_accessor :prefetch_proxy
def initialize( bin, ip, port ) def initialize(bin, ip, port)
@bin = bin @bin = bin
@do_debug = ENV['DEBUG'] @do_debug = ENV['DEBUG']
@debug = build_debug_opt @debug = build_debug_opt
raise "#{bin} not executable" unless File.executable?(bin) raise "#{bin} not executable" unless File.executable?(bin)
@@ -213,17 +198,15 @@ module FlexNBD
@prefetch_proxy = false @prefetch_proxy = false
end end
def debug? def debug?
!!@do_debug !!@do_debug
end end
def debug( msg ) def debug(msg)
$stderr.puts msg if debug? warn msg if debug?
end end
def serve_cmd(file, acl)
def serve_cmd( file, acl )
"#{bin} serve "\ "#{bin} serve "\
"--addr #{ip} "\ "--addr #{ip} "\
"--port #{port} "\ "--port #{port} "\
@@ -233,8 +216,7 @@ module FlexNBD
"#{acl.join(' ')}" "#{acl.join(' ')}"
end end
def listen_cmd(file, acl)
def listen_cmd( file, acl )
"#{bin} listen "\ "#{bin} listen "\
"--addr #{ip} "\ "--addr #{ip} "\
"--port #{port} "\ "--port #{port} "\
@@ -244,18 +226,17 @@ module FlexNBD
"#{acl.join(' ')}" "#{acl.join(' ')}"
end end
def proxy_cmd( connect_ip, connect_port ) def proxy_cmd(connect_ip, connect_port)
"#{bin}-proxy "\ "#{bin}-proxy "\
"--addr #{ip} "\ "--addr #{ip} "\
"--port #{port} "\ "--port #{port} "\
"--conn-addr #{connect_ip} "\ "--conn-addr #{connect_ip} "\
"--conn-port #{connect_port} "\ "--conn-port #{connect_port} "\
"#{prefetch_proxy ? "--cache " : ""}"\ "#{prefetch_proxy ? '--cache ' : ''}"\
"#{@debug}" "#{@debug}"
end end
def read_cmd(offset, length)
def read_cmd( offset, length )
"#{bin} read "\ "#{bin} read "\
"--addr #{ip} "\ "--addr #{ip} "\
"--port #{port} "\ "--port #{port} "\
@@ -264,8 +245,7 @@ module FlexNBD
"--size #{length}" "--size #{length}"
end end
def write_cmd(offset, data)
def write_cmd( offset, data )
"#{bin} write "\ "#{bin} write "\
"--addr #{ip} "\ "--addr #{ip} "\
"--port #{port} "\ "--port #{port} "\
@@ -274,30 +254,29 @@ module FlexNBD
"--size #{data.length}" "--size #{data.length}"
end end
def base_mirror_opts(dest_ip, dest_port)
def base_mirror_opts( dest_ip, dest_port )
"--addr #{dest_ip} "\ "--addr #{dest_ip} "\
"--port #{dest_port} "\ "--port #{dest_port} "\
"--sock #{ctrl} "\ "--sock #{ctrl} "\
end end
def unlink_mirror_opts( dest_ip, dest_port ) def unlink_mirror_opts(dest_ip, dest_port)
"#{base_mirror_opts( dest_ip, dest_port )} "\ "#{base_mirror_opts(dest_ip, dest_port)} "\
"--unlink " '--unlink '
end end
def base_mirror_cmd( opts ) def base_mirror_cmd(opts)
"#{@bin} mirror "\ "#{@bin} mirror "\
"#{opts} "\ "#{opts} "\
"#{@debug}" "#{@debug}"
end end
def mirror_cmd(dest_ip, dest_port) def mirror_cmd(dest_ip, dest_port)
base_mirror_cmd( base_mirror_opts( dest_ip, dest_port ) ) base_mirror_cmd(base_mirror_opts(dest_ip, dest_port))
end end
def mirror_unlink_cmd( dest_ip, dest_port ) def mirror_unlink_cmd(dest_ip, dest_port)
base_mirror_cmd( unlink_mirror_opts( dest_ip, dest_port ) ) base_mirror_cmd(unlink_mirror_opts(dest_ip, dest_port))
end end
def break_cmd def break_cmd
@@ -312,58 +291,64 @@ module FlexNBD
"#{@debug}" "#{@debug}"
end end
def acl_cmd( *acl ) def acl_cmd(*acl)
"#{@bin} acl " \ "#{@bin} acl " \
"--sock #{ctrl} "\ "--sock #{ctrl} "\
"#{@debug} "\ "#{@debug} "\
"#{acl.join " "}" "#{acl.join ' '}"
end end
def run_serve_cmd(cmd) def run_serve_cmd(cmd)
File.unlink(ctrl) if File.exists?(ctrl) File.unlink(ctrl) if File.exist?(ctrl)
debug( cmd ) debug(cmd)
@pid = @executor.run( cmd ) @pid = @executor.run(cmd)
while !File.socket?(ctrl) until File.socket?(ctrl)
pid, status = Process.wait2(@pid, Process::WNOHANG) pid, status = Process.wait2(@pid, Process::WNOHANG)
raise "server did not start (#{cmd})" if pid raise "server did not start (#{cmd})" if pid
sleep 0.1 sleep 0.1
end end
start_wait_thread(@pid)
start_wait_thread( @pid )
at_exit { kill } at_exit { kill }
end end
private :run_serve_cmd private :run_serve_cmd
def serve(file, *acl)
def serve( file, *acl) cmd = serve_cmd(file, acl)
cmd = serve_cmd( file, acl ) run_serve_cmd(cmd)
run_serve_cmd( cmd ) sleep(0.2) until File.exist?(ctrl)
sleep( 0.2 ) until File.exists?( ctrl )
end end
def listen(file, *acl) def listen(file, *acl)
run_serve_cmd( listen_cmd( file, acl ) ) run_serve_cmd(listen_cmd(file, acl))
end end
def tcp_server_open? def tcp_server_open?
# raises if the other side doesn't accept() # raises if the other side doesn't accept()
sock = TCPSocket.new(ip, port) rescue nil sock = begin
TCPSocket.new(ip, port)
rescue StandardError
nil
end
success = !!sock success = !!sock
( sock.close rescue nil) if sock if sock
(begin
sock.close
rescue StandardError
nil
end)
end
success success
end end
def proxy( connect_ip, connect_port ) def proxy(connect_ip, connect_port)
cmd = proxy_cmd( connect_ip, connect_port ) cmd = proxy_cmd(connect_ip, connect_port)
debug( cmd ) debug(cmd)
@pid = @executor.run( cmd ) @pid = @executor.run(cmd)
until tcp_server_open? until tcp_server_open?
pid, status = Process.wait2(@pid, Process::WNOHANG) pid, status = Process.wait2(@pid, Process::WNOHANG)
@@ -371,31 +356,29 @@ module FlexNBD
sleep 0.1 sleep 0.1
end end
start_wait_thread( @pid ) start_wait_thread(@pid)
at_exit { kill } at_exit { kill }
end end
def start_wait_thread(pid)
def start_wait_thread( pid )
@wait_thread = Thread.start do @wait_thread = Thread.start do
_, status = Process.waitpid2( pid ) _, status = Process.waitpid2(pid)
if @kill if @kill
if status.signaled? if status.signaled?
fail "flexnbd quit with a bad signal: #{status.inspect}" unless raise "flexnbd quit with a bad signal: #{status.inspect}" unless
@kill.include? status.termsig @kill.include? status.termsig
else else
fail "flexnbd quit with a bad status: #{status.inspect}" unless raise "flexnbd quit with a bad status: #{status.inspect}" unless
@kill.include? status.exitstatus @kill.include? status.exitstatus
end end
else else
$stderr.puts "flexnbd #{self.pid} quit" warn "flexnbd #{self.pid} quit"
fail "flexnbd #{self.pid} quit early with status #{status.to_i}" raise "flexnbd #{self.pid} quit early with status #{status.to_i}"
end end
end end
end end
def can_die(*status) def can_die(*status)
status = [0] if status.empty? status = [0] if status.empty?
@kill += status @kill += status
@@ -407,7 +390,7 @@ module FlexNBD
can_die(1) can_die(1)
if @pid if @pid
begin begin
Process.kill("INT", @pid) Process.kill('INT', @pid)
rescue Errno::ESRCH => e rescue Errno::ESRCH => e
# already dead. Presumably this means it went away after a # already dead. Presumably this means it went away after a
# can_die() call. # can_die() call.
@@ -417,63 +400,60 @@ module FlexNBD
end end
def read(offset, length) def read(offset, length)
cmd = read_cmd( offset, length ) cmd = read_cmd(offset, length)
debug( cmd ) debug(cmd)
IO.popen(cmd) do |fh| IO.popen(cmd) do |fh|
return fh.read return fh.read
end end
raise IOError.new "NBD read failed" unless $?.success? raise IOError, 'NBD read failed' unless $CHILD_STATUS.success?
out out
end end
def write(offset, data) def write(offset, data)
cmd = write_cmd( offset, data ) cmd = write_cmd(offset, data)
debug( cmd ) debug(cmd)
IO.popen(cmd, "w") do |fh| IO.popen(cmd, 'w') do |fh|
fh.write(data) fh.write(data)
end end
raise IOError.new "NBD write failed" unless $?.success? raise IOError, 'NBD write failed' unless $CHILD_STATUS.success?
nil nil
end end
def join def join
@wait_thread.join @wait_thread.join
end end
def mirror_unchecked(dest_ip, dest_port, _bandwidth = nil, _action = nil, timeout = nil)
cmd = mirror_cmd(dest_ip, dest_port)
debug(cmd)
def mirror_unchecked( dest_ip, dest_port, bandwidth=nil, action=nil, timeout=nil ) maybe_timeout(cmd, timeout)
cmd = mirror_cmd( dest_ip, dest_port)
debug( cmd )
maybe_timeout( cmd, timeout )
end end
def mirror_unlink(dest_ip, dest_port, timeout = nil)
cmd = mirror_unlink_cmd(dest_ip, dest_port)
debug(cmd)
def mirror_unlink( dest_ip, dest_port, timeout=nil ) maybe_timeout(cmd, timeout)
cmd = mirror_unlink_cmd( dest_ip, dest_port )
debug( cmd )
maybe_timeout( cmd, timeout )
end end
def maybe_timeout(cmd, timeout = nil)
def maybe_timeout(cmd, timeout=nil ) stdout = ''
stdout, stderr = "","" stderr = ''
stat = nil stat = nil
run = Proc.new do run = proc do
# Ruby 1.9 changed the popen3 api. instead of 3 args, the block # Ruby 1.9 changed the popen3 api. instead of 3 args, the block
# gets 4. Not only that, but it no longer sets $?, so we have to # gets 4. Not only that, but it no longer sets $?, so we have to
# go elsewhere for the process' exit status. # go elsewhere for the process' exit status.
Open3.popen3( cmd ) do |io_in, io_out, io_err, maybe_thr| Open3.popen3(cmd) do |io_in, io_out, io_err, maybe_thr|
io_in.close io_in.close
stdout.replace io_out.read stdout.replace io_out.read
stderr.replace io_err.read stderr.replace io_err.read
stat = maybe_thr.value if maybe_thr stat = maybe_thr.value if maybe_thr
end end
stat ||= $? stat ||= $CHILD_STATUS
end end
if timeout if timeout
@@ -485,85 +465,73 @@ module FlexNBD
[stdout, stderr, stat] [stdout, stderr, stat]
end end
def mirror(dest_ip, dest_port, bandwidth = nil, action = nil)
def mirror(dest_ip, dest_port, bandwidth=nil, action=nil) stdout, stderr, status = mirror_unchecked(dest_ip, dest_port, bandwidth, action)
stdout, stderr, status = mirror_unchecked( dest_ip, dest_port, bandwidth, action ) raise IOError, "Migrate command failed\n" + stderr unless status.success?
raise IOError.new( "Migrate command failed\n" + stderr) unless status.success?
stdout stdout
end end
def break(timeout = nil)
def break(timeout=nil)
cmd = break_cmd cmd = break_cmd
debug( cmd ) debug(cmd)
maybe_timeout( cmd, timeout ) maybe_timeout(cmd, timeout)
end end
def acl(*acl) def acl(*acl)
cmd = acl_cmd( *acl ) cmd = acl_cmd(*acl)
debug( cmd ) debug(cmd)
maybe_timeout( cmd, 2 ) maybe_timeout(cmd, 2)
end end
def status(timeout = nil)
cmd = status_cmd
debug(cmd)
def status( timeout = nil ) o, e = maybe_timeout(cmd, timeout)
cmd = status_cmd()
debug( cmd )
o,e = maybe_timeout( cmd, timeout )
[parse_status(o), e] [parse_status(o), e]
end end
def launched? def launched?
!!@pid !!@pid
end end
def paused def paused
Process.kill( "STOP", @pid ) Process.kill('STOP', @pid)
yield yield
ensure ensure
Process.kill( "CONT", @pid ) Process.kill('CONT', @pid)
end end
protected protected
def control_command(*args) def control_command(*args)
raise "Server not running" unless @pid raise 'Server not running' unless @pid
args = args.compact args = args.compact
UNIXSocket.open(@ctrl) do |u| UNIXSocket.open(@ctrl) do |u|
u.write(args.join("\n") + "\n") u.write(args.join("\n") + "\n")
code, message = u.readline.split(": ", 2) code, message = u.readline.split(': ', 2)
return [code, message] return [code, message]
end end
end end
def parse_status(status)
def parse_status( status )
hsh = {} hsh = {}
status.split(" ").each do |part| status.split(' ').each do |part|
next if part.strip.empty? next if part.strip.empty?
a,b = part.split("=") a, b = part.split('=')
b.strip! b.strip!
b = true if b == "true" b = true if b == 'true'
b = false if b == "false" b = false if b == 'false'
hsh[a.strip] = b hsh[a.strip] = b
end end
hsh hsh
end end
end end
end end

View File

@@ -1,10 +1,7 @@
# encoding: utf-8
module FlexNBD module FlexNBD
def self.binary(str)
def self.binary( str )
if str.respond_to? :force_encoding if str.respond_to? :force_encoding
str.force_encoding "ASCII-8BIT" str.force_encoding 'ASCII-8BIT'
else else
str str
end end
@@ -13,36 +10,33 @@ module FlexNBD
# eeevil is his one and only name... # eeevil is his one and only name...
def self.read_constants def self.read_constants
parents = [] parents = []
current = File.expand_path(".") current = File.expand_path('.')
while current != "/" while current != '/'
parents << current parents << current
current = File.expand_path( File.join( current, ".." ) ) current = File.expand_path(File.join(current, '..'))
end end
source_root = parents.find do |dirname| source_root = parents.find do |dirname|
File.directory?( File.join( dirname, "src" ) ) File.directory?(File.join(dirname, 'src'))
end end
fail "No source root!" unless source_root raise 'No source root!' unless source_root
headers = Dir[File.join( source_root, "src", "{common,proxy,server}","*.h" ) ] headers = Dir[File.join(source_root, 'src', '{common,proxy,server}', '*.h')]
headers.each do |header_filename| headers.each do |header_filename|
txt_lines = File.readlines( header_filename ) txt_lines = File.readlines(header_filename)
txt_lines.each do |line| txt_lines.each do |line|
if line =~ /^#\s*define\s+([A-Z0-9_]+)\s+(\d+)\s*$/ if line =~ /^#\s*define\s+([A-Z0-9_]+)\s+(\d+)\s*$/
# Bodge until I can figure out what to do with #ifdefs # Bodge until I can figure out what to do with #ifdefs
const_set($1, $2.to_i) unless const_defined?( $1 ) const_set(Regexp.last_match(1), Regexp.last_match(2).to_i) unless const_defined?(Regexp.last_match(1))
end end
end end
end end
end end
read_constants() read_constants
REQUEST_MAGIC = binary("\x25\x60\x95\x13") unless defined?(REQUEST_MAGIC) REQUEST_MAGIC = binary("\x25\x60\x95\x13") unless defined?(REQUEST_MAGIC)
REPLY_MAGIC = binary("\x67\x44\x66\x98") unless defined?(REPLY_MAGIC) REPLY_MAGIC = binary("\x67\x44\x66\x98") unless defined?(REPLY_MAGIC)
end # module FlexNBD end # module FlexNBD

View File

@@ -1,5 +1,3 @@
# encoding: utf-8
require 'socket' require 'socket'
require 'timeout' require 'timeout'
@@ -7,114 +5,104 @@ require 'flexnbd/constants'
module FlexNBD module FlexNBD
class FakeDest class FakeDest
class Client class Client
def initialize( sock ) def initialize(sock)
@sock = sock @sock = sock
end end
def write_hello(opts = {})
def write_hello( opts = {} ) @sock.write('NBDMAGIC')
@sock.write( "NBDMAGIC" )
if opts[:magic] == :wrong if opts[:magic] == :wrong
write_rand( @sock, 8 ) write_rand(@sock, 8)
else else
@sock.write( "\x00\x00\x42\x02\x81\x86\x12\x53" ) @sock.write("\x00\x00\x42\x02\x81\x86\x12\x53")
end end
if opts[:size] == :wrong if opts[:size] == :wrong
write_rand( @sock, 8 ) write_rand(@sock, 8)
else else
@sock.write( "\x00\x00\x00\x00\x00\x00\x10\x00" ) @sock.write("\x00\x00\x00\x00\x00\x00\x10\x00")
end end
@sock.write( "\x00" * 128 ) @sock.write("\x00" * 128)
end end
def write_rand(sock, len)
def write_rand( sock, len ) len.times { sock.write(rand(256).chr) }
len.times do sock.write( rand(256).chr ) end
end end
def read_request
def read_request()
req = @sock.read(28) req = @sock.read(28)
magic_s = req[0 ... 4 ] magic_s = req[0...4]
type_s = req[4 ... 8 ] type_s = req[4...8]
handle_s = req[8 ... 16] handle_s = req[8...16]
from_s = req[16 ... 24] from_s = req[16...24]
len_s = req[24 ... 28] len_s = req[24...28]
{ {
:magic => magic_s, magic: magic_s,
:type => type_s.unpack("N").first, type: type_s.unpack('N').first,
:handle => handle_s, handle: handle_s,
:from => self.class.parse_be64( from_s ), from: self.class.parse_be64(from_s),
:len => len_s.unpack( "N").first len: len_s.unpack('N').first
} }
end end
def write_error( handle ) def write_error(handle)
write_reply( handle, 1 ) write_reply(handle, 1)
end end
def disconnected? def disconnected?
begin Timeout.timeout(2) do
Timeout.timeout(2) do @sock.read(1).nil?
@sock.read(1) == nil
end
rescue Timeout::Error
return false
end end
rescue Timeout::Error
return false
end end
def write_reply( handle, err=0, opts={} ) def write_reply(handle, err = 0, opts = {})
if opts[:magic] == :wrong if opts[:magic] == :wrong
write_rand( @sock, 4 ) write_rand(@sock, 4)
else else
@sock.write( ::FlexNBD::REPLY_MAGIC ) @sock.write(::FlexNBD::REPLY_MAGIC)
end end
@sock.write( [err].pack("N") ) @sock.write([err].pack('N'))
@sock.write( handle ) @sock.write(handle)
end end
def close def close
@sock.close @sock.close
end end
def read_data(len)
def read_data( len ) @sock.read(len)
@sock.read( len )
end end
def write_data( len ) def write_data(len)
@sock.write( len ) @sock.write(len)
end end
def self.parse_be64(str) def self.parse_be64(str)
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
str.length == 8 str.length == 8
top, bottom = str.unpack("NN") top, bottom = str.unpack('NN')
(top << 32) + bottom (top << 32) + bottom
end end
def receive_mirror(opts = {})
def receive_mirror( opts = {} ) write_hello
write_hello()
loop do loop do
req = read_request req = read_request
case req[:type] case req[:type]
when 1 when 1
read_data( req[:len] ) read_data(req[:len])
write_reply( req[:handle] ) write_reply(req[:handle])
when 65536 when 65_536
write_reply( req[:handle], opts[:err] == :entrust ? 1 : 0 ) write_reply(req[:handle], opts[:err] == :entrust ? 1 : 0)
break break
else else
raise "Unexpected request: #{req.inspect}" raise "Unexpected request: #{req.inspect}"
@@ -129,16 +117,13 @@ module FlexNBD
raise "Not a disconnect: #{req.inspect}" raise "Not a disconnect: #{req.inspect}"
end end
end end
end # class Client end # class Client
def initialize(addr, port)
def initialize( addr, port ) @sock = TCPServer.new(addr, port)
@sock = TCPServer.new( addr, port )
end end
def accept(err_msg = 'Timed out waiting for a connection', timeout = 5)
def accept( err_msg = "Timed out waiting for a connection", timeout = 5)
client_sock = nil client_sock = nil
begin begin
@@ -146,21 +131,16 @@ module FlexNBD
client_sock = @sock.accept client_sock = @sock.accept
end end
rescue Timeout::Error rescue Timeout::Error
raise Timeout::Error.new(err_msg) raise Timeout::Error, err_msg
end end
client_sock client_sock
Client.new( client_sock ) Client.new(client_sock)
end end
def close def close
@sock.close @sock.close
end end
end # module FakeDest end # module FakeDest
end # module FlexNBD end # module FlexNBD

View File

@@ -1,29 +1,25 @@
# encoding: utf-8
require 'socket' require 'socket'
require "timeout" require 'timeout'
require 'flexnbd/constants' require 'flexnbd/constants'
module FlexNBD module FlexNBD
class FakeSource class FakeSource
def initialize(addr, port, err_msg, source_addr = nil, source_port = 0)
def initialize( addr, port, err_msg, source_addr=nil, source_port=0 ) timing_out(2, err_msg) do
timing_out( 2, err_msg ) do
begin begin
@sock = if source_addr @sock = if source_addr
TCPSocket.new( addr, port, source_addr, source_port ) TCPSocket.new(addr, port, source_addr, source_port)
else else
TCPSocket.new( addr, port ) TCPSocket.new(addr, port)
end end
rescue Errno::ECONNREFUSED rescue Errno::ECONNREFUSED
$stderr.puts "Connection refused, retrying" warn 'Connection refused, retrying'
sleep(0.2) sleep(0.2)
retry retry
end end
end end
end end
def close def close
@sock.close @sock.close
end end
@@ -31,8 +27,8 @@ module FlexNBD
def read_hello def read_hello
timing_out(::FlexNBD::MS_HELLO_TIME_SECS, timing_out(::FlexNBD::MS_HELLO_TIME_SECS,
'Timed out waiting for hello.') do 'Timed out waiting for hello.') do
fail 'No hello.' unless (hello = @sock.read(152)) && raise 'No hello.' unless (hello = @sock.read(152)) &&
hello.length == 152 hello.length == 152
passwd_s = hello[0..7] passwd_s = hello[0..7]
magic = hello[8..15].unpack('Q>').first magic = hello[8..15].unpack('Q>').first
@@ -44,97 +40,86 @@ module FlexNBD
end end
end end
def send_request(type, handle = 'myhandle', from = 0, len = 0, magic = REQUEST_MAGIC)
raise 'Bad handle' unless handle.length == 8
def send_request( type, handle="myhandle", from=0, len=0, magic=REQUEST_MAGIC ) @sock.write(magic)
fail "Bad handle" unless handle.length == 8 @sock.write([type].pack('N'))
@sock.write(handle)
@sock.write( magic ) @sock.write([n64(from)].pack('q'))
@sock.write( [type].pack( 'N' ) ) @sock.write([len].pack('N'))
@sock.write( handle )
@sock.write( [n64( from )].pack( 'q' ) )
@sock.write( [len].pack( 'N' ) )
end end
def write_write_request(from, len, handle = 'myhandle')
def write_write_request( from, len, handle="myhandle" ) send_request(1, handle, from, len)
send_request( 1, handle, from, len )
end end
def write_entrust_request(handle = 'myhandle')
def write_entrust_request( handle="myhandle" ) send_request(65_536, handle)
send_request( 65536, handle )
end end
def write_disconnect_request( handle="myhandle" ) def write_disconnect_request(handle = 'myhandle')
send_request( 2, handle ) send_request(2, handle)
end end
def write_read_request( from, len, handle="myhandle" ) def write_read_request(from, len, _handle = 'myhandle')
send_request( 0, "myhandle", from, len ) send_request(0, 'myhandle', from, len)
end end
def write_data(data)
def write_data( data ) @sock.write(data)
@sock.write( data )
end end
# Handy utility # Handy utility
def read( from, len ) def read(from, len)
timing_out( 2, "Timed out reading" ) do timing_out(2, 'Timed out reading') do
send_request( 0, "myhandle", from, len ) send_request(0, 'myhandle', from, len)
read_raw( len ) read_raw(len)
end end
end end
def read_raw( len ) def read_raw(len)
@sock.read( len ) @sock.read(len)
end end
def send_mirror def send_mirror
read_hello() read_hello
write( 0, "12345678" ) write(0, '12345678')
read_response() read_response
write_disconnect_request() write_disconnect_request
close() close
end end
def write(from, data)
def write( from, data ) write_write_request(from, data.length)
write_write_request( from, data.length ) write_data(data)
write_data( data )
end end
def read_response def read_response
magic = @sock.read(4) magic = @sock.read(4)
error_s = @sock.read(4) error_s = @sock.read(4)
handle = @sock.read(8) handle = @sock.read(8)
{ {
:magic => magic, magic: magic,
:error => error_s.unpack("N").first, error: error_s.unpack('N').first,
:handle => handle handle: handle
} }
end end
def disconnected? def disconnected?
result = nil result = nil
Timeout.timeout( 2 ) { result = ( @sock.read(1) == nil ) } Timeout.timeout(2) { result = @sock.read(1).nil? }
result result
end end
def timing_out(time, msg)
def timing_out( time, msg ) Timeout.timeout(time) do
begin yield
Timeout.timeout( time ) do
yield
end
rescue Timeout::Error
$stderr.puts msg
exit 1
end end
rescue Timeout::Error
warn msg
exit 1
end end
private private
@@ -144,15 +129,13 @@ module FlexNBD
# ) # )
def n64(b) def n64(b)
((b & 0xff00000000000000) >> 56) | ((b & 0xff00000000000000) >> 56) |
((b & 0x00ff000000000000) >> 40) | ((b & 0x00ff000000000000) >> 40) |
((b & 0x0000ff0000000000) >> 24) | ((b & 0x0000ff0000000000) >> 24) |
((b & 0x000000ff00000000) >> 8) | ((b & 0x000000ff00000000) >> 8) |
((b & 0x00000000ff000000) << 8) | ((b & 0x00000000ff000000) << 8) |
((b & 0x0000000000ff0000) << 24) | ((b & 0x0000000000ff0000) << 24) |
((b & 0x000000000000ff00) << 40) | ((b & 0x000000000000ff00) << 40) |
((b & 0x00000000000000ff) << 56) ((b & 0x00000000000000ff) << 56)
end end
end # class FakeSource end # class FakeSource
end # module FlexNBD end # module FlexNBD

View File

@@ -1,4 +1,4 @@
# encoding: utf-8
require 'flexnbd/fake_source' require 'flexnbd/fake_source'
require 'flexnbd/fake_dest' require 'flexnbd/fake_dest'
@@ -7,20 +7,23 @@ module ProxyTests
"\xFF".b "\xFF".b
end end
def with_proxied_client( override_size = nil ) def with_proxied_client(override_size = nil)
@env.serve1 unless @server_up @env.serve1 unless @server_up
@env.proxy2 unless @proxy_up @env.proxy2 unless @proxy_up
@env.nbd2.can_die(0) @env.nbd2.can_die(0)
client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy") client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy")
begin begin
result = client.read_hello result = client.read_hello
assert_equal "NBDMAGIC", result[:passwd] assert_equal 'NBDMAGIC', result[:passwd]
assert_equal override_size || @env.file1.size, result[:size] assert_equal override_size || @env.file1.size, result[:size]
yield client yield client
ensure ensure
client.close rescue nil begin
client.close
rescue StandardError
nil
end
end end
end end
@@ -32,11 +35,11 @@ module ProxyTests
with_proxied_client do |client| with_proxied_client do |client|
(0..3).each do |n| (0..3).each do |n|
offset = n * 4096 offset = n * 4096
client.write_read_request(offset, 4096, "myhandle") client.write_read_request(offset, 4096, 'myhandle')
rsp = client.read_response rsp = client.read_response
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle] assert_equal 'myhandle', rsp[:handle]
assert_equal 0, rsp[:error] assert_equal 0, rsp[:error]
orig_data = @env.file1.read(offset, 4096) orig_data = @env.file1.read(offset, 4096)
@@ -45,8 +48,8 @@ module ProxyTests
assert_equal 4096, orig_data.size assert_equal 4096, orig_data.size
assert_equal 4096, data.size assert_equal 4096, data.size
assert_equal( orig_data, data, assert_equal(orig_data, data,
"Returned data does not match on request #{n+1}" ) "Returned data does not match on request #{n + 1}")
end end
end end
end end
@@ -59,12 +62,12 @@ module ProxyTests
rsp = client.read_response rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle] assert_equal 'myhandle', rsp[:handle]
assert_equal 0, rsp[:error] assert_equal 0, rsp[:error]
data = @env.file1.read(offset, 4096) data = @env.file1.read(offset, 4096)
assert_equal( ( b * 4096 ), data, "Data not written correctly (offset is #{n})" ) assert_equal((b * 4096), data, "Data not written correctly (offset is #{n})")
end end
end end
end end
@@ -78,7 +81,7 @@ module ProxyTests
sc = server.accept # just tell the supervisor we're up sc = server.accept # just tell the supervisor we're up
sc.write_hello sc.write_hello
[ server, sc ] [server, sc]
end end
end end
@@ -89,7 +92,7 @@ module ProxyTests
server, sc1 = maker.value server, sc1 = maker.value
# Send the read request to the proxy # Send the read request to the proxy
client.write_read_request( 0, 4096 ) client.write_read_request(0, 4096)
# ensure we're given the read request # ensure we're given the read request
req1 = sc1.read_request req1 = sc1.read_request
@@ -110,8 +113,8 @@ module ProxyTests
assert_equal req1, req2 assert_equal req1, req2
# The reply should be proxied back to the client. # The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] ) sc2.write_reply(req2[:handle])
sc2.write_data( b * 4096 ) sc2.write_data(b * 4096)
# Check it to make sure it's correct # Check it to make sure it's correct
rsp = Timeout.timeout(15) { client.read_response } rsp = Timeout.timeout(15) { client.read_response }
@@ -119,13 +122,12 @@ module ProxyTests
assert_equal 0, rsp[:error] assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle] assert_equal req1[:handle], rsp[:handle]
data = client.read_raw( 4096 ) data = client.read_raw(4096)
assert_equal( (b * 4096), data, "Wrong data returned" ) assert_equal((b * 4096), data, 'Wrong data returned')
sc2.close sc2.close
server.close server.close
end end
end end
def test_write_request_retried_when_upstream_dies_partway def test_write_request_retried_when_upstream_dies_partway
@@ -135,7 +137,7 @@ module ProxyTests
server, sc1 = maker.value server, sc1 = maker.value
# Send the read request to the proxy # Send the read request to the proxy
client.write( 0, ( b * 4096 ) ) client.write(0, (b * 4096))
# ensure we're given the read request # ensure we're given the read request
req1 = sc1.read_request req1 = sc1.read_request
@@ -143,8 +145,8 @@ module ProxyTests
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type] assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from] assert_equal 0, req1[:from]
assert_equal 4096, req1[:len] assert_equal 4096, req1[:len]
data1 = sc1.read_data( 4096 ) data1 = sc1.read_data(4096)
assert_equal( ( b * 4096 ), data1, "Data not proxied successfully" ) assert_equal((b * 4096), data1, 'Data not proxied successfully')
# Kill the server again, now we're sure the read request has been sent once # Kill the server again, now we're sure the read request has been sent once
sc1.close sc1.close
@@ -156,11 +158,11 @@ module ProxyTests
# And once reconnected, it should resend an identical request. # And once reconnected, it should resend an identical request.
req2 = sc2.read_request req2 = sc2.read_request
assert_equal req1, req2 assert_equal req1, req2
data2 = sc2.read_data( 4096 ) data2 = sc2.read_data(4096)
assert_equal data1, data2 assert_equal data1, data2
# The reply should be proxied back to the client. # The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] ) sc2.write_reply(req2[:handle])
# Check it to make sure it's correct # Check it to make sure it's correct
rsp = Timeout.timeout(15) { client.read_response } rsp = Timeout.timeout(15) { client.read_response }
@@ -174,8 +176,7 @@ module ProxyTests
end end
def test_only_one_client_can_connect_to_proxy_at_a_time def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |client| with_proxied_client do |_client|
c2 = nil c2 = nil
assert_raises(Timeout::Error) do assert_raises(Timeout::Error) do
Timeout.timeout(1) do Timeout.timeout(1) do
@@ -183,12 +184,13 @@ module ProxyTests
c2.read_hello c2.read_hello
end end
end end
c2.close rescue nil if c2 if c2
begin
c2.close
rescue StandardError
nil
end
end
end end
end end
end end

View File

@@ -1,13 +1,10 @@
# encoding: utf-8
require 'test/unit' require 'test/unit'
require 'environment' require 'environment'
class TestDestErrorHandling < Test::Unit::TestCase class TestDestErrorHandling < Test::Unit::TestCase
def setup def setup
@env = Environment.new @env = Environment.new
@env.writefile1( "0" * 4 ) @env.writefile1('0' * 4)
@env.listen1 @env.listen1
end end
@@ -15,89 +12,77 @@ class TestDestErrorHandling < Test::Unit::TestCase
@env.cleanup @env.cleanup
end end
def test_hello_blocked_by_disconnect_causes_error_not_fatal def test_hello_blocked_by_disconnect_causes_error_not_fatal
run_fake( "source/close_after_connect" ) run_fake('source/close_after_connect')
assert_no_control assert_no_control
end end
=begin # # This is disabled while CLIENT_MAX_WAIT_SECS is removed
# This is disabled while CLIENT_MAX_WAIT_SECS is removed # def test_hello_goes_astray_causes_timeout_error
def test_hello_goes_astray_causes_timeout_error # run_fake( "source/hang_after_hello" )
run_fake( "source/hang_after_hello" ) # assert_no_control
assert_no_control # end
end
=end
def test_sigterm_has_bad_exit_status def test_sigterm_has_bad_exit_status
@env.nbd1.can_die(1) @env.nbd1.can_die(1)
run_fake( "source/sigterm_after_hello" ) run_fake('source/sigterm_after_hello')
end end
def test_disconnect_after_hello_causes_error_not_fatal def test_disconnect_after_hello_causes_error_not_fatal
run_fake( "source/close_after_hello" ) run_fake('source/close_after_hello')
assert_no_control assert_no_control
end end
def test_partial_read_causes_error def test_partial_read_causes_error
run_fake( "source/close_mid_read" ) run_fake('source/close_mid_read')
end end
def test_double_connect_during_hello def test_double_connect_during_hello
run_fake( "source/connect_during_hello" ) run_fake('source/connect_during_hello')
end end
def test_acl_rejection def test_acl_rejection
@env.acl1("127.0.0.1") @env.acl1('127.0.0.1')
run_fake( "source/connect_from_banned_ip") run_fake('source/connect_from_banned_ip')
end end
def test_bad_write def test_bad_write
run_fake( "source/write_out_of_range" ) run_fake('source/write_out_of_range')
end end
def test_disconnect_before_write_data_causes_error def test_disconnect_before_write_data_causes_error
run_fake( "source/close_after_write" ) run_fake('source/close_after_write')
end end
def test_disconnect_before_write_reply_causes_error def test_disconnect_before_write_reply_causes_error
# Note that this is an odd case: writing the reply doesn't fail. # Note that this is an odd case: writing the reply doesn't fail.
# The test passes because the next attempt by flexnbd to read a # The test passes because the next attempt by flexnbd to read a
# request returns EOF. # request returns EOF.
run_fake( "source/close_after_write_data" ) run_fake('source/close_after_write_data')
end end
def test_straight_migration def test_straight_migration
@env.nbd1.can_die(0) @env.nbd1.can_die(0)
run_fake( "source/successful_transfer" ) run_fake('source/successful_transfer')
end end
private private
def run_fake( name )
@env.run_fake( name, @env.ip, @env.port1 ) def run_fake(name)
@env.run_fake(name, @env.ip, @env.port1)
assert @env.fake_reports_success, "#{name} failed." assert @env.fake_reports_success, "#{name} failed."
end end
def status def status
stat, _ = @env.status1 stat, = @env.status1
stat stat
end end
def assert_no_control def assert_no_control
assert !status['has_control'], "Thought it had control" assert !status['has_control'], 'Thought it had control'
end end
def assert_control def assert_control
assert status['has_control'], "Didn't think it had control" assert status['has_control'], "Didn't think it had control"
end end
end # class TestDestErrorHandling end # class TestDestErrorHandling

View File

@@ -1,5 +1,3 @@
# encoding: utf-8
require 'test/unit' require 'test/unit'
require 'environment' require 'environment'
require 'flexnbd/constants' require 'flexnbd/constants'
@@ -19,20 +17,18 @@ class TestHappyPath < Test::Unit::TestCase
@env.cleanup @env.cleanup
end end
def test_read1 def test_read1
@env.writefile1("f"*64) @env.writefile1('f' * 64)
@env.serve1 @env.serve1
[0, 12, 63].each do |num| [0, 12, 63].each do |num|
assert_equal( assert_equal(
bin( @env.nbd1.read(num*@env.blocksize, @env.blocksize) ), bin(@env.nbd1.read(num * @env.blocksize, @env.blocksize)),
bin( @env.file1.read(num*@env.blocksize, @env.blocksize) ) bin(@env.file1.read(num * @env.blocksize, @env.blocksize))
) )
end end
[124, 1200, 10028, 25488].each do |num| [124, 1200, 10_028, 25_488].each do |num|
assert_equal(bin(@env.nbd1.read(num, 4)), bin(@env.file1.read(num, 4))) assert_equal(bin(@env.nbd1.read(num, 4)), bin(@env.file1.read(num, 4)))
end end
end end
@@ -40,14 +36,14 @@ class TestHappyPath < Test::Unit::TestCase
# Check that we're not # Check that we're not
# #
def test_writeread1 def test_writeread1
@env.writefile1("0"*64) @env.writefile1('0' * 64)
@env.serve1 @env.serve1
[0, 12, 63].each do |num| [0, 12, 63].each do |num|
data = "X"*@env.blocksize data = 'X' * @env.blocksize
@env.nbd1.write(num*@env.blocksize, data) @env.nbd1.write(num * @env.blocksize, data)
assert_equal(data, @env.file1.read(num*@env.blocksize, data.size)) assert_equal(data, @env.file1.read(num * @env.blocksize, data.size))
assert_equal(data, @env.nbd1.read(num*@env.blocksize, data.size)) assert_equal(data, @env.nbd1.read(num * @env.blocksize, data.size))
end end
end end
@@ -55,115 +51,105 @@ class TestHappyPath < Test::Unit::TestCase
# up. # up.
# #
def test_writeread2 def test_writeread2
@env.writefile1("0"*1024) @env.writefile1('0' * 1024)
@env.serve1 @env.serve1
d0 = "\0"*@env.blocksize d0 = "\0" * @env.blocksize
d1 = "X"*@env.blocksize d1 = 'X' * @env.blocksize
(0..63).each do |num| (0..63).each do |num|
@env.nbd1.write(num*@env.blocksize*2, d1) @env.nbd1.write(num * @env.blocksize * 2, d1)
end end
(0..63).each do |num| (0..63).each do |num|
assert_equal(d0, @env.nbd1.read(((2*num)+1)*@env.blocksize, d0.size)) assert_equal(d0, @env.nbd1.read(((2 * num) + 1) * @env.blocksize, d0.size))
end end
end end
def setup_to_mirror def setup_to_mirror
@env.writefile1( "f"*4 ) @env.writefile1('f' * 4)
@env.serve1 @env.serve1
@env.writefile2( "0"*4 ) @env.writefile2('0' * 4)
@env.listen2 @env.listen2
end end
def test_mirror def test_mirror
@env.nbd1.can_die @env.nbd1.can_die
@env.nbd2.can_die(0) @env.nbd2.can_die(0)
setup_to_mirror() setup_to_mirror
stdout, stderr = @env.mirror12 stdout, stderr = @env.mirror12
@env.nbd1.join @env.nbd1.join
@env.nbd2.join @env.nbd2.join
assert( File.file?( @env.filename1 ), assert(File.file?(@env.filename1),
"The source file was incorrectly deleted") 'The source file was incorrectly deleted')
assert_equal(@env.file1.read_original( 0, @env.blocksize ), assert_equal(@env.file1.read_original(0, @env.blocksize),
@env.file2.read( 0, @env.blocksize ) ) @env.file2.read(0, @env.blocksize))
end end
def test_mirror_unlink def test_mirror_unlink
@env.nbd1.can_die(0) @env.nbd1.can_die(0)
@env.nbd2.can_die(0) @env.nbd2.can_die(0)
setup_to_mirror() setup_to_mirror
assert File.file?( @env.filename1 ) assert File.file?(@env.filename1)
stdout, stderr = @env.mirror12_unlink stdout, stderr = @env.mirror12_unlink
assert_no_match( /unrecognized/, stderr ) assert_no_match(/unrecognized/, stderr)
Timeout.timeout(10) { @env.nbd1.join }
Timeout.timeout(10) do @env.nbd1.join end assert !File.file?(@env.filename1)
assert !File.file?( @env.filename1 )
end end
def test_write_to_high_block def test_write_to_high_block
# #
# This test does not work on 32 bit platforms. # This test does not work on 32 bit platforms.
# #
skip("Not relevant on 32-bit platforms") if ( ["a"].pack("p").size < 8 ) skip('Not relevant on 32-bit platforms') if ['a'].pack('p').size < 8
# Create a large file, then try to write to somewhere after the 2G boundary # Create a large file, then try to write to somewhere after the 2G boundary
@env.truncate1 "4G" @env.truncate1 '4G'
@env.serve1 @env.serve1
@env.nbd1.write( 2**31+2**29, "12345678" ) @env.nbd1.write(2**31 + 2**29, '12345678')
sleep(1) sleep(1)
assert_equal "12345678", @env.nbd1.read( 2**31+2**29, 8 ) assert_equal '12345678', @env.nbd1.read(2**31 + 2**29, 8)
end end
def test_set_acl def test_set_acl
# Just check that we get sane feedback here # Just check that we get sane feedback here
@env.writefile1( "f"*4 ) @env.writefile1('f' * 4)
@env.serve1 @env.serve1
_,stderr = @env.acl1("127.0.0.1") _, stderr = @env.acl1('127.0.0.1')
assert_no_match( /^(F|E):/, stderr ) assert_no_match(/^(F|E):/, stderr)
end end
def test_write_more_than_one_run def test_write_more_than_one_run
one_mb = 2**20 one_mb = 2**20
data = "\0" * 256 * one_mb data = "\0" * 256 * one_mb
File.open(@env.filename1, "wb") do |f| f.write( "1" * 256 * one_mb ) end File.open(@env.filename1, 'wb') { |f| f.write('1' * 256 * one_mb) }
@env.serve1 @env.serve1
sleep 5 sleep 5
@env.write1( data ) @env.write1(data)
@env.nbd1.can_die(0) @env.nbd1.can_die(0)
@env.nbd1.kill @env.nbd1.kill
i = 0 i = 0
File.open(@env.filename1, "rb") do |f| File.open(@env.filename1, 'rb') do |f|
while mb = f.read( one_mb ) while mb = f.read(one_mb)
unless "\0"*one_mb == mb unless "\0" * one_mb == mb
msg = "Read non-zeros after offset %x:\n"%(i * one_mb) msg = format("Read non-zeros after offset %x:\n", (i * one_mb))
msg += `hexdump #{@env.filename1} | head -n5` msg += `hexdump #{@env.filename1} | head -n5`
fail msg raise msg
end end
i += 1 i += 1
end end
end end
end end
end end

View File

@@ -2,7 +2,6 @@ require 'test/unit'
require 'environment' require 'environment'
require 'proxy_tests' require 'proxy_tests'
class TestPrefetchProxyMode < Test::Unit::TestCase class TestPrefetchProxyMode < Test::Unit::TestCase
include ProxyTests include ProxyTests
@@ -10,7 +9,7 @@ class TestPrefetchProxyMode < Test::Unit::TestCase
super super
@env = Environment.new @env = Environment.new
@env.prefetch_proxy! @env.prefetch_proxy!
@env.writefile1( "f" * 16 ) @env.writefile1('f' * 16)
end end
def teardown def teardown
@@ -18,5 +17,3 @@ class TestPrefetchProxyMode < Test::Unit::TestCase
super super
end end
end end

View File

@@ -2,14 +2,13 @@ require 'test/unit'
require 'environment' require 'environment'
require 'proxy_tests' require 'proxy_tests'
class TestProxyMode < Test::Unit::TestCase class TestProxyMode < Test::Unit::TestCase
include ProxyTests include ProxyTests
def setup def setup
super super
@env = Environment.new @env = Environment.new
@env.writefile1( "f" * 16 ) @env.writefile1('f' * 16)
end end
def teardown def teardown
@@ -17,4 +16,3 @@ class TestProxyMode < Test::Unit::TestCase
super super
end end
end end

View File

@@ -4,12 +4,11 @@ require 'flexnbd/fake_source'
require 'pp' require 'pp'
class TestServeMode < Test::Unit::TestCase class TestServeMode < Test::Unit::TestCase
def setup def setup
super super
@b = "\xFF".b @b = "\xFF".b
@env = Environment.new @env = Environment.new
@env.writefile1( "0" ) @env.writefile1('0')
@env.serve1 @env.serve1
end end
@@ -19,7 +18,7 @@ class TestServeMode < Test::Unit::TestCase
end end
def connect_to_server def connect_to_server
client = FlexNBD::FakeSource.new(@env.ip, @env.port1, "Connecting to server failed") client = FlexNBD::FakeSource.new(@env.ip, @env.port1, 'Connecting to server failed')
begin begin
result = client.read_hello result = client.read_hello
assert_equal 'NBDMAGIC', result[:passwd] assert_equal 'NBDMAGIC', result[:passwd]
@@ -29,62 +28,63 @@ class TestServeMode < Test::Unit::TestCase
assert_equal "\x0" * 124, result[:reserved] assert_equal "\x0" * 124, result[:reserved]
yield client yield client
ensure ensure
client.close rescue nil begin
client.close
rescue StandardError
nil
end
end end
end end
def test_bad_request_magic_receives_error_response def test_bad_request_magic_receives_error_response
connect_to_server do |client| connect_to_server do |client|
# replace REQUEST_MAGIC with all 0s to make it look bad # replace REQUEST_MAGIC with all 0s to make it look bad
client.send_request( 0, "myhandle", 0, 0, "\x00\x00\x00\x00" ) client.send_request(0, 'myhandle', 0, 0, "\x00\x00\x00\x00")
rsp = client.read_response rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle] assert_equal 'myhandle', rsp[:handle]
assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}" assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}"
# The client should be disconnected now # The client should be disconnected now
assert client.disconnected?, "Server not disconnected" assert client.disconnected?, 'Server not disconnected'
end end
end end
def test_long_write_on_top_of_short_write_is_respected def test_long_write_on_top_of_short_write_is_respected
connect_to_server do |client| connect_to_server do |client|
# Start with a file of all-zeroes. # Start with a file of all-zeroes.
client.write( 0, "\x00" * @env.file1.size ) client.write(0, "\x00" * @env.file1.size)
rsp = client.read_response rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error] assert_equal 0, rsp[:error]
client.write( 0, @b ) client.write(0, @b)
rsp = client.read_response rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error] assert_equal 0, rsp[:error]
client.write( 0, @b * 2 ) client.write(0, @b * 2)
rsp = client.read_response rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error] assert_equal 0, rsp[:error]
end end
assert_equal @b * 2, @env.file1.read( 0, 2 ) assert_equal @b * 2, @env.file1.read(0, 2)
end end
def test_read_request_out_of_bounds_receives_error_response def test_read_request_out_of_bounds_receives_error_response
connect_to_server do |client| connect_to_server do |client|
client.write_read_request( @env.file1.size, 4096 ) client.write_read_request(@env.file1.size, 4096)
rsp = client.read_response rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle] assert_equal 'myhandle', rsp[:handle]
assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}" assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}"
# Ensure we're not disconnected by sending a request. We don't care about # Ensure we're not disconnected by sending a request. We don't care about
# whether the reply is good or not, here. # whether the reply is good or not, here.
client.write_read_request( 0, 4096 ) client.write_read_request(0, 4096)
rsp = client.read_response rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
end end
@@ -92,23 +92,18 @@ class TestServeMode < Test::Unit::TestCase
def test_write_request_out_of_bounds_receives_error_response def test_write_request_out_of_bounds_receives_error_response
connect_to_server do |client| connect_to_server do |client|
client.write( @env.file1.size, "\x00" * 4096 ) client.write(@env.file1.size, "\x00" * 4096)
rsp = client.read_response rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle] assert_equal 'myhandle', rsp[:handle]
assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}" assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}"
# Ensure we're not disconnected by sending a request. We don't care about # Ensure we're not disconnected by sending a request. We don't care about
# whether the reply is good or not, here. # whether the reply is good or not, here.
client.write( 0, "\x00" * @env.file1.size ) client.write(0, "\x00" * @env.file1.size)
rsp = client.read_response rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
end end
end end
end end

View File

@@ -1,126 +1,105 @@
# encoding: utf-8
require 'test/unit' require 'test/unit'
require 'environment' require 'environment'
class TestSourceErrorHandling < Test::Unit::TestCase class TestSourceErrorHandling < Test::Unit::TestCase
def setup def setup
@old_env = ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'] @old_env = ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS']
ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'] = "4.0" ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'] = '4.0'
@env = Environment.new @env = Environment.new
@env.writefile1( "f" * 4 ) @env.writefile1('f' * 4)
@env.serve1 @env.serve1
end end
def teardown def teardown
@env.nbd1.can_die(0) @env.nbd1.can_die(0)
@env.cleanup @env.cleanup
ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'] = @old_env ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'] = @old_env
end end
def expect_term_during_migration def expect_term_during_migration
@env.nbd1.can_die(1,9) @env.nbd1.can_die(1, 9)
end end
def test_failure_to_connect_reported_in_mirror_cmd_response def test_failure_to_connect_reported_in_mirror_cmd_response
stdout, stderr = @env.mirror12_unchecked stdout, stderr = @env.mirror12_unchecked
expect_term_during_migration expect_term_during_migration
assert_match( /failed to connect/, stderr ) assert_match(/failed to connect/, stderr)
end end
def test_sigterm_after_hello_quits_with_status_of_1 def test_sigterm_after_hello_quits_with_status_of_1
expect_term_during_migration expect_term_during_migration
run_fake( "dest/sigterm_after_hello" ) run_fake('dest/sigterm_after_hello')
end end
def test_destination_hangs_after_connect_reports_error_at_source def test_destination_hangs_after_connect_reports_error_at_source
run_fake( "dest/hang_after_connect", run_fake('dest/hang_after_connect',
:err => /Remote server failed to respond/ ) err: /Remote server failed to respond/)
end end
def test_destination_rejects_connection_reports_error_at_source def test_destination_rejects_connection_reports_error_at_source
run_fake( "dest/reject_acl", run_fake('dest/reject_acl',
:err => /Mirror was rejected/ ) err: /Mirror was rejected/)
end end
def test_wrong_size_causes_disconnect def test_wrong_size_causes_disconnect
run_fake( "dest/hello_wrong_size", run_fake('dest/hello_wrong_size',
:err => /Remote size does not match local size/ ) err: /Remote size does not match local size/)
end end
def test_wrong_magic_causes_disconnect def test_wrong_magic_causes_disconnect
expect_term_during_migration expect_term_during_migration
run_fake( "dest/hello_wrong_magic", run_fake('dest/hello_wrong_magic',
:err => /Mirror was rejected/ ) err: /Mirror was rejected/)
end end
def test_disconnect_after_hello_causes_retry def test_disconnect_after_hello_causes_retry
expect_term_during_migration expect_term_during_migration
run_fake( "dest/close_after_hello", run_fake('dest/close_after_hello',
:out => /Mirror started/ ) out: /Mirror started/)
end end
def test_write_times_out_causes_retry def test_write_times_out_causes_retry
expect_term_during_migration expect_term_during_migration
run_fake( "dest/hang_after_write" ) run_fake('dest/hang_after_write')
end end
def test_rejected_write_causes_retry def test_rejected_write_causes_retry
expect_term_during_migration expect_term_during_migration
run_fake( "dest/error_on_write" ) run_fake('dest/error_on_write')
end end
def test_disconnect_before_write_reply_causes_retry def test_disconnect_before_write_reply_causes_retry
expect_term_during_migration expect_term_during_migration
run_fake( "dest/close_after_write" ) run_fake('dest/close_after_write')
end end
def test_bad_write_reply_causes_retry def test_bad_write_reply_causes_retry
expect_term_during_migration expect_term_during_migration
run_fake( "dest/write_wrong_magic" ) run_fake('dest/write_wrong_magic')
end end
def test_pre_entrust_disconnect_causes_retry def test_pre_entrust_disconnect_causes_retry
expect_term_during_migration expect_term_during_migration
run_fake( "dest/close_after_writes" ) run_fake('dest/close_after_writes')
end end
def test_cancel_migration def test_cancel_migration
run_fake( "dest/break_after_hello" ) run_fake('dest/break_after_hello')
end end
private private
def run_fake(name, opts = {}) def run_fake(name, opts = {})
@env.run_fake( name, @env.ip, @env.port2, @env.nbd1.ctrl ) @env.run_fake(name, @env.ip, @env.port2, @env.nbd1.ctrl)
stdout, stderr = @env.mirror12_unchecked stdout, stderr = @env.mirror12_unchecked
assert_success assert_success
assert_match( opts[:err], stderr ) if opts[:err] assert_match(opts[:err], stderr) if opts[:err]
assert_match( opts[:out], stdout ) if opts[:out] assert_match(opts[:out], stdout) if opts[:out]
return stdout, stderr [stdout, stderr]
end end
def assert_success( msg=nil ) def assert_success(msg = nil)
assert @env.fake_reports_success, msg || "Fake failed" assert @env.fake_reports_success, msg || 'Fake failed'
end end
end # class TestSourceErrorHandling end # class TestSourceErrorHandling

View File

@@ -9,102 +9,98 @@ require 'tmpdir'
Thread.abort_on_exception = true Thread.abort_on_exception = true
class TestWriteDuringMigration < Test::Unit::TestCase class TestWriteDuringMigration < Test::Unit::TestCase
def setup def setup
@flexnbd = File.expand_path("../../build/flexnbd") @flexnbd = File.expand_path('../../build/flexnbd')
raise "No binary!" unless File.executable?( @flexnbd ) raise 'No binary!' unless File.executable?(@flexnbd)
@size = 20 * 1024 * 1024 # 20MB
@size = 20*1024*1024 # 20MB @write_data = 'foo!' * 2048 # 8K write
@write_data = "foo!" * 2048 # 8K write
@source_port = 9990 @source_port = 9990
@dest_port = 9991 @dest_port = 9991
@source_sock = "src.sock" @source_sock = 'src.sock'
@dest_sock = "dst.sock" @dest_sock = 'dst.sock'
@source_file = "src.file" @source_file = 'src.file'
@dest_file = "dst.file" @dest_file = 'dst.file'
end end
def teardown def teardown
[@dst_proc, @src_proc].each do |pid| [@dst_proc, @src_proc].each do |pid|
if pid next unless pid
Process.kill( "KILL", pid ) rescue nil begin
Process.kill('KILL', pid)
rescue StandardError
nil
end end
end end
end end
def debug_arg def debug_arg
ENV['DEBUG'] ? "--verbose" : "" ENV['DEBUG'] ? '--verbose' : ''
end end
def launch_servers def launch_servers
@dst_proc = fork() { @dst_proc = fork do
cmd = "#{@flexnbd} listen -l 127.0.0.1 -p #{@dest_port} -f #{@dest_file} -s #{@dest_sock} #{debug_arg}" cmd = "#{@flexnbd} listen -l 127.0.0.1 -p #{@dest_port} -f #{@dest_file} -s #{@dest_sock} #{debug_arg}"
exec cmd exec cmd
} end
@src_proc = fork() { @src_proc = fork do
cmd = "#{@flexnbd} serve -l 127.0.0.1 -p #{@source_port} -f #{@source_file} -s #{@source_sock} #{debug_arg}" cmd = "#{@flexnbd} serve -l 127.0.0.1 -p #{@source_port} -f #{@source_file} -s #{@source_sock} #{debug_arg}"
exec cmd exec cmd
} end
begin begin
awaiting = nil awaiting = nil
Timeout.timeout(10) do Timeout.timeout(10) do
awaiting = :source awaiting = :source
sleep 0.1 while !File.exists?( @source_sock ) sleep 0.1 until File.exist?(@source_sock)
awaiting = :dest awaiting = :dest
sleep 0.1 while !File.exists?( @dest_sock ) sleep 0.1 until File.exist?(@dest_sock)
end end
rescue Timeout::Error rescue Timeout::Error
case awaiting case awaiting
when :source when :source
fail "Couldn't get a source socket." raise "Couldn't get a source socket."
when :dest when :dest
fail "Couldn't get a destination socket." raise "Couldn't get a destination socket."
else else
fail "Something went wrong I don't understand." raise "Something went wrong I don't understand."
end end
end end
end end
def make_files
def make_files()
FileUtils.touch(@source_file) FileUtils.touch(@source_file)
File.truncate(@source_file, @size) File.truncate(@source_file, @size)
FileUtils.touch(@dest_file) FileUtils.touch(@dest_file)
File.truncate(@dest_file, @size) File.truncate(@dest_file, @size)
File.open(@source_file, "wb"){|f| f.write "a"*@size } File.open(@source_file, 'wb') { |f| f.write 'a' * @size }
end end
def start_mirror def start_mirror
UNIXSocket.open(@source_sock) {|sock| UNIXSocket.open(@source_sock) do |sock|
sock.write(["mirror", "127.0.0.1", @dest_port.to_s, "exit"].join("\x0A") + "\x0A\x0A") sock.write(['mirror', '127.0.0.1', @dest_port.to_s, 'exit'].join("\x0A") + "\x0A\x0A")
sock.flush sock.flush
rsp = sock.readline rsp = sock.readline
} end
end end
def wait_for_quit
def wait_for_quit() Timeout.timeout(10) do
Timeout.timeout( 10 ) do start_time = Time.now
start_time = Time.now dst_result = Process.waitpid2(@dst_proc)
dst_result = Process::waitpid2(@dst_proc) src_result = Process.waitpid2(@src_proc)
src_result = Process::waitpid2(@src_proc)
end end
end end
def source_writer def source_writer
client = FlexNBD::FakeSource.new( "127.0.0.1", @source_port, "Timed out connecting" ) client = FlexNBD::FakeSource.new('127.0.0.1', @source_port, 'Timed out connecting')
offsets = Range.new(0, (@size - @write_data.size) / 4096 ).to_a offsets = Range.new(0, (@size - @write_data.size) / 4096).to_a
loop do loop do
begin begin
client.write(offsets[rand(offsets.size)] * 4096, @write_data) client.write(offsets[rand(offsets.size)] * 4096, @write_data)
rescue => err rescue StandardError => err
# We expect a broken write at some point, so ignore it # We expect a broken write at some point, so ignore it
break break
end end
@@ -115,32 +111,32 @@ class TestWriteDuringMigration < Test::Unit::TestCase
# puts `md5sum #{@source_file} #{@dest_file}` # puts `md5sum #{@source_file} #{@dest_file}`
# Ensure each block matches # Ensure each block matches
File.open(@source_file, "r") do |source| File.open(@source_file, 'r') do |source|
File.open(@dest_file, "r") do |dest| File.open(@dest_file, 'r') do |dest|
0.upto( @size / 4096 ) do |block_num| 0.upto(@size / 4096) do |block_num|
s_data = source.read( 4096 ) s_data = source.read(4096)
d_data = dest.read( 4096 ) d_data = dest.read(4096)
assert s_data == d_data, "Block #{block_num} mismatch!" assert s_data == d_data, "Block #{block_num} mismatch!"
source.seek( 4096, IO::SEEK_CUR ) source.seek(4096, IO::SEEK_CUR)
dest.seek( 4096, IO::SEEK_CUR ) dest.seek(4096, IO::SEEK_CUR)
end end
end end
end end
end end
def test_write_during_migration def test_write_during_migration
Dir.mktmpdir() do |tmpdir| Dir.mktmpdir do |tmpdir|
Dir.chdir( tmpdir ) do Dir.chdir(tmpdir) do
make_files() make_files
launch_servers() launch_servers
src_writer = Thread.new { source_writer } src_writer = Thread.new { source_writer }
start_mirror() start_mirror
wait_for_quit() wait_for_quit
src_writer.join src_writer.join
assert_both_sides_identical assert_both_sides_identical
end end
@@ -148,24 +144,21 @@ class TestWriteDuringMigration < Test::Unit::TestCase
end end
def test_many_clients_during_migration def test_many_clients_during_migration
Dir.mktmpdir() do |tmpdir| Dir.mktmpdir do |tmpdir|
Dir.chdir( tmpdir ) do Dir.chdir(tmpdir) do
make_files() make_files
launch_servers() launch_servers
src_writers_1 = (1..5).collect { Thread.new { source_writer } } src_writers_1 = (1..5).collect { Thread.new { source_writer } }
start_mirror() start_mirror
src_writers_2 = (1..5).collect { Thread.new { source_writer } } src_writers_2 = (1..5).collect { Thread.new { source_writer } }
wait_for_quit() wait_for_quit
( src_writers_1 + src_writers_2 ).each {|t| t.join } (src_writers_1 + src_writers_2).each(&:join)
assert_both_sides_identical assert_both_sides_identical
end end
end end end end
end end