34 Commits

Author SHA1 Message Date
Chris Elsworth
2663173d1a Merge branch 'release-to-master' into 'master'
Release 0.5.0 to master

See merge request open-source/flexnbd-c!60
2019-01-11 13:34:34 +00:00
James F. Carter
3448ff15b8 release 0.5.0 2019-01-11 10:37:48 +00:00
James Carter
aff33bce10 Merge branch '37-unexplained-mirror-timeout-causing-migrations-to-stall' into 'develop'
Resolve "Unexplained mirror timeout causing migrations to stall"

Closes #37

See merge request open-source/flexnbd-c!59
2018-12-10 12:36:43 +00:00
Patrick J Cherry
6b1a877dc3 Tweaked test file size, and removed debug ENV fiddling 2018-12-07 22:43:50 +00:00
Patrick J Cherry
e5133a50bd Slow down start/stop. Enable DEBUG
Trying to working why this is failing in gitlab-ci
2018-12-07 22:09:21 +00:00
Patrick J Cherry
39400f2e09 Fixed issue number. 2018-12-07 21:50:56 +00:00
Patrick J Cherry
52690f5382 Updated changelog 2018-12-07 21:48:55 +00:00
Patrick J Cherry
a4d641b215 Ensure ev abandon_watcher is stopped before reuse. 2018-12-07 21:47:14 +00:00
James Carter
416d8bde96 Merge branch '40-when-migrating-qemu-discs-from-one-tail-to-another-there-s-an-edge-case-where-flexnbd-freezes-and-the-vm-eventually-disconnects-and-becomes-unstartable' into 'develop'
Resolve "When migrating QEmu discs from one tail to another there's an edge case where flexnbd freezes and the VM eventually disconnects and becomes unstartable."

Closes #40

See merge request open-source/flexnbd-c!58
2018-12-07 17:12:21 +00:00
Patrick J Cherry
654d277453 Updated changelog 2018-12-07 16:40:53 +00:00
Patrick J Cherry
842e7d362d Ensure control socket is closed first, and wait for it to close. 2018-12-07 16:32:58 +00:00
Patrick J Cherry
5839a36ab1 Remove useless function definition 2018-12-07 15:05:19 +00:00
Patrick J Cherry
70a3a4bb55 Close the control socket during cleanup
This should prevent further requests coming in, triggering deadlocks.
2018-12-07 15:02:55 +00:00
Patrick J Cherry
ce9499efce Rubocop; add test to bombard a migration source with status commands 2018-12-07 13:59:49 +00:00
Chris Elsworth
edb42700d0 Merge branch 'release-to-master' into 'master'
Release 0.4.0

See merge request open-source/flexnbd-c!57
2018-11-15 15:05:23 +00:00
Chris Elsworth
e5f7038127 Merge branch 'release-to-master' into 'develop'
Updated changelog for release

See merge request open-source/flexnbd-c!56
2018-11-15 14:28:43 +00:00
Patrick J Cherry
8bc6ebbb0f Updated changelog for release 2018-11-15 14:24:59 +00:00
Chris Elsworth
eb45b5e483 Merge branch '39-following-a-proxy-timeout-a-write-request-is-not-restarted-or-abandoned-leading-to-bad-magic-errors-following-reconnection' into 'develop'
Resolve "Following a proxy timeout, a write request is not restarted (or abandoned) leading to "bad magic" errors following reconnection"

Closes #39

See merge request open-source/flexnbd-c!54
2018-11-15 11:29:24 +00:00
Patrick J Cherry
256cba79e3 Added note about the new environment variable 2018-11-14 17:31:42 +00:00
Patrick J Cherry
5d1b0472de Fixed typo 2018-11-14 17:21:08 +00:00
Patrick J Cherry
93308bbda1 Added jessie back. 2018-11-14 17:20:33 +00:00
Patrick J Cherry
bb5271cea3 Remove jessie packaging :'( 2018-11-14 16:58:33 +00:00
Patrick J Cherry
b7b50faa17 Updated comments 2018-11-14 16:58:05 +00:00
Patrick J Cherry
b26b308e68 Add test to check when proxy times-out mid-write to upstream
I've add to add code to allow the environment to specify the upstream
tiemout so we don't have to wait 30s for this test to happen.
2018-11-14 16:49:55 +00:00
Patrick J Cherry
3e00a88d45 Removed debug 2018-11-14 10:23:58 +00:00
Patrick J Cherry
3fe9f2c6a1 Removed a couple of gotos.
Hadn't you heard?  They're considered harmful.
2018-11-14 10:23:05 +00:00
Patrick J Cherry
391a17bfcc Updated changelog 2018-11-13 21:38:36 +00:00
Patrick J Cherry
9b1518806d Move state-resetting to after before the init is read from upstream
This removes repetition and ensures a constant state before the upstream
init is read.
2018-11-13 21:33:06 +00:00
Patrick J Cherry
1225a28d41 Reset proxy req size/needle on timeout 2018-11-13 16:42:13 +00:00
Patrick J Cherry
c9d30a9bde Merge branch 'release-to-master' into 'master'
Release 0.3.0

