From 89fd18f6f0c67ce9340c901ab03905cc8545e5e7 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 19 Jun 2013 16:36:19 +0100 Subject: [PATCH] proxy: Add a 30-second timeout for requests in-flight to upstream It's a little more complicated than that, actually. For the various states that involve reading from, or writing to, the upstream fd, if the amount of time spent in that state is > 30 seconds, we reconnect to the server and resend the request. we also introduce a 15-second reconnect dampener to keep us from stressing things unduly. This may need to be decreased, or turned into an exponential backoff, at some point. --- src/proxy.c | 74 ++++++++++++++++++++++++++++++++++++++++++++--------- src/proxy.h | 6 +++++ 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/src/proxy.c b/src/proxy.c index b472d97..893cda6 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -266,6 +266,12 @@ static char* proxy_session_state_names[] = { "READ_FROM_UPSTREAM" }; +static inline int proxy_state_upstream( int state ) +{ + return state == CONNECT_TO_UPSTREAM || state == READ_INIT_FROM_UPSTREAM || + state == WRITE_TO_UPSTREAM || state == READ_FROM_UPSTREAM; +} + #ifdef PREFETCH int proxy_prefetch_for_request( struct proxier* proxy, int state ) @@ -343,7 +349,6 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state ) debug( "Prefetching %"PRIu32" bytes", req->len - proxy->prefetch_req_orig_len ); nbd_h2r_request( req, req_raw ); - debug( "Request len: %"PRIu32" ; raw: %"PRIu32, req->len, req_raw->len ); } return state; @@ -629,11 +634,18 @@ int proxy_write_to_downstream( struct proxier* proxy, int state ) */ void proxy_session( struct proxier* proxy ) { + uint64_t state_started; + int old_state = EXIT; + int state; + int connect_to_upstream_cooldown = 0; + + /* First action: Write hello to downstream */ nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp.buf, proxy->upstream_size ); proxy->rsp.size = sizeof( struct nbd_init_raw ); proxy->rsp.needle = 0; - int state = WRITE_TO_DOWNSTREAM; + state = WRITE_TO_DOWNSTREAM; + info( "Beginning proxy session on fd %i", proxy->downstream_fd ); @@ -654,7 +666,19 @@ void proxy_session( struct proxier* proxy ) FD_ZERO( &rfds ); FD_ZERO( &wfds ); - debug( "Proxy is in state %s ( %i )", proxy_session_state_names[state], state ); + if ( state != old_state ) { + state_started = monotonic_time_ms(); + + debug( + "State transitition from %s to %s", + proxy_session_state_names[old_state], + proxy_session_state_names[state] + ); + } else { + debug( "Proxy is in state %s", proxy_session_state_names[state], state ); + } + + old_state = state; switch( state ) { case READ_FROM_DOWNSTREAM: @@ -668,18 +692,23 @@ void proxy_session( struct proxier* proxy ) FD_SET(proxy->upstream_fd, &wfds ); break; case CONNECT_TO_UPSTREAM: + /* upstream_fd is now -1 */ proxy_disconnect_from_upstream( proxy ); - /* Changes proxy->upstream_fd */ - proxy_start_connect_to_upstream( proxy ); - if ( proxy->upstream_fd == -1 ) { - warn( SHOW_ERRNO( "Error acquiring socket to upstream" ) ); - continue; + if ( connect_to_upstream_cooldown ) { + connect_to_upstream_cooldown = 0; + } else { + proxy_start_connect_to_upstream( proxy ); + + if ( proxy->upstream_fd == -1 ) { + warn( SHOW_ERRNO( "Error acquiring socket to upstream" ) ); + continue; + } + FD_SET( proxy->upstream_fd, &wfds ); } - - /* non-blocking connect() */ + /* non-blocking connect() or a simple sleep */ select_timeout.tv_sec = 15; - FD_SET( proxy->upstream_fd, &wfds ); + break; case READ_INIT_FROM_UPSTREAM: case READ_FROM_UPSTREAM: @@ -699,6 +728,11 @@ void proxy_session( struct proxier* proxy ) break; } + /* Happens after failed reconnect. Avoid SIGBUS on FD_ISSET() */ + if ( proxy->upstream_fd == -1 ) { + continue; + } + switch( state ) { case READ_FROM_DOWNSTREAM: if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) { @@ -717,7 +751,12 @@ void proxy_session( struct proxier* proxy ) if ( FD_ISSET( proxy->upstream_fd, &wfds ) ) { state = proxy_continue_connecting_to_upstream( proxy, state ); } - /* Leaving state untouched will retry connecting to upstream */ + /* Leaving state untouched will retry connecting to upstream - + * so introduce a bit of sleep */ + if ( state == CONNECT_TO_UPSTREAM ) { + connect_to_upstream_cooldown = 1; + } + break; case READ_INIT_FROM_UPSTREAM: state = proxy_read_init_from_upstream( proxy, state ); @@ -744,6 +783,17 @@ void proxy_session( struct proxier* proxy ) break; } + /* In these states, we're interested in restarting after a timeout. + */ + if ( old_state == state && proxy_state_upstream( state ) ) { + if ( ( monotonic_time_ms() ) - state_started > UPSTREAM_TIMEOUT ) { + warn( + "Timed out in state %s while communicating with upstream", + proxy_session_state_names[state] + ); + state = CONNECT_TO_UPSTREAM; + } + } } info( diff --git a/src/proxy.h b/src/proxy.h index 258967b..38645d9 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -14,6 +14,12 @@ #include "prefetch.h" #endif +/** UPSTREAM_TIMEOUT + * How long ( in ms ) to allow for upstream to respond. If it takes longer + * than this, we will cancel the current request-response to them and resubmit + */ +#define UPSTREAM_TIMEOUT 30 * 1000 + struct proxier { /* The flexnbd wrapper this proxier is attached to */ struct flexnbd* flexnbd;