35 Commits

Author SHA1 Message Date
Chris Elsworth
2663173d1a Merge branch 'release-to-master' into 'master'
Release 0.5.0 to master

See merge request open-source/flexnbd-c!60
2019-01-11 13:34:34 +00:00
James F. Carter
3448ff15b8 release 0.5.0 2019-01-11 10:37:48 +00:00
James Carter
aff33bce10 Merge branch '37-unexplained-mirror-timeout-causing-migrations-to-stall' into 'develop'
Resolve "Unexplained mirror timeout causing migrations to stall"

Closes #37

See merge request open-source/flexnbd-c!59
2018-12-10 12:36:43 +00:00
Patrick J Cherry
6b1a877dc3 Tweaked test file size, and removed debug ENV fiddling 2018-12-07 22:43:50 +00:00
Patrick J Cherry
e5133a50bd Slow down start/stop. Enable DEBUG
Trying to working why this is failing in gitlab-ci
2018-12-07 22:09:21 +00:00
Patrick J Cherry
39400f2e09 Fixed issue number. 2018-12-07 21:50:56 +00:00
Patrick J Cherry
52690f5382 Updated changelog 2018-12-07 21:48:55 +00:00
Patrick J Cherry
a4d641b215 Ensure ev abandon_watcher is stopped before reuse. 2018-12-07 21:47:14 +00:00
James Carter
416d8bde96 Merge branch '40-when-migrating-qemu-discs-from-one-tail-to-another-there-s-an-edge-case-where-flexnbd-freezes-and-the-vm-eventually-disconnects-and-becomes-unstartable' into 'develop'
Resolve "When migrating QEmu discs from one tail to another there's an edge case where flexnbd freezes and the VM eventually disconnects and becomes unstartable."

Closes #40

See merge request open-source/flexnbd-c!58
2018-12-07 17:12:21 +00:00
Patrick J Cherry
654d277453 Updated changelog 2018-12-07 16:40:53 +00:00
Patrick J Cherry
842e7d362d Ensure control socket is closed first, and wait for it to close. 2018-12-07 16:32:58 +00:00
Patrick J Cherry
5839a36ab1 Remove useless function definition 2018-12-07 15:05:19 +00:00
Patrick J Cherry
70a3a4bb55 Close the control socket during cleanup
This should prevent further requests coming in, triggering deadlocks.
2018-12-07 15:02:55 +00:00
Patrick J Cherry
ce9499efce Rubocop; add test to bombard a migration source with status commands 2018-12-07 13:59:49 +00:00
Chris Elsworth
edb42700d0 Merge branch 'release-to-master' into 'master'
Release 0.4.0

See merge request open-source/flexnbd-c!57
2018-11-15 15:05:23 +00:00
Chris Elsworth
e5f7038127 Merge branch 'release-to-master' into 'develop'
Updated changelog for release

See merge request open-source/flexnbd-c!56
2018-11-15 14:28:43 +00:00
Patrick J Cherry
8bc6ebbb0f Updated changelog for release 2018-11-15 14:24:59 +00:00
Chris Elsworth
eb45b5e483 Merge branch '39-following-a-proxy-timeout-a-write-request-is-not-restarted-or-abandoned-leading-to-bad-magic-errors-following-reconnection' into 'develop'
Resolve "Following a proxy timeout, a write request is not restarted (or abandoned) leading to "bad magic" errors following reconnection"

Closes #39

See merge request open-source/flexnbd-c!54
2018-11-15 11:29:24 +00:00
Patrick J Cherry
256cba79e3 Added note about the new environment variable 2018-11-14 17:31:42 +00:00
Patrick J Cherry
5d1b0472de Fixed typo 2018-11-14 17:21:08 +00:00
Patrick J Cherry
93308bbda1 Added jessie back. 2018-11-14 17:20:33 +00:00
Patrick J Cherry
bb5271cea3 Remove jessie packaging :'( 2018-11-14 16:58:33 +00:00
Patrick J Cherry
b7b50faa17 Updated comments 2018-11-14 16:58:05 +00:00
Patrick J Cherry
b26b308e68 Add test to check when proxy times-out mid-write to upstream
I've add to add code to allow the environment to specify the upstream
tiemout so we don't have to wait 30s for this test to happen.
2018-11-14 16:49:55 +00:00
Patrick J Cherry
3e00a88d45 Removed debug 2018-11-14 10:23:58 +00:00
Patrick J Cherry
3fe9f2c6a1 Removed a couple of gotos.
Hadn't you heard?  They're considered harmful.
2018-11-14 10:23:05 +00:00
Patrick J Cherry
391a17bfcc Updated changelog 2018-11-13 21:38:36 +00:00
Patrick J Cherry
9b1518806d Move state-resetting to after before the init is read from upstream
This removes repetition and ensures a constant state before the upstream
init is read.
2018-11-13 21:33:06 +00:00
Patrick J Cherry
1225a28d41 Reset proxy req size/needle on timeout 2018-11-13 16:42:13 +00:00
Patrick J Cherry
c9d30a9bde Merge branch 'release-to-master' into 'master'
Release 0.3.0

