#include "proxy.h"
#include "readwrite.h"

#ifdef PREFETCH
#include "prefetch.h"
#endif

#include "ioutil.h"
#include "sockutil.h"
#include "util.h"

/* NOTE(review): the names of the three system headers originally included
 * here were lost (the angle-bracket contents were stripped). The headers below
 * are the ones the code in this file visibly needs - confirm against history.
 */
#include <errno.h>      /* errno, EINPROGRESS */
#include <inttypes.h>   /* PRIu32, PRIu64 */
#include <stdlib.h>     /* free */
#include <string.h>     /* memcpy */
#include <unistd.h>     /* unlink */
#include <sys/select.h> /* fd_set, FD_ZERO, FD_SET, FD_ISSET */
#include <sys/socket.h> /* socket, bind, connect, listen, accept, getsockopt */


/**
 * Allocate a proxier and populate it from textual address/port arguments.
 *
 * @param s_downstream_address  address to listen on (required)
 * @param s_downstream_port     port to listen on; required unless the parsed
 *                              listen address family is AF_UNIX
 * @param s_upstream_address    IP address of the upstream server (required)
 * @param s_upstream_port       port of the upstream server (required)
 * @param s_upstream_bind       optional source address for upstream
 *                              connections; may be NULL
 * @return a proxier with all three fds set to -1 and its I/O buffers
 *         allocated. Release it with proxy_destroy().
 *
 * Missing or unparseable arguments are reported through the FATAL_* macros.
 *
 * NOTE(review): fields not assigned here (bind when s_upstream_bind is NULL,
 * upstream_size, hello_sent, req_count, buffer size/needle) rely on xmalloc()
 * returning zeroed memory - confirm.
 */
struct proxier* proxy_create(
    char* s_downstream_address,
    char* s_downstream_port,
    char* s_upstream_address,
    char* s_upstream_port,
    char* s_upstream_bind )
{
    struct proxier* out;
    out = xmalloc( sizeof( struct proxier ) );

    FATAL_IF_NULL( s_downstream_address, "Listen address not specified" );
    NULLCHECK( s_downstream_address );

    /* The message has a %s, so the address must be passed along with it */
    FATAL_UNLESS(
        parse_to_sockaddr( &out->listen_on.generic, s_downstream_address ),
        "Couldn't parse downstream address %s",
        s_downstream_address
    );

    /* Only non-AF_UNIX listen addresses need a port */
    if ( out->listen_on.family != AF_UNIX ) {
        FATAL_IF_NULL( s_downstream_port, "Downstream port not specified" );
        NULLCHECK( s_downstream_port );
        parse_port( s_downstream_port, &out->listen_on.v4 );
    }

    FATAL_IF_NULL( s_upstream_address, "Upstream address not specified" );
    NULLCHECK( s_upstream_address );

    FATAL_UNLESS(
        parse_ip_to_sockaddr( &out->connect_to.generic, s_upstream_address ),
        "Couldn't parse upstream address '%s'",
        s_upstream_address
    );

    FATAL_IF_NULL( s_upstream_port, "Upstream port not specified" );
    NULLCHECK( s_upstream_port );

    parse_port( s_upstream_port, &out->connect_to.v4 );

    if ( s_upstream_bind ) {
        FATAL_IF_ZERO(
            parse_ip_to_sockaddr( &out->connect_from.generic, s_upstream_bind ),
            "Couldn't parse bind address '%s'",
            s_upstream_bind
        );
        out->bind = 1;
    }

    /* -1 marks "no socket" for each of the three descriptors */
    out->listen_fd = -1;
    out->downstream_fd = -1;
    out->upstream_fd = -1;

#ifdef PREFETCH
    out->prefetch = xmalloc( sizeof( struct prefetch ) );
#endif

    out->init.buf = xmalloc( sizeof( struct nbd_init_raw ) );
    out->req.buf = xmalloc( NBD_MAX_SIZE );
    out->rsp.buf = xmalloc( NBD_MAX_SIZE );

    return out;
}

