#include "proxy.h" #include "readwrite.h" #include "ioutil.h" #include "sockutil.h" #include "util.h" #include #include #include struct proxier* proxy_create( char* s_downstream_address, char* s_downstream_port, char* s_upstream_address, char* s_upstream_port, char* s_upstream_bind ) { struct proxier* out; out = xmalloc( sizeof( struct proxier ) ); FATAL_IF_NULL(s_downstream_address, "Listen address not specified"); NULLCHECK( s_downstream_address ); FATAL_UNLESS( parse_to_sockaddr( &out->listen_on.generic, s_downstream_address ), "Couldn't parse downstream address %s" ); if ( out->listen_on.family != AF_UNIX ) { FATAL_IF_NULL( s_downstream_port, "Downstream port not specified" ); NULLCHECK( s_downstream_port ); parse_port( s_downstream_port, &out->listen_on.v4 ); } FATAL_IF_NULL(s_upstream_address, "Upstream address not specified"); NULLCHECK( s_upstream_address ); FATAL_UNLESS( parse_ip_to_sockaddr( &out->connect_to.generic, s_upstream_address ), "Couldn't parse upstream address '%s'", s_upstream_address ); FATAL_IF_NULL( s_upstream_port, "Upstream port not specified" ); NULLCHECK( s_upstream_port ); parse_port( s_upstream_port, &out->connect_to.v4 ); if ( s_upstream_bind ) { FATAL_IF_ZERO( parse_ip_to_sockaddr( &out->connect_from.generic, s_upstream_bind ), "Couldn't parse bind address '%s'", s_upstream_bind ); out->bind = 1; } out->listen_fd = -1; out->downstream_fd = -1; out->upstream_fd = -1; out->req_buf = xmalloc( NBD_MAX_SIZE ); out->rsp_buf = xmalloc( NBD_MAX_SIZE ); return out; } void proxy_destroy( struct proxier* proxy ) { free( proxy->req_buf ); free( proxy->rsp_buf ); free( proxy ); } /* Try to establish a connection to our upstream server. Return 1 on success, * 0 on failure */ int proxy_connect_to_upstream( struct proxier* proxy ) { struct sockaddr* connect_from = NULL; if ( proxy->bind ) { connect_from = &proxy->connect_from.generic; } int fd = socket_connect( &proxy->connect_to.generic, connect_from ); off64_t size = 0; if ( -1 == fd ) { return 0; } if( !socket_nbd_read_hello( fd, &size ) ) { WARN_IF_NEGATIVE( sock_try_close( fd ), "Couldn't close() after failed read of NBD hello on fd %i", fd ); return 0; } if ( proxy->upstream_size == 0 ) { info( "Size of upstream image is %"PRIu64" bytes", size ); } else if ( proxy->upstream_size != size ) { warn( "Size changed from %"PRIu64" to %"PRIu64" bytes", proxy->upstream_size, size ); } proxy->upstream_size = size; proxy->upstream_fd = fd; return 1; } void proxy_disconnect_from_upstream( struct proxier* proxy ) { if ( -1 != proxy->upstream_fd ) { debug(" Closing upstream connection" ); /* TODO: An NBD disconnect would be pleasant here */ WARN_IF_NEGATIVE( sock_try_close( proxy->upstream_fd ), "Failed to close() fd %i when disconnecting from upstream", proxy->upstream_fd ); proxy->upstream_fd = -1; } } /** Prepares a listening socket for the NBD server, binding etc. */ void proxy_open_listen_socket(struct proxier* params) { NULLCHECK( params ); params->listen_fd = socket(params->listen_on.family, SOCK_STREAM, 0); FATAL_IF_NEGATIVE( params->listen_fd, SHOW_ERRNO( "Couldn't create listen socket" ) ); /* Allow us to restart quickly */ FATAL_IF_NEGATIVE( sock_set_reuseaddr(params->listen_fd, 1), SHOW_ERRNO( "Couldn't set SO_REUSEADDR" ) ); if( AF_UNIX != params->listen_on.family ) { FATAL_IF_NEGATIVE( sock_set_tcp_nodelay(params->listen_fd, 1), SHOW_ERRNO( "Couldn't set TCP_NODELAY" ) ); } FATAL_UNLESS_ZERO( sock_try_bind( params->listen_fd, ¶ms->listen_on.generic ), SHOW_ERRNO( "Failed to bind to listening socket" ) ); /* We're only serving one client at a time, hence backlog of 1 */ FATAL_IF_NEGATIVE( listen(params->listen_fd, 1), SHOW_ERRNO( "Failed to listen on listening socket" ) ); info( "Now listening for incoming connections" ); } /* Try to get a request from downstream. If reading from downstream fails, then * the session will be over. Returns 1 on success, 0 on failure. */ int proxy_get_request_from_downstream( struct proxier* proxy ) { unsigned char* req_hdr_raw = proxy->req_buf; unsigned char* req_data = proxy->req_buf + NBD_REQUEST_SIZE; size_t req_buf_size; struct nbd_request_raw* request_raw = (struct nbd_request_raw*) req_hdr_raw; struct nbd_request* request = &(proxy->req_hdr); if ( readloop( proxy->downstream_fd, req_hdr_raw, NBD_REQUEST_SIZE ) == -1 ) { info( SHOW_ERRNO( "Failed to get request header" ) ); return 0; } nbd_r2h_request( request_raw, request ); req_buf_size = NBD_REQUEST_SIZE; if ( request->type == REQUEST_DISCONNECT ) { info( "Received disconnect request from client" ); return 0; } if ( request->type == REQUEST_READ ) { if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) { warn( "NBD read request size %"PRIu32" too large", request->len ); return 0; } } if ( request->type == REQUEST_WRITE ) { if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) { warn( "NBD write request size %"PRIu32" too large", request->len ); return 0; } if ( readloop( proxy->downstream_fd, req_data, request->len ) == -1 ) { warn( "Failed to get NBD write request data: %"PRIu32"b", request->len ); return 0; } req_buf_size += request->len; } debug( "Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32, request->type, request->from, request->len ); proxy->req_buf_size = req_buf_size; return 1; } /* Tries to send the request upstream and receive a response. If upstream breaks * then we reconnect to it, and keep it up until we have a complete response * back. Returns 1 on success, 0 on failure */ int proxy_run_request_upstream( struct proxier* proxy ) { unsigned char* rsp_hdr_raw = proxy->rsp_buf; unsigned char* rsp_data = proxy->rsp_buf + NBD_REPLY_SIZE; struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) rsp_hdr_raw; struct nbd_request* request = &(proxy->req_hdr); struct nbd_reply* reply = &(proxy->rsp_hdr); size_t rsp_buf_size; if ( writeloop( proxy->upstream_fd, proxy->req_buf, proxy->req_buf_size ) == -1 ) { warn( "Failed to send request to upstream" ); proxy_disconnect_from_upstream( proxy ); return 0; } if ( readloop( proxy->upstream_fd, rsp_hdr_raw, NBD_REPLY_SIZE ) == -1 ) { debug( "Failed to get reply header from upstream" ); proxy_disconnect_from_upstream( proxy ); return 0; } nbd_r2h_reply( reply_raw, reply ); rsp_buf_size = NBD_REPLY_SIZE; if ( reply->magic != REPLY_MAGIC ) { debug( "Reply magic is incorrect" ); proxy_disconnect_from_upstream( proxy ); return 0; } debug( "NBD reply received from upstream. Response code: %"PRIu32, reply->error ); if ( reply->error != 0 ) { warn( "NBD error returned from upstream: %"PRIu32, reply->error ); } if ( reply->error == 0 && request->type == REQUEST_READ ) { if (readloop( proxy->upstream_fd, rsp_data, request->len ) == -1 ) { debug( "Failed to get reply data from upstream" ); proxy_disconnect_from_upstream( proxy ); return 0; } rsp_buf_size += request->len; } proxy->rsp_buf_size = rsp_buf_size; return 1; } /* Write an NBD reply back downstream. Return 0 on failure, 1 on success. */ int proxy_send_reply_downstream( struct proxier* proxy ) { int result; unsigned char* rsp_buf = proxy->rsp_buf; debug( "Writing header (%"PRIu32") + data (%"PRIu32") bytes downstream", NBD_REPLY_SIZE, proxy->rsp_buf_size - NBD_REPLY_SIZE ); result = writeloop( proxy->downstream_fd, rsp_buf, proxy->rsp_buf_size ); if ( result == -1 ) { debug( "Failed to send reply downstream" ); return 0; } debug( "Reply sent" ); return 1; } /* Here, we negotiate an NBD session with downstream, based on the information * we got on first connection to upstream. Then we wait for a request to come * in from downstream, read it into memory, then send it to upstream. If * upstream dies before responding, we reconnect to upstream and resend it. * Once we've got a response, we write it directly to downstream, and wait for a * new request. When downstream disconnects, or we receive an exit signal (which * can be blocked, unfortunately), we are finished. * * This is the simplest possible nbd proxy I can think of. It may not be at all * performant - let's see. */ void proxy_session( struct proxier* proxy ) { int downstream_fd = proxy->downstream_fd; uint64_t req_count = 0; int result; info( "Beginning proxy session on fd %i", downstream_fd ); if ( !socket_nbd_write_hello( downstream_fd, proxy->upstream_size ) ) { debug( "Sending hello failed on fd %i, ending session", downstream_fd ); return; } while( proxy_get_request_from_downstream( proxy ) ) { do { if ( proxy->upstream_fd == -1 ) { debug( "Connecting to upstream" ); if ( !proxy_connect_to_upstream( proxy ) ) { debug( "Failed to connect to upstream" ); result = 0; sleep( 5 ); continue; } debug( "Connected to upstream" ); } result = proxy_run_request_upstream( proxy ); } while ( result == 0 ); if ( !proxy_send_reply_downstream( proxy ) ) { debug( "Replying on fd %i failed, ending session", downstream_fd ); break; } proxy->req_buf_size = 0; proxy->rsp_buf_size = 0; req_count++; }; info( "Finished proxy session on fd %i after %"PRIu64" successful request(s)", downstream_fd, req_count ); return; } /** Accept an NBD socket connection, dispatch appropriately */ int proxy_accept( struct proxier* params ) { NULLCHECK( params ); int client_fd; fd_set fds; union mysockaddr client_address; socklen_t socklen = sizeof( client_address ); debug("accept loop starting"); FD_ZERO(&fds); FD_SET(params->listen_fd, &fds); FATAL_IF_NEGATIVE( sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL), SHOW_ERRNO( "select() failed" ) ); if ( FD_ISSET( params->listen_fd, &fds ) ) { client_fd = accept( params->listen_fd, &client_address.generic, &socklen ); if ( client_address.family != AF_UNIX ) { if ( sock_set_tcp_nodelay(client_fd, 1) == -1 ) { warn( SHOW_ERRNO( "Failed to set TCP_NODELAY" ) ); } } info( "Accepted nbd client socket fd %d", client_fd ); params->downstream_fd = client_fd; proxy_session( params ); WARN_IF_NEGATIVE( sock_try_close( params->downstream_fd ), "Couldn't close() downstram fd %i after proxy session", params->downstream_fd ); params->downstream_fd = -1; } return 1; /* We actually expect to be interrupted by signal handlers */ } void proxy_accept_loop( struct proxier* params ) { NULLCHECK( params ); while( proxy_accept( params ) ); } /** Closes sockets */ void proxy_cleanup( struct proxier* proxy ) { NULLCHECK( proxy ); info( "Cleaning up" ); if ( -1 != proxy->listen_fd ) { WARN_IF_NEGATIVE( sock_try_close( proxy->listen_fd ), SHOW_ERRNO( "Failed to close() listen fd %i", proxy->listen_fd ) ); proxy->listen_fd = -1; } if ( -1 != proxy->downstream_fd ) { WARN_IF_NEGATIVE( sock_try_close( proxy->downstream_fd ), SHOW_ERRNO( "Failed to close() downstream fd %i", proxy->downstream_fd ) ); proxy->downstream_fd = -1; } if ( -1 != proxy->upstream_fd ) { WARN_IF_NEGATIVE( sock_try_close( proxy->upstream_fd ), SHOW_ERRNO( "Failed to close() upstream fd %i", proxy->upstream_fd ) ); proxy->upstream_fd = -1; } if ( AF_UNIX == proxy->listen_on.family ) { if ( -1 == unlink( proxy->listen_on.un.sun_path ) ) { warn( SHOW_ERRNO( "Failed to unlink %s", proxy->listen_on.un.sun_path ) ); } } debug( "Cleanup done" ); } /** Full lifecycle of the proxier */ int do_proxy( struct proxier* params ) { NULLCHECK( params ); debug( "Ensuring upstream server is open" ); if ( !proxy_connect_to_upstream( params ) ) { info( "Couldn't connect to upstream server during initialization" ); proxy_cleanup( params ); return 1; }; proxy_open_listen_socket( params ); proxy_accept_loop( params ); proxy_cleanup( params ); return 0; }