See merge request open-source/flexnbd-c!53
2018-04-24 13:12:53 +01:00
James Carter
b18c46606f Merge branch 'release-to-master' into 'develop'
Updated changelog for release 0.3.0

See merge request open-source/flexnbd-c!52
2018-04-24 12:12:06 +01:00
James Carter
a4f1956a56 Merge branch 'release-to-master' into 'master'
Release to master

See merge request open-source/flexnbd-c!50
2018-02-20 11:52:07 +00:00
Patrick J Cherry
072f4be3c0 Merge branch 'release' into 'master'
Release

See merge request !31
2017-07-14 17:42:33 +01:00
James Carter
b4426f5dce Merge branch 'develop' into 'master'
Merge develop into master for release.

See merge request !29
2017-03-23 13:21:40 +00:00
13 changed files with 245 additions and 54 deletions

View File

@@ -1,18 +1,21 @@
stages: stages:
- package - package
- publish - publish
package:jessie: &package .package: &package
stage: package stage: package
image: $CI_REGISTRY/docker-images/layers:$DISTRO-deb image: $CI_REGISTRY/docker-images/layers:$DISTRO-deb
variables:
DISTRO: jessie
script: script:
- package - package
artifacts: artifacts:
paths: paths:
- pkg/ - pkg/
package:jessie:
<<: *package
variables:
DISTRO: jessie
package:stretch: package:stretch:
<<: *package <<: *package
variables: variables:
@@ -20,7 +23,7 @@ package:stretch:
publish: publish:
stage: publish stage: publish
tags: tags:
- shell - shell
script: script:
- publish - publish

View File

@@ -4,7 +4,7 @@ VPATH=src:tests/unit
DESTDIR?=/ DESTDIR?=/
PREFIX?=/usr/local/bin PREFIX?=/usr/local/bin
INSTALLDIR=$(DESTDIR)/$(PREFIX) INSTALLDIR=$(DESTDIR)/$(PREFIX)
ifdef DEBUG ifdef DEBUG
CFLAGS_EXTRA=-g -DDEBUG CFLAGS_EXTRA=-g -DDEBUG
LDFLAGS_EXTRA=-g LDFLAGS_EXTRA=-g

View File

@@ -169,6 +169,11 @@ That is, the '=' is required. This is a limitation of getopt-long.
If no cache size is given, a size of 4096 bytes is assumed. Caching can If no cache size is given, a size of 4096 bytes is assumed. Caching can
be explicitly disabled by setting a size of 0. be explicitly disabled by setting a size of 0.
ENVIRONMENT
FLEXNBD_UPSTREAM_TIMEOUT The timeout in seconds for the proxy communicating
with the upstream server defaults to 30 seconds.
BUGS BUGS
Should be reported via GitHub. Should be reported via GitHub.

17
debian/changelog vendored
View File