/**
 * Free a proxier and the buffers allocated by proxy_create(). Does not close
 * any file descriptors; see proxy_cleanup() for that.
 */
void proxy_destroy( struct proxier* proxy )
{
    free( proxy->init.buf );
    free( proxy->req.buf );
    free( proxy->rsp.buf );
#ifdef PREFETCH
    free( proxy->prefetch );
#endif

    free( proxy );
}

/* Shared between our two different connect_to_upstream paths */
void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size );

/* Try to establish a connection to our upstream server. Return 1 on success,
 * 0 on failure. this is a blocking call that returns a non-blocking socket.
 */
int proxy_connect_to_upstream( struct proxier* proxy )
{
    struct sockaddr* connect_from = NULL;
    if ( proxy->bind ) {
        connect_from = &proxy->connect_from.generic;
    }

    int fd = socket_connect( &proxy->connect_to.generic, connect_from );
    off64_t size = 0;

    if ( -1 == fd ) {
        return 0;
    }

    if( !socket_nbd_read_hello( fd, &size ) ) {
        WARN_IF_NEGATIVE(
            sock_try_close( fd ),
            "Couldn't close() after failed read of NBD hello on fd %i", fd
        );
        return 0;
    }

    proxy->upstream_fd = fd;

    /* Previously a failure here went unnoticed. Keep it non-fatal, but say so. */
    if ( sock_set_nonblock( fd, 1 ) == -1 ) {
        warn( SHOW_ERRNO( "Failed to set upstream fd %i non-blocking", fd ) );
    }

    proxy_finish_connect_to_upstream( proxy, size );

    return 1;
}

/* First half of non-blocking connection to upstream. Gets as far as calling
 * connect() on a non-blocking socket.
 *
 * On success proxy->upstream_fd is set to the new socket. On failure the
 * socket is closed and proxy->upstream_fd is left untouched, so callers detect
 * failure by resetting it to -1 beforehand.
 */
void proxy_start_connect_to_upstream( struct proxier* proxy )
{
    int fd, result;
    struct sockaddr* from = NULL;
    struct sockaddr* to = &proxy->connect_to.generic;

    if ( proxy->bind ) {
        from = &proxy->connect_from.generic;
    }

    fd = socket( to->sa_family , SOCK_STREAM, 0 );

    if( fd < 0 ) {
        warn( SHOW_ERRNO( "Couldn't create socket to reconnect to upstream" ) );
        return;
    }

    info( "Beginning non-blocking connection to upstream on fd %i", fd );

    if ( NULL != from ) {
        /* A failed bind() is only warned about; we still try to connect */
        if ( 0 > bind( fd, from, sockaddr_size( from ) ) ) {
            warn( SHOW_ERRNO( "bind() to source address failed" ) );
        }
    }

    result = sock_set_nonblock( fd, 1 );
    if ( result == -1 ) {
        warn( SHOW_ERRNO( "Failed to set upstream fd %i non-blocking", fd ) );
        goto error;
    }

    /* EINPROGRESS is the expected outcome for a non-blocking connect() */
    result = connect( fd, to, sockaddr_size( to ) );
    if ( result == -1 && errno != EINPROGRESS ) {
        warn( SHOW_ERRNO( "Failed to start connect()ing to upstream!" ) );
        goto error;
    }

    proxy->upstream_fd = fd;

    return;

error:
    if ( sock_try_close( fd ) == -1 ) {
        /* Non-fatal leak, although still nasty */
        warn( SHOW_ERRNO( "Failed to close fd for upstream %i", fd ) );
    }
    return;
}

/**
 * Record the image size reported by upstream. Logs the size the first time
 * (when the stored size is 0) and warns if it differs from the stored value.
 */
void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size )
{
    /* off64_t is signed; cast so the arguments match the PRIu64 conversions */
    if ( proxy->upstream_size == 0 ) {
        info( "Size of upstream image is %"PRIu64" bytes", (uint64_t) size );
    } else if ( proxy->upstream_size != size ) {
        warn(
            "Size changed from %"PRIu64" to %"PRIu64" bytes",
            (uint64_t) proxy->upstream_size, (uint64_t) size
        );
    }

    proxy->upstream_size = size;
    info( "Connected to upstream on fd %i", proxy->upstream_fd );

    return;
}

