Automated merge with file:///home/lupine/Development/bigv-repos/flexnbd-c-sockutil

This commit is contained in:
nick
2013-02-15 16:53:18 +00:00
23 changed files with 1401 additions and 90 deletions

View File

@@ -24,7 +24,7 @@ COMMANDS
serve
~~~~~
$ flexnbd serve --addr <ADDR> --port <PORT> --file <FILE>
$ flexnbd serve --addr <ADDR> --port <PORT> --file <FILE>
[--sock <SOCK>] [--default-deny] [global option]* [acl entry]*
Serve a file. If any ACL entries are given (which should be IP
@@ -54,11 +54,11 @@ Options
How to interpret an empty ACL. If --default-deny is given, an
empty ACL will let no clients connect. If it is not given, an
empty ACL will let any client connect.
listen
~~~~~~
$ flexnbd listen --addr <ADDR> --port <PORT> --file <FILE>
$ flexnbd listen --addr <ADDR> --port <PORT> --file <FILE>
[--sock <SOCK>] [--default-deny] [global option]* [acl entry]*
Listen for an inbound migration, and quit with a status of 0 on
@@ -85,10 +85,59 @@ Options
^^^^^^^
As for 'serve'.
proxy
~~~~~
$ flexnbd proxy --addr <ADDR> --port <PORT>
--conn-addr <ADDR> --conn-port <PORT> [--bind <ADDR>] [global option]*
Proxy requests from an NBD client to an NBD server, resiliently. Only one
client can be connected (to the address specified by --addr and --port) at a
time, and ACLs cannot be applied to the client, as they can be to clients
connecting directly to a flexnbd in serve mode.
On starting up, the proxy will attempt to connect to the server specified by
--conn-addr and --conn-port (from the address specified by --bind, if given). If
it fails, then the process will die with an error exit status.
Assuming a successful connection to the `upstream` server is made, the proxy
will then start listening on the address specified by --addr and --port, waiting
for `downstream` to connect to it (this will be your NBD client). The client
will be given the same hello message as the proxy was given by the server.
When connected, any request the client makes will be read by the proxy and sent
to the server. If the server goes away for any reason, the proxy will remember
the request and regularly (~ every 5 seconds) try to reconnect to the server.
Upon reconnection, the request is sent and a reply is waited for. When a reply
is received, it is sent back to the client.
When the client disconnects, cleanly or otherwise, the proxy goes back to
waiting for a new client to connect. The connection to the server is maintained
at that point, in case it is needed again.
Only one request may be in-flight at a time under the current architecture; that
doesn't seem to slow things down much relative to alternative options, but may
be changed in the future if it becomes an issue.
Options
^^^^^^^
*--addr, -l ADDR*:
The address to listen on. Required.
*--port, -p PORT*:
The port to listen on. Required.
*--conn-addr, -C ADDR*:
The address of the NBD server to connect to. Required.
*--conn-port, -P PORT*:
The port of the NBD server to connect to. Required.
mirror
~~~~~~
$ flexnbd mirror --addr <ADDR> --port <PORT> --sock SOCK
$ flexnbd mirror --addr <ADDR> --port <PORT> --sock SOCK
[--unlink] [--bind <BIND-ADDR>] [global option]*
Start a migration from the server with control socket SOCK to the server
@@ -128,8 +177,8 @@ Options
*--sock, -s SOCK*:
The control socket of the local server to migrate from. Required.
*--unlink, -u*:
Unlink the served file from the local filesystem after successfully
*--unlink, -u*:
Unlink the served file from the local filesystem after successfully
mirroring.
*--bind, -b BIND-ADDR*:
@@ -190,7 +239,7 @@ value. Currently reported values are:
read
~~~~
$ flexnbd read --addr <ADDR> --port <PORT> --from <OFFSET>
$ flexnbd read --addr <ADDR> --port <PORT> --from <OFFSET>
--size <SIZE> [--bind BIND-ADDR] [global option]*
Connect to the server at ADDR:PORT, and read SIZE bytes starting at
@@ -220,7 +269,7 @@ Options
write
~~~~~
$ cat ... | flexnbd write --addr <ADDR> --port <PORT> --from <OFFSET>
$ cat ... | flexnbd write --addr <ADDR> --port <PORT> --from <OFFSET>
--size <SIZE> [--bind BIND-ADDR] [global option]*
Connect to the server at ADDR:PORT, and write SIZE bytes from STDIN
@@ -299,9 +348,9 @@ the log line.
*SOURCEFILE:SOURCELINE*:
Identifies where in the source code this log line can be found.
*MSG*:
*MSG*:
A short message describing what's happening, how it's being done, or
if you're very lucky *why* it's going on.
if you're very lucky *why* it's going on.
EXAMPLES
--------
@@ -346,11 +395,11 @@ To migrate, we need to provide a destination file of the right size.
Now we check the status of each server, to check that they are both in
the right state:
$ flexnbd status --sock /tmp/flex-source.sock
pid=9648 is_mirroring=false has_control=true
$ flexnbd status --sock /tmp/flex-dest.sock
pid=9651 is_mirroring=false has_control=false
pid=9651 is_mirroring=false has_control=false
$
With this knowledge in hand, we can start the migration:
@@ -366,6 +415,40 @@ Note that because the file is so small in this case, we see the source
server quit soon after we start the migration, and the destination
exited at roughly the same time.
Proxying
~~~~~~~~
The main point of the proxy mode is to allow clients that would otherwise break
when the NBD server goes away (during a migration, for instance) to see a
persistent TCP connection throughout the process, instead of needing its own
reconnection logic.
For maximum reliability, the proxy process would be run on the same machine as
the actual NBD client; an example might look like:
nbd-server-1$ flexnbd serve -l 10.0.0.1 -p 4777 myfile [...]
nbd-client-1$ flexnbd proxy -l 127.0.0.1 -p 4777 -C 10.0.0.1 -P 4777
nbd-client-1$ nbd-client -c 127.0.0.1 4777 /dev/nbd0
nbd-server-2$ flexnbd listen -l 10.0.0.2 -p 4777 -f myfile [...]
nbd-server-1$ flexnbd mirror --addr 10.0.0.2 -p 4777 [...]
Upon completing the migration, the mirroring and listening flexnbd servers will
both exit. With the proxy mediating requests, this does not break the TCP
connection that nbd-client is holding open. If no requests are in-flight, it
will not notice anything at all; if requests are in-flight, then the reply will
take longer than usual to be returned.
When flexnbd is restarted in serve mode on the second server:
nbd-server-2$ flexnbd serve -l 10.0.0.1 -p 4777 -f myfile [...]
The proxy notices and reconnects, fulfiling any request it has in its buffer.
The data in myfile has been moved between physical servers without the nbd
client process having to be disturbed at all.
BUGS
----
@@ -374,9 +457,9 @@ Should be reported to alex@bytemark.co.uk.
AUTHOR
------
Written by Alex Young <alex@bytemark.co.uk>.
Original concept and core code by Matthew Bloch
<matthew@bytemark.co.uk>.
Written by Alex Young <alex@bytemark.co.uk>.
Original concept and core code by Matthew Bloch <matthew@bytemark.co.uk>.
Some additions by Nick Thomas <nick@bytemark.co.uk>
COPYING
-------
@@ -384,3 +467,4 @@ COPYING
Copyright (c) 2012 Bytemark Hosting Ltd. Free use of this software is
granted under the terms of the GNU General Public License version 3 or
later.

View File

@@ -84,7 +84,7 @@ namespace "test" do
desc "Run NBD test scenarios"
task 'scenarios' => 'flexnbd' do
sh "cd tests/acceptance; ruby nbd_scenarios"
sh "cd tests/acceptance; ruby nbd_scenarios -v"
end
end
@@ -125,6 +125,7 @@ file check("client") =>
build/parse.o
build/client.o
build/serve.o
build/proxy.o
build/acl.o
build/ioutil.o
build/mbox.o
@@ -160,6 +161,7 @@ file check("serve") =>
build/client.o
build/flexthread.o
build/serve.o
build/proxy.o
build/flexnbd.o
build/mirror.o
build/status.o
@@ -177,6 +179,7 @@ file check("readwrite") =>
build/client.o
build/self_pipe.o
build/serve.o
build/proxy.o
build/parse.o
build/acl.o
build/flexthread.o
@@ -210,7 +213,8 @@ file check("flexnbd") =>
build/nbdtypes.o
build/readwrite.o
build/mirror.o
build/serve.o} do |t|
build/serve.o
build/proxy.o} do |t|
gcc_link t.name, t.prerequisites + [LIBCHECK]
end
@@ -225,7 +229,7 @@ end
tgt = "build/tests/check_#{m}.o"
maybe_obj_name = "build/#{m}.o"
# Take it out in case we're testing util.o or ioutil.o
deps = ["build/ioutil.o", "build/util.o"] - [maybe_obj_name]
deps = ["build/ioutil.o", "build/util.o", "build/sockutil.o"] - [maybe_obj_name]
# Add it back in if it's something we need to compile
deps << maybe_obj_name if OBJECTS.include?( maybe_obj_name )

