flexnbd: Add a proxy mode

This lets us proxy connections between NBD clients and servers, resiliently.
nick
2013-02-15 16:52:16 +00:00
parent 9b67d30608
commit 98d8fbeaf0
12 changed files with 1101 additions and 24 deletions


@@ -24,7 +24,7 @@ COMMANDS
serve
~~~~~
$ flexnbd serve --addr <ADDR> --port <PORT> --file <FILE>
$ flexnbd serve --addr <ADDR> --port <PORT> --file <FILE>
[--sock <SOCK>] [--default-deny] [global option]* [acl entry]*
Serve a file. If any ACL entries are given (which should be IP
@@ -54,11 +54,11 @@ Options
How to interpret an empty ACL. If --default-deny is given, an
empty ACL will let no clients connect. If it is not given, an
empty ACL will let any client connect.
listen
~~~~~~
$ flexnbd listen --addr <ADDR> --port <PORT> --file <FILE>
$ flexnbd listen --addr <ADDR> --port <PORT> --file <FILE>
[--sock <SOCK>] [--default-deny] [global option]* [acl entry]*
Listen for an inbound migration, and quit with a status of 0 on
@@ -85,10 +85,59 @@ Options
^^^^^^^
As for 'serve'.
proxy
~~~~~
$ flexnbd proxy --addr <ADDR> --port <PORT>
--conn-addr <ADDR> --conn-port <PORT> [--bind <ADDR>] [global option]*
Proxy requests from an NBD client to an NBD server, resiliently. Only one
client can be connected (to the address specified by --addr and --port) at a
time, and ACLs cannot be applied to the client, as they can be to clients
connecting directly to a flexnbd in serve mode.
On starting up, the proxy will attempt to connect to the server specified by
--conn-addr and --conn-port (from the address specified by --bind, if given). If
it fails, then the process will die with an error exit status.
Assuming a successful connection to the `upstream` server is made, the proxy
will then start listening on the address specified by --addr and --port, waiting
for `downstream` to connect to it (this will be your NBD client). The client
will be given the same hello message as the proxy was given by the server.
When connected, any request the client makes will be read by the proxy and sent
to the server. If the server goes away for any reason, the proxy will remember
the request and regularly (~ every 5 seconds) try to reconnect to the server.
Upon reconnection, the request is sent and a reply is waited for. When a reply
is received, it is sent back to the client.
When the client disconnects, cleanly or otherwise, the proxy goes back to
waiting for a new client to connect. The connection to the server is maintained
at that point, in case it is needed again.
Only one request may be in-flight at a time under the current architecture; that
doesn't seem to slow things down much relative to alternative options, but may
be changed in the future if it becomes an issue.
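The retry behaviour described above can be sketched in C as follows. This is a minimal illustration under stated assumptions, not the code from this commit: `forward_with_retry`, `flaky_try`, `count_wait` and `struct flaky_upstream` are hypothetical names, and the wait callback stands in for the roughly 5-second pause between reconnection attempts. Unlike the real proxy, the sketch does not check for an exit signal while waiting.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical callback: returns 1 once a complete reply has been received,
 * 0 if the upstream connection failed and the request must be resent. */
typedef int (*try_upstream_fn)( void *ctx );

/* Keep resending the single in-flight request until upstream answers.
 * wait_fn (may be NULL) is called after every failed attempt; in a real
 * proxy it would pause for a few seconds. Returns the number of attempts. */
int forward_with_retry( try_upstream_fn try_once, void (*wait_fn)( void * ), void *ctx )
{
	int attempts = 0;

	assert( try_once != NULL );

	for ( ;; ) {
		attempts++;
		if ( try_once( ctx ) ) {
			return attempts;
		}
		if ( wait_fn != NULL ) {
			wait_fn( ctx );
		}
	}
}

/* Hypothetical stand-in for an upstream that is down for a while. */
struct flaky_upstream {
	int failures_left; /* attempts that will still fail */
	int waits;         /* how many times we paused between attempts */
};

int flaky_try( void *ctx )
{
	struct flaky_upstream *up = ctx;

	if ( up->failures_left > 0 ) {
		up->failures_left--;
		return 0;
	}
	return 1;
}

void count_wait( void *ctx )
{
	struct flaky_upstream *up = ctx;

	up->waits++;
}
```

Because only one request is in flight at a time, the loop needs no queue: the request buffer is simply kept until an attempt succeeds.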
Options
^^^^^^^
*--addr, -l ADDR*:
The address to listen on. Required.
*--port, -p PORT*:
The port to listen on. Required.
*--conn-addr, -C ADDR*:
The address of the NBD server to connect to. Required.
*--conn-port, -P PORT*:
The port of the NBD server to connect to. Required.
*--bind, -b ADDR*:
The address to connect from when contacting the NBD server. Optional.
mirror
~~~~~~
$ flexnbd mirror --addr <ADDR> --port <PORT> --sock SOCK
$ flexnbd mirror --addr <ADDR> --port <PORT> --sock SOCK
[--unlink] [--bind <BIND-ADDR>] [global option]*
Start a migration from the server with control socket SOCK to the server
@@ -128,8 +177,8 @@ Options
*--sock, -s SOCK*:
The control socket of the local server to migrate from. Required.
*--unlink, -u*:
Unlink the served file from the local filesystem after successfully
*--unlink, -u*:
Unlink the served file from the local filesystem after successfully
mirroring.
*--bind, -b BIND-ADDR*:
@@ -190,7 +239,7 @@ value. Currently reported values are:
read
~~~~
$ flexnbd read --addr <ADDR> --port <PORT> --from <OFFSET>
$ flexnbd read --addr <ADDR> --port <PORT> --from <OFFSET>
--size <SIZE> [--bind BIND-ADDR] [global option]*
Connect to the server at ADDR:PORT, and read SIZE bytes starting at
@@ -220,7 +269,7 @@ Options
write
~~~~~
$ cat ... | flexnbd write --addr <ADDR> --port <PORT> --from <OFFSET>
$ cat ... | flexnbd write --addr <ADDR> --port <PORT> --from <OFFSET>
--size <SIZE> [--bind BIND-ADDR] [global option]*
Connect to the server at ADDR:PORT, and write SIZE bytes from STDIN
@@ -299,9 +348,9 @@ the log line.
*SOURCEFILE:SOURCELINE*:
Identifies where in the source code this log line can be found.
*MSG*:
*MSG*:
A short message describing what's happening, how it's being done, or
if you're very lucky *why* it's going on.
if you're very lucky *why* it's going on.
EXAMPLES
--------
@@ -346,11 +395,11 @@ To migrate, we need to provide a destination file of the right size.
Now we check the status of each server, to check that they are both in
the right state:
$ flexnbd status --sock /tmp/flex-source.sock
pid=9648 is_mirroring=false has_control=true
$ flexnbd status --sock /tmp/flex-dest.sock
pid=9651 is_mirroring=false has_control=false
pid=9651 is_mirroring=false has_control=false
$
With this knowledge in hand, we can start the migration:
@@ -366,6 +415,40 @@ Note that because the file is so small in this case, we see the source
server quit soon after we start the migration, and the destination
exited at roughly the same time.
Proxying
~~~~~~~~
The main point of the proxy mode is to allow clients that would otherwise break
when the NBD server goes away (during a migration, for instance) to see a
persistent TCP connection throughout the process, instead of needing their own
reconnection logic.
For maximum reliability, the proxy process would be run on the same machine as
the actual NBD client; an example might look like:
nbd-server-1$ flexnbd serve -l 10.0.0.1 -p 4777 -f myfile [...]
nbd-client-1$ flexnbd proxy -l 127.0.0.1 -p 4777 -C 10.0.0.1 -P 4777
nbd-client-1$ nbd-client -c 127.0.0.1 4777 /dev/nbd0
nbd-server-2$ flexnbd listen -l 10.0.0.2 -p 4777 -f myfile [...]
nbd-server-1$ flexnbd mirror --addr 10.0.0.2 -p 4777 [...]
Upon completing the migration, the mirroring and listening flexnbd servers will
both exit. With the proxy mediating requests, this does not break the TCP
connection that nbd-client is holding open. If no requests are in-flight, it
will not notice anything at all; if requests are in-flight, then the reply will
take longer than usual to be returned.
When flexnbd is restarted in serve mode on the second server:
nbd-server-2$ flexnbd serve -l 10.0.0.1 -p 4777 -f myfile [...]
The proxy notices and reconnects, fulfilling any request it has in its buffer.
The data in myfile has been moved between physical servers without the nbd
client process having to be disturbed at all.
BUGS
----
@@ -374,9 +457,9 @@ Should be reported to alex@bytemark.co.uk.
AUTHOR
------
Written by Alex Young <alex@bytemark.co.uk>.
Original concept and core code by Matthew Bloch
<matthew@bytemark.co.uk>.
Written by Alex Young <alex@bytemark.co.uk>.
Original concept and core code by Matthew Bloch <matthew@bytemark.co.uk>.
Some additions by Nick Thomas <nick@bytemark.co.uk>
COPYING
-------
@@ -384,3 +467,4 @@ COPYING
Copyright (c) 2012 Bytemark Hosting Ltd. Free use of this software is
granted under the terms of the GNU General Public License version 3 or
later.


@@ -125,6 +125,7 @@ file check("client") =>
build/parse.o
build/client.o
build/serve.o
build/proxy.o
build/acl.o
build/ioutil.o
build/mbox.o
@@ -160,6 +161,7 @@ file check("serve") =>
build/client.o
build/flexthread.o
build/serve.o
build/proxy.o
build/flexnbd.o
build/mirror.o
build/status.o
@@ -177,6 +179,7 @@ file check("readwrite") =>
build/client.o
build/self_pipe.o
build/serve.o
build/proxy.o
build/parse.o
build/acl.o
build/flexthread.o
@@ -210,7 +213,8 @@ file check("flexnbd") =>
build/nbdtypes.o
build/readwrite.o
build/mirror.o
build/serve.o} do |t|
build/serve.o
build/proxy.o} do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
@@ -225,7 +229,7 @@ end
tgt = "build/tests/check_#{m}.o"
maybe_obj_name = "build/#{m}.o"
# Take it out in case we're testing util.o or ioutil.o
deps = ["build/ioutil.o", "build/util.o"] - [maybe_obj_name]
deps = ["build/ioutil.o", "build/util.o", "build/sockutil.o"] - [maybe_obj_name]
# Add it back in if it's something we need to compile
deps << maybe_obj_name if OBJECTS.include?( maybe_obj_name )


@@ -129,6 +129,26 @@ struct flexnbd * flexnbd_create_listening(
}
struct flexnbd * flexnbd_create_proxying(
char* s_downstream_address,
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind
)
{
struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) );
flexnbd->proxy = proxy_create(
flexnbd,
s_downstream_address,
s_downstream_port,
s_upstream_address,
s_upstream_port,
s_upstream_bind);
flexnbd_create_shared( flexnbd, NULL );
return flexnbd;
}
void flexnbd_spawn_control(struct flexnbd * flexnbd )
{
NULLCHECK( flexnbd );
@@ -181,7 +201,6 @@ struct server * flexnbd_server( struct flexnbd * flexnbd )
return flexnbd->serve;
}
void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl )
{
NULLCHECK( flexnbd );
@@ -255,3 +274,14 @@ int flexnbd_serve( struct flexnbd * flexnbd )
return success;
}
int flexnbd_proxy( struct flexnbd * flexnbd )
{
NULLCHECK( flexnbd );
int success;
success = do_proxy( flexnbd->proxy );
debug("do_proxy success is %d", success );
return success;
}


@@ -4,6 +4,7 @@
#include "acl.h"
#include "mirror.h"
#include "serve.h"
#include "proxy.h"
#include "self_pipe.h"
#include "mbox.h"
#include "control.h"
@@ -11,11 +12,14 @@
/* Carries the "globals". */
struct flexnbd {
/* We always have a serve pointer, but it should never be
* dereferenced outside a flexnbd_switch_lock/unlock pair.
/* Our serve pointer should never be dereferenced outside a
* flexnbd_switch_lock/unlock pair.
*/
struct server * serve;
/* In proxy mode, this is filled instead of serve, above */
struct proxier * proxy;
/* We only have a control object if a control socket name was
* passed on the command line.
*/
@@ -46,6 +50,14 @@ struct flexnbd * flexnbd_create_listening(
int acl_entries,
char** s_acl_entries );
struct flexnbd * flexnbd_create_proxying(
char* s_downstream_address,
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind
);
void flexnbd_destroy( struct flexnbd * );
enum mirror_state;
enum mirror_state flexnbd_get_mirror_state( struct flexnbd * );
@@ -55,7 +67,9 @@ int flexnbd_signal_fd( struct flexnbd * flexnbd );
int flexnbd_serve( struct flexnbd * flexnbd );
int flexnbd_proxy( struct flexnbd * flexnbd );
struct server * flexnbd_server( struct flexnbd * flexnbd );
void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl );
struct status * flexnbd_status_create( struct flexnbd * flexnbd );
#endif


@@ -56,6 +56,29 @@ static char listen_help_text[] =
VERBOSE_LINE
QUIET_LINE;
static struct option proxy_options[] = {
GETOPT_HELP,
GETOPT_ADDR,
GETOPT_PORT,
GETOPT_CONNECT_ADDR,
GETOPT_CONNECT_PORT,
GETOPT_BIND,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
};
static char proxy_short_options[] = "hl:p:C:P:b:" SOPT_QUIET SOPT_VERBOSE;
static char proxy_help_text[] =
"Usage: flexnbd " CMD_PROXY " <options>\n\n"
"Resiliently proxy an NBD connection between client and server\n\n"
HELP_LINE
"\t--" OPT_ADDR ",-l <ADDR>\tThe address we will bind to as a proxy.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port we will bind to as a proxy.\n"
"\t--" OPT_CONNECT_ADDR ",-C <ADDR>\tAddress of the proxied server.\n"
"\t--" OPT_CONNECT_PORT ",-P <PORT>\tPort of the proxied server.\n"
"\t--" OPT_BIND ",-b <ADDR>\tThe address we connect from, as a proxy.\n"
QUIET_LINE
VERBOSE_LINE;
static struct option read_options[] = {
GETOPT_HELP,
@@ -173,10 +196,13 @@ char help_help_text_arr[] =
"Usage: flexnbd <cmd> [cmd options]\n\n"
"Commands:\n"
"\tflexnbd serve\n"
"\tflexnbd listen\n"
"\tflexnbd proxy\n"
"\tflexnbd read\n"
"\tflexnbd write\n"
"\tflexnbd acl\n"
"\tflexnbd mirror\n"
"\tflexnbd break\n"
"\tflexnbd status\n"
"\tflexnbd help\n\n"
"See flexnbd help <cmd> for further info\n";
@@ -390,6 +416,46 @@ void read_break_param( int c, char **sock )
}
void read_proxy_param(
int c,
char **downstream_addr,
char **downstream_port,
char **upstream_addr,
char **upstream_port,
char **bind_addr )
{
switch( c ) {
case 'h' :
fprintf( stdout, "%s\n", proxy_help_text );
exit( 0 );
break;
case 'l':
*downstream_addr = optarg;
break;
case 'p':
*downstream_port = optarg;
break;
case 'C':
*upstream_addr = optarg;
break;
case 'P':
*upstream_port = optarg;
break;
case 'b':
*bind_addr = optarg;
break;
case 'q':
log_level = QUIET_LOG_LEVEL;
break;
case 'v':
log_level = VERBOSE_LOG_LEVEL;
break;
default:
exit_err( proxy_help_text );
break;
}
}
void read_status_param( int c, char **sock )
{
read_sock_param( c, sock, status_help_text );
@@ -733,6 +799,55 @@ int mode_status( int argc, char *argv[] )
return 0;
}
int mode_proxy( int argc, char *argv[] )
{
int c;
struct flexnbd * flexnbd;
char *downstream_addr = NULL;
char *downstream_port = NULL;
char *upstream_addr = NULL;
char *upstream_port = NULL;
char *bind_addr = NULL;
int success;
while (1) {
c = getopt_long( argc, argv, proxy_short_options, proxy_options, NULL );
if ( -1 == c ) { break; }
read_proxy_param( c,
&downstream_addr,
&downstream_port,
&upstream_addr,
&upstream_port,
&bind_addr
);
}
if ( NULL == downstream_addr || NULL == downstream_port ){
fprintf( stderr, "both --addr and --port are required.\n" );
exit_err( proxy_help_text );
} else if ( NULL == upstream_addr || NULL == upstream_port ){
fprintf( stderr, "both --conn-addr and --conn-port are required.\n" );
exit_err( proxy_help_text );
}
flexnbd = flexnbd_create_proxying(
downstream_addr,
downstream_port,
upstream_addr,
upstream_port,
bind_addr
);
info(
"Proxying between %s %s (downstream) and %s %s (upstream)",
downstream_addr, downstream_port, upstream_addr, upstream_port
);
success = flexnbd_proxy( flexnbd );
flexnbd_destroy( flexnbd );
return success ? 0 : 1;
}
int mode_help( int argc, char *argv[] )
{
@@ -757,6 +872,8 @@ int mode_help( int argc, char *argv[] )
help_text = mirror_help_text;
} else if ( IS_CMD( CMD_STATUS, cmd ) ) {
help_text = status_help_text;
} else if ( IS_CMD( CMD_PROXY, cmd ) ) {
help_text = proxy_help_text;
} else { exit_err( help_help_text ); }
}
@@ -790,6 +907,8 @@ void mode(char* mode, int argc, char **argv)
}
else if ( IS_CMD( CMD_STATUS, mode ) ) {
mode_status( argc, argv );
} else if ( IS_CMD( CMD_PROXY, mode ) ) {
mode_proxy( argc, argv );
}
else if ( IS_CMD( CMD_HELP, mode ) ) {
mode_help( argc-1, argv+1 );
@@ -801,4 +920,3 @@ void mode(char* mode, int argc, char **argv)
exit(0);
}


@@ -20,9 +20,12 @@ void mode(char* mode, int argc, char **argv);
#define OPT_SIZE "size"
#define OPT_DENY "default-deny"
#define OPT_UNLINK "unlink"
#define OPT_CONNECT_ADDR "conn-addr"
#define OPT_CONNECT_PORT "conn-port"
#define CMD_SERVE "serve"
#define CMD_LISTEN "listen"
#define CMD_PROXY "proxy"
#define CMD_READ "read"
#define CMD_WRITE "write"
#define CMD_ACL "acl"
@@ -40,7 +43,6 @@ void mode(char* mode, int argc, char **argv);
#define GETOPT_HELP GETOPT_FLAG( OPT_HELP, 'h' )
#define GETOPT_DENY GETOPT_FLAG( OPT_DENY, 'd' )
#define GETOPT_ADDR GETOPT_ARG( OPT_ADDR, 'l' )
#define GETOPT_PORT GETOPT_ARG( OPT_PORT, 'p' )
#define GETOPT_FILE GETOPT_ARG( OPT_FILE, 'f' )
@@ -49,6 +51,8 @@ void mode(char* mode, int argc, char **argv);
#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' )
#define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' )
#define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' )
#define GETOPT_CONNECT_ADDR GETOPT_ARG( OPT_CONNECT_ADDR, 'C' )
#define GETOPT_CONNECT_PORT GETOPT_ARG( OPT_CONNECT_PORT, 'P' )
#define OPT_VERBOSE "verbose"
#define SOPT_VERBOSE "v"

src/proxy.c Normal file

@@ -0,0 +1,497 @@
#include "proxy.h"
#include "readwrite.h"
#include "ioutil.h"
#include "sockutil.h"
#include "util.h"
#include <errno.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
struct proxier* proxy_create(
struct flexnbd* flexnbd,
char* s_downstream_address,
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind )
{
NULLCHECK( flexnbd );
struct proxier* out;
out = xmalloc( sizeof( struct proxier ) );
out->flexnbd = flexnbd;
FATAL_IF_NULL(s_downstream_address, "Listen address not specified");
NULLCHECK( s_downstream_address );
FATAL_UNLESS(
parse_ip_to_sockaddr( &out->listen_on.generic, s_downstream_address ),
"Couldn't parse downstream address '%s' (use 0 if "
"you want to bind all IPs)",
s_downstream_address
);
FATAL_IF_NULL( s_downstream_port, "Downstream port not specified" );
NULLCHECK( s_downstream_port );
parse_port( s_downstream_port, &out->listen_on.v4 );
FATAL_IF_NULL(s_upstream_address, "Upstream address not specified");
NULLCHECK( s_upstream_address );
FATAL_UNLESS(
parse_ip_to_sockaddr( &out->connect_to.generic, s_upstream_address ),
"Couldn't parse upstream address '%s'",
s_upstream_address
);
FATAL_IF_NULL( s_upstream_port, "Upstream port not specified" );
NULLCHECK( s_upstream_port );
parse_port( s_upstream_port, &out->connect_to.v4 );
if ( s_upstream_bind ) {
FATAL_IF_ZERO(
parse_ip_to_sockaddr( &out->connect_from.generic, s_upstream_bind ),
"Couldn't parse bind address '%s'",
s_upstream_bind
);
}
out->listen_fd = -1;
out->downstream_fd = -1;
out->upstream_fd = -1;
out->req_buf = xmalloc( NBD_MAX_SIZE );
out->rsp_buf = xmalloc( NBD_MAX_SIZE );
return out;
}
void proxy_destroy( struct proxier* proxy )
{
free( proxy->req_buf );
free( proxy->rsp_buf );
free( proxy );
}
/* Try to establish a connection to our upstream server. Return 1 on success,
* 0 on failure
*/
int proxy_connect_to_upstream( struct proxier* proxy )
{
int fd = socket_connect( &proxy->connect_to.generic, &proxy->connect_from.generic );
off64_t size = 0;
if ( -1 == fd ) {
return 0;
}
if( !socket_nbd_read_hello( fd, &size ) ) {
FATAL_IF_NEGATIVE(
close( fd ), SHOW_ERRNO( "FIXME: shouldn't be fatal" )
);
return 0;
}
if ( proxy->upstream_size == 0 ) {
info( "Size of upstream image is %"PRIu64" bytes", size );
} else if ( proxy->upstream_size != size ) {
warn( "Size changed from %"PRIu64" to %"PRIu64" bytes", proxy->upstream_size, size );
}
proxy->upstream_size = size;
proxy->upstream_fd = fd;
return 1;
}
void proxy_disconnect_from_upstream( struct proxier* proxy )
{
if ( -1 != proxy->upstream_fd ) {
debug(" Closing upstream connection" );
/* TODO: An NBD disconnect would be pleasant here */
FATAL_IF_NEGATIVE(
close( proxy->upstream_fd ),
SHOW_ERRNO( "FIXME: shouldn't be fatal" )
);
proxy->upstream_fd = -1;
}
}
/** Prepares the proxy's listening socket: creates it, sets options, binds and listens. */
void proxy_open_listen_socket(struct proxier* params)
{
NULLCHECK( params );
params->listen_fd = socket(params->listen_on.family, SOCK_STREAM, 0);
FATAL_IF_NEGATIVE(
params->listen_fd, SHOW_ERRNO( "Couldn't create listen socket" )
);
/* Allow us to restart quickly */
FATAL_IF_NEGATIVE(
sock_set_reuseaddr(params->listen_fd, 1),
SHOW_ERRNO( "Couldn't set SO_REUSEADDR" )
);
FATAL_IF_NEGATIVE(
sock_set_tcp_nodelay(params->listen_fd, 1),
SHOW_ERRNO( "Couldn't set TCP_NODELAY" )
);
FATAL_UNLESS_ZERO(
sock_try_bind( params->listen_fd, &params->listen_on.generic ),
SHOW_ERRNO( "Failed to bind to listening socket" )
);
/* We're only serving one client at a time, hence backlog of 1 */
FATAL_IF_NEGATIVE(
listen(params->listen_fd, 1),
SHOW_ERRNO( "Failed to listen on listening socket" )
);
info( "Now listening for incoming connections" );
}
/* Return 0 if we should keep running, 1 if an exit has been signaled. Pass it
* an fd_set to check, or set check_fds to NULL to have it perform its own
* select(). In the latter case, wait specifies how many seconds we'll wait for
* an exit signal to show up.
*/
int proxy_should_exit( struct proxier* params, fd_set *check_fds, int wait )
{
struct timeval tv = { wait, 0 };
fd_set internal_fds;
fd_set* fds = check_fds;
int signal_fd = flexnbd_signal_fd( params->flexnbd );
if ( NULL == check_fds ) {
fds = &internal_fds;
FD_ZERO( fds );
FD_SET( signal_fd, fds );
FATAL_IF_NEGATIVE(
sock_try_select(FD_SETSIZE, fds, NULL, NULL, &tv),
SHOW_ERRNO( "select() failed." )
);
}
if ( FD_ISSET( signal_fd, fds ) ) {
info( "Stop signal received" );
return 1;
}
return 0;
}
/* Try to get a request from downstream. If reading from downstream fails, then
* the session will be over. Returns 1 on success, 0 on failure.
*/
int proxy_get_request_from_downstream( struct proxier* proxy )
{
unsigned char* req_hdr_raw = proxy->req_buf;
unsigned char* req_data = proxy->req_buf + NBD_REQUEST_SIZE;
size_t req_buf_size;
struct nbd_request_raw* request_raw = (struct nbd_request_raw*) req_hdr_raw;
struct nbd_request* request = &(proxy->req_hdr);
if ( readloop( proxy->downstream_fd, req_hdr_raw, NBD_REQUEST_SIZE ) == -1 ) {
info( SHOW_ERRNO( "Failed to get request header" ) );
return 0;
}
nbd_r2h_request( request_raw, request );
req_buf_size = NBD_REQUEST_SIZE;
if ( request->type == REQUEST_DISCONNECT ) {
info( "Received disconnect request from client" );
return 0;
}
if ( request->type == REQUEST_READ ) {
if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) {
warn( "NBD read request size %"PRIu32" too large", request->len );
return 0;
}
}
if ( request->type == REQUEST_WRITE ) {
if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) {
warn( "NBD write request size %"PRIu32" too large", request->len );
return 0;
}
if ( readloop( proxy->downstream_fd, req_data, request->len ) == -1 ) {
warn( "Failed to get NBD write request data: %"PRIu32"b", request->len );
return 0;
}
req_buf_size += request->len;
}
debug(
"Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32,
request->type, request->from, request->len
);
proxy->req_buf_size = req_buf_size;
return 1;
}
/* Tries to send the request upstream and receive a response. If upstream breaks
* then we reconnect to it, and keep it up until we have a complete response
* back. Returns 1 on success, 0 on failure, -1 if exit is signalled.
*/
int proxy_run_request_upstream( struct proxier* proxy )
{
unsigned char* rsp_hdr_raw = proxy->rsp_buf;
unsigned char* rsp_data = proxy->rsp_buf + NBD_REPLY_SIZE;
struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) rsp_hdr_raw;
struct nbd_request* request = &(proxy->req_hdr);
struct nbd_reply* reply = &(proxy->rsp_hdr);
size_t rsp_buf_size;
if ( proxy->upstream_fd == -1 ) {
debug( "Connecting to upstream" );
if ( !proxy_connect_to_upstream( proxy ) ) {
debug( "Failed to connect to upstream" );
if ( proxy_should_exit( proxy, NULL, 5 ) ) {
return -1;
}
return 0;
}
debug( "Connected to upstream" );
}
if ( writeloop( proxy->upstream_fd, proxy->req_buf, proxy->req_buf_size ) == -1 ) {
warn( "Failed to send request to upstream" );
proxy_disconnect_from_upstream( proxy );
return 0;
}
if ( readloop( proxy->upstream_fd, rsp_hdr_raw, NBD_REPLY_SIZE ) == -1 ) {
debug( "Failed to get reply header from upstream" );
proxy_disconnect_from_upstream( proxy );
return 0;
}
nbd_r2h_reply( reply_raw, reply );
rsp_buf_size = NBD_REPLY_SIZE;
if ( reply->magic != REPLY_MAGIC ) {
debug( "Reply magic is incorrect" );
proxy_disconnect_from_upstream( proxy );
return 0;
}
debug( "NBD reply received from upstream. Response code: %"PRIu32, reply->error );
if ( reply->error != 0 ) {
warn( "NBD error returned from upstream: %"PRIu32, reply->error );
}
if ( reply->error == 0 && request->type == REQUEST_READ ) {
if (readloop( proxy->upstream_fd, rsp_data, request->len ) == -1 ) {
debug( "Failed to get reply data from upstream" );
proxy_disconnect_from_upstream( proxy );
return 0;
}
rsp_buf_size += request->len;
}
proxy->rsp_buf_size = rsp_buf_size;
return 1;
}
/* Write an NBD reply back downstream. Return 0 on failure, 1 on success. */
int proxy_send_reply_downstream( struct proxier* proxy )
{
int result;
unsigned char* rsp_buf = proxy->rsp_buf;
debug(
"Writing header (%"PRIu32") + data (%"PRIu32") bytes downstream",
NBD_REPLY_SIZE, proxy->rsp_buf_size - NBD_REPLY_SIZE
);
result = writeloop( proxy->downstream_fd, rsp_buf, proxy->rsp_buf_size );
if ( result == -1 ) {
debug( "Failed to send reply downstream" );
return 0;
}
debug( "Reply sent" );
return 1;
}
/* Here, we negotiate an NBD session with downstream, based on the information
* we got on first connection to upstream. Then we wait for a request to come
* in from downstream, read it into memory, then send it to upstream. If
* upstream dies before responding, we reconnect to upstream and resend it.
* Once we've got a response, we write it directly to downstream, and wait for a
* new request. When downstream disconnects, or we receive an exit signal (which
* can be blocked, unfortunately), we are finished.
*
* This is the simplest possible nbd proxy I can think of. It may not be at all
* performant - let's see.
*/
void proxy_session( struct proxier* proxy )
{
int downstream_fd = proxy->downstream_fd;
uint64_t req_count = 0;
int result;
info( "Beginning proxy session on fd %i", downstream_fd );
if ( !socket_nbd_write_hello( downstream_fd, proxy->upstream_size ) ) {
debug( "Sending hello failed on fd %i, ending session", downstream_fd );
return;
}
while( proxy_get_request_from_downstream( proxy ) ) {
/* Don't start running the request if exit has been signalled */
if ( proxy_should_exit( proxy, NULL, 0 ) ) {
break;
}
do {
result = proxy_run_request_upstream( proxy );
} while ( result == 0 );
/* We have to exit, but don't know if the request was successfully
* proxied or not. We could add that knowledge, and attempt to send a
* reply downstream if it was, but I don't think it's worth it.
*/
if ( result == -1 ) {
break;
}
if ( !proxy_send_reply_downstream( proxy ) ) {
break;
}
proxy->req_buf_size = 0;
proxy->rsp_buf_size = 0;
req_count++;
};
info(
"Finished proxy session on fd %i after %"PRIu64" successful request(s)",
downstream_fd, req_count
);
return;
}
/** Accept an NBD socket connection, dispatch appropriately */
int proxy_accept( struct proxier* params )
{
NULLCHECK( params );
int client_fd;
int signal_fd = flexnbd_signal_fd( params->flexnbd );
fd_set fds;
int should_continue = 1;
union mysockaddr client_address;
socklen_t socklen = sizeof( client_address );
debug("accept loop starting");
FD_ZERO(&fds);
FD_SET(params->listen_fd, &fds);
FD_SET(signal_fd, &fds);
FATAL_IF_NEGATIVE(
sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL),
SHOW_ERRNO( "select() failed" )
);
if ( proxy_should_exit( params, &fds, 0) ) {
should_continue = 0;
}
if ( should_continue && FD_ISSET( params->listen_fd, &fds ) ) {
client_fd = accept( params->listen_fd, &client_address.generic, &socklen );
if ( sock_set_tcp_nodelay(client_fd, 1) == -1 ) {
warn( SHOW_ERRNO( "Failed to set TCP_NODELAY" ) );
}
info( "Accepted nbd client socket fd %d", client_fd );
params->downstream_fd = client_fd;
proxy_session( params );
if ( close( params->downstream_fd ) == -1 ) {
warn( SHOW_ERRNO( "FIXME: close returned" ) );
}
params->downstream_fd = -1;
}
return should_continue;
}
void proxy_accept_loop( struct proxier* params )
{
NULLCHECK( params );
while( proxy_accept( params ) );
}
/** Closes the listening socket, if it is open */
void proxy_cleanup( struct proxier* params )
{
NULLCHECK( params );
info( "cleaning up" );
if ( -1 != params->listen_fd ) {
close( params->listen_fd );
}
debug( "Cleanup done" );
}
/** Full lifecycle of the proxier. Returns 1 on success, 0 on failure. */
int do_proxy( struct proxier* params )
{
NULLCHECK( params );
error_set_handler( (cleanup_handler*) proxy_cleanup, params );
debug( "Ensuring upstream server is open" );
if ( !proxy_connect_to_upstream( params ) ) {
info( "Couldn't connect to upstream server during initialization" );
proxy_cleanup( params );
/* Callers treat the return value as a success flag, so 0 means failure */
return 0;
}
proxy_open_listen_socket( params );
proxy_accept_loop( params );
proxy_cleanup( params );
return 1;
}
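The buffer arithmetic in proxy_get_request_from_downstream() can be summarised in a small standalone sketch. It assumes the classic NBD wire sizes (28-byte request header, 16-byte reply header) and the usual command numbers (read = 0, write = 1); the `SKETCH_*` names and `sketch_request_bytes` are hypothetical and only mirror the checks shown above, they are not part of this commit. The real function additionally ends the session on a disconnect request, which the sketch leaves out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed wire sizes of the classic NBD request and reply headers. */
static const size_t SKETCH_REQUEST_SIZE = 28;
static const size_t SKETCH_REPLY_SIZE = 16;

/* Assumed command numbers from the NBD protocol. */
enum sketch_type { SKETCH_READ = 0, SKETCH_WRITE = 1 };

/* Work out how many bytes of request buffer must be forwarded upstream.
 * Returns 0 when the request cannot fit in a buffer of max_size bytes:
 *  - a read needs room for the reply header plus len bytes of data,
 *  - a write needs room for the request header plus len bytes of data. */
size_t sketch_request_bytes( uint32_t type, uint32_t len, size_t max_size )
{
	assert( max_size > SKETCH_REQUEST_SIZE );

	if ( type == SKETCH_READ ) {
		if ( len > max_size - SKETCH_REPLY_SIZE ) {
			return 0;
		}
		/* Reads carry no payload on the way upstream */
		return SKETCH_REQUEST_SIZE;
	}

	if ( type == SKETCH_WRITE ) {
		if ( len > max_size - SKETCH_REQUEST_SIZE ) {
			return 0;
		}
		/* Writes are forwarded as header followed by data */
		return SKETCH_REQUEST_SIZE + (size_t) len;
	}

	/* Any other request type is treated as header-only in this sketch */
	return SKETCH_REQUEST_SIZE;
}
```

Checking the read size against the reply buffer up front means the later read of `request->len` bytes of reply data cannot overrun the response buffer.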

src/proxy.h Normal file

@@ -0,0 +1,71 @@
#ifndef PROXY_H
#define PROXY_H
#include <sys/types.h>
#include <unistd.h>
#include "flexnbd.h"
#include "parse.h"
#include "nbdtypes.h"
#include "self_pipe.h"
struct proxier {
/* The flexnbd wrapper this proxier is attached to */
struct flexnbd* flexnbd;
/** address/port to bind to */
union mysockaddr listen_on;
/** address/port to connect to */
union mysockaddr connect_to;
/** address to bind to when making outgoing connections */
union mysockaddr connect_from;
/* The socket we listen() on and accept() against */
int listen_fd;
/* The socket returned by accept() that we receive requests from and send
* responses to
*/
int downstream_fd;
/* The socket returned by connect() that we send requests to and receive
* responses from
*/
int upstream_fd;
/* The size reported by upstream, which we advertise to the downstream client */
off64_t upstream_size;
/* Scratch space for the current NBD request from downstream */
unsigned char* req_buf;
/* Number of bytes currently sat in req_buf */
size_t req_buf_size;
/* We transform the raw request header into here */
struct nbd_request req_hdr;
/* Scratch space for the current NBD reply from upstream */
unsigned char* rsp_buf;
/* Number of bytes currently sat in rsp_buf */
size_t rsp_buf_size;
/* We transform the raw reply header into here */
struct nbd_reply rsp_hdr;
};
struct proxier* proxy_create(
struct flexnbd * flexnbd,
char* s_downstream_address,
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind );
int do_proxy( struct proxier* proxy );
void proxy_destroy( struct proxier* proxy );
#endif
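The comment "We transform the raw request header into here" refers to converting the big-endian wire header into host byte order. A minimal sketch of that conversion is shown below. It assumes the classic NBD request layout (magic, type, handle, offset, length; 28 bytes in total); `struct sketch_request`, `be32`, `be64` and `sketch_decode_request` are hypothetical names used for illustration and are not the nbd_r2h_request() implementation from nbdtypes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Host-order view of a request header (assumed layout, for illustration). */
struct sketch_request {
	uint32_t magic;
	uint32_t type;
	unsigned char handle[8]; /* opaque to the proxy, echoed back in the reply */
	uint64_t from;
	uint32_t len;
};

/* Read a big-endian 32-bit value from an unaligned byte pointer. */
static uint32_t be32( const unsigned char *p )
{
	return ( (uint32_t) p[0] << 24 ) | ( (uint32_t) p[1] << 16 ) |
	       ( (uint32_t) p[2] << 8 )  |   (uint32_t) p[3];
}

/* Read a big-endian 64-bit value as two 32-bit halves. */
static uint64_t be64( const unsigned char *p )
{
	return ( (uint64_t) be32( p ) << 32 ) | (uint64_t) be32( p + 4 );
}

/* Decode the 28 raw header bytes into host byte order. */
void sketch_decode_request( const unsigned char raw[28], struct sketch_request *out )
{
	assert( raw != NULL && out != NULL );

	out->magic = be32( raw );
	out->type  = be32( raw + 4 );
	memcpy( out->handle, raw + 8, 8 );
	out->from  = be64( raw + 16 );
	out->len   = be32( raw + 24 );
}
```

Decoding byte by byte avoids alignment problems when the header sits at the start of a plain `unsigned char` buffer such as `req_buf`.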


@@ -0,0 +1,13 @@
{
avoid_glibc_bug_do_lookup
Memcheck:Addr8
fun:do_lookup_x
obj:*
fun:_dl_lookup_symbol_x
}
{
avoid_glibc_bug_check_match
Memcheck:Addr8
fun:check_match.12149
}


@@ -21,6 +21,13 @@ class Environment
@fake_pid = nil
end
def proxy1(port=@port2)
@nbd1.proxy(@ip, port)
end
def proxy2(port=@port1)
@nbd2.proxy(@ip, port)
end
def serve1(*acl)
@nbd1.serve(@filename1, *acl)


@@ -21,7 +21,7 @@ class ValgrindExecutor
attr_reader :pid
def run( cmd )
@pid = fork do exec "valgrind --track-origins=yes #{cmd}" end
@pid = fork do exec "valgrind --suppressions=custom.supp --track-origins=yes #{cmd}" end
end
end # class ValgrindExecutor
@@ -131,7 +131,7 @@ class ValgrindKillingExecutor
def run( cmd )
@io_r, io_w = IO.pipe
@pid = fork do exec( "valgrind --xml=yes --xml-fd=#{io_w.fileno} " + cmd ) end
@pid = fork do exec( "valgrind --suppressions=custom.supp --xml=yes --xml-fd=#{io_w.fileno} " + cmd ) end
launch_watch_thread( @pid, @io_r )
@pid
end
@@ -241,6 +241,15 @@ module FlexNBD
"#{acl.join(' ')}"
end
def proxy_cmd( connect_ip, connect_port )
"#{bin} proxy "\
"--addr #{ip} "\
"--port #{port} "\
"--conn-addr #{connect_ip} "\
"--conn-port #{connect_port} "\
"#{@debug}"
end
def read_cmd( offset, length )
"#{bin} read "\
@@ -319,6 +328,7 @@ module FlexNBD
sleep 0.1
end
start_wait_thread( @pid )
at_exit { kill }
end
@@ -336,6 +346,31 @@ module FlexNBD
run_serve_cmd( listen_cmd( file, acl ) )
end
def tcp_server_open?
# raises if the other side doesn't accept()
sock = TCPSocket.new(ip, port) rescue nil
success = !!sock
( sock.close rescue nil) if sock
success
end
def proxy( connect_ip, connect_port )
cmd = proxy_cmd( connect_ip, connect_port )
debug( cmd )
@pid = @executor.run( cmd )
until tcp_server_open?
pid, status = Process.wait2(@pid, Process::WNOHANG)
raise "server did not start (#{cmd})" if pid
sleep 0.1
end
start_wait_thread( @pid )
at_exit { kill }
end
def start_wait_thread( pid )
@wait_thread = Thread.start do
@@ -521,3 +556,4 @@ module FlexNBD
end
end


@@ -0,0 +1,199 @@
require 'test/unit'
require 'environment'
require 'flexnbd/fake_source'
require 'flexnbd/fake_dest'
class TestProxyMode < Test::Unit::TestCase
def setup
super
@env = Environment.new
@env.writefile1( "0" * 16 )
end
def teardown
@env.cleanup
super
end
def with_proxied_client( override_size = nil )
@env.serve1 unless @server_up
@env.proxy2 unless @proxy_up
@env.nbd2.can_die(0)
client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy")
begin
result = client.read_hello
assert_equal "NBDMAGIC", result[:magic]
assert_equal override_size || @env.file1.size, result[:size]
yield client
ensure
client.close rescue nil
end
end
def test_exits_with_error_when_cannot_connect_to_upstream_on_start
assert_raises(RuntimeError) { @env.proxy1 }
end
def test_read_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write_read_request(offset, 4096, "myhandle")
rsp = client.read_response
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
orig_data = @env.file1.read(offset, 4096)
data = client.read_raw(4096)
assert_equal 4096, orig_data.size
assert_equal 4096, data.size
assert_equal( orig_data, data, "Returned data does not match" )
end
end
end
def test_write_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write(offset, "\xFF" * 4096)
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
data = @env.file1.read(offset, 4096)
assert_equal( ( "\xFF" * 4096 ), data, "Data not written" )
end
end
end
def make_fake_server
server = FlexNBD::FakeDest.new(@env.ip, @env.port1)
@server_up = true
# We return a thread here because accept() and connect() both block for us
Thread.new do
sc = server.accept # just tell the supervisor we're up
sc.write_hello
[ server, sc ]
end
end
def test_read_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write_read_request( 0, 4096 )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_READ, req1[:type]
assert_equal 0, req1[:from]
assert_equal 4096, req1[:len]
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
sc2.write_data( "\xFF" * 4096 )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
data = client.read_raw( 4096 )
assert_equal( ("\xFF" * 4096), data, "Wrong data returned" )
sc2.close
server.close
end
end
def test_write_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the write request to the proxy
client.write( 0, ( "\xFF" * 4096 ) )
# ensure we're given the write request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal 4096, req1[:len]
data1 = sc1.read_data( 4096 )
assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" )
# Kill the server again, now we're sure the write request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
data2 = sc2.read_data( 4096 )
assert_equal data1, data2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
sc2.close
server.close
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |client|
c2 = nil
assert_raises(Timeout::Error) do
timeout(1) do
c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)")
c2.read_hello
end
end
c2.close rescue nil if c2
end
end
end