From 98d8fbeaf095a9e4189544cb62eecdde42493bab Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 15 Feb 2013 16:52:16 +0000 Subject: [PATCH] flexnbd: Add a proxy mode This lets us proxy connections between NBD clients and servers, resiliently. --- README.txt | 114 ++++++- Rakefile | 8 +- src/flexnbd.c | 32 +- src/flexnbd.h | 18 +- src/mode.c | 120 ++++++- src/mode.h | 6 +- src/proxy.c | 497 ++++++++++++++++++++++++++++ src/proxy.h | 71 ++++ tests/acceptance/custom.supp | 13 + tests/acceptance/environment.rb | 7 + tests/acceptance/flexnbd.rb | 40 ++- tests/acceptance/test_proxy_mode.rb | 199 +++++++++++ 12 files changed, 1101 insertions(+), 24 deletions(-) create mode 100644 src/proxy.c create mode 100644 src/proxy.h create mode 100644 tests/acceptance/custom.supp create mode 100644 tests/acceptance/test_proxy_mode.rb diff --git a/README.txt b/README.txt index 49bfd58..3c92a9d 100644 --- a/README.txt +++ b/README.txt @@ -24,7 +24,7 @@ COMMANDS serve ~~~~~ - $ flexnbd serve --addr --port --file + $ flexnbd serve --addr --port --file [--sock ] [--default-deny] [global option]* [acl entry]* Serve a file. If any ACL entries are given (which should be IP @@ -54,11 +54,11 @@ Options How to interpret an empty ACL. If --default-deny is given, an empty ACL will let no clients connect. If it is not given, an empty ACL will let any client connect. - + listen ~~~~~~ - $ flexnbd listen --addr --port --file + $ flexnbd listen --addr --port --file [--sock ] [--default-deny] [global option]* [acl entry]* Listen for an inbound migration, and quit with a status of 0 on @@ -85,10 +85,59 @@ Options ^^^^^^^ As for 'serve'. +proxy +~~~~~ + + $ flexnbd proxy --addr --port + --conn-addr --conn-port [--bind ] [global option]* + +Proxy requests from an NBD client to an NBD server, resiliently. 
Only one +client can be connected (to the address specified by --addr and --port) at a +time, and ACLs cannot be applied to the client, as they can be to clients +connecting directly to a flexnbd in serve mode. + +On starting up, the proxy will attempt to connect to the server specified by +--conn-addr and --conn-port (from the address specified by --bind, if given). If +it fails, then the process will die with an error exit status. + +Assuming a successful connection to the `upstream` server is made, the proxy +will then start listening on the address specified by --addr and --port, waiting +for `downstream` to connect to it (this will be your NBD client). The client +will be given the same hello message as the proxy was given by the server. + +When connected, any request the client makes will be read by the proxy and sent +to the server. If the server goes away for any reason, the proxy will remember +the request and regularly (~ every 5 seconds) try to reconnect to the server. +Upon reconnection, the request is sent and a reply is waited for. When a reply +is received, it is sent back to the client. + +When the client disconnects, cleanly or otherwise, the proxy goes back to +waiting for a new client to connect. The connection to the server is maintained +at that point, in case it is needed again. + +Only one request may be in-flight at a time under the current architecture; that +doesn't seem to slow things down much relative to alternative options, but may +be changed in the future if it becomes an issue. + +Options +^^^^^^^ + +*--addr, -l ADDR*: + The address to listen on. Required. + +*--port, -p PORT*: + The port to listen on. Required. + +*--conn-addr, -C ADDR*: + The address of the NBD server to connect to. Required. + +*--conn-port, -P PORT*: + The port of the NBD server to connect to. Required. 
+ mirror ~~~~~~ - $ flexnbd mirror --addr --port --sock SOCK + $ flexnbd mirror --addr --port --sock SOCK [--unlink] [--bind ] [global option]* Start a migration from the server with control socket SOCK to the server @@ -128,8 +177,8 @@ Options *--sock, -s SOCK*: The control socket of the local server to migrate from. Required. -*--unlink, -u*: - Unlink the served file from the local filesystem after successfully +*--unlink, -u*: + Unlink the served file from the local filesystem after successfully mirroring. *--bind, -b BIND-ADDR*: @@ -190,7 +239,7 @@ value. Currently reported values are: read ~~~~ - $ flexnbd read --addr --port --from + $ flexnbd read --addr --port --from --size [--bind BIND-ADDR] [global option]* Connect to the server at ADDR:PORT, and read SIZE bytes starting at @@ -220,7 +269,7 @@ Options write ~~~~~ - $ cat ... | flexnbd write --addr --port --from + $ cat ... | flexnbd write --addr --port --from --size [--bind BIND-ADDR] [global option]* Connect to the server at ADDR:PORT, and write SIZE bytes from STDIN @@ -299,9 +348,9 @@ the log line. *SOURCEFILE:SOURCELINE*: Identifies where in the source code this log line can be found. -*MSG*: +*MSG*: A short message describing what's happening, how it's being done, or -if you're very lucky *why* it's going on. +if you're very lucky *why* it's going on. EXAMPLES -------- @@ -346,11 +395,11 @@ To migrate, we need to provide a destination file of the right size. 
Now we check the status of each server, to check that they are both in the right state: - + $ flexnbd status --sock /tmp/flex-source.sock pid=9648 is_mirroring=false has_control=true $ flexnbd status --sock /tmp/flex-dest.sock - pid=9651 is_mirroring=false has_control=false + pid=9651 is_mirroring=false has_control=false $ With this knowledge in hand, we can start the migration: @@ -366,6 +415,40 @@ Note that because the file is so small in this case, we see the source server quit soon after we start the migration, and the destination exited at roughly the same time. +Proxying +~~~~~~~~ + +The main point of the proxy mode is to allow clients that would otherwise break +when the NBD server goes away (during a migration, for instance) to see a +persistent TCP connection throughout the process, instead of needing their own +reconnection logic. + +For maximum reliability, the proxy process would be run on the same machine as +the actual NBD client; an example might look like: + + nbd-server-1$ flexnbd serve -l 10.0.0.1 -p 4777 myfile [...] + + nbd-client-1$ flexnbd proxy -l 127.0.0.1 -p 4777 -C 10.0.0.1 -P 4777 + nbd-client-1$ nbd-client -c 127.0.0.1 4777 /dev/nbd0 + + nbd-server-2$ flexnbd listen -l 10.0.0.2 -p 4777 -f myfile [...] + + nbd-server-1$ flexnbd mirror --addr 10.0.0.2 -p 4777 [...] + +Upon completing the migration, the mirroring and listening flexnbd servers will +both exit. With the proxy mediating requests, this does not break the TCP +connection that nbd-client is holding open. If no requests are in-flight, it +will not notice anything at all; if requests are in-flight, then the reply will +take longer than usual to be returned. + +When flexnbd is restarted in serve mode on the second server: + + nbd-server-2$ flexnbd serve -l 10.0.0.1 -p 4777 -f myfile [...] + +The proxy notices and reconnects, fulfilling any request it has in its buffer. 
+The data in myfile has been moved between physical servers without the nbd +client process having to be disturbed at all. + BUGS ---- @@ -374,9 +457,9 @@ Should be reported to alex@bytemark.co.uk. AUTHOR ------ -Written by Alex Young . -Original concept and core code by Matthew Bloch -. +Written by Alex Young . +Original concept and core code by Matthew Bloch . +Some additions by Nick Thomas COPYING ------- @@ -384,3 +467,4 @@ COPYING Copyright (c) 2012 Bytemark Hosting Ltd. Free use of this software is granted under the terms of the GNU General Public License version 3 or later. + diff --git a/Rakefile b/Rakefile index 93ac10d..572317a 100644 --- a/Rakefile +++ b/Rakefile @@ -125,6 +125,7 @@ file check("client") => build/parse.o build/client.o build/serve.o + build/proxy.o build/acl.o build/ioutil.o build/mbox.o @@ -160,6 +161,7 @@ file check("serve") => build/client.o build/flexthread.o build/serve.o + build/proxy.o build/flexnbd.o build/mirror.o build/status.o @@ -177,6 +179,7 @@ file check("readwrite") => build/client.o build/self_pipe.o build/serve.o + build/proxy.o build/parse.o build/acl.o build/flexthread.o @@ -210,7 +213,8 @@ file check("flexnbd") => build/nbdtypes.o build/readwrite.o build/mirror.o - build/serve.o} do |t| + build/serve.o + build/proxy.o} do |t| gcc_link t.name, t.prerequisites + [LIBCHECK] end @@ -225,7 +229,7 @@ end tgt = "build/tests/check_#{m}.o" maybe_obj_name = "build/#{m}.o" # Take it out in case we're testing util.o or ioutil.o - deps = ["build/ioutil.o", "build/util.o"] - [maybe_obj_name] + deps = ["build/ioutil.o", "build/util.o", "build/sockutil.o"] - [maybe_obj_name] # Add it back in if it's something we need to compile deps << maybe_obj_name if OBJECTS.include?( maybe_obj_name ) diff --git a/src/flexnbd.c b/src/flexnbd.c index 7194e94..ec0e75e 100644 --- a/src/flexnbd.c +++ b/src/flexnbd.c @@ -129,6 +129,26 @@ struct flexnbd * flexnbd_create_listening( } +struct flexnbd * flexnbd_create_proxying( + char* s_downstream_address, 
+ char* s_downstream_port, + char* s_upstream_address, + char* s_upstream_port, + char* s_upstream_bind +) +{ + struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) ); + flexnbd->proxy = proxy_create( + flexnbd, + s_downstream_address, + s_downstream_port, + s_upstream_address, + s_upstream_port, + s_upstream_bind); + flexnbd_create_shared( flexnbd, NULL ); + return flexnbd; +} + void flexnbd_spawn_control(struct flexnbd * flexnbd ) { NULLCHECK( flexnbd ); @@ -181,7 +201,6 @@ struct server * flexnbd_server( struct flexnbd * flexnbd ) return flexnbd->serve; } - void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl ) { NULLCHECK( flexnbd ); @@ -255,3 +274,14 @@ int flexnbd_serve( struct flexnbd * flexnbd ) return success; } +int flexnbd_proxy( struct flexnbd * flexnbd ) +{ + NULLCHECK( flexnbd ); + int success; + + success = do_proxy( flexnbd->proxy ); + debug("do_proxy success is %d", success ); + + return success; +} + diff --git a/src/flexnbd.h b/src/flexnbd.h index f378322..7fc9f22 100644 --- a/src/flexnbd.h +++ b/src/flexnbd.h @@ -4,6 +4,7 @@ #include "acl.h" #include "mirror.h" #include "serve.h" +#include "proxy.h" #include "self_pipe.h" #include "mbox.h" #include "control.h" @@ -11,11 +12,14 @@ /* Carries the "globals". */ struct flexnbd { - /* We always have a serve pointer, but it should never be - * dereferenced outside a flexnbd_switch_lock/unlock pair. + /* Our serve pointer should never be dereferenced outside a + * flexnbd_switch_lock/unlock pair. */ struct server * serve; + /* In proxy mode, this is filled instead of serve, above */ + struct proxier * proxy; + /* We only have a control object if a control socket name was * passed on the command line. 
*/ @@ -46,6 +50,14 @@ struct flexnbd * flexnbd_create_listening( int acl_entries, char** s_acl_entries ); +struct flexnbd * flexnbd_create_proxying( + char* s_downstream_address, + char* s_downstream_port, + char* s_upstream_address, + char* s_upstream_port, + char* s_upstream_bind +); + void flexnbd_destroy( struct flexnbd * ); enum mirror_state; enum mirror_state flexnbd_get_mirror_state( struct flexnbd * ); @@ -55,7 +67,9 @@ int flexnbd_signal_fd( struct flexnbd * flexnbd ); int flexnbd_serve( struct flexnbd * flexnbd ); +int flexnbd_proxy( struct flexnbd * flexnbd ); struct server * flexnbd_server( struct flexnbd * flexnbd ); void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl ); struct status * flexnbd_status_create( struct flexnbd * flexnbd ); #endif + diff --git a/src/mode.c b/src/mode.c index 5df59c9..f1af346 100644 --- a/src/mode.c +++ b/src/mode.c @@ -56,6 +56,29 @@ static char listen_help_text[] = VERBOSE_LINE QUIET_LINE; +static struct option proxy_options[] = { + GETOPT_HELP, + GETOPT_ADDR, + GETOPT_PORT, + GETOPT_CONNECT_ADDR, + GETOPT_CONNECT_PORT, + GETOPT_BIND, + GETOPT_QUIET, + GETOPT_VERBOSE, + {0} +}; +static char proxy_short_options[] = "hl:p:C:P:b:" SOPT_QUIET SOPT_VERBOSE; +static char proxy_help_text[] = + "Usage: flexnbd " CMD_PROXY " \n\n" + "Resiliently proxy an NBD connection between client and server\n\n" + HELP_LINE + "\t--" OPT_ADDR ",-l \tThe address we will bind to as a proxy.\n" + "\t--" OPT_PORT ",-p \tThe port we will bind to as a proxy.\n" + "\t--" OPT_CONNECT_ADDR ",-C \tAddress of the proxied server.\n" + "\t--" OPT_CONNECT_PORT ",-P \tPort of the proxied server.\n" + "\t--" OPT_BIND ",-b \tThe address we connect from, as a proxy.\n" + QUIET_LINE + VERBOSE_LINE; static struct option read_options[] = { GETOPT_HELP, @@ -173,10 +196,13 @@ char help_help_text_arr[] = "Usage: flexnbd [cmd options]\n\n" "Commands:\n" "\tflexnbd serve\n" + "\tflexnbd listen\n" + "\tflexnbd proxy\n" "\tflexnbd read\n" "\tflexnbd 
write\n" "\tflexnbd acl\n" "\tflexnbd mirror\n" + "\tflexnbd break\n" "\tflexnbd status\n" "\tflexnbd help\n\n" "See flexnbd help for further info\n"; @@ -390,6 +416,46 @@ void read_break_param( int c, char **sock ) } +void read_proxy_param( + int c, + char **downstream_addr, + char **downstream_port, + char **upstream_addr, + char **upstream_port, + char **bind_addr ) +{ + switch( c ) { + case 'h' : + fprintf( stdout, "%s\n", proxy_help_text ); + exit( 0 ); + break; + case 'l': + *downstream_addr = optarg; + break; + case 'p': + *downstream_port = optarg; + break; + case 'C': + *upstream_addr = optarg; + break; + case 'P': + *upstream_port = optarg; + break; + case 'b': + *bind_addr = optarg; + break; + case 'q': + log_level = QUIET_LOG_LEVEL; + break; + case 'v': + log_level = VERBOSE_LOG_LEVEL; + break; + default: + exit_err( proxy_help_text ); + break; + } +} + void read_status_param( int c, char **sock ) { read_sock_param( c, sock, status_help_text ); @@ -733,6 +799,55 @@ int mode_status( int argc, char *argv[] ) return 0; } +int mode_proxy( int argc, char *argv[] ) +{ + int c; + struct flexnbd * flexnbd; + char *downstream_addr = NULL; + char *downstream_port = NULL; + char *upstream_addr = NULL; + char *upstream_port = NULL; + char *bind_addr = NULL; + int success; + + while (1) { + c = getopt_long( argc, argv, proxy_short_options, proxy_options, NULL ); + if ( -1 == c ) { break; } + read_proxy_param( c, + &downstream_addr, + &downstream_port, + &upstream_addr, + &upstream_port, + &bind_addr + ); + } + + if ( NULL == downstream_addr || NULL == downstream_port ){ + fprintf( stderr, "both --addr and --port are required.\n" ); + exit_err( proxy_help_text ); + } else if ( NULL == upstream_addr || NULL == upstream_port ){ + fprintf( stderr, "both --conn-addr and --conn-port are required.\n" ); + exit_err( proxy_help_text ); + } + + flexnbd = flexnbd_create_proxying( + downstream_addr, + downstream_port, + upstream_addr, + upstream_port, + bind_addr + ); + + info( 
+ "Proxying between %s %s (downstream) and %s %s (upstream)", + downstream_addr, downstream_port, upstream_addr, upstream_port + ); + + success = flexnbd_proxy( flexnbd ); + flexnbd_destroy( flexnbd ); + + return success ? 0 : 1; +} int mode_help( int argc, char *argv[] ) { @@ -757,6 +872,8 @@ int mode_help( int argc, char *argv[] ) help_text = mirror_help_text; } else if ( IS_CMD( CMD_STATUS, cmd ) ) { help_text = status_help_text; + } else if ( IS_CMD( CMD_PROXY, cmd ) ) { + help_text = proxy_help_text; } else { exit_err( help_help_text ); } } @@ -790,6 +907,8 @@ void mode(char* mode, int argc, char **argv) } else if ( IS_CMD( CMD_STATUS, mode ) ) { mode_status( argc, argv ); + } else if ( IS_CMD( CMD_PROXY, mode ) ) { + mode_proxy( argc, argv ); } else if ( IS_CMD( CMD_HELP, mode ) ) { mode_help( argc-1, argv+1 ); @@ -801,4 +920,3 @@ void mode(char* mode, int argc, char **argv) exit(0); } - diff --git a/src/mode.h b/src/mode.h index 2c9d1fc..ed67f06 100644 --- a/src/mode.h +++ b/src/mode.h @@ -20,9 +20,12 @@ void mode(char* mode, int argc, char **argv); #define OPT_SIZE "size" #define OPT_DENY "default-deny" #define OPT_UNLINK "unlink" +#define OPT_CONNECT_ADDR "conn-addr" +#define OPT_CONNECT_PORT "conn-port" #define CMD_SERVE "serve" #define CMD_LISTEN "listen" +#define CMD_PROXY "proxy" #define CMD_READ "read" #define CMD_WRITE "write" #define CMD_ACL "acl" @@ -40,7 +43,6 @@ void mode(char* mode, int argc, char **argv); #define GETOPT_HELP GETOPT_FLAG( OPT_HELP, 'h' ) #define GETOPT_DENY GETOPT_FLAG( OPT_DENY, 'd' ) - #define GETOPT_ADDR GETOPT_ARG( OPT_ADDR, 'l' ) #define GETOPT_PORT GETOPT_ARG( OPT_PORT, 'p' ) #define GETOPT_FILE GETOPT_ARG( OPT_FILE, 'f' ) @@ -49,6 +51,8 @@ void mode(char* mode, int argc, char **argv); #define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' ) #define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' ) #define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' ) +#define GETOPT_CONNECT_ADDR GETOPT_ARG( OPT_CONNECT_ADDR, 'C' ) +#define GETOPT_CONNECT_PORT 
GETOPT_ARG( OPT_CONNECT_PORT, 'P' ) #define OPT_VERBOSE "verbose" #define SOPT_VERBOSE "v" diff --git a/src/proxy.c b/src/proxy.c new file mode 100644 index 0000000..9cf80bb --- /dev/null +++ b/src/proxy.c @@ -0,0 +1,497 @@ +#include "proxy.h" +#include "readwrite.h" + +#include "ioutil.h" +#include "sockutil.h" +#include "util.h" + +#include + +#include +#include + +struct proxier* proxy_create( + struct flexnbd* flexnbd, + char* s_downstream_address, + char* s_downstream_port, + char* s_upstream_address, + char* s_upstream_port, + char* s_upstream_bind ) +{ + NULLCHECK( flexnbd ); + + struct proxier* out; + out = xmalloc( sizeof( struct proxier ) ); + out->flexnbd = flexnbd; + + FATAL_IF_NULL(s_downstream_address, "Listen address not specified"); + NULLCHECK( s_downstream_address ); + + FATAL_UNLESS( + parse_ip_to_sockaddr( &out->listen_on.generic, s_downstream_address ), + "Couldn't parse downstream address '%s' (use 0 if " + "you want to bind all IPs)", + s_downstream_address + ); + + FATAL_IF_NULL( s_downstream_port, "Downstream port not specified" ); + NULLCHECK( s_downstream_port ); + parse_port( s_downstream_port, &out->listen_on.v4 ); + + FATAL_IF_NULL(s_upstream_address, "Upstream address not specified"); + NULLCHECK( s_upstream_address ); + + FATAL_UNLESS( + parse_ip_to_sockaddr( &out->connect_to.generic, s_upstream_address ), + "Couldn't parse upstream address '%s'", + s_upstream_address + ); + + FATAL_IF_NULL( s_upstream_port, "Upstream port not specified" ); + NULLCHECK( s_upstream_port ); + parse_port( s_upstream_port, &out->connect_to.v4 ); + + if ( s_upstream_bind ) { + FATAL_IF_ZERO( + parse_ip_to_sockaddr( &out->connect_from.generic, s_upstream_bind ), + "Couldn't parse bind address '%s'", + s_upstream_bind + ); + } + + out->listen_fd = -1; + out->downstream_fd = -1; + out->upstream_fd = -1; + + out->req_buf = xmalloc( NBD_MAX_SIZE ); + out->rsp_buf = xmalloc( NBD_MAX_SIZE ); + + return out; +} + +void proxy_destroy( struct proxier* proxy ) +{ + 
free( proxy->req_buf ); + free( proxy->rsp_buf ); + free( proxy ); +} + + +/* Try to establish a connection to our upstream server. Return 1 on success, + * 0 on failure + */ +int proxy_connect_to_upstream( struct proxier* proxy ) +{ + int fd = socket_connect( &proxy->connect_to.generic, &proxy->connect_from.generic ); + off64_t size = 0; + + if ( -1 == fd ) { + return 0; + } + + if( !socket_nbd_read_hello( fd, &size ) ) { + FATAL_IF_NEGATIVE( + close( fd ), SHOW_ERRNO( "FIXME: shouldn't be fatal" ) + ); + return 0; + } + + if ( proxy->upstream_size == 0 ) { + info( "Size of upstream image is %"PRIu64" bytes", size ); + } else if ( proxy->upstream_size != size ) { + warn( "Size changed from %"PRIu64" to %"PRIu64" bytes", proxy->upstream_size, size ); + } + + proxy->upstream_size = size; + proxy->upstream_fd = fd; + + return 1; +} + +void proxy_disconnect_from_upstream( struct proxier* proxy ) +{ + if ( -1 != proxy->upstream_fd ) { + debug(" Closing upstream connection" ); + + /* TODO: An NBD disconnect would be pleasant here */ + + FATAL_IF_NEGATIVE( + close( proxy->upstream_fd ), + SHOW_ERRNO( "FIXME: shouldn't be fatal" ) + ); + proxy->upstream_fd = -1; + } +} + + +/** Prepares a listening socket for the NBD server, binding etc. 
*/ +void proxy_open_listen_socket(struct proxier* params) +{ + NULLCHECK( params ); + + params->listen_fd = socket(params->listen_on.family, SOCK_STREAM, 0); + FATAL_IF_NEGATIVE( + params->listen_fd, SHOW_ERRNO( "Couldn't create listen socket" ) + ); + + /* Allow us to restart quickly */ + FATAL_IF_NEGATIVE( + sock_set_reuseaddr(params->listen_fd, 1), + SHOW_ERRNO( "Couldn't set SO_REUSEADDR" ) + ); + + FATAL_IF_NEGATIVE( + sock_set_tcp_nodelay(params->listen_fd, 1), + SHOW_ERRNO( "Couldn't set TCP_NODELAY" ) + ); + + FATAL_UNLESS_ZERO( + sock_try_bind( params->listen_fd, ¶ms->listen_on.generic ), + SHOW_ERRNO( "Failed to bind to listening socket" ) + ); + + /* We're only serving one client at a time, hence backlog of 1 */ + FATAL_IF_NEGATIVE( + listen(params->listen_fd, 1), + SHOW_ERRNO( "Failed to listen on listening socket" ) + ); + + info( "Now listening for incoming connections" ); +} + + +/* Return 0 if we should keep running, 1 if an exit has been signaled. Pass it + * an fd_set to check, or set check_fds to NULL to have it perform its own. + * If we do the latter, then wait specifies how many seconds we'll wait for an + * exit signal to show up. + */ +int proxy_should_exit( struct proxier* params, fd_set *check_fds, int wait ) +{ + struct timeval tv = { wait, 0 }; + fd_set internal_fds; + fd_set* fds = check_fds; + + int signal_fd = flexnbd_signal_fd( params->flexnbd ); + + if ( NULL == check_fds ) { + fds = &internal_fds; + + FD_ZERO( fds ); + FD_SET( signal_fd, fds ); + + FATAL_IF_NEGATIVE( + sock_try_select(FD_SETSIZE, fds, NULL, NULL, &tv), + SHOW_ERRNO( "select() failed." ) + ); + } + + if ( FD_ISSET( signal_fd, fds ) ) { + info( "Stop signal received" ); + return 1; + } + + return 0; +} + +/* Try to get a request from downstream. If reading from downstream fails, then + * the session will be over. Returns 1 on success, 0 on failure. 
+ */ +int proxy_get_request_from_downstream( struct proxier* proxy ) +{ + unsigned char* req_hdr_raw = proxy->req_buf; + unsigned char* req_data = proxy->req_buf + NBD_REQUEST_SIZE; + size_t req_buf_size; + + struct nbd_request_raw* request_raw = (struct nbd_request_raw*) req_hdr_raw; + struct nbd_request* request = &(proxy->req_hdr); + + if ( readloop( proxy->downstream_fd, req_hdr_raw, NBD_REQUEST_SIZE ) == -1 ) { + info( SHOW_ERRNO( "Failed to get request header" ) ); + return 0; + } + + nbd_r2h_request( request_raw, request ); + req_buf_size = NBD_REQUEST_SIZE; + + if ( request->type == REQUEST_DISCONNECT ) { + info( "Received disconnect request from client" ); + return 0; + } + + if ( request->type == REQUEST_READ ) { + if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) { + warn( "NBD read request size %"PRIu32" too large", request->len ); + return 0; + } + + } + + if ( request->type == REQUEST_WRITE ) { + if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) { + warn( "NBD write request size %"PRIu32" too large", request->len ); + return 0; + } + + if ( readloop( proxy->downstream_fd, req_data, request->len ) == -1 ) { + warn( "Failed to get NBD write request data: %"PRIu32"b", request->len ); + return 0; + } + + req_buf_size += request->len; + } + + debug( + "Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32, + request->type, request->from, request->len + ); + + proxy->req_buf_size = req_buf_size; + return 1; +} + +/* Tries to send the request upstream and receive a response. If upstream breaks + * then we reconnect to it, and keep it up until we have a complete response + * back. Returns 1 on success, 0 on failure, -1 if exit is signalled. 
+ */ +int proxy_run_request_upstream( struct proxier* proxy ) +{ + unsigned char* rsp_hdr_raw = proxy->rsp_buf; + unsigned char* rsp_data = proxy->rsp_buf + NBD_REPLY_SIZE; + + struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) rsp_hdr_raw; + + struct nbd_request* request = &(proxy->req_hdr); + struct nbd_reply* reply = &(proxy->rsp_hdr); + + size_t rsp_buf_size; + + if ( proxy->upstream_fd == -1 ) { + debug( "Connecting to upstream" ); + if ( !proxy_connect_to_upstream( proxy ) ) { + debug( "Failed to connect to upstream" ); + + if ( proxy_should_exit( proxy, NULL, 5 ) ) { + return -1; + } + + return 0; + } + debug( "Connected to upstream" ); + } + + if ( writeloop( proxy->upstream_fd, proxy->req_buf, proxy->req_buf_size ) == -1 ) { + warn( "Failed to send request to upstream" ); + proxy_disconnect_from_upstream( proxy ); + return 0; + } + + if ( readloop( proxy->upstream_fd, rsp_hdr_raw, NBD_REPLY_SIZE ) == -1 ) { + debug( "Failed to get reply header from upstream" ); + proxy_disconnect_from_upstream( proxy ); + return 0; + } + + nbd_r2h_reply( reply_raw, reply ); + rsp_buf_size = NBD_REPLY_SIZE; + + if ( reply->magic != REPLY_MAGIC ) { + debug( "Reply magic is incorrect" ); + proxy_disconnect_from_upstream( proxy ); + return 0; + } + + debug( "NBD reply received from upstream. Response code: %"PRIu32, reply->error ); + + if ( reply->error != 0 ) { + warn( "NBD error returned from upstream: %"PRIu32, reply->error ); + } + + if ( reply->error == 0 && request->type == REQUEST_READ ) { + if (readloop( proxy->upstream_fd, rsp_data, request->len ) == -1 ) { + debug( "Failed to get reply data from upstream" ); + proxy_disconnect_from_upstream( proxy ); + return 0; + } + rsp_buf_size += request->len; + } + + proxy->rsp_buf_size = rsp_buf_size; + return rsp_buf_size; +} + +/* Write an NBD reply back downstream. Return 0 on failure, 1 on success. 
*/ +int proxy_send_reply_downstream( struct proxier* proxy ) +{ + int result; + unsigned char* rsp_buf = proxy->rsp_buf; + + debug( + "Writing header (%"PRIu32") + data (%"PRIu32") bytes downstream", + NBD_REPLY_SIZE, proxy->rsp_buf_size - NBD_REPLY_SIZE + ); + + result = writeloop( proxy->downstream_fd, rsp_buf, proxy->rsp_buf_size ); + if ( result == -1 ) { + debug( "Failed to send reply downstream" ); + return 0; + } + + debug( "Reply sent" ); + return 1; +} + + +/* Here, we negotiate an NBD session with downstream, based on the information + * we got on first connection to upstream. Then we wait for a request to come + * in from downstream, read it into memory, then send it to upstream. If + * upstream dies before responding, we reconnect to upstream and resend it. + * Once we've got a response, we write it directly to downstream, and wait for a + * new request. When downstream disconnects, or we receive an exit signal (which + * can be blocked, unfortunately), we are finished. + * + * This is the simplest possible nbd proxy I can think of. It may not be at all + * performant - let's see. + */ + +void proxy_session( struct proxier* proxy ) +{ + int downstream_fd = proxy->downstream_fd; + uint64_t req_count = 0; + int result; + + info( "Beginning proxy session on fd %i", downstream_fd ); + + if ( !socket_nbd_write_hello( downstream_fd, proxy->upstream_size ) ) { + debug( "Sending hello failed on fd %i, ending session", downstream_fd ); + return; + } + + while( proxy_get_request_from_downstream( proxy ) ) { + + /* Don't start running the request if exit has been signalled */ + if ( proxy_should_exit( proxy, NULL, 0 ) ) { + break; + } + + do { + result = proxy_run_request_upstream( proxy ); + } while ( result == 0 ); + + /* We have to exit, but don't know if the request was successfully + * proxied or not. We could add that knowledge, and attempt to send a + * reply downstream if it was, but I don't think it's worth it. 
+ */ + if ( result == -1 ) { + break; + } + + if ( !proxy_send_reply_downstream( proxy ) ) { + break; + } + + proxy->req_buf_size = 0; + proxy->rsp_buf_size = 0; + + req_count++; + }; + + info( + "Finished proxy session on fd %i after %"PRIu64" successful request(s)", + downstream_fd, req_count + ); + + return; +} + +/** Accept an NBD socket connection, dispatch appropriately */ +int proxy_accept( struct proxier* params ) +{ + NULLCHECK( params ); + + int client_fd; + int signal_fd = flexnbd_signal_fd( params->flexnbd ); + fd_set fds; + int should_continue = 1; + + union mysockaddr client_address; + socklen_t socklen = sizeof( client_address ); + + debug("accept loop starting"); + + FD_ZERO(&fds); + FD_SET(params->listen_fd, &fds); + FD_SET(signal_fd, &fds); + + FATAL_IF_NEGATIVE( + sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL), + SHOW_ERRNO( "select() failed" ) + ); + + if ( proxy_should_exit( params, &fds, 0) ) { + should_continue = 0; + } + + if ( should_continue && FD_ISSET( params->listen_fd, &fds ) ) { + client_fd = accept( params->listen_fd, &client_address.generic, &socklen ); + + if ( sock_set_tcp_nodelay(client_fd, 1) == -1 ) { + warn( SHOW_ERRNO( "Failed to set TCP_NODELAY" ) ); + } + + info( "Accepted nbd client socket fd %d", client_fd ); + params->downstream_fd = client_fd; + proxy_session( params ); + + if ( close( params->downstream_fd ) == -1 ) { + warn( SHOW_ERRNO( "FIXME: close returned" ) ); + } + + params->downstream_fd = -1; + } + + return should_continue; +} + + +void proxy_accept_loop( struct proxier* params ) +{ + NULLCHECK( params ); + while( proxy_accept( params ) ); +} + +/** Closes sockets, frees memory and waits for all requests to clear */ +void proxy_cleanup( struct proxier* params ) +{ + NULLCHECK( params ); + + info( "cleaning up" ); + + if ( -1 != params->listen_fd ) { + close( params->listen_fd ); + } + + debug( "Cleanup done" ); +} + +/** Full lifecycle of the proxier */ +int do_proxy( struct proxier* params ) +{ + 
NULLCHECK( params ); + + error_set_handler( (cleanup_handler*) proxy_cleanup, params ); + + debug( "Ensuring upstream server is open" ); + + if ( !proxy_connect_to_upstream( params ) ) { + info( "Couldn't connect to upstream server during initialization" ); + proxy_cleanup( params ); + return 1; + }; + + proxy_open_listen_socket( params ); + proxy_accept_loop( params ); + proxy_cleanup( params ); + + return 0; +} + diff --git a/src/proxy.h b/src/proxy.h new file mode 100644 index 0000000..4ca1017 --- /dev/null +++ b/src/proxy.h @@ -0,0 +1,71 @@ +#ifndef PROXY_H +#define PROXY_H + +#include +#include + +#include "flexnbd.h" +#include "parse.h" +#include "nbdtypes.h" +#include "self_pipe.h" + +struct proxier { + /* The flexnbd wrapper this proxier is attached to */ + struct flexnbd* flexnbd; + + /** address/port to bind to */ + union mysockaddr listen_on; + + /** address/port to connect to */ + union mysockaddr connect_to; + + /** address to bind to when making outgoing connections */ + union mysockaddr connect_from; + + /* The socket we listen() on and accept() against */ + int listen_fd; + + /* The socket returned by accept() that we receive requests from and send + * responses to + */ + int downstream_fd; + + /* The socket returned by connect() that we send requests to and receive + * responses from + */ + int upstream_fd; + + /* This is the size we advertise to the downstream server */ + off64_t upstream_size; + + /* Scratch space for the current NBD request from downstream */ + unsigned char* req_buf; + + /* Number of bytes currently sat in req_buf */ + size_t req_buf_size; + + /* We transform the raw request header into here */ + struct nbd_request req_hdr; + + /* Scratch space for the current NBD reply from upstream */ + unsigned char* rsp_buf; + + /* Number of bytes currently sat in rsp_buf */ + size_t rsp_buf_size; + + /* We transform the raw reply header into here */ + struct nbd_reply rsp_hdr; +}; + +struct proxier* proxy_create( + struct flexnbd * 
flexnbd, + char* s_downstream_address, + char* s_downstream_port, + char* s_upstream_address, + char* s_upstream_port, + char* s_upstream_bind ); +int do_proxy( struct proxier* proxy ); +void proxy_destroy( struct proxier* proxy ); + +#endif + diff --git a/tests/acceptance/custom.supp b/tests/acceptance/custom.supp new file mode 100644 index 0000000..faf8d91 --- /dev/null +++ b/tests/acceptance/custom.supp @@ -0,0 +1,13 @@ +{ + avoid_glibc_bug_do_lookup + Memcheck:Addr8 + fun:do_lookup_x + obj:* + fun:_dl_lookup_symbol_x +} +{ + avoid_glibc_bug_check_match + Memcheck:Addr8 + fun:check_match.12149 +} + diff --git a/tests/acceptance/environment.rb b/tests/acceptance/environment.rb index bbbfadf..0275705 100644 --- a/tests/acceptance/environment.rb +++ b/tests/acceptance/environment.rb @@ -21,6 +21,13 @@ class Environment @fake_pid = nil end + def proxy1(port=@port2) + @nbd1.proxy(@ip, port) + end + def proxy2(port=@port1) + @nbd2.proxy(@ip, port) + end + def serve1(*acl) @nbd1.serve(@filename1, *acl) diff --git a/tests/acceptance/flexnbd.rb b/tests/acceptance/flexnbd.rb index e66ae75..ec97497 100644 --- a/tests/acceptance/flexnbd.rb +++ b/tests/acceptance/flexnbd.rb @@ -21,7 +21,7 @@ class ValgrindExecutor attr_reader :pid def run( cmd ) - @pid = fork do exec "valgrind --track-origins=yes #{cmd}" end + @pid = fork do exec "valgrind --suppressions=custom.supp --track-origins=yes #{cmd}" end end end # class ValgrindExecutor @@ -131,7 +131,7 @@ class ValgrindKillingExecutor def run( cmd ) @io_r, io_w = IO.pipe - @pid = fork do exec( "valgrind --xml=yes --xml-fd=#{io_w.fileno} " + cmd ) end + @pid = fork do exec( "valgrind --suppressions=custom.supp --xml=yes --xml-fd=#{io_w.fileno} " + cmd ) end launch_watch_thread( @pid, @io_r ) @pid end @@ -241,6 +241,15 @@ module FlexNBD "#{acl.join(' ')}" end + def proxy_cmd( connect_ip, connect_port ) + "#{bin} proxy "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--conn-addr #{connect_ip} "\ + "--conn-port #{connect_port} "\ + 
"#{@debug}" + end + def read_cmd( offset, length ) "#{bin} read "\ @@ -319,6 +328,7 @@ module FlexNBD sleep 0.1 end + start_wait_thread( @pid ) at_exit { kill } end @@ -336,6 +346,31 @@ module FlexNBD run_serve_cmd( listen_cmd( file, acl ) ) end + def tcp_server_open? + # raises if the other side doesn't accept() + sock = TCPSocket.new(ip, port) rescue nil + + success = !!sock + ( sock.close rescue nil) if sock + success + end + + def proxy( connect_ip, connect_port ) + cmd = proxy_cmd( connect_ip, connect_port ) + debug( cmd ) + + @pid = @executor.run( cmd ) + + until tcp_server_open? + pid, status = Process.wait2(@pid, Process::WNOHANG) + raise "server did not start (#{cmd})" if pid + sleep 0.1 + end + + start_wait_thread( @pid ) + at_exit { kill } + end + def start_wait_thread( pid ) @wait_thread = Thread.start do @@ -521,3 +556,4 @@ module FlexNBD end end + diff --git a/tests/acceptance/test_proxy_mode.rb b/tests/acceptance/test_proxy_mode.rb new file mode 100644 index 0000000..bb56b96 --- /dev/null +++ b/tests/acceptance/test_proxy_mode.rb @@ -0,0 +1,199 @@ +require 'test/unit' +require 'environment' +require 'flexnbd/fake_source' +require 'flexnbd/fake_dest' + +class TestProxyMode < Test::Unit::TestCase + def setup + super + @env = Environment.new + @env.writefile1( "0" * 16 ) + end + + def teardown + @env.cleanup + super + end + + def with_proxied_client( override_size = nil ) + @env.serve1 unless @server_up + @env.proxy2 unless @proxy_up + @env.nbd2.can_die(0) + client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy") + begin + + result = client.read_hello + assert_equal "NBDMAGIC", result[:magic] + assert_equal override_size || @env.file1.size, result[:size] + + yield client + ensure + client.close rescue nil + end + end + + def test_exits_with_error_when_cannot_connect_to_upstream_on_start + assert_raises(RuntimeError) { @env.proxy1 } + end + + def test_read_requests_successfully_proxied + with_proxied_client do |client| + 
(0..3).each do |n| + offset = n * 4096 + client.write_read_request(offset, 4096, "myhandle") + rsp = client.read_response + + assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal "myhandle", rsp[:handle] + assert_equal 0, rsp[:error] + + orig_data = @env.file1.read(offset, 4096) + data = client.read_raw(4096) + + assert_equal 4096, orig_data.size + assert_equal 4096, data.size + + assert_equal( orig_data, data, "Returned data does not match" ) + end + end + end + + def test_write_requests_successfully_proxied + with_proxied_client do |client| + (0..3).each do |n| + offset = n * 4096 + client.write(offset, "\xFF" * 4096) + rsp = client.read_response + + assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal "myhandle", rsp[:handle] + assert_equal 0, rsp[:error] + + data = @env.file1.read(offset, 4096) + assert_equal( ( "\xFF" * 4096 ), data, "Data not written" ) + end + end + end + + def make_fake_server + server = FlexNBD::FakeDest.new(@env.ip, @env.port1) + @server_up = true + + # We return a thread here because accept() and connect() both block for us + Thread.new do + sc = server.accept # just tell the supervisor we're up + sc.write_hello + + [ server, sc ] + end + end + + def test_read_request_retried_when_upstream_dies_partway + maker = make_fake_server + + with_proxied_client(4096) do |client| + server, sc1 = maker.value + + # Send the read request to the proxy + client.write_read_request( 0, 4096 ) + + # ensure we're given the read request + req1 = sc1.read_request + assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] + assert_equal ::FlexNBD::REQUEST_READ, req1[:type] + assert_equal 0, req1[:from] + assert_equal 4096, req1[:len] + + # Kill the server again, now we're sure the read request has been sent once + sc1.close + + # We expect the proxy to reconnect without our client doing anything. + sc2 = server.accept + sc2.write_hello + + # And once reconnected, it should resend an identical request. 
+ req2 = sc2.read_request + assert_equal req1, req2 + + # The reply should be proxied back to the client. + sc2.write_reply( req2[:handle] ) + sc2.write_data( "\xFF" * 4096 ) + + # Check it to make sure it's correct + rsp = timeout(15) { client.read_response } + assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal 0, rsp[:error] + assert_equal req1[:handle], rsp[:handle] + + data = client.read_raw( 4096 ) + assert_equal( ("\xFF" * 4096), data, "Wrong data returned" ) + + sc2.close + server.close + end + + end + + def test_write_request_retried_when_upstream_dies_partway + maker = make_fake_server + + with_proxied_client(4096) do |client| + server, sc1 = maker.value + + # Send the write request to the proxy + client.write( 0, ( "\xFF" * 4096 ) ) + + # ensure we're given the write request + req1 = sc1.read_request + assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] + assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type] + assert_equal 0, req1[:from] + assert_equal 4096, req1[:len] + data1 = sc1.read_data( 4096 ) + assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" ) + + # Kill the server again, now we're sure the write request has been sent once + sc1.close + + # We expect the proxy to reconnect without our client doing anything. + sc2 = server.accept + sc2.write_hello + + # And once reconnected, it should resend an identical request. + req2 = sc2.read_request + assert_equal req1, req2 + data2 = sc2.read_data( 4096 ) + assert_equal data1, data2 + + # The reply should be proxied back to the client. 
+ sc2.write_reply( req2[:handle] ) + + # Check it to make sure it's correct + rsp = timeout(15) { client.read_response } + assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal 0, rsp[:error] + assert_equal req1[:handle], rsp[:handle] + + sc2.close + server.close + end + end + + def test_only_one_client_can_connect_to_proxy_at_a_time + with_proxied_client do |client| + + c2 = nil + assert_raises(Timeout::Error) do + timeout(1) do + c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)") + c2.read_hello + end + end + c2.close rescue nil if c2 + end + + + end + +end +