Merge branch 'release-to-master' into 'master'
Release 0.4.0 See merge request open-source/flexnbd-c!57
This commit is contained in:
@@ -1,18 +1,21 @@
|
|||||||
stages:
|
stages:
|
||||||
- package
|
- package
|
||||||
- publish
|
- publish
|
||||||
|
|
||||||
package:jessie: &package
|
.package: &package
|
||||||
stage: package
|
stage: package
|
||||||
image: $CI_REGISTRY/docker-images/layers:$DISTRO-deb
|
image: $CI_REGISTRY/docker-images/layers:$DISTRO-deb
|
||||||
variables:
|
|
||||||
DISTRO: jessie
|
|
||||||
script:
|
script:
|
||||||
- package
|
- package
|
||||||
artifacts:
|
artifacts:
|
||||||
paths:
|
paths:
|
||||||
- pkg/
|
- pkg/
|
||||||
|
|
||||||
|
package:jessie:
|
||||||
|
<<: *package
|
||||||
|
variables:
|
||||||
|
DISTRO: jessie
|
||||||
|
|
||||||
package:stretch:
|
package:stretch:
|
||||||
<<: *package
|
<<: *package
|
||||||
variables:
|
variables:
|
||||||
@@ -20,7 +23,7 @@ package:stretch:
|
|||||||
|
|
||||||
publish:
|
publish:
|
||||||
stage: publish
|
stage: publish
|
||||||
tags:
|
tags:
|
||||||
- shell
|
- shell
|
||||||
script:
|
script:
|
||||||
- publish
|
- publish
|
||||||
|
2
Makefile
2
Makefile
@@ -4,7 +4,7 @@ VPATH=src:tests/unit
|
|||||||
DESTDIR?=/
|
DESTDIR?=/
|
||||||
PREFIX?=/usr/local/bin
|
PREFIX?=/usr/local/bin
|
||||||
INSTALLDIR=$(DESTDIR)/$(PREFIX)
|
INSTALLDIR=$(DESTDIR)/$(PREFIX)
|
||||||
|
|
||||||
ifdef DEBUG
|
ifdef DEBUG
|
||||||
CFLAGS_EXTRA=-g -DDEBUG
|
CFLAGS_EXTRA=-g -DDEBUG
|
||||||
LDFLAGS_EXTRA=-g
|
LDFLAGS_EXTRA=-g
|
||||||
|
@@ -169,6 +169,11 @@ That is, the '=' is required. This is a limitation of getopt-long.
|
|||||||
If no cache size is given, a size of 4096 bytes is assumed. Caching can
|
If no cache size is given, a size of 4096 bytes is assumed. Caching can
|
||||||
be explicitly disabled by setting a size of 0.
|
be explicitly disabled by setting a size of 0.
|
||||||
|
|
||||||
|
ENVIRONMENT
|
||||||
|
|
||||||
|
FLEXNBD_UPSTREAM_TIMEOUT The timeout in seconds for the proxy communicating
|
||||||
|
with the upstream server defaults to 30 seconds.
|
||||||
|
|
||||||
BUGS
|
BUGS
|
||||||
|
|
||||||
Should be reported via GitHub.
|
Should be reported via GitHub.
|
||||||
|
7
debian/changelog
vendored
7
debian/changelog
vendored
@@ -1,3 +1,10 @@
|
|||||||
|
flexnbd (0.4.0) stable; urgency=medium
|
||||||
|
|
||||||
|
* Ensure proxy state is completely reset before upstream init is read,
|
||||||
|
ensuring any waiting requests are fully replayed (#39, !54)
|
||||||
|
|
||||||
|
-- Patrick J Cherry <patrick@bytemark.co.uk> Thu, 15 Nov 2018 14:24:26 +0000
|
||||||
|
|
||||||
flexnbd (0.3.0) stable; urgency=medium
|
flexnbd (0.3.0) stable; urgency=medium
|
||||||
|
|
||||||
* Force a msync after every write, ignoring FUA flag, or lack thereof (!51).
|
* Force a msync after every write, ignoring FUA flag, or lack thereof (!51).
|
||||||
|
@@ -58,6 +58,20 @@ struct proxier *proxy_create(char *s_downstream_address,
|
|||||||
out->downstream_fd = -1;
|
out->downstream_fd = -1;
|
||||||
out->upstream_fd = -1;
|
out->upstream_fd = -1;
|
||||||
|
|
||||||
|
int upstream_timeout = UPSTREAM_TIMEOUT;
|
||||||
|
|
||||||
|
char *env_upstream_timeout = getenv("FLEXNBD_UPSTREAM_TIMEOUT");
|
||||||
|
if (NULL != env_upstream_timeout) {
|
||||||
|
int ut = atoi(env_upstream_timeout);
|
||||||
|
warn("Got %i from atoi\n", ut);
|
||||||
|
if (ut > 0) {
|
||||||
|
upstream_timeout = ut;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out->upstream_timeout = upstream_timeout;
|
||||||
|
out->upstream_timeout_ms = (long unsigned int) upstream_timeout * 1000;
|
||||||
|
|
||||||
out->prefetch = NULL;
|
out->prefetch = NULL;
|
||||||
if (s_cache_bytes) {
|
if (s_cache_bytes) {
|
||||||
int cache_bytes = atoi(s_cache_bytes);
|
int cache_bytes = atoi(s_cache_bytes);
|
||||||
@@ -507,6 +521,20 @@ int proxy_continue_connecting_to_upstream(struct proxier *proxy, int state)
|
|||||||
/* Data may have changed while we were disconnected */
|
/* Data may have changed while we were disconnected */
|
||||||
prefetch_set_is_empty(proxy->prefetch);
|
prefetch_set_is_empty(proxy->prefetch);
|
||||||
|
|
||||||
|
/* Reset our needles and sizes.
|
||||||
|
*
|
||||||
|
* Don't zero the req buffer size in case there's an outstanding request
|
||||||
|
* waiting to be re-sent following a disconnection. The init and rsp
|
||||||
|
* buffers are used for reading from upstream. If we're in this state then
|
||||||
|
* any upstream reads will be re-requested.
|
||||||
|
*/
|
||||||
|
proxy->init.needle = 0;
|
||||||
|
proxy->init.size = 0;
|
||||||
|
proxy->rsp.needle = 0;
|
||||||
|
proxy->rsp.size = 0;
|
||||||
|
proxy->req.needle = 0;
|
||||||
|
/* Don't zero the req.size, as we may need to re-write it upstream */
|
||||||
|
|
||||||
info("Connected to upstream on fd %i", proxy->upstream_fd);
|
info("Connected to upstream on fd %i", proxy->upstream_fd);
|
||||||
return READ_INIT_FROM_UPSTREAM;
|
return READ_INIT_FROM_UPSTREAM;
|
||||||
}
|
}
|
||||||
@@ -523,7 +551,7 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
|
|||||||
|
|
||||||
if (count == -1) {
|
if (count == -1) {
|
||||||
warn(SHOW_ERRNO("Failed to read init from upstream"));
|
warn(SHOW_ERRNO("Failed to read init from upstream"));
|
||||||
goto disconnect;
|
return CONNECT_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (proxy->init.needle == proxy->init.size) {
|
if (proxy->init.needle == proxy->init.size) {
|
||||||
@@ -533,26 +561,24 @@ int proxy_read_init_from_upstream(struct proxier *proxy, int state)
|
|||||||
((struct nbd_init_raw *) proxy->init.buf, &upstream_size,
|
((struct nbd_init_raw *) proxy->init.buf, &upstream_size,
|
||||||
&upstream_flags)) {
|
&upstream_flags)) {
|
||||||
warn("Upstream sent invalid init");
|
warn("Upstream sent invalid init");
|
||||||
goto disconnect;
|
return CONNECT_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* record the flags, and log the reconnection, set TCP_NODELAY */
|
/* record the flags, and log the reconnection, set TCP_NODELAY */
|
||||||
proxy_finish_connect_to_upstream(proxy, upstream_size,
|
proxy_finish_connect_to_upstream(proxy, upstream_size,
|
||||||
upstream_flags);
|
upstream_flags);
|
||||||
|
|
||||||
/* Currently, we only get disconnected from upstream (so needing to come
|
/* Finished reading the init response now, reset the needle. */
|
||||||
* here) when we have an outstanding request. If that becomes false,
|
|
||||||
* we'll need to choose the right state to return to here */
|
|
||||||
proxy->init.needle = 0;
|
proxy->init.needle = 0;
|
||||||
|
|
||||||
|
/* Currently, we only get disconnected from upstream (so needing to
|
||||||
|
* come here) when we have an outstanding request. If that becomes
|
||||||
|
* false, we'll need to choose the right state to return to here.
|
||||||
|
*/
|
||||||
return WRITE_TO_UPSTREAM;
|
return WRITE_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
return state;
|
return state;
|
||||||
|
|
||||||
disconnect:
|
|
||||||
proxy->init.needle = 0;
|
|
||||||
proxy->init.size = 0;
|
|
||||||
return CONNECT_TO_UPSTREAM;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int proxy_write_to_upstream(struct proxier *proxy, int state)
|
int proxy_write_to_upstream(struct proxier *proxy, int state)
|
||||||
@@ -574,7 +600,6 @@ int proxy_write_to_upstream(struct proxier *proxy, int state)
|
|||||||
|
|
||||||
if (count == -1) {
|
if (count == -1) {
|
||||||
warn(SHOW_ERRNO("Failed to send request to upstream"));
|
warn(SHOW_ERRNO("Failed to send request to upstream"));
|
||||||
proxy->req.needle = 0;
|
|
||||||
// We're throwing the socket away so no need to uncork
|
// We're throwing the socket away so no need to uncork
|
||||||
return CONNECT_TO_UPSTREAM;
|
return CONNECT_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
@@ -611,7 +636,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
|
|||||||
|
|
||||||
if (count == -1) {
|
if (count == -1) {
|
||||||
warn(SHOW_ERRNO("Failed to get reply from upstream"));
|
warn(SHOW_ERRNO("Failed to get reply from upstream"));
|
||||||
goto disconnect;
|
return CONNECT_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (proxy->rsp.needle == NBD_REPLY_SIZE) {
|
if (proxy->rsp.needle == NBD_REPLY_SIZE) {
|
||||||
@@ -619,7 +644,7 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
|
|||||||
|
|
||||||
if (reply->magic != REPLY_MAGIC) {
|
if (reply->magic != REPLY_MAGIC) {
|
||||||
warn("Reply magic is incorrect");
|
warn("Reply magic is incorrect");
|
||||||
goto disconnect;
|
return CONNECT_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (proxy->req_hdr.type == REQUEST_READ) {
|
if (proxy->req_hdr.type == REQUEST_READ) {
|
||||||
@@ -635,11 +660,6 @@ int proxy_read_from_upstream(struct proxier *proxy, int state)
|
|||||||
}
|
}
|
||||||
|
|
||||||
return state;
|
return state;
|
||||||
|
|
||||||
disconnect:
|
|
||||||
proxy->rsp.needle = 0;
|
|
||||||
proxy->rsp.size = 0;
|
|
||||||
return CONNECT_TO_UPSTREAM;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -775,6 +795,9 @@ void proxy_session(struct proxier *proxy)
|
|||||||
};
|
};
|
||||||
|
|
||||||
if (select_timeout.tv_sec > 0) {
|
if (select_timeout.tv_sec > 0) {
|
||||||
|
if (select_timeout.tv_sec > proxy->upstream_timeout) {
|
||||||
|
select_timeout.tv_sec = proxy->upstream_timeout;
|
||||||
|
}
|
||||||
select_timeout_ptr = &select_timeout;
|
select_timeout_ptr = &select_timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -848,17 +871,9 @@ void proxy_session(struct proxier *proxy)
|
|||||||
/* In these states, we're interested in restarting after a timeout.
|
/* In these states, we're interested in restarting after a timeout.
|
||||||
*/
|
*/
|
||||||
if (old_state == state && proxy_state_upstream(state)) {
|
if (old_state == state && proxy_state_upstream(state)) {
|
||||||
if ((monotonic_time_ms()) - state_started > UPSTREAM_TIMEOUT) {
|
if ((monotonic_time_ms()) - state_started > proxy->upstream_timeout_ms) {
|
||||||
warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]
|
warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]);
|
||||||
);
|
|
||||||
state = CONNECT_TO_UPSTREAM;
|
state = CONNECT_TO_UPSTREAM;
|
||||||
|
|
||||||
/* Since we've timed out, we won't have gone through the timeout logic
|
|
||||||
* in the various state handlers that resets these appropriately... */
|
|
||||||
proxy->init.size = 0;
|
|
||||||
proxy->init.needle = 0;
|
|
||||||
proxy->rsp.size = 0;
|
|
||||||
proxy->rsp.needle = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -14,10 +14,10 @@
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
/** UPSTREAM_TIMEOUT
|
/** UPSTREAM_TIMEOUT
|
||||||
* How long ( in ms ) to allow for upstream to respond. If it takes longer
|
* How long (in s) to allow for upstream to respond. If it takes longer
|
||||||
* than this, we will cancel the current request-response to them and resubmit
|
* than this, we will cancel the current request-response to them and resubmit
|
||||||
*/
|
*/
|
||||||
#define UPSTREAM_TIMEOUT 30 * 1000
|
#define UPSTREAM_TIMEOUT 30
|
||||||
|
|
||||||
struct proxier {
|
struct proxier {
|
||||||
/** address/port to bind to */
|
/** address/port to bind to */
|
||||||
@@ -72,6 +72,16 @@ struct proxier {
|
|||||||
uint64_t req_count;
|
uint64_t req_count;
|
||||||
int hello_sent;
|
int hello_sent;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* How long (in s) to allow for upstream to respond. If it takes longer
|
||||||
|
* than this, we will cancel the current request-response to them and
|
||||||
|
* resubmit
|
||||||
|
*
|
||||||
|
* Defaults to UPSTREAM_TIMEOUT but can be overridden in the environment.
|
||||||
|
*/
|
||||||
|
int upstream_timeout;
|
||||||
|
unsigned long int upstream_timeout_ms;
|
||||||
|
|
||||||
/** These are only used if we pass --cache on the command line */
|
/** These are only used if we pass --cache on the command line */
|
||||||
|
|
||||||
/* While the in-flight request has been munged by prefetch, these two are
|
/* While the in-flight request has been munged by prefetch, these two are
|
||||||
|
@@ -1,5 +1,6 @@
|
|||||||
require 'socket'
|
require 'socket'
|
||||||
require 'timeout'
|
require 'timeout'
|
||||||
|
require 'io/wait' # For IO#nread
|
||||||
|
|
||||||
require 'flexnbd/constants'
|
require 'flexnbd/constants'
|
||||||
|
|
||||||
@@ -54,6 +55,10 @@ module FlexNBD
|
|||||||
write_reply(handle, 1)
|
write_reply(handle, 1)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def nread
|
||||||
|
@sock.nread
|
||||||
|
end
|
||||||
|
|
||||||
def disconnected?
|
def disconnected?
|
||||||
Timeout.timeout(2) do
|
Timeout.timeout(2) do
|
||||||
@sock.read(1).nil?
|
@sock.read(1).nil?
|
||||||
@@ -85,6 +90,10 @@ module FlexNBD
|
|||||||
@sock.write(len)
|
@sock.write(len)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def getsockopt(level, optname)
|
||||||
|
@sock.getsockopt(level, optname)
|
||||||
|
end
|
||||||
|
|
||||||
def self.parse_be64(str)
|
def self.parse_be64(str)
|
||||||
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
|
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
|
||||||
str.length == 8
|
str.length == 8
|
||||||
|
@@ -158,7 +158,7 @@ module ProxyTests
|
|||||||
# Send the read request to the proxy
|
# Send the read request to the proxy
|
||||||
client.write(0, (b * 4096))
|
client.write(0, (b * 4096))
|
||||||
|
|
||||||
# ensure we're given the read request
|
# ensure we're given the write request
|
||||||
req1 = sc1.read_request
|
req1 = sc1.read_request
|
||||||
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
||||||
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
|
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
|
||||||
@@ -206,6 +206,62 @@ module ProxyTests
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_write_request_retried_when_upstream_times_out_during_write_phase
|
||||||
|
ENV['FLEXNBD_UPSTREAM_TIMEOUT'] = '4'
|
||||||
|
maker = make_fake_server
|
||||||
|
|
||||||
|
with_ld_preload('setsockopt_logger') do
|
||||||
|
with_proxied_client(4096) do |client|
|
||||||
|
server, sc1 = maker.value
|
||||||
|
|
||||||
|
# Guess an approprate request size, based on the send buffer size.
|
||||||
|
sz = sc1.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int * 4
|
||||||
|
data1 = (b * sz)
|
||||||
|
|
||||||
|
# Send the read request to the proxy
|
||||||
|
client.write(0, data1)
|
||||||
|
|
||||||
|
# ensure we're given the write request
|
||||||
|
req1 = sc1.read_request
|
||||||
|
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
||||||
|
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
|
||||||
|
assert_equal 0, req1[:from]
|
||||||
|
assert_equal data1.size, req1[:len]
|
||||||
|
|
||||||
|
# We do not read it at this point, as we want the proxy to be waiting
|
||||||
|
# in the WRITE_UPSTREAM state.
|
||||||
|
|
||||||
|
# Need to sleep longer than the timeout set above
|
||||||
|
sleep 5
|
||||||
|
|
||||||
|
# Check the number of bytes that can be read from the socket without
|
||||||
|
# blocking. If this equal to the size of the original request, then
|
||||||
|
# the whole request has been buffered. If this is the case, then the
|
||||||
|
# proxy will not time-out in the WRITE_UPSTREAM statem which is what
|
||||||
|
# we're trying to test.
|
||||||
|
assert sc1.nread < sz, 'Request from proxy completely buffered. Test is useless'
|
||||||
|
|
||||||
|
# Kill the server now that the timeout has happened.
|
||||||
|
sc1.close
|
||||||
|
|
||||||
|
# We expect the proxy to reconnect without our client doing anything.
|
||||||
|
sc2 = server.accept
|
||||||
|
sc2.write_hello
|
||||||
|
|
||||||
|
# And once reconnected, it should resend an identical request.
|
||||||
|
req2 = sc2.read_request
|
||||||
|
assert_equal req1, req2
|
||||||
|
|
||||||
|
# And now lets read the data to make sure we get it all.
|
||||||
|
data2 = sc2.read_data(req2[:len])
|
||||||
|
assert_equal data1, data2
|
||||||
|
|
||||||
|
sc2.close
|
||||||
|
server.close
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def test_only_one_client_can_connect_to_proxy_at_a_time
|
def test_only_one_client_can_connect_to_proxy_at_a_time
|
||||||
with_proxied_client do |_client|
|
with_proxied_client do |_client|
|
||||||
c2 = nil
|
c2 = nil
|
||||||
|
Reference in New Issue
Block a user