View File

@@ -1,7 +1,8 @@
#include "client.h"
#include "serve.h"
#include "util.h"
#include "ioutil.h"
#include "sockutil.h"
#include "util.h"
#include "bitset.h"
#include "nbdtypes.h"
#include "self_pipe.h"
@@ -189,7 +190,7 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
FD_ZERO(&fds);
FD_SET(client->socket, &fds);
self_pipe_fd_set( client->stop_signal, &fds );
fd_count = select(FD_SETSIZE, &fds, NULL, NULL, ptv);
fd_count = sock_try_select(FD_SETSIZE, &fds, NULL, NULL, ptv);
if ( fd_count == 0 ) {
/* This "can't ever happen" */
if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); }
@@ -344,30 +345,43 @@ void client_flush( struct client * client, size_t len )
int client_request_needs_reply( struct client * client,
struct nbd_request request )
{
debug("request type %d", request.type);
/* The client is stupid, but don't take down the whole server as a result.
* We send a reply before disconnecting so that at least some indication of
* the problem is visible, and so proxies don't retry the same (bad) request
* forever.
*/
if (request.magic != REQUEST_MAGIC) {
fatal("Bad magic %08x", request.magic);
warn("Bad magic %08x from client", request.magic);
client_write_reply( client, &request, EBADMSG );
client->disconnect = 1; // no need to flush
return 0;
}
debug(
"request type=%"PRIu32", from=%"PRIu64", len=%"PRIu32,
request.type, request.from, request.len
);
/* check it's not out of range */
if ( request.from+request.len > client->serve->size) {
warn("write request %"PRIu64"+%"PRIu32" out of range",
request.from, request.len
);
if ( request.type == REQUEST_WRITE ) {
client_flush( client, request.len );
}
client_write_reply( client, &request, EPERM ); /* TODO: Change to ERANGE ? */
client->disconnect = 0;
return 0;
}
switch (request.type)
{
case REQUEST_READ:
break;
case REQUEST_WRITE:
/* check it's not out of range */
if ( request.from+request.len > client->serve->size) {
warn("write request %d+%d out of range",
request.from,
request.len
);
client_write_reply( client, &request, 1 );
client_flush( client, request.len );
client->disconnect = 0;
return 0;
}
break;
case REQUEST_DISCONNECT:
debug("request disconnect");
client->disconnect = 1;

View File

@@ -129,6 +129,26 @@ struct flexnbd * flexnbd_create_listening(
}
struct flexnbd * flexnbd_create_proxying(
char* s_downstream_address,
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind
)
{
struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) );
flexnbd->proxy = proxy_create(
flexnbd,
s_downstream_address,
s_downstream_port,
s_upstream_address,
s_upstream_port,
s_upstream_bind);
flexnbd_create_shared( flexnbd, NULL );
return flexnbd;
}
void flexnbd_spawn_control(struct flexnbd * flexnbd )
{
NULLCHECK( flexnbd );
@@ -181,7 +201,6 @@ struct server * flexnbd_server( struct flexnbd * flexnbd )
return flexnbd->serve;
}
void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl )
{
NULLCHECK( flexnbd );
@@ -255,3 +274,14 @@ int flexnbd_serve( struct flexnbd * flexnbd )
return success;
}
int flexnbd_proxy( struct flexnbd * flexnbd )
{
NULLCHECK( flexnbd );
int success;
success = do_proxy( flexnbd->proxy );
debug("do_proxy success is %d", success );
return success;
}

View File

@@ -4,6 +4,7 @@
#include "acl.h"
#include "mirror.h"
#include "serve.h"
#include "proxy.h"
#include "self_pipe.h"
#include "mbox.h"
#include "control.h"
@@ -11,11 +12,14 @@
/* Carries the "globals". */
struct flexnbd {
/* We always have a serve pointer, but it should never be
* dereferenced outside a flexnbd_switch_lock/unlock pair.
/* Our serve pointer should never be dereferenced outside a
* flexnbd_switch_lock/unlock pair.
*/
struct server * serve;
/* In proxy mode, this is filled instead of serve, above */
struct proxier * proxy;
/* We only have a control object if a control socket name was
* passed on the command line.
*/
@@ -46,6 +50,14 @@ struct flexnbd * flexnbd_create_listening(
int acl_entries,
char** s_acl_entries );
struct flexnbd * flexnbd_create_proxying(
char* s_downstream_address,
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind
);
void flexnbd_destroy( struct flexnbd * );
enum mirror_state;
enum mirror_state flexnbd_get_mirror_state( struct flexnbd * );
@@ -55,7 +67,9 @@ int flexnbd_signal_fd( struct flexnbd * flexnbd );
int flexnbd_serve( struct flexnbd * flexnbd );
int flexnbd_proxy( struct flexnbd * flexnbd );
struct server * flexnbd_server( struct flexnbd * flexnbd );
void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl );
struct status * flexnbd_status_create( struct flexnbd * flexnbd );
#endif

