33 Commits
0.1.0 ... 0.1.1

Author SHA1 Message Date
Alex Young
83e3d65be9 Update the Makefile to work with dpkg-buildpackage 2014-02-21 19:39:27 +00:00
Alex Young
4f31bd9340 Switch from a rake-based build to a make-based build.
This commit beefs up the Makefile to do the build, instead of the
Rakefile.

It also removes from the Rakefile the dependency on rake_utils, which
should mean it's ok to build in a schroot.

The files are reorganised to make the Makefile rules more tractable,
although the reorganisation reveals a problem with our current code
organisation.

The problem is that the proxy-specific code transitively depends on the
server code via flexnbd.h, which has a circular dependency on the server
and client structs. This should be broken in a future commit by
separating the flexnbd struct into a shared config struct and
server-specific parts, so that the server code can be moved into
src/server to more accurately show the functional dependencies.
2014-02-21 19:10:55 +00:00
nick
0baf93fd7b proxy: Fix a read corruption issue caused by us failing to reset needles on timeout 2014-02-11 20:43:44 +00:00
nick
175f19b3e7 client: Add a cork TODO pair 2014-02-11 15:22:54 +00:00
nick
8d56316548 client: Start checking for exceptions on the client socket 2014-02-11 14:32:12 +00:00
nick
27f2cc7083 Some debug and whitespace tweaks 2014-02-11 14:31:58 +00:00
nick
8084a41ad2 flexnbd client: Catch a few cases where the killswitch wasn't disarmed 2014-01-28 11:45:27 +00:00
nick
5ca5858929 Increase a timeout on a test to handle slow unlink calls on other filesystems 2014-01-22 12:21:49 +00:00
nick
afcc07a181 Fix stop signal logic broken by the killswitch 2014-01-22 12:16:09 +00:00
nick
dcead04cf6 Fix up the check_util test once more 2014-01-22 12:10:34 +00:00
nick
4f7f5f1745 Fix a few dangling bits in client.h 2014-01-22 12:01:42 +00:00
nick
976e9ba07f Automated merge with ssh://dev.bytemark.co.uk//repos/flexnbd-c 2014-01-22 11:49:26 +00:00
nick
91d9531a60 flexnbd serve: Make the killswitch per-client-thread
This is a bit tricky, but calling shutdown() on a socket in a signal
handler is safe, and (at least in linux) appears to cause any read()
or write() calls blocked on that socket to return, even with SA_RESTART.

I'm not confident enough about the rest of flexnbd's syscall error
handling to turn SA_RESTART off for this signal...
2014-01-22 11:49:21 +00:00
nick
905d66af77 Rework a test 2014-01-22 11:45:35 +00:00
nick
eee7c9644c Another fedora build fix 2014-01-22 11:42:00 +00:00
nick
ce5c51cdcf Fix a test case 2014-01-22 11:40:19 +00:00
nick
c6c53c63ba Fix compilation on fedora 2014-01-22 10:39:29 +00:00
Tristan Heaven
20bd58749e Fix help_text errors for break and status modes 2013-11-07 16:45:04 +00:00
nick
866bf835e6 tests: Fix an uninitialized memory access 2013-10-30 22:46:49 +00:00
nick
53cbe14556 mirror: lengthen the request timeout to 60 seconds
This is complicated slightly by a need to keep the tests fast, so
we introduce an environment variable that can override the constant
2013-10-30 22:45:12 +00:00
nick
cd3281f62d acl: Make some compilers happy 2013-10-30 22:44:15 +00:00
nick
1e5457fed0 mirror: Couple of tiny cleanups 2013-10-30 22:04:41 +00:00
nick
0753369b77 mirror: Turn off the 'begin' timer before continuing 2013-10-30 20:25:50 +00:00
nick
9d9ae40953 Increase loglevel of some allocation map messages 2013-10-30 16:40:32 +00:00
nick
65d4f581b9 mirror: Clean up bps calculation slightly 2013-10-24 15:11:55 +01:00
nick
77c71ccf09 mirror: Ensure the bitset is actually disabled on mirror error 2013-10-23 16:18:00 +01:00
nick
97a923afdf mirror: Don't start migrating until the allocation map is built
There is a fun race that can happen if we begin migrating while the
allocation map is still building. We call bitset_enable_stream()
when the migration begins, which causes the builder to start putting
events into the stream. This is bad all by itself, as it slows the
migration down for no reason, but the stream is a limited-size queue
and there are situations (migration fails and is restarted) where we
can end up with the queue full and nobody able to empty it, freezing
the whole thing.
2013-10-23 15:58:47 +01:00
nick
335261869d mirror: Don't count bytes transferred for the purposes of keeping the stream empty as part of our bwlimit
This prevents a fairly nasty situation occurring where the rate of change on the disc is high enough that
just servicing it generates enough traffic to keep us over the bwlimit threshold indefinitely. That would
cause us to sleep during the only windows we'd ordinarily have to advance the offset.
2013-10-23 15:26:28 +01:00
nick
8cf9cae8c0 mirror: Don't sleep if our stream is filling up 2013-10-23 14:38:27 +01:00
nick
6986c70888 bitset: Swap pthread_cond_broadcast for pthread_cond_signal
Normally we'll only have one thread waiting anyway, but there's no
point activating a race here in the cases where we have > 1 waiting,
so signal is what we want.
2013-09-24 15:28:58 +01:00
nick
4b9ded0e1d bitset: More-efficient implementation of bitset_stream_queued_bytes
Rather than iterating the entire queue every time this function is
called, we instead take a small hit on enqueue and dequeue to keep
a running byte total keyed by event type that we can return.
2013-09-24 15:27:17 +01:00
nick
b177faacd6 mirror: Reduce the mirror convergence window to 5 seonds, from 60
Also remove some obsolete constants
2013-09-24 14:42:21 +01:00
nick
96e60a4a29 Added tag 0.1.0 for changeset acad9e9df53c 2013-09-24 12:27:29 +01:00
51 changed files with 455 additions and 468 deletions

102
Makefile
View File

