Compare commits
1 Commits
master
...
add-read-f
Author | SHA1 | Date | |
---|---|---|---|
![]() |
102738d9ad |
@@ -1,21 +1,18 @@
|
|||||||
stages:
|
stages:
|
||||||
- package
|
- package
|
||||||
- publish
|
- publish
|
||||||
|
|
||||||
.package: &package
|
package:jessie: &package
|
||||||
stage: package
|
stage: package
|
||||||
image: $CI_REGISTRY/docker-images/layers:$DISTRO-deb
|
image: $CI_REGISTRY/docker-images/layers:$DISTRO-deb
|
||||||
|
variables:
|
||||||
|
DISTRO: jessie
|
||||||
script:
|
script:
|
||||||
- package
|
- package
|
||||||
artifacts:
|
artifacts:
|
||||||
paths:
|
paths:
|
||||||
- pkg/
|
- pkg/
|
||||||
|
|
||||||
package:jessie:
|
|
||||||
<<: *package
|
|
||||||
variables:
|
|
||||||
DISTRO: jessie
|
|
||||||
|
|
||||||
package:stretch:
|
package:stretch:
|
||||||
<<: *package
|
<<: *package
|
||||||
variables:
|
variables:
|
||||||
@@ -23,7 +20,7 @@ package:stretch:
|
|||||||
|
|
||||||
publish:
|
publish:
|
||||||
stage: publish
|
stage: publish
|
||||||
tags:
|
tags:
|
||||||
- shell
|
- shell
|
||||||
script:
|
script:
|
||||||
- publish
|
- publish
|
||||||
|
2
Makefile
2
Makefile
@@ -4,7 +4,7 @@ VPATH=src:tests/unit
|
|||||||
DESTDIR?=/
|
DESTDIR?=/
|
||||||
PREFIX?=/usr/local/bin
|
PREFIX?=/usr/local/bin
|
||||||
INSTALLDIR=$(DESTDIR)/$(PREFIX)
|
INSTALLDIR=$(DESTDIR)/$(PREFIX)
|
||||||
|
|
||||||
ifdef DEBUG
|
ifdef DEBUG
|
||||||
CFLAGS_EXTRA=-g -DDEBUG
|
CFLAGS_EXTRA=-g -DDEBUG
|
||||||
LDFLAGS_EXTRA=-g
|
LDFLAGS_EXTRA=-g
|
||||||
|
@@ -169,11 +169,6 @@ That is, the '=' is required. This is a limitation of getopt-long.
|
|||||||
If no cache size is given, a size of 4096 bytes is assumed. Caching can
|
If no cache size is given, a size of 4096 bytes is assumed. Caching can
|
||||||
be explicitly disabled by setting a size of 0.
|
be explicitly disabled by setting a size of 0.
|
||||||
|
|
||||||
ENVIRONMENT
|
|
||||||
|
|
||||||
FLEXNBD_UPSTREAM_TIMEOUT The timeout in seconds for the proxy communicating
|
|
||||||
with the upstream server defaults to 30 seconds.
|
|
||||||
|
|
||||||
BUGS
|
BUGS
|
||||||
|
|
||||||
Should be reported via GitHub.
|
Should be reported via GitHub.
|
||||||
|
23
debian/changelog
vendored
23
debian/changelog
vendored
@@ -1,25 +1,8 @@
|
|||||||
flexnbd (0.5.0) stable; urgency=medium
|
flexnbd (0.2.1) UNRELEASED; urgency=medium
|
||||||
|
|
||||||
[ Patrick J Cherry ]
|
* Force a msync after every write, ignoring FUA flag, or lack thereof.
|
||||||
* Explicitly close the server control socket, and wait for it to close, to
|
|
||||||
prevent deadlocks during the server clean-up process (#40 !58)
|
|
||||||
* Ensure mirroring can be restarted after a break command is sent to the
|
|
||||||
source (#37, !59)
|
|
||||||
|
|
||||||
-- James Carter <james.carter@bytemark.co.uk> Fri, 11 Jan 2019 10:37:23 +0000
|
-- Patrick J Cherry <patrick@bytemark.co.uk> Tue, 24 Apr 2018 10:27:12 +0100
|
||||||
|
|
||||||
flexnbd (0.4.0) stable; urgency=medium
|
|
||||||
|
|
||||||
* Ensure proxy state is completely reset before upstream init is read,
|
|
||||||
ensuring any waiting requests are fully replayed (#39, !54)
|
|
||||||
|
|
||||||
-- Patrick J Cherry <patrick@bytemark.co.uk> Thu, 15 Nov 2018 14:24:26 +0000
|
|
||||||
|
|
||||||
flexnbd (0.3.0) stable; urgency=medium
|
|
||||||
|
|
||||||
* Force a msync after every write, ignoring FUA flag, or lack thereof (!51).
|
|
||||||
|
|
||||||
-- Patrick J Cherry <patrick@bytemark.co.uk> Tue, 24 Apr 2018 12:05:43 +0100
|
|
||||||
|
|
||||||
flexnbd (0.2.0) stable; urgency=medium
|
flexnbd (0.2.0) stable; urgency=medium
|
||||||
|
|
||||||
|
@@ -148,8 +148,6 @@ int readloop(int filedes, void *buffer, size_t size)
|
|||||||
ssize_t result = read(filedes, buffer + readden, size - readden);
|
ssize_t result = read(filedes, buffer + readden, size - readden);
|
||||||
|
|
||||||
if (result == 0 /* EOF */ ) {
|
if (result == 0 /* EOF */ ) {
|
||||||
warn("end-of-file detected while reading after %i bytes",
|
|
||||||
readden);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -224,9 +222,8 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
|
|||||||
|
|
||||||
while (spliced < len) {
|
while (spliced < len) {
|
||||||
ssize_t run = len - spliced;
|
ssize_t run = len - spliced;
|
||||||
ssize_t s2, s1 =
|
ssize_t s2, s1 = spliceloop(fd_in, NULL, pipefd[1], NULL, run,
|
||||||
spliceloop(fd_in, NULL, pipefd[1], NULL, run,
|
SPLICE_F_NONBLOCK);
|
||||||
SPLICE_F_NONBLOCK);
|
|
||||||
/*if (run > 65535)
|
/*if (run > 65535)
|
||||||
run = 65535; */
|
run = 65535; */
|
||||||
if (s1 < 0) {
|
if (s1 < 0) {
|
||||||
|
@@ -76,7 +76,7 @@ int socket_nbd_read_hello(int fd, uint64_t * out_size,
|
|||||||
|
|
||||||
|
|
||||||
if (0 > readloop(fd, &init_raw, sizeof(init_raw))) {
|
if (0 > readloop(fd, &init_raw, sizeof(init_raw))) {
|
||||||
warn("Couldn't read init");
|
warn(SHOW_ERRNO("Couldn't read init"));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,7 +130,7 @@ void read_reply(int fd, uint64_t request_raw_handle,
|
|||||||
|
|
||||||
ERROR_IF_NEGATIVE(readloop
|
ERROR_IF_NEGATIVE(readloop
|
||||||
(fd, &reply_raw, sizeof(struct nbd_reply_raw)),
|
(fd, &reply_raw, sizeof(struct nbd_reply_raw)),
|
||||||
"Couldn't read reply");
|
SHOW_ERRNO("Couldn't read reply"));
|
||||||
|
|
||||||
nbd_r2h_reply(&reply_raw, reply);
|
nbd_r2h_reply(&reply_raw, reply);
|
||||||
|
|
||||||
@@ -171,14 +171,15 @@ void socket_nbd_read(int fd, uint64_t from, uint32_t len, int out_fd,
|
|||||||
|
|
||||||
fill_request(&request_raw, REQUEST_READ, 0, from, len);
|
fill_request(&request_raw, REQUEST_READ, 0, from, len);
|
||||||
FATAL_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)),
|
FATAL_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)),
|
||||||
"Couldn't write request");
|
SHOW_ERRNO("Couldn't write request"));
|
||||||
|
|
||||||
|
|
||||||
wait_for_data(fd, timeout_secs);
|
wait_for_data(fd, timeout_secs);
|
||||||
read_reply(fd, request_raw.handle.w, &reply);
|
read_reply(fd, request_raw.handle.w, &reply);
|
||||||
|
|
||||||
if (out_buf) {
|
if (out_buf) {
|
||||||
FATAL_IF_NEGATIVE(readloop(fd, out_buf, len), "Read failed");
|
FATAL_IF_NEGATIVE(readloop(fd, out_buf, len),
|
||||||
|
SHOW_ERRNO("Read failed"));
|
||||||
} else {
|
} else {
|
||||||
FATAL_IF_NEGATIVE(splice_via_pipe_loop(fd, out_fd, len),
|
FATAL_IF_NEGATIVE(splice_via_pipe_loop(fd, out_fd, len),
|
||||||
"Splice failed");
|
"Splice failed");
|
||||||
@@ -193,10 +194,11 @@ void socket_nbd_write(int fd, uint64_t from, uint32_t len, int in_fd,
|
|||||||
|
|
||||||
fill_request(&request_raw, REQUEST_WRITE, 0, from, len);
|
fill_request(&request_raw, REQUEST_WRITE, 0, from, len);
|
||||||
ERROR_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)),
|
ERROR_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)),
|
||||||
"Couldn't write request");
|
SHOW_ERRNO("Couldn't write request"));
|
||||||
|
|
||||||
if (in_buf) {
|
if (in_buf) {
|
||||||
ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len), "Write failed");
|
ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len),
|
||||||
|
SHOW_ERRNO("Write failed"));
|
||||||
} else {
|
} else {
|
||||||
ERROR_IF_NEGATIVE(splice_via_pipe_loop(in_fd, fd, len),
|
ERROR_IF_NEGATIVE(splice_via_pipe_loop(in_fd, fd, len),
|
||||||
"Splice failed");
|
"Splice failed");
|
||||||
@@ -217,7 +219,8 @@ int socket_nbd_disconnect(int fd)
|
|||||||
* the mirror without affecting the main server.
|
* the mirror without affecting the main server.
|
||||||
*/
|
*/
|
||||||
FATAL_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)),
|
FATAL_IF_NEGATIVE(writeloop(fd, &request_raw, sizeof(request_raw)),
|
||||||
"Failed to write the disconnect request.");
|
SHOW_ERRNO
|
||||||
|
("Failed to write the disconnect request."));
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -58,20 +58,6 @@ struct proxier *proxy_create(char *s_downstream_address,
|
|||||||
out->downstream_fd = -1;
|
out->downstream_fd = -1;
|
||||||
out->upstream_fd = -1;
|
out->upstream_fd = -1;
|
||||||
|
|
||||||
int upstream_timeout = UPSTREAM_TIMEOUT;
|
|
||||||
|
|
||||||
char *env_upstream_timeout = getenv("FLEXNBD_UPSTREAM_TIMEOUT");
|
|
||||||
if (NULL != env_upstream_timeout) {
|
|
||||||
int ut = atoi(env_upstream_timeout);
|
|
||||||
warn("Got %i from atoi\n", ut);
|
|
||||||
if (ut > 0) {
|
|
||||||
upstream_timeout = ut;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
out->upstream_timeout = upstream_timeout;
|
|
||||||
out->upstream_timeout_ms = (long unsigned int) upstream_timeout * 1000;
|
|
||||||
|
|
||||||
out->prefetch = NULL;
|
out->prefetch = NULL;
|
||||||
if (s_cache_bytes) {
|
if (s_cache_bytes) {
|
||||||
int cache_bytes = atoi(s_cache_bytes);
|
int cache_bytes = atoi(s_cache_bytes);
|
||||||
@@ -521,20 +507,6 @@ int proxy_continue_connecting_to_upstream(struct proxier *proxy, int state)
|
|||||||
/* Data may have changed while we were disconnected */
|
/* Data may have changed while we were disconnected */
|
||||||
prefetch_set_is_empty(proxy->prefetch);
|
prefetch_set_is_empty(proxy->prefetch);
|
||||||
|
|
||||||
/* Reset our needles and sizes.
|
|
||||||
*
|
|
||||||
* Don't zero the req buffer size in case there's an outstanding request
|
|
||||||
* waiting to be re-sent following a disconnection. The init and rsp
|
|
||||||
* buffers are used for reading from upstream. If we're in this state then
|
|
||||||
* any upstream reads will be re-requested.
|
|
||||||
*/
|
|
||||||
proxy->init.needle = 0;
|
|
||||||
proxy->init.size = 0;
|
|
||||||
proxy->rsp.needle = 0;
|
|
||||||
proxy->rsp.size = 0;
|
|
||||||
proxy->req.needle = 0;
|
|
||||||
/* Don't zero the req.size, as we may need to re-write it upstream */
|
|
||||||
|
|
||||||
info("Connected to upstream on fd %i", proxy->upstream_fd);
|
info("Connected to upstream on fd %i", proxy->upstream_fd);
|
||||||
return READ_INIT_FROM_UPSTREAM;
|
return READ_INIT_FROM_UPSTREAM;
|
||||||
}
|
}
|
||||||
@@ -551,7 +523,7 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
|
|||||||
|
|
||||||
if (count == -1) {
|
if (count == -1) {
|
||||||
warn(SHOW_ERRNO("Failed to read init from upstream"));
|
warn(SHOW_ERRNO("Failed to read init from upstream"));
|
||||||
return CONNECT_TO_UPSTREAM;
|
goto disconnect;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (proxy->init.needle == proxy->init.size) {
|
if (proxy->init.needle == proxy->init.size) {
|
||||||
@@ -561,24 +533,26 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
|
|||||||
((struct nbd_init_raw *) proxy->init.buf, &upstream_size,
|
((struct nbd_init_raw *) proxy->init.buf, &upstream_size,
|
||||||
&upstream_flags)) {
|
&upstream_flags)) {
|
||||||
warn("Upstream sent invalid init");
|
warn("Upstream sent invalid init");
|
||||||
return CONNECT_TO_UPSTREAM;
|
goto disconnect;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* record the flags, and log the reconnection, set TCP_NODELAY */
|
/* record the flags, and log the reconnection, set TCP_NODELAY */
|
||||||
proxy_finish_connect_to_upstream(proxy, upstream_size,
|
proxy_finish_connect_to_upstream(proxy, upstream_size,
|
||||||
upstream_flags);
|
upstream_flags);
|
||||||
|
|
||||||
/* Finished reading the init response now, reset the needle. */
|
/* Currently, we only get disconnected from upstream (so needing to come
|
||||||
|
* here) when we have an outstanding request. If that becomes false,
|
||||||
|
* we'll need to choose the right state to return to here */
|
||||||
proxy->init.needle = 0;
|
proxy->init.needle = 0;
|
||||||
|
|
||||||
/* Currently, we only get disconnected from upstream (so needing to
|
|
||||||
* come here) when we have an outstanding request. If that becomes
|
|
||||||
* false, we'll need to choose the right state to return to here.
|
|
||||||
*/
|
|
||||||
return WRITE_TO_UPSTREAM;
|
return WRITE_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
return state;
|
return state;
|
||||||
|
|
||||||
|
disconnect:
|
||||||
|
proxy->init.needle = 0;
|
||||||
|
proxy->init.size = 0;
|
||||||
|
return CONNECT_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
int proxy_write_to_upstream(struct proxier *proxy, int state)
|
int proxy_write_to_upstream(struct proxier *proxy, int state)
|
||||||
@@ -600,6 +574,7 @@ int proxy_write_to_upstream(struct proxier *proxy, int state)
|
|||||||
|
|
||||||
if (count == -1) {
|
if (count == -1) {
|
||||||
warn(SHOW_ERRNO("Failed to send request to upstream"));
|
warn(SHOW_ERRNO("Failed to send request to upstream"));
|
||||||
|
proxy->req.needle = 0;
|
||||||
// We're throwing the socket away so no need to uncork
|
// We're throwing the socket away so no need to uncork
|
||||||
return CONNECT_TO_UPSTREAM;
|
return CONNECT_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
@@ -636,7 +611,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
|
|||||||
|
|
||||||
if (count == -1) {
|
if (count == -1) {
|
||||||
warn(SHOW_ERRNO("Failed to get reply from upstream"));
|
warn(SHOW_ERRNO("Failed to get reply from upstream"));
|
||||||
return CONNECT_TO_UPSTREAM;
|
goto disconnect;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (proxy->rsp.needle == NBD_REPLY_SIZE) {
|
if (proxy->rsp.needle == NBD_REPLY_SIZE) {
|
||||||
@@ -644,7 +619,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
|
|||||||
|
|
||||||
if (reply->magic != REPLY_MAGIC) {
|
if (reply->magic != REPLY_MAGIC) {
|
||||||
warn("Reply magic is incorrect");
|
warn("Reply magic is incorrect");
|
||||||
return CONNECT_TO_UPSTREAM;
|
goto disconnect;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (proxy->req_hdr.type == REQUEST_READ) {
|
if (proxy->req_hdr.type == REQUEST_READ) {
|
||||||
@@ -660,6 +635,11 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
|
|||||||
}
|
}
|
||||||
|
|
||||||
return state;
|
return state;
|
||||||
|
|
||||||
|
disconnect:
|
||||||
|
proxy->rsp.needle = 0;
|
||||||
|
proxy->rsp.size = 0;
|
||||||
|
return CONNECT_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -795,9 +775,6 @@ void proxy_session(struct proxier *proxy)
|
|||||||
};
|
};
|
||||||
|
|
||||||
if (select_timeout.tv_sec > 0) {
|
if (select_timeout.tv_sec > 0) {
|
||||||
if (select_timeout.tv_sec > proxy->upstream_timeout) {
|
|
||||||
select_timeout.tv_sec = proxy->upstream_timeout;
|
|
||||||
}
|
|
||||||
select_timeout_ptr = &select_timeout;
|
select_timeout_ptr = &select_timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -871,9 +848,17 @@ void proxy_session(struct proxier *proxy)
|
|||||||
/* In these states, we're interested in restarting after a timeout.
|
/* In these states, we're interested in restarting after a timeout.
|
||||||
*/
|
*/
|
||||||
if (old_state == state && proxy_state_upstream(state)) {
|
if (old_state == state && proxy_state_upstream(state)) {
|
||||||
if ((monotonic_time_ms()) - state_started > proxy->upstream_timeout_ms) {
|
if ((monotonic_time_ms()) - state_started > UPSTREAM_TIMEOUT) {
|
||||||
warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]);
|
warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]
|
||||||
|
);
|
||||||
state = CONNECT_TO_UPSTREAM;
|
state = CONNECT_TO_UPSTREAM;
|
||||||
|
|
||||||
|
/* Since we've timed out, we won't have gone through the timeout logic
|
||||||
|
* in the various state handlers that resets these appropriately... */
|
||||||
|
proxy->init.size = 0;
|
||||||
|
proxy->init.needle = 0;
|
||||||
|
proxy->rsp.size = 0;
|
||||||
|
proxy->rsp.needle = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -14,10 +14,10 @@
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
/** UPSTREAM_TIMEOUT
|
/** UPSTREAM_TIMEOUT
|
||||||
* How long (in s) to allow for upstream to respond. If it takes longer
|
* How long ( in ms ) to allow for upstream to respond. If it takes longer
|
||||||
* than this, we will cancel the current request-response to them and resubmit
|
* than this, we will cancel the current request-response to them and resubmit
|
||||||
*/
|
*/
|
||||||
#define UPSTREAM_TIMEOUT 30
|
#define UPSTREAM_TIMEOUT 30 * 1000
|
||||||
|
|
||||||
struct proxier {
|
struct proxier {
|
||||||
/** address/port to bind to */
|
/** address/port to bind to */
|
||||||
@@ -72,16 +72,6 @@ struct proxier {
|
|||||||
uint64_t req_count;
|
uint64_t req_count;
|
||||||
int hello_sent;
|
int hello_sent;
|
||||||
|
|
||||||
/*
|
|
||||||
* How long (in s) to allow for upstream to respond. If it takes longer
|
|
||||||
* than this, we will cancel the current request-response to them and
|
|
||||||
* resubmit
|
|
||||||
*
|
|
||||||
* Defaults to UPSTREAM_TIMEOUT but can be overridden in the environment.
|
|
||||||
*/
|
|
||||||
int upstream_timeout;
|
|
||||||
unsigned long int upstream_timeout_ms;
|
|
||||||
|
|
||||||
/** These are only used if we pass --cache on the command line */
|
/** These are only used if we pass --cache on the command line */
|
||||||
|
|
||||||
/* While the in-flight request has been munged by prefetch, these two are
|
/* While the in-flight request has been munged by prefetch, these two are
|
||||||
|
@@ -152,7 +152,7 @@ void write_not_zeroes(struct client *client, uint64_t from, uint64_t len)
|
|||||||
(dst), \
|
(dst), \
|
||||||
(len) \
|
(len) \
|
||||||
), \
|
), \
|
||||||
"read failed %ld+%d", from, (len) \
|
SHOW_ERRNO("read failed %ld+%d", from, (len)) \
|
||||||
)
|
)
|
||||||
|
|
||||||
if (bitset_is_set_at(map, from)) {
|
if (bitset_is_set_at(map, from)) {
|
||||||
@@ -232,13 +232,13 @@ int client_read_request(struct client *client,
|
|||||||
*disconnected = 1;
|
*disconnected = 1;
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
case 0:
|
case 0:
|
||||||
warn("EOF while reading request");
|
warn(SHOW_ERRNO("EOF while reading request"));
|
||||||
return 0;
|
return 0;
|
||||||
case ECONNRESET:
|
case ECONNRESET:
|
||||||
warn("Connection reset while" " reading request");
|
warn(SHOW_ERRNO("Connection reset while reading request"));
|
||||||
return 0;
|
return 0;
|
||||||
case ETIMEDOUT:
|
case ETIMEDOUT:
|
||||||
warn("Connection timed out while" " reading request");
|
warn(SHOW_ERRNO("Connection timed out while reading request"));
|
||||||
return 0;
|
return 0;
|
||||||
default:
|
default:
|
||||||
/* FIXME: I've seen this happen, but I
|
/* FIXME: I've seen this happen, but I
|
||||||
@@ -248,7 +248,7 @@ int client_read_request(struct client *client,
|
|||||||
* again. It should *probably* be an
|
* again. It should *probably* be an
|
||||||
* error() call, but I want to be sure.
|
* error() call, but I want to be sure.
|
||||||
* */
|
* */
|
||||||
fatal("Error reading request: %d, %s", errno, strerror(errno));
|
fatal(SHOW_ERRNO("Error reading request"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -271,16 +271,17 @@ int fd_write_reply(int fd, uint64_t handle, int error)
|
|||||||
if (-1 == writeloop(fd, &reply_raw, sizeof(reply_raw))) {
|
if (-1 == writeloop(fd, &reply_raw, sizeof(reply_raw))) {
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
case ECONNRESET:
|
case ECONNRESET:
|
||||||
error("Connection reset while writing reply");
|
error(SHOW_ERRNO("Connection reset while writing reply"));
|
||||||
break;
|
break;
|
||||||
case EBADF:
|
case EBADF:
|
||||||
fatal("Tried to write to an invalid file descriptor");
|
fatal(SHOW_ERRNO
|
||||||
|
("Tried to write to an invalid file descriptor"));
|
||||||
break;
|
break;
|
||||||
case EPIPE:
|
case EPIPE:
|
||||||
error("Remote end closed");
|
error(SHOW_ERRNO("Remote end closed"));
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
fatal("Unhandled error while writing: %d", errno);
|
fatal(SHOW_ERRNO("Unhandled error while writing"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -318,7 +319,7 @@ void client_write_init(struct client *client, uint64_t size)
|
|||||||
|
|
||||||
ERROR_IF_NEGATIVE(writeloop
|
ERROR_IF_NEGATIVE(writeloop
|
||||||
(client->socket, &init_raw, sizeof(init_raw)),
|
(client->socket, &init_raw, sizeof(init_raw)),
|
||||||
"Couldn't send hello");
|
SHOW_ERRNO("Couldn't send hello"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -329,8 +330,7 @@ void client_write_init(struct client *client, uint64_t size)
|
|||||||
void client_flush(struct client *client, size_t len)
|
void client_flush(struct client *client, size_t len)
|
||||||
{
|
{
|
||||||
int devnull = open("/dev/null", O_WRONLY);
|
int devnull = open("/dev/null", O_WRONLY);
|
||||||
FATAL_IF_NEGATIVE(devnull,
|
FATAL_IF_NEGATIVE(devnull, SHOW_ERRNO("Couldn't open /dev/null"));
|
||||||
"Couldn't open /dev/null: %s", strerror(errno));
|
|
||||||
int pipes[2];
|
int pipes[2];
|
||||||
pipe(pipes);
|
pipe(pipes);
|
||||||
|
|
||||||
@@ -341,12 +341,12 @@ void client_flush(struct client *client, size_t len)
|
|||||||
ssize_t received = splice(client->socket, NULL,
|
ssize_t received = splice(client->socket, NULL,
|
||||||
pipes[1], NULL,
|
pipes[1], NULL,
|
||||||
len - spliced, flags);
|
len - spliced, flags);
|
||||||
FATAL_IF_NEGATIVE(received, "splice error: %s", strerror(errno));
|
FATAL_IF_NEGATIVE(received, SHOW_ERRNO("splice error"));
|
||||||
ssize_t junked = 0;
|
ssize_t junked = 0;
|
||||||
while (junked < received) {
|
while (junked < received) {
|
||||||
ssize_t junk;
|
ssize_t junk;
|
||||||
junk = splice(pipes[0], NULL, devnull, NULL, received, flags);
|
junk = splice(pipes[0], NULL, devnull, NULL, received, flags);
|
||||||
FATAL_IF_NEGATIVE(junk, "splice error: %s", strerror(errno));
|
FATAL_IF_NEGATIVE(junk, SHOW_ERRNO("splice error"));
|
||||||
junked += junk;
|
junked += junk;
|
||||||
}
|
}
|
||||||
spliced += received;
|
spliced += received;
|
||||||
@@ -459,8 +459,8 @@ void client_reply_to_write(struct client *client,
|
|||||||
ERROR_IF_NEGATIVE(readloop(client->socket,
|
ERROR_IF_NEGATIVE(readloop(client->socket,
|
||||||
client->mapped + request.from,
|
client->mapped + request.from,
|
||||||
request.len),
|
request.len),
|
||||||
"reading write data failed from=%ld, len=%d",
|
SHOW_ERRNO("reading write data failed from=%ld, len=%d",
|
||||||
request.from, request.len);
|
request.from, request.len));
|
||||||
|
|
||||||
/* the allocation_map is shared between client threads, and may be
|
/* the allocation_map is shared between client threads, and may be
|
||||||
* being built. We need to reflect the write in it, as it may be in
|
* being built. We need to reflect the write in it, as it may be in
|
||||||
@@ -693,8 +693,8 @@ void *client_serve(void *client_uncast)
|
|||||||
&client->fileno,
|
&client->fileno,
|
||||||
&client->mapped_size,
|
&client->mapped_size,
|
||||||
(void **) &client->mapped),
|
(void **) &client->mapped),
|
||||||
"Couldn't open/mmap file %s: %s",
|
SHOW_ERRNO("Couldn't open/mmap file %s",
|
||||||
client->serve->filename, strerror(errno)
|
client->serve->filename)
|
||||||
);
|
);
|
||||||
|
|
||||||
FATAL_IF_NEGATIVE(madvise
|
FATAL_IF_NEGATIVE(madvise
|
||||||
|
@@ -78,14 +78,6 @@ void control_destroy(struct control *control)
|
|||||||
free(control);
|
free(control);
|
||||||
}
|
}
|
||||||
|
|
||||||
void control_wait_for_close(struct control *control)
|
|
||||||
{
|
|
||||||
NULLCHECK(control);
|
|
||||||
while (!fd_is_closed(control->control_fd)) {
|
|
||||||
usleep(10000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct control_client *control_client_create(struct flexnbd *flexnbd,
|
struct control_client *control_client_create(struct flexnbd *flexnbd,
|
||||||
int client_fd,
|
int client_fd,
|
||||||
struct mbox *state_mbox)
|
struct mbox *state_mbox)
|
||||||
|
@@ -47,7 +47,6 @@ struct control_client {
|
|||||||
struct control *control_create(struct flexnbd *,
|
struct control *control_create(struct flexnbd *,
|
||||||
const char *control_socket_name);
|
const char *control_socket_name);
|
||||||
void control_signal_close(struct control *);
|
void control_signal_close(struct control *);
|
||||||
void control_wait_for_close(struct control *control);
|
|
||||||
void control_destroy(struct control *);
|
void control_destroy(struct control *);
|
||||||
|
|
||||||
void *control_runner(void *);
|
void *control_runner(void *);
|
||||||
|
@@ -671,7 +671,6 @@ static void mirror_abandon_cb(struct ev_loop *loop, ev_io * w, int revents)
|
|||||||
debug("Abandon message received");
|
debug("Abandon message received");
|
||||||
mirror_set_state(ctrl->mirror, MS_ABANDONED);
|
mirror_set_state(ctrl->mirror, MS_ABANDONED);
|
||||||
self_pipe_signal_clear(ctrl->mirror->abandon_signal);
|
self_pipe_signal_clear(ctrl->mirror->abandon_signal);
|
||||||
ev_io_stop(loop, &ctrl->abandon_watcher);
|
|
||||||
ev_break(loop, EVBREAK_ONE);
|
ev_break(loop, EVBREAK_ONE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@@ -812,6 +812,8 @@ void server_control_arrived(struct server *serve)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void flexnbd_stop_control(struct flexnbd *flexnbd);
|
||||||
|
|
||||||
/** Closes sockets, frees memory and waits for all client threads to finish */
|
/** Closes sockets, frees memory and waits for all client threads to finish */
|
||||||
void serve_cleanup(struct server *params,
|
void serve_cleanup(struct server *params,
|
||||||
int fatal __attribute__ ((unused)))
|
int fatal __attribute__ ((unused)))
|
||||||
@@ -820,20 +822,8 @@ void serve_cleanup(struct server *params,
|
|||||||
void *status;
|
void *status;
|
||||||
|
|
||||||
info("cleaning up");
|
info("cleaning up");
|
||||||
|
|
||||||
/* Close the control socket, and wait for it to close before proceeding.
|
|
||||||
* If we do not wait, we risk a race condition with the tail supervisor
|
|
||||||
* sending a status command, and deadlocking the mirroring. */
|
|
||||||
if (params->flexnbd && params->flexnbd->control) {
|
|
||||||
debug("closing control socket");
|
|
||||||
control_signal_close(params->flexnbd->control);
|
|
||||||
|
|
||||||
debug("waiting for control socket to close");
|
|
||||||
control_wait_for_close(params->flexnbd->control);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (params->server_fd) {
|
if (params->server_fd) {
|
||||||
debug("closing server_fd");
|
|
||||||
close(params->server_fd);
|
close(params->server_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -871,6 +861,15 @@ void serve_cleanup(struct server *params,
|
|||||||
server_unlock_acl(params);
|
server_unlock_acl(params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* if( params->flexnbd ) { */
|
||||||
|
/* if ( params->flexnbd->control ) { */
|
||||||
|
/* flexnbd_stop_control( params->flexnbd ); */
|
||||||
|
/* } */
|
||||||
|
/* flexnbd_destroy( params->flexnbd ); */
|
||||||
|
/* } */
|
||||||
|
|
||||||
|
/* server_destroy( params ); */
|
||||||
|
|
||||||
debug("Cleanup done");
|
debug("Cleanup done");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,6 +1,5 @@
|
|||||||
require 'socket'
|
require 'socket'
|
||||||
require 'timeout'
|
require 'timeout'
|
||||||
require 'io/wait' # For IO#nread
|
|
||||||
|
|
||||||
require 'flexnbd/constants'
|
require 'flexnbd/constants'
|
||||||
|
|
||||||
@@ -55,10 +54,6 @@ module FlexNBD
|
|||||||
write_reply(handle, 1)
|
write_reply(handle, 1)
|
||||||
end
|
end
|
||||||
|
|
||||||
def nread
|
|
||||||
@sock.nread
|
|
||||||
end
|
|
||||||
|
|
||||||
def disconnected?
|
def disconnected?
|
||||||
Timeout.timeout(2) do
|
Timeout.timeout(2) do
|
||||||
@sock.read(1).nil?
|
@sock.read(1).nil?
|
||||||
@@ -90,10 +85,6 @@ module FlexNBD
|
|||||||
@sock.write(len)
|
@sock.write(len)
|
||||||
end
|
end
|
||||||
|
|
||||||
def getsockopt(level, optname)
|
|
||||||
@sock.getsockopt(level, optname)
|
|
||||||
end
|
|
||||||
|
|
||||||
def self.parse_be64(str)
|
def self.parse_be64(str)
|
||||||
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
|
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
|
||||||
str.length == 8
|
str.length == 8
|
||||||
|
@@ -158,7 +158,7 @@ module ProxyTests
|
|||||||
# Send the read request to the proxy
|
# Send the read request to the proxy
|
||||||
client.write(0, (b * 4096))
|
client.write(0, (b * 4096))
|
||||||
|
|
||||||
# ensure we're given the write request
|
# ensure we're given the read request
|
||||||
req1 = sc1.read_request
|
req1 = sc1.read_request
|
||||||
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
||||||
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
|
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
|
||||||
@@ -206,62 +206,6 @@ module ProxyTests
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_write_request_retried_when_upstream_times_out_during_write_phase
|
|
||||||
ENV['FLEXNBD_UPSTREAM_TIMEOUT'] = '4'
|
|
||||||
maker = make_fake_server
|
|
||||||
|
|
||||||
with_ld_preload('setsockopt_logger') do
|
|
||||||
with_proxied_client(4096) do |client|
|
|
||||||
server, sc1 = maker.value
|
|
||||||
|
|
||||||
# Guess an approprate request size, based on the send buffer size.
|
|
||||||
sz = sc1.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int * 4
|
|
||||||
data1 = (b * sz)
|
|
||||||
|
|
||||||
# Send the read request to the proxy
|
|
||||||
client.write(0, data1)
|
|
||||||
|
|
||||||
# ensure we're given the write request
|
|
||||||
req1 = sc1.read_request
|
|
||||||
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
|
||||||
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
|
|
||||||
assert_equal 0, req1[:from]
|
|
||||||
assert_equal data1.size, req1[:len]
|
|
||||||
|
|
||||||
# We do not read it at this point, as we want the proxy to be waiting
|
|
||||||
# in the WRITE_UPSTREAM state.
|
|
||||||
|
|
||||||
# Need to sleep longer than the timeout set above
|
|
||||||
sleep 5
|
|
||||||
|
|
||||||
# Check the number of bytes that can be read from the socket without
|
|
||||||
# blocking. If this equal to the size of the original request, then
|
|
||||||
# the whole request has been buffered. If this is the case, then the
|
|
||||||
# proxy will not time-out in the WRITE_UPSTREAM statem which is what
|
|
||||||
# we're trying to test.
|
|
||||||
assert sc1.nread < sz, 'Request from proxy completely buffered. Test is useless'
|
|
||||||
|
|
||||||
# Kill the server now that the timeout has happened.
|
|
||||||
sc1.close
|
|
||||||
|
|
||||||
# We expect the proxy to reconnect without our client doing anything.
|
|
||||||
sc2 = server.accept
|
|
||||||
sc2.write_hello
|
|
||||||
|
|
||||||
# And once reconnected, it should resend an identical request.
|
|
||||||
req2 = sc2.read_request
|
|
||||||
assert_equal req1, req2
|
|
||||||
|
|
||||||
# And now lets read the data to make sure we get it all.
|
|
||||||
data2 = sc2.read_data(req2[:len])
|
|
||||||
assert_equal data1, data2
|
|
||||||
|
|
||||||
sc2.close
|
|
||||||
server.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_only_one_client_can_connect_to_proxy_at_a_time
|
def test_only_one_client_can_connect_to_proxy_at_a_time
|
||||||
with_proxied_client do |_client|
|
with_proxied_client do |_client|
|
||||||
c2 = nil
|
c2 = nil
|
||||||
|
77
tests/acceptance/test_write_during_migration.rb
Executable file → Normal file
77
tests/acceptance/test_write_during_migration.rb
Executable file → Normal file
@@ -82,22 +82,15 @@ class TestWriteDuringMigration < Test::Unit::TestCase
|
|||||||
UNIXSocket.open(@source_sock) do |sock|
|
UNIXSocket.open(@source_sock) do |sock|
|
||||||
sock.write(['mirror', '127.0.0.1', @dest_port.to_s, 'exit'].join("\x0A") + "\x0A\x0A")
|
sock.write(['mirror', '127.0.0.1', @dest_port.to_s, 'exit'].join("\x0A") + "\x0A\x0A")
|
||||||
sock.flush
|
sock.flush
|
||||||
sock.readline
|
rsp = sock.readline
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def stop_mirror
|
|
||||||
UNIXSocket.open(@source_sock) do |sock|
|
|
||||||
sock.write("break\x0A\x0A")
|
|
||||||
sock.flush
|
|
||||||
sock.readline
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def wait_for_quit
|
def wait_for_quit
|
||||||
Timeout.timeout(10) do
|
Timeout.timeout(10) do
|
||||||
Process.waitpid2(@dst_proc)
|
start_time = Time.now
|
||||||
Process.waitpid2(@src_proc)
|
dst_result = Process.waitpid2(@dst_proc)
|
||||||
|
src_result = Process.waitpid2(@src_proc)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -107,28 +100,13 @@ class TestWriteDuringMigration < Test::Unit::TestCase
|
|||||||
loop do
|
loop do
|
||||||
begin
|
begin
|
||||||
client.write(offsets[rand(offsets.size)] * 4096, @write_data)
|
client.write(offsets[rand(offsets.size)] * 4096, @write_data)
|
||||||
rescue StandardError
|
rescue StandardError => err
|
||||||
# We expect a broken write at some point, so ignore it
|
# We expect a broken write at some point, so ignore it
|
||||||
break
|
break
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def bombard_with_status
|
|
||||||
loop do
|
|
||||||
begin
|
|
||||||
UNIXSocket.open(@source_sock) do |sock|
|
|
||||||
sock.write("status\x0A\x0A")
|
|
||||||
sock.flush
|
|
||||||
sock.readline
|
|
||||||
end
|
|
||||||
rescue StandardError
|
|
||||||
# If the socket disappears, that's OK.
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def assert_both_sides_identical
|
def assert_both_sides_identical
|
||||||
# puts `md5sum #{@source_file} #{@dest_file}`
|
# puts `md5sum #{@source_file} #{@dest_file}`
|
||||||
|
|
||||||
@@ -182,48 +160,5 @@ class TestWriteDuringMigration < Test::Unit::TestCase
|
|||||||
(src_writers_1 + src_writers_2).each(&:join)
|
(src_writers_1 + src_writers_2).each(&:join)
|
||||||
assert_both_sides_identical
|
assert_both_sides_identical
|
||||||
end
|
end
|
||||||
end
|
end end
|
||||||
end
|
|
||||||
|
|
||||||
def test_status_call_after_cleanup
|
|
||||||
Dir.mktmpdir do |tmpdir|
|
|
||||||
Dir.chdir(tmpdir) do
|
|
||||||
make_files
|
|
||||||
|
|
||||||
launch_servers
|
|
||||||
|
|
||||||
status_poker = Thread.new { bombard_with_status }
|
|
||||||
|
|
||||||
start_mirror
|
|
||||||
|
|
||||||
wait_for_quit
|
|
||||||
status_poker.join
|
|
||||||
assert_both_sides_identical
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_mirroring_can_be_restarted
|
|
||||||
@size = 100 * 1024 * 1024 # 100MB
|
|
||||||
Dir.mktmpdir do |tmpdir|
|
|
||||||
Dir.chdir(tmpdir) do
|
|
||||||
make_files
|
|
||||||
|
|
||||||
launch_servers
|
|
||||||
|
|
||||||
# This is a bit racy. It needs to be slow enough that the migration
|
|
||||||
# isn't finished before the stop runs, and slow enough so that we can
|
|
||||||
# stop/start a few times.
|
|
||||||
3.times do
|
|
||||||
start_mirror
|
|
||||||
sleep 0.1
|
|
||||||
stop_mirror
|
|
||||||
sleep 0.1
|
|
||||||
end
|
|
||||||
start_mirror
|
|
||||||
|
|
||||||
wait_for_quit
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
Reference in New Issue
Block a user