From f5c434f21cc6cbdd7cc22afa6ac14620ac088f8f Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 18 Jun 2013 15:37:39 +0100 Subject: [PATCH] proxy: Initial move to event-loop proxy model. Building with -DPREFETCH is currently broken, I'm sure, but otherwise this version seems to be feature-complete compared to the previous one, albeit wordier. Upcoming: cleanups --- src/proxy.c | 743 +++++++++++++++++++++++++++++++++++++++------------- src/proxy.h | 23 +- 2 files changed, 577 insertions(+), 189 deletions(-) diff --git a/src/proxy.c b/src/proxy.c index 7d9ac5a..ed418f4 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -69,7 +69,6 @@ struct proxier* proxy_create( out->prefetch = xmalloc( sizeof( struct prefetch ) ); #endif - out->req_buf = xmalloc( NBD_MAX_SIZE ); out->rsp_buf = xmalloc( NBD_MAX_SIZE ); @@ -87,9 +86,11 @@ void proxy_destroy( struct proxier* proxy ) free( proxy ); } +/* Shared between our two different connect_to_upstream paths */ +void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size ); /* Try to establish a connection to our upstream server. Return 1 on success, - * 0 on failure + * 0 on failure. this is a blocking call that returns a non-blocking socket. */ int proxy_connect_to_upstream( struct proxier* proxy ) { @@ -113,22 +114,85 @@ int proxy_connect_to_upstream( struct proxier* proxy ) return 0; } + proxy_finish_connect_to_upstream( proxy, size ); + proxy->upstream_fd = fd; + sock_set_nonblock( fd, 1 ); + + return 1; +} + +/* First half of non-blocking connection to upstream. Gets as far as calling + * connect() on a non-blocking socket. + */ +void proxy_start_connect_to_upstream( struct proxier* proxy ) +{ + int fd, result; + struct sockaddr* from = NULL; + struct sockaddr* to = &proxy->connect_to.generic; + + if ( proxy->bind ) { + from = &proxy->connect_from.generic; + } + + fd = socket( to->sa_family , SOCK_STREAM, 0 ); + + if( fd < 0 ) { + warn( SHOW_ERRNO( "Couldn't create socket to reconnect to upstream" ) ); + return; + } + + info( "Beginning non-blocking connection to upstream on fd %i", fd ); + + if ( NULL != from ) { + if ( 0 > bind( fd, from, sockaddr_size( from ) ) ) { + warn( SHOW_ERRNO( "bind() to source address failed" ) ); + } + } + + result = sock_set_nonblock( fd, 1 ); + if ( result == -1 ) { + warn( SHOW_ERRNO( "Failed to set upstream fd %i non-blocking", fd ) ); + goto error; + } + + result = connect( fd, to, sockaddr_size( to ) ); + if ( result == -1 && errno != EINPROGRESS ) { + warn( SHOW_ERRNO( "Failed to start connect()ing to upstream!" ) ); + goto error; + } + + proxy->upstream_fd = fd; + return; + +error: + if ( sock_try_close( fd ) == -1 ) { + /* Non-fatal leak, although still nasty */ + warn( SHOW_ERRNO( "Failed to close fd for upstream %i", fd ) ); + } + return; +} + +void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size ) { + if ( proxy->upstream_size == 0 ) { info( "Size of upstream image is %"PRIu64" bytes", size ); } else if ( proxy->upstream_size != size ) { - warn( "Size changed from %"PRIu64" to %"PRIu64" bytes", proxy->upstream_size, size ); + warn( + "Size changed from %"PRIu64" to %"PRIu64" bytes", + proxy->upstream_size, size + ); } proxy->upstream_size = size; - proxy->upstream_fd = fd; + info( "Connected to upstream on fd %i", proxy->upstream_fd ); - return 1; + return; } void proxy_disconnect_from_upstream( struct proxier* proxy ) { if ( -1 != proxy->upstream_fd ) { - info(" Closing upstream connection" ); + info("Closing upstream connection on fd %i", proxy->upstream_fd ); /* TODO: An NBD disconnect would be pleasant here */ WARN_IF_NEGATIVE( @@ -176,77 +240,18 @@ void proxy_open_listen_socket(struct proxier* params) ); info( "Now listening for incoming connections" ); + + return; } -/* Try to get a request from downstream. If reading from downstream fails, then - * the session will be over. Returns 1 on success, 0 on failure. - */ -int proxy_get_request_from_downstream( struct proxier* proxy ) -{ - unsigned char* req_hdr_raw = proxy->req_buf; - unsigned char* req_data = proxy->req_buf + NBD_REQUEST_SIZE; - size_t req_buf_size; - - struct nbd_request_raw* request_raw = (struct nbd_request_raw*) req_hdr_raw; - struct nbd_request* request = &(proxy->req_hdr); - - if ( readloop( proxy->downstream_fd, req_hdr_raw, NBD_REQUEST_SIZE ) == -1 ) { - info( SHOW_ERRNO( "Failed to get request header from downstream" ) ); - return 0; - } - - nbd_r2h_request( request_raw, request ); - req_buf_size = NBD_REQUEST_SIZE; - - if ( request->type == REQUEST_DISCONNECT ) { - info( "Received disconnect request from client" ); - return 0; - } - - if ( request->type == REQUEST_READ ) { - if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) { - warn( "NBD read request size %"PRIu32" too large", request->len ); - return 0; - } - - } - - if ( request->type == REQUEST_WRITE ) { - if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) { - warn( "NBD write request size %"PRIu32" too large", request->len ); - return 0; - } - - if ( readloop( proxy->downstream_fd, req_data, request->len ) == -1 ) { - warn( "Failed to get NBD write request data: %"PRIu32"b", request->len ); - return 0; - } - - req_buf_size += request->len; - } - - debug( - "Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32, - request->type, request->from, request->len - ); - - proxy->req_buf_size = req_buf_size; - return 1; -} - -/* Tries to send the request upstream and receive a response. If upstream breaks - * then we reconnect to it, and keep it up until we have a complete response - * back. Returns 1 on success, 0 on failure - */ -int proxy_run_request_upstream( struct proxier* proxy ) -{ - unsigned char* rsp_hdr_raw = proxy->rsp_buf; - unsigned char* rsp_data = proxy->rsp_buf + NBD_REPLY_SIZE; - - struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) rsp_hdr_raw; - struct nbd_request* request = &(proxy->req_hdr); #ifdef PREFETCH + +int proxy_prefetch_for_request( struct proxier* proxy, int state ) +{ + /* TODO: restore prefetch capability */ + return state; + /* We only want to consider prefetching if we know we're not * getting too much data back, if it's a read request, and if * the prefetch won't try to read past the end of the file. @@ -277,13 +282,7 @@ int proxy_run_request_upstream( struct proxier* proxy ) (struct nbd_request_raw *)proxy->req_buf ); free( rewrite_request ); } -#endif - - struct nbd_reply* reply = &(proxy->rsp_hdr); - - size_t rsp_buf_size; - -#ifdef PREFETCH +//#### if ( request->type == REQUEST_READ ){ /* See if we can respond with what's in our prefetch * cache @@ -324,40 +323,12 @@ int proxy_run_request_upstream( struct proxier* proxy ) request->type); proxy->prefetch->is_full = 0; } -#endif +} - if ( -1 == writeloop( proxy->upstream_fd, - proxy->req_buf, - proxy->req_buf_size ) ) { - warn( SHOW_ERRNO( "Failed to send request to upstream" ) ); - goto disconnect; - } +int proxy_prefetch_for_reply( struct proxier* proxy, int state ) +{ + return state; - if ( -1 == readloop( proxy->upstream_fd, - rsp_hdr_raw, - NBD_REPLY_SIZE ) ) { - warn( SHOW_ERRNO( "Failed to get reply header from upstream" ) ); - goto disconnect; - } - - nbd_r2h_reply( reply_raw, reply ); - rsp_buf_size = NBD_REPLY_SIZE; - - if ( reply->magic != REPLY_MAGIC ) { - debug( "Reply magic is incorrect" ); - goto disconnect; - } - - debug( "NBD reply received from upstream. Response code: %"PRIu32, - reply->error ); - - if ( reply->error != 0 ) { - warn( "NBD error returned from upstream: %"PRIu32, - reply->error ); - } - - if ( reply->error == 0 && request->type == REQUEST_READ ) { -#ifdef PREFETCH if ( -1 == readloop( proxy->upstream_fd, rsp_data, request->len ) ) { @@ -376,111 +347,509 @@ int proxy_run_request_upstream( struct proxier* proxy ) proxy->prefetch->len = request->len; proxy->prefetch->is_full = 1; } -#else - if ( -1 == readloop( proxy->upstream_fd, - rsp_data, - request->len ) ) { - warn( SHOW_ERRNO( "Failed to get read reply data from upstream" ) ); - goto disconnect; - } +} + #endif - rsp_buf_size += request->len; + +typedef enum { + EXIT, + WRITE_TO_DOWNSTREAM, + READ_FROM_DOWNSTREAM, + CONNECT_TO_UPSTREAM, + READ_INIT_FROM_UPSTREAM, + WRITE_TO_UPSTREAM, + READ_FROM_UPSTREAM +} proxy_session_states; + +static char* proxy_session_state_names[] = { + "EXIT", + "WRITE_TO_DOWNSTREAM", + "READ_FROM_DOWNSTREAM", + "CONNECT_TO_UPSTREAM", + "READ_INIT_FROM_UPSTREAM", + "WRITE_TO_UPSTREAM", + "READ_FROM_UPSTREAM" +}; + +static inline int io_errno_permanent() +{ + return ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ); +} + + +int proxy_read_from_downstream( struct proxier *proxy, int state ) +{ + ssize_t count; + + struct nbd_request_raw* request_raw = (struct nbd_request_raw*) proxy->req_buf; + struct nbd_request* request = &(proxy->req_hdr); + +// assert( state == READ_FROM_DOWNSTREAM ); + + /* We're beginning a new read from downstream */ + if ( proxy->req_buf_size == 0 ) { + proxy->req_buf_size = NBD_REQUEST_SIZE; } - proxy->rsp_buf_size = rsp_buf_size; - return 1; - -disconnect: - warn( - "Request was: type=%"PRIu32" from=%"PRIu64" len=%"PRIu32, - request->type, request->from, request->len +read: + debug( + "Reading %"PRIu32" of %"PRIu32" bytes from downstream for NBD request", + proxy->req_buf_size - proxy->req_buf_offset, + proxy->req_buf_size ); - proxy_disconnect_from_upstream( proxy ); + + count = read( + proxy->downstream_fd, + proxy->req_buf + proxy->req_buf_offset, + proxy->req_buf_size - proxy->req_buf_offset + ); + + if ( count == 0 ) { + info( "EOF on downstream fd %i received", proxy->downstream_fd ); + return EXIT; + } + + if ( count != -1 ) { + proxy->req_buf_offset += count; + } else if ( io_errno_permanent() ) { + warn( SHOW_ERRNO( "Couldn't read request from downstream: " ) ); + return EXIT; + } + + if ( proxy->req_buf_offset == NBD_REQUEST_SIZE ) { + nbd_r2h_request( request_raw, request ); + + if ( request->type == REQUEST_DISCONNECT ) { + info( "Received disconnect request from client" ); + return EXIT; + } + + /* Simple validations */ + if ( request->type == REQUEST_READ ) { + if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) { + warn( "NBD read request size %"PRIu32" too large", request->len ); + return EXIT; + } + } + if ( request->type == REQUEST_WRITE ) { + if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) { + warn( "NBD write request size %"PRIu32" too large", request->len ); + return EXIT; + } + + proxy->req_buf_size += request->len; + + /* Minor optimisation: Read again immediately if there's write + * request data to be had. No point select()ing again + */ + goto read; + } + } + + if ( proxy->req_buf_offset == proxy->req_buf_size ) { + debug( + "Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32, + request->type, request->from, request->len + ); + + /* Finished reading, so advance state. Leave size untouched so the next + * state knows how many bytes to write */ + proxy->req_buf_offset = 0; + return WRITE_TO_UPSTREAM; + } + + return state; +} + +int proxy_continue_connecting_to_upstream( struct proxier* proxy, int state ) +{ + int error, result; + socklen_t len = sizeof( error ); + +// assert( state == CONNECT_TO_UPSTREAM ); + + result = getsockopt( + proxy->upstream_fd, SOL_SOCKET, SO_ERROR, &error, &len + ); + + if ( result == -1 ) { + warn( SHOW_ERRNO( "Failed to tell if connected to upstream" ) ); + return state; + } + + if ( error != 0 ) { + errno = error; + warn( SHOW_ERRNO( "Failed to connect to upstream" ) ); + return state; + } + #ifdef PREFETCH + /* Data may have changed while we were disconnected */ proxy->prefetch->is_full = 0; #endif - return 0; + + info( "Connected to upstream on fd %i", proxy->upstream_fd ); + return READ_INIT_FROM_UPSTREAM; } -/* Write an NBD reply back downstream. Return 0 on failure, 1 on success. */ -int proxy_send_reply_downstream( struct proxier* proxy ) +int proxy_read_init_from_upstream( struct proxier* proxy, int state ) { - int result; - unsigned char* rsp_buf = proxy->rsp_buf; + ssize_t count; + unsigned char* buf = (unsigned char *) &(proxy->init_buf); + +// assert( state == READ_INIT_FROM_UPSTREAM ); debug( - "Writing header (%"PRIu32") + data (%"PRIu32") bytes downstream", - NBD_REPLY_SIZE, proxy->rsp_buf_size - NBD_REPLY_SIZE + "Reading %"PRIu32" bytes of %"PRIu32" in init message", + sizeof( proxy->init_buf ) - proxy->init_buf_offset, + sizeof( proxy->init_buf ) ); - result = writeloop( proxy->downstream_fd, rsp_buf, proxy->rsp_buf_size ); - if ( result == -1 ) { - warn( SHOW_ERRNO( "Failed to send reply downstream" ) ); - return 0; + count = read( + proxy->upstream_fd, + buf + proxy->init_buf_offset, + sizeof( proxy->init_buf ) - proxy->init_buf_offset + ); + + if ( count == 0 ) { + info( "EOF signalled on upstream fd %i", proxy->upstream_fd ); + goto disconnect; } - debug( "Reply sent" ); - return 1; + if ( count != -1 ) { + proxy->init_buf_offset += count; + } else if ( io_errno_permanent() ) { + warn( SHOW_ERRNO( "Failed to read init from upstream" ) ); + goto disconnect; + } + + if ( proxy->init_buf_offset == sizeof( proxy->init_buf ) ) { + off64_t upstream_size; + if ( !nbd_check_hello( &proxy->init_buf, &upstream_size ) ) { + warn( "Upstream sent invalid init" ); + goto disconnect; + } + + /* Currently, we only get disconnected from upstream (so needing to come + * here) when we have an outstanding request. If that becomes false, + * we'll need to choose the right state to return to here */ + proxy->init_buf_offset = 0; + return WRITE_TO_UPSTREAM; + } + + return state; + +disconnect: + proxy->init_buf_offset = 0; + return CONNECT_TO_UPSTREAM; +} + +int proxy_write_to_upstream( struct proxier* proxy, int state ) +{ + ssize_t count; + +// assert( state == WRITE_TO_UPSTREAM ); + + debug( + "Writing %"PRIu32" of %"PRIu32" bytes of request to upstream", + proxy->req_buf_size - proxy->req_buf_offset, + proxy->req_buf_size + ); + + count = write( + proxy->upstream_fd, + proxy->req_buf + proxy->req_buf_offset, + proxy->req_buf_size - proxy->req_buf_offset + ); + + if ( count != -1 ) { + proxy->req_buf_offset += count; + } else if ( io_errno_permanent() ) { + warn( SHOW_ERRNO( "Failed to send request to upstream" ) ); + proxy->req_buf_offset = 0; + return CONNECT_TO_UPSTREAM; + } + + if ( proxy->req_buf_offset == proxy->req_buf_size ) { + /* Request sent. Advance to reading the response from upstream. We might + * still need req_buf_size if reading the reply fails - we disconnect + * and resend the reply in that case - so keep it around for now. */ + proxy->req_buf_offset = 0; + return READ_FROM_UPSTREAM; + } + + return state; +} + +int proxy_read_from_upstream( struct proxier* proxy, int state ) +{ + ssize_t count; + + struct nbd_reply* reply = &(proxy->rsp_hdr); + struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) proxy->rsp_buf; + + /* We can't assume the NBD_REPLY_SIZE + req->len is what we'll get back */ + if ( proxy->rsp_buf_offset == 0 ) { + proxy->rsp_buf_size = NBD_REPLY_SIZE; + } + +read: + debug( + "Reading %"PRIu32" of %"PRIu32" bytes in NBD reply", + proxy->rsp_buf_size - proxy->rsp_buf_offset, + proxy->rsp_buf_size + ); + + count = read( + proxy->upstream_fd, + proxy->rsp_buf + proxy->rsp_buf_offset, + proxy->rsp_buf_size - proxy->rsp_buf_offset + ); + + if ( count == 0 ) { + info( "EOF signalled on upstream fd %i", proxy->upstream_fd ); + goto disconnect; + } + + if ( count != -1 ) { + proxy->rsp_buf_offset += count; + } else if ( io_errno_permanent() ) { + warn( SHOW_ERRNO( "Failed to get reply from upstream" ) ); + goto disconnect; + } + + debug( + "Read %"PRIu32" of %"PRIu32" bytes of reply from upstream", + proxy->rsp_buf_offset, + proxy->rsp_buf_size + ); + + if ( proxy->rsp_buf_offset == NBD_REPLY_SIZE ) { + nbd_r2h_reply( reply_raw, reply ); + + if ( reply->magic != REPLY_MAGIC ) { + warn( "Reply magic is incorrect" ); + goto disconnect; + } + + if ( reply->error != 0 ) { + warn( "NBD error returned from upstream: %"PRIu32, reply->error ); + goto disconnect; + } + + if ( ( proxy->req_hdr.type || REQUEST_READ ) == REQUEST_READ ) { + /* Read the read reply data too. Same optimisation as + * read_from_downstream */ + proxy->rsp_buf_size += proxy->req_hdr.len; + goto read; + } + } + + if ( proxy->rsp_buf_size == proxy->rsp_buf_offset ) { + debug( "NBD reply received from upstream." ); + proxy->rsp_buf_offset = 0; + return WRITE_TO_DOWNSTREAM; + } + + return state; + +disconnect: + proxy->rsp_buf_offset = 0; + proxy->rsp_buf_size = 0; + return CONNECT_TO_UPSTREAM; } -/* Here, we negotiate an NBD session with downstream, based on the information - * we got on first connection to upstream. Then we wait for a request to come - * in from downstream, read it into memory, then send it to upstream. If - * upstream dies before responding, we reconnect to upstream and resend it. - * Once we've got a response, we write it directly to downstream, and wait for a - * new request. When downstream disconnects, or we receive an exit signal (which - * can be blocked, unfortunately), we are finished. - * - * This is the simplest possible nbd proxy I can think of. It may not be at all - * performant - let's see. - */ - -void proxy_session( struct proxier* proxy ) +int proxy_write_to_downstream( struct proxier* proxy, int state ) { - int downstream_fd = proxy->downstream_fd; - uint64_t req_count = 0; - int result; + ssize_t count; - info( "Beginning proxy session on fd %i", downstream_fd ); +// assert( state == WRITE_TO_DOWNSTREAM ); - if ( !socket_nbd_write_hello( downstream_fd, proxy->upstream_size ) ) { - warn( "Sending hello failed on fd %i, ending session", downstream_fd ); - return; + if ( proxy->hello_sent ) { + debug( + "Writing request of %"PRIu32" bytes from offset %"PRIu32, + proxy->rsp_buf_size, proxy->rsp_buf_offset + ); + } else { + debug( "Writing init to downstream" ); } - while( proxy_get_request_from_downstream( proxy ) ) { - do { - if ( proxy->upstream_fd == -1 ) { - info( "Connecting to upstream" ); - if ( !proxy_connect_to_upstream( proxy ) ) { - warn( "Failed to connect to upstream" ); - result = 0; - sleep( 5 ); + count = write( + proxy->downstream_fd, + proxy->rsp_buf + proxy->rsp_buf_offset, + proxy->rsp_buf_size - proxy->rsp_buf_offset + ); + + if ( count != -1 ) { + proxy->rsp_buf_offset += count; + } else if ( io_errno_permanent() ) { + if ( proxy->hello_sent ) { + warn( SHOW_ERRNO( "Failed to send reply downstream" ) ); + } else { + warn( SHOW_ERRNO( "Failed to send init downstream" ) ); + } + return EXIT; + } + + if ( proxy->rsp_buf_offset == proxy->rsp_buf_size ) { + if ( !proxy->hello_sent ) { + info( "Hello message sent to client" ); + proxy->hello_sent = 1; + } else { + debug( "Reply sent" ); + proxy->req_count++; + } + + /* We're done with the request & response buffers now */ + proxy->req_buf_size = 0; + proxy->req_buf_offset = 0; + proxy->rsp_buf_size = 0; + proxy->rsp_buf_offset = 0; + return READ_FROM_DOWNSTREAM; + } + + return state; +} + +/* Non-blocking proxy session. Simple(ish) state machine. We read from d/s until + * we have a full request, then try to write that request u/s. If writing fails, + * we reconnect to upstream and retry. Once we've successfully written, we + * attempt to read the reply. If that fails or times out (we give it 30 seconds) + * then we disconnect from u/s and go back to trying to reconnect and resend. + * + * This is the second-simplest NBD proxy I can think of. The first version was + * non-blocking I/O, but it was getting impossible to manage exceptional stuff + */ +void proxy_session( struct proxier* proxy ) +{ + /* First action: Write hello to downstream */ + nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp_buf, proxy->upstream_size ); + proxy->rsp_buf_size = sizeof( struct nbd_init_raw ); + proxy->rsp_buf_offset = 0; + int state = WRITE_TO_DOWNSTREAM; + + info( "Beginning proxy session on fd %i", proxy->downstream_fd ); + + while( state != EXIT ) { + + struct timeval select_timeout = { + .tv_sec = 0, + .tv_usec = 0 + }; + + struct timeval *select_timeout_ptr = NULL; + + int result; /* used by select() */ + + fd_set rfds; + fd_set wfds; + + FD_ZERO( &rfds ); + FD_ZERO( &wfds ); + + debug( "Proxy is in state %s ( %i )", proxy_session_state_names[state], state ); + + switch( state ) { + case READ_FROM_DOWNSTREAM: + FD_SET( proxy->downstream_fd, &rfds ); + break; + case WRITE_TO_DOWNSTREAM: + FD_SET( proxy->downstream_fd, &wfds ); + break; + case WRITE_TO_UPSTREAM: + select_timeout.tv_sec = 15; + FD_SET(proxy->upstream_fd, &wfds ); + break; + case CONNECT_TO_UPSTREAM: + proxy_disconnect_from_upstream( proxy ); + /* Changes proxy->upstream_fd */ + proxy_start_connect_to_upstream( proxy ); + + if ( proxy->upstream_fd == -1 ) { + warn( SHOW_ERRNO( "Error acquiring socket to upstream" ) ); continue; } - info( "Connected to upstream"); - } - result = proxy_run_request_upstream( proxy ); - } while ( result == 0 ); - if ( !proxy_send_reply_downstream( proxy ) ) { - warn( "Replying on fd %i failed, ending session", downstream_fd ); + /* non-blocking connect() */ + select_timeout.tv_sec = 15; + FD_SET( proxy->upstream_fd, &wfds ); + break; + case READ_INIT_FROM_UPSTREAM: + case READ_FROM_UPSTREAM: + select_timeout.tv_sec = 15; + FD_SET( proxy->upstream_fd, &rfds ); + break; + }; + + if ( select_timeout.tv_sec > 0 ) { + select_timeout_ptr = &select_timeout; + } + + result = sock_try_select( FD_SETSIZE, &rfds, &wfds, NULL, select_timeout_ptr ); + + if ( result == -1 ) { + warn( SHOW_ERRNO( "select() failed: " ) ); break; } - proxy->req_buf_size = 0; - proxy->rsp_buf_size = 0; + switch( state ) { + case READ_FROM_DOWNSTREAM: + if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) { + state = proxy_read_from_downstream( proxy, state ); +#ifdef PREFETCH + /* Check if we can fulfil the request from prefetch, or + * rewrite the request to fill the prefetch buffer if needed + */ + if ( state == WRITE_TO_UPSTREAM ) { + state = proxy_prefetch_for_request( proxy, state ); + } +#endif + } + break; + case CONNECT_TO_UPSTREAM: + if ( FD_ISSET( proxy->upstream_fd, &wfds ) ) { + state = proxy_continue_connecting_to_upstream( proxy, state ); + } + /* Leaving state untouched will retry connecting to upstream */ + break; + case READ_INIT_FROM_UPSTREAM: + state = proxy_read_init_from_upstream( proxy, state ); + case WRITE_TO_UPSTREAM: + if ( FD_ISSET( proxy->upstream_fd, &wfds ) ) { + state = proxy_write_to_upstream( proxy, state ); + } + break; + case READ_FROM_UPSTREAM: + if ( FD_ISSET( proxy->upstream_fd, &rfds ) ) { + state = proxy_read_from_upstream( proxy, state ); + } +# ifdef PREFETCH + /* Fill the prefetch buffer and rewrite the reply, if needed */ + if ( state == WRITE_TO_DOWNSTREAM ) { + state = proxy_prefetch_for_reply( proxy, state ); + } +#endif + break; + case WRITE_TO_DOWNSTREAM: + if ( FD_ISSET( proxy->downstream_fd, &wfds ) ) { + state = proxy_write_to_downstream( proxy, state ); + } + break; + } - req_count++; - }; + } info( "Finished proxy session on fd %i after %"PRIu64" successful request(s)", - downstream_fd, req_count + proxy->downstream_fd, proxy->req_count ); + /* Reset these two for the next session */ + proxy->req_count = 0; + proxy->hello_sent = 0; + return; } @@ -515,6 +884,7 @@ int proxy_accept( struct proxier* params ) } info( "Accepted nbd client socket fd %d", client_fd ); + sock_set_nonblock( client_fd, 1 ); params->downstream_fd = client_fd; proxy_session( params ); @@ -544,6 +914,13 @@ void proxy_cleanup( struct proxier* proxy ) info( "Cleaning up" ); if ( -1 != proxy->listen_fd ) { + + if ( AF_UNIX == proxy->listen_on.family ) { + if ( -1 == unlink( proxy->listen_on.un.sun_path ) ) { + warn( SHOW_ERRNO( "Failed to unlink %s", proxy->listen_on.un.sun_path ) ); + } + } + WARN_IF_NEGATIVE( sock_try_close( proxy->listen_fd ), SHOW_ERRNO( "Failed to close() listen fd %i", proxy->listen_fd ) @@ -571,12 +948,6 @@ void proxy_cleanup( struct proxier* proxy ) proxy->upstream_fd = -1; } - if ( AF_UNIX == proxy->listen_on.family ) { - if ( -1 == unlink( proxy->listen_on.un.sun_path ) ) { - warn( SHOW_ERRNO( "Failed to unlink %s", proxy->listen_on.un.sun_path ) ); - } - } - info( "Cleanup done" ); } diff --git a/src/proxy.h b/src/proxy.h index 09f7a94..3ac957f 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -47,7 +47,10 @@ struct proxier { unsigned char* req_buf; /* Number of bytes currently sat in req_buf */ - size_t req_buf_size; + ssize_t req_buf_size; + + /* Number of bytes of request we've gotten through */ + off_t req_buf_offset; /* We transform the raw request header into here */ struct nbd_request req_hdr; @@ -56,13 +59,27 @@ struct proxier { unsigned char* rsp_buf; /* Number of bytes currently sat in rsp_buf */ - size_t rsp_buf_size; + ssize_t rsp_buf_size; + + /* Number of bytes of response we've gotten through */ + off_t rsp_buf_offset; /* We transform the raw reply header into here */ struct nbd_reply rsp_hdr; + /* It's starting to feel like we need an object for a single proxy session. + * These two track how many requests we've sent so far, and whether the + * NBD_INIT code has been sent to the client yet. + */ + uint64_t req_count; + int hello_sent; + + /* And now we're doing non-blocking connect to upstream, we need this too */ + struct nbd_init_raw init_buf; + off_t init_buf_offset; + #ifdef PREFETCH - struct prefetch *prefetch; + struct prefetch *prefetch; #endif };