@@ -1,10 +1,102 @@
#!/usr/bin/make -f
all:
rake build
VPATH=src:tests/unit
DESTDIR?=/
PREFIX?=/usr/local/bin
INSTALLDIR=$(DESTDIR)/$(PREFIX)
ifdef DEBUG
CFLAGS_EXTRA=-g -DDEBUG
LDFLAGS_EXTRA=-g
else
CFLAGS_EXTRA=-O2
endif
all-debug:
DEBUG=1 rake build
CCFLAGS=-D_GNU_SOURCE=1 -Wall -Wextra -Werror-implicit-function-declaration -Wstrict-prototypes -Wno-missing-field-initializers $(CFLAGS_EXTRA) $(CFLAGS)
LLDFLAGS=-lrt -lev $(LDFLAGS_EXTRA) $(LDFLAGS)
CC?=gcc
LIBS=-lpthread
INC=-I/usr/include/libev -Isrc/common -Isrc/server -Isrc/proxy
COMPILE=$(CC) $(INC) -c $(CCFLAGS)
SAVEDEP=$(CC) $(INC) -MM $(CCFLAGS)
LINK=$(CC) $(LLDFLAGS) -Isrc $(LIBS)
EXISTING_OBJS := $(wildcard build/*.o)
-include $(EXISTING_OBJS:.o=.d)
COMMON_SRC := $(wildcard src/common/*.c)
SERVER_SRC := $(wildcard src/server/*.c)
PROXY_SRC := $(wildcard src/proxy/*.c)
COMMON_OBJ := $(COMMON_SRC:src/%.c=build/%.o)
SERVER_OBJ := $(SERVER_SRC:src/%.c=build/%.o)
PROXY_OBJ := $(PROXY_SRC:src/%.c=build/%.o)
SRCS := $(COMMON_SRC) $(SERVER_SRC) $(PROXY_SRC)
OBJS := $(COMMON_OBJ) $(SERVER_OBJ) $(PROXY_OBJ)
all: build/flexnbd build/flexnbd-proxy doc
build/%.o: %.c
mkdir -p $(dir $@)
$(COMPILE) $< -o $@
$(SAVEDEP) $< > build/$*.d
objs: $(OBJS)
build/flexnbd: $(COMMON_OBJ) $(SERVER_OBJ) build/main.o
$(LINK) $^ -o $@
build/flexnbd-proxy: $(COMMON_OBJ) $(PROXY_OBJ) build/proxy-main.o
$(LINK) $^ -o $@
server: build/flexnbd
proxy: build/flexnbd-proxy
CHECK_SRC := $(wildcard tests/unit/*.c)
CHECK_OBJ := $(CHECK_SRC:tests/unit/%.c=build/tests/%.o)
# Why can't we reuse the build/%.o rule above? Not sure.
build/tests/%.o: tests/unit/%.c
mkdir -p $(dir $@)
$(COMPILE) $< -o $@
$(SAVEDEP) $< > build/tests/$*.d
CHECK_BINS := $(CHECK_OBJ:build/tests/%.o=build/tests/%)
build/tests/%: build/tests/%.o $(OBJS)
$(LINK) $^ -o $@ -lcheck
check_objs: $(CHECK_OBJ)
check_bins: $(CHECK_BINS)
check: $(CHECK_BINS)
for bin in $^; do $$bin; done
build/flexnbd.1: README.txt
a2x --destination-dir build --format manpage $<
build/flexnbd-proxy.1: README.proxy.txt
a2x --destination-dir build --format manpage $<
# If we don't pipe to file, gzip clobbers the original, causing make
# to rebuild each time
%.1.gz: %.1
gzip -c -f $< > $@
server-man: build/flexnbd.1.gz
proxy-man: build/flexnbd-proxy.1.gz
doc: server-man proxy-man
install:
mkdir -p $(INSTALLDIR)
cp build/flexnbd build/flexnbd-proxy $(INSTALLDIR)
clean:
rake clean
rm -rf build/*
.PHONY: clean objs check_objs all server proxy check_bins check server-man proxy-man doc

View File

@@ -25,7 +25,7 @@ COMMANDS
serve
~~~~~
$ flexnbd serve --addr <ADDR> --port <PORT> --file <FILE>
[--sock <SOCK>] [--default-deny] [global option]* [acl entry]*
[--sock <SOCK>] [--default-deny] [-k] [global option]* [acl entry]*
Serve a file. If any ACL entries are given (which should be IP
addresses), only those clients listed will be permitted to connect.
@@ -55,6 +55,12 @@ Options
empty ACL will let no clients connect. If it is not given, an
empty ACL will let any client connect.
*--killswitch, -k*:
If set, we implement a 2-minute timeout on NBD requests and
responses. If a request takes longer than that to complete,
the client is disconnected. This is useful to keep broken
clients from breaking migrations, among other things.
listen
~~~~~~

312
Rakefile
View File

@@ -1,85 +1,34 @@
$: << '../rake_utils/lib'
require 'rake_utils/debian'
include RakeUtils::DSL
# encoding: utf-8
CC=ENV['CC'] || "gcc"
DEBUG = ENV.has_key?('DEBUG') &&
%w|yes y ok 1 true t|.include?(ENV['DEBUG'])
ALL_SOURCES = FileList['src/*']
PROXY_ONLY_SOURCES = FileList['src/{proxy-main,proxy}.c']
PROXY_ONLY_OBJECTS = PROXY_ONLY_SOURCES.pathmap( "%{^src,build}X.o" )
SOURCES = ALL_SOURCES.select { |c| c =~ /\.c$/ } - PROXY_ONLY_SOURCES
OBJECTS = SOURCES.pathmap( "%{^src,build}X.o" ) - PROXY_ONLY_OBJECTS
PROXY_SOURCES = FileList['src/{ioutil,nbdtypes,readwrite,sockutil,util,parse}.c'] + PROXY_ONLY_SOURCES
PROXY_OBJECTS = PROXY_SOURCES.pathmap( "%{^src,build}X.o" )
TEST_SOURCES = FileList['tests/unit/*.c']
TEST_OBJECTS = TEST_SOURCES.pathmap( "%{^tests/unit,build/tests}X.o" )
LIBS = %w( pthread )
LDFLAGS = ["-lrt -lev"]
CCFLAGS = %w(
-D_GNU_SOURCE=1
-Wall
-Wextra
-Werror-implicit-function-declaration
-Wstrict-prototypes
-Wno-missing-field-initializers
) + # Added -Wno-missing-field-initializers to shut GCC up over {0} struct initialisers
[ENV['CFLAGS']]
LIBCHECK = File.exists?("/usr/lib/libcheck.a") ?
"/usr/lib/libcheck.a" :
"/usr/local/lib/libcheck.a"
TEST_MODULES = Dir["tests/unit/check_*.c"].map { |n|
File.basename( n )[%r{check_(.+)\.c},1] }
if DEBUG
LDFLAGS << ["-g"]
CCFLAGS << ["-g -DDEBUG"]
else
CCFLAGS << "-O2"
def make(*targets)
sh "make #{targets.map{|t| t.to_s}.join(" ")}"
end
def maketask( opts )
case opts
when Symbol
maketask opts => opts
else
opts.each do |name, targets|
task( name ){make *[*targets]}
end
end
end
desc "Build the binary and man page"
task :build => [:flexnbd, :flexnbd_proxy, :man]
task :default => :build
maketask :build => [:all, :doc]
desc "Build just the flexnbd binary"
task :flexnbd => "build/flexnbd"
maketask :flexnbd => [:server]
file "build/flexnbd" => :flexnbd
desc "Build just the flexnbd-proxy binary"
task :flexnbd_proxy => "build/flexnbd-proxy"
def check(m)
"build/tests/check_#{m}"
end
file "README.txt"
file "README.proxy.txt"
def manpage(name, src)
FileUtils.mkdir_p( "build" )
sh "a2x --destination-dir build --format manpage #{src}"
sh "gzip -f build/#{name}"
end
file "build/flexnbd.1.gz" => "README.txt" do
manpage("flexnbd.1", "README.txt")
end
file "build/flexnbd-proxy.1.gz" => "README.proxy.txt" do
manpage("flexnbd-proxy.1", "README.proxy.txt")
end
maketask :flexnbd_proxy => [:proxy]
file "build/flexnbd-proxy" => :flexnbd_proxy
desc "Build just the man page"
task :man => ["build/flexnbd.1.gz", "build/flexnbd-proxy.1.gz"]
maketask :man => :doc
namespace "test" do
@@ -87,226 +36,17 @@ namespace "test" do
task 'run' => ["unit", "scenarios"]
desc "Build C tests"
task 'build' => TEST_MODULES.map { |n| check n}
TEST_MODULES.each do |m|
desc "Run tests for #{m}"
task "check_#{m}" => check(m) do
sh check m
end
end
maketask :build => :check_bins
desc "Run C tests"
task 'unit' => 'build' do
TEST_MODULES.each do |n|
ENV['EF_DISABLE_BANNER'] = '1'
sh check n
end
end
maketask :unit => :check
desc "Run NBD test scenarios"
task 'scenarios' => ['build/flexnbd', 'build/flexnbd-proxy'] do
sh "cd tests/acceptance; ruby nbd_scenarios -v"
task 'scenarios' => ["build/flexnbd", "build/flexnbd-proxy"] do
sh "cd tests/acceptance && RUBYOPT='-I.' ruby nbd_scenarios -v"
end
end
def gcc_compile( target, source )
FileUtils.mkdir_p File.dirname( target )
sh "#{CC} -Isrc -c #{CCFLAGS.join(' ')} -o #{target} #{source} "
end
def gcc_link(target, objects)
FileUtils.mkdir_p File.dirname( target )
sh "#{CC} #{LDFLAGS.join(' ')} "+
" -Isrc " +
" -o #{target} "+
objects.join(" ") +
" "+LIBS.map { |l| "-l#{l}" }.join(" ")
end
def headers(c)
`#{CC} -Isrc -MM #{c}`.gsub("\\\n", " ").split(" ")[2..-1]
end
rule 'build/flexnbd-proxy' => PROXY_OBJECTS do |t|
gcc_link(t.name, t.sources)
end
rule 'build/flexnbd' => OBJECTS do |t|
gcc_link(t.name, t.sources)
end
file check("client") =>
%w{build/tests/check_client.o
build/self_pipe.o
build/nbdtypes.o
build/flexnbd.o
build/flexthread.o
build/control.o
build/readwrite.o
build/parse.o
build/client.o
build/serve.o
build/acl.o
build/ioutil.o
build/mbox.o
build/mirror.o
build/status.o
build/sockutil.o
build/util.o} do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
file check("acl") =>
%w{build/tests/check_acl.o
build/parse.o
build/acl.o
build/util.o} do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
file check( "util" ) =>
%w{build/tests/check_util.o
build/util.o
build/self_pipe.o} do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
file check("serve") =>
%w{build/tests/check_serve.o
build/self_pipe.o
build/nbdtypes.o
build/control.o
build/readwrite.o
build/parse.o
build/client.o
build/flexthread.o
build/serve.o
build/flexnbd.o
build/mirror.o
build/status.o
build/acl.o
build/mbox.o
build/ioutil.o
build/sockutil.o
build/util.o} do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
file check("status") =>
%w{
build/tests/check_status.o
build/self_pipe.o
build/nbdtypes.o
build/control.o
build/readwrite.o
build/parse.o
build/client.o
build/flexthread.o
build/serve.o
build/flexnbd.o
build/mirror.o
build/status.o
build/acl.o
build/mbox.o
build/ioutil.o
build/sockutil.o
build/util.o
} do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
file check("readwrite") =>
%w{build/tests/check_readwrite.o
build/readwrite.o
build/client.o
build/self_pipe.o
build/serve.o
build/parse.o
build/acl.o
build/flexthread.o
build/control.o
build/flexnbd.o
build/mirror.o
build/status.o
build/nbdtypes.o
build/mbox.o
build/ioutil.o
build/sockutil.o
build/util.o} do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
file check("flexnbd") =>
%w{build/tests/check_flexnbd.o
build/flexnbd.o
build/ioutil.o
build/sockutil.o
build/util.o
build/control.o
build/mbox.o
build/flexthread.o
build/status.o
build/self_pipe.o
build/client.o
build/acl.o
build/parse.o
build/nbdtypes.o
build/readwrite.o
build/mirror.o
build/serve.o} do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
file check("control") =>
%w{build/tests/check_control.o} + OBJECTS - ["build/main.o", 'build/proxy-main.o', 'build/proxy.o'] do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
(TEST_MODULES- %w{status control flexnbd acl client serve readwrite util}).each do |m|
tgt = "build/tests/check_#{m}.o"
maybe_obj_name = "build/#{m}.o"
# Take it out in case we're testing one of the utils
deps = ["build/ioutil.o", "build/util.o", "build/sockutil.o"] - [maybe_obj_name]
# Add it back in if it's something we need to compile
deps << maybe_obj_name if OBJECTS.include?( maybe_obj_name )
file check( m ) => deps + [tgt] do |t|
gcc_link(t.name, deps + [tgt, LIBCHECK])
end
end
OBJECTS.zip( SOURCES ).each do |o,c|
file o => [c]+headers(c) do |t| gcc_compile( o, c ) end
end
PROXY_ONLY_OBJECTS.zip( PROXY_ONLY_SOURCES).each do |o, c|
file o => [c]+headers(c) do |t| gcc_compile( o, c ) end
end
TEST_OBJECTS.zip( TEST_SOURCES ).each do |o,c|
file o => [c] + headers(c) do |t| gcc_compile( o, c ) end
end
desc "Remove all build targets, binaries and temporary files"
task :clean do
sh "rm -rf *~ build"
end
namespace :pkg do
deb do |t|
t.code_files = ALL_SOURCES + ["Rakefile", "README.txt", "README.proxy.txt"]
t.pkg_name = "flexnbd"
t.generate_changelog!
end
end
maketask :clean

6
debian/rules vendored
View File

@@ -7,12 +7,6 @@
%:
dh $@
override_dh_auto_build:
rake build
override_dh_auto_clean:
rake clean
.PHONY: override_dh_strip
override_dh_strip:
dh_strip --dbg-package=flexnbd-dbg

View File

@@ -31,7 +31,7 @@ static int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], uni
for (i=0; i < list_length; i++) {
struct ip_and_mask *entry = &(*list)[i];
int testbits;
unsigned char *raw_address1, *raw_address2;
unsigned char *raw_address1 = NULL, *raw_address2 = NULL;
debug("checking acl entry %d (%d/%d)", i, test->generic.sa_family, entry->ip.family);

View File

@@ -15,6 +15,20 @@
#include <sys/stat.h>
#include <fcntl.h>
// When this signal is invoked, we call shutdown() on the client fd, which
// results in the thread being wound up
void client_killswitch_hit(int signal __attribute__ ((unused)), siginfo_t *info, void *ptr __attribute__ ((unused)))
{
int fd = info->si_value.sival_int;
warn( "Killswitch for fd %i activated, calling shutdown on socket", fd );
FATAL_IF(
-1 == shutdown( fd, SHUT_RDWR ),
SHOW_ERRNO( "Failed to shutdown() the socket, killing the server" )
);
}
struct client *client_create( struct server *serve, int socket )
{
NULLCHECK( serve );
@@ -25,6 +39,13 @@ struct client *client_create( struct server *serve, int socket )
.sigev_signo = CLIENT_KILLSWITCH_SIGNAL
};
/*
* Our killswitch closes this socket, forcing read() and write() calls
* blocked on it to return with an error. The thread then close()s the
* socket itself, avoiding races.
*/
evp.sigev_value.sival_int = socket;
c = xmalloc( sizeof( struct client ) );
c->stopped = 0;
c->socket = socket;
@@ -199,36 +220,6 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
NULLCHECK( out_request );
struct nbd_request_raw request_raw;
fd_set fds;
struct timeval * ptv = NULL;
int fd_count;
/* We want a timeout if this is an inbound migration, but not otherwise.
* This is compile-time selectable, as it will break mirror max_bps
*/
#ifdef HAS_LISTEN_TIMEOUT
struct timeval tv = {CLIENT_MAX_WAIT_SECS, 0};
if ( !server_is_in_control( client->serve ) ) {
ptv = &tv;
}
#endif
FD_ZERO(&fds);
FD_SET(client->socket, &fds);
self_pipe_fd_set( client->stop_signal, &fds );
fd_count = sock_try_select(FD_SETSIZE, &fds, NULL, NULL, ptv);
if ( fd_count == 0 ) {
/* This "can't ever happen" */
if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); }
else { error("Timed out waiting for I/O"); }
}
else if ( fd_count < 0 ) { fatal( "Select failed" ); }
if ( self_pipe_fd_isset( client->stop_signal, &fds ) ){
debug("Client received stop signal.");
return 0;
}
if (fd_read_request(client->socket, &request_raw) == -1) {
*disconnected = 1;
@@ -255,7 +246,6 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
}
nbd_r2h_request( &request_raw, out_request );
return 1;
}
@@ -269,7 +259,7 @@ int fd_write_reply( int fd, char *handle, int error )
memcpy( reply.handle, handle, 8 );
nbd_h2r_reply( &reply, &reply_raw );
debug( "Replying with %s, %d", handle, error );
debug( "Replying with handle=0x%08X, error=%"PRIu32, handle, error );
if( -1 == writeloop( fd, &reply_raw, sizeof( reply_raw ) ) ) {
switch( errno ) {
@@ -379,15 +369,15 @@ int client_request_needs_reply( struct client * client,
* forever.
*/
if (request.magic != REQUEST_MAGIC) {
warn("Bad magic 0x%08x from client", request.magic);
warn("Bad magic 0x%08X from client", request.magic);
client_write_reply( client, &request, EBADMSG );
client->disconnect = 1; // no need to flush
return 0;
}
debug(
"request type=%"PRIu32", from=%"PRIu64", len=%"PRIu32,
request.type, request.from, request.len
"request type=%"PRIu32", from=%"PRIu64", len=%"PRIu32", handle=0x%08X",
request.type, request.from, request.len, request.handle
);
/* check it's not out of range */
@@ -416,7 +406,7 @@ int client_request_needs_reply( struct client * client,
return 0;
default:
fatal("Unknown request %08x", request.type);
fatal("Unknown request 0x%08X", request.type);
}
return 1;
}
@@ -426,6 +416,7 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
{
off64_t offset;
// TODO: cork
debug("request read %ld+%d", request.from, request.len);
client_write_reply( client, &request, 0);
@@ -443,12 +434,14 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
"sendfile failed from=%ld, len=%d",
offset,
request.len);
// TODO: uncork
}
void client_reply_to_write( struct client* client, struct nbd_request request )
{
debug("request write %ld+%d", request.from, request.len);
debug("request write from=%"PRIu64", len=%"PRIu32", handle=0x%08X", request.from, request.len, request.handle);
if (client->serve->allocation_map_built) {
write_not_zeroes( client, request.from, request.len );
}
@@ -553,35 +546,79 @@ int client_serve_request(struct client* client)
struct nbd_request request = {0};
int stop = 1;
int disconnected = 0;
fd_set rfds, efds;
int fd_count;
if ( !client_read_request( client, &request, &disconnected ) ) { return stop; }
if ( disconnected ) { return stop; }
if ( !client_request_needs_reply( client, request ) ) {
/* wait until there are some bytes on the fd before committing to reads
* FIXME: this whole scheme is broken because we're using blocking reads.
* read() can block directly after a select anyway, and it's possible that,
* without the killswitch, we'd hang forever. With the killswitch, we just
* hang for "a while". The Right Thing to do is to rewrite client.c to be
* non-blocking.
*/
FD_ZERO( &rfds );
FD_SET( client->socket, &rfds );
self_pipe_fd_set( client->stop_signal, &rfds );
FD_ZERO( &efds );
FD_SET( client->socket, &efds );
fd_count = sock_try_select( FD_SETSIZE, &rfds, NULL, &efds, NULL );
if ( fd_count == 0 ) {
/* This "can't ever happen" */
fatal( "No FDs selected, and no timeout!" );
}
else if ( fd_count < 0 ) { fatal( "Select failed" ); }
if ( self_pipe_fd_isset( client->stop_signal, &rfds ) ){
debug("Client received stop signal.");
return 1; // Don't try to serve more requests
}
if ( FD_ISSET( client->socket, &efds ) ) {
debug( "Client connection closed" );
return 1;
}
/* We arm / disarm around the whole request cycle. The reason for this is
* that the remote peer could uncleanly die at any point; if we're stuck on
* a blocking read(), then that will hang for (almost) forever. This is bad
* in general, makes the server respond only to kill -9, and breaks
* outward mirroring in a most unpleasant way.
*
* Don't forget to disarm before exiting, no matter what!
*
* The replication is simple: open a connection to the flexnbd server, write
* a single byte, and then wait.
*
*/
client_arm_killswitch( client );
if ( !client_read_request( client, &request, &disconnected ) ) {
client_disarm_killswitch( client );
return stop;
}
if ( disconnected ) {
client_disarm_killswitch( client );
return stop;
}
if ( !client_request_needs_reply( client, request ) ) {
client_disarm_killswitch( client );
return client->disconnect;
}
{
if ( !server_is_closed( client->serve ) ) {
/* We arm / disarm around client_reply() to catch cases where the
* remote peer sends part of a write request data before dying,
* and cases where we send part of read reply data before they die.
*
* That last is theoretical right now, but could break us in the
* same way as a half-write (which causes us to sit in read forever)
*
* We only arm/disarm inside the server io lock because it's common
* during migrations for us to be hanging on that mutex for quite
* a while while the final pass happens - it's held for the entire
* time.
*/
client_arm_killswitch( client );
client_reply( client, request );
client_disarm_killswitch( client );
stop = 0;
}
}
client_disarm_killswitch( client );
return stop;
}
@@ -596,6 +633,9 @@ void client_cleanup(struct client* client,
{
info("client cleanup for client %p", client);
/* If the thread hits an error, we need to ensure this is off */
client_disarm_killswitch( client );
if (client->socket) {
FATAL_IF_NEGATIVE( close(client->socket),
"Error closing client socket %d",

View File

@@ -4,18 +4,6 @@
#include <signal.h>
#include <time.h>
#ifdef HAS_LISTEN_TIMEOUT
/** CLIENT_MAX_WAIT_SECS
* This is the length of time an inbound migration will wait for a fresh
* write before assuming the source has Gone Away. Note: it is *not*
* the time from one write to the next, it is the gap between the end of
* one write and the start of the next.
*/
#define CLIENT_MAX_WAIT_SECS 5
#endif
/** CLIENT_HANDLER_TIMEOUT
* This is the length of time (in seconds) any request can be outstanding for.
* If we spend longer than this in a request, the whole server is killed.
@@ -24,8 +12,7 @@
/** CLIENT_KILLSWITCH_SIGNAL
* The signal number we use to kill the server when *any* killswitch timer
* fires. We don't actually need to install a signal handler for it, the default
* behaviour is perfectly fine.
* fires. The handler gets the fd of the client socket to work with.
*/
#define CLIENT_KILLSWITCH_SIGNAL ( SIGRTMIN + 1 )
@@ -58,6 +45,7 @@ struct client {
};
void client_killswitch_hit(int signal, siginfo_t *info, void *ptr);
void* client_serve(void* client_uncast);
struct client * client_create( struct server * serve, int socket );

View File

@@ -101,12 +101,24 @@ struct flexnbd * flexnbd_create_serving(
max_nbd_clients,
use_killswitch,
1);
flexnbd_create_shared( flexnbd,
s_ctrl_sock );
flexnbd_create_shared( flexnbd, s_ctrl_sock );
// Beats installing one handler per client instance
if ( use_killswitch ) {
struct sigaction act = {
.sa_sigaction = client_killswitch_hit,
.sa_flags = SA_RESTART | SA_SIGINFO
};
FATAL_UNLESS(
0 == sigaction( CLIENT_KILLSWITCH_SIGNAL, &act, NULL ),
"Installing client killswitch signal failed"
);
}
return flexnbd;
}
struct flexnbd * flexnbd_create_listening(
char* s_ip_address,
char* s_port,
@@ -127,6 +139,10 @@ struct flexnbd * flexnbd_create_listening(
s_acl_entries,
1, 0, 0);
flexnbd_create_shared( flexnbd, s_ctrl_sock );
// listen can't use killswitch, as mirror may pause on sending things
// for a very long time.
return flexnbd;
}

View File

@@ -5,6 +5,7 @@
#include "mirror.h"
#include "serve.h"
#include "proxy.h"
#include "client.h"
#include "self_pipe.h"
#include "mbox.h"
#include "control.h"

View File

@@ -71,7 +71,7 @@ int build_allocation_map(struct bitset * allocation_map, int fd)
}
}
debug("Successfully built allocation map");
info("Successfully built allocation map");
return 1;
}
@@ -139,7 +139,7 @@ int readloop(int filedes, void *buffer, size_t size)
ssize_t result = read(filedes, buffer+readden, size-readden);
if ( result == 0 /* EOF */ ) {
warn( "end-of-file detected while reading" );
warn( "end-of-file detected while reading after %i bytes", readden );
return -1;
}
@@ -347,4 +347,3 @@ ssize_t iobuf_write( int fd, struct iobuf *iobuf )
return count;
}

View File

@@ -70,6 +70,7 @@ struct mirror_ctrl {
/* libev stuff */
struct ev_loop *ev_loop;
ev_timer begin_watcher;
ev_io read_watcher;
ev_io write_watcher;
ev_timer timeout_watcher;
@@ -213,18 +214,6 @@ void mirror_destroy( struct mirror *mirror )
/** The mirror code will split NBD writes, making them this long as a maximum */
static const int mirror_longest_write = 8<<20;
/** If, during a mirror pass, we have sent this number of bytes or fewer, we
* go to freeze the I/O and finish it off. This is just a guess.
*/
static const unsigned int mirror_last_pass_after_bytes_written = 100<<20;
/** The largest number of full passes we'll do - the last one will always
* cause the I/O to freeze, however many bytes are left to copy.
*/
static const int mirror_maximum_passes = 7;
#define mirror_last_pass (mirror_maximum_passes - 1)
/* This must not be called if there's any chance of further I/O. Methods to
* ensure this include:
* - Ensure image size is 0
@@ -347,6 +336,19 @@ int mirror_should_quit( struct mirror * mirror )
}
}
/* Bandwidth limiting - we hang around if bps is too high, unless we need to
* empty out the bitset stream a bit */
int mirror_should_wait( struct mirror_ctrl *ctrl )
{
int bps_over = server_mirror_bps( ctrl->serve ) >
ctrl->serve->mirror->max_bytes_per_second;
int stream_full = bitset_stream_size( ctrl->serve->allocation_map ) >
( BITSET_STREAM_SIZE / 2 );
return bps_over && !stream_full;
}
/*
* If there's an event in the bitset stream of the serve allocation map, we
* use it to construct the next transfer request, covering precisely the area
@@ -369,7 +371,7 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
* full, and stop when it's a quarter full. This stops a busy client from
* stalling a migration forever. FIXME: made-up numbers.
*/
if ( bitset_stream_size( serve->allocation_map ) > BITSET_STREAM_SIZE / 2 ) {
if ( mirror->offset < serve->size && bitset_stream_size( serve->allocation_map ) > BITSET_STREAM_SIZE / 2 ) {
ctrl->clear_events = 1;
}
@@ -425,24 +427,6 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
return 1;
}
uint64_t mirror_current_bps( struct mirror * mirror )
{
uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started;
return mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 );
}
int mirror_exceeds_max_bps( struct mirror * mirror )
{
uint64_t mig_speed = mirror_current_bps( mirror );
debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second );
if ( mig_speed > mirror->max_bytes_per_second ) {
return 1;
}
return 0;
}
// ONLY CALL THIS AFTER CLOSING CLIENTS
void mirror_complete( struct server *serve )
{
@@ -486,7 +470,7 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
to_write = xfer->len - ( ctrl->xfer.written - hdr_size );
}
// Actually read some bytes
// Actually write some bytes
if ( ( count = write( ctrl->mirror->client, data_loc, to_write ) ) < 0 ) {
if ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
warn( SHOW_ERRNO( "Couldn't write to listener" ) );
@@ -496,10 +480,12 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
}
debug( "Wrote %"PRIu64" bytes", count );
debug( "to_write was %"PRIu64", xfer->written was %"PRIu64, to_write, xfer->written );
ctrl->xfer.written += count;
// We wrote some bytes, so reset the timer
ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
// We wrote some bytes, so reset the timer and keep track for the next pass
if ( count > 0 ) {
ctrl->xfer.written += count;
ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
}
// All bytes written, so now we need to read the NBD reply back.
if ( ctrl->xfer.written == ctrl->xfer.len + hdr_size ) {
@@ -584,10 +570,17 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
/* transfer was completed, so now we need to either set up the next
* transfer of this pass, set up the first transfer of the next pass, or
* complete the migration */
m->all_dirty += xfer->len;
xfer->read = 0;
xfer->written = 0;
/* We don't account for bytes written in this mode, to stop high-throughput
* discs getting stuck in "drain the event queue!" mode forever
*/
if ( !ctrl->clear_events ) {
m->all_dirty += xfer->len;
}
/* This next bit could take a little while, which is fine */
ev_timer_stop( ctrl->ev_loop, &ctrl->timeout_watcher );
@@ -601,17 +594,15 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
int next_xfer = mirror_setup_next_xfer( ctrl );
debug( "next_xfer: %d", next_xfer );
/* Regardless of time estimates, if there's no waiting transfer, we can
* */
if ( !ctrl->clients_closed && ( !next_xfer || server_mirror_eta( ctrl->serve ) < 60 ) ) {
/* Regardless of time estimates, if there's no waiting transfer, we can start closing clients down. */
if ( !ctrl->clients_closed && ( !next_xfer || server_mirror_eta( ctrl->serve ) < MS_CONVERGE_TIME_SECS ) ) {
info( "Closing clients to allow mirroring to converge" );
server_forbid_new_clients( ctrl->serve );
server_close_clients( ctrl->serve );
server_join_clients( ctrl->serve );
ctrl->clients_closed = 1;
/* One more try - a new event may have been pushed since our last check
*/
/* One more try - a new event may have been pushed since our last check */
if ( !next_xfer ) {
next_xfer = mirror_setup_next_xfer( ctrl );
}
@@ -630,7 +621,7 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
/* FIXME: Should we ignore the bwlimit after server_close_clients has been called? */
if ( mirror_exceeds_max_bps( m ) ) {
if ( mirror_should_wait( ctrl ) ) {
/* We're over the bandwidth limit, so don't move onto the next transfer
* yet. Our limit_watcher will move us on once we're OK. timeout_watcher
* was disabled further up, so don't need to stop it here too */
@@ -674,6 +665,7 @@ void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
return;
}
void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
{
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
@@ -684,7 +676,7 @@ void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
return;
}
if ( mirror_exceeds_max_bps( ctrl->mirror ) ) {
if ( mirror_should_wait( ctrl ) ) {
debug( "max_bps exceeded, waiting", ctrl->mirror->max_bytes_per_second );
ev_timer_again( loop, w );
} else {
@@ -698,6 +690,37 @@ void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
return;
}
/* We use this to periodically check whether the allocation map has built, and
* if it has, start migrating. If it's not finished, then enabling the bitset
* stream does not go well for us.
*/
void mirror_begin_cb( struct ev_loop *loop, ev_timer *w, int revents )
{
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
NULLCHECK( ctrl );
if ( !(revents & EV_TIMER ) ) {
warn( "Mirror limit callback executed but no timer event signalled" );
return;
}
if ( ctrl->serve->allocation_map_built || ctrl->serve->allocation_map_not_built ) {
info( "allocation map builder is finished, beginning migration" );
ev_timer_stop( loop, w );
/* Start by writing xfer 0 to the listener */
ev_io_start( loop, &ctrl->write_watcher );
/* We want to timeout during the first write as well as subsequent ones */
ev_timer_again( loop, &ctrl->timeout_watcher );
/* We're now interested in events */
bitset_enable_stream( ctrl->serve->allocation_map );
} else {
/* not done yet, so wait another second */
ev_timer_again( loop, w );
}
return;
}
void mirror_run( struct server *serve )
{
NULLCHECK( serve );
@@ -728,6 +751,10 @@ void mirror_run( struct server *serve )
ctrl.ev_loop = EV_DEFAULT;
/* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */
ev_init( &ctrl.begin_watcher, mirror_begin_cb );
ctrl.begin_watcher.repeat = 1.0; // We check bps every second. seems sane.
ctrl.begin_watcher.data = (void*) &ctrl;
ev_io_init( &ctrl.read_watcher, mirror_read_cb, m->client, EV_READ );
ctrl.read_watcher.data = (void*) &ctrl;
@@ -735,7 +762,22 @@ void mirror_run( struct server *serve )
ctrl.write_watcher.data = (void*) &ctrl;
ev_init( &ctrl.timeout_watcher, mirror_timeout_cb );
ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F ;
char * env_request_limit = getenv( "FLEXNBD_MS_REQUEST_LIMIT_SECS" );
double timeout_limit = MS_REQUEST_LIMIT_SECS_F;
if ( NULL != env_request_limit ) {
char *endptr = NULL;
errno = 0;
double limit = strtod( env_request_limit, &endptr );
warn( SHOW_ERRNO( "Got %f from strtod", limit ) );
if ( errno == 0 ) {
timeout_limit = limit;
}
}
ctrl.timeout_watcher.repeat = timeout_limit;
ev_init( &ctrl.limit_watcher, mirror_limit_cb );
ctrl.limit_watcher.repeat = 1.0; // We check bps every second. seems sane.
@@ -751,19 +793,23 @@ void mirror_run( struct server *serve )
"Couldn't find first transfer for mirror!"
);
/* Start by writing xfer 0 to the listener */
ev_io_start( ctrl.ev_loop, &ctrl.write_watcher );
/* We want to timeout during the first write as well as subsequent ones */
ev_timer_again( ctrl.ev_loop, &ctrl.timeout_watcher );
if ( serve->allocation_map_built ) {
/* Start by writing xfer 0 to the listener */
ev_io_start( ctrl.ev_loop, &ctrl.write_watcher );
/* We want to timeout during the first write as well as subsequent ones */
ev_timer_again( ctrl.ev_loop, &ctrl.timeout_watcher );
bitset_enable_stream( serve->allocation_map );
} else {
debug( "Waiting for allocation map to be built" );
ev_timer_again( ctrl.ev_loop, &ctrl.begin_watcher );
}
/* Everything up to here is blocking. We switch to non-blocking so we
* can handle rate-limiting and weird error conditions better. TODO: We
* should expand the event loop upwards so we can do the same there too */
sock_set_nonblock( m->client, 1 );
bitset_enable_stream( serve->allocation_map );
info( "Entering event loop" );
ev_run( ctrl.ev_loop, 0 );
info( "Exited event loop" );
@@ -784,12 +830,11 @@ void mirror_run( struct server *serve )
* call retries the migration from scratch. */
if ( m->commit_state != MS_DONE ) {
error( "Event loop exited, but mirroring is not complete" );
/* mirror_reset will be called before a retry, so keeping hold of events
* between now and our next mirroring attempt is not useful
*/
bitset_disable_stream( serve->allocation_map );
error( "Event loop exited, but mirroring is not complete" );
}
return;
@@ -1016,4 +1061,3 @@ void * mirror_super_runner( void * serve_uncast )
return NULL;
}

View File

@@ -18,6 +18,18 @@ enum mirror_state;
*/
#define MS_CONNECT_TIME_SECS 60
/* MS_MAX_DOWNTIME_SECS
* The length of time a migration must be estimated to have remaining for us to
* disconnect clients for convergence
*
* TODO: Make this configurable so refusing-to-converge clients can be manually
* fixed.
* TODO: Make this adaptive - 5 seconds is fine, as long as we can guarantee
* that all migrations will be able to converge in time. We'd add a new
* state between open and closed, where gradually-increasing latency is
* added to client requests to allow the mirror to be faster.
*/
#define MS_CONVERGE_TIME_SECS 5
/* MS_HELLO_TIME_SECS
* The length of time the sender will wait for the NBD hello message
@@ -38,9 +50,12 @@ enum mirror_state;
* request, this is the time between the end of the NBD request and the
* start of the NBD reply. For a write request, this is the time
* between the end of the written data and the start of the NBD reply.
* Can be overridden by the environment variable:
* FLEXNBD_MS_REQUEST_LIMIT_SECS
*/
#define MS_REQUEST_LIMIT_SECS 4
#define MS_REQUEST_LIMIT_SECS_F 4.0
#define MS_REQUEST_LIMIT_SECS 60
#define MS_REQUEST_LIMIT_SECS_F 60.0
enum mirror_finish_action {
ACTION_EXIT,
@@ -122,7 +137,5 @@ struct mirror_super * mirror_super_create(
);
void * mirror_super_runner( void * serve_uncast );
uint64_t mirror_current_bps( struct mirror * mirror );
#endif

View File

@@ -787,7 +787,7 @@ int mode_break( int argc, char *argv[] )
if ( NULL == sock ){
fprintf( stderr, "--sock is required.\n" );
exit_err( acl_help_text );
exit_err( break_help_text );
}
do_remote_command( "break", sock, argc - optind, argv + optind );
@@ -808,7 +808,7 @@ int mode_status( int argc, char *argv[] )
if ( NULL == sock ){
fprintf( stderr, "--sock is required.\n" );
exit_err( acl_help_text );
exit_err( status_help_text );
}
do_remote_command( "status", sock, argc - optind, argv + optind );

View File

@@ -686,6 +686,7 @@ void* build_allocation_map_thread(void* serve_uncast)
* the future, we'll need to wait for the allocation map to finish or
* fail before we can complete the migration.
*/
serve->allocation_map_not_built = 1;
warn( "Didn't build allocation map for %s", serve->filename );
}
@@ -878,7 +879,19 @@ uint64_t server_mirror_eta( struct server * serve )
{
if ( server_is_mirroring( serve ) ) {
uint64_t bytes_to_xfer = server_mirror_bytes_remaining( serve );
return bytes_to_xfer / ( mirror_current_bps( serve->mirror ) + 1 );
return bytes_to_xfer / ( server_mirror_bps( serve ) + 1 );
}
return 0;
}
uint64_t server_mirror_bps( struct server * serve )
{
if ( server_is_mirroring( serve ) ) {
uint64_t duration_ms =
monotonic_time_ms() - serve->mirror->migration_started;
return serve->mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 );
}
return 0;
@@ -941,4 +954,3 @@ int do_serve( struct server* params, struct self_pipe * open_signal )
return success;
}

View File

@@ -76,8 +76,10 @@ struct server {
struct bitset * allocation_map;
/* when starting up, this thread builds the allocation_map */
pthread_t allocation_map_builder_thread;
/* when the thread has finished, it sets this to 1 */
volatile sig_atomic_t allocation_map_built;
volatile sig_atomic_t allocation_map_not_built;
int max_nbd_clients;
struct client_tbl_entry *nbd_client;
@@ -126,6 +128,7 @@ int server_is_mirroring( struct server * serve );
uint64_t server_mirror_bytes_remaining( struct server * serve );
uint64_t server_mirror_eta( struct server * serve );
uint64_t server_mirror_bps( struct server * serve );
void server_abandon_mirror( struct server * serve );
void server_prevent_mirror_start( struct server *serve );

View File

@@ -27,7 +27,7 @@ struct status * status_create( struct server * serve )
status->migration_duration = 0;
}
status->migration_duration /= 1000;
status->migration_speed = serve->mirror->all_dirty / ( status->migration_duration + 1 );
status->migration_speed = server_mirror_bps( serve );
status->migration_speed_limit = serve->mirror->max_bytes_per_second;
status->migration_seconds_left = server_mirror_eta( serve );

View File

@@ -797,6 +797,13 @@ void proxy_session( struct proxier* proxy )
proxy_session_state_names[state]
);
state = CONNECT_TO_UPSTREAM;
/* Since we've timed out, we won't have gone through the timeout logic
* in the various state handlers that resets these appropriately... */
proxy->init.size = 0;
proxy->init.needle = 0;
proxy->rsp.size = 0;
proxy->rsp.needle = 0;
}
}
}