View File

@@ -56,6 +56,29 @@ static char listen_help_text[] =
VERBOSE_LINE
QUIET_LINE;
static struct option proxy_options[] = {
GETOPT_HELP,
GETOPT_ADDR,
GETOPT_PORT,
GETOPT_CONNECT_ADDR,
GETOPT_CONNECT_PORT,
GETOPT_BIND,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
};
static char proxy_short_options[] = "hl:p:C:P:b:" SOPT_QUIET SOPT_VERBOSE;
static char proxy_help_text[] =
"Usage: flexnbd " CMD_PROXY " <options>\n\n"
"Resiliently proxy an NBD connection between client and server\n\n"
HELP_LINE
"\t--" OPT_ADDR ",-l <ADDR>\tThe address we will bind to as a proxy.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port we will bind to as a proxy.\n"
"\t--" OPT_CONNECT_ADDR ",-C <ADDR>\tAddress of the proxied server.\n"
"\t--" OPT_CONNECT_PORT ",-P <PORT>\tPort of the proxied server.\n"
"\t--" OPT_BIND ",-b <ADDR>\tThe address we connect from, as a proxy.\n"
QUIET_LINE
VERBOSE_LINE;
static struct option read_options[] = {
GETOPT_HELP,
@@ -173,10 +196,13 @@ char help_help_text_arr[] =
"Usage: flexnbd <cmd> [cmd options]\n\n"
"Commands:\n"
"\tflexnbd serve\n"
"\tflexnbd listen\n"
"\tflexnbd proxy\n"
"\tflexnbd read\n"
"\tflexnbd write\n"
"\tflexnbd acl\n"
"\tflexnbd mirror\n"
"\tflexnbd break\n"
"\tflexnbd status\n"
"\tflexnbd help\n\n"
"See flexnbd help <cmd> for further info\n";
@@ -390,6 +416,46 @@ void read_break_param( int c, char **sock )
}
void read_proxy_param(
int c,
char **downstream_addr,
char **downstream_port,
char **upstream_addr,
char **upstream_port,
char **bind_addr )
{
switch( c ) {
case 'h' :
fprintf( stdout, "%s\n", proxy_help_text );
exit( 0 );
break;
case 'l':
*downstream_addr = optarg;
break;
case 'p':
*downstream_port = optarg;
break;
case 'C':
*upstream_addr = optarg;
break;
case 'P':
*upstream_port = optarg;
break;
case 'b':
*bind_addr = optarg;
break;
case 'q':
log_level = QUIET_LOG_LEVEL;
break;
case 'v':
log_level = VERBOSE_LOG_LEVEL;
break;
default:
exit_err( proxy_help_text );
break;
}
}
void read_status_param( int c, char **sock )
{
read_sock_param( c, sock, status_help_text );
@@ -733,6 +799,55 @@ int mode_status( int argc, char *argv[] )
return 0;
}
int mode_proxy( int argc, char *argv[] )
{
int c;
struct flexnbd * flexnbd;
char *downstream_addr = NULL;
char *downstream_port = NULL;
char *upstream_addr = NULL;
char *upstream_port = NULL;
char *bind_addr = NULL;
int success;
while (1) {
c = getopt_long( argc, argv, proxy_short_options, proxy_options, NULL );
if ( -1 == c ) { break; }
read_proxy_param( c,
&downstream_addr,
&downstream_port,
&upstream_addr,
&upstream_port,
&bind_addr
);
}
if ( NULL == downstream_addr || NULL == downstream_port ){
fprintf( stderr, "both --addr and --port are required.\n" );
exit_err( proxy_help_text );
} else if ( NULL == upstream_addr || NULL == upstream_port ){
fprintf( stderr, "both --conn-addr and --conn-port are required.\n" );
exit_err( proxy_help_text );
}
flexnbd = flexnbd_create_proxying(
downstream_addr,
downstream_port,
upstream_addr,
upstream_port,
bind_addr
);
info(
"Proxying between %s %s (downstream) and %s %s (upstream)",
downstream_addr, downstream_port, upstream_addr, upstream_port
);
success = flexnbd_proxy( flexnbd );
flexnbd_destroy( flexnbd );
return success ? 0 : 1;
}
int mode_help( int argc, char *argv[] )
{
@@ -757,6 +872,8 @@ int mode_help( int argc, char *argv[] )
help_text = mirror_help_text;
} else if ( IS_CMD( CMD_STATUS, cmd ) ) {
help_text = status_help_text;
} else if ( IS_CMD( CMD_PROXY, cmd ) ) {
help_text = proxy_help_text;
} else { exit_err( help_help_text ); }
}
@@ -790,6 +907,8 @@ void mode(char* mode, int argc, char **argv)
}
else if ( IS_CMD( CMD_STATUS, mode ) ) {
mode_status( argc, argv );
} else if ( IS_CMD( CMD_PROXY, mode ) ) {
mode_proxy( argc, argv );
}
else if ( IS_CMD( CMD_HELP, mode ) ) {
mode_help( argc-1, argv+1 );
@@ -801,4 +920,3 @@ void mode(char* mode, int argc, char **argv)
exit(0);
}

View File

@@ -20,9 +20,12 @@ void mode(char* mode, int argc, char **argv);
#define OPT_SIZE "size"
#define OPT_DENY "default-deny"
#define OPT_UNLINK "unlink"
#define OPT_CONNECT_ADDR "conn-addr"
#define OPT_CONNECT_PORT "conn-port"
#define CMD_SERVE "serve"
#define CMD_LISTEN "listen"
#define CMD_PROXY "proxy"
#define CMD_READ "read"
#define CMD_WRITE "write"
#define CMD_ACL "acl"
@@ -40,7 +43,6 @@ void mode(char* mode, int argc, char **argv);
#define GETOPT_HELP GETOPT_FLAG( OPT_HELP, 'h' )
#define GETOPT_DENY GETOPT_FLAG( OPT_DENY, 'd' )
#define GETOPT_ADDR GETOPT_ARG( OPT_ADDR, 'l' )
#define GETOPT_PORT GETOPT_ARG( OPT_PORT, 'p' )
#define GETOPT_FILE GETOPT_ARG( OPT_FILE, 'f' )
@@ -49,6 +51,8 @@ void mode(char* mode, int argc, char **argv);
#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' )
#define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' )
#define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' )
#define GETOPT_CONNECT_ADDR GETOPT_ARG( OPT_CONNECT_ADDR, 'C' )
#define GETOPT_CONNECT_PORT GETOPT_ARG( OPT_CONNECT_PORT, 'P' )
#define OPT_VERBOSE "verbose"
#define SOPT_VERBOSE "v"

497
src/proxy.c Normal file
View File