@@ -1,3 +1,20 @@
flexnbd (0.5.0) stable; urgency=medium
[ Patrick J Cherry ]
* Explicitly close the server control socket, and wait for it to close, to
prevent deadlocks during the server clean-up process (#40 !58)
* Ensure mirroring can be restarted after a break command is sent to the
source (#37, !59)
-- James Carter <james.carter@bytemark.co.uk> Fri, 11 Jan 2019 10:37:23 +0000
flexnbd (0.4.0) stable; urgency=medium
* Ensure proxy state is completely reset before upstream init is read,
ensuring any waiting requests are fully replayed (#39, !54)
-- Patrick J Cherry <patrick@bytemark.co.uk> Thu, 15 Nov 2018 14:24:26 +0000
flexnbd (0.3.0) stable; urgency=medium flexnbd (0.3.0) stable; urgency=medium
* Force a msync after every write, ignoring FUA flag, or lack thereof (!51). * Force a msync after every write, ignoring FUA flag, or lack thereof (!51).

View File

@@ -58,6 +58,20 @@ struct proxier *proxy_create(char *s_downstream_address,
out->downstream_fd = -1; out->downstream_fd = -1;
out->upstream_fd = -1; out->upstream_fd = -1;
int upstream_timeout = UPSTREAM_TIMEOUT;
char *env_upstream_timeout = getenv("FLEXNBD_UPSTREAM_TIMEOUT");
if (NULL != env_upstream_timeout) {
int ut = atoi(env_upstream_timeout);
warn("Got %i from atoi\n", ut);
if (ut > 0) {
upstream_timeout = ut;
}
}
out->upstream_timeout = upstream_timeout;
out->upstream_timeout_ms = (long unsigned int) upstream_timeout * 1000;
out->prefetch = NULL; out->prefetch = NULL;
if (s_cache_bytes) { if (s_cache_bytes) {
int cache_bytes = atoi(s_cache_bytes); int cache_bytes = atoi(s_cache_bytes);
@@ -507,6 +521,20 @@ int proxy_continue_connecting_to_upstream(struct proxier *proxy, int state)
/* Data may have changed while we were disconnected */ /* Data may have changed while we were disconnected */
prefetch_set_is_empty(proxy->prefetch); prefetch_set_is_empty(proxy->prefetch);
/* Reset our needles and sizes.
*
* Don't zero the req buffer size in case there's an outstanding request
* waiting to be re-sent following a disconnection. The init and rsp
* buffers are used for reading from upstream. If we're in this state then
* any upstream reads will be re-requested.
*/
proxy->init.needle = 0;
proxy->init.size = 0;
proxy->rsp.needle = 0;
proxy->rsp.size = 0;
proxy->req.needle = 0;
/* Don't zero the req.size, as we may need to re-write it upstream */
info("Connected to upstream on fd %i", proxy->upstream_fd); info("Connected to upstream on fd %i", proxy->upstream_fd);
return READ_INIT_FROM_UPSTREAM; return READ_INIT_FROM_UPSTREAM;
} }
@@ -523,7 +551,7 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
if (count == -1) { if (count == -1) {
warn(SHOW_ERRNO("Failed to read init from upstream")); warn(SHOW_ERRNO("Failed to read init from upstream"));
goto disconnect; return CONNECT_TO_UPSTREAM;
} }
if (proxy->init.needle == proxy->init.size) { if (proxy->init.needle == proxy->init.size) {
@@ -533,26 +561,24 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
((struct nbd_init_raw *) proxy->init.buf, &upstream_size, ((struct nbd_init_raw *) proxy->init.buf, &upstream_size,
&upstream_flags)) { &upstream_flags)) {
warn("Upstream sent invalid init"); warn("Upstream sent invalid init");
goto disconnect; return CONNECT_TO_UPSTREAM;
} }
/* record the flags, and log the reconnection, set TCP_NODELAY */ /* record the flags, and log the reconnection, set TCP_NODELAY */
proxy_finish_connect_to_upstream(proxy, upstream_size, proxy_finish_connect_to_upstream(proxy, upstream_size,
upstream_flags); upstream_flags);
/* Currently, we only get disconnected from upstream (so needing to come /* Finished reading the init response now, reset the needle. */
* here) when we have an outstanding request. If that becomes false,
* we'll need to choose the right state to return to here */
proxy->init.needle = 0; proxy->init.needle = 0;
/* Currently, we only get disconnected from upstream (so needing to
* come here) when we have an outstanding request. If that becomes
* false, we'll need to choose the right state to return to here.
*/
return WRITE_TO_UPSTREAM; return WRITE_TO_UPSTREAM;
} }
return state; return state;
disconnect:
proxy->init.needle = 0;
proxy->init.size = 0;
return CONNECT_TO_UPSTREAM;
} }
int proxy_write_to_upstream(struct proxier *proxy, int state) int proxy_write_to_upstream(struct proxier *proxy, int state)
@@ -574,7 +600,6 @@ int proxy_write_to_upstream(struct proxier *proxy, int state)
if (count == -1) { if (count == -1) {
warn(SHOW_ERRNO("Failed to send request to upstream")); warn(SHOW_ERRNO("Failed to send request to upstream"));
proxy->req.needle = 0;
// We're throwing the socket away so no need to uncork // We're throwing the socket away so no need to uncork
return CONNECT_TO_UPSTREAM; return CONNECT_TO_UPSTREAM;
} }
@@ -611,7 +636,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
if (count == -1) { if (count == -1) {
warn(SHOW_ERRNO("Failed to get reply from upstream")); warn(SHOW_ERRNO("Failed to get reply from upstream"));
goto disconnect; return CONNECT_TO_UPSTREAM;
} }
if (proxy->rsp.needle == NBD_REPLY_SIZE) { if (proxy->rsp.needle == NBD_REPLY_SIZE) {
@@ -619,7 +644,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
if (reply->magic != REPLY_MAGIC) { if (reply->magic != REPLY_MAGIC) {
warn("Reply magic is incorrect"); warn("Reply magic is incorrect");
goto disconnect; return CONNECT_TO_UPSTREAM;
} }
if (proxy->req_hdr.type == REQUEST_READ) { if (proxy->req_hdr.type == REQUEST_READ) {
@@ -635,11 +660,6 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
} }
return state; return state;
disconnect:
proxy->rsp.needle = 0;
proxy->rsp.size = 0;
return CONNECT_TO_UPSTREAM;
} }
@@ -775,6 +795,9 @@ void proxy_session(struct proxier *proxy)
}; };
if (select_timeout.tv_sec > 0) { if (select_timeout.tv_sec > 0) {
if (select_timeout.tv_sec > proxy->upstream_timeout) {
select_timeout.tv_sec = proxy->upstream_timeout;
}
select_timeout_ptr = &select_timeout; select_timeout_ptr = &select_timeout;
} }
@@ -848,17 +871,9 @@ void proxy_session(struct proxier *proxy)
/* In these states, we're interested in restarting after a timeout. /* In these states, we're interested in restarting after a timeout.
*/ */
if (old_state == state && proxy_state_upstream(state)) { if (old_state == state && proxy_state_upstream(state)) {
if ((monotonic_time_ms()) - state_started > UPSTREAM_TIMEOUT) { if ((monotonic_time_ms()) - state_started > proxy->upstream_timeout_ms) {
warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state] warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]);
);
state = CONNECT_TO_UPSTREAM; state = CONNECT_TO_UPSTREAM;
/* Since we've timed out, we won't have gone through the timeout logic
* in the various state handlers that resets these appropriately... */
proxy->init.size = 0;
proxy->init.needle = 0;
proxy->rsp.size = 0;
proxy->rsp.needle = 0;
} }
} }
} }