View File

@@ -116,6 +116,7 @@ enum bitset_stream_events {
BITSET_STREAM_ON = 2,
BITSET_STREAM_OFF = 3
};
#define BITSET_STREAM_EVENTS_ENUM_SIZE 4
struct bitset_stream_entry {
enum bitset_stream_events event;
@@ -138,6 +139,7 @@ struct bitset_stream {
pthread_mutex_t mutex;
pthread_cond_t cond_not_full;
pthread_cond_t cond_not_empty;
uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE];
};
@@ -217,13 +219,14 @@ static inline void bitset_stream_enqueue(
stream->entries[stream->in].event = event;
stream->entries[stream->in].from = from;
stream->entries[stream->in].len = len;
stream->queued_bytes[event] += len;
stream->size++;
stream->in++;
stream->in %= BITSET_STREAM_SIZE;
pthread_mutex_unlock( & stream->mutex );
pthread_cond_broadcast( &stream->cond_not_empty );
pthread_cond_signal( &stream->cond_not_empty );
return;
}
@@ -234,6 +237,7 @@ static inline void bitset_stream_dequeue(
)
{
struct bitset_stream * stream = set->stream;
struct bitset_stream_entry * dequeued;
pthread_mutex_lock( &stream->mutex );
@@ -241,18 +245,21 @@ static inline void bitset_stream_dequeue(
pthread_cond_wait( &stream->cond_not_empty, &stream->mutex );
}
dequeued = &stream->entries[stream->out];
if ( out != NULL ) {
out->event = stream->entries[stream->out].event;
out->from = stream->entries[stream->out].from;
out->len = stream->entries[stream->out].len;
out->event = dequeued->event;
out->from = dequeued->from;
out->len = dequeued->len;
}
stream->queued_bytes[dequeued->event] -= dequeued->len;
stream->size--;
stream->out++;
stream->out %= BITSET_STREAM_SIZE;
pthread_mutex_unlock( &stream->mutex );
pthread_cond_broadcast( &stream->cond_not_full );
pthread_cond_signal( &stream->cond_not_full );
return;
}
@@ -273,17 +280,10 @@ static inline uint64_t bitset_stream_queued_bytes(
enum bitset_stream_events event
)
{
uint64_t total = 0;
int i;
uint64_t total;
pthread_mutex_lock( &set->stream->mutex );
for ( i = set->stream->out; i < set->stream->in ; i++ ) {
if ( set->stream->entries[i].event == event ) {
total += set->stream->entries[i].len;
}
}
total = set->stream->queued_bytes[event];
pthread_mutex_unlock( &set->stream->mutex );
return total;

View File

@@ -20,7 +20,13 @@ t = Thread.start do
client2.close
end
sleep( FlexNBD::MS_REQUEST_LIMIT_SECS + 2 )
sleep_time = if ENV.has_key?('FLEXNBD_MS_REQUEST_LIMIT_SECS')
ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'].to_f
else
FlexNBD::MS_REQUEST_LIMIT_SECS
end
sleep( sleep_time + 2.0 )
client1.close
t.join

View File

@@ -458,12 +458,18 @@ module FlexNBD
def maybe_timeout(cmd, timeout=nil )
stdout, stderr = "",""
stat = nil
run = Proc.new do
Open3.popen3( cmd ) do |io_in, io_out, io_err|
# Ruby 1.9 changed the popen3 api. instead of 3 args, the block
# gets 4. Not only that, but it no longer sets $?, so we have to
# go elsewhere for the process' exit status.
Open3.popen3( cmd ) do |io_in, io_out, io_err, maybe_thr|
io_in.close
stdout.replace io_out.read
stderr.replace io_err.read
stat = maybe_thr.value if maybe_thr
end
stat ||= $?
end
if timeout
@@ -472,13 +478,13 @@ module FlexNBD
run.call
end
[stdout, stderr]
[stdout, stderr, stat]
end
def mirror(dest_ip, dest_port, bandwidth=nil, action=nil)
stdout, stderr = mirror_unchecked( dest_ip, dest_port, bandwidth, action )
raise IOError.new( "Migrate command failed\n" + stderr) unless $?.success?
stdout, stderr, status = mirror_unchecked( dest_ip, dest_port, bandwidth, action )
raise IOError.new( "Migrate command failed\n" + stderr) unless status.success?
stdout
end

View File

@@ -2,6 +2,14 @@
module FlexNBD
def self.binary( str )
if str.respond_to? :force_encoding
str.force_encoding "ASCII-8BIT"
else
str
end
end
# eeevil is his one and only name...
def self.read_constants
parents = []
@@ -17,7 +25,7 @@ module FlexNBD
fail "No source root!" unless source_root
headers = Dir[File.join( source_root, "src", "*.h" ) ]
headers = Dir[File.join( source_root, "src", "{common,proxy,server}","*.h" ) ]
headers.each do |header_filename|
txt_lines = File.readlines( header_filename )
@@ -33,8 +41,8 @@ module FlexNBD
read_constants()
REQUEST_MAGIC = "\x25\x60\x95\x13" unless defined?(REQUEST_MAGIC)
REPLY_MAGIC = "\x67\x44\x66\x98" unless defined?(REPLY_MAGIC)
REQUEST_MAGIC = binary("\x25\x60\x95\x13") unless defined?(REQUEST_MAGIC)
REPLY_MAGIC = binary("\x67\x44\x66\x98") unless defined?(REPLY_MAGIC)
end # module FlexNBD

View File

@@ -2,12 +2,17 @@
require 'test/unit'
require 'environment'
require 'flexnbd/constants'
class TestHappyPath < Test::Unit::TestCase
def setup
@env = Environment.new
end
def bin(str)
FlexNBD.binary str
end
def teardown
@env.nbd1.can_die(0)
@env.nbd2.can_die(0)
@@ -22,13 +27,13 @@ class TestHappyPath < Test::Unit::TestCase
[0, 12, 63].each do |num|
assert_equal(
@env.nbd1.read(num*@env.blocksize, @env.blocksize),
@env.file1.read(num*@env.blocksize, @env.blocksize)
bin( @env.nbd1.read(num*@env.blocksize, @env.blocksize) ),
bin( @env.file1.read(num*@env.blocksize, @env.blocksize) )
)
end
[124, 1200, 10028, 25488].each do |num|
assert_equal(@env.nbd1.read(num, 4), @env.file1.read(num, 4))
assert_equal(bin(@env.nbd1.read(num, 4)), bin(@env.file1.read(num, 4)))
end
end
@@ -102,7 +107,7 @@ class TestHappyPath < Test::Unit::TestCase
assert_no_match( /unrecognized/, stderr )
Timeout.timeout(2) do @env.nbd1.join end
Timeout.timeout(10) do @env.nbd1.join end
assert !File.file?( @env.filename1 )
end

View File

@@ -7,6 +7,9 @@ require 'environment'
class TestSourceErrorHandling < Test::Unit::TestCase
def setup
@old_env = ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS']
ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'] = "4.0"
@env = Environment.new
@env.writefile1( "f" * 4 )
@env.serve1
@@ -16,6 +19,7 @@ class TestSourceErrorHandling < Test::Unit::TestCase
def teardown
@env.nbd1.can_die(0)
@env.cleanup
ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'] = @old_env
end

View File

@@ -76,8 +76,8 @@ START_TEST( test_read_request_quits_on_stop_signal )
client_signal_stop( c );
int client_read_request( struct client *, struct nbd_request *);
fail_unless( 0 == client_read_request( c, &nbdr ), "Didn't quit on stop." );
int client_serve_request( struct client *);
fail_unless( 1 == client_serve_request( c ), "Didn't quit on stop." );
close( fds[0] );
close( fds[1] );

View File

@@ -72,9 +72,11 @@ START_TEST( test_sockaddr_address_string_doesnt_overflow_short_buffer )
char testbuf[128];
const char* result;
memset( testbuf, 0, 128 );
v4->sin_family = AF_INET;
v4->sin_port = htons( 4777 );
ck_assert_int_eq( 1, inet_pton( AF_INET, "192.168.0.1", &v4->sin_addr ));
memset( &testbuf, 0, 128 );
result = sockaddr_address_string( &sa, &testbuf[0], 2 );
ck_assert( result == NULL );

View File

@@ -71,11 +71,12 @@ START_TEST( test_fatal_kills_process )
sleep(10);
}
else {
int kidstatus;
int result;
result = waitpid( pid, &kidstatus, 0 );
int kidret, kidstatus, result;
result = waitpid( pid, &kidret, 0 );
fail_if( result < 0, "Wait failed." );
fail_unless( kidstatus == 6, "Kid was not aborted." );
fail_unless( WIFSIGNALED( kidret ), "Process didn't exit via signal" );
kidstatus = WTERMSIG( kidret );
ck_assert_int_eq( kidstatus, SIGABRT );
}
}