@@ -0,0 +1,497 @@
#include "proxy.h"
#include "readwrite.h"
#include "ioutil.h"
#include "sockutil.h"
#include "util.h"
#include <errno.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
struct proxier* proxy_create(
struct flexnbd* flexnbd,
char* s_downstream_address,
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind )
{
NULLCHECK( flexnbd );
struct proxier* out;
out = xmalloc( sizeof( struct proxier ) );
out->flexnbd = flexnbd;
FATAL_IF_NULL(s_downstream_address, "Listen address not specified");
NULLCHECK( s_downstream_address );
FATAL_UNLESS(
parse_ip_to_sockaddr( &out->listen_on.generic, s_downstream_address ),
"Couldn't parse downstream address '%s' (use 0 if "
"you want to bind all IPs)",
s_downstream_address
);
FATAL_IF_NULL( s_downstream_port, "Downstream port not specified" );
NULLCHECK( s_downstream_port );
parse_port( s_downstream_port, &out->listen_on.v4 );
FATAL_IF_NULL(s_upstream_address, "Upstream address not specified");
NULLCHECK( s_upstream_address );
FATAL_UNLESS(
parse_ip_to_sockaddr( &out->connect_to.generic, s_upstream_address ),
"Couldn't parse upstream address '%s'",
s_upstream_address
);
FATAL_IF_NULL( s_upstream_port, "Upstream port not specified" );
NULLCHECK( s_upstream_port );
parse_port( s_upstream_port, &out->connect_to.v4 );
if ( s_upstream_bind ) {
FATAL_IF_ZERO(
parse_ip_to_sockaddr( &out->connect_from.generic, s_upstream_bind ),
"Couldn't parse bind address '%s'",
s_upstream_bind
);
}
out->listen_fd = -1;
out->downstream_fd = -1;
out->upstream_fd = -1;
out->req_buf = xmalloc( NBD_MAX_SIZE );
out->rsp_buf = xmalloc( NBD_MAX_SIZE );
return out;
}
void proxy_destroy( struct proxier* proxy )
{
free( proxy->req_buf );
free( proxy->rsp_buf );
free( proxy );
}
/* Try to establish a connection to our upstream server. Return 1 on success,
* 0 on failure
*/
int proxy_connect_to_upstream( struct proxier* proxy )
{
int fd = socket_connect( &proxy->connect_to.generic, &proxy->connect_from.generic );
off64_t size = 0;
if ( -1 == fd ) {
return 0;
}
if( !socket_nbd_read_hello( fd, &size ) ) {
FATAL_IF_NEGATIVE(
close( fd ), SHOW_ERRNO( "FIXME: shouldn't be fatal" )
);
return 0;
}
if ( proxy->upstream_size == 0 ) {
info( "Size of upstream image is %"PRIu64" bytes", size );
} else if ( proxy->upstream_size != size ) {
warn( "Size changed from %"PRIu64" to %"PRIu64" bytes", proxy->upstream_size, size );
}
proxy->upstream_size = size;
proxy->upstream_fd = fd;
return 1;
}
void proxy_disconnect_from_upstream( struct proxier* proxy )
{
if ( -1 != proxy->upstream_fd ) {
debug(" Closing upstream connection" );
/* TODO: An NBD disconnect would be pleasant here */
FATAL_IF_NEGATIVE(
close( proxy->upstream_fd ),
SHOW_ERRNO( "FIXME: shouldn't be fatal" )
);
proxy->upstream_fd = -1;
}
}
/** Prepares a listening socket for the NBD server, binding etc. */
void proxy_open_listen_socket(struct proxier* params)
{
NULLCHECK( params );
params->listen_fd = socket(params->listen_on.family, SOCK_STREAM, 0);
FATAL_IF_NEGATIVE(
params->listen_fd, SHOW_ERRNO( "Couldn't create listen socket" )
);
/* Allow us to restart quickly */
FATAL_IF_NEGATIVE(
sock_set_reuseaddr(params->listen_fd, 1),
SHOW_ERRNO( "Couldn't set SO_REUSEADDR" )
);
FATAL_IF_NEGATIVE(
sock_set_tcp_nodelay(params->listen_fd, 1),
SHOW_ERRNO( "Couldn't set TCP_NODELAY" )
);
FATAL_UNLESS_ZERO(
sock_try_bind( params->listen_fd, &params->listen_on.generic ),
SHOW_ERRNO( "Failed to bind to listening socket" )
);
/* We're only serving one client at a time, hence backlog of 1 */
FATAL_IF_NEGATIVE(
listen(params->listen_fd, 1),
SHOW_ERRNO( "Failed to listen on listening socket" )
);
info( "Now listening for incoming connections" );
}
/* Return 0 if we should keep running, 1 if an exit has been signaled. Pass it
* an fd_set to check, or set check_fds to NULL to have it perform its own.
* If we do the latter, then wait specifies how many seconds we'll wait for an
* exit signal to show up.
*/
int proxy_should_exit( struct proxier* params, fd_set *check_fds, int wait )
{
struct timeval tv = { wait, 0 };
fd_set internal_fds;
fd_set* fds = check_fds;
int signal_fd = flexnbd_signal_fd( params->flexnbd );
if ( NULL == check_fds ) {
fds = &internal_fds;
FD_ZERO( fds );
FD_SET( signal_fd, fds );
FATAL_IF_NEGATIVE(
sock_try_select(FD_SETSIZE, fds, NULL, NULL, &tv),
SHOW_ERRNO( "select() failed." )
);
}
if ( FD_ISSET( signal_fd, fds ) ) {
info( "Stop signal received" );
return 1;
}
return 0;
}
/* Try to get a request from downstream. If reading from downstream fails, then
* the session will be over. Returns 1 on success, 0 on failure.
*/
int proxy_get_request_from_downstream( struct proxier* proxy )
{
unsigned char* req_hdr_raw = proxy->req_buf;
unsigned char* req_data = proxy->req_buf + NBD_REQUEST_SIZE;
size_t req_buf_size;
struct nbd_request_raw* request_raw = (struct nbd_request_raw*) req_hdr_raw;
struct nbd_request* request = &(proxy->req_hdr);
if ( readloop( proxy->downstream_fd, req_hdr_raw, NBD_REQUEST_SIZE ) == -1 ) {
info( SHOW_ERRNO( "Failed to get request header" ) );
return 0;
}
nbd_r2h_request( request_raw, request );
req_buf_size = NBD_REQUEST_SIZE;
if ( request->type == REQUEST_DISCONNECT ) {
info( "Received disconnect request from client" );
return 0;
}
if ( request->type == REQUEST_READ ) {
if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) {
warn( "NBD read request size %"PRIu32" too large", request->len );
return 0;
}
}
if ( request->type == REQUEST_WRITE ) {
if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) {
warn( "NBD write request size %"PRIu32" too large", request->len );
return 0;
}
if ( readloop( proxy->downstream_fd, req_data, request->len ) == -1 ) {
warn( "Failed to get NBD write request data: %"PRIu32"b", request->len );
return 0;
}
req_buf_size += request->len;
}
debug(
"Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32,
request->type, request->from, request->len
);
proxy->req_buf_size = req_buf_size;
return 1;
}
/* Tries to send the request upstream and receive a response. If upstream breaks
* then we reconnect to it, and keep it up until we have a complete response
* back. Returns 1 on success, 0 on failure, -1 if exit is signalled.
*/
int proxy_run_request_upstream( struct proxier* proxy )
{
unsigned char* rsp_hdr_raw = proxy->rsp_buf;
unsigned char* rsp_data = proxy->rsp_buf + NBD_REPLY_SIZE;
struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) rsp_hdr_raw;
struct nbd_request* request = &(proxy->req_hdr);
struct nbd_reply* reply = &(proxy->rsp_hdr);
size_t rsp_buf_size;
if ( proxy->upstream_fd == -1 ) {
debug( "Connecting to upstream" );
if ( !proxy_connect_to_upstream( proxy ) ) {
debug( "Failed to connect to upstream" );
if ( proxy_should_exit( proxy, NULL, 5 ) ) {
return -1;
}
return 0;
}
debug( "Connected to upstream" );
}
if ( writeloop( proxy->upstream_fd, proxy->req_buf, proxy->req_buf_size ) == -1 ) {
warn( "Failed to send request to upstream" );
proxy_disconnect_from_upstream( proxy );
return 0;
}
if ( readloop( proxy->upstream_fd, rsp_hdr_raw, NBD_REPLY_SIZE ) == -1 ) {
debug( "Failed to get reply header from upstream" );
proxy_disconnect_from_upstream( proxy );
return 0;
}
nbd_r2h_reply( reply_raw, reply );
rsp_buf_size = NBD_REPLY_SIZE;
if ( reply->magic != REPLY_MAGIC ) {
debug( "Reply magic is incorrect" );
proxy_disconnect_from_upstream( proxy );
return 0;
}
debug( "NBD reply received from upstream. Response code: %"PRIu32, reply->error );
if ( reply->error != 0 ) {
warn( "NBD error returned from upstream: %"PRIu32, reply->error );
}
if ( reply->error == 0 && request->type == REQUEST_READ ) {
if (readloop( proxy->upstream_fd, rsp_data, request->len ) == -1 ) {
debug( "Failed to get reply data from upstream" );
proxy_disconnect_from_upstream( proxy );
return 0;
}
rsp_buf_size += request->len;
}
proxy->rsp_buf_size = rsp_buf_size;
return rsp_buf_size;
}
/* Write an NBD reply back downstream. Return 0 on failure, 1 on success. */
int proxy_send_reply_downstream( struct proxier* proxy )
{
int result;
unsigned char* rsp_buf = proxy->rsp_buf;
debug(
"Writing header (%"PRIu32") + data (%"PRIu32") bytes downstream",
NBD_REPLY_SIZE, proxy->rsp_buf_size - NBD_REPLY_SIZE
);
result = writeloop( proxy->downstream_fd, rsp_buf, proxy->rsp_buf_size );
if ( result == -1 ) {
debug( "Failed to send reply downstream" );
return 0;
}
debug( "Reply sent" );
return 1;
}
/* Here, we negotiate an NBD session with downstream, based on the information
* we got on first connection to upstream. Then we wait for a request to come
* in from downstream, read it into memory, then send it to upstream. If
* upstream dies before responding, we reconnect to upstream and resend it.
* Once we've got a response, we write it directly to downstream, and wait for a
* new request. When downstream disconnects, or we receive an exit signal (which
* can be blocked, unfortunately), we are finished.
*
* This is the simplest possible nbd proxy I can think of. It may not be at all
* performant - let's see.
*/
void proxy_session( struct proxier* proxy )
{
int downstream_fd = proxy->downstream_fd;
uint64_t req_count = 0;
int result;
info( "Beginning proxy session on fd %i", downstream_fd );
if ( !socket_nbd_write_hello( downstream_fd, proxy->upstream_size ) ) {
debug( "Sending hello failed on fd %i, ending session", downstream_fd );
return;
}
while( proxy_get_request_from_downstream( proxy ) ) {
/* Don't start running the request if exit has been signalled */
if ( proxy_should_exit( proxy, NULL, 0 ) ) {
break;
}
do {
result = proxy_run_request_upstream( proxy );
} while ( result == 0 );
/* We have to exit, but don't know if the request was successfully
* proxied or not. We could add that knowledge, and attempt to send a
* reply downstream if it was, but I don't think it's worth it.
*/
if ( result == -1 ) {
break;
}
if ( !proxy_send_reply_downstream( proxy ) ) {
break;
}
proxy->req_buf_size = 0;
proxy->rsp_buf_size = 0;
req_count++;
};
info(
"Finished proxy session on fd %i after %"PRIu64" successful request(s)",
downstream_fd, req_count
);
return;
}
/** Accept an NBD socket connection, dispatch appropriately */
int proxy_accept( struct proxier* params )
{
NULLCHECK( params );
int client_fd;
int signal_fd = flexnbd_signal_fd( params->flexnbd );
fd_set fds;
int should_continue = 1;
union mysockaddr client_address;
socklen_t socklen = sizeof( client_address );
debug("accept loop starting");
FD_ZERO(&fds);
FD_SET(params->listen_fd, &fds);
FD_SET(signal_fd, &fds);
FATAL_IF_NEGATIVE(
sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL),
SHOW_ERRNO( "select() failed" )
);
if ( proxy_should_exit( params, &fds, 0) ) {
should_continue = 0;
}
if ( should_continue && FD_ISSET( params->listen_fd, &fds ) ) {
client_fd = accept( params->listen_fd, &client_address.generic, &socklen );
if ( sock_set_tcp_nodelay(client_fd, 1) == -1 ) {
warn( SHOW_ERRNO( "Failed to set TCP_NODELAY" ) );
}
info( "Accepted nbd client socket fd %d", client_fd );
params->downstream_fd = client_fd;
proxy_session( params );
if ( close( params->downstream_fd ) == -1 ) {
warn( SHOW_ERRNO( "FIXME: close returned" ) );
}
params->downstream_fd = -1;
}
return should_continue;
}
void proxy_accept_loop( struct proxier* params )
{
NULLCHECK( params );
while( proxy_accept( params ) );
}
/** Closes sockets, frees memory and waits for all requests to clear */
void proxy_cleanup( struct proxier* params )
{
NULLCHECK( params );
info( "cleaning up" );
if ( -1 != params->listen_fd ) {
close( params->listen_fd );
}
debug( "Cleanup done" );
}
/** Full lifecycle of the proxier */
int do_proxy( struct proxier* params )
{
NULLCHECK( params );
error_set_handler( (cleanup_handler*) proxy_cleanup, params );
debug( "Ensuring upstream server is open" );
if ( !proxy_connect_to_upstream( params ) ) {
info( "Couldn't connect to upstream server during initialization" );
proxy_cleanup( params );
return 1;
};
proxy_open_listen_socket( params );
proxy_accept_loop( params );
proxy_cleanup( params );
return 0;
}

