35 Commits

Author SHA1 Message Date
Chris Elsworth
2663173d1a Merge branch 'release-to-master' into 'master'
Release 0.5.0 to master

See merge request open-source/flexnbd-c!60
2019-01-11 13:34:34 +00:00
James F. Carter
3448ff15b8 release 0.5.0 2019-01-11 10:37:48 +00:00
James Carter
aff33bce10 Merge branch '37-unexplained-mirror-timeout-causing-migrations-to-stall' into 'develop'
Resolve "Unexplained mirror timeout causing migrations to stall"

Closes #37

See merge request open-source/flexnbd-c!59
2018-12-10 12:36:43 +00:00
Patrick J Cherry
6b1a877dc3 Tweaked test file size, and removed debug ENV fiddling 2018-12-07 22:43:50 +00:00
Patrick J Cherry
e5133a50bd Slow down start/stop. Enable DEBUG
Trying to work out why this is failing in gitlab-ci
2018-12-07 22:09:21 +00:00
Patrick J Cherry
39400f2e09 Fixed issue number. 2018-12-07 21:50:56 +00:00
Patrick J Cherry
52690f5382 Updated changelog 2018-12-07 21:48:55 +00:00
Patrick J Cherry
a4d641b215 Ensure ev abandon_watcher is stopped before reuse. 2018-12-07 21:47:14 +00:00
James Carter
416d8bde96 Merge branch '40-when-migrating-qemu-discs-from-one-tail-to-another-there-s-an-edge-case-where-flexnbd-freezes-and-the-vm-eventually-disconnects-and-becomes-unstartable' into 'develop'
Resolve "When migrating QEmu discs from one tail to another there's an edge case where flexnbd freezes and the VM eventually disconnects and becomes unstartable."

Closes #40

See merge request open-source/flexnbd-c!58
2018-12-07 17:12:21 +00:00
Patrick J Cherry
654d277453 Updated changelog 2018-12-07 16:40:53 +00:00
Patrick J Cherry
842e7d362d Ensure control socket is closed first, and wait for it to close. 2018-12-07 16:32:58 +00:00
Patrick J Cherry
5839a36ab1 Remove useless function definition 2018-12-07 15:05:19 +00:00
Patrick J Cherry
70a3a4bb55 Close the control socket during cleanup
This should prevent further requests coming in, triggering deadlocks.
2018-12-07 15:02:55 +00:00
Patrick J Cherry
ce9499efce Rubocop; add test to bombard a migration source with status commands 2018-12-07 13:59:49 +00:00
Chris Elsworth
edb42700d0 Merge branch 'release-to-master' into 'master'
Release 0.4.0

See merge request open-source/flexnbd-c!57
2018-11-15 15:05:23 +00:00
Chris Elsworth
e5f7038127 Merge branch 'release-to-master' into 'develop'
Updated changelog for release

See merge request open-source/flexnbd-c!56
2018-11-15 14:28:43 +00:00
Patrick J Cherry
8bc6ebbb0f Updated changelog for release 2018-11-15 14:24:59 +00:00
Chris Elsworth
eb45b5e483 Merge branch '39-following-a-proxy-timeout-a-write-request-is-not-restarted-or-abandoned-leading-to-bad-magic-errors-following-reconnection' into 'develop'
Resolve "Following a proxy timeout, a write request is not restarted (or abandoned) leading to "bad magic" errors following reconnection"

Closes #39

See merge request open-source/flexnbd-c!54
2018-11-15 11:29:24 +00:00
Patrick J Cherry
256cba79e3 Added note about the new environment variable 2018-11-14 17:31:42 +00:00
Patrick J Cherry
5d1b0472de Fixed typo 2018-11-14 17:21:08 +00:00
Patrick J Cherry
93308bbda1 Added jessie back. 2018-11-14 17:20:33 +00:00
Patrick J Cherry
bb5271cea3 Remove jessie packaging :'( 2018-11-14 16:58:33 +00:00
Patrick J Cherry
b7b50faa17 Updated comments 2018-11-14 16:58:05 +00:00
Patrick J Cherry
b26b308e68 Add test to check when proxy times-out mid-write to upstream
I've had to add code to allow the environment to specify the upstream
timeout so we don't have to wait 30s for this test to happen.
2018-11-14 16:49:55 +00:00
Patrick J Cherry
3e00a88d45 Removed debug 2018-11-14 10:23:58 +00:00
Patrick J Cherry
3fe9f2c6a1 Removed a couple of gotos.
Hadn't you heard?  They're considered harmful.
2018-11-14 10:23:05 +00:00
Patrick J Cherry
391a17bfcc Updated changelog 2018-11-13 21:38:36 +00:00
Patrick J Cherry
9b1518806d Move state-resetting to before the init is read from upstream
This removes repetition and ensures a constant state before the upstream
init is read.
2018-11-13 21:33:06 +00:00
Patrick J Cherry
1225a28d41 Reset proxy req size/needle on timeout 2018-11-13 16:42:13 +00:00
Patrick J Cherry
c9d30a9bde Merge branch 'release-to-master' into 'master'
Release 0.3.0

See merge request open-source/flexnbd-c!53
2018-04-24 13:12:53 +01:00
James Carter
b18c46606f Merge branch 'release-to-master' into 'develop'
Updated changelog for release 0.3.0

See merge request open-source/flexnbd-c!52
2018-04-24 12:12:06 +01:00
Patrick J Cherry
b3cea813e4 Updated changelog for release 2018-04-24 12:06:06 +01:00
James Carter
a4f1956a56 Merge branch 'release-to-master' into 'master'
Release to master

See merge request open-source/flexnbd-c!50
2018-02-20 11:52:07 +00:00
Patrick J Cherry
072f4be3c0 Merge branch 'release' into 'master'
Release

See merge request !31
2017-07-14 17:42:33 +01:00
James Carter
b4426f5dce Merge branch 'develop' into 'master'
Merge develop into master for release.

See merge request !29
2017-03-23 13:21:40 +00:00
16 changed files with 278 additions and 87 deletions

@@ -2,17 +2,20 @@ stages:
   - package
   - publish

-package:jessie: &package
+.package: &package
   stage: package
   image: $CI_REGISTRY/docker-images/layers:$DISTRO-deb
-  variables:
-    DISTRO: jessie
   script:
     - package
   artifacts:
     paths:
       - pkg/

+package:jessie:
+  <<: *package
+  variables:
+    DISTRO: jessie
+
 package:stretch:
   <<: *package
   variables:

@@ -169,6 +169,11 @@ That is, the '=' is required. This is a limitation of getopt-long.
If no cache size is given, a size of 4096 bytes is assumed. Caching can If no cache size is given, a size of 4096 bytes is assumed. Caching can
be explicitly disabled by setting a size of 0. be explicitly disabled by setting a size of 0.
ENVIRONMENT
FLEXNBD_UPSTREAM_TIMEOUT The timeout, in seconds, for the proxy communicating
with the upstream server. Defaults to 30 seconds.
BUGS BUGS
Should be reported via GitHub. Should be reported via GitHub.
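
The ENVIRONMENT entry above describes a timeout in whole seconds with a default of 30. The diff reads the variable with atoi(), which cannot distinguish "0" from a string that is not a number. As a rough sketch only (not the project's code), a stricter parse could use strtol(); the helper names parse_timeout_secs and upstream_timeout_from_env, and the DEFAULT_UPSTREAM_TIMEOUT_S macro, are hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

#define DEFAULT_UPSTREAM_TIMEOUT_S 30   /* mirrors the documented default */

/* Hypothetical helper: parse a timeout in whole seconds from a string.
 * Returns fallback when the string is NULL, empty, not a number, has
 * trailing characters, is not positive, or does not fit in an int. */
int parse_timeout_secs(const char *text, int fallback)
{
    assert(fallback > 0);       /* the fallback itself must be usable */

    if (text == NULL || *text == '\0') {
        return fallback;
    }

    char *end = NULL;
    errno = 0;
    long value = strtol(text, &end, 10);

    if (errno != 0 || *end != '\0' || value <= 0 || value > INT_MAX) {
        return fallback;
    }
    return (int) value;
}

/* Hypothetical wrapper that reads the variable named in the man page. */
int upstream_timeout_from_env(void)
{
    return parse_timeout_secs(getenv("FLEXNBD_UPSTREAM_TIMEOUT"),
                              DEFAULT_UPSTREAM_TIMEOUT_S);
}
```

With this sketch, parse_timeout_secs("4", 30) gives 4, while NULL, "abc", "0", "-5" and "10x" all give the fallback of 30.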

debian/changelog vendored
@@ -1,8 +1,25 @@
-flexnbd (0.2.1) UNRELEASED; urgency=medium
+flexnbd (0.5.0) stable; urgency=medium

-  * Force a msync after every write, ignoring FUA flag, or lack thereof.
+  [ Patrick J Cherry ]
+  * Explicitly close the server control socket, and wait for it to close, to
+    prevent deadlocks during the server clean-up process (#40 !58)
+  * Ensure mirroring can be restarted after a break command is sent to the
+    source (#37, !59)

- -- Patrick J Cherry <patrick@bytemark.co.uk>  Tue, 24 Apr 2018 10:27:12 +0100
+ -- James Carter <james.carter@bytemark.co.uk>  Fri, 11 Jan 2019 10:37:23 +0000
+
+flexnbd (0.4.0) stable; urgency=medium
+
+  * Ensure proxy state is completely reset before upstream init is read,
+    ensuring any waiting requests are fully replayed (#39, !54)
+
+ -- Patrick J Cherry <patrick@bytemark.co.uk>  Thu, 15 Nov 2018 14:24:26 +0000
+
+flexnbd (0.3.0) stable; urgency=medium
+
+  * Force a msync after every write, ignoring FUA flag, or lack thereof (!51).
+
+ -- Patrick J Cherry <patrick@bytemark.co.uk>  Tue, 24 Apr 2018 12:05:43 +0100

 flexnbd (0.2.0) stable; urgency=medium

@@ -148,6 +148,8 @@ int readloop(int filedes, void *buffer, size_t size)
ssize_t result = read(filedes, buffer + readden, size - readden); ssize_t result = read(filedes, buffer + readden, size - readden);
if (result == 0 /* EOF */ ) { if (result == 0 /* EOF */ ) {
warn("end-of-file detected while reading after %i bytes",
readden);
return -1; return -1;
} }
@@ -222,7 +224,8 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
while (spliced < len) { while (spliced < len) {
ssize_t run = len - spliced; ssize_t run = len - spliced;
ssize_t s2, s1 = spliceloop(fd_in, NULL, pipefd[1], NULL, run, ssize_t s2, s1 =
spliceloop(fd_in, NULL, pipefd[1], NULL, run,
SPLICE_F_NONBLOCK); SPLICE_F_NONBLOCK);
/*if (run > 65535) /*if (run > 65535)
run = 65535; */ run = 65535; */

@@ -76,7 +76,7 @@ int socket_nbd_read_hello(int fd, uint64_t * out_size,
if (0 > readloop(fd, &init_raw, sizeof(init_raw))) { if (0 > readloop(fd, &init_raw, sizeof(init_raw))) {
warn(SHOW_ERRNO("Couldn't read init")); warn("Couldn't read init");
return 0; return 0;
} }
@@ -130,7 +130,7 @@ void read_reply(int fd, uint64_t request_raw_handle,
ERROR_IF_NEGATIVE(readloop ERROR_IF_NEGATIVE(readloop
(fd, &reply_raw, sizeof(struct nbd_reply_raw)), (fd, &reply_raw, sizeof(struct nbd_reply_raw)),
SHOW_ERRNO("Couldn't read reply")); "Couldn't read reply");
nbd_r2h_reply(&reply_raw, reply); nbd_r2h_reply(&reply_raw, reply);
@@ -171,15 +171,14 @@ void socket_nbd_read(int fd, uint64_t from, uint32_t len, int out_fd,
fill_request(&request_raw, REQUEST_READ, 0, from, len); fill_request(&request_raw, REQUEST_READ, 0, from, len);
FATAL_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)), FATAL_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)),
SHOW_ERRNO("Couldn't write request")); "Couldn't write request");
wait_for_data(fd, timeout_secs); wait_for_data(fd, timeout_secs);
read_reply(fd, request_raw.handle.w, &reply); read_reply(fd, request_raw.handle.w, &reply);
if (out_buf) { if (out_buf) {
FATAL_IF_NEGATIVE(readloop(fd, out_buf, len), FATAL_IF_NEGATIVE(readloop(fd, out_buf, len), "Read failed");
SHOW_ERRNO("Read failed"));
} else { } else {
FATAL_IF_NEGATIVE(splice_via_pipe_loop(fd, out_fd, len), FATAL_IF_NEGATIVE(splice_via_pipe_loop(fd, out_fd, len),
"Splice failed"); "Splice failed");
@@ -194,11 +193,10 @@ void socket_nbd_write(int fd, uint64_t from, uint32_t len, int in_fd,
fill_request(&request_raw, REQUEST_WRITE, 0, from, len); fill_request(&request_raw, REQUEST_WRITE, 0, from, len);
ERROR_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)), ERROR_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)),
SHOW_ERRNO("Couldn't write request")); "Couldn't write request");
if (in_buf) { if (in_buf) {
ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len), ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len), "Write failed");
SHOW_ERRNO("Write failed"));
} else { } else {
ERROR_IF_NEGATIVE(splice_via_pipe_loop(in_fd, fd, len), ERROR_IF_NEGATIVE(splice_via_pipe_loop(in_fd, fd, len),
"Splice failed"); "Splice failed");
@@ -219,8 +217,7 @@ int socket_nbd_disconnect(int fd)
* the mirror without affecting the main server. * the mirror without affecting the main server.
*/ */
FATAL_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)), FATAL_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)),
SHOW_ERRNO "Failed to write the disconnect request.");
("Failed to write the disconnect request."));
return success; return success;
} }

@@ -58,6 +58,20 @@ struct proxier *proxy_create(char *s_downstream_address,
out->downstream_fd = -1; out->downstream_fd = -1;
out->upstream_fd = -1; out->upstream_fd = -1;
int upstream_timeout = UPSTREAM_TIMEOUT;
char *env_upstream_timeout = getenv("FLEXNBD_UPSTREAM_TIMEOUT");
if (NULL != env_upstream_timeout) {
int ut = atoi(env_upstream_timeout);
warn("Got %i from atoi\n", ut);
if (ut > 0) {
upstream_timeout = ut;
}
}
out->upstream_timeout = upstream_timeout;
out->upstream_timeout_ms = (long unsigned int) upstream_timeout * 1000;
out->prefetch = NULL; out->prefetch = NULL;
if (s_cache_bytes) { if (s_cache_bytes) {
int cache_bytes = atoi(s_cache_bytes); int cache_bytes = atoi(s_cache_bytes);
@@ -507,6 +521,20 @@ int proxy_continue_connecting_to_upstream(struct proxier *proxy, int state)
/* Data may have changed while we were disconnected */ /* Data may have changed while we were disconnected */
prefetch_set_is_empty(proxy->prefetch); prefetch_set_is_empty(proxy->prefetch);
/* Reset our needles and sizes.
*
* Don't zero the req buffer size in case there's an outstanding request
* waiting to be re-sent following a disconnection. The init and rsp
* buffers are used for reading from upstream. If we're in this state then
* any upstream reads will be re-requested.
*/
proxy->init.needle = 0;
proxy->init.size = 0;
proxy->rsp.needle = 0;
proxy->rsp.size = 0;
proxy->req.needle = 0;
/* Don't zero the req.size, as we may need to re-write it upstream */
info("Connected to upstream on fd %i", proxy->upstream_fd); info("Connected to upstream on fd %i", proxy->upstream_fd);
return READ_INIT_FROM_UPSTREAM; return READ_INIT_FROM_UPSTREAM;
} }
@@ -523,7 +551,7 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
if (count == -1) { if (count == -1) {
warn(SHOW_ERRNO("Failed to read init from upstream")); warn(SHOW_ERRNO("Failed to read init from upstream"));
goto disconnect; return CONNECT_TO_UPSTREAM;
} }
if (proxy->init.needle == proxy->init.size) { if (proxy->init.needle == proxy->init.size) {
@@ -533,26 +561,24 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
((struct nbd_init_raw *) proxy->init.buf, &upstream_size, ((struct nbd_init_raw *) proxy->init.buf, &upstream_size,
&upstream_flags)) { &upstream_flags)) {
warn("Upstream sent invalid init"); warn("Upstream sent invalid init");
goto disconnect; return CONNECT_TO_UPSTREAM;
} }
/* record the flags, and log the reconnection, set TCP_NODELAY */ /* record the flags, and log the reconnection, set TCP_NODELAY */
proxy_finish_connect_to_upstream(proxy, upstream_size, proxy_finish_connect_to_upstream(proxy, upstream_size,
upstream_flags); upstream_flags);
/* Currently, we only get disconnected from upstream (so needing to come /* Finished reading the init response now, reset the needle. */
* here) when we have an outstanding request. If that becomes false,
* we'll need to choose the right state to return to here */
proxy->init.needle = 0; proxy->init.needle = 0;
/* Currently, we only get disconnected from upstream (so needing to
* come here) when we have an outstanding request. If that becomes
* false, we'll need to choose the right state to return to here.
*/
return WRITE_TO_UPSTREAM; return WRITE_TO_UPSTREAM;
} }
return state; return state;
disconnect:
proxy->init.needle = 0;
proxy->init.size = 0;
return CONNECT_TO_UPSTREAM;
} }
int proxy_write_to_upstream(struct proxier *proxy, int state) int proxy_write_to_upstream(struct proxier *proxy, int state)
@@ -574,7 +600,6 @@ int proxy_write_to_upstream(struct proxier *proxy, int state)
if (count == -1) { if (count == -1) {
warn(SHOW_ERRNO("Failed to send request to upstream")); warn(SHOW_ERRNO("Failed to send request to upstream"));
proxy->req.needle = 0;
// We're throwing the socket away so no need to uncork // We're throwing the socket away so no need to uncork
return CONNECT_TO_UPSTREAM; return CONNECT_TO_UPSTREAM;
} }
@@ -611,7 +636,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
if (count == -1) { if (count == -1) {
warn(SHOW_ERRNO("Failed to get reply from upstream")); warn(SHOW_ERRNO("Failed to get reply from upstream"));
goto disconnect; return CONNECT_TO_UPSTREAM;
} }
if (proxy->rsp.needle == NBD_REPLY_SIZE) { if (proxy->rsp.needle == NBD_REPLY_SIZE) {
@@ -619,7 +644,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
if (reply->magic != REPLY_MAGIC) { if (reply->magic != REPLY_MAGIC) {
warn("Reply magic is incorrect"); warn("Reply magic is incorrect");
goto disconnect; return CONNECT_TO_UPSTREAM;
} }
if (proxy->req_hdr.type == REQUEST_READ) { if (proxy->req_hdr.type == REQUEST_READ) {
@@ -635,11 +660,6 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
} }
return state; return state;
disconnect:
proxy->rsp.needle = 0;
proxy->rsp.size = 0;
return CONNECT_TO_UPSTREAM;
} }
@@ -775,6 +795,9 @@ void proxy_session(struct proxier *proxy)
}; };
if (select_timeout.tv_sec > 0) { if (select_timeout.tv_sec > 0) {
if (select_timeout.tv_sec > proxy->upstream_timeout) {
select_timeout.tv_sec = proxy->upstream_timeout;
}
select_timeout_ptr = &select_timeout; select_timeout_ptr = &select_timeout;
} }
@@ -848,17 +871,9 @@ void proxy_session(struct proxier *proxy)
/* In these states, we're interested in restarting after a timeout. /* In these states, we're interested in restarting after a timeout.
*/ */
if (old_state == state && proxy_state_upstream(state)) { if (old_state == state && proxy_state_upstream(state)) {
if ((monotonic_time_ms()) - state_started > UPSTREAM_TIMEOUT) { if ((monotonic_time_ms()) - state_started > proxy->upstream_timeout_ms) {
warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state] warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]);
);
state = CONNECT_TO_UPSTREAM; state = CONNECT_TO_UPSTREAM;
/* Since we've timed out, we won't have gone through the timeout logic
* in the various state handlers that resets these appropriately... */
proxy->init.size = 0;
proxy->init.needle = 0;
proxy->rsp.size = 0;
proxy->rsp.needle = 0;
} }
} }
} }
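
The proxy changes above move all buffer resetting to one place: just before the init is read from a fresh upstream connection. The following is a minimal sketch of that idea, not the project's code. The types io_buf and proxy_bufs and the function reset_for_reconnect are hypothetical stand-ins for the relevant fields of struct proxier.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for one of the proxy's I/O buffers: size is how
 * many bytes the current message holds, needle is how far through it we
 * have read or written. */
struct io_buf {
    size_t needle;
    size_t size;
};

/* Hypothetical stand-in for the relevant part of struct proxier. */
struct proxy_bufs {
    struct io_buf init;         /* hello read from upstream */
    struct io_buf req;          /* request being written upstream */
    struct io_buf rsp;          /* reply being read from upstream */
};

/* Called once a fresh upstream connection exists, before the init is read. */
void reset_for_reconnect(struct proxy_bufs *p)
{
    assert(p != NULL);

    /* Anything partially read from the old connection is discarded. */
    p->init.needle = 0;
    p->init.size = 0;
    p->rsp.needle = 0;
    p->rsp.size = 0;

    /* Rewind the request so it is re-sent from its first byte, but keep
     * req.size: it records the length of the request awaiting replay. */
    p->req.needle = 0;
}
```

Doing the reset in one function means every path back to CONNECT_TO_UPSTREAM (read error, bad magic, timeout) ends up in the same state, which is why the per-handler disconnect labels could be dropped.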

@@ -14,10 +14,10 @@
#endif #endif
/** UPSTREAM_TIMEOUT /** UPSTREAM_TIMEOUT
* How long ( in ms ) to allow for upstream to respond. If it takes longer * How long (in s) to allow for upstream to respond. If it takes longer
* than this, we will cancel the current request-response to them and resubmit * than this, we will cancel the current request-response to them and resubmit
*/ */
#define UPSTREAM_TIMEOUT 30 * 1000 #define UPSTREAM_TIMEOUT 30
struct proxier { struct proxier {
/** address/port to bind to */ /** address/port to bind to */
@@ -72,6 +72,16 @@ struct proxier {
uint64_t req_count; uint64_t req_count;
int hello_sent; int hello_sent;
/*
* How long (in s) to allow for upstream to respond. If it takes longer
* than this, we will cancel the current request-response to them and
* resubmit
*
* Defaults to UPSTREAM_TIMEOUT but can be overridden in the environment.
*/
int upstream_timeout;
unsigned long int upstream_timeout_ms;
/** These are only used if we pass --cache on the command line */ /** These are only used if we pass --cache on the command line */
/* While the in-flight request has been munged by prefetch, these two are /* While the in-flight request has been munged by prefetch, these two are
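
The header change above stores the timeout twice: in seconds (used to cap the select() wait) and in milliseconds (compared against a monotonic clock). A small sketch of that arithmetic follows, under the assumption that times are unsigned millisecond counts. The function names timeout_secs_to_ms, has_timed_out and clamp_select_secs are hypothetical and do not appear in the diff.

```c
#include <assert.h>
#include <stdint.h>

/* Convert a timeout in whole seconds to milliseconds, widening before
 * multiplying so that a large value cannot overflow int. */
uint64_t timeout_secs_to_ms(int secs)
{
    assert(secs >= 0);
    return (uint64_t) secs * 1000u;
}

/* True when strictly more than timeout_ms has elapsed since started_ms.
 * Both times are assumed to come from the same monotonic clock. */
int has_timed_out(uint64_t started_ms, uint64_t current_ms,
                  uint64_t timeout_ms)
{
    return current_ms - started_ms > timeout_ms;
}

/* Cap a select() wait so the loop wakes up at least once per timeout. */
long clamp_select_secs(long wait_secs, int upstream_timeout_secs)
{
    return wait_secs > upstream_timeout_secs ? upstream_timeout_secs
                                             : wait_secs;
}
```

Without the cap, a short timeout taken from the environment could go unnoticed while the loop sleeps in a longer select() wait.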

@@ -152,7 +152,7 @@ void write_not_zeroes(struct client *client, uint64_t from, uint64_t len)
(dst), \ (dst), \
(len) \ (len) \
), \ ), \
SHOW_ERRNO("read failed %ld+%d", from, (len)) \ "read failed %ld+%d", from, (len) \
) )
if (bitset_is_set_at(map, from)) { if (bitset_is_set_at(map, from)) {
@@ -232,13 +232,13 @@ int client_read_request(struct client *client,
*disconnected = 1; *disconnected = 1;
switch (errno) { switch (errno) {
case 0: case 0:
warn(SHOW_ERRNO("EOF while reading request")); warn("EOF while reading request");
return 0; return 0;
case ECONNRESET: case ECONNRESET:
warn(SHOW_ERRNO("Connection reset while reading request")); warn("Connection reset while" " reading request");
return 0; return 0;
case ETIMEDOUT: case ETIMEDOUT:
warn(SHOW_ERRNO("Connection timed out while reading request")); warn("Connection timed out while" " reading request");
return 0; return 0;
default: default:
/* FIXME: I've seen this happen, but I /* FIXME: I've seen this happen, but I
@@ -248,7 +248,7 @@ int client_read_request(struct client *client,
* again. It should *probably* be an * again. It should *probably* be an
* error() call, but I want to be sure. * error() call, but I want to be sure.
* */ * */
fatal(SHOW_ERRNO("Error reading request")); fatal("Error reading request: %d, %s", errno, strerror(errno));
} }
} }
@@ -271,17 +271,16 @@ int fd_write_reply(int fd, uint64_t handle, int error)
if (-1 == writeloop(fd, &reply_raw, sizeof(reply_raw))) { if (-1 == writeloop(fd, &reply_raw, sizeof(reply_raw))) {
switch (errno) { switch (errno) {
case ECONNRESET: case ECONNRESET:
error(SHOW_ERRNO("Connection reset while writing reply")); error("Connection reset while writing reply");
break; break;
case EBADF: case EBADF:
fatal(SHOW_ERRNO fatal("Tried to write to an invalid file descriptor");
("Tried to write to an invalid file descriptor"));
break; break;
case EPIPE: case EPIPE:
error(SHOW_ERRNO("Remote end closed")); error("Remote end closed");
break; break;
default: default:
fatal(SHOW_ERRNO("Unhandled error while writing")); fatal("Unhandled error while writing: %d", errno);
} }
} }
@@ -319,7 +318,7 @@ void client_write_init(struct client *client, uint64_t size)
ERROR_IF_NEGATIVE(writeloop ERROR_IF_NEGATIVE(writeloop
(client->socket, &init_raw, sizeof(init_raw)), (client->socket, &init_raw, sizeof(init_raw)),
SHOW_ERRNO("Couldn't send hello")); "Couldn't send hello");
} }
@@ -330,7 +329,8 @@ void client_write_init(struct client *client, uint64_t size)
void client_flush(struct client *client, size_t len) void client_flush(struct client *client, size_t len)
{ {
int devnull = open("/dev/null", O_WRONLY); int devnull = open("/dev/null", O_WRONLY);
FATAL_IF_NEGATIVE(devnull, SHOW_ERRNO("Couldn't open /dev/null")); FATAL_IF_NEGATIVE(devnull,
"Couldn't open /dev/null: %s", strerror(errno));
int pipes[2]; int pipes[2];
pipe(pipes); pipe(pipes);
@@ -341,12 +341,12 @@ void client_flush(struct client *client, size_t len)
ssize_t received = splice(client->socket, NULL, ssize_t received = splice(client->socket, NULL,
pipes[1], NULL, pipes[1], NULL,
len - spliced, flags); len - spliced, flags);
FATAL_IF_NEGATIVE(received, SHOW_ERRNO("splice error")); FATAL_IF_NEGATIVE(received, "splice error: %s", strerror(errno));
ssize_t junked = 0; ssize_t junked = 0;
while (junked < received) { while (junked < received) {
ssize_t junk; ssize_t junk;
junk = splice(pipes[0], NULL, devnull, NULL, received, flags); junk = splice(pipes[0], NULL, devnull, NULL, received, flags);
FATAL_IF_NEGATIVE(junk, SHOW_ERRNO("splice error")); FATAL_IF_NEGATIVE(junk, "splice error: %s", strerror(errno));
junked += junk; junked += junk;
} }
spliced += received; spliced += received;
@@ -459,8 +459,8 @@ void client_reply_to_write(struct client *client,
ERROR_IF_NEGATIVE(readloop(client->socket, ERROR_IF_NEGATIVE(readloop(client->socket,
client->mapped + request.from, client->mapped + request.from,
request.len), request.len),
SHOW_ERRNO("reading write data failed from=%ld, len=%d", "reading write data failed from=%ld, len=%d",
request.from, request.len)); request.from, request.len);
/* the allocation_map is shared between client threads, and may be /* the allocation_map is shared between client threads, and may be
* being built. We need to reflect the write in it, as it may be in * being built. We need to reflect the write in it, as it may be in
@@ -693,8 +693,8 @@ void *client_serve(void *client_uncast)
&client->fileno, &client->fileno,
&client->mapped_size, &client->mapped_size,
(void **) &client->mapped), (void **) &client->mapped),
SHOW_ERRNO("Couldn't open/mmap file %s", "Couldn't open/mmap file %s: %s",
client->serve->filename) client->serve->filename, strerror(errno)
); );
FATAL_IF_NEGATIVE(madvise FATAL_IF_NEGATIVE(madvise

@@ -78,6 +78,14 @@ void control_destroy(struct control *control)
free(control); free(control);
} }
void control_wait_for_close(struct control *control)
{
NULLCHECK(control);
while (!fd_is_closed(control->control_fd)) {
usleep(10000);
}
}
struct control_client *control_client_create(struct flexnbd *flexnbd, struct control_client *control_client_create(struct flexnbd *flexnbd,
int client_fd, int client_fd,
struct mbox *state_mbox) struct mbox *state_mbox)
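
control_wait_for_close() above polls every 10ms with no upper bound. The sketch below shows one bounded variant, as an illustration only. The diff does not show the body of fd_is_closed(), so fd_looks_closed here is an assumed equivalent built on fcntl(), and wait_for_close_bounded is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <unistd.h>

/* Assumed behaviour of fd_is_closed(): a descriptor counts as closed
 * when fcntl() rejects it with EBADF. errno is left as we found it. */
int fd_looks_closed(int fd)
{
    int saved_errno = errno;
    int closed = (fcntl(fd, F_GETFD) == -1 && errno == EBADF);
    errno = saved_errno;
    return closed;
}

/* Poll every 10ms until fd is closed or max_polls attempts have been made.
 * Returns 1 if the descriptor closed, 0 if we gave up. */
int wait_for_close_bounded(int fd, int max_polls)
{
    const struct timespec pause = { 0, 10 * 1000 * 1000 };     /* 10ms */

    assert(max_polls >= 0);
    for (int i = 0; i < max_polls; i++) {
        if (fd_looks_closed(fd)) {
            return 1;
        }
        nanosleep(&pause, NULL);
    }
    return fd_looks_closed(fd);
}
```

A caller could log a warning when the function returns 0 instead of waiting forever. Note that a descriptor number can be reused by a later open(), so a check of this kind is only meaningful while nothing else is opening descriptors.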

@@ -47,6 +47,7 @@ struct control_client {
struct control *control_create(struct flexnbd *, struct control *control_create(struct flexnbd *,
const char *control_socket_name); const char *control_socket_name);
void control_signal_close(struct control *); void control_signal_close(struct control *);
void control_wait_for_close(struct control *control);
void control_destroy(struct control *); void control_destroy(struct control *);
void *control_runner(void *); void *control_runner(void *);

@@ -671,6 +671,7 @@ static void mirror_abandon_cb(struct ev_loop *loop, ev_io * w, int revents)
debug("Abandon message received"); debug("Abandon message received");
mirror_set_state(ctrl->mirror, MS_ABANDONED); mirror_set_state(ctrl->mirror, MS_ABANDONED);
self_pipe_signal_clear(ctrl->mirror->abandon_signal); self_pipe_signal_clear(ctrl->mirror->abandon_signal);
ev_io_stop(loop, &ctrl->abandon_watcher);
ev_break(loop, EVBREAK_ONE); ev_break(loop, EVBREAK_ONE);
return; return;
} }

@@ -812,8 +812,6 @@ void server_control_arrived(struct server *serve)
} }
void flexnbd_stop_control(struct flexnbd *flexnbd);
/** Closes sockets, frees memory and waits for all client threads to finish */ /** Closes sockets, frees memory and waits for all client threads to finish */
void serve_cleanup(struct server *params, void serve_cleanup(struct server *params,
int fatal __attribute__ ((unused))) int fatal __attribute__ ((unused)))
@@ -823,7 +821,19 @@ void serve_cleanup(struct server *params,
info("cleaning up"); info("cleaning up");
/* Close the control socket, and wait for it to close before proceeding.
* If we do not wait, we risk a race condition with the tail supervisor
* sending a status command, and deadlocking the mirroring. */
if (params->flexnbd && params->flexnbd->control) {
debug("closing control socket");
control_signal_close(params->flexnbd->control);
debug("waiting for control socket to close");
control_wait_for_close(params->flexnbd->control);
}
if (params->server_fd) { if (params->server_fd) {
debug("closing server_fd");
close(params->server_fd); close(params->server_fd);
} }
@@ -861,15 +871,6 @@ void serve_cleanup(struct server *params,
server_unlock_acl(params); server_unlock_acl(params);
} }
/* if( params->flexnbd ) { */
/* if ( params->flexnbd->control ) { */
/* flexnbd_stop_control( params->flexnbd ); */
/* } */
/* flexnbd_destroy( params->flexnbd ); */
/* } */
/* server_destroy( params ); */
debug("Cleanup done"); debug("Cleanup done");
} }

@@ -1,5 +1,6 @@
require 'socket' require 'socket'
require 'timeout' require 'timeout'
require 'io/wait' # For IO#nread
require 'flexnbd/constants' require 'flexnbd/constants'
@@ -54,6 +55,10 @@ module FlexNBD
write_reply(handle, 1) write_reply(handle, 1)
end end
def nread
@sock.nread
end
def disconnected? def disconnected?
Timeout.timeout(2) do Timeout.timeout(2) do
@sock.read(1).nil? @sock.read(1).nil?
@@ -85,6 +90,10 @@ module FlexNBD
@sock.write(len) @sock.write(len)
end end
def getsockopt(level, optname)
@sock.getsockopt(level, optname)
end
def self.parse_be64(str) def self.parse_be64(str)
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
str.length == 8 str.length == 8

@@ -158,7 +158,7 @@ module ProxyTests
# Send the read request to the proxy # Send the read request to the proxy
client.write(0, (b * 4096)) client.write(0, (b * 4096))
# ensure we're given the read request # ensure we're given the write request
req1 = sc1.read_request req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type] assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
@@ -206,6 +206,62 @@ module ProxyTests
end end
end end
def test_write_request_retried_when_upstream_times_out_during_write_phase
ENV['FLEXNBD_UPSTREAM_TIMEOUT'] = '4'
maker = make_fake_server
with_ld_preload('setsockopt_logger') do
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Guess an appropriate request size, based on the send buffer size.
sz = sc1.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int * 4
data1 = (b * sz)
# Send the write request to the proxy
client.write(0, data1)
# ensure we're given the write request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal data1.size, req1[:len]
# We do not read it at this point, as we want the proxy to be waiting
# in the WRITE_UPSTREAM state.
# Need to sleep longer than the timeout set above
sleep 5
# Check the number of bytes that can be read from the socket without
# blocking. If this is equal to the size of the original request, then
# the whole request has been buffered. If this is the case, then the
# proxy will not time out in the WRITE_UPSTREAM state, which is what
# we're trying to test.
assert sc1.nread < sz, 'Request from proxy completely buffered. Test is useless'
# Kill the server now that the timeout has happened.
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
# And now let's read the data to make sure we get it all.
data2 = sc2.read_data(req2[:len])
assert_equal data1, data2
sc2.close
server.close
end
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |_client| with_proxied_client do |_client|
c2 = nil c2 = nil

tests/acceptance/test_write_during_migration.rb Normal file → Executable file
@@ -82,15 +82,22 @@ class TestWriteDuringMigration < Test::Unit::TestCase
UNIXSocket.open(@source_sock) do |sock| UNIXSocket.open(@source_sock) do |sock|
sock.write(['mirror', '127.0.0.1', @dest_port.to_s, 'exit'].join("\x0A") + "\x0A\x0A") sock.write(['mirror', '127.0.0.1', @dest_port.to_s, 'exit'].join("\x0A") + "\x0A\x0A")
sock.flush sock.flush
rsp = sock.readline sock.readline
end
end
def stop_mirror
UNIXSocket.open(@source_sock) do |sock|
sock.write("break\x0A\x0A")
sock.flush
sock.readline
end end
end end
def wait_for_quit def wait_for_quit
Timeout.timeout(10) do Timeout.timeout(10) do
start_time = Time.now Process.waitpid2(@dst_proc)
dst_result = Process.waitpid2(@dst_proc) Process.waitpid2(@src_proc)
src_result = Process.waitpid2(@src_proc)
end end
end end
@@ -100,13 +107,28 @@ class TestWriteDuringMigration < Test::Unit::TestCase
loop do loop do
begin begin
client.write(offsets[rand(offsets.size)] * 4096, @write_data) client.write(offsets[rand(offsets.size)] * 4096, @write_data)
rescue StandardError => err rescue StandardError
# We expect a broken write at some point, so ignore it # We expect a broken write at some point, so ignore it
break break
end end
end end
end end
def bombard_with_status
loop do
begin
UNIXSocket.open(@source_sock) do |sock|
sock.write("status\x0A\x0A")
sock.flush
sock.readline
end
rescue StandardError
# If the socket disappears, that's OK.
break
end
end
end
def assert_both_sides_identical def assert_both_sides_identical
# puts `md5sum #{@source_file} #{@dest_file}` # puts `md5sum #{@source_file} #{@dest_file}`
@@ -160,5 +182,48 @@ class TestWriteDuringMigration < Test::Unit::TestCase
(src_writers_1 + src_writers_2).each(&:join) (src_writers_1 + src_writers_2).each(&:join)
assert_both_sides_identical assert_both_sides_identical
end end
end end end
end
def test_status_call_after_cleanup
Dir.mktmpdir do |tmpdir|
Dir.chdir(tmpdir) do
make_files
launch_servers
status_poker = Thread.new { bombard_with_status }
start_mirror
wait_for_quit
status_poker.join
assert_both_sides_identical
end
end
end
def test_mirroring_can_be_restarted
@size = 100 * 1024 * 1024 # 100MB
Dir.mktmpdir do |tmpdir|
Dir.chdir(tmpdir) do
make_files
launch_servers
# This is a bit racy. It needs to be slow enough that the migration
# isn't finished before the stop runs, and slow enough so that we can
# stop/start a few times.
3.times do
start_mirror
sleep 0.1
stop_mirror
sleep 0.1
end
start_mirror
wait_for_quit
end
end
end
end end