View File

@@ -14,10 +14,10 @@
#endif #endif
/** UPSTREAM_TIMEOUT /** UPSTREAM_TIMEOUT
* How long ( in ms ) to allow for upstream to respond. If it takes longer * How long (in s) to allow for upstream to respond. If it takes longer
* than this, we will cancel the current request-response to them and resubmit * than this, we will cancel the current request-response to them and resubmit
*/ */
#define UPSTREAM_TIMEOUT 30 * 1000 #define UPSTREAM_TIMEOUT 30
struct proxier { struct proxier {
/** address/port to bind to */ /** address/port to bind to */
@@ -72,6 +72,16 @@ struct proxier {
uint64_t req_count; uint64_t req_count;
int hello_sent; int hello_sent;
/*
* How long (in s) to allow for upstream to respond. If it takes longer
* than this, we will cancel the current request-response to them and
* resubmit
*
* Defaults to UPSTREAM_TIMEOUT but can be overridden in the environment.
*/
int upstream_timeout;
unsigned long int upstream_timeout_ms;
/** These are only used if we pass --cache on the command line */ /** These are only used if we pass --cache on the command line */
/* While the in-flight request has been munged by prefetch, these two are /* While the in-flight request has been munged by prefetch, these two are

View File

@@ -78,6 +78,14 @@ void control_destroy(struct control *control)
free(control); free(control);
} }
void control_wait_for_close(struct control *control)
{
NULLCHECK(control);
while (!fd_is_closed(control->control_fd)) {
usleep(10000);
}
}
struct control_client *control_client_create(struct flexnbd *flexnbd, struct control_client *control_client_create(struct flexnbd *flexnbd,
int client_fd, int client_fd,
struct mbox *state_mbox) struct mbox *state_mbox)

View File

@@ -47,6 +47,7 @@ struct control_client {
struct control *control_create(struct flexnbd *, struct control *control_create(struct flexnbd *,
const char *control_socket_name); const char *control_socket_name);
void control_signal_close(struct control *); void control_signal_close(struct control *);
void control_wait_for_close(struct control *control);
void control_destroy(struct control *); void control_destroy(struct control *);
void *control_runner(void *); void *control_runner(void *);

View File

@@ -671,6 +671,7 @@ static void mirror_abandon_cb(struct ev_loop *loop, ev_io * w, int revents)
debug("Abandon message received"); debug("Abandon message received");
mirror_set_state(ctrl->mirror, MS_ABANDONED); mirror_set_state(ctrl->mirror, MS_ABANDONED);
self_pipe_signal_clear(ctrl->mirror->abandon_signal); self_pipe_signal_clear(ctrl->mirror->abandon_signal);
ev_io_stop(loop, &ctrl->abandon_watcher);
ev_break(loop, EVBREAK_ONE); ev_break(loop, EVBREAK_ONE);
return; return;
} }

View File

@@ -812,8 +812,6 @@ void server_control_arrived(struct server *serve)
} }
void flexnbd_stop_control(struct flexnbd *flexnbd);
/** Closes sockets, frees memory and waits for all client threads to finish */ /** Closes sockets, frees memory and waits for all client threads to finish */
void serve_cleanup(struct server *params, void serve_cleanup(struct server *params,
int fatal __attribute__ ((unused))) int fatal __attribute__ ((unused)))
@@ -822,8 +820,20 @@ void serve_cleanup(struct server *params,
void *status; void *status;
info("cleaning up"); info("cleaning up");
/* Close the control socket, and wait for it to close before proceeding.
* If we do not wait, we risk a race condition with the tail supervisor
* sending a status command, and deadlocking the mirroring. */
if (params->flexnbd && params->flexnbd->control) {
debug("closing control socket");
control_signal_close(params->flexnbd->control);
debug("waiting for control socket to close");
control_wait_for_close(params->flexnbd->control);
}
if (params->server_fd) { if (params->server_fd) {
debug("closing server_fd");
close(params->server_fd); close(params->server_fd);
} }
@@ -861,15 +871,6 @@ void serve_cleanup(struct server *params,
server_unlock_acl(params); server_unlock_acl(params);
} }
/* if( params->flexnbd ) { */
/* if ( params->flexnbd->control ) { */
/* flexnbd_stop_control( params->flexnbd ); */
/* } */
/* flexnbd_destroy( params->flexnbd ); */
/* } */
/* server_destroy( params ); */
debug("Cleanup done"); debug("Cleanup done");
} }

View File

@@ -1,5 +1,6 @@
require 'socket' require 'socket'
require 'timeout' require 'timeout'
require 'io/wait' # For IO#nread
require 'flexnbd/constants' require 'flexnbd/constants'
@@ -54,6 +55,10 @@ module FlexNBD
write_reply(handle, 1) write_reply(handle, 1)
end end
def nread
@sock.nread
end
def disconnected? def disconnected?
Timeout.timeout(2) do Timeout.timeout(2) do
@sock.read(1).nil? @sock.read(1).nil?
@@ -85,6 +90,10 @@ module FlexNBD
@sock.write(len) @sock.write(len)
end end
def getsockopt(level, optname)
@sock.getsockopt(level, optname)
end
def self.parse_be64(str) def self.parse_be64(str)
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
str.length == 8 str.length == 8

View File

@@ -158,7 +158,7 @@ module ProxyTests
# Send the read request to the proxy # Send the read request to the proxy
client.write(0, (b * 4096)) client.write(0, (b * 4096))
# ensure we're given the read request # ensure we're given the write request
req1 = sc1.read_request req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type] assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
@@ -206,6 +206,62 @@ module ProxyTests
end end
end end
def test_write_request_retried_when_upstream_times_out_during_write_phase
ENV['FLEXNBD_UPSTREAM_TIMEOUT'] = '4'
maker = make_fake_server
with_ld_preload('setsockopt_logger') do
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Guess an approprate request size, based on the send buffer size.
sz = sc1.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int * 4
data1 = (b * sz)
# Send the read request to the proxy
client.write(0, data1)
# ensure we're given the write request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal data1.size, req1[:len]
# We do not read it at this point, as we want the proxy to be waiting
# in the WRITE_UPSTREAM state.
# Need to sleep longer than the timeout set above
sleep 5
# Check the number of bytes that can be read from the socket without
# blocking. If this equal to the size of the original request, then
# the whole request has been buffered. If this is the case, then the
# proxy will not time-out in the WRITE_UPSTREAM statem which is what
# we're trying to test.
assert sc1.nread < sz, 'Request from proxy completely buffered. Test is useless'
# Kill the server now that the timeout has happened.
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
# And now lets read the data to make sure we get it all.
data2 = sc2.read_data(req2[:len])
assert_equal data1, data2
sc2.close
server.close
end
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |_client| with_proxied_client do |_client|
c2 = nil c2 = nil

