From b26b308e681f16913934e82f2e8538aae4331cfb Mon Sep 17 00:00:00 2001 From: Patrick J Cherry Date: Wed, 14 Nov 2018 16:49:55 +0000 Subject: [PATCH] Add test to check when proxy times-out mid-write to upstream I've had to add code to allow the environment to specify the upstream timeout so we don't have to wait 30s for this test to happen. --- src/proxy/proxy.c | 19 +++++++++- src/proxy/proxy.h | 14 ++++++-- tests/acceptance/flexnbd/fake_dest.rb | 8 +++++ tests/acceptance/proxy_tests.rb | 50 +++++++++++++++++++++++++++ 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/src/proxy/proxy.c b/src/proxy/proxy.c index b17dd11..0e277e1 100644 --- a/src/proxy/proxy.c +++ b/src/proxy/proxy.c @@ -58,6 +58,20 @@ struct proxier *proxy_create(char *s_downstream_address, out->downstream_fd = -1; out->upstream_fd = -1; + int upstream_timeout = UPSTREAM_TIMEOUT; + + char *env_upstream_timeout = getenv("FLEXNBD_UPSTREAM_TIMEOUT"); + if (NULL != env_upstream_timeout) { + int ut = atoi(env_upstream_timeout); + warn("Got %i from atoi\n", ut); + if (ut > 0) { + upstream_timeout = ut; + } + } + + out->upstream_timeout = upstream_timeout; + out->upstream_timeout_ms = (long unsigned int) upstream_timeout * 1000; + out->prefetch = NULL; if (s_cache_bytes) { int cache_bytes = atoi(s_cache_bytes); @@ -781,6 +795,9 @@ void proxy_session(struct proxier *proxy) }; if (select_timeout.tv_sec > 0) { + if (select_timeout.tv_sec > proxy->upstream_timeout) { + select_timeout.tv_sec = proxy->upstream_timeout; + } select_timeout_ptr = &select_timeout; } @@ -854,7 +871,7 @@ void proxy_session(struct proxier *proxy) /* In these states, we're interested in restarting after a timeout. 
*/ if (old_state == state && proxy_state_upstream(state)) { - if ((monotonic_time_ms()) - state_started > UPSTREAM_TIMEOUT) { + if ((monotonic_time_ms()) - state_started > proxy->upstream_timeout_ms) { warn("Timed out in state %s while communicating with upstream", proxy_session_state_names[state]); state = CONNECT_TO_UPSTREAM; } diff --git a/src/proxy/proxy.h b/src/proxy/proxy.h index 3652350..43d12ce 100644 --- a/src/proxy/proxy.h +++ b/src/proxy/proxy.h @@ -14,10 +14,10 @@ #endif /** UPSTREAM_TIMEOUT - * How long ( in ms ) to allow for upstream to respond. If it takes longer + * How long (in s) to allow for upstream to respond. If it takes longer * than this, we will cancel the current request-response to them and resubmit */ -#define UPSTREAM_TIMEOUT 30 * 1000 +#define UPSTREAM_TIMEOUT 30 struct proxier { /** address/port to bind to */ @@ -72,6 +72,16 @@ struct proxier { uint64_t req_count; int hello_sent; + /* + * How long (in s) to allow for upstream to respond. If it takes longer + * than this, we will cancel the current request-response to them and + * resubmit + * + * Defaults to UPSTREAM_TIMEOUT but can be overridden in the environment. + */ + int upstream_timeout; + unsigned long int upstream_timeout_ms; + /** These are only used if we pass --cache on the command line */ /* While the in-flight request has been munged by prefetch, these two are diff --git a/tests/acceptance/flexnbd/fake_dest.rb b/tests/acceptance/flexnbd/fake_dest.rb index 26d82bf..0b39bdc 100644 --- a/tests/acceptance/flexnbd/fake_dest.rb +++ b/tests/acceptance/flexnbd/fake_dest.rb @@ -54,6 +54,10 @@ module FlexNBD write_reply(handle, 1) end + def nread + @sock.nread + end + def disconnected? Timeout.timeout(2) do @sock.read(1).nil? 
@@ -85,6 +89,10 @@ module FlexNBD @sock.write(len) end + def getsockopt(level, optname) + @sock.getsockopt(level, optname) + end + def self.parse_be64(str) raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless str.length == 8 diff --git a/tests/acceptance/proxy_tests.rb b/tests/acceptance/proxy_tests.rb index afd7a11..2296453 100644 --- a/tests/acceptance/proxy_tests.rb +++ b/tests/acceptance/proxy_tests.rb @@ -206,6 +206,56 @@ module ProxyTests end end + def test_write_request_retried_when_upstream_times_out_during_write_phase + ENV['FLEXNBD_UPSTREAM_TIMEOUT'] = '4' + maker = make_fake_server + + with_ld_preload('setsockopt_logger') do + with_proxied_client(4096) do |client| + server, sc1 = maker.value + + # Guess an appropriate request size, based on the send buffer size. + sz = sc1.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int * 4 + data1 = (b * sz) + + # Send the write request to the proxy + client.write(0, data1) + + # Ensure we're given the write request + req1 = sc1.read_request + assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] + assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type] + assert_equal 0, req1[:from] + assert_equal data1.size, req1[:len] + + # Need to sleep longer than the timeout set above + sleep 5 + # Check the number of bytes that can be read from the socket without + # blocking. If this is equal to the size of the original request, then + # the whole request has been buffered. If this is the case, then the + # proxy will not time-out in the WRITE_UPSTREAM state, which is what + # we're trying to test. + assert sc1.nread < sz, 'Request from proxy completely buffered. Test is useless' + + # Kill the server again, now we're sure the write request has been sent once + sc1.close + + # We expect the proxy to reconnect without our client doing anything. + sc2 = server.accept + sc2.write_hello + + # And once reconnected, it should resend an identical request. 
+ req2 = sc2.read_request + assert_equal req1, req2 + data2 = sc2.read_data(req2[:len]) + assert_equal data1, data2 + + sc2.close + server.close + end + end + end + def test_only_one_client_can_connect_to_proxy_at_a_time with_proxied_client do |_client| c2 = nil