71
src/proxy.h Normal file
View File

@@ -0,0 +1,71 @@
#ifndef PROXY_H
#define PROXY_H
#include <sys/types.h>
#include <unistd.h>
#include "flexnbd.h"
#include "parse.h"
#include "nbdtypes.h"
#include "self_pipe.h"
struct proxier {
/* The flexnbd wrapper this proxier is attached to */
struct flexnbd* flexnbd;
/** address/port to bind to */
union mysockaddr listen_on;
/** address/port to connect to */
union mysockaddr connect_to;
/** address to bind to when making outgoing connections */
union mysockaddr connect_from;
/* The socket we listen() on and accept() against */
int listen_fd;
/* The socket returned by accept() that we receive requests from and send
* responses to
*/
int downstream_fd;
/* The socket returned by connect() that we send requests to and receive
* responses from
*/
int upstream_fd;
/* This is the size we advertise to the downstream server */
off64_t upstream_size;
/* Scratch space for the current NBD request from downstream */
unsigned char* req_buf;
/* Number of bytes currently sat in req_buf */
size_t req_buf_size;
/* We transform the raw request header into here */
struct nbd_request req_hdr;
/* Scratch space for the current NBD reply from upstream */
unsigned char* rsp_buf;
/* Number of bytes currently sat in rsp_buf */
size_t rsp_buf_size;
/* We transform the raw reply header into here */
struct nbd_reply rsp_hdr;
};
struct proxier* proxy_create(
struct flexnbd * flexnbd,
char* s_downstream_address,
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind );
int do_proxy( struct proxier* proxy );
void proxy_destroy( struct proxier* proxy );
#endif

