proxy: Add a 30-second timeout for requests in-flight to upstream

It's a little more complicated than that, actually. For the various
states that involve reading from, or writing to, the upstream fd,
if the amount of time spent in that state is > 30 seconds, we reconnect
to the server and resend the request.

we also introduce a 15-second reconnect dampener to keep us from stressing
things unduly. This may need to be decreased, or turned into an exponential
backoff, at some point.
This commit is contained in:
nick
2013-06-19 16:36:19 +01:00
parent 3c56ba0af6
commit 89fd18f6f0
2 changed files with 68 additions and 12 deletions

View File

@@ -266,6 +266,12 @@ static char* proxy_session_state_names[] = {
"READ_FROM_UPSTREAM" "READ_FROM_UPSTREAM"
}; };
static inline int proxy_state_upstream( int state )
{
return state == CONNECT_TO_UPSTREAM || state == READ_INIT_FROM_UPSTREAM ||
state == WRITE_TO_UPSTREAM || state == READ_FROM_UPSTREAM;
}
#ifdef PREFETCH #ifdef PREFETCH
int proxy_prefetch_for_request( struct proxier* proxy, int state ) int proxy_prefetch_for_request( struct proxier* proxy, int state )
@@ -343,7 +349,6 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
debug( "Prefetching %"PRIu32" bytes", req->len - proxy->prefetch_req_orig_len ); debug( "Prefetching %"PRIu32" bytes", req->len - proxy->prefetch_req_orig_len );
nbd_h2r_request( req, req_raw ); nbd_h2r_request( req, req_raw );
debug( "Request len: %"PRIu32" ; raw: %"PRIu32, req->len, req_raw->len );
} }
return state; return state;
@@ -629,11 +634,18 @@ int proxy_write_to_downstream( struct proxier* proxy, int state )
*/ */
void proxy_session( struct proxier* proxy ) void proxy_session( struct proxier* proxy )
{ {
uint64_t state_started;
int old_state = EXIT;
int state;
int connect_to_upstream_cooldown = 0;
/* First action: Write hello to downstream */ /* First action: Write hello to downstream */
nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp.buf, proxy->upstream_size ); nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp.buf, proxy->upstream_size );
proxy->rsp.size = sizeof( struct nbd_init_raw ); proxy->rsp.size = sizeof( struct nbd_init_raw );
proxy->rsp.needle = 0; proxy->rsp.needle = 0;
int state = WRITE_TO_DOWNSTREAM; state = WRITE_TO_DOWNSTREAM;
info( "Beginning proxy session on fd %i", proxy->downstream_fd ); info( "Beginning proxy session on fd %i", proxy->downstream_fd );
@@ -654,7 +666,19 @@ void proxy_session( struct proxier* proxy )
FD_ZERO( &rfds ); FD_ZERO( &rfds );
FD_ZERO( &wfds ); FD_ZERO( &wfds );
debug( "Proxy is in state %s ( %i )", proxy_session_state_names[state], state ); if ( state != old_state ) {
state_started = monotonic_time_ms();
debug(
"State transitition from %s to %s",
proxy_session_state_names[old_state],
proxy_session_state_names[state]
);
} else {
debug( "Proxy is in state %s", proxy_session_state_names[state], state );
}
old_state = state;
switch( state ) { switch( state ) {
case READ_FROM_DOWNSTREAM: case READ_FROM_DOWNSTREAM:
@@ -668,18 +692,23 @@ void proxy_session( struct proxier* proxy )
FD_SET(proxy->upstream_fd, &wfds ); FD_SET(proxy->upstream_fd, &wfds );
break; break;
case CONNECT_TO_UPSTREAM: case CONNECT_TO_UPSTREAM:
/* upstream_fd is now -1 */
proxy_disconnect_from_upstream( proxy ); proxy_disconnect_from_upstream( proxy );
/* Changes proxy->upstream_fd */
proxy_start_connect_to_upstream( proxy );
if ( proxy->upstream_fd == -1 ) { if ( connect_to_upstream_cooldown ) {
warn( SHOW_ERRNO( "Error acquiring socket to upstream" ) ); connect_to_upstream_cooldown = 0;
continue; } else {
proxy_start_connect_to_upstream( proxy );
if ( proxy->upstream_fd == -1 ) {
warn( SHOW_ERRNO( "Error acquiring socket to upstream" ) );
continue;
}
FD_SET( proxy->upstream_fd, &wfds );
} }
/* non-blocking connect() or a simple sleep */
/* non-blocking connect() */
select_timeout.tv_sec = 15; select_timeout.tv_sec = 15;
FD_SET( proxy->upstream_fd, &wfds );
break; break;
case READ_INIT_FROM_UPSTREAM: case READ_INIT_FROM_UPSTREAM:
case READ_FROM_UPSTREAM: case READ_FROM_UPSTREAM:
@@ -699,6 +728,11 @@ void proxy_session( struct proxier* proxy )
break; break;
} }
/* Happens after failed reconnect. Avoid SIGBUS on FD_ISSET() */
if ( proxy->upstream_fd == -1 ) {
continue;
}
switch( state ) { switch( state ) {
case READ_FROM_DOWNSTREAM: case READ_FROM_DOWNSTREAM:
if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) { if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) {
@@ -717,7 +751,12 @@ void proxy_session( struct proxier* proxy )
if ( FD_ISSET( proxy->upstream_fd, &wfds ) ) { if ( FD_ISSET( proxy->upstream_fd, &wfds ) ) {
state = proxy_continue_connecting_to_upstream( proxy, state ); state = proxy_continue_connecting_to_upstream( proxy, state );
} }
/* Leaving state untouched will retry connecting to upstream */ /* Leaving state untouched will retry connecting to upstream -
* so introduce a bit of sleep */
if ( state == CONNECT_TO_UPSTREAM ) {
connect_to_upstream_cooldown = 1;
}
break; break;
case READ_INIT_FROM_UPSTREAM: case READ_INIT_FROM_UPSTREAM:
state = proxy_read_init_from_upstream( proxy, state ); state = proxy_read_init_from_upstream( proxy, state );
@@ -744,6 +783,17 @@ void proxy_session( struct proxier* proxy )
break; break;
} }
/* In these states, we're interested in restarting after a timeout.
*/
if ( old_state == state && proxy_state_upstream( state ) ) {
if ( ( monotonic_time_ms() ) - state_started > UPSTREAM_TIMEOUT ) {
warn(
"Timed out in state %s while communicating with upstream",
proxy_session_state_names[state]
);
state = CONNECT_TO_UPSTREAM;
}
}
} }
info( info(

View File

@@ -14,6 +14,12 @@
#include "prefetch.h" #include "prefetch.h"
#endif #endif
/** UPSTREAM_TIMEOUT
* How long ( in ms ) to allow for upstream to respond. If it takes longer
* than this, we will cancel the current request-response to them and resubmit
*/
#define UPSTREAM_TIMEOUT 30 * 1000
struct proxier { struct proxier {
/* The flexnbd wrapper this proxier is attached to */ /* The flexnbd wrapper this proxier is attached to */
struct flexnbd* flexnbd; struct flexnbd* flexnbd;