See merge request open-source/flexnbd-c!53
2018-04-24 13:12:53 +01:00
James Carter
b18c46606f Merge branch 'release-to-master' into 'develop'
Updated changelog for release 0.3.0

See merge request open-source/flexnbd-c!52
2018-04-24 12:12:06 +01:00
Patrick J Cherry
b3cea813e4 Updated changelog for release 2018-04-24 12:06:06 +01:00
James Carter
a4f1956a56 Merge branch 'release-to-master' into 'master'
Release to master

See merge request open-source/flexnbd-c!50
2018-02-20 11:52:07 +00:00
Patrick J Cherry
072f4be3c0 Merge branch 'release' into 'master'
Release

See merge request !31
2017-07-14 17:42:33 +01:00
James Carter
b4426f5dce Merge branch 'develop' into 'master'
Merge develop into master for release.

See merge request !29
2017-03-23 13:21:40 +00:00
13 changed files with 248 additions and 57 deletions

View File

@@ -1,18 +1,21 @@
stages:
- package
- publish
package:jessie: &package
.package: &package
stage: package
image: $CI_REGISTRY/docker-images/layers:$DISTRO-deb
variables:
DISTRO: jessie
script:
- package
artifacts:
paths:
- pkg/
package:jessie:
<<: *package
variables:
DISTRO: jessie
package:stretch:
<<: *package
variables:
@@ -20,7 +23,7 @@ package:stretch:
publish:
stage: publish
tags:
tags:
- shell
script:
- publish

View File

@@ -4,7 +4,7 @@ VPATH=src:tests/unit
DESTDIR?=/
PREFIX?=/usr/local/bin
INSTALLDIR=$(DESTDIR)/$(PREFIX)
ifdef DEBUG
CFLAGS_EXTRA=-g -DDEBUG
LDFLAGS_EXTRA=-g

View File

@@ -169,6 +169,11 @@ That is, the '=' is required. This is a limitation of getopt-long.
If no cache size is given, a size of 4096 bytes is assumed. Caching can
be explicitly disabled by setting a size of 0.
ENVIRONMENT
FLEXNBD_UPSTREAM_TIMEOUT The timeout in seconds for the proxy communicating
with the upstream server defaults to 30 seconds.
BUGS
Should be reported via GitHub.

23
debian/changelog vendored
View File