77
tests/acceptance/test_write_during_migration.rb Normal file → Executable file
View File

@@ -82,15 +82,22 @@ class TestWriteDuringMigration < Test::Unit::TestCase
UNIXSocket.open(@source_sock) do |sock| UNIXSocket.open(@source_sock) do |sock|
sock.write(['mirror', '127.0.0.1', @dest_port.to_s, 'exit'].join("\x0A") + "\x0A\x0A") sock.write(['mirror', '127.0.0.1', @dest_port.to_s, 'exit'].join("\x0A") + "\x0A\x0A")
sock.flush sock.flush
rsp = sock.readline sock.readline
end
end
def stop_mirror
UNIXSocket.open(@source_sock) do |sock|
sock.write("break\x0A\x0A")
sock.flush
sock.readline
end end
end end
def wait_for_quit def wait_for_quit
Timeout.timeout(10) do Timeout.timeout(10) do
start_time = Time.now Process.waitpid2(@dst_proc)
dst_result = Process.waitpid2(@dst_proc) Process.waitpid2(@src_proc)
src_result = Process.waitpid2(@src_proc)
end end
end end
@@ -100,13 +107,28 @@ class TestWriteDuringMigration < Test::Unit::TestCase
loop do loop do
begin begin
client.write(offsets[rand(offsets.size)] * 4096, @write_data) client.write(offsets[rand(offsets.size)] * 4096, @write_data)
rescue StandardError => err rescue StandardError
# We expect a broken write at some point, so ignore it # We expect a broken write at some point, so ignore it
break break
end end
end end
end end
def bombard_with_status
loop do
begin
UNIXSocket.open(@source_sock) do |sock|
sock.write("status\x0A\x0A")
sock.flush
sock.readline
end
rescue StandardError
# If the socket disappears, that's OK.
break
end
end
end
def assert_both_sides_identical def assert_both_sides_identical
# puts `md5sum #{@source_file} #{@dest_file}` # puts `md5sum #{@source_file} #{@dest_file}`
@@ -160,5 +182,48 @@ class TestWriteDuringMigration < Test::Unit::TestCase
(src_writers_1 + src_writers_2).each(&:join) (src_writers_1 + src_writers_2).each(&:join)
assert_both_sides_identical assert_both_sides_identical
end end
end end end
end
def test_status_call_after_cleanup
Dir.mktmpdir do |tmpdir|
Dir.chdir(tmpdir) do
make_files
launch_servers
status_poker = Thread.new { bombard_with_status }
start_mirror
wait_for_quit
status_poker.join
assert_both_sides_identical
end
end
end
def test_mirroring_can_be_restarted
@size = 100 * 1024 * 1024 # 100MB
Dir.mktmpdir do |tmpdir|
Dir.chdir(tmpdir) do
make_files
launch_servers
# This is a bit racy. It needs to be slow enough that the migration
# isn't finished before the stop runs, and slow enough so that we can
# stop/start a few times.
3.times do
start_mirror
sleep 0.1
stop_mirror
sleep 0.1
end
start_mirror
wait_for_quit
end
end
end
end end