Compare commits
33 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
83e3d65be9 | ||
![]() |
4f31bd9340 | ||
![]() |
0baf93fd7b | ||
![]() |
175f19b3e7 | ||
![]() |
8d56316548 | ||
![]() |
27f2cc7083 | ||
![]() |
8084a41ad2 | ||
![]() |
5ca5858929 | ||
![]() |
afcc07a181 | ||
![]() |
dcead04cf6 | ||
![]() |
4f7f5f1745 | ||
![]() |
976e9ba07f | ||
![]() |
91d9531a60 | ||
![]() |
905d66af77 | ||
![]() |
eee7c9644c | ||
![]() |
ce5c51cdcf | ||
![]() |
c6c53c63ba | ||
![]() |
20bd58749e | ||
![]() |
866bf835e6 | ||
![]() |
53cbe14556 | ||
![]() |
cd3281f62d | ||
![]() |
1e5457fed0 | ||
![]() |
0753369b77 | ||
![]() |
9d9ae40953 | ||
![]() |
65d4f581b9 | ||
![]() |
77c71ccf09 | ||
![]() |
97a923afdf | ||
![]() |
335261869d | ||
![]() |
8cf9cae8c0 | ||
![]() |
6986c70888 | ||
![]() |
4b9ded0e1d | ||
![]() |
b177faacd6 | ||
![]() |
96e60a4a29 |
102
Makefile
102
Makefile
@@ -1,10 +1,102 @@
|
||||
#!/usr/bin/make -f
|
||||
|
||||
all:
|
||||
rake build
|
||||
VPATH=src:tests/unit
|
||||
DESTDIR?=/
|
||||
PREFIX?=/usr/local/bin
|
||||
INSTALLDIR=$(DESTDIR)/$(PREFIX)
|
||||
|
||||
all-debug:
|
||||
DEBUG=1 rake build
|
||||
ifdef DEBUG
|
||||
CFLAGS_EXTRA=-g -DDEBUG
|
||||
LDFLAGS_EXTRA=-g
|
||||
else
|
||||
CFLAGS_EXTRA=-O2
|
||||
endif
|
||||
|
||||
CCFLAGS=-D_GNU_SOURCE=1 -Wall -Wextra -Werror-implicit-function-declaration -Wstrict-prototypes -Wno-missing-field-initializers $(CFLAGS_EXTRA) $(CFLAGS)
|
||||
LLDFLAGS=-lrt -lev $(LDFLAGS_EXTRA) $(LDFLAGS)
|
||||
|
||||
CC?=gcc
|
||||
|
||||
LIBS=-lpthread
|
||||
INC=-I/usr/include/libev -Isrc/common -Isrc/server -Isrc/proxy
|
||||
COMPILE=$(CC) $(INC) -c $(CCFLAGS)
|
||||
SAVEDEP=$(CC) $(INC) -MM $(CCFLAGS)
|
||||
LINK=$(CC) $(LLDFLAGS) -Isrc $(LIBS)
|
||||
|
||||
|
||||
EXISTING_OBJS := $(wildcard build/*.o)
|
||||
-include $(EXISTING_OBJS:.o=.d)
|
||||
|
||||
COMMON_SRC := $(wildcard src/common/*.c)
|
||||
SERVER_SRC := $(wildcard src/server/*.c)
|
||||
PROXY_SRC := $(wildcard src/proxy/*.c)
|
||||
|
||||
COMMON_OBJ := $(COMMON_SRC:src/%.c=build/%.o)
|
||||
SERVER_OBJ := $(SERVER_SRC:src/%.c=build/%.o)
|
||||
PROXY_OBJ := $(PROXY_SRC:src/%.c=build/%.o)
|
||||
|
||||
SRCS := $(COMMON_SRC) $(SERVER_SRC) $(PROXY_SRC)
|
||||
OBJS := $(COMMON_OBJ) $(SERVER_OBJ) $(PROXY_OBJ)
|
||||
|
||||
|
||||
all: build/flexnbd build/flexnbd-proxy doc
|
||||
|
||||
build/%.o: %.c
|
||||
mkdir -p $(dir $@)
|
||||
$(COMPILE) $< -o $@
|
||||
$(SAVEDEP) $< > build/$*.d
|
||||
|
||||
objs: $(OBJS)
|
||||
|
||||
build/flexnbd: $(COMMON_OBJ) $(SERVER_OBJ) build/main.o
|
||||
$(LINK) $^ -o $@
|
||||
|
||||
build/flexnbd-proxy: $(COMMON_OBJ) $(PROXY_OBJ) build/proxy-main.o
|
||||
$(LINK) $^ -o $@
|
||||
|
||||
server: build/flexnbd
|
||||
proxy: build/flexnbd-proxy
|
||||
|
||||
|
||||
CHECK_SRC := $(wildcard tests/unit/*.c)
|
||||
CHECK_OBJ := $(CHECK_SRC:tests/unit/%.c=build/tests/%.o)
|
||||
# Why can't we reuse the build/%.o rule above? Not sure.
|
||||
build/tests/%.o: tests/unit/%.c
|
||||
mkdir -p $(dir $@)
|
||||
$(COMPILE) $< -o $@
|
||||
$(SAVEDEP) $< > build/tests/$*.d
|
||||
|
||||
CHECK_BINS := $(CHECK_OBJ:build/tests/%.o=build/tests/%)
|
||||
build/tests/%: build/tests/%.o $(OBJS)
|
||||
$(LINK) $^ -o $@ -lcheck
|
||||
|
||||
check_objs: $(CHECK_OBJ)
|
||||
|
||||
check_bins: $(CHECK_BINS)
|
||||
check: $(CHECK_BINS)
|
||||
for bin in $^; do $$bin; done
|
||||
|
||||
build/flexnbd.1: README.txt
|
||||
a2x --destination-dir build --format manpage $<
|
||||
build/flexnbd-proxy.1: README.proxy.txt
|
||||
a2x --destination-dir build --format manpage $<
|
||||
# If we don't pipe to file, gzip clobbers the original, causing make
|
||||
# to rebuild each time
|
||||
%.1.gz: %.1
|
||||
gzip -c -f $< > $@
|
||||
|
||||
|
||||
server-man: build/flexnbd.1.gz
|
||||
proxy-man: build/flexnbd-proxy.1.gz
|
||||
|
||||
doc: server-man proxy-man
|
||||
|
||||
install:
|
||||
mkdir -p $(INSTALLDIR)
|
||||
cp build/flexnbd build/flexnbd-proxy $(INSTALLDIR)
|
||||
|
||||
clean:
|
||||
rake clean
|
||||
rm -rf build/*
|
||||
|
||||
|
||||
.PHONY: clean objs check_objs all server proxy check_bins check server-man proxy-man doc
|
||||
|
@@ -25,7 +25,7 @@ COMMANDS
|
||||
serve
|
||||
~~~~~
|
||||
$ flexnbd serve --addr <ADDR> --port <PORT> --file <FILE>
|
||||
[--sock <SOCK>] [--default-deny] [global option]* [acl entry]*
|
||||
[--sock <SOCK>] [--default-deny] [-k] [global option]* [acl entry]*
|
||||
|
||||
Serve a file. If any ACL entries are given (which should be IP
|
||||
addresses), only those clients listed will be permitted to connect.
|
||||
@@ -55,6 +55,12 @@ Options
|
||||
empty ACL will let no clients connect. If it is not given, an
|
||||
empty ACL will let any client connect.
|
||||
|
||||
*--killswitch, -k*:
|
||||
If set, we implement a 2-minute timeout on NBD requests and
|
||||
responses. If a request takes longer than that to complete,
|
||||
the client is disconnected. This is useful to keep broken
|
||||
clients from breaking migrations, among other things.
|
||||
|
||||
listen
|
||||
~~~~~~
|
||||
|
||||
|
312
Rakefile
312
Rakefile
@@ -1,85 +1,34 @@
|
||||
$: << '../rake_utils/lib'
|
||||
require 'rake_utils/debian'
|
||||
include RakeUtils::DSL
|
||||
# encoding: utf-8
|
||||
|
||||
CC=ENV['CC'] || "gcc"
|
||||
|
||||
DEBUG = ENV.has_key?('DEBUG') &&
|
||||
%w|yes y ok 1 true t|.include?(ENV['DEBUG'])
|
||||
|
||||
ALL_SOURCES = FileList['src/*']
|
||||
|
||||
PROXY_ONLY_SOURCES = FileList['src/{proxy-main,proxy}.c']
|
||||
PROXY_ONLY_OBJECTS = PROXY_ONLY_SOURCES.pathmap( "%{^src,build}X.o" )
|
||||
|
||||
SOURCES = ALL_SOURCES.select { |c| c =~ /\.c$/ } - PROXY_ONLY_SOURCES
|
||||
OBJECTS = SOURCES.pathmap( "%{^src,build}X.o" ) - PROXY_ONLY_OBJECTS
|
||||
|
||||
PROXY_SOURCES = FileList['src/{ioutil,nbdtypes,readwrite,sockutil,util,parse}.c'] + PROXY_ONLY_SOURCES
|
||||
PROXY_OBJECTS = PROXY_SOURCES.pathmap( "%{^src,build}X.o" )
|
||||
|
||||
TEST_SOURCES = FileList['tests/unit/*.c']
|
||||
TEST_OBJECTS = TEST_SOURCES.pathmap( "%{^tests/unit,build/tests}X.o" )
|
||||
|
||||
LIBS = %w( pthread )
|
||||
LDFLAGS = ["-lrt -lev"]
|
||||
CCFLAGS = %w(
|
||||
-D_GNU_SOURCE=1
|
||||
-Wall
|
||||
-Wextra
|
||||
-Werror-implicit-function-declaration
|
||||
-Wstrict-prototypes
|
||||
-Wno-missing-field-initializers
|
||||
) + # Added -Wno-missing-field-initializers to shut GCC up over {0} struct initialisers
|
||||
[ENV['CFLAGS']]
|
||||
|
||||
LIBCHECK = File.exists?("/usr/lib/libcheck.a") ?
|
||||
"/usr/lib/libcheck.a" :
|
||||
"/usr/local/lib/libcheck.a"
|
||||
|
||||
TEST_MODULES = Dir["tests/unit/check_*.c"].map { |n|
|
||||
File.basename( n )[%r{check_(.+)\.c},1] }
|
||||
|
||||
if DEBUG
|
||||
LDFLAGS << ["-g"]
|
||||
CCFLAGS << ["-g -DDEBUG"]
|
||||
else
|
||||
CCFLAGS << "-O2"
|
||||
def make(*targets)
|
||||
sh "make #{targets.map{|t| t.to_s}.join(" ")}"
|
||||
end
|
||||
|
||||
def maketask( opts )
|
||||
case opts
|
||||
when Symbol
|
||||
maketask opts => opts
|
||||
else
|
||||
opts.each do |name, targets|
|
||||
task( name ){make *[*targets]}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
desc "Build the binary and man page"
|
||||
task :build => [:flexnbd, :flexnbd_proxy, :man]
|
||||
task :default => :build
|
||||
maketask :build => [:all, :doc]
|
||||
|
||||
desc "Build just the flexnbd binary"
|
||||
task :flexnbd => "build/flexnbd"
|
||||
maketask :flexnbd => [:server]
|
||||
file "build/flexnbd" => :flexnbd
|
||||
|
||||
desc "Build just the flexnbd-proxy binary"
|
||||
task :flexnbd_proxy => "build/flexnbd-proxy"
|
||||
|
||||
def check(m)
|
||||
"build/tests/check_#{m}"
|
||||
end
|
||||
|
||||
file "README.txt"
|
||||
file "README.proxy.txt"
|
||||
|
||||
def manpage(name, src)
|
||||
FileUtils.mkdir_p( "build" )
|
||||
sh "a2x --destination-dir build --format manpage #{src}"
|
||||
sh "gzip -f build/#{name}"
|
||||
end
|
||||
|
||||
file "build/flexnbd.1.gz" => "README.txt" do
|
||||
manpage("flexnbd.1", "README.txt")
|
||||
end
|
||||
|
||||
file "build/flexnbd-proxy.1.gz" => "README.proxy.txt" do
|
||||
manpage("flexnbd-proxy.1", "README.proxy.txt")
|
||||
end
|
||||
maketask :flexnbd_proxy => [:proxy]
|
||||
file "build/flexnbd-proxy" => :flexnbd_proxy
|
||||
|
||||
desc "Build just the man page"
|
||||
task :man => ["build/flexnbd.1.gz", "build/flexnbd-proxy.1.gz"]
|
||||
maketask :man => :doc
|
||||
|
||||
|
||||
namespace "test" do
|
||||
@@ -87,226 +36,17 @@ namespace "test" do
|
||||
task 'run' => ["unit", "scenarios"]
|
||||
|
||||
desc "Build C tests"
|
||||
task 'build' => TEST_MODULES.map { |n| check n}
|
||||
|
||||
TEST_MODULES.each do |m|
|
||||
desc "Run tests for #{m}"
|
||||
task "check_#{m}" => check(m) do
|
||||
sh check m
|
||||
end
|
||||
end
|
||||
maketask :build => :check_bins
|
||||
|
||||
desc "Run C tests"
|
||||
task 'unit' => 'build' do
|
||||
TEST_MODULES.each do |n|
|
||||
ENV['EF_DISABLE_BANNER'] = '1'
|
||||
sh check n
|
||||
end
|
||||
end
|
||||
maketask :unit => :check
|
||||
|
||||
desc "Run NBD test scenarios"
|
||||
task 'scenarios' => ['build/flexnbd', 'build/flexnbd-proxy'] do
|
||||
sh "cd tests/acceptance; ruby nbd_scenarios -v"
|
||||
task 'scenarios' => ["build/flexnbd", "build/flexnbd-proxy"] do
|
||||
sh "cd tests/acceptance && RUBYOPT='-I.' ruby nbd_scenarios -v"
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
||||
def gcc_compile( target, source )
|
||||
FileUtils.mkdir_p File.dirname( target )
|
||||
sh "#{CC} -Isrc -c #{CCFLAGS.join(' ')} -o #{target} #{source} "
|
||||
end
|
||||
|
||||
def gcc_link(target, objects)
|
||||
FileUtils.mkdir_p File.dirname( target )
|
||||
|
||||
sh "#{CC} #{LDFLAGS.join(' ')} "+
|
||||
" -Isrc " +
|
||||
" -o #{target} "+
|
||||
objects.join(" ") +
|
||||
" "+LIBS.map { |l| "-l#{l}" }.join(" ")
|
||||
end
|
||||
|
||||
def headers(c)
|
||||
`#{CC} -Isrc -MM #{c}`.gsub("\\\n", " ").split(" ")[2..-1]
|
||||
end
|
||||
|
||||
rule 'build/flexnbd-proxy' => PROXY_OBJECTS do |t|
|
||||
gcc_link(t.name, t.sources)
|
||||
end
|
||||
|
||||
rule 'build/flexnbd' => OBJECTS do |t|
|
||||
gcc_link(t.name, t.sources)
|
||||
end
|
||||
|
||||
|
||||
file check("client") =>
|
||||
%w{build/tests/check_client.o
|
||||
build/self_pipe.o
|
||||
build/nbdtypes.o
|
||||
build/flexnbd.o
|
||||
build/flexthread.o
|
||||
build/control.o
|
||||
build/readwrite.o
|
||||
build/parse.o
|
||||
build/client.o
|
||||
build/serve.o
|
||||
build/acl.o
|
||||
build/ioutil.o
|
||||
build/mbox.o
|
||||
build/mirror.o
|
||||
build/status.o
|
||||
build/sockutil.o
|
||||
build/util.o} do |t|
|
||||
gcc_link t.name, t.prerequisites + [LIBCHECK]
|
||||
end
|
||||
|
||||
file check("acl") =>
|
||||
%w{build/tests/check_acl.o
|
||||
build/parse.o
|
||||
build/acl.o
|
||||
build/util.o} do |t|
|
||||
gcc_link t.name, t.prerequisites + [LIBCHECK]
|
||||
end
|
||||
|
||||
file check( "util" ) =>
|
||||
%w{build/tests/check_util.o
|
||||
build/util.o
|
||||
build/self_pipe.o} do |t|
|
||||
gcc_link t.name, t.prerequisites + [LIBCHECK]
|
||||
end
|
||||
|
||||
file check("serve") =>
|
||||
%w{build/tests/check_serve.o
|
||||
build/self_pipe.o
|
||||
build/nbdtypes.o
|
||||
build/control.o
|
||||
build/readwrite.o
|
||||
build/parse.o
|
||||
build/client.o
|
||||
build/flexthread.o
|
||||
build/serve.o
|
||||
build/flexnbd.o
|
||||
build/mirror.o
|
||||
build/status.o
|
||||
build/acl.o
|
||||
build/mbox.o
|
||||
build/ioutil.o
|
||||
build/sockutil.o
|
||||
build/util.o} do |t|
|
||||
gcc_link t.name, t.prerequisites + [LIBCHECK]
|
||||
end
|
||||
|
||||
file check("status") =>
|
||||
%w{
|
||||
build/tests/check_status.o
|
||||
build/self_pipe.o
|
||||
build/nbdtypes.o
|
||||
build/control.o
|
||||
build/readwrite.o
|
||||
build/parse.o
|
||||
build/client.o
|
||||
build/flexthread.o
|
||||
build/serve.o
|
||||
build/flexnbd.o
|
||||
build/mirror.o
|
||||
build/status.o
|
||||
build/acl.o
|
||||
build/mbox.o
|
||||
build/ioutil.o
|
||||
build/sockutil.o
|
||||
build/util.o
|
||||
} do |t|
|
||||
gcc_link t.name, t.prerequisites + [LIBCHECK]
|
||||
end
|
||||
|
||||
|
||||
file check("readwrite") =>
|
||||
%w{build/tests/check_readwrite.o
|
||||
build/readwrite.o
|
||||
build/client.o
|
||||
build/self_pipe.o
|
||||
build/serve.o
|
||||
build/parse.o
|
||||
build/acl.o
|
||||
build/flexthread.o
|
||||
build/control.o
|
||||
build/flexnbd.o
|
||||
build/mirror.o
|
||||
build/status.o
|
||||
build/nbdtypes.o
|
||||
build/mbox.o
|
||||
build/ioutil.o
|
||||
build/sockutil.o
|
||||
build/util.o} do |t|
|
||||
gcc_link t.name, t.prerequisites + [LIBCHECK]
|
||||
end
|
||||
|
||||
|
||||
file check("flexnbd") =>
|
||||
%w{build/tests/check_flexnbd.o
|
||||
build/flexnbd.o
|
||||
build/ioutil.o
|
||||
build/sockutil.o
|
||||
build/util.o
|
||||
build/control.o
|
||||
build/mbox.o
|
||||
build/flexthread.o
|
||||
build/status.o
|
||||
build/self_pipe.o
|
||||
build/client.o
|
||||
build/acl.o
|
||||
build/parse.o
|
||||
build/nbdtypes.o
|
||||
build/readwrite.o
|
||||
build/mirror.o
|
||||
build/serve.o} do |t|
|
||||
gcc_link t.name, t.prerequisites + [LIBCHECK]
|
||||
end
|
||||
|
||||
|
||||
file check("control") =>
|
||||
%w{build/tests/check_control.o} + OBJECTS - ["build/main.o", 'build/proxy-main.o', 'build/proxy.o'] do |t|
|
||||
gcc_link t.name, t.prerequisites + [LIBCHECK]
|
||||
end
|
||||
|
||||
(TEST_MODULES- %w{status control flexnbd acl client serve readwrite util}).each do |m|
|
||||
tgt = "build/tests/check_#{m}.o"
|
||||
maybe_obj_name = "build/#{m}.o"
|
||||
# Take it out in case we're testing one of the utils
|
||||
deps = ["build/ioutil.o", "build/util.o", "build/sockutil.o"] - [maybe_obj_name]
|
||||
|
||||
# Add it back in if it's something we need to compile
|
||||
deps << maybe_obj_name if OBJECTS.include?( maybe_obj_name )
|
||||
|
||||
file check( m ) => deps + [tgt] do |t|
|
||||
gcc_link(t.name, deps + [tgt, LIBCHECK])
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
OBJECTS.zip( SOURCES ).each do |o,c|
|
||||
file o => [c]+headers(c) do |t| gcc_compile( o, c ) end
|
||||
end
|
||||
|
||||
PROXY_ONLY_OBJECTS.zip( PROXY_ONLY_SOURCES).each do |o, c|
|
||||
file o => [c]+headers(c) do |t| gcc_compile( o, c ) end
|
||||
end
|
||||
|
||||
TEST_OBJECTS.zip( TEST_SOURCES ).each do |o,c|
|
||||
file o => [c] + headers(c) do |t| gcc_compile( o, c ) end
|
||||
end
|
||||
|
||||
desc "Remove all build targets, binaries and temporary files"
|
||||
task :clean do
|
||||
sh "rm -rf *~ build"
|
||||
end
|
||||
|
||||
namespace :pkg do
|
||||
deb do |t|
|
||||
t.code_files = ALL_SOURCES + ["Rakefile", "README.txt", "README.proxy.txt"]
|
||||
t.pkg_name = "flexnbd"
|
||||
t.generate_changelog!
|
||||
end
|
||||
end
|
||||
|
||||
maketask :clean
|
||||
|
6
debian/rules
vendored
6
debian/rules
vendored
@@ -7,12 +7,6 @@
|
||||
%:
|
||||
dh $@
|
||||
|
||||
override_dh_auto_build:
|
||||
rake build
|
||||
|
||||
override_dh_auto_clean:
|
||||
rake clean
|
||||
|
||||
.PHONY: override_dh_strip
|
||||
override_dh_strip:
|
||||
dh_strip --dbg-package=flexnbd-dbg
|
||||
|
@@ -31,7 +31,7 @@ static int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], uni
|
||||
for (i=0; i < list_length; i++) {
|
||||
struct ip_and_mask *entry = &(*list)[i];
|
||||
int testbits;
|
||||
unsigned char *raw_address1, *raw_address2;
|
||||
unsigned char *raw_address1 = NULL, *raw_address2 = NULL;
|
||||
|
||||
debug("checking acl entry %d (%d/%d)", i, test->generic.sa_family, entry->ip.family);
|
||||
|
@@ -15,6 +15,20 @@
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
|
||||
// When this signal is invoked, we call shutdown() on the client fd, which
|
||||
// results in the thread being wound up
|
||||
void client_killswitch_hit(int signal __attribute__ ((unused)), siginfo_t *info, void *ptr __attribute__ ((unused)))
|
||||
{
|
||||
int fd = info->si_value.sival_int;
|
||||
warn( "Killswitch for fd %i activated, calling shutdown on socket", fd );
|
||||
|
||||
FATAL_IF(
|
||||
-1 == shutdown( fd, SHUT_RDWR ),
|
||||
SHOW_ERRNO( "Failed to shutdown() the socket, killing the server" )
|
||||
);
|
||||
}
|
||||
|
||||
struct client *client_create( struct server *serve, int socket )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
@@ -25,6 +39,13 @@ struct client *client_create( struct server *serve, int socket )
|
||||
.sigev_signo = CLIENT_KILLSWITCH_SIGNAL
|
||||
};
|
||||
|
||||
/*
|
||||
* Our killswitch closes this socket, forcing read() and write() calls
|
||||
* blocked on it to return with an error. The thread then close()s the
|
||||
* socket itself, avoiding races.
|
||||
*/
|
||||
evp.sigev_value.sival_int = socket;
|
||||
|
||||
c = xmalloc( sizeof( struct client ) );
|
||||
c->stopped = 0;
|
||||
c->socket = socket;
|
||||
@@ -199,36 +220,6 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
|
||||
NULLCHECK( out_request );
|
||||
|
||||
struct nbd_request_raw request_raw;
|
||||
fd_set fds;
|
||||
struct timeval * ptv = NULL;
|
||||
int fd_count;
|
||||
|
||||
/* We want a timeout if this is an inbound migration, but not otherwise.
|
||||
* This is compile-time selectable, as it will break mirror max_bps
|
||||
*/
|
||||
#ifdef HAS_LISTEN_TIMEOUT
|
||||
struct timeval tv = {CLIENT_MAX_WAIT_SECS, 0};
|
||||
|
||||
if ( !server_is_in_control( client->serve ) ) {
|
||||
ptv = &tv;
|
||||
}
|
||||
#endif
|
||||
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(client->socket, &fds);
|
||||
self_pipe_fd_set( client->stop_signal, &fds );
|
||||
fd_count = sock_try_select(FD_SETSIZE, &fds, NULL, NULL, ptv);
|
||||
if ( fd_count == 0 ) {
|
||||
/* This "can't ever happen" */
|
||||
if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); }
|
||||
else { error("Timed out waiting for I/O"); }
|
||||
}
|
||||
else if ( fd_count < 0 ) { fatal( "Select failed" ); }
|
||||
|
||||
if ( self_pipe_fd_isset( client->stop_signal, &fds ) ){
|
||||
debug("Client received stop signal.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (fd_read_request(client->socket, &request_raw) == -1) {
|
||||
*disconnected = 1;
|
||||
@@ -255,7 +246,6 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
|
||||
}
|
||||
|
||||
nbd_r2h_request( &request_raw, out_request );
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -269,7 +259,7 @@ int fd_write_reply( int fd, char *handle, int error )
|
||||
memcpy( reply.handle, handle, 8 );
|
||||
|
||||
nbd_h2r_reply( &reply, &reply_raw );
|
||||
debug( "Replying with %s, %d", handle, error );
|
||||
debug( "Replying with handle=0x%08X, error=%"PRIu32, handle, error );
|
||||
|
||||
if( -1 == writeloop( fd, &reply_raw, sizeof( reply_raw ) ) ) {
|
||||
switch( errno ) {
|
||||
@@ -379,15 +369,15 @@ int client_request_needs_reply( struct client * client,
|
||||
* forever.
|
||||
*/
|
||||
if (request.magic != REQUEST_MAGIC) {
|
||||
warn("Bad magic 0x%08x from client", request.magic);
|
||||
warn("Bad magic 0x%08X from client", request.magic);
|
||||
client_write_reply( client, &request, EBADMSG );
|
||||
client->disconnect = 1; // no need to flush
|
||||
return 0;
|
||||
}
|
||||
|
||||
debug(
|
||||
"request type=%"PRIu32", from=%"PRIu64", len=%"PRIu32,
|
||||
request.type, request.from, request.len
|
||||
"request type=%"PRIu32", from=%"PRIu64", len=%"PRIu32", handle=0x%08X",
|
||||
request.type, request.from, request.len, request.handle
|
||||
);
|
||||
|
||||
/* check it's not out of range */
|
||||
@@ -416,7 +406,7 @@ int client_request_needs_reply( struct client * client,
|
||||
return 0;
|
||||
|
||||
default:
|
||||
fatal("Unknown request %08x", request.type);
|
||||
fatal("Unknown request 0x%08X", request.type);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
@@ -426,6 +416,7 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
|
||||
{
|
||||
off64_t offset;
|
||||
|
||||
// TODO: cork
|
||||
debug("request read %ld+%d", request.from, request.len);
|
||||
client_write_reply( client, &request, 0);
|
||||
|
||||
@@ -443,12 +434,14 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
|
||||
"sendfile failed from=%ld, len=%d",
|
||||
offset,
|
||||
request.len);
|
||||
|
||||
// TODO: uncork
|
||||
}
|
||||
|
||||
|
||||
void client_reply_to_write( struct client* client, struct nbd_request request )
|
||||
{
|
||||
debug("request write %ld+%d", request.from, request.len);
|
||||
debug("request write from=%"PRIu64", len=%"PRIu32", handle=0x%08X", request.from, request.len, request.handle);
|
||||
if (client->serve->allocation_map_built) {
|
||||
write_not_zeroes( client, request.from, request.len );
|
||||
}
|
||||
@@ -553,35 +546,79 @@ int client_serve_request(struct client* client)
|
||||
struct nbd_request request = {0};
|
||||
int stop = 1;
|
||||
int disconnected = 0;
|
||||
fd_set rfds, efds;
|
||||
int fd_count;
|
||||
|
||||
/* wait until there are some bytes on the fd before committing to reads
|
||||
* FIXME: this whole scheme is broken because we're using blocking reads.
|
||||
* read() can block directly after a select anyway, and it's possible that,
|
||||
* without the killswitch, we'd hang forever. With the killswitch, we just
|
||||
* hang for "a while". The Right Thing to do is to rewrite client.c to be
|
||||
* non-blocking.
|
||||
*/
|
||||
|
||||
FD_ZERO( &rfds );
|
||||
FD_SET( client->socket, &rfds );
|
||||
self_pipe_fd_set( client->stop_signal, &rfds );
|
||||
|
||||
FD_ZERO( &efds );
|
||||
FD_SET( client->socket, &efds );
|
||||
|
||||
fd_count = sock_try_select( FD_SETSIZE, &rfds, NULL, &efds, NULL );
|
||||
|
||||
if ( fd_count == 0 ) {
|
||||
/* This "can't ever happen" */
|
||||
fatal( "No FDs selected, and no timeout!" );
|
||||
}
|
||||
else if ( fd_count < 0 ) { fatal( "Select failed" ); }
|
||||
|
||||
if ( self_pipe_fd_isset( client->stop_signal, &rfds ) ){
|
||||
debug("Client received stop signal.");
|
||||
return 1; // Don't try to serve more requests
|
||||
}
|
||||
|
||||
if ( FD_ISSET( client->socket, &efds ) ) {
|
||||
debug( "Client connection closed" );
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
/* We arm / disarm around the whole request cycle. The reason for this is
|
||||
* that the remote peer could uncleanly die at any point; if we're stuck on
|
||||
* a blocking read(), then that will hang for (almost) forever. This is bad
|
||||
* in general, makes the server respond only to kill -9, and breaks
|
||||
* outward mirroring in a most unpleasant way.
|
||||
*
|
||||
* Don't forget to disarm before exiting, no matter what!
|
||||
*
|
||||
* The replication is simple: open a connection to the flexnbd server, write
|
||||
* a single byte, and then wait.
|
||||
*
|
||||
*/
|
||||
client_arm_killswitch( client );
|
||||
|
||||
if ( !client_read_request( client, &request, &disconnected ) ) {
|
||||
client_disarm_killswitch( client );
|
||||
return stop;
|
||||
}
|
||||
if ( disconnected ) {
|
||||
client_disarm_killswitch( client );
|
||||
return stop;
|
||||
}
|
||||
|
||||
if ( !client_read_request( client, &request, &disconnected ) ) { return stop; }
|
||||
if ( disconnected ) { return stop; }
|
||||
if ( !client_request_needs_reply( client, request ) ) {
|
||||
client_disarm_killswitch( client );
|
||||
return client->disconnect;
|
||||
}
|
||||
|
||||
{
|
||||
if ( !server_is_closed( client->serve ) ) {
|
||||
/* We arm / disarm around client_reply() to catch cases where the
|
||||
* remote peer sends part of a write request data before dying,
|
||||
* and cases where we send part of read reply data before they die.
|
||||
*
|
||||
* That last is theoretical right now, but could break us in the
|
||||
* same way as a half-write (which causes us to sit in read forever)
|
||||
*
|
||||
* We only arm/disarm inside the server io lock because it's common
|
||||
* during migrations for us to be hanging on that mutex for quite
|
||||
* a while while the final pass happens - it's held for the entire
|
||||
* time.
|
||||
*/
|
||||
client_arm_killswitch( client );
|
||||
client_reply( client, request );
|
||||
client_disarm_killswitch( client );
|
||||
stop = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
client_disarm_killswitch( client );
|
||||
return stop;
|
||||
}
|
||||
|
||||
@@ -596,6 +633,9 @@ void client_cleanup(struct client* client,
|
||||
{
|
||||
info("client cleanup for client %p", client);
|
||||
|
||||
/* If the thread hits an error, we need to ensure this is off */
|
||||
client_disarm_killswitch( client );
|
||||
|
||||
if (client->socket) {
|
||||
FATAL_IF_NEGATIVE( close(client->socket),
|
||||
"Error closing client socket %d",
|
@@ -4,18 +4,6 @@
|
||||
#include <signal.h>
|
||||
#include <time.h>
|
||||
|
||||
#ifdef HAS_LISTEN_TIMEOUT
|
||||
|
||||
/** CLIENT_MAX_WAIT_SECS
|
||||
* This is the length of time an inbound migration will wait for a fresh
|
||||
* write before assuming the source has Gone Away. Note: it is *not*
|
||||
* the time from one write to the next, it is the gap between the end of
|
||||
* one write and the start of the next.
|
||||
*/
|
||||
#define CLIENT_MAX_WAIT_SECS 5
|
||||
|
||||
#endif
|
||||
|
||||
/** CLIENT_HANDLER_TIMEOUT
|
||||
* This is the length of time (in seconds) any request can be outstanding for.
|
||||
* If we spend longer than this in a request, the whole server is killed.
|
||||
@@ -24,8 +12,7 @@
|
||||
|
||||
/** CLIENT_KILLSWITCH_SIGNAL
|
||||
* The signal number we use to kill the server when *any* killswitch timer
|
||||
* fires. We don't actually need to install a signal handler for it, the default
|
||||
* behaviour is perfectly fine.
|
||||
* fires. The handler gets the fd of the client socket to work with.
|
||||
*/
|
||||
#define CLIENT_KILLSWITCH_SIGNAL ( SIGRTMIN + 1 )
|
||||
|
||||
@@ -58,6 +45,7 @@ struct client {
|
||||
|
||||
};
|
||||
|
||||
void client_killswitch_hit(int signal, siginfo_t *info, void *ptr);
|
||||
|
||||
void* client_serve(void* client_uncast);
|
||||
struct client * client_create( struct server * serve, int socket );
|
@@ -101,12 +101,24 @@ struct flexnbd * flexnbd_create_serving(
|
||||
max_nbd_clients,
|
||||
use_killswitch,
|
||||
1);
|
||||
flexnbd_create_shared( flexnbd,
|
||||
s_ctrl_sock );
|
||||
flexnbd_create_shared( flexnbd, s_ctrl_sock );
|
||||
|
||||
// Beats installing one handler per client instance
|
||||
if ( use_killswitch ) {
|
||||
struct sigaction act = {
|
||||
.sa_sigaction = client_killswitch_hit,
|
||||
.sa_flags = SA_RESTART | SA_SIGINFO
|
||||
};
|
||||
|
||||
FATAL_UNLESS(
|
||||
0 == sigaction( CLIENT_KILLSWITCH_SIGNAL, &act, NULL ),
|
||||
"Installing client killswitch signal failed"
|
||||
);
|
||||
}
|
||||
|
||||
return flexnbd;
|
||||
}
|
||||
|
||||
|
||||
struct flexnbd * flexnbd_create_listening(
|
||||
char* s_ip_address,
|
||||
char* s_port,
|
||||
@@ -127,6 +139,10 @@ struct flexnbd * flexnbd_create_listening(
|
||||
s_acl_entries,
|
||||
1, 0, 0);
|
||||
flexnbd_create_shared( flexnbd, s_ctrl_sock );
|
||||
|
||||
// listen can't use killswitch, as mirror may pause on sending things
|
||||
// for a very long time.
|
||||
|
||||
return flexnbd;
|
||||
}
|
||||
|
@@ -5,6 +5,7 @@
|
||||
#include "mirror.h"
|
||||
#include "serve.h"
|
||||
#include "proxy.h"
|
||||
#include "client.h"
|
||||
#include "self_pipe.h"
|
||||
#include "mbox.h"
|
||||
#include "control.h"
|
@@ -71,7 +71,7 @@ int build_allocation_map(struct bitset * allocation_map, int fd)
|
||||
}
|
||||
}
|
||||
|
||||
debug("Successfully built allocation map");
|
||||
info("Successfully built allocation map");
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -139,7 +139,7 @@ int readloop(int filedes, void *buffer, size_t size)
|
||||
ssize_t result = read(filedes, buffer+readden, size-readden);
|
||||
|
||||
if ( result == 0 /* EOF */ ) {
|
||||
warn( "end-of-file detected while reading" );
|
||||
warn( "end-of-file detected while reading after %i bytes", readden );
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -347,4 +347,3 @@ ssize_t iobuf_write( int fd, struct iobuf *iobuf )
|
||||
|
||||
return count;
|
||||
}
|
||||
|
@@ -70,6 +70,7 @@ struct mirror_ctrl {
|
||||
|
||||
/* libev stuff */
|
||||
struct ev_loop *ev_loop;
|
||||
ev_timer begin_watcher;
|
||||
ev_io read_watcher;
|
||||
ev_io write_watcher;
|
||||
ev_timer timeout_watcher;
|
||||
@@ -213,18 +214,6 @@ void mirror_destroy( struct mirror *mirror )
|
||||
/** The mirror code will split NBD writes, making them this long as a maximum */
|
||||
static const int mirror_longest_write = 8<<20;
|
||||
|
||||
/** If, during a mirror pass, we have sent this number of bytes or fewer, we
|
||||
* go to freeze the I/O and finish it off. This is just a guess.
|
||||
*/
|
||||
static const unsigned int mirror_last_pass_after_bytes_written = 100<<20;
|
||||
|
||||
/** The largest number of full passes we'll do - the last one will always
|
||||
* cause the I/O to freeze, however many bytes are left to copy.
|
||||
*/
|
||||
static const int mirror_maximum_passes = 7;
|
||||
#define mirror_last_pass (mirror_maximum_passes - 1)
|
||||
|
||||
|
||||
/* This must not be called if there's any chance of further I/O. Methods to
|
||||
* ensure this include:
|
||||
* - Ensure image size is 0
|
||||
@@ -347,6 +336,19 @@ int mirror_should_quit( struct mirror * mirror )
|
||||
}
|
||||
}
|
||||
|
||||
/* Bandwidth limiting - we hang around if bps is too high, unless we need to
|
||||
* empty out the bitset stream a bit */
|
||||
int mirror_should_wait( struct mirror_ctrl *ctrl )
|
||||
{
|
||||
int bps_over = server_mirror_bps( ctrl->serve ) >
|
||||
ctrl->serve->mirror->max_bytes_per_second;
|
||||
|
||||
int stream_full = bitset_stream_size( ctrl->serve->allocation_map ) >
|
||||
( BITSET_STREAM_SIZE / 2 );
|
||||
|
||||
return bps_over && !stream_full;
|
||||
}
|
||||
|
||||
/*
|
||||
* If there's an event in the bitset stream of the serve allocation map, we
|
||||
* use it to construct the next transfer request, covering precisely the area
|
||||
@@ -369,7 +371,7 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
|
||||
* full, and stop when it's a quarter full. This stops a busy client from
|
||||
* stalling a migration forever. FIXME: made-up numbers.
|
||||
*/
|
||||
if ( bitset_stream_size( serve->allocation_map ) > BITSET_STREAM_SIZE / 2 ) {
|
||||
if ( mirror->offset < serve->size && bitset_stream_size( serve->allocation_map ) > BITSET_STREAM_SIZE / 2 ) {
|
||||
ctrl->clear_events = 1;
|
||||
}
|
||||
|
||||
@@ -425,24 +427,6 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
|
||||
return 1;
|
||||
}
|
||||
|
||||
uint64_t mirror_current_bps( struct mirror * mirror )
|
||||
{
|
||||
uint64_t duration_ms = monotonic_time_ms() - mirror->migration_started;
|
||||
return mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 );
|
||||
}
|
||||
|
||||
int mirror_exceeds_max_bps( struct mirror * mirror )
|
||||
{
|
||||
uint64_t mig_speed = mirror_current_bps( mirror );
|
||||
debug( "current_bps: %"PRIu64"; max_bps: %"PRIu64, mig_speed, mirror->max_bytes_per_second );
|
||||
|
||||
if ( mig_speed > mirror->max_bytes_per_second ) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// ONLY CALL THIS AFTER CLOSING CLIENTS
|
||||
void mirror_complete( struct server *serve )
|
||||
{
|
||||
@@ -486,7 +470,7 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
|
||||
to_write = xfer->len - ( ctrl->xfer.written - hdr_size );
|
||||
}
|
||||
|
||||
// Actually read some bytes
|
||||
// Actually write some bytes
|
||||
if ( ( count = write( ctrl->mirror->client, data_loc, to_write ) ) < 0 ) {
|
||||
if ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
|
||||
warn( SHOW_ERRNO( "Couldn't write to listener" ) );
|
||||
@@ -496,10 +480,12 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
|
||||
}
|
||||
debug( "Wrote %"PRIu64" bytes", count );
|
||||
debug( "to_write was %"PRIu64", xfer->written was %"PRIu64, to_write, xfer->written );
|
||||
ctrl->xfer.written += count;
|
||||
|
||||
// We wrote some bytes, so reset the timer
|
||||
// We wrote some bytes, so reset the timer and keep track for the next pass
|
||||
if ( count > 0 ) {
|
||||
ctrl->xfer.written += count;
|
||||
ev_timer_again( ctrl->ev_loop, &ctrl->timeout_watcher );
|
||||
}
|
||||
|
||||
// All bytes written, so now we need to read the NBD reply back.
|
||||
if ( ctrl->xfer.written == ctrl->xfer.len + hdr_size ) {
|
||||
@@ -584,10 +570,17 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
|
||||
/* transfer was completed, so now we need to either set up the next
|
||||
* transfer of this pass, set up the first transfer of the next pass, or
|
||||
* complete the migration */
|
||||
m->all_dirty += xfer->len;
|
||||
xfer->read = 0;
|
||||
xfer->written = 0;
|
||||
|
||||
/* We don't account for bytes written in this mode, to stop high-throughput
|
||||
* discs getting stuck in "drain the event queue!" mode forever
|
||||
*/
|
||||
if ( !ctrl->clear_events ) {
|
||||
m->all_dirty += xfer->len;
|
||||
}
|
||||
|
||||
|
||||
/* This next bit could take a little while, which is fine */
|
||||
ev_timer_stop( ctrl->ev_loop, &ctrl->timeout_watcher );
|
||||
|
||||
@@ -601,17 +594,15 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
|
||||
int next_xfer = mirror_setup_next_xfer( ctrl );
|
||||
debug( "next_xfer: %d", next_xfer );
|
||||
|
||||
/* Regardless of time estimates, if there's no waiting transfer, we can
|
||||
* */
|
||||
if ( !ctrl->clients_closed && ( !next_xfer || server_mirror_eta( ctrl->serve ) < 60 ) ) {
|
||||
/* Regardless of time estimates, if there's no waiting transfer, we can start closing clients down. */
|
||||
if ( !ctrl->clients_closed && ( !next_xfer || server_mirror_eta( ctrl->serve ) < MS_CONVERGE_TIME_SECS ) ) {
|
||||
info( "Closing clients to allow mirroring to converge" );
|
||||
server_forbid_new_clients( ctrl->serve );
|
||||
server_close_clients( ctrl->serve );
|
||||
server_join_clients( ctrl->serve );
|
||||
ctrl->clients_closed = 1;
|
||||
|
||||
/* One more try - a new event may have been pushed since our last check
|
||||
*/
|
||||
/* One more try - a new event may have been pushed since our last check */
|
||||
if ( !next_xfer ) {
|
||||
next_xfer = mirror_setup_next_xfer( ctrl );
|
||||
}
|
||||
@@ -630,7 +621,7 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
|
||||
|
||||
/* FIXME: Should we ignore the bwlimit after server_close_clients has been called? */
|
||||
|
||||
if ( mirror_exceeds_max_bps( m ) ) {
|
||||
if ( mirror_should_wait( ctrl ) ) {
|
||||
/* We're over the bandwidth limit, so don't move onto the next transfer
|
||||
* yet. Our limit_watcher will move us on once we're OK. timeout_watcher
|
||||
* was disabled further up, so don't need to stop it here too */
|
||||
@@ -674,6 +665,7 @@ void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
|
||||
{
|
||||
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
||||
@@ -684,7 +676,7 @@ void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
|
||||
return;
|
||||
}
|
||||
|
||||
if ( mirror_exceeds_max_bps( ctrl->mirror ) ) {
|
||||
if ( mirror_should_wait( ctrl ) ) {
|
||||
debug( "max_bps exceeded, waiting", ctrl->mirror->max_bytes_per_second );
|
||||
ev_timer_again( loop, w );
|
||||
} else {
|
||||
@@ -698,6 +690,37 @@ void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
|
||||
return;
|
||||
}
|
||||
|
||||
/* We use this to periodically check whether the allocation map has built, and
|
||||
* if it has, start migrating. If it's not finished, then enabling the bitset
|
||||
* stream does not go well for us.
|
||||
*/
|
||||
void mirror_begin_cb( struct ev_loop *loop, ev_timer *w, int revents )
|
||||
{
|
||||
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
||||
NULLCHECK( ctrl );
|
||||
|
||||
if ( !(revents & EV_TIMER ) ) {
|
||||
warn( "Mirror limit callback executed but no timer event signalled" );
|
||||
return;
|
||||
}
|
||||
|
||||
if ( ctrl->serve->allocation_map_built || ctrl->serve->allocation_map_not_built ) {
|
||||
info( "allocation map builder is finished, beginning migration" );
|
||||
ev_timer_stop( loop, w );
|
||||
/* Start by writing xfer 0 to the listener */
|
||||
ev_io_start( loop, &ctrl->write_watcher );
|
||||
/* We want to timeout during the first write as well as subsequent ones */
|
||||
ev_timer_again( loop, &ctrl->timeout_watcher );
|
||||
/* We're now interested in events */
|
||||
bitset_enable_stream( ctrl->serve->allocation_map );
|
||||
} else {
|
||||
/* not done yet, so wait another second */
|
||||
ev_timer_again( loop, w );
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
void mirror_run( struct server *serve )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
@@ -728,6 +751,10 @@ void mirror_run( struct server *serve )
|
||||
ctrl.ev_loop = EV_DEFAULT;
|
||||
|
||||
/* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */
|
||||
ev_init( &ctrl.begin_watcher, mirror_begin_cb );
|
||||
ctrl.begin_watcher.repeat = 1.0; // We check bps every second. seems sane.
|
||||
ctrl.begin_watcher.data = (void*) &ctrl;
|
||||
|
||||
ev_io_init( &ctrl.read_watcher, mirror_read_cb, m->client, EV_READ );
|
||||
ctrl.read_watcher.data = (void*) &ctrl;
|
||||
|
||||
@@ -735,7 +762,22 @@ void mirror_run( struct server *serve )
|
||||
ctrl.write_watcher.data = (void*) &ctrl;
|
||||
|
||||
ev_init( &ctrl.timeout_watcher, mirror_timeout_cb );
|
||||
ctrl.timeout_watcher.repeat = MS_REQUEST_LIMIT_SECS_F ;
|
||||
|
||||
char * env_request_limit = getenv( "FLEXNBD_MS_REQUEST_LIMIT_SECS" );
|
||||
double timeout_limit = MS_REQUEST_LIMIT_SECS_F;
|
||||
|
||||
if ( NULL != env_request_limit ) {
|
||||
char *endptr = NULL;
|
||||
errno = 0;
|
||||
double limit = strtod( env_request_limit, &endptr );
|
||||
warn( SHOW_ERRNO( "Got %f from strtod", limit ) );
|
||||
|
||||
if ( errno == 0 ) {
|
||||
timeout_limit = limit;
|
||||
}
|
||||
}
|
||||
|
||||
ctrl.timeout_watcher.repeat = timeout_limit;
|
||||
|
||||
ev_init( &ctrl.limit_watcher, mirror_limit_cb );
|
||||
ctrl.limit_watcher.repeat = 1.0; // We check bps every second. seems sane.
|
||||
@@ -751,19 +793,23 @@ void mirror_run( struct server *serve )
|
||||
"Couldn't find first transfer for mirror!"
|
||||
);
|
||||
|
||||
|
||||
if ( serve->allocation_map_built ) {
|
||||
/* Start by writing xfer 0 to the listener */
|
||||
ev_io_start( ctrl.ev_loop, &ctrl.write_watcher );
|
||||
|
||||
/* We want to timeout during the first write as well as subsequent ones */
|
||||
ev_timer_again( ctrl.ev_loop, &ctrl.timeout_watcher );
|
||||
bitset_enable_stream( serve->allocation_map );
|
||||
} else {
|
||||
debug( "Waiting for allocation map to be built" );
|
||||
ev_timer_again( ctrl.ev_loop, &ctrl.begin_watcher );
|
||||
}
|
||||
|
||||
/* Everything up to here is blocking. We switch to non-blocking so we
|
||||
* can handle rate-limiting and weird error conditions better. TODO: We
|
||||
* should expand the event loop upwards so we can do the same there too */
|
||||
sock_set_nonblock( m->client, 1 );
|
||||
|
||||
bitset_enable_stream( serve->allocation_map );
|
||||
|
||||
info( "Entering event loop" );
|
||||
ev_run( ctrl.ev_loop, 0 );
|
||||
info( "Exited event loop" );
|
||||
@@ -784,12 +830,11 @@ void mirror_run( struct server *serve )
|
||||
* call retries the migration from scratch. */
|
||||
|
||||
if ( m->commit_state != MS_DONE ) {
|
||||
error( "Event loop exited, but mirroring is not complete" );
|
||||
|
||||
/* mirror_reset will be called before a retry, so keeping hold of events
|
||||
* between now and our next mirroring attempt is not useful
|
||||
*/
|
||||
bitset_disable_stream( serve->allocation_map );
|
||||
error( "Event loop exited, but mirroring is not complete" );
|
||||
}
|
||||
|
||||
return;
|
||||
@@ -1016,4 +1061,3 @@ void * mirror_super_runner( void * serve_uncast )
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
@@ -18,6 +18,18 @@ enum mirror_state;
|
||||
*/
|
||||
#define MS_CONNECT_TIME_SECS 60
|
||||
|
||||
/* MS_MAX_DOWNTIME_SECS
|
||||
* The length of time a migration must be estimated to have remaining for us to
|
||||
* disconnect clients for convergence
|
||||
*
|
||||
* TODO: Make this configurable so refusing-to-converge clients can be manually
|
||||
* fixed.
|
||||
* TODO: Make this adaptive - 5 seconds is fine, as long as we can guarantee
|
||||
* that all migrations will be able to converge in time. We'd add a new
|
||||
* state between open and closed, where gradually-increasing latency is
|
||||
* added to client requests to allow the mirror to be faster.
|
||||
*/
|
||||
#define MS_CONVERGE_TIME_SECS 5
|
||||
|
||||
/* MS_HELLO_TIME_SECS
|
||||
* The length of time the sender will wait for the NBD hello message
|
||||
@@ -38,9 +50,12 @@ enum mirror_state;
|
||||
* request, this is the time between the end of the NBD request and the
|
||||
* start of the NBD reply. For a write request, this is the time
|
||||
* between the end of the written data and the start of the NBD reply.
|
||||
* Can be overridden by the environment variable:
|
||||
* FLEXNBD_MS_REQUEST_LIMIT_SECS
|
||||
*/
|
||||
#define MS_REQUEST_LIMIT_SECS 4
|
||||
#define MS_REQUEST_LIMIT_SECS_F 4.0
|
||||
|
||||
#define MS_REQUEST_LIMIT_SECS 60
|
||||
#define MS_REQUEST_LIMIT_SECS_F 60.0
|
||||
|
||||
enum mirror_finish_action {
|
||||
ACTION_EXIT,
|
||||
@@ -122,7 +137,5 @@ struct mirror_super * mirror_super_create(
|
||||
);
|
||||
void * mirror_super_runner( void * serve_uncast );
|
||||
|
||||
uint64_t mirror_current_bps( struct mirror * mirror );
|
||||
|
||||
#endif
|
||||
|
@@ -787,7 +787,7 @@ int mode_break( int argc, char *argv[] )
|
||||
|
||||
if ( NULL == sock ){
|
||||
fprintf( stderr, "--sock is required.\n" );
|
||||
exit_err( acl_help_text );
|
||||
exit_err( break_help_text );
|
||||
}
|
||||
|
||||
do_remote_command( "break", sock, argc - optind, argv + optind );
|
||||
@@ -808,7 +808,7 @@ int mode_status( int argc, char *argv[] )
|
||||
|
||||
if ( NULL == sock ){
|
||||
fprintf( stderr, "--sock is required.\n" );
|
||||
exit_err( acl_help_text );
|
||||
exit_err( status_help_text );
|
||||
}
|
||||
|
||||
do_remote_command( "status", sock, argc - optind, argv + optind );
|
@@ -686,6 +686,7 @@ void* build_allocation_map_thread(void* serve_uncast)
|
||||
* the future, we'll need to wait for the allocation map to finish or
|
||||
* fail before we can complete the migration.
|
||||
*/
|
||||
serve->allocation_map_not_built = 1;
|
||||
warn( "Didn't build allocation map for %s", serve->filename );
|
||||
}
|
||||
|
||||
@@ -878,7 +879,19 @@ uint64_t server_mirror_eta( struct server * serve )
|
||||
{
|
||||
if ( server_is_mirroring( serve ) ) {
|
||||
uint64_t bytes_to_xfer = server_mirror_bytes_remaining( serve );
|
||||
return bytes_to_xfer / ( mirror_current_bps( serve->mirror ) + 1 );
|
||||
return bytes_to_xfer / ( server_mirror_bps( serve ) + 1 );
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint64_t server_mirror_bps( struct server * serve )
|
||||
{
|
||||
if ( server_is_mirroring( serve ) ) {
|
||||
uint64_t duration_ms =
|
||||
monotonic_time_ms() - serve->mirror->migration_started;
|
||||
|
||||
return serve->mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 );
|
||||
}
|
||||
|
||||
return 0;
|
||||
@@ -941,4 +954,3 @@ int do_serve( struct server* params, struct self_pipe * open_signal )
|
||||
|
||||
return success;
|
||||
}
|
||||
|
@@ -76,8 +76,10 @@ struct server {
|
||||
struct bitset * allocation_map;
|
||||
/* when starting up, this thread builds the allocation_map */
|
||||
pthread_t allocation_map_builder_thread;
|
||||
|
||||
/* when the thread has finished, it sets this to 1 */
|
||||
volatile sig_atomic_t allocation_map_built;
|
||||
volatile sig_atomic_t allocation_map_not_built;
|
||||
|
||||
int max_nbd_clients;
|
||||
struct client_tbl_entry *nbd_client;
|
||||
@@ -126,6 +128,7 @@ int server_is_mirroring( struct server * serve );
|
||||
|
||||
uint64_t server_mirror_bytes_remaining( struct server * serve );
|
||||
uint64_t server_mirror_eta( struct server * serve );
|
||||
uint64_t server_mirror_bps( struct server * serve );
|
||||
|
||||
void server_abandon_mirror( struct server * serve );
|
||||
void server_prevent_mirror_start( struct server *serve );
|
@@ -27,7 +27,7 @@ struct status * status_create( struct server * serve )
|
||||
status->migration_duration = 0;
|
||||
}
|
||||
status->migration_duration /= 1000;
|
||||
status->migration_speed = serve->mirror->all_dirty / ( status->migration_duration + 1 );
|
||||
status->migration_speed = server_mirror_bps( serve );
|
||||
status->migration_speed_limit = serve->mirror->max_bytes_per_second;
|
||||
|
||||
status->migration_seconds_left = server_mirror_eta( serve );
|
@@ -797,6 +797,13 @@ void proxy_session( struct proxier* proxy )
|
||||
proxy_session_state_names[state]
|
||||
);
|
||||
state = CONNECT_TO_UPSTREAM;
|
||||
|
||||
/* Since we've timed out, we won't have gone through the timeout logic
|
||||
* in the various state handlers that resets these appropriately... */
|
||||
proxy->init.size = 0;
|
||||
proxy->init.needle = 0;
|
||||
proxy->rsp.size = 0;
|
||||
proxy->rsp.needle = 0;
|
||||
}
|
||||
}
|
||||
}
|
@@ -116,6 +116,7 @@ enum bitset_stream_events {
|
||||
BITSET_STREAM_ON = 2,
|
||||
BITSET_STREAM_OFF = 3
|
||||
};
|
||||
#define BITSET_STREAM_EVENTS_ENUM_SIZE 4
|
||||
|
||||
struct bitset_stream_entry {
|
||||
enum bitset_stream_events event;
|
||||
@@ -138,6 +139,7 @@ struct bitset_stream {
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond_not_full;
|
||||
pthread_cond_t cond_not_empty;
|
||||
uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE];
|
||||
};
|
||||
|
||||
|
||||
@@ -217,13 +219,14 @@ static inline void bitset_stream_enqueue(
|
||||
stream->entries[stream->in].event = event;
|
||||
stream->entries[stream->in].from = from;
|
||||
stream->entries[stream->in].len = len;
|
||||
stream->queued_bytes[event] += len;
|
||||
|
||||
stream->size++;
|
||||
stream->in++;
|
||||
stream->in %= BITSET_STREAM_SIZE;
|
||||
|
||||
pthread_mutex_unlock( & stream->mutex );
|
||||
pthread_cond_broadcast( &stream->cond_not_empty );
|
||||
pthread_cond_signal( &stream->cond_not_empty );
|
||||
|
||||
return;
|
||||
}
|
||||
@@ -234,6 +237,7 @@ static inline void bitset_stream_dequeue(
|
||||
)
|
||||
{
|
||||
struct bitset_stream * stream = set->stream;
|
||||
struct bitset_stream_entry * dequeued;
|
||||
|
||||
pthread_mutex_lock( &stream->mutex );
|
||||
|
||||
@@ -241,18 +245,21 @@ static inline void bitset_stream_dequeue(
|
||||
pthread_cond_wait( &stream->cond_not_empty, &stream->mutex );
|
||||
}
|
||||
|
||||
dequeued = &stream->entries[stream->out];
|
||||
|
||||
if ( out != NULL ) {
|
||||
out->event = stream->entries[stream->out].event;
|
||||
out->from = stream->entries[stream->out].from;
|
||||
out->len = stream->entries[stream->out].len;
|
||||
out->event = dequeued->event;
|
||||
out->from = dequeued->from;
|
||||
out->len = dequeued->len;
|
||||
}
|
||||
|
||||
stream->queued_bytes[dequeued->event] -= dequeued->len;
|
||||
stream->size--;
|
||||
stream->out++;
|
||||
stream->out %= BITSET_STREAM_SIZE;
|
||||
|
||||
pthread_mutex_unlock( &stream->mutex );
|
||||
pthread_cond_broadcast( &stream->cond_not_full );
|
||||
pthread_cond_signal( &stream->cond_not_full );
|
||||
|
||||
return;
|
||||
}
|
||||
@@ -273,17 +280,10 @@ static inline uint64_t bitset_stream_queued_bytes(
|
||||
enum bitset_stream_events event
|
||||
)
|
||||
{
|
||||
uint64_t total = 0;
|
||||
int i;
|
||||
uint64_t total;
|
||||
|
||||
pthread_mutex_lock( &set->stream->mutex );
|
||||
|
||||
for ( i = set->stream->out; i < set->stream->in ; i++ ) {
|
||||
if ( set->stream->entries[i].event == event ) {
|
||||
total += set->stream->entries[i].len;
|
||||
}
|
||||
}
|
||||
|
||||
total = set->stream->queued_bytes[event];
|
||||
pthread_mutex_unlock( &set->stream->mutex );
|
||||
|
||||
return total;
|
@@ -20,7 +20,13 @@ t = Thread.start do
|
||||
client2.close
|
||||
end
|
||||
|
||||
sleep( FlexNBD::MS_REQUEST_LIMIT_SECS + 2 )
|
||||
sleep_time = if ENV.has_key?('FLEXNBD_MS_REQUEST_LIMIT_SECS')
|
||||
ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'].to_f
|
||||
else
|
||||
FlexNBD::MS_REQUEST_LIMIT_SECS
|
||||
end
|
||||
|
||||
sleep( sleep_time + 2.0 )
|
||||
client1.close
|
||||
|
||||
t.join
|
||||
|
@@ -458,12 +458,18 @@ module FlexNBD
|
||||
|
||||
def maybe_timeout(cmd, timeout=nil )
|
||||
stdout, stderr = "",""
|
||||
stat = nil
|
||||
run = Proc.new do
|
||||
Open3.popen3( cmd ) do |io_in, io_out, io_err|
|
||||
# Ruby 1.9 changed the popen3 api. instead of 3 args, the block
|
||||
# gets 4. Not only that, but it no longer sets $?, so we have to
|
||||
# go elsewhere for the process' exit status.
|
||||
Open3.popen3( cmd ) do |io_in, io_out, io_err, maybe_thr|
|
||||
io_in.close
|
||||
stdout.replace io_out.read
|
||||
stderr.replace io_err.read
|
||||
stat = maybe_thr.value if maybe_thr
|
||||
end
|
||||
stat ||= $?
|
||||
end
|
||||
|
||||
if timeout
|
||||
@@ -472,13 +478,13 @@ module FlexNBD
|
||||
run.call
|
||||
end
|
||||
|
||||
[stdout, stderr]
|
||||
[stdout, stderr, stat]
|
||||
end
|
||||
|
||||
|
||||
def mirror(dest_ip, dest_port, bandwidth=nil, action=nil)
|
||||
stdout, stderr = mirror_unchecked( dest_ip, dest_port, bandwidth, action )
|
||||
raise IOError.new( "Migrate command failed\n" + stderr) unless $?.success?
|
||||
stdout, stderr, status = mirror_unchecked( dest_ip, dest_port, bandwidth, action )
|
||||
raise IOError.new( "Migrate command failed\n" + stderr) unless status.success?
|
||||
|
||||
stdout
|
||||
end
|
||||
|
@@ -2,6 +2,14 @@
|
||||
|
||||
module FlexNBD
|
||||
|
||||
def self.binary( str )
|
||||
if str.respond_to? :force_encoding
|
||||
str.force_encoding "ASCII-8BIT"
|
||||
else
|
||||
str
|
||||
end
|
||||
end
|
||||
|
||||
# eeevil is his one and only name...
|
||||
def self.read_constants
|
||||
parents = []
|
||||
@@ -17,7 +25,7 @@ module FlexNBD
|
||||
|
||||
fail "No source root!" unless source_root
|
||||
|
||||
headers = Dir[File.join( source_root, "src", "*.h" ) ]
|
||||
headers = Dir[File.join( source_root, "src", "{common,proxy,server}","*.h" ) ]
|
||||
|
||||
headers.each do |header_filename|
|
||||
txt_lines = File.readlines( header_filename )
|
||||
@@ -33,8 +41,8 @@ module FlexNBD
|
||||
|
||||
read_constants()
|
||||
|
||||
REQUEST_MAGIC = "\x25\x60\x95\x13" unless defined?(REQUEST_MAGIC)
|
||||
REPLY_MAGIC = "\x67\x44\x66\x98" unless defined?(REPLY_MAGIC)
|
||||
REQUEST_MAGIC = binary("\x25\x60\x95\x13") unless defined?(REQUEST_MAGIC)
|
||||
REPLY_MAGIC = binary("\x67\x44\x66\x98") unless defined?(REPLY_MAGIC)
|
||||
|
||||
end # module FlexNBD
|
||||
|
||||
|
@@ -2,12 +2,17 @@
|
||||
|
||||
require 'test/unit'
|
||||
require 'environment'
|
||||
require 'flexnbd/constants'
|
||||
|
||||
class TestHappyPath < Test::Unit::TestCase
|
||||
def setup
|
||||
@env = Environment.new
|
||||
end
|
||||
|
||||
def bin(str)
|
||||
FlexNBD.binary str
|
||||
end
|
||||
|
||||
def teardown
|
||||
@env.nbd1.can_die(0)
|
||||
@env.nbd2.can_die(0)
|
||||
@@ -22,13 +27,13 @@ class TestHappyPath < Test::Unit::TestCase
|
||||
[0, 12, 63].each do |num|
|
||||
|
||||
assert_equal(
|
||||
@env.nbd1.read(num*@env.blocksize, @env.blocksize),
|
||||
@env.file1.read(num*@env.blocksize, @env.blocksize)
|
||||
bin( @env.nbd1.read(num*@env.blocksize, @env.blocksize) ),
|
||||
bin( @env.file1.read(num*@env.blocksize, @env.blocksize) )
|
||||
)
|
||||
end
|
||||
|
||||
[124, 1200, 10028, 25488].each do |num|
|
||||
assert_equal(@env.nbd1.read(num, 4), @env.file1.read(num, 4))
|
||||
assert_equal(bin(@env.nbd1.read(num, 4)), bin(@env.file1.read(num, 4)))
|
||||
end
|
||||
end
|
||||
|
||||
@@ -102,7 +107,7 @@ class TestHappyPath < Test::Unit::TestCase
|
||||
assert_no_match( /unrecognized/, stderr )
|
||||
|
||||
|
||||
Timeout.timeout(2) do @env.nbd1.join end
|
||||
Timeout.timeout(10) do @env.nbd1.join end
|
||||
|
||||
assert !File.file?( @env.filename1 )
|
||||
end
|
||||
|
@@ -7,6 +7,9 @@ require 'environment'
|
||||
class TestSourceErrorHandling < Test::Unit::TestCase
|
||||
|
||||
def setup
|
||||
@old_env = ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS']
|
||||
ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'] = "4.0"
|
||||
|
||||
@env = Environment.new
|
||||
@env.writefile1( "f" * 4 )
|
||||
@env.serve1
|
||||
@@ -16,6 +19,7 @@ class TestSourceErrorHandling < Test::Unit::TestCase
|
||||
def teardown
|
||||
@env.nbd1.can_die(0)
|
||||
@env.cleanup
|
||||
ENV['FLEXNBD_MS_REQUEST_LIMIT_SECS'] = @old_env
|
||||
end
|
||||
|
||||
|
||||
|
@@ -76,8 +76,8 @@ START_TEST( test_read_request_quits_on_stop_signal )
|
||||
|
||||
client_signal_stop( c );
|
||||
|
||||
int client_read_request( struct client *, struct nbd_request *);
|
||||
fail_unless( 0 == client_read_request( c, &nbdr ), "Didn't quit on stop." );
|
||||
int client_serve_request( struct client *);
|
||||
fail_unless( 1 == client_serve_request( c ), "Didn't quit on stop." );
|
||||
|
||||
close( fds[0] );
|
||||
close( fds[1] );
|
||||
|
@@ -72,9 +72,11 @@ START_TEST( test_sockaddr_address_string_doesnt_overflow_short_buffer )
|
||||
char testbuf[128];
|
||||
const char* result;
|
||||
|
||||
memset( testbuf, 0, 128 );
|
||||
v4->sin_family = AF_INET;
|
||||
v4->sin_port = htons( 4777 );
|
||||
ck_assert_int_eq( 1, inet_pton( AF_INET, "192.168.0.1", &v4->sin_addr ));
|
||||
memset( &testbuf, 0, 128 );
|
||||
|
||||
result = sockaddr_address_string( &sa, &testbuf[0], 2 );
|
||||
ck_assert( result == NULL );
|
||||
|
@@ -71,11 +71,12 @@ START_TEST( test_fatal_kills_process )
|
||||
sleep(10);
|
||||
}
|
||||
else {
|
||||
int kidstatus;
|
||||
int result;
|
||||
result = waitpid( pid, &kidstatus, 0 );
|
||||
int kidret, kidstatus, result;
|
||||
result = waitpid( pid, &kidret, 0 );
|
||||
fail_if( result < 0, "Wait failed." );
|
||||
fail_unless( kidstatus == 6, "Kid was not aborted." );
|
||||
fail_unless( WIFSIGNALED( kidret ), "Process didn't exit via signal" );
|
||||
kidstatus = WTERMSIG( kidret );
|
||||
ck_assert_int_eq( kidstatus, SIGABRT );
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user