View File

@@ -1,7 +1,8 @@
#include "nbdtypes.h"
#include "ioutil.h"
#include "sockutil.h"
#include "util.h"
#include "serve.h"
#include "serve.h"
#include <stdlib.h>
#include <string.h>
@@ -18,14 +19,14 @@ int socket_connect(struct sockaddr* to, struct sockaddr* from)
if (NULL != from) {
if ( 0 > bind(fd, from, sizeof(struct sockaddr_in6)) ){
warn( "bind() failed");
close( fd );
FATAL_IF_NEGATIVE( close( fd ), SHOW_ERRNO( "FIXME" ) );
return -1;
}
}
if ( 0 > connect(fd, to, sizeof(struct sockaddr_in6)) ) {
warn( "connect failed" );
close( fd );
if ( 0 > sock_try_connect( fd, to, sizeof( struct sockaddr_in6 ), 15 ) ) {
warn( SHOW_ERRNO( "connect failed" ) );
FATAL_IF_NEGATIVE( close( fd ), SHOW_ERRNO( "FIXME" ) );
return -1;
}
@@ -88,7 +89,7 @@ void fill_request(struct nbd_request *request, int type, off64_t from, int len)
void read_reply(int fd, struct nbd_request *request, struct nbd_reply *reply)
{
struct nbd_reply_raw reply_raw;
ERROR_IF_NEGATIVE(readloop(fd, &reply_raw, sizeof(struct nbd_reply_raw)),
"Couldn't read reply");
@@ -108,14 +109,15 @@ void read_reply(int fd, struct nbd_request *request, struct nbd_reply *reply)
void wait_for_data( int fd, int timeout_secs )
{
fd_set fds;
struct timeval tv = {timeout_secs, 0};
struct timeval tv = { timeout_secs, 0 };
int selected;
FD_ZERO( &fds );
FD_SET( fd, &fds );
selected = select( FD_SETSIZE,
&fds, NULL, NULL,
timeout_secs >=0 ? &tv : NULL );
selected = sock_try_select(
FD_SETSIZE, &fds, NULL, NULL, timeout_secs >=0 ? &tv : NULL
);
FATAL_IF( -1 == selected, "Select failed" );
ERROR_IF( 0 == selected, "Timed out waiting for reply" );
@@ -126,16 +128,16 @@ void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, i
{
struct nbd_request request;
struct nbd_reply reply;
fill_request(&request, REQUEST_READ, from, len);
FATAL_IF_NEGATIVE(writeloop(fd, &request, sizeof(request)),
"Couldn't write request");
wait_for_data( fd, timeout_secs );
read_reply(fd, &request, &reply);
if (out_buf) {
FATAL_IF_NEGATIVE(readloop(fd, out_buf, len),
FATAL_IF_NEGATIVE(readloop(fd, out_buf, len),
"Read failed");
}
else {
@@ -150,13 +152,13 @@ void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf, in
{
struct nbd_request request;
struct nbd_reply reply;
fill_request(&request, REQUEST_WRITE, from, len);
ERROR_IF_NEGATIVE(writeloop(fd, &request, sizeof(request)),
"Couldn't write request");
if (in_buf) {
ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len),
ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len),
"Write failed");
}
else {
@@ -165,7 +167,7 @@ void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf, in
"Splice failed"
);
}
wait_for_data( fd, timeout_secs );
read_reply(fd, &request, &reply);
}
@@ -200,13 +202,13 @@ int socket_nbd_disconnect( int fd )
fatal( error_type " connection failed." );\
}\
}
void do_read(struct mode_readwrite_params* params)
{
params->client = socket_connect(&params->connect_to.generic, &params->connect_from.generic);
FATAL_IF_NEGATIVE( params->client, "Couldn't connect." );
CHECK_RANGE("read");
socket_nbd_read(params->client, params->from, params->len,
socket_nbd_read(params->client, params->from, params->len,
params->data_fd, NULL, 10);
close(params->client);
}
@@ -216,7 +218,7 @@ void do_write(struct mode_readwrite_params* params)
params->client = socket_connect(&params->connect_to.generic, &params->connect_from.generic);
FATAL_IF_NEGATIVE( params->client, "Couldn't connect." );
CHECK_RANGE("write");
socket_nbd_write(params->client, params->from, params->len,
socket_nbd_write(params->client, params->from, params->len,
params->data_fd, NULL, 10);
close(params->client);
}

View File

@@ -636,8 +636,10 @@ int server_accept( struct server * params )
self_pipe_fd_set( params->close_signal, &fds );
self_pipe_fd_set( params->acl_updated_signal, &fds );
FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds,
NULL, NULL, NULL), "select() failed");
FATAL_IF_NEGATIVE(
sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL),
SHOW_ERRNO( "select() failed" )
);
if ( self_pipe_fd_isset( params->close_signal, &fds ) ){
server_close_clients( params );

View File

@@ -1,7 +1,14 @@
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include "sockutil.h"
#include "util.h"
size_t sockaddr_size(const struct sockaddr* sa)
size_t sockaddr_size( const struct sockaddr* sa )
{
size_t ret = 0;
@@ -17,7 +24,7 @@ size_t sockaddr_size(const struct sockaddr* sa)
return ret;
}
const char* sockaddr_address_string(const struct sockaddr* sa, char* dest, size_t len)
const char* sockaddr_address_string( const struct sockaddr* sa, char* dest, size_t len )
{
NULLCHECK( sa );
NULLCHECK( dest );
@@ -47,21 +54,39 @@ const char* sockaddr_address_string(const struct sockaddr* sa, char* dest, size_
return ret;
}
int sock_set_reuseaddr(int fd, int optval)
int sock_set_reuseaddr( int fd, int optval )
{
return setsockopt( fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval) );
}
/* Set the tcp_nodelay option */
int sock_set_tcp_nodelay(int fd, int optval)
int sock_set_tcp_nodelay( int fd, int optval )
{
return setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval) );
}
int sock_try_bind(int fd, const struct sockaddr* sa)
int sock_set_nonblock( int fd, int optval )
{
int flags = fcntl( fd, F_GETFL );
if ( flags == -1 ) {
return -1;
}
if ( optval ) {
flags = flags | O_NONBLOCK;
} else {
flags = flags & (~O_NONBLOCK);
}
return fcntl( fd, F_SETFL, flags );
}
int sock_try_bind( int fd, const struct sockaddr* sa )
{
int bind_result;
char s_address[256];
int retry = 1;
sockaddr_address_string( sa, &s_address[0], 256 );
@@ -92,13 +117,99 @@ int sock_try_bind(int fd, const struct sockaddr* sa)
continue;
case EADDRINUSE:
warn( "%s in use, giving up.", s_address );
retry = 0;
break;
default:
warn( "giving up" );
retry = 0;
}
}
} while ( 1 );
} while ( retry );
return bind_result;
}
int sock_try_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
{
int result;
do {
result = select(nfds, readfds, writefds, exceptfds, timeout);
if ( errno != EINTR ) {
break;
}
} while ( result == -1 );
return result;
}
int sock_try_connect( int fd, struct sockaddr* to, socklen_t addrlen, int wait )
{
fd_set fds;
struct timeval tv = { wait, 0 };
int result = 0;
if ( sock_set_nonblock( fd, 1 ) == -1 ) {
warn( SHOW_ERRNO( "Failed to set socket non-blocking for connect()" ) );
return connect( fd, to, addrlen );
}
FD_ZERO( &fds );
FD_SET( fd, &fds );
do {
result = connect( fd, to, addrlen );
if ( result == -1 ) {
switch( errno ) {
case EINPROGRESS:
result = 0;
break; /* success */
case EAGAIN:
case EINTR:
break; /* Try again */
default:
warn( SHOW_ERRNO( "Failed to connect()") );
goto out;
}
}
} while ( result == -1 );
if ( -1 == sock_try_select( FD_SETSIZE, NULL, &fds, NULL, &tv) ) {
warn( SHOW_ERRNO( "failed to select() on non-blocking connect" ) );
result = -1;
goto out;
}
if ( !FD_ISSET( fd, &fds ) ) {
result = -1;
errno = ETIMEDOUT;
goto out;
}
int scratch;
socklen_t s_size = sizeof( scratch );
if ( getsockopt( fd, SOL_SOCKET, SO_ERROR, &scratch, &s_size ) == -1 ) {
result = -1;
warn( SHOW_ERRNO( "getsockopt() failed" ) );
goto out;
}
if ( scratch == EINPROGRESS ) {
scratch = ETIMEDOUT;
}
result = scratch ? -1 : 0;
errno = scratch;
out:
if ( sock_set_nonblock( fd, 0 ) == -1 ) {
warn( SHOW_ERRNO( "Failed to make socket blocking after connect()" ) );
return -1;
}
debug( "sock_try_connect: %i", result );
return result;
}

