diff --git a/README.txt b/README.txt index 49bfd58..3c92a9d 100644 --- a/README.txt +++ b/README.txt @@ -24,7 +24,7 @@ COMMANDS serve ~~~~~ - $ flexnbd serve --addr --port --file + $ flexnbd serve --addr --port --file [--sock ] [--default-deny] [global option]* [acl entry]* Serve a file. If any ACL entries are given (which should be IP @@ -54,11 +54,11 @@ Options How to interpret an empty ACL. If --default-deny is given, an empty ACL will let no clients connect. If it is not given, an empty ACL will let any client connect. - + listen ~~~~~~ - $ flexnbd listen --addr --port --file + $ flexnbd listen --addr --port --file [--sock ] [--default-deny] [global option]* [acl entry]* Listen for an inbound migration, and quit with a status of 0 on @@ -85,10 +85,59 @@ Options ^^^^^^^ As for 'serve'. +proxy +~~~~~ + + $ flexnbd proxy --addr --port + --conn-addr --conn-port [--bind ] [global option]* + +Proxy requests from an NBD client to an NBD server, resiliently. Only one +client can be connected (to the address specified by --addr and --port) at a +time, and ACLs cannot be applied to the client, as they can be to clients +connecting directly to a flexnbd in serve mode. + +On starting up, the proxy will attempt to connect to the server specified by +--conn-addr and --conn-port (from the address specified by --bind, if given). If +it fails, then the process will die with an error exit status. + +Assuming a successful connection to the `upstream` server is made, the proxy +will then start listening on the address specified by --addr and --port, waiting +for `downstream` to connect to it (this will be your NBD client). The client +will be given the same hello message as the proxy was given by the server. + +When connected, any request the client makes will be read by the proxy and sent +to the server. If the server goes away for any reason, the proxy will remember +the request and regularly (~ every 5 seconds) try to reconnect to the server. +Upon reconnection, the request is sent and a reply is waited for. When a reply +is received, it is sent back to the client. + +When the client disconnects, cleanly or otherwise, the proxy goes back to +waiting for a new client to connect. The connection to the server is maintained +at that point, in case it is needed again. + +Only one request may be in-flight at a time under the current architecture; that +doesn't seem to slow things down much relative to alternative options, but may +be changed in the future if it becomes an issue. + +Options +^^^^^^^ + +*--addr, -l ADDR*: + The address to listen on. Required. + +*--port, -p PORT*: + The port to listen on. Required. + +*--conn-addr, -C ADDR*: + The address of the NBD server to connect to. Required. + +*--conn-port, -P PORT*: + The port of the NBD server to connect to. Required. + mirror ~~~~~~ - $ flexnbd mirror --addr --port --sock SOCK + $ flexnbd mirror --addr --port --sock SOCK [--unlink] [--bind ] [global option]* Start a migration from the server with control socket SOCK to the server @@ -128,8 +177,8 @@ Options *--sock, -s SOCK*: The control socket of the local server to migrate from. Required. -*--unlink, -u*: - Unlink the served file from the local filesystem after successfully +*--unlink, -u*: + Unlink the served file from the local filesystem after successfully mirroring. *--bind, -b BIND-ADDR*: @@ -190,7 +239,7 @@ value. Currently reported values are: read ~~~~ - $ flexnbd read --addr --port --from + $ flexnbd read --addr --port --from --size [--bind BIND-ADDR] [global option]* Connect to the server at ADDR:PORT, and read SIZE bytes starting at @@ -220,7 +269,7 @@ Options write ~~~~~ - $ cat ... | flexnbd write --addr --port --from + $ cat ... | flexnbd write --addr --port --from --size [--bind BIND-ADDR] [global option]* Connect to the server at ADDR:PORT, and write SIZE bytes from STDIN @@ -299,9 +348,9 @@ the log line. *SOURCEFILE:SOURCELINE*: Identifies where in the source code this log line can be found. -*MSG*: +*MSG*: A short message describing what's happening, how it's being done, or -if you're very lucky *why* it's going on. +if you're very lucky *why* it's going on. EXAMPLES -------- @@ -346,11 +395,11 @@ To migrate, we need to provide a destination file of the right size. Now we check the status of each server, to check that they are both in the right state: - + $ flexnbd status --sock /tmp/flex-source.sock pid=9648 is_mirroring=false has_control=true $ flexnbd status --sock /tmp/flex-dest.sock - pid=9651 is_mirroring=false has_control=false + pid=9651 is_mirroring=false has_control=false $ With this knowledge in hand, we can start the migration: @@ -366,6 +415,40 @@ Note that because the file is so small in this case, we see the source server quit soon after we start the migration, and the destination exited at roughly the same time. +Proxying +~~~~~~~~ + +The main point of the proxy mode is to allow clients that would otherwise break +when the NBD server goes away (during a migration, for instance) to see a +persistent TCP connection throughout the process, instead of needing its own +reconnection logic. + +For maximum reliability, the proxy process would be run on the same machine as +the actual NBD client; an example might look like: + + nbd-server-1$ flexnbd serve -l 10.0.0.1 -p 4777 myfile [...] + + nbd-client-1$ flexnbd proxy -l 127.0.0.1 -p 4777 -C 10.0.0.1 -P 4777 + nbd-client-1$ nbd-client -c 127.0.0.1 4777 /dev/nbd0 + + nbd-server-2$ flexnbd listen -l 10.0.0.2 -p 4777 -f myfile [...] + + nbd-server-1$ flexnbd mirror --addr 10.0.0.2 -p 4777 [...] + +Upon completing the migration, the mirroring and listening flexnbd servers will +both exit. With the proxy mediating requests, this does not break the TCP +connection that nbd-client is holding open. If no requests are in-flight, it +will not notice anything at all; if requests are in-flight, then the reply will +take longer than usual to be returned. + +When flexnbd is restarted in serve mode on the second server: + + nbd-server-2$ flexnbd serve -l 10.0.0.1 -p 4777 -f myfile [...] + +The proxy notices and reconnects, fulfiling any request it has in its buffer. +The data in myfile has been moved between physical servers without the nbd +client process having to be disturbed at all. + BUGS ---- @@ -374,9 +457,9 @@ Should be reported to alex@bytemark.co.uk. AUTHOR ------ -Written by Alex Young . -Original concept and core code by Matthew Bloch -. +Written by Alex Young . +Original concept and core code by Matthew Bloch . +Some additions by Nick Thomas COPYING ------- @@ -384,3 +467,4 @@ COPYING Copyright (c) 2012 Bytemark Hosting Ltd. Free use of this software is granted under the terms of the GNU General Public License version 3 or later. + diff --git a/Rakefile b/Rakefile index f87780b..572317a 100644 --- a/Rakefile +++ b/Rakefile @@ -84,7 +84,7 @@ namespace "test" do desc "Run NBD test scenarios" task 'scenarios' => 'flexnbd' do - sh "cd tests/acceptance; ruby nbd_scenarios" + sh "cd tests/acceptance; ruby nbd_scenarios -v" end end @@ -125,6 +125,7 @@ file check("client") => build/parse.o build/client.o build/serve.o + build/proxy.o build/acl.o build/ioutil.o build/mbox.o @@ -160,6 +161,7 @@ file check("serve") => build/client.o build/flexthread.o build/serve.o + build/proxy.o build/flexnbd.o build/mirror.o build/status.o @@ -177,6 +179,7 @@ file check("readwrite") => build/client.o build/self_pipe.o build/serve.o + build/proxy.o build/parse.o build/acl.o build/flexthread.o @@ -210,7 +213,8 @@ file check("flexnbd") => build/nbdtypes.o build/readwrite.o build/mirror.o - build/serve.o} do |t| + build/serve.o + build/proxy.o} do |t| gcc_link t.name, t.prerequisites + [LIBCHECK] end @@ -225,7 +229,7 @@ end tgt = "build/tests/check_#{m}.o" maybe_obj_name = "build/#{m}.o" # Take it out in case we're testing util.o or ioutil.o - deps = ["build/ioutil.o", "build/util.o"] - [maybe_obj_name] + deps = ["build/ioutil.o", "build/util.o", "build/sockutil.o"] - [maybe_obj_name] # Add it back in if it's something we need to compile deps << maybe_obj_name if OBJECTS.include?( maybe_obj_name ) diff --git a/src/client.c b/src/client.c index 152b825..456f296 100644 --- a/src/client.c +++ b/src/client.c @@ -1,7 +1,8 @@ #include "client.h" #include "serve.h" -#include "util.h" #include "ioutil.h" +#include "sockutil.h" +#include "util.h" #include "bitset.h" #include "nbdtypes.h" #include "self_pipe.h" @@ -189,7 +190,7 @@ int client_read_request( struct client * client , struct nbd_request *out_reques FD_ZERO(&fds); FD_SET(client->socket, &fds); self_pipe_fd_set( client->stop_signal, &fds ); - fd_count = select(FD_SETSIZE, &fds, NULL, NULL, ptv); + fd_count = sock_try_select(FD_SETSIZE, &fds, NULL, NULL, ptv); if ( fd_count == 0 ) { /* This "can't ever happen" */ if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); } @@ -344,30 +345,43 @@ void client_flush( struct client * client, size_t len ) int client_request_needs_reply( struct client * client, struct nbd_request request ) { - debug("request type %d", request.type); - + /* The client is stupid, but don't take down the whole server as a result. + * We send a reply before disconnecting so that at least some indication of + * the problem is visible, and so proxies don't retry the same (bad) request + * forever. + */ if (request.magic != REQUEST_MAGIC) { - fatal("Bad magic %08x", request.magic); + warn("Bad magic %08x from client", request.magic); + client_write_reply( client, &request, EBADMSG ); + client->disconnect = 1; // no need to flush + return 0; } + debug( + "request type=%"PRIu32", from=%"PRIu64", len=%"PRIu32, + request.type, request.from, request.len + ); + + /* check it's not out of range */ + if ( request.from+request.len > client->serve->size) { + warn("write request %"PRIu64"+%"PRIu32" out of range", + request.from, request.len + ); + if ( request.type == REQUEST_WRITE ) { + client_flush( client, request.len ); + } + client_write_reply( client, &request, EPERM ); /* TODO: Change to ERANGE ? */ + client->disconnect = 0; + return 0; + } + + switch (request.type) { case REQUEST_READ: break; case REQUEST_WRITE: - /* check it's not out of range */ - if ( request.from+request.len > client->serve->size) { - warn("write request %d+%d out of range", - request.from, - request.len - ); - client_write_reply( client, &request, 1 ); - client_flush( client, request.len ); - client->disconnect = 0; - return 0; - } break; - case REQUEST_DISCONNECT: debug("request disconnect"); client->disconnect = 1; diff --git a/src/flexnbd.c b/src/flexnbd.c index 7194e94..ec0e75e 100644 --- a/src/flexnbd.c +++ b/src/flexnbd.c @@ -129,6 +129,26 @@ struct flexnbd * flexnbd_create_listening( } +struct flexnbd * flexnbd_create_proxying( + char* s_downstream_address, + char* s_downstream_port, + char* s_upstream_address, + char* s_upstream_port, + char* s_upstream_bind +) +{ + struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) ); + flexnbd->proxy = proxy_create( + flexnbd, + s_downstream_address, + s_downstream_port, + s_upstream_address, + s_upstream_port, + s_upstream_bind); + flexnbd_create_shared( flexnbd, NULL ); + return flexnbd; +} + void flexnbd_spawn_control(struct flexnbd * flexnbd ) { NULLCHECK( flexnbd ); @@ -181,7 +201,6 @@ struct server * flexnbd_server( struct flexnbd * flexnbd ) return flexnbd->serve; } - void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl ) { NULLCHECK( flexnbd ); @@ -255,3 +274,14 @@ int flexnbd_serve( struct flexnbd * flexnbd ) return success; } +int flexnbd_proxy( struct flexnbd * flexnbd ) +{ + NULLCHECK( flexnbd ); + int success; + + success = do_proxy( flexnbd->proxy ); + debug("do_proxy success is %d", success ); + + return success; +} + diff --git a/src/flexnbd.h b/src/flexnbd.h index f378322..7fc9f22 100644 --- a/src/flexnbd.h +++ b/src/flexnbd.h @@ -4,6 +4,7 @@ #include "acl.h" #include "mirror.h" #include "serve.h" +#include "proxy.h" #include "self_pipe.h" #include "mbox.h" #include "control.h" @@ -11,11 +12,14 @@ /* Carries the "globals". */ struct flexnbd { - /* We always have a serve pointer, but it should never be - * dereferenced outside a flexnbd_switch_lock/unlock pair. + /* Our serve pointer should never be dereferenced outside a + * flexnbd_switch_lock/unlock pair. */ struct server * serve; + /* In proxy mode, this is filled instead of serve, above */ + struct proxier * proxy; + /* We only have a control object if a control socket name was * passed on the command line. */ @@ -46,6 +50,14 @@ struct flexnbd * flexnbd_create_listening( int acl_entries, char** s_acl_entries ); +struct flexnbd * flexnbd_create_proxying( + char* s_downstream_address, + char* s_downstream_port, + char* s_upstream_address, + char* s_upstream_port, + char* s_upstream_bind +); + void flexnbd_destroy( struct flexnbd * ); enum mirror_state; enum mirror_state flexnbd_get_mirror_state( struct flexnbd * ); @@ -55,7 +67,9 @@ int flexnbd_signal_fd( struct flexnbd * flexnbd ); int flexnbd_serve( struct flexnbd * flexnbd ); +int flexnbd_proxy( struct flexnbd * flexnbd ); struct server * flexnbd_server( struct flexnbd * flexnbd ); void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl ); struct status * flexnbd_status_create( struct flexnbd * flexnbd ); #endif + diff --git a/src/mode.c b/src/mode.c index 5df59c9..f1af346 100644 --- a/src/mode.c +++ b/src/mode.c @@ -56,6 +56,29 @@ static char listen_help_text[] = VERBOSE_LINE QUIET_LINE; +static struct option proxy_options[] = { + GETOPT_HELP, + GETOPT_ADDR, + GETOPT_PORT, + GETOPT_CONNECT_ADDR, + GETOPT_CONNECT_PORT, + GETOPT_BIND, + GETOPT_QUIET, + GETOPT_VERBOSE, + {0} +}; +static char proxy_short_options[] = "hl:p:C:P:b:" SOPT_QUIET SOPT_VERBOSE; +static char proxy_help_text[] = + "Usage: flexnbd " CMD_PROXY " \n\n" + "Resiliently proxy an NBD connection between client and server\n\n" + HELP_LINE + "\t--" OPT_ADDR ",-l \tThe address we will bind to as a proxy.\n" + "\t--" OPT_PORT ",-p \tThe port we will bind to as a proxy.\n" + "\t--" OPT_CONNECT_ADDR ",-C \tAddress of the proxied server.\n" + "\t--" OPT_CONNECT_PORT ",-P \tPort of the proxied server.\n" + "\t--" OPT_BIND ",-b \tThe address we connect from, as a proxy.\n" + QUIET_LINE + VERBOSE_LINE; static struct option read_options[] = { GETOPT_HELP, @@ -173,10 +196,13 @@ char help_help_text_arr[] = "Usage: flexnbd [cmd options]\n\n" "Commands:\n" "\tflexnbd serve\n" + "\tflexnbd listen\n" + "\tflexnbd proxy\n" "\tflexnbd read\n" "\tflexnbd write\n" "\tflexnbd acl\n" "\tflexnbd mirror\n" + "\tflexnbd break\n" "\tflexnbd status\n" "\tflexnbd help\n\n" "See flexnbd help for further info\n"; @@ -390,6 +416,46 @@ void read_break_param( int c, char **sock ) } +void read_proxy_param( + int c, + char **downstream_addr, + char **downstream_port, + char **upstream_addr, + char **upstream_port, + char **bind_addr ) +{ + switch( c ) { + case 'h' : + fprintf( stdout, "%s\n", proxy_help_text ); + exit( 0 ); + break; + case 'l': + *downstream_addr = optarg; + break; + case 'p': + *downstream_port = optarg; + break; + case 'C': + *upstream_addr = optarg; + break; + case 'P': + *upstream_port = optarg; + break; + case 'b': + *bind_addr = optarg; + break; + case 'q': + log_level = QUIET_LOG_LEVEL; + break; + case 'v': + log_level = VERBOSE_LOG_LEVEL; + break; + default: + exit_err( proxy_help_text ); + break; + } +} + void read_status_param( int c, char **sock ) { read_sock_param( c, sock, status_help_text ); @@ -733,6 +799,55 @@ int mode_status( int argc, char *argv[] ) return 0; } +int mode_proxy( int argc, char *argv[] ) +{ + int c; + struct flexnbd * flexnbd; + char *downstream_addr = NULL; + char *downstream_port = NULL; + char *upstream_addr = NULL; + char *upstream_port = NULL; + char *bind_addr = NULL; + int success; + + while (1) { + c = getopt_long( argc, argv, proxy_short_options, proxy_options, NULL ); + if ( -1 == c ) { break; } + read_proxy_param( c, + &downstream_addr, + &downstream_port, + &upstream_addr, + &upstream_port, + &bind_addr + ); + } + + if ( NULL == downstream_addr || NULL == downstream_port ){ + fprintf( stderr, "both --addr and --port are required.\n" ); + exit_err( proxy_help_text ); + } else if ( NULL == upstream_addr || NULL == upstream_port ){ + fprintf( stderr, "both --conn-addr and --conn-port are required.\n" ); + exit_err( proxy_help_text ); + } + + flexnbd = flexnbd_create_proxying( + downstream_addr, + downstream_port, + upstream_addr, + upstream_port, + bind_addr + ); + + info( + "Proxying between %s %s (downstream) and %s %s (upstream)", + downstream_addr, downstream_port, upstream_addr, upstream_port + ); + + success = flexnbd_proxy( flexnbd ); + flexnbd_destroy( flexnbd ); + + return success ? 0 : 1; +} int mode_help( int argc, char *argv[] ) { @@ -757,6 +872,8 @@ int mode_help( int argc, char *argv[] ) help_text = mirror_help_text; } else if ( IS_CMD( CMD_STATUS, cmd ) ) { help_text = status_help_text; + } else if ( IS_CMD( CMD_PROXY, cmd ) ) { + help_text = proxy_help_text; } else { exit_err( help_help_text ); } } @@ -790,6 +907,8 @@ void mode(char* mode, int argc, char **argv) } else if ( IS_CMD( CMD_STATUS, mode ) ) { mode_status( argc, argv ); + } else if ( IS_CMD( CMD_PROXY, mode ) ) { + mode_proxy( argc, argv ); } else if ( IS_CMD( CMD_HELP, mode ) ) { mode_help( argc-1, argv+1 ); @@ -801,4 +920,3 @@ void mode(char* mode, int argc, char **argv) exit(0); } - diff --git a/src/mode.h b/src/mode.h index 2c9d1fc..ed67f06 100644 --- a/src/mode.h +++ b/src/mode.h @@ -20,9 +20,12 @@ void mode(char* mode, int argc, char **argv); #define OPT_SIZE "size" #define OPT_DENY "default-deny" #define OPT_UNLINK "unlink" +#define OPT_CONNECT_ADDR "conn-addr" +#define OPT_CONNECT_PORT "conn-port" #define CMD_SERVE "serve" #define CMD_LISTEN "listen" +#define CMD_PROXY "proxy" #define CMD_READ "read" #define CMD_WRITE "write" #define CMD_ACL "acl" @@ -40,7 +43,6 @@ void mode(char* mode, int argc, char **argv); #define GETOPT_HELP GETOPT_FLAG( OPT_HELP, 'h' ) #define GETOPT_DENY GETOPT_FLAG( OPT_DENY, 'd' ) - #define GETOPT_ADDR GETOPT_ARG( OPT_ADDR, 'l' ) #define GETOPT_PORT GETOPT_ARG( OPT_PORT, 'p' ) #define GETOPT_FILE GETOPT_ARG( OPT_FILE, 'f' ) @@ -49,6 +51,8 @@ void mode(char* mode, int argc, char **argv); #define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' ) #define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' ) #define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' ) +#define GETOPT_CONNECT_ADDR GETOPT_ARG( OPT_CONNECT_ADDR, 'C' ) +#define GETOPT_CONNECT_PORT GETOPT_ARG( OPT_CONNECT_PORT, 'P' ) #define OPT_VERBOSE "verbose" #define SOPT_VERBOSE "v" diff --git a/src/proxy.c b/src/proxy.c new file mode 100644 index 0000000..9cf80bb --- /dev/null +++ b/src/proxy.c @@ -0,0 +1,497 @@ +#include "proxy.h" +#include "readwrite.h" + +#include "ioutil.h" +#include "sockutil.h" +#include "util.h" + +#include + +#include +#include + +struct proxier* proxy_create( + struct flexnbd* flexnbd, + char* s_downstream_address, + char* s_downstream_port, + char* s_upstream_address, + char* s_upstream_port, + char* s_upstream_bind ) +{ + NULLCHECK( flexnbd ); + + struct proxier* out; + out = xmalloc( sizeof( struct proxier ) ); + out->flexnbd = flexnbd; + + FATAL_IF_NULL(s_downstream_address, "Listen address not specified"); + NULLCHECK( s_downstream_address ); + + FATAL_UNLESS( + parse_ip_to_sockaddr( &out->listen_on.generic, s_downstream_address ), + "Couldn't parse downstream address '%s' (use 0 if " + "you want to bind all IPs)", + s_downstream_address + ); + + FATAL_IF_NULL( s_downstream_port, "Downstream port not specified" ); + NULLCHECK( s_downstream_port ); + parse_port( s_downstream_port, &out->listen_on.v4 ); + + FATAL_IF_NULL(s_upstream_address, "Upstream address not specified"); + NULLCHECK( s_upstream_address ); + + FATAL_UNLESS( + parse_ip_to_sockaddr( &out->connect_to.generic, s_upstream_address ), + "Couldn't parse upstream address '%s'", + s_upstream_address + ); + + FATAL_IF_NULL( s_upstream_port, "Upstream port not specified" ); + NULLCHECK( s_upstream_port ); + parse_port( s_upstream_port, &out->connect_to.v4 ); + + if ( s_upstream_bind ) { + FATAL_IF_ZERO( + parse_ip_to_sockaddr( &out->connect_from.generic, s_upstream_bind ), + "Couldn't parse bind address '%s'", + s_upstream_bind + ); + } + + out->listen_fd = -1; + out->downstream_fd = -1; + out->upstream_fd = -1; + + out->req_buf = xmalloc( NBD_MAX_SIZE ); + out->rsp_buf = xmalloc( NBD_MAX_SIZE ); + + return out; +} + +void proxy_destroy( struct proxier* proxy ) +{ + free( proxy->req_buf ); + free( proxy->rsp_buf ); + free( proxy ); +} + + +/* Try to establish a connection to our upstream server. Return 1 on success, + * 0 on failure + */ +int proxy_connect_to_upstream( struct proxier* proxy ) +{ + int fd = socket_connect( &proxy->connect_to.generic, &proxy->connect_from.generic ); + off64_t size = 0; + + if ( -1 == fd ) { + return 0; + } + + if( !socket_nbd_read_hello( fd, &size ) ) { + FATAL_IF_NEGATIVE( + close( fd ), SHOW_ERRNO( "FIXME: shouldn't be fatal" ) + ); + return 0; + } + + if ( proxy->upstream_size == 0 ) { + info( "Size of upstream image is %"PRIu64" bytes", size ); + } else if ( proxy->upstream_size != size ) { + warn( "Size changed from %"PRIu64" to %"PRIu64" bytes", proxy->upstream_size, size ); + } + + proxy->upstream_size = size; + proxy->upstream_fd = fd; + + return 1; +} + +void proxy_disconnect_from_upstream( struct proxier* proxy ) +{ + if ( -1 != proxy->upstream_fd ) { + debug(" Closing upstream connection" ); + + /* TODO: An NBD disconnect would be pleasant here */ + + FATAL_IF_NEGATIVE( + close( proxy->upstream_fd ), + SHOW_ERRNO( "FIXME: shouldn't be fatal" ) + ); + proxy->upstream_fd = -1; + } +} + + +/** Prepares a listening socket for the NBD server, binding etc. */ +void proxy_open_listen_socket(struct proxier* params) +{ + NULLCHECK( params ); + + params->listen_fd = socket(params->listen_on.family, SOCK_STREAM, 0); + FATAL_IF_NEGATIVE( + params->listen_fd, SHOW_ERRNO( "Couldn't create listen socket" ) + ); + + /* Allow us to restart quickly */ + FATAL_IF_NEGATIVE( + sock_set_reuseaddr(params->listen_fd, 1), + SHOW_ERRNO( "Couldn't set SO_REUSEADDR" ) + ); + + FATAL_IF_NEGATIVE( + sock_set_tcp_nodelay(params->listen_fd, 1), + SHOW_ERRNO( "Couldn't set TCP_NODELAY" ) + ); + + FATAL_UNLESS_ZERO( + sock_try_bind( params->listen_fd, ¶ms->listen_on.generic ), + SHOW_ERRNO( "Failed to bind to listening socket" ) + ); + + /* We're only serving one client at a time, hence backlog of 1 */ + FATAL_IF_NEGATIVE( + listen(params->listen_fd, 1), + SHOW_ERRNO( "Failed to listen on listening socket" ) + ); + + info( "Now listening for incoming connections" ); +} + + +/* Return 0 if we should keep running, 1 if an exit has been signaled. Pass it + * an fd_set to check, or set check_fds to NULL to have it perform its own. + * If we do the latter, then wait specifies how many seconds we'll wait for an + * exit signal to show up. + */ +int proxy_should_exit( struct proxier* params, fd_set *check_fds, int wait ) +{ + struct timeval tv = { wait, 0 }; + fd_set internal_fds; + fd_set* fds = check_fds; + + int signal_fd = flexnbd_signal_fd( params->flexnbd ); + + if ( NULL == check_fds ) { + fds = &internal_fds; + + FD_ZERO( fds ); + FD_SET( signal_fd, fds ); + + FATAL_IF_NEGATIVE( + sock_try_select(FD_SETSIZE, fds, NULL, NULL, &tv), + SHOW_ERRNO( "select() failed." ) + ); + } + + if ( FD_ISSET( signal_fd, fds ) ) { + info( "Stop signal received" ); + return 1; + } + + return 0; +} + +/* Try to get a request from downstream. If reading from downstream fails, then + * the session will be over. Returns 1 on success, 0 on failure. + */ +int proxy_get_request_from_downstream( struct proxier* proxy ) +{ + unsigned char* req_hdr_raw = proxy->req_buf; + unsigned char* req_data = proxy->req_buf + NBD_REQUEST_SIZE; + size_t req_buf_size; + + struct nbd_request_raw* request_raw = (struct nbd_request_raw*) req_hdr_raw; + struct nbd_request* request = &(proxy->req_hdr); + + if ( readloop( proxy->downstream_fd, req_hdr_raw, NBD_REQUEST_SIZE ) == -1 ) { + info( SHOW_ERRNO( "Failed to get request header" ) ); + return 0; + } + + nbd_r2h_request( request_raw, request ); + req_buf_size = NBD_REQUEST_SIZE; + + if ( request->type == REQUEST_DISCONNECT ) { + info( "Received disconnect request from client" ); + return 0; + } + + if ( request->type == REQUEST_READ ) { + if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) { + warn( "NBD read request size %"PRIu32" too large", request->len ); + return 0; + } + + } + + if ( request->type == REQUEST_WRITE ) { + if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) { + warn( "NBD write request size %"PRIu32" too large", request->len ); + return 0; + } + + if ( readloop( proxy->downstream_fd, req_data, request->len ) == -1 ) { + warn( "Failed to get NBD write request data: %"PRIu32"b", request->len ); + return 0; + } + + req_buf_size += request->len; + } + + debug( + "Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32, + request->type, request->from, request->len + ); + + proxy->req_buf_size = req_buf_size; + return 1; +} + +/* Tries to send the request upstream and receive a response. If upstream breaks + * then we reconnect to it, and keep it up until we have a complete response + * back. Returns 1 on success, 0 on failure, -1 if exit is signalled. + */ +int proxy_run_request_upstream( struct proxier* proxy ) +{ + unsigned char* rsp_hdr_raw = proxy->rsp_buf; + unsigned char* rsp_data = proxy->rsp_buf + NBD_REPLY_SIZE; + + struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) rsp_hdr_raw; + + struct nbd_request* request = &(proxy->req_hdr); + struct nbd_reply* reply = &(proxy->rsp_hdr); + + size_t rsp_buf_size; + + if ( proxy->upstream_fd == -1 ) { + debug( "Connecting to upstream" ); + if ( !proxy_connect_to_upstream( proxy ) ) { + debug( "Failed to connect to upstream" ); + + if ( proxy_should_exit( proxy, NULL, 5 ) ) { + return -1; + } + + return 0; + } + debug( "Connected to upstream" ); + } + + if ( writeloop( proxy->upstream_fd, proxy->req_buf, proxy->req_buf_size ) == -1 ) { + warn( "Failed to send request to upstream" ); + proxy_disconnect_from_upstream( proxy ); + return 0; + } + + if ( readloop( proxy->upstream_fd, rsp_hdr_raw, NBD_REPLY_SIZE ) == -1 ) { + debug( "Failed to get reply header from upstream" ); + proxy_disconnect_from_upstream( proxy ); + return 0; + } + + nbd_r2h_reply( reply_raw, reply ); + rsp_buf_size = NBD_REPLY_SIZE; + + if ( reply->magic != REPLY_MAGIC ) { + debug( "Reply magic is incorrect" ); + proxy_disconnect_from_upstream( proxy ); + return 0; + } + + debug( "NBD reply received from upstream. Response code: %"PRIu32, reply->error ); + + if ( reply->error != 0 ) { + warn( "NBD error returned from upstream: %"PRIu32, reply->error ); + } + + if ( reply->error == 0 && request->type == REQUEST_READ ) { + if (readloop( proxy->upstream_fd, rsp_data, request->len ) == -1 ) { + debug( "Failed to get reply data from upstream" ); + proxy_disconnect_from_upstream( proxy ); + return 0; + } + rsp_buf_size += request->len; + } + + proxy->rsp_buf_size = rsp_buf_size; + return rsp_buf_size; +} + +/* Write an NBD reply back downstream. Return 0 on failure, 1 on success. */ +int proxy_send_reply_downstream( struct proxier* proxy ) +{ + int result; + unsigned char* rsp_buf = proxy->rsp_buf; + + debug( + "Writing header (%"PRIu32") + data (%"PRIu32") bytes downstream", + NBD_REPLY_SIZE, proxy->rsp_buf_size - NBD_REPLY_SIZE + ); + + result = writeloop( proxy->downstream_fd, rsp_buf, proxy->rsp_buf_size ); + if ( result == -1 ) { + debug( "Failed to send reply downstream" ); + return 0; + } + + debug( "Reply sent" ); + return 1; +} + + +/* Here, we negotiate an NBD session with downstream, based on the information + * we got on first connection to upstream. Then we wait for a request to come + * in from downstream, read it into memory, then send it to upstream. If + * upstream dies before responding, we reconnect to upstream and resend it. + * Once we've got a response, we write it directly to downstream, and wait for a + * new request. When downstream disconnects, or we receive an exit signal (which + * can be blocked, unfortunately), we are finished. + * + * This is the simplest possible nbd proxy I can think of. It may not be at all + * performant - let's see. + */ + +void proxy_session( struct proxier* proxy ) +{ + int downstream_fd = proxy->downstream_fd; + uint64_t req_count = 0; + int result; + + info( "Beginning proxy session on fd %i", downstream_fd ); + + if ( !socket_nbd_write_hello( downstream_fd, proxy->upstream_size ) ) { + debug( "Sending hello failed on fd %i, ending session", downstream_fd ); + return; + } + + while( proxy_get_request_from_downstream( proxy ) ) { + + /* Don't start running the request if exit has been signalled */ + if ( proxy_should_exit( proxy, NULL, 0 ) ) { + break; + } + + do { + result = proxy_run_request_upstream( proxy ); + } while ( result == 0 ); + + /* We have to exit, but don't know if the request was successfully + * proxied or not. We could add that knowledge, and attempt to send a + * reply downstream if it was, but I don't think it's worth it. + */ + if ( result == -1 ) { + break; + } + + if ( !proxy_send_reply_downstream( proxy ) ) { + break; + } + + proxy->req_buf_size = 0; + proxy->rsp_buf_size = 0; + + req_count++; + }; + + info( + "Finished proxy session on fd %i after %"PRIu64" successful request(s)", + downstream_fd, req_count + ); + + return; +} + +/** Accept an NBD socket connection, dispatch appropriately */ +int proxy_accept( struct proxier* params ) +{ + NULLCHECK( params ); + + int client_fd; + int signal_fd = flexnbd_signal_fd( params->flexnbd ); + fd_set fds; + int should_continue = 1; + + union mysockaddr client_address; + socklen_t socklen = sizeof( client_address ); + + debug("accept loop starting"); + + FD_ZERO(&fds); + FD_SET(params->listen_fd, &fds); + FD_SET(signal_fd, &fds); + + FATAL_IF_NEGATIVE( + sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL), + SHOW_ERRNO( "select() failed" ) + ); + + if ( proxy_should_exit( params, &fds, 0) ) { + should_continue = 0; + } + + if ( should_continue && FD_ISSET( params->listen_fd, &fds ) ) { + client_fd = accept( params->listen_fd, &client_address.generic, &socklen ); + + if ( sock_set_tcp_nodelay(client_fd, 1) == -1 ) { + warn( SHOW_ERRNO( "Failed to set TCP_NODELAY" ) ); + } + + info( "Accepted nbd client socket fd %d", client_fd ); + params->downstream_fd = client_fd; + proxy_session( params ); + + if ( close( params->downstream_fd ) == -1 ) { + warn( SHOW_ERRNO( "FIXME: close returned" ) ); + } + + params->downstream_fd = -1; + } + + return should_continue; +} + + +void proxy_accept_loop( struct proxier* params ) +{ + NULLCHECK( params ); + while( proxy_accept( params ) ); +} + +/** Closes sockets, frees memory and waits for all requests to clear */ +void proxy_cleanup( struct proxier* params ) +{ + NULLCHECK( params ); + + info( "cleaning up" ); + + if ( -1 != params->listen_fd ) { + close( params->listen_fd ); + } + + debug( "Cleanup done" ); +} + +/** Full lifecycle of the proxier */ +int do_proxy( struct proxier* params ) +{ + NULLCHECK( params ); + + error_set_handler( (cleanup_handler*) proxy_cleanup, params ); + + debug( "Ensuring upstream server is open" ); + + if ( !proxy_connect_to_upstream( params ) ) { + info( "Couldn't connect to upstream server during initialization" ); + proxy_cleanup( params ); + return 1; + }; + + proxy_open_listen_socket( params ); + proxy_accept_loop( params ); + proxy_cleanup( params ); + + return 0; +} + diff --git a/src/proxy.h b/src/proxy.h new file mode 100644 index 0000000..4ca1017 --- /dev/null +++ b/src/proxy.h @@ -0,0 +1,71 @@ +#ifndef PROXY_H +#define PROXY_H + +#include +#include + +#include "flexnbd.h" +#include "parse.h" +#include "nbdtypes.h" +#include "self_pipe.h" + +struct proxier { + /* The flexnbd wrapper this proxier is attached to */ + struct flexnbd* flexnbd; + + /** address/port to bind to */ + union mysockaddr listen_on; + + /** address/port to connect to */ + union mysockaddr connect_to; + + /** address to bind to when making outgoing connections */ + union mysockaddr connect_from; + + /* The socket we listen() on and accept() against */ + int listen_fd; + + /* The socket returned by accept() that we receive requests from and send + * responses to + */ + int downstream_fd; + + /* The socket returned by connect() that we send requests to and receive + * responses from + */ + int upstream_fd; + + /* This is the size we advertise to the downstream server */ + off64_t upstream_size; + + /* Scratch space for the current NBD request from downstream */ + unsigned char* req_buf; + + /* Number of bytes currently sat in req_buf */ + size_t req_buf_size; + + /* We transform the raw request header into here */ + struct nbd_request req_hdr; + + /* Scratch space for the current NBD reply from upstream */ + unsigned char* rsp_buf; + + /* Number of bytes currently sat in rsp_buf */ + size_t rsp_buf_size; + + /* We transform the raw reply header into here */ + struct nbd_reply rsp_hdr; +}; + +struct proxier* proxy_create( + struct flexnbd * flexnbd, + char* s_downstream_address, + char* s_downstream_port, + char* s_upstream_address, + char* s_upstream_port, + char* s_upstream_bind ); +int do_proxy( struct proxier* proxy ); +void proxy_destroy( struct proxier* proxy ); + +#endif + diff --git a/src/readwrite.c b/src/readwrite.c index 0f20f24..bb971ce 100644 --- a/src/readwrite.c +++ b/src/readwrite.c @@ -1,7 +1,8 @@ #include "nbdtypes.h" #include "ioutil.h" +#include "sockutil.h" #include "util.h" -#include "serve.h" +#include "serve.h" #include #include @@ -18,14 +19,14 @@ int socket_connect(struct sockaddr* to, struct sockaddr* from) if (NULL != from) { if ( 0 > bind(fd, from, sizeof(struct sockaddr_in6)) ){ warn( "bind() failed"); - close( fd ); + FATAL_IF_NEGATIVE( close( fd ), SHOW_ERRNO( "FIXME" ) ); return -1; } } - if ( 0 > connect(fd, to, sizeof(struct sockaddr_in6)) ) { - warn( "connect failed" ); - close( fd ); + if ( 0 > sock_try_connect( fd, to, sizeof( struct sockaddr_in6 ), 15 ) ) { + warn( SHOW_ERRNO( "connect failed" ) ); + FATAL_IF_NEGATIVE( close( fd ), SHOW_ERRNO( "FIXME" ) ); return -1; } @@ -88,7 +89,7 @@ void fill_request(struct nbd_request *request, int type, off64_t from, int len) void read_reply(int fd, struct nbd_request *request, struct nbd_reply *reply) { struct nbd_reply_raw reply_raw; - + ERROR_IF_NEGATIVE(readloop(fd, &reply_raw, sizeof(struct nbd_reply_raw)), "Couldn't read reply"); @@ -108,14 +109,15 @@ void read_reply(int fd, struct nbd_request *request, struct nbd_reply *reply) void wait_for_data( int fd, int timeout_secs ) { fd_set fds; - struct timeval tv = {timeout_secs, 0}; + struct timeval tv = { timeout_secs, 0 }; int selected; FD_ZERO( &fds ); FD_SET( fd, &fds ); - selected = select( FD_SETSIZE, - &fds, NULL, NULL, - timeout_secs >=0 ? &tv : NULL ); + + selected = sock_try_select( + FD_SETSIZE, &fds, NULL, NULL, timeout_secs >=0 ? &tv : NULL + ); FATAL_IF( -1 == selected, "Select failed" ); ERROR_IF( 0 == selected, "Timed out waiting for reply" ); @@ -126,16 +128,16 @@ void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, i { struct nbd_request request; struct nbd_reply reply; - + fill_request(&request, REQUEST_READ, from, len); FATAL_IF_NEGATIVE(writeloop(fd, &request, sizeof(request)), "Couldn't write request"); wait_for_data( fd, timeout_secs ); read_reply(fd, &request, &reply); - + if (out_buf) { - FATAL_IF_NEGATIVE(readloop(fd, out_buf, len), + FATAL_IF_NEGATIVE(readloop(fd, out_buf, len), "Read failed"); } else { @@ -150,13 +152,13 @@ void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf, in { struct nbd_request request; struct nbd_reply reply; - + fill_request(&request, REQUEST_WRITE, from, len); ERROR_IF_NEGATIVE(writeloop(fd, &request, sizeof(request)), "Couldn't write request"); - + if (in_buf) { - ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len), + ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len), "Write failed"); } else { @@ -165,7 +167,7 @@ void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf, in "Splice failed" ); } - + wait_for_data( fd, timeout_secs ); read_reply(fd, &request, &reply); } @@ -200,13 +202,13 @@ int socket_nbd_disconnect( int fd ) fatal( error_type " connection failed." );\ }\ } - + void do_read(struct mode_readwrite_params* params) { params->client = socket_connect(¶ms->connect_to.generic, ¶ms->connect_from.generic); FATAL_IF_NEGATIVE( params->client, "Couldn't connect." ); CHECK_RANGE("read"); - socket_nbd_read(params->client, params->from, params->len, + socket_nbd_read(params->client, params->from, params->len, params->data_fd, NULL, 10); close(params->client); } @@ -216,7 +218,7 @@ void do_write(struct mode_readwrite_params* params) params->client = socket_connect(¶ms->connect_to.generic, ¶ms->connect_from.generic); FATAL_IF_NEGATIVE( params->client, "Couldn't connect." ); CHECK_RANGE("write"); - socket_nbd_write(params->client, params->from, params->len, + socket_nbd_write(params->client, params->from, params->len, params->data_fd, NULL, 10); close(params->client); } diff --git a/src/serve.c b/src/serve.c index 9438061..0995f1e 100644 --- a/src/serve.c +++ b/src/serve.c @@ -636,8 +636,10 @@ int server_accept( struct server * params ) self_pipe_fd_set( params->close_signal, &fds ); self_pipe_fd_set( params->acl_updated_signal, &fds ); - FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds, - NULL, NULL, NULL), "select() failed"); + FATAL_IF_NEGATIVE( + sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL), + SHOW_ERRNO( "select() failed" ) + ); if ( self_pipe_fd_isset( params->close_signal, &fds ) ){ server_close_clients( params ); diff --git a/src/sockutil.c b/src/sockutil.c index 1adcbb5..586b37a 100644 --- a/src/sockutil.c +++ b/src/sockutil.c @@ -1,7 +1,14 @@ +#include +#include + +#include +#include +#include + #include "sockutil.h" #include "util.h" -size_t sockaddr_size(const struct sockaddr* sa) +size_t sockaddr_size( const struct sockaddr* sa ) { size_t ret = 0; @@ -17,7 +24,7 @@ size_t sockaddr_size(const struct sockaddr* sa) return ret; } -const char* sockaddr_address_string(const struct sockaddr* sa, char* dest, size_t len) +const char* sockaddr_address_string( const struct sockaddr* sa, char* dest, size_t len ) { NULLCHECK( sa ); NULLCHECK( dest ); @@ -47,21 +54,39 @@ const char* sockaddr_address_string(const struct sockaddr* sa, char* dest, size_ return ret; } -int sock_set_reuseaddr(int fd, int optval) +int sock_set_reuseaddr( int fd, int optval ) { return setsockopt( fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval) ); } /* Set the tcp_nodelay option */ -int sock_set_tcp_nodelay(int fd, int optval) +int sock_set_tcp_nodelay( int fd, int optval ) { return setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval) ); } -int sock_try_bind(int fd, const struct sockaddr* sa) +int sock_set_nonblock( int fd, int optval ) +{ + int flags = fcntl( fd, F_GETFL ); + + if ( flags == -1 ) { + return -1; + } + + if ( optval ) { + flags = flags | O_NONBLOCK; + } else { + flags = flags & (~O_NONBLOCK); + } + + return fcntl( fd, F_SETFL, flags ); +} + +int sock_try_bind( int fd, const struct sockaddr* sa ) { int bind_result; char s_address[256]; + int retry = 1; sockaddr_address_string( sa, &s_address[0], 256 ); @@ -92,13 +117,99 @@ int sock_try_bind(int fd, const struct sockaddr* sa) continue; case EADDRINUSE: warn( "%s in use, giving up.", s_address ); + retry = 0; break; default: warn( "giving up" ); + retry = 0; } } - } while ( 1 ); + } while ( retry ); return bind_result; } +int sock_try_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) +{ + int result; + + do { + result = select(nfds, readfds, writefds, exceptfds, timeout); + if ( errno != EINTR ) { + break; + } + + } while ( result == -1 ); + + return result; +} + +int sock_try_connect( int fd, struct sockaddr* to, socklen_t addrlen, int wait ) +{ + fd_set fds; + struct timeval tv = { wait, 0 }; + int result = 0; + + if ( sock_set_nonblock( fd, 1 ) == -1 ) { + warn( SHOW_ERRNO( "Failed to set socket non-blocking for connect()" ) ); + return connect( fd, to, addrlen ); + } + + FD_ZERO( &fds ); + FD_SET( fd, &fds ); + + do { + result = connect( fd, to, addrlen ); + + if ( result == -1 ) { + switch( errno ) { + case EINPROGRESS: + result = 0; + break; /* success */ + case EAGAIN: + case EINTR: + break; /* Try again */ + default: + warn( SHOW_ERRNO( "Failed to connect()") ); + goto out; + } + } + } while ( result == -1 ); + + if ( -1 == sock_try_select( FD_SETSIZE, NULL, &fds, NULL, &tv) ) { + warn( SHOW_ERRNO( "failed to select() on non-blocking connect" ) ); + result = -1; + goto out; + } + + if ( !FD_ISSET( fd, &fds ) ) { + result = -1; + errno = ETIMEDOUT; + goto out; + } + + int scratch; + socklen_t s_size = sizeof( scratch ); + if ( getsockopt( fd, SOL_SOCKET, SO_ERROR, &scratch, &s_size ) == -1 ) { + result = -1; + warn( SHOW_ERRNO( "getsockopt() failed" ) ); + goto out; + } + + if ( scratch == EINPROGRESS ) { + scratch = ETIMEDOUT; + } + + result = scratch ? -1 : 0; + errno = scratch; + +out: + if ( sock_set_nonblock( fd, 0 ) == -1 ) { + warn( SHOW_ERRNO( "Failed to make socket blocking after connect()" ) ); + return -1; + } + + debug( "sock_try_connect: %i", result ); + return result; +} + diff --git a/src/sockutil.h b/src/sockutil.h index e60fc91..6cc860c 100644 --- a/src/sockutil.h +++ b/src/sockutil.h @@ -2,11 +2,9 @@ #define SOCKUTIL_H -#include +#include #include -#include -#include -#include +#include /* Returns the size of the sockaddr, or 0 on error */ size_t sockaddr_size(const struct sockaddr* sa); @@ -25,8 +23,16 @@ int sock_set_tcp_nodelay(int fd, int optval); /* TODO: Set the tcp_cork option */ // int sock_set_cork(int fd, int optval); +int sock_set_nonblock(int fd, int optval); + /* Attempt to bind the fd to the sockaddr, retrying common transient failures */ int sock_try_bind(int fd, const struct sockaddr* sa); +/* Try to call select(), retrying EINTR */ +int sock_try_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); + +/* Try to call connect(), timing out after wait seconds */ +int sock_try_connect( int fd, struct sockaddr* to, socklen_t addrlen, int wait ); + #endif diff --git a/tests/acceptance/environment.rb b/tests/acceptance/environment.rb index bbbfadf..0275705 100644 --- a/tests/acceptance/environment.rb +++ b/tests/acceptance/environment.rb @@ -21,6 +21,13 @@ class Environment @fake_pid = nil end + def proxy1(port=@port2) + @nbd1.proxy(@ip, port) + end + def proxy2(port=@port1) + @nbd2.proxy(@ip, port) + end + def serve1(*acl) @nbd1.serve(@filename1, *acl) diff --git a/tests/acceptance/fakes/source/connect_from_banned_ip.rb b/tests/acceptance/fakes/source/connect_from_banned_ip.rb index f87b099..aa16fa0 100755 --- a/tests/acceptance/fakes/source/connect_from_banned_ip.rb +++ b/tests/acceptance/fakes/source/connect_from_banned_ip.rb @@ -13,10 +13,8 @@ addr, port = *ARGV client = FakeSource.new( addr, port, "Timed out connecting", "127.0.0.6" ) sleep( 0.25 ) -client.ensure_disconnected +rsp = client.disconnected? ? 0 : 1 client.close -exit(0) - - +exit(rsp) diff --git a/tests/acceptance/file_writer.rb b/tests/acceptance/file_writer.rb index 8c20f53..92ad220 100644 --- a/tests/acceptance/file_writer.rb +++ b/tests/acceptance/file_writer.rb @@ -8,6 +8,10 @@ class FileWriter @pattern = "" end + def size + @blocksize * @pattern.split("").size + end + # We write in fixed block sizes, given by "blocksize" # _ means skip a block # 0 means write a block full of zeroes diff --git a/tests/acceptance/flexnbd.rb b/tests/acceptance/flexnbd.rb index d018144..d5b1dbf 100644 --- a/tests/acceptance/flexnbd.rb +++ b/tests/acceptance/flexnbd.rb @@ -241,6 +241,15 @@ module FlexNBD "#{acl.join(' ')}" end + def proxy_cmd( connect_ip, connect_port ) + "#{bin} proxy "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--conn-addr #{connect_ip} "\ + "--conn-port #{connect_port} "\ + "#{@debug}" + end + def read_cmd( offset, length ) "#{bin} read "\ @@ -319,6 +328,7 @@ module FlexNBD sleep 0.1 end + start_wait_thread( @pid ) at_exit { kill } end @@ -336,6 +346,31 @@ module FlexNBD run_serve_cmd( listen_cmd( file, acl ) ) end + def tcp_server_open? + # raises if the other side doesn't accept() + sock = TCPSocket.new(ip, port) rescue nil + + success = !!sock + ( sock.close rescue nil) if sock + success + end + + def proxy( connect_ip, connect_port ) + cmd = proxy_cmd( connect_ip, connect_port ) + debug( cmd ) + + @pid = @executor.run( cmd ) + + until tcp_server_open? + pid, status = Process.wait2(@pid, Process::WNOHANG) + raise "server did not start (#{cmd})" if pid + sleep 0.1 + end + + start_wait_thread( @pid ) + at_exit { kill } + end + def start_wait_thread( pid ) @wait_thread = Thread.start do diff --git a/tests/acceptance/flexnbd/constants.rb b/tests/acceptance/flexnbd/constants.rb index 128a8bf..992031a 100644 --- a/tests/acceptance/flexnbd/constants.rb +++ b/tests/acceptance/flexnbd/constants.rb @@ -32,6 +32,9 @@ module FlexNBD end read_constants() + + REQUEST_MAGIC = "\x25\x60\x95\x13" unless defined?(REQUEST_MAGIC) + REPLY_MAGIC = "\x67\x44\x66\x98" unless defined?(REPLY_MAGIC) + end # module FlexNBD - diff --git a/tests/acceptance/flexnbd/fake_dest.rb b/tests/acceptance/flexnbd/fake_dest.rb index bae0531..25df522 100644 --- a/tests/acceptance/flexnbd/fake_dest.rb +++ b/tests/acceptance/flexnbd/fake_dest.rb @@ -56,8 +56,6 @@ module FlexNBD } end - REPLY_MAGIC="\x67\x44\x66\x98" - def write_error( handle ) write_reply( handle, 1 ) end @@ -76,7 +74,7 @@ module FlexNBD if opts[:magic] == :wrong write_rand( @sock, 4 ) else - @sock.write( REPLY_MAGIC ) + @sock.write( ::FlexNBD::REPLY_MAGIC ) end @sock.write( [err].pack("N") ) @@ -93,6 +91,10 @@ module FlexNBD @sock.read( len ) end + def write_data( len ) + @sock.write( len ) + end + def self.parse_be64(str) raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless @@ -161,3 +163,4 @@ module FlexNBD end # module FakeDest end # module FlexNBD + diff --git a/tests/acceptance/flexnbd/fake_source.rb b/tests/acceptance/flexnbd/fake_source.rb index 930c56a..dfbce69 100644 --- a/tests/acceptance/flexnbd/fake_source.rb +++ b/tests/acceptance/flexnbd/fake_source.rb @@ -30,7 +30,7 @@ module FlexNBD def read_hello() - timing_out( FlexNBD::MS_HELLO_TIME_SECS, + timing_out( ::FlexNBD::MS_HELLO_TIME_SECS, "Timed out waiting for hello." ) do fail "No hello." unless (hello = @sock.read( 152 )) && hello.length==152 @@ -47,15 +47,14 @@ module FlexNBD end - def send_request( type, handle="myhandle", from=0, len=0 ) + def send_request( type, handle="myhandle", from=0, len=0, magic=REQUEST_MAGIC ) fail "Bad handle" unless handle.length == 8 - @sock.write( "\x25\x60\x95\x13" ) + @sock.write( magic ) @sock.write( [type].pack( 'N' ) ) @sock.write( handle ) - @sock.write( "\x0"*4 ) - @sock.write( [from].pack( 'N' ) ) - @sock.write( [len ].pack( 'N' ) ) + @sock.write( [n64( from )].pack( 'q' ) ) + @sock.write( [len].pack( 'N' ) ) end @@ -122,10 +121,10 @@ module FlexNBD end - def ensure_disconnected - Timeout.timeout( 2 ) do - @sock.read(1) - end + def disconnected? + result = nil + Timeout.timeout( 2 ) { result = ( @sock.read(1) == nil ) } + result end @@ -140,6 +139,22 @@ module FlexNBD end end + private + + # take a 64-bit number, turn it upside down (due to : + # http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-core/11920 + # ) + def n64(b) + ((b & 0xff00000000000000) >> 56) | + ((b & 0x00ff000000000000) >> 40) | + ((b & 0x0000ff0000000000) >> 24) | + ((b & 0x000000ff00000000) >> 8) | + ((b & 0x00000000ff000000) << 8) | + ((b & 0x0000000000ff0000) << 24) | + ((b & 0x000000000000ff00) << 40) | + ((b & 0x00000000000000ff) << 56) + end end # class FakeSource end # module FlexNBD + diff --git a/tests/acceptance/test_proxy_mode.rb b/tests/acceptance/test_proxy_mode.rb new file mode 100644 index 0000000..bb56b96 --- /dev/null +++ b/tests/acceptance/test_proxy_mode.rb @@ -0,0 +1,199 @@ +require 'test/unit' +require 'environment' +require 'flexnbd/fake_source' +require 'flexnbd/fake_dest' + +class TestProxyMode < Test::Unit::TestCase + def setup + super + @env = Environment.new + @env.writefile1( "0" * 16 ) + end + + def teardown + @env.cleanup + super + end + + def with_proxied_client( override_size = nil ) + @env.serve1 unless @server_up + @env.proxy2 unless @proxy_up + @env.nbd2.can_die(0) + client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy") + begin + + result = client.read_hello + assert_equal "NBDMAGIC", result[:magic] + assert_equal override_size || @env.file1.size, result[:size] + + yield client + ensure + client.close rescue nil + end + end + + def test_exits_with_error_when_cannot_connect_to_upstream_on_start + assert_raises(RuntimeError) { @env.proxy1 } + end + + def test_read_requests_successfully_proxied + with_proxied_client do |client| + (0..3).each do |n| + offset = n * 4096 + client.write_read_request(offset, 4096, "myhandle") + rsp = client.read_response + + assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal "myhandle", rsp[:handle] + assert_equal 0, rsp[:error] + + orig_data = @env.file1.read(offset, 4096) + data = client.read_raw(4096) + + assert_equal 4096, orig_data.size + assert_equal 4096, data.size + + assert_equal( orig_data, data, "Returned data does not match" ) + end + end + end + + def test_write_requests_successfully_proxied + with_proxied_client do |client| + (0..3).each do |n| + offset = n * 4096 + client.write(offset, "\xFF" * 4096) + rsp = client.read_response + + assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal "myhandle", rsp[:handle] + assert_equal 0, rsp[:error] + + data = @env.file1.read(offset, 4096) + assert_equal( ( "\xFF" * 4096 ), data, "Data not written" ) + end + end + end + + def make_fake_server + server = FlexNBD::FakeDest.new(@env.ip, @env.port1) + @server_up = true + + # We return a thread here because accept() and connect() both block for us + Thread.new do + sc = server.accept # just tell the supervisor we're up + sc.write_hello + + [ server, sc ] + end + end + + def test_read_request_retried_when_upstream_dies_partway + maker = make_fake_server + + with_proxied_client(4096) do |client| + server, sc1 = maker.value + + # Send the read request to the proxy + client.write_read_request( 0, 4096 ) + + # ensure we're given the read request + req1 = sc1.read_request + assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] + assert_equal ::FlexNBD::REQUEST_READ, req1[:type] + assert_equal 0, req1[:from] + assert_equal 4096, req1[:len] + + # Kill the server again, now we're sure the read request has been sent once + sc1.close + + # We expect the proxy to reconnect without our client doing anything. + sc2 = server.accept + sc2.write_hello + + # And once reconnected, it should resend an identical request. + req2 = sc2.read_request + assert_equal req1, req2 + + # The reply should be proxied back to the client. + sc2.write_reply( req2[:handle] ) + sc2.write_data( "\xFF" * 4096 ) + + # Check it to make sure it's correct + rsp = timeout(15) { client.read_response } + assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal 0, rsp[:error] + assert_equal req1[:handle], rsp[:handle] + + data = client.read_raw( 4096 ) + assert_equal( ("\xFF" * 4096), data, "Wrong data returned" ) + + sc2.close + server.close + end + + end + + def test_write_request_retried_when_upstream_dies_partway + maker = make_fake_server + + with_proxied_client(4096) do |client| + server, sc1 = maker.value + + # Send the read request to the proxy + client.write( 0, ( "\xFF" * 4096 ) ) + + # ensure we're given the read request + req1 = sc1.read_request + assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] + assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type] + assert_equal 0, req1[:from] + assert_equal 4096, req1[:len] + data1 = sc1.read_data( 4096 ) + assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" ) + + # Kill the server again, now we're sure the read request has been sent once + sc1.close + + # We expect the proxy to reconnect without our client doing anything. + sc2 = server.accept + sc2.write_hello + + # And once reconnected, it should resend an identical request. + req2 = sc2.read_request + assert_equal req1, req2 + data2 = sc2.read_data( 4096 ) + assert_equal data1, data2 + + # The reply should be proxied back to the client. + sc2.write_reply( req2[:handle] ) + + # Check it to make sure it's correct + rsp = timeout(15) { client.read_response } + assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal 0, rsp[:error] + assert_equal req1[:handle], rsp[:handle] + + sc2.close + server.close + end + end + + def test_only_one_client_can_connect_to_proxy_at_a_time + with_proxied_client do |client| + + c2 = nil + assert_raises(Timeout::Error) do + timeout(1) do + c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)") + c2.read_hello + end + end + c2.close rescue nil if c2 + end + + + end + +end + diff --git a/tests/acceptance/test_serve_mode.rb b/tests/acceptance/test_serve_mode.rb new file mode 100644 index 0000000..1278e68 --- /dev/null +++ b/tests/acceptance/test_serve_mode.rb @@ -0,0 +1,86 @@ +require 'test/unit' +require 'environment' +require 'flexnbd/fake_source' + +class TestServeMode < Test::Unit::TestCase + + def setup + super + @env = Environment.new + @env.writefile1( "0" ) + @env.serve1 + end + + def teardown + @env.cleanup + super + end + + def connect_to_server + client = FlexNBD::FakeSource.new(@env.ip, @env.port1, "Connecting to server failed") + begin + result = client.read_hello + assert_equal "NBDMAGIC", result[:magic] + assert_equal @env.file1.size, result[:size] + yield client + ensure + client.close rescue nil + end + end + + def test_bad_request_magic_receives_error_response + connect_to_server do |client| + + # replace REQUEST_MAGIC with all 0s to make it look bad + client.send_request( 0, "myhandle", 0, 0, "\x00\x00\x00\x00" ) + rsp = client.read_response + + assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal "myhandle", rsp[:handle] + assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}" + + # The client should be disconnected now + assert client.disconnected?, "Server not disconnected" + end + end + + + def test_read_request_out_of_bounds_receives_error_response + connect_to_server do |client| + client.write_read_request( @env.file1.size, 4096 ) + rsp = client.read_response + + assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal "myhandle", rsp[:handle] + assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}" + + # Ensure we're not disconnected by sending a request. We don't care about + # whether the reply is good or not, here. + client.write_read_request( 0, 4096 ) + rsp = client.read_response + assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] + end + end + + def test_write_request_out_of_bounds_receives_error_response + connect_to_server do |client| + client.write( @env.file1.size, "\x00" * 4096 ) + rsp = client.read_response + + assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal "myhandle", rsp[:handle] + assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}" + + # Ensure we're not disconnected by sending a request. We don't care about + # whether the reply is good or not, here. + client.write( 0, "\x00" * @env.file1.size ) + rsp = client.read_response + assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] + end + + end + + + +end + diff --git a/tests/unit/check_sockutil.c b/tests/unit/check_sockutil.c index ab55701..bc5c661 100644 --- a/tests/unit/check_sockutil.c +++ b/tests/unit/check_sockutil.c @@ -1,3 +1,7 @@ +#include +#include +#include + #include "sockutil.h" #include