/**
 * Close the upstream socket, if there is one, and mark it as absent (-1).
 */
void proxy_disconnect_from_upstream( struct proxier* proxy )
{
    if ( -1 != proxy->upstream_fd ) {
        info("Closing upstream connection on fd %i", proxy->upstream_fd );

        /* TODO: An NBD disconnect would be pleasant here */
        WARN_IF_NEGATIVE(
            sock_try_close( proxy->upstream_fd ),
            "Failed to close() fd %i when disconnecting from upstream",
            proxy->upstream_fd
        );
        proxy->upstream_fd = -1;
    }
}


/** Prepares a listening socket for the NBD server, binding etc.
*/ void proxy_open_listen_socket(struct proxier* params) { NULLCHECK( params ); params->listen_fd = socket(params->listen_on.family, SOCK_STREAM, 0); FATAL_IF_NEGATIVE( params->listen_fd, SHOW_ERRNO( "Couldn't create listen socket" ) ); /* Allow us to restart quickly */ FATAL_IF_NEGATIVE( sock_set_reuseaddr(params->listen_fd, 1), SHOW_ERRNO( "Couldn't set SO_REUSEADDR" ) ); if( AF_UNIX != params->listen_on.family ) { FATAL_IF_NEGATIVE( sock_set_tcp_nodelay(params->listen_fd, 1), SHOW_ERRNO( "Couldn't set TCP_NODELAY" ) ); } FATAL_UNLESS_ZERO( sock_try_bind( params->listen_fd, ¶ms->listen_on.generic ), SHOW_ERRNO( "Failed to bind to listening socket" ) ); /* We're only serving one client at a time, hence backlog of 1 */ FATAL_IF_NEGATIVE( listen(params->listen_fd, 1), SHOW_ERRNO( "Failed to listen on listening socket" ) ); info( "Now listening for incoming connections" ); return; } typedef enum { EXIT, WRITE_TO_DOWNSTREAM, READ_FROM_DOWNSTREAM, CONNECT_TO_UPSTREAM, READ_INIT_FROM_UPSTREAM, WRITE_TO_UPSTREAM, READ_FROM_UPSTREAM } proxy_session_states; static char* proxy_session_state_names[] = { "EXIT", "WRITE_TO_DOWNSTREAM", "READ_FROM_DOWNSTREAM", "CONNECT_TO_UPSTREAM", "READ_INIT_FROM_UPSTREAM", "WRITE_TO_UPSTREAM", "READ_FROM_UPSTREAM" }; #ifdef PREFETCH int proxy_prefetch_for_request( struct proxier* proxy, int state ) { struct nbd_request* req = &proxy->req_hdr; struct nbd_reply* rsp = &proxy->rsp_hdr; struct nbd_request_raw* req_raw = (struct nbd_request_raw*) proxy->req.buf; struct nbd_reply_raw *rsp_raw = (struct nbd_reply_raw*) proxy->rsp.buf; int is_read = ( req->type & REQUEST_MASK ) == REQUEST_READ; int prefetch_start = req->from; int prefetch_end = req->from + ( req->len * 2 ); /* We only want to consider prefetching if we know we're not * getting too much data back, if it's a read request, and if * the prefetch won't try to read past the end of the file. 
*/ int prefetching = req->len <= PREFETCH_BUFSIZE && is_read && prefetch_start < prefetch_end && prefetch_end <= proxy->upstream_size; if ( is_read ) { /* See if we can respond with what's in our prefetch * cache */ if ( proxy->prefetch->is_full && req->from == proxy->prefetch->from && req->len == proxy->prefetch->len ) { /* HUZZAH! A match! */ debug( "Prefetch hit!" ); /* First build a reply header */ rsp->magic = REPLY_MAGIC; rsp->error = 0; memcpy( &rsp->handle, &req->handle, 8 ); /* now copy it into the response */ nbd_h2r_reply( rsp, rsp_raw ); /* and the data */ memcpy( proxy->rsp.buf + NBD_REPLY_SIZE, proxy->prefetch->buffer, proxy->prefetch->len ); proxy->rsp.size = NBD_REPLY_SIZE + proxy->prefetch->len; proxy->rsp.needle = 0; /* return early, our work here is done */ return WRITE_TO_DOWNSTREAM; } } else { /* Safety catch. If we're sending a write request, we * blow away the cache. This is very pessimistic, but * it's simpler (and therefore safer) than working out * whether we can keep it or not. */ debug( "Blowing away prefetch cache on type %d request.", req->type ); proxy->prefetch->is_full = 0; } debug( "Prefetch cache MISS!"); /* We pull the request out of the proxy struct, rewrite the * request size, and write it back. 
*/ if ( prefetching ) { proxy->is_prefetch_req = 1; proxy->prefetch_req_orig_len = req->len; req->len *= 2; debug( "Prefetching %"PRIu32" bytes", req->len - proxy->prefetch_req_orig_len ); nbd_h2r_request( req, req_raw ); debug( "Request len: %"PRIu32" ; raw: %"PRIu32, req->len, req_raw->len ); } return state; } int proxy_prefetch_for_reply( struct proxier* proxy, int state ) { size_t prefetched_bytes; if ( !proxy->is_prefetch_req ) { return state; } prefetched_bytes = proxy->req_hdr.len - proxy->prefetch_req_orig_len; debug( "Prefetched %d bytes", prefetched_bytes ); memcpy( proxy->rsp.buf + proxy->prefetch_req_orig_len, &(proxy->prefetch->buffer), prefetched_bytes ); proxy->prefetch->from = proxy->req_hdr.from + proxy->prefetch_req_orig_len; proxy->prefetch->len = prefetched_bytes; /* We've finished with proxy->req by now, so don't need to alter it to make * it look like the request was before prefetch */ /* Truncate the bytes we'll write downstream */ proxy->req_hdr.len = proxy->prefetch_req_orig_len; proxy->rsp.size -= prefetched_bytes; /* And we need to reset these */ proxy->prefetch->is_full = 1; proxy->is_prefetch_req = 0; return state; } #endif int proxy_read_from_downstream( struct proxier *proxy, int state ) { ssize_t count; struct nbd_request_raw* request_raw = (struct nbd_request_raw*) proxy->req.buf; struct nbd_request* request = &(proxy->req_hdr); // assert( state == READ_FROM_DOWNSTREAM ); count = iobuf_read( proxy->downstream_fd, &proxy->req, NBD_REQUEST_SIZE ); if ( count == -1 ) { warn( SHOW_ERRNO( "Couldn't read request from downstream" ) ); return EXIT; } if ( proxy->req.needle == NBD_REQUEST_SIZE ) { nbd_r2h_request( request_raw, request ); if ( ( request->type & REQUEST_MASK ) == REQUEST_DISCONNECT ) { info( "Received disconnect request from client" ); return EXIT; } /* Simple validations */ if ( ( request->type & REQUEST_MASK ) == REQUEST_READ ) { if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) { warn( "NBD read request size %"PRIu32" 
too large", request->len ); return EXIT; } } if ( (request->type & REQUEST_MASK ) == REQUEST_WRITE ) { if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) { warn( "NBD write request size %"PRIu32" too large", request->len ); return EXIT; } proxy->req.size += request->len; } } if ( proxy->req.needle == proxy->req.size ) { debug( "Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32, request->type, request->from, request->len ); /* Finished reading, so advance state. Leave size untouched so the next * state knows how many bytes to write */ proxy->req.needle = 0; return WRITE_TO_UPSTREAM; } return state; } int proxy_continue_connecting_to_upstream( struct proxier* proxy, int state ) { int error, result; socklen_t len = sizeof( error ); // assert( state == CONNECT_TO_UPSTREAM ); result = getsockopt( proxy->upstream_fd, SOL_SOCKET, SO_ERROR, &error, &len ); if ( result == -1 ) { warn( SHOW_ERRNO( "Failed to tell if connected to upstream" ) ); return state; } if ( error != 0 ) { errno = error; warn( SHOW_ERRNO( "Failed to connect to upstream" ) ); return state; } #ifdef PREFETCH /* Data may have changed while we were disconnected */ proxy->prefetch->is_full = 0; #endif info( "Connected to upstream on fd %i", proxy->upstream_fd ); return READ_INIT_FROM_UPSTREAM; } int proxy_read_init_from_upstream( struct proxier* proxy, int state ) { ssize_t count; // assert( state == READ_INIT_FROM_UPSTREAM ); count = iobuf_read( proxy->upstream_fd, &proxy->init, sizeof( struct nbd_init_raw ) ); if ( count == -1 ) { warn( SHOW_ERRNO( "Failed to read init from upstream" ) ); goto disconnect; } if ( proxy->init.needle == proxy->init.size ) { off64_t upstream_size; if ( !nbd_check_hello( (struct nbd_init_raw*) proxy->init.buf, &upstream_size ) ) { warn( "Upstream sent invalid init" ); goto disconnect; } /* Currently, we only get disconnected from upstream (so needing to come * here) when we have an outstanding request. 
If that becomes false, * we'll need to choose the right state to return to here */ proxy->init.needle = 0; return WRITE_TO_UPSTREAM; } return state; disconnect: proxy->init.needle = 0; proxy->init.size = 0; return CONNECT_TO_UPSTREAM; } int proxy_write_to_upstream( struct proxier* proxy, int state ) { ssize_t count; // assert( state == WRITE_TO_UPSTREAM ); count = iobuf_write( proxy->upstream_fd, &proxy->req ); if ( count == -1 ) { warn( SHOW_ERRNO( "Failed to send request to upstream" ) ); proxy->req.needle = 0; return CONNECT_TO_UPSTREAM; } if ( proxy->req.needle == proxy->req.size ) { /* Request sent. Advance to reading the response from upstream. We might * still need req.size if reading the reply fails - we disconnect * and resend the reply in that case - so keep it around for now. */ proxy->req.needle = 0; return READ_FROM_UPSTREAM; } return state; } int proxy_read_from_upstream( struct proxier* proxy, int state ) { ssize_t count; struct nbd_reply* reply = &(proxy->rsp_hdr); struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) proxy->rsp.buf; /* We can't assume the NBD_REPLY_SIZE + req->len is what we'll get back */ count = iobuf_read( proxy->upstream_fd, &proxy->rsp, NBD_REPLY_SIZE ); if ( count == -1 ) { warn( SHOW_ERRNO( "Failed to get reply from upstream" ) ); goto disconnect; } if ( proxy->rsp.needle == NBD_REPLY_SIZE ) { nbd_r2h_reply( reply_raw, reply ); if ( reply->magic != REPLY_MAGIC ) { warn( "Reply magic is incorrect" ); goto disconnect; } if ( reply->error != 0 ) { warn( "NBD error returned from upstream: %"PRIu32, reply->error ); goto disconnect; } if ( ( proxy->req_hdr.type & REQUEST_MASK ) == REQUEST_READ ) { /* Get the read reply data too. */ proxy->rsp.size += proxy->req_hdr.len; } } if ( proxy->rsp.size == proxy->rsp.needle ) { debug( "NBD reply received from upstream." 
); proxy->rsp.needle = 0; return WRITE_TO_DOWNSTREAM; } return state; disconnect: proxy->rsp.needle = 0; proxy->rsp.size = 0; return CONNECT_TO_UPSTREAM; } int proxy_write_to_downstream( struct proxier* proxy, int state ) { ssize_t count; // assert( state == WRITE_TO_DOWNSTREAM ); if ( !proxy->hello_sent ) { info( "Writing init to downstream" ); } count = iobuf_write( proxy->downstream_fd, &proxy->rsp ); if ( count == -1 ) { warn( SHOW_ERRNO( "Failed to write to downstream" ) ); return EXIT; } if ( proxy->rsp.needle == proxy->rsp.size ) { if ( !proxy->hello_sent ) { info( "Hello message sent to client" ); proxy->hello_sent = 1; } else { debug( "Reply sent" ); proxy->req_count++; } /* We're done with the request & response buffers now */ proxy->req.size = 0; proxy->req.needle = 0; proxy->rsp.size = 0; proxy->rsp.needle = 0; return READ_FROM_DOWNSTREAM; } return state; } /* Non-blocking proxy session. Simple(ish) state machine. We read from d/s until * we have a full request, then try to write that request u/s. If writing fails, * we reconnect to upstream and retry. Once we've successfully written, we * attempt to read the reply. If that fails or times out (we give it 30 seconds) * then we disconnect from u/s and go back to trying to reconnect and resend. * * This is the second-simplest NBD proxy I can think of. 
The first version was * non-blocking I/O, but it was getting impossible to manage exceptional stuff */ void proxy_session( struct proxier* proxy ) { /* First action: Write hello to downstream */ nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp.buf, proxy->upstream_size ); proxy->rsp.size = sizeof( struct nbd_init_raw ); proxy->rsp.needle = 0; int state = WRITE_TO_DOWNSTREAM; info( "Beginning proxy session on fd %i", proxy->downstream_fd ); while( state != EXIT ) { struct timeval select_timeout = { .tv_sec = 0, .tv_usec = 0 }; struct timeval *select_timeout_ptr = NULL; int result; /* used by select() */ fd_set rfds; fd_set wfds; FD_ZERO( &rfds ); FD_ZERO( &wfds ); debug( "Proxy is in state %s ( %i )", proxy_session_state_names[state], state ); switch( state ) { case READ_FROM_DOWNSTREAM: FD_SET( proxy->downstream_fd, &rfds ); break; case WRITE_TO_DOWNSTREAM: FD_SET( proxy->downstream_fd, &wfds ); break; case WRITE_TO_UPSTREAM: select_timeout.tv_sec = 15; FD_SET(proxy->upstream_fd, &wfds ); break; case CONNECT_TO_UPSTREAM: proxy_disconnect_from_upstream( proxy ); /* Changes proxy->upstream_fd */ proxy_start_connect_to_upstream( proxy ); if ( proxy->upstream_fd == -1 ) { warn( SHOW_ERRNO( "Error acquiring socket to upstream" ) ); continue; } /* non-blocking connect() */ select_timeout.tv_sec = 15; FD_SET( proxy->upstream_fd, &wfds ); break; case READ_INIT_FROM_UPSTREAM: case READ_FROM_UPSTREAM: select_timeout.tv_sec = 15; FD_SET( proxy->upstream_fd, &rfds ); break; }; if ( select_timeout.tv_sec > 0 ) { select_timeout_ptr = &select_timeout; } result = sock_try_select( FD_SETSIZE, &rfds, &wfds, NULL, select_timeout_ptr ); if ( result == -1 ) { warn( SHOW_ERRNO( "select() failed: " ) ); break; } switch( state ) { case READ_FROM_DOWNSTREAM: if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) { state = proxy_read_from_downstream( proxy, state ); #ifdef PREFETCH /* Check if we can fulfil the request from prefetch, or * rewrite the request to fill the prefetch buffer if 
needed */ if ( state == WRITE_TO_UPSTREAM ) { state = proxy_prefetch_for_request( proxy, state ); } #endif } break; case CONNECT_TO_UPSTREAM: if ( FD_ISSET( proxy->upstream_fd, &wfds ) ) { state = proxy_continue_connecting_to_upstream( proxy, state ); } /* Leaving state untouched will retry connecting to upstream */ break; case READ_INIT_FROM_UPSTREAM: state = proxy_read_init_from_upstream( proxy, state ); case WRITE_TO_UPSTREAM: if ( FD_ISSET( proxy->upstream_fd, &wfds ) ) { state = proxy_write_to_upstream( proxy, state ); } break; case READ_FROM_UPSTREAM: if ( FD_ISSET( proxy->upstream_fd, &rfds ) ) { state = proxy_read_from_upstream( proxy, state ); } # ifdef PREFETCH /* Fill the prefetch buffer and rewrite the reply, if needed */ if ( state == WRITE_TO_DOWNSTREAM ) { state = proxy_prefetch_for_reply( proxy, state ); } #endif break; case WRITE_TO_DOWNSTREAM: if ( FD_ISSET( proxy->downstream_fd, &wfds ) ) { state = proxy_write_to_downstream( proxy, state ); } break; } } info( "Finished proxy session on fd %i after %"PRIu64" successful request(s)", proxy->downstream_fd, proxy->req_count ); /* Reset these two for the next session */ proxy->req_count = 0; proxy->hello_sent = 0; return; } /** Accept an NBD socket connection, dispatch appropriately */ int proxy_accept( struct proxier* params ) { NULLCHECK( params ); int client_fd; fd_set fds; union mysockaddr client_address; socklen_t socklen = sizeof( client_address ); info( "Waiting for client connection" ); FD_ZERO(&fds); FD_SET(params->listen_fd, &fds); FATAL_IF_NEGATIVE( sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL), SHOW_ERRNO( "select() failed" ) ); if ( FD_ISSET( params->listen_fd, &fds ) ) { client_fd = accept( params->listen_fd, &client_address.generic, &socklen ); if ( client_address.family != AF_UNIX ) { if ( sock_set_tcp_nodelay(client_fd, 1) == -1 ) { warn( SHOW_ERRNO( "Failed to set TCP_NODELAY" ) ); } } info( "Accepted nbd client socket fd %d", client_fd ); sock_set_nonblock( client_fd, 1 ); 
params->downstream_fd = client_fd; proxy_session( params ); WARN_IF_NEGATIVE( sock_try_close( params->downstream_fd ), "Couldn't close() downstram fd %i after proxy session", params->downstream_fd ); params->downstream_fd = -1; } return 1; /* We actually expect to be interrupted by signal handlers */ } void proxy_accept_loop( struct proxier* params ) { NULLCHECK( params ); while( proxy_accept( params ) ); } /** Closes sockets */ void proxy_cleanup( struct proxier* proxy ) { NULLCHECK( proxy ); info( "Cleaning up" ); if ( -1 != proxy->listen_fd ) { if ( AF_UNIX == proxy->listen_on.family ) { if ( -1 == unlink( proxy->listen_on.un.sun_path ) ) { warn( SHOW_ERRNO( "Failed to unlink %s", proxy->listen_on.un.sun_path ) ); } } WARN_IF_NEGATIVE( sock_try_close( proxy->listen_fd ), SHOW_ERRNO( "Failed to close() listen fd %i", proxy->listen_fd ) ); proxy->listen_fd = -1; } if ( -1 != proxy->downstream_fd ) { WARN_IF_NEGATIVE( sock_try_close( proxy->downstream_fd ), SHOW_ERRNO( "Failed to close() downstream fd %i", proxy->downstream_fd ) ); proxy->downstream_fd = -1; } if ( -1 != proxy->upstream_fd ) { WARN_IF_NEGATIVE( sock_try_close( proxy->upstream_fd ), SHOW_ERRNO( "Failed to close() upstream fd %i", proxy->upstream_fd ) ); proxy->upstream_fd = -1; } info( "Cleanup done" ); } /** Full lifecycle of the proxier */ int do_proxy( struct proxier* params ) { NULLCHECK( params ); info( "Ensuring upstream server is open" ); if ( !proxy_connect_to_upstream( params ) ) { warn( "Couldn't connect to upstream server during initialization, exiting" ); proxy_cleanup( params ); return 1; }; proxy_open_listen_socket( params ); proxy_accept_loop( params ); proxy_cleanup( params ); return 0; }