View File

@@ -2,11 +2,9 @@
#define SOCKUTIL_H
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <sys/select.h>
/* Returns the size of the sockaddr, or 0 on error */
size_t sockaddr_size(const struct sockaddr* sa);
@@ -25,8 +23,16 @@ int sock_set_tcp_nodelay(int fd, int optval);
/* TODO: Set the tcp_cork option */
// int sock_set_cork(int fd, int optval);
int sock_set_nonblock(int fd, int optval);
/* Attempt to bind the fd to the sockaddr, retrying common transient failures */
int sock_try_bind(int fd, const struct sockaddr* sa);
/* Try to call select(), retrying EINTR */
int sock_try_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
/* Try to call connect(), timing out after wait seconds */
int sock_try_connect( int fd, struct sockaddr* to, socklen_t addrlen, int wait );
#endif

View File

@@ -21,6 +21,13 @@ class Environment
@fake_pid = nil
end
def proxy1(port=@port2)
@nbd1.proxy(@ip, port)
end
def proxy2(port=@port1)
@nbd2.proxy(@ip, port)
end
def serve1(*acl)
@nbd1.serve(@filename1, *acl)

View File

@@ -13,10 +13,8 @@ addr, port = *ARGV
client = FakeSource.new( addr, port, "Timed out connecting", "127.0.0.6" )
sleep( 0.25 )
client.ensure_disconnected
rsp = client.disconnected? ? 0 : 1
client.close
exit(0)
exit(rsp)

View File

@@ -8,6 +8,10 @@ class FileWriter
@pattern = ""
end
def size
@blocksize * @pattern.split("").size
end
# We write in fixed block sizes, given by "blocksize"
# _ means skip a block
# 0 means write a block full of zeroes

View File

@@ -241,6 +241,15 @@ module FlexNBD
"#{acl.join(' ')}"
end
def proxy_cmd( connect_ip, connect_port )
"#{bin} proxy "\
"--addr #{ip} "\
"--port #{port} "\
"--conn-addr #{connect_ip} "\
"--conn-port #{connect_port} "\
"#{@debug}"
end
def read_cmd( offset, length )
"#{bin} read "\
@@ -319,6 +328,7 @@ module FlexNBD
sleep 0.1
end
start_wait_thread( @pid )
at_exit { kill }
end
@@ -336,6 +346,31 @@ module FlexNBD
run_serve_cmd( listen_cmd( file, acl ) )
end
def tcp_server_open?
# raises if the other side doesn't accept()
sock = TCPSocket.new(ip, port) rescue nil
success = !!sock
( sock.close rescue nil) if sock
success
end
def proxy( connect_ip, connect_port )
cmd = proxy_cmd( connect_ip, connect_port )
debug( cmd )
@pid = @executor.run( cmd )
until tcp_server_open?
pid, status = Process.wait2(@pid, Process::WNOHANG)
raise "server did not start (#{cmd})" if pid
sleep 0.1
end
start_wait_thread( @pid )
at_exit { kill }
end
def start_wait_thread( pid )
@wait_thread = Thread.start do

View File

@@ -32,6 +32,9 @@ module FlexNBD
end
read_constants()
REQUEST_MAGIC = "\x25\x60\x95\x13" unless defined?(REQUEST_MAGIC)
REPLY_MAGIC = "\x67\x44\x66\x98" unless defined?(REPLY_MAGIC)
end # module FlexNBD

View File