@@ -1,8 +1,25 @@
flexnbd (0.2.1) UNRELEASED; urgency=medium
flexnbd (0.5.0) stable; urgency=medium
* Force a msync after every write, ignoring FUA flag, or lack thereof.
[ Patrick J Cherry ]
* Explicitly close the server control socket, and wait for it to close, to
prevent deadlocks during the server clean-up process (#40 !58)
* Ensure mirroring can be restarted after a break command is sent to the
source (#37, !59)
-- Patrick J Cherry <patrick@bytemark.co.uk> Tue, 24 Apr 2018 10:27:12 +0100
-- James Carter <james.carter@bytemark.co.uk> Fri, 11 Jan 2019 10:37:23 +0000
flexnbd (0.4.0) stable; urgency=medium
* Ensure proxy state is completely reset before upstream init is read,
ensuring any waiting requests are fully replayed (#39, !54)
-- Patrick J Cherry <patrick@bytemark.co.uk> Thu, 15 Nov 2018 14:24:26 +0000
flexnbd (0.3.0) stable; urgency=medium
* Force a msync after every write, ignoring FUA flag, or lack thereof (!51).
-- Patrick J Cherry <patrick@bytemark.co.uk> Tue, 24 Apr 2018 12:05:43 +0100
flexnbd (0.2.0) stable; urgency=medium

View File

@@ -58,6 +58,20 @@ struct proxier *proxy_create(char *s_downstream_address,
out->downstream_fd = -1;
out->upstream_fd = -1;
int upstream_timeout = UPSTREAM_TIMEOUT;
char *env_upstream_timeout = getenv("FLEXNBD_UPSTREAM_TIMEOUT");
if (NULL != env_upstream_timeout) {
int ut = atoi(env_upstream_timeout);
warn("Got %i from atoi\n", ut);
if (ut > 0) {
upstream_timeout = ut;
}
}
out->upstream_timeout = upstream_timeout;
out->upstream_timeout_ms = (long unsigned int) upstream_timeout * 1000;
out->prefetch = NULL;
if (s_cache_bytes) {
int cache_bytes = atoi(s_cache_bytes);
@@ -507,6 +521,20 @@ int proxy_continue_connecting_to_upstream(struct proxier *proxy, int state)
/* Data may have changed while we were disconnected */
prefetch_set_is_empty(proxy->prefetch);
/* Reset our needles and sizes.
*
* Don't zero the req buffer size in case there's an outstanding request
* waiting to be re-sent following a disconnection. The init and rsp
* buffers are used for reading from upstream. If we're in this state then
* any upstream reads will be re-requested.
*/
proxy->init.needle = 0;
proxy->init.size = 0;
proxy->rsp.needle = 0;
proxy->rsp.size = 0;
proxy->req.needle = 0;
/* Don't zero the req.size, as we may need to re-write it upstream */
info("Connected to upstream on fd %i", proxy->upstream_fd);
return READ_INIT_FROM_UPSTREAM;
}
@@ -523,7 +551,7 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
if (count == -1) {
warn(SHOW_ERRNO("Failed to read init from upstream"));
goto disconnect;
return CONNECT_TO_UPSTREAM;
}
if (proxy->init.needle == proxy->init.size) {
@@ -533,26 +561,24 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
((struct nbd_init_raw *) proxy->init.buf, &upstream_size,
&upstream_flags)) {
warn("Upstream sent invalid init");
goto disconnect;
return CONNECT_TO_UPSTREAM;
}
/* record the flags, and log the reconnection, set TCP_NODELAY */
proxy_finish_connect_to_upstream(proxy, upstream_size,
upstream_flags);
/* Currently, we only get disconnected from upstream (so needing to come
* here) when we have an outstanding request. If that becomes false,
* we'll need to choose the right state to return to here */
/* Finished reading the init response now, reset the needle. */
proxy->init.needle = 0;
/* Currently, we only get disconnected from upstream (so needing to
* come here) when we have an outstanding request. If that becomes
* false, we'll need to choose the right state to return to here.
*/
return WRITE_TO_UPSTREAM;
}
return state;
disconnect:
proxy->init.needle = 0;
proxy->init.size = 0;
return CONNECT_TO_UPSTREAM;
}
int proxy_write_to_upstream(struct proxier *proxy, int state)
@@ -574,7 +600,6 @@ int proxy_write_to_upstream(struct proxier *proxy, int state)
if (count == -1) {
warn(SHOW_ERRNO("Failed to send request to upstream"));
proxy->req.needle = 0;
// We're throwing the socket away so no need to uncork
return CONNECT_TO_UPSTREAM;
}
@@ -611,7 +636,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
if (count == -1) {
warn(SHOW_ERRNO("Failed to get reply from upstream"));
goto disconnect;
return CONNECT_TO_UPSTREAM;
}
if (proxy->rsp.needle == NBD_REPLY_SIZE) {
@@ -619,7 +644,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
if (reply->magic != REPLY_MAGIC) {
warn("Reply magic is incorrect");
goto disconnect;
return CONNECT_TO_UPSTREAM;
}
if (proxy->req_hdr.type == REQUEST_READ) {
@@ -635,11 +660,6 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
}
return state;
disconnect:
proxy->rsp.needle = 0;
proxy->rsp.size = 0;
return CONNECT_TO_UPSTREAM;
}
@@ -775,6 +795,9 @@ void proxy_session(struct proxier *proxy)
};
if (select_timeout.tv_sec > 0) {
if (select_timeout.tv_sec > proxy->upstream_timeout) {
select_timeout.tv_sec = proxy->upstream_timeout;
}
select_timeout_ptr = &select_timeout;
}
@@ -848,17 +871,9 @@ void proxy_session(struct proxier *proxy)
/* In these states, we're interested in restarting after a timeout.
*/
if (old_state == state && proxy_state_upstream(state)) {
if ((monotonic_time_ms()) - state_started > UPSTREAM_TIMEOUT) {
warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]
);
if ((monotonic_time_ms()) - state_started > proxy->upstream_timeout_ms) {
warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]);
state = CONNECT_TO_UPSTREAM;
/* Since we've timed out, we won't have gone through the timeout logic
* in the various state handlers that resets these appropriately... */
proxy->init.size = 0;
proxy->init.needle = 0;
proxy->rsp.size = 0;
proxy->rsp.needle = 0;
}
}
}

