Add test to check when proxy times-out mid-write to upstream

I've add to add code to allow the environment to specify the upstream
tiemout so we don't have to wait 30s for this test to happen.
This commit is contained in:
Patrick J Cherry
2018-11-14 16:49:55 +00:00
parent 3e00a88d45
commit b26b308e68
4 changed files with 88 additions and 3 deletions

View File

@@ -58,6 +58,20 @@ struct proxier *proxy_create(char *s_downstream_address,
out->downstream_fd = -1; out->downstream_fd = -1;
out->upstream_fd = -1; out->upstream_fd = -1;
int upstream_timeout = UPSTREAM_TIMEOUT;
char *env_upstream_timeout = getenv("FLEXNBD_UPSTREAM_TIMEOUT");
if (NULL != env_upstream_timeout) {
int ut = atoi(env_upstream_timeout);
warn("Got %i from atoi\n", ut);
if (ut > 0) {
upstream_timeout = ut;
}
}
out->upstream_timeout = upstream_timeout;
out->upstream_timeout_ms = (long unsigned int) upstream_timeout * 1000;
out->prefetch = NULL; out->prefetch = NULL;
if (s_cache_bytes) { if (s_cache_bytes) {
int cache_bytes = atoi(s_cache_bytes); int cache_bytes = atoi(s_cache_bytes);
@@ -781,6 +795,9 @@ void proxy_session(struct proxier *proxy)
}; };
if (select_timeout.tv_sec > 0) { if (select_timeout.tv_sec > 0) {
if (select_timeout.tv_sec > proxy->upstream_timeout) {
select_timeout.tv_sec = proxy->upstream_timeout;
}
select_timeout_ptr = &select_timeout; select_timeout_ptr = &select_timeout;
} }
@@ -854,7 +871,7 @@ void proxy_session(struct proxier *proxy)
/* In these states, we're interested in restarting after a timeout. /* In these states, we're interested in restarting after a timeout.
*/ */
if (old_state == state && proxy_state_upstream(state)) { if (old_state == state && proxy_state_upstream(state)) {
if ((monotonic_time_ms()) - state_started > UPSTREAM_TIMEOUT) { if ((monotonic_time_ms()) - state_started > proxy->upstream_timeout_ms) {
warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]); warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]);
state = CONNECT_TO_UPSTREAM; state = CONNECT_TO_UPSTREAM;
} }

View File

@@ -14,10 +14,10 @@
#endif #endif
/** UPSTREAM_TIMEOUT /** UPSTREAM_TIMEOUT
* How long ( in ms ) to allow for upstream to respond. If it takes longer * How long (in s) to allow for upstream to respond. If it takes longer
* than this, we will cancel the current request-response to them and resubmit * than this, we will cancel the current request-response to them and resubmit
*/ */
#define UPSTREAM_TIMEOUT 30 * 1000 #define UPSTREAM_TIMEOUT 30
struct proxier { struct proxier {
/** address/port to bind to */ /** address/port to bind to */
@@ -72,6 +72,16 @@ struct proxier {
uint64_t req_count; uint64_t req_count;
int hello_sent; int hello_sent;
/*
* How long (in s) to allow for upstream to respond. If it takes longer
* than this, we will cancel the current request-response to them and
* resubmit
*
* Defaults to UPSTREAM_TIMEOUT but can be overridden in the environment.
*/
int upstream_timeout;
unsigned long int upstream_timeout_ms;
/** These are only used if we pass --cache on the command line */ /** These are only used if we pass --cache on the command line */
/* While the in-flight request has been munged by prefetch, these two are /* While the in-flight request has been munged by prefetch, these two are

View File

@@ -54,6 +54,10 @@ module FlexNBD
write_reply(handle, 1) write_reply(handle, 1)
end end
def nread
@sock.nread
end
def disconnected? def disconnected?
Timeout.timeout(2) do Timeout.timeout(2) do
@sock.read(1).nil? @sock.read(1).nil?
@@ -85,6 +89,10 @@ module FlexNBD
@sock.write(len) @sock.write(len)
end end
def getsockopt(level, optname)
@sock.getsockopt(level, optname)
end
def self.parse_be64(str) def self.parse_be64(str)
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
str.length == 8 str.length == 8

View File

@@ -206,6 +206,56 @@ module ProxyTests
end end
end end
def test_write_request_retried_when_upstream_times_out_during_write_phase
ENV['FLEXNBD_UPSTREAM_TIMEOUT'] = '4'
maker = make_fake_server
with_ld_preload('setsockopt_logger') do
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Guess an approprate request size, based on the send buffer size.
sz = sc1.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int * 4
data1 = (b * sz)
# Send the read request to the proxy
client.write(0, data1)
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal data1.size, req1[:len]
# Need to sleep longer than the timeout set above
sleep 5
# Check the number of bytes that can be read from the socket without
# blocking. If this equal to the size of the original request, then
# the whole request has been buffered. If this is the case, then the
# proxy will not time-out in the WRITE_UPSTREAM statem which is what
# we're trying to test.
assert sc1.nread < sz, 'Request from proxy completely buffered. Test is useless'
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
data2 = sc2.read_data(req2[:len])
assert_equal data1, data2
sc2.close
server.close
end
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |_client| with_proxied_client do |_client|
c2 = nil c2 = nil