@@ -56,8 +56,6 @@ module FlexNBD
}
end
REPLY_MAGIC="\x67\x44\x66\x98"
def write_error( handle )
write_reply( handle, 1 )
end
@@ -76,7 +74,7 @@ module FlexNBD
if opts[:magic] == :wrong
write_rand( @sock, 4 )
else
@sock.write( REPLY_MAGIC )
@sock.write( ::FlexNBD::REPLY_MAGIC )
end
@sock.write( [err].pack("N") )
@@ -93,6 +91,10 @@ module FlexNBD
@sock.read( len )
end
def write_data( len )
@sock.write( len )
end
def self.parse_be64(str)
raise "String is the wrong length: 8 bytes expected (#{str.length} received)" unless
@@ -161,3 +163,4 @@ module FlexNBD
end # module FakeDest
end # module FlexNBD

View File

@@ -30,7 +30,7 @@ module FlexNBD
def read_hello()
timing_out( FlexNBD::MS_HELLO_TIME_SECS,
timing_out( ::FlexNBD::MS_HELLO_TIME_SECS,
"Timed out waiting for hello." ) do
fail "No hello." unless (hello = @sock.read( 152 )) &&
hello.length==152
@@ -47,15 +47,14 @@ module FlexNBD
end
def send_request( type, handle="myhandle", from=0, len=0 )
def send_request( type, handle="myhandle", from=0, len=0, magic=REQUEST_MAGIC )
fail "Bad handle" unless handle.length == 8
@sock.write( "\x25\x60\x95\x13" )
@sock.write( magic )
@sock.write( [type].pack( 'N' ) )
@sock.write( handle )
@sock.write( "\x0"*4 )
@sock.write( [from].pack( 'N' ) )
@sock.write( [len ].pack( 'N' ) )
@sock.write( [n64( from )].pack( 'q' ) )
@sock.write( [len].pack( 'N' ) )
end
@@ -122,10 +121,10 @@ module FlexNBD
end
def ensure_disconnected
Timeout.timeout( 2 ) do
@sock.read(1)
end
def disconnected?
result = nil
Timeout.timeout( 2 ) { result = ( @sock.read(1) == nil ) }
result
end
@@ -140,6 +139,22 @@ module FlexNBD
end
end
private
# take a 64-bit number, turn it upside down (due to :
# http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-core/11920
# )
def n64(b)
((b & 0xff00000000000000) >> 56) |
((b & 0x00ff000000000000) >> 40) |
((b & 0x0000ff0000000000) >> 24) |
((b & 0x000000ff00000000) >> 8) |
((b & 0x00000000ff000000) << 8) |
((b & 0x0000000000ff0000) << 24) |
((b & 0x000000000000ff00) << 40) |
((b & 0x00000000000000ff) << 56)
end
end # class FakeSource
end # module FlexNBD

View File

@@ -0,0 +1,199 @@
require 'test/unit'
require 'environment'
require 'flexnbd/fake_source'
require 'flexnbd/fake_dest'
class TestProxyMode < Test::Unit::TestCase
def setup
super
@env = Environment.new
@env.writefile1( "0" * 16 )
end
def teardown
@env.cleanup
super
end
def with_proxied_client( override_size = nil )
@env.serve1 unless @server_up
@env.proxy2 unless @proxy_up
@env.nbd2.can_die(0)
client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy")
begin
result = client.read_hello
assert_equal "NBDMAGIC", result[:magic]
assert_equal override_size || @env.file1.size, result[:size]
yield client
ensure
client.close rescue nil
end
end
def test_exits_with_error_when_cannot_connect_to_upstream_on_start
assert_raises(RuntimeError) { @env.proxy1 }
end
def test_read_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write_read_request(offset, 4096, "myhandle")
rsp = client.read_response
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
orig_data = @env.file1.read(offset, 4096)
data = client.read_raw(4096)
assert_equal 4096, orig_data.size
assert_equal 4096, data.size
assert_equal( orig_data, data, "Returned data does not match" )
end
end
end
def test_write_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write(offset, "\xFF" * 4096)
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
data = @env.file1.read(offset, 4096)
assert_equal( ( "\xFF" * 4096 ), data, "Data not written" )
end
end
end
def make_fake_server
server = FlexNBD::FakeDest.new(@env.ip, @env.port1)
@server_up = true
# We return a thread here because accept() and connect() both block for us
Thread.new do
sc = server.accept # just tell the supervisor we're up
sc.write_hello
[ server, sc ]
end
end
def test_read_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write_read_request( 0, 4096 )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_READ, req1[:type]
assert_equal 0, req1[:from]
assert_equal 4096, req1[:len]
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
sc2.write_data( "\xFF" * 4096 )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
data = client.read_raw( 4096 )
assert_equal( ("\xFF" * 4096), data, "Wrong data returned" )
sc2.close
server.close
end
end
def test_write_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write( 0, ( "\xFF" * 4096 ) )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal 4096, req1[:len]
data1 = sc1.read_data( 4096 )
assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" )
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
data2 = sc2.read_data( 4096 )
assert_equal data1, data2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
sc2.close
server.close
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |client|
c2 = nil
assert_raises(Timeout::Error) do
timeout(1) do
c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)")
c2.read_hello
end
end
c2.close rescue nil if c2
end
end
end

View File

@@ -0,0 +1,86 @@
require 'test/unit'
require 'environment'
require 'flexnbd/fake_source'
class TestServeMode < Test::Unit::TestCase
def setup
super
@env = Environment.new
@env.writefile1( "0" )
@env.serve1
end
def teardown
@env.cleanup
super
end
def connect_to_server
client = FlexNBD::FakeSource.new(@env.ip, @env.port1, "Connecting to server failed")
begin
result = client.read_hello
assert_equal "NBDMAGIC", result[:magic]
assert_equal @env.file1.size, result[:size]
yield client
ensure
client.close rescue nil
end
end
def test_bad_request_magic_receives_error_response
connect_to_server do |client|
# replace REQUEST_MAGIC with all 0s to make it look bad
client.send_request( 0, "myhandle", 0, 0, "\x00\x00\x00\x00" )
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}"
# The client should be disconnected now
assert client.disconnected?, "Server not disconnected"
end
end
def test_read_request_out_of_bounds_receives_error_response
connect_to_server do |client|
client.write_read_request( @env.file1.size, 4096 )
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}"
# Ensure we're not disconnected by sending a request. We don't care about
# whether the reply is good or not, here.
client.write_read_request( 0, 4096 )
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
end
end
def test_write_request_out_of_bounds_receives_error_response
connect_to_server do |client|
client.write( @env.file1.size, "\x00" * 4096 )
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert rsp[:error] != 0, "Server sent success reply back: #{rsp[:error]}"
# Ensure we're not disconnected by sending a request. We don't care about
# whether the reply is good or not, here.
client.write( 0, "\x00" * @env.file1.size )
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
end
end
end

View File

@@ -1,3 +1,7 @@
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include "sockutil.h"
#include <check.h>