View File

@@ -14,10 +14,10 @@
#endif
/** UPSTREAM_TIMEOUT
* How long ( in ms ) to allow for upstream to respond. If it takes longer
* How long (in s) to allow for upstream to respond. If it takes longer
* than this, we will cancel the current request-response to them and resubmit
*/
#define UPSTREAM_TIMEOUT 30 * 1000
#define UPSTREAM_TIMEOUT 30
struct proxier {
/** address/port to bind to */
@@ -72,6 +72,16 @@ struct proxier {
uint64_t req_count;
int hello_sent;
/*
* How long (in s) to allow for upstream to respond. If it takes longer
* than this, we will cancel the current request-response to them and
* resubmit
*
* Defaults to UPSTREAM_TIMEOUT but can be overridden in the environment.
*/
int upstream_timeout;
unsigned long int upstream_timeout_ms;
/** These are only used if we pass --cache on the command line */
/* While the in-flight request has been munged by prefetch, these two are

View File

@@ -78,6 +78,14 @@ void control_destroy(struct control *control)
free(control);
}
void control_wait_for_close(struct control *control)
{
NULLCHECK(control);
while (!fd_is_closed(control->control_fd)) {
usleep(10000);
}
}
struct control_client *control_client_create(struct flexnbd *flexnbd,
int client_fd,
struct mbox *state_mbox)

View File

@@ -47,6 +47,7 @@ struct control_client {
struct control *control_create(struct flexnbd *,
const char *control_socket_name);
void control_signal_close(struct control *);
void control_wait_for_close(struct control *control);
void control_destroy(struct control *);
void *control_runner(void *);

View File

@@ -671,6 +671,7 @@ static void mirror_abandon_cb(struct ev_loop *loop, ev_io * w, int revents)
debug("Abandon message received");
mirror_set_state(ctrl->mirror, MS_ABANDONED);
self_pipe_signal_clear(ctrl->mirror->abandon_signal);
ev_io_stop(loop, &ctrl->abandon_watcher);
ev_break(loop, EVBREAK_ONE);
return;
}

View File

@@ -812,8 +812,6 @@ void server_control_arrived(struct server *serve)
}
void flexnbd_stop_control(struct flexnbd *flexnbd);
/** Closes sockets, frees memory and waits for all client threads to finish */
void serve_cleanup(struct server *params,
int fatal __attribute__ ((unused)))
@@ -822,8 +820,20 @@ void serve_cleanup(struct server *params,
void *status;
info("cleaning up");
/* Close the control socket, and wait for it to close before proceeding.
* If we do not wait, we risk a race condition with the tail supervisor
* sending a status command, and deadlocking the mirroring. */
if (params->flexnbd && params->flexnbd->control) {
debug("closing control socket");
control_signal_close(params->flexnbd->control);
debug("waiting for control socket to close");
control_wait_for_close(params->flexnbd->control);
}
if (params->server_fd) {
debug("closing server_fd");
close(params->server_fd);
}
@@ -861,15 +871,6 @@ void serve_cleanup(struct server *params,
server_unlock_acl(params);
}
/* if( params->flexnbd ) { */
/* if ( params->flexnbd->control ) { */
/* flexnbd_stop_control( params->flexnbd ); */
/* } */
/* flexnbd_destroy( params->flexnbd ); */
/* } */
/* server_destroy( params ); */
debug("Cleanup done");
}

View File

@@ -1,5 +1,6 @@
require 'socket'
require 'timeout'
require 'io/wait' # For IO#nread
require 'flexnbd/constants'
@@ -54,6 +55,10 @@ module FlexNBD
write_reply(handle, 1)
end
def nread
@sock.nread
end
def disconnected?
Timeout.timeout(2) do
@sock.read(1).nil?
@@ -85,6 +90,10 @@ module FlexNBD
@sock.write(len)
end
def getsockopt(level, optname)
@sock.getsockopt(level, optname)
end
def self.parse_be64(str)
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
str.length == 8

View File

@@ -158,7 +158,7 @@ module ProxyTests
# Send the read request to the proxy
client.write(0, (b * 4096))
# ensure we're given the read request
# ensure we're given the write request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
@@ -206,6 +206,62 @@ module ProxyTests
end
end
def test_write_request_retried_when_upstream_times_out_during_write_phase
ENV['FLEXNBD_UPSTREAM_TIMEOUT'] = '4'
maker = make_fake_server
with_ld_preload('setsockopt_logger') do
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Guess an approprate request size, based on the send buffer size.
sz = sc1.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int * 4
data1 = (b * sz)
# Send the read request to the proxy
client.write(0, data1)
# ensure we're given the write request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal data1.size, req1[:len]
# We do not read it at this point, as we want the proxy to be waiting
# in the WRITE_UPSTREAM state.
# Need to sleep longer than the timeout set above
sleep 5
# Check the number of bytes that can be read from the socket without
# blocking. If this equal to the size of the original request, then
# the whole request has been buffered. If this is the case, then the
# proxy will not time-out in the WRITE_UPSTREAM statem which is what
# we're trying to test.
assert sc1.nread < sz, 'Request from proxy completely buffered. Test is useless'
# Kill the server now that the timeout has happened.
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
# And now lets read the data to make sure we get it all.
data2 = sc2.read_data(req2[:len])
assert_equal data1, data2
sc2.close
server.close
end
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |_client|
c2 = nil

77
tests/acceptance/test_write_during_migration.rb Normal file → Executable file
View File

@@ -82,15 +82,22 @@ class TestWriteDuringMigration < Test::Unit::TestCase
UNIXSocket.open(@source_sock) do |sock|
sock.write(['mirror', '127.0.0.1', @dest_port.to_s, 'exit'].join("\x0A") + "\x0A\x0A")
sock.flush
rsp = sock.readline
sock.readline
end
end
def stop_mirror
UNIXSocket.open(@source_sock) do |sock|
sock.write("break\x0A\x0A")
sock.flush
sock.readline
end
end
def wait_for_quit
Timeout.timeout(10) do
start_time = Time.now
dst_result = Process.waitpid2(@dst_proc)
src_result = Process.waitpid2(@src_proc)
Process.waitpid2(@dst_proc)
Process.waitpid2(@src_proc)
end
end
@@ -100,13 +107,28 @@ class TestWriteDuringMigration < Test::Unit::TestCase
loop do
begin
client.write(offsets[rand(offsets.size)] * 4096, @write_data)
rescue StandardError => err
rescue StandardError
# We expect a broken write at some point, so ignore it
break
end
end
end
def bombard_with_status
loop do
begin
UNIXSocket.open(@source_sock) do |sock|
sock.write("status\x0A\x0A")
sock.flush
sock.readline
end
rescue StandardError
# If the socket disappears, that's OK.
break
end
end
end
def assert_both_sides_identical
# puts `md5sum #{@source_file} #{@dest_file}`
@@ -160,5 +182,48 @@ class TestWriteDuringMigration < Test::Unit::TestCase
(src_writers_1 + src_writers_2).each(&:join)
assert_both_sides_identical
end
end end
end
end
def test_status_call_after_cleanup
Dir.mktmpdir do |tmpdir|
Dir.chdir(tmpdir) do
make_files
launch_servers
status_poker = Thread.new { bombard_with_status }
start_mirror
wait_for_quit
status_poker.join
assert_both_sides_identical
end
end
end
def test_mirroring_can_be_restarted
@size = 100 * 1024 * 1024 # 100MB
Dir.mktmpdir do |tmpdir|
Dir.chdir(tmpdir) do
make_files
launch_servers
# This is a bit racy. It needs to be slow enough that the migration
# isn't finished before the stop runs, and slow enough so that we can
# stop/start a few times.
3.times do
start_mirror
sleep 0.1
stop_mirror
sleep 0.1
end
start_mirror
wait_for_quit
end
end
end
end