From f37a217cb900150a356e587dd1f14baa3f44db9b Mon Sep 17 00:00:00 2001 From: Alex Young Date: Thu, 21 Jun 2012 18:01:50 +0100 Subject: [PATCH] Add listen mode --- Rakefile | 18 +++++++- src/client.c | 48 +++++++++++++++++--- src/client.h | 3 ++ src/flexnbd.c | 103 +++++++++++++++++++++++++++++++++++++++++-- src/listen.c | 87 ++++++++++++++++++++++++++++++++++++ src/listen.h | 26 +++++++++++ src/options.h | 35 +++++++++++++++ src/parse.c | 3 ++ src/serve.c | 35 ++++++++++----- src/serve.h | 12 ++++- tests/check_listen.c | 55 +++++++++++++++++++++++ tests/check_parse.c | 47 ++++++++++++++++++++ 12 files changed, 450 insertions(+), 22 deletions(-) create mode 100644 src/listen.c create mode 100644 src/listen.h create mode 100644 tests/check_listen.c create mode 100644 tests/check_parse.c diff --git a/Rakefile b/Rakefile index 40ca0ba..3680560 100644 --- a/Rakefile +++ b/Rakefile @@ -146,8 +146,24 @@ file check("readwrite") => gcc_link t.name, t.prerequisites + [LIBCHECK] end +file check("listen") => +%w{build/tests/check_listen.o + build/listen.o + build/self_pipe.o + build/nbdtypes.o + build/control.o + build/readwrite.o + build/parse.o + build/client.o + build/serve.o + build/acl.o + build/ioutil.o + build/util.o} do |t| + gcc_link t.name, t.prerequisites + [LIBCHECK] +end -(TEST_MODULES- %w{acl client serve readwrite}).each do |m| + +(TEST_MODULES- %w{acl client serve readwrite listen}).each do |m| tgt = "build/tests/check_#{m}.o" maybe_obj_name = "build/#{m}.o" deps = ["build/ioutil.o", "build/util.o"] - [maybe_obj_name] diff --git a/src/client.c b/src/client.c index b575b27..6a1c26d 100644 --- a/src/client.c +++ b/src/client.c @@ -25,6 +25,8 @@ struct client *client_create( struct server *serve, int socket ) c->stop_signal = self_pipe_create(); + c->entrusted = 0; + return c; } @@ -241,12 +243,18 @@ void client_write_init( struct client * client, uint64_t size ) ); } + + /* Check to see if the client's request needs a reply constructing. * Returns 1 if we do, 0 otherwise. * request_err is set to 0 if the client sent a bad request, in which - * case we send an error reply. + * case we drop the connection. + * FIXME: after an ENTRUST, there's no way to distinguish between a + * DISCONNECT and any bad request. */ -int client_request_needs_reply( struct client * client, struct nbd_request request, int *request_err ) +int client_request_needs_reply( struct client * client, + struct nbd_request request, + int *should_disconnect ) { debug("request type %d", request.type); @@ -257,8 +265,12 @@ int client_request_needs_reply( struct client * client, struct nbd_request reque switch (request.type) { case REQUEST_READ: + FATAL_IF( client->entrusted, + "Received a read request after an entrust message."); break; case REQUEST_WRITE: + FATAL_IF( client->entrusted, + "Received a write request after an entrust message."); /* check it's not out of range */ if ( request.from+request.len > client->serve->size) { debug("request read %ld+%d out of range", @@ -266,14 +278,19 @@ int client_request_needs_reply( struct client * client, struct nbd_request reque request.len ); client_write_reply( client, &request, 1 ); - *request_err = 0; + *should_disconnect = 0; return 0; } break; + case REQUEST_ENTRUST: + /* Yes, we need to reply to an entrust, but we take no + * further action */ + debug("request entrust"); + break; case REQUEST_DISCONNECT: debug("request disconnect"); - *request_err = 1; + *should_disconnect = 1; return 0; default: @@ -283,6 +300,19 @@ int client_request_needs_reply( struct client * client, struct nbd_request reque } +void client_reply_to_entrust( struct client * client, struct nbd_request request ) +{ + /* An entrust needs a response, but has no data. */ + debug( "request entrust" ); + + client_write_reply( client, &request, 0 ); + /* We set this after trying to send the reply, so we know the + * reply got away safely. + */ + client->entrusted = 1; +} + + void client_reply_to_read( struct client* client, struct nbd_request request ) { off64_t offset; @@ -345,15 +375,17 @@ void client_reply( struct client* client, struct nbd_request request ) case REQUEST_READ: client_reply_to_read( client, request ); break; - case REQUEST_WRITE: client_reply_to_write( client, request ); break; + case REQUEST_ENTRUST: + client_reply_to_entrust( client, request ); + break; } } -/* Returns 0 if a request was successfully served. */ +/* Returns 0 if we should continue trying to serve requests */ int client_serve_request(struct client* client) { struct nbd_request request; @@ -416,6 +448,10 @@ void* client_serve(void* client_uncast) ; client->stopped = 1; + if ( client->entrusted ){ + server_control_arrived( client->serve ); + } + FATAL_IF_NEGATIVE( close(client->socket), "Couldn't close socket %d", diff --git a/src/client.h b/src/client.h index 6af19f8..087f091 100644 --- a/src/client.h +++ b/src/client.h @@ -19,6 +19,9 @@ struct client { struct self_pipe * stop_signal; struct server* serve; /* FIXME: remove above duplication */ + + /* Have we seen a REQUEST_ENTRUST message? */ + int entrusted; }; diff --git a/src/flexnbd.c b/src/flexnbd.c index 30a2074..aeff257 100644 --- a/src/flexnbd.c +++ b/src/flexnbd.c @@ -21,6 +21,7 @@ #include "serve.h" +#include "listen.h" #include "util.h" #include @@ -115,7 +116,7 @@ void params_readwrite( } } -void do_serve(struct server* params); +int do_serve(struct server* params); void do_read(struct mode_readwrite_params* params); void do_write(struct mode_readwrite_params* params); void do_remote_command(char* command, char* mode, int argc, char** argv); @@ -153,6 +154,50 @@ void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char } +void read_listen_param( int c, + char **ip_addr, + char **rebind_ip_addr, + char **ip_port, + char **rebind_ip_port, + char **file, + char **sock, + int *default_deny ) +{ + switch(c){ + case 'h': + fprintf(stdout, listen_help_text ); + exit(0); + break; + case 'l': + *ip_addr = optarg; + break; + case 'L': + *rebind_ip_addr = optarg; + break; + case 'p': + *ip_port = optarg; + break; + case 'P': + *rebind_ip_port = optarg; + break; + case 'f': + *file = optarg; + break; + case 's': + *sock = optarg; + break; + case 'd': + *default_deny = 1; + break; + case 'v': + log_level = 0; + break; + default: + exit_err( listen_help_text ); + break; + } +} + void read_readwrite_param( int c, char **ip_addr, char **ip_port, char **bind_addr, char **from, char **size) { switch(c){ @@ -269,13 +314,60 @@ int mode_serve( int argc, char *argv[] ) } if ( err ) { exit_err( serve_help_text ); } - serve = server_create( ip_addr, ip_port, file, sock, default_deny, argc - optind, argv + optind, MAX_NBD_CLIENTS ); - do_serve( serve ); + serve = server_create( ip_addr, ip_port, file, sock, default_deny, argc - optind, argv + optind, MAX_NBD_CLIENTS, 1 ); + if ( 0 == do_serve( serve ) ) { + info( "Control transfered."); + } server_destroy( serve ); return 0; } + +int mode_listen( int argc, char *argv[] ) +{ + int c; + char *ip_addr = NULL; + char *rebind_ip_addr = NULL; + char *ip_port = NULL; + char *rebind_ip_port = NULL; + char *file = NULL; + char *sock = NULL; + int default_deny = 0; // not on by default + int err = 0; + + struct listen * listen; + + while (1) { + c = getopt_long(argc, argv, listen_short_options, listen_options, NULL); + if ( c == -1 ) { break; } + + read_listen_param( c, &ip_addr, &rebind_ip_addr, &ip_port, &rebind_ip_port, + &file, &sock, &default_deny ); + } + + if ( NULL == ip_addr || NULL == ip_port ) { + err = 1; + fprintf( stderr, "both --addr and --port are required.\n" ); + } + if ( NULL == file ) { + err = 1; + fprintf( stderr, "--file is required\n" ); + } + if ( err ) { exit_err( listen_help_text ); } + + listen = listen_create( ip_addr, rebind_ip_addr, + ip_port, rebind_ip_port, + file, sock, default_deny, + argc - optind, argv + optind, MAX_NBD_CLIENTS ); + do_listen( listen ); + listen_destroy( listen ); + + return 0; + +} + + int mode_read( int argc, char *argv[] ) { int c; @@ -439,6 +531,8 @@ int mode_help( int argc, char *argv[] ) cmd = argv[0]; if (IS_CMD( CMD_SERVE, cmd ) ) { help_text = serve_help_text; + } else if ( IS_CMD( CMD_LISTEN, cmd ) ) { + help_text = listen_help_text; } else if ( IS_CMD( CMD_READ, cmd ) ) { help_text = read_help_text; } else if ( IS_CMD( CMD_WRITE, cmd ) ) { @@ -462,6 +556,9 @@ void mode(char* mode, int argc, char **argv) if ( IS_CMD( CMD_SERVE, mode ) ) { mode_serve( argc, argv ); } + else if ( IS_CMD( CMD_LISTEN, mode ) ) { + mode_listen( argc, argv ); + } else if ( IS_CMD( CMD_READ, mode ) ) { mode_read( argc, argv ); } diff --git a/src/listen.c b/src/listen.c new file mode 100644 index 0000000..4c6da57 --- /dev/null +++ b/src/listen.c @@ -0,0 +1,87 @@ +#include "listen.h" +#include "serve.h" +#include "util.h" + +#include + +struct listen * listen_create( + char* s_ip_address, + char* s_rebind_ip_address, + char* s_port, + char* s_rebind_port, + char* s_file, + char *s_ctrl_sock, + int default_deny, + int acl_entries, + char** s_acl_entries, + int max_nbd_clients ) +{ + struct listen * listen; + + listen = (struct listen *)xmalloc( sizeof( listen ) ); + listen->init_serve = server_create( s_ip_address, + s_port, + s_file, + s_ctrl_sock, + default_deny, + acl_entries, + s_acl_entries, + 1, 0); + listen->main_serve = server_create( + s_rebind_ip_address ? s_rebind_ip_address : s_ip_address, + s_rebind_port ? s_rebind_port : s_port, + s_file, + s_ctrl_sock, + default_deny, + acl_entries, + s_acl_entries, + max_nbd_clients, 1); + return listen; +} + + +void listen_destroy( struct listen * listen ) +{ + NULLCHECK( listen ); + free( listen ); +} + + +void listen_switch( struct listen * listen ) +{ + NULLCHECK( listen ); + + /* TODO: Copy acl from init_serve to main_serve */ + /* TODO: rename underlying file from foo.INCOMPLETE to foo */ +} + + +void listen_cleanup( void * unused __attribute__((unused)) ) +{ +} + +void do_listen( struct listen * listen ) +{ + NULLCHECK( listen ); + + int have_control = 0; + + have_control = do_serve( listen->init_serve ); + + if( have_control ) { + info( "Taking control."); + listen_switch( listen ); + server_destroy( listen->init_serve ); + info( "Switched to the main server, serving." ); + do_serve( listen->main_serve ); + } + else { + warn("Failed to take control, giving up."); + server_destroy( listen->init_serve ); + } + server_destroy( listen->main_serve ); + + info("Listen done."); + listen_cleanup( listen ); +} + diff --git a/src/listen.h b/src/listen.h new file mode 100644 index 0000000..b340e61 --- /dev/null +++ b/src/listen.h @@ -0,0 +1,26 @@ +#ifndef LISTEN_H +#define LISTEN_H + +#include "serve.h" + +struct listen { + struct server * init_serve; + struct server * main_serve; +}; + +struct listen * listen_create( + char* s_ip_address, + char* s_rebind_ip_address, + char* s_port, + char* s_rebind_port, + char* s_file, + char *s_ctrl_sock, + int default_deny, + int acl_entries, + char** s_acl_entries, + int max_nbd_clients ); +void listen_destroy( struct listen* ); + +void do_listen( struct listen * ); + +#endif diff --git a/src/options.h b/src/options.h index d1425a0..7c7b035 100644 --- a/src/options.h +++ b/src/options.h @@ -5,8 +5,10 @@ #define OPT_HELP "help" #define OPT_ADDR "addr" +#define OPT_REBIND_ADDR "rebind-addr" #define OPT_BIND "bind" #define OPT_PORT "port" +#define OPT_REBIND_PORT "rebind-port" #define OPT_FILE "file" #define OPT_SOCK "sock" #define OPT_FROM "from" @@ -14,6 +16,7 @@ #define OPT_DENY "default-deny" #define CMD_SERVE "serve" +#define CMD_LISTEN "listen" #define CMD_READ "read" #define CMD_WRITE "write" #define CMD_ACL "acl" @@ -32,7 +35,9 @@ #define GETOPT_DENY GETOPT_FLAG( OPT_DENY, 'd' ) #define GETOPT_ADDR GETOPT_ARG( OPT_ADDR, 'l' ) +#define GETOPT_REBIND_ADDR GETOPT_ARG( OPT_REBIND_ADDR, 'L') #define GETOPT_PORT GETOPT_ARG( OPT_PORT, 'p' ) +#define GETOPT_REBIND_PORT GETOPT_ARG( OPT_REBIND_PORT, 'P') #define GETOPT_FILE GETOPT_ARG( OPT_FILE, 'f' ) #define GETOPT_SOCK GETOPT_ARG( OPT_SOCK, 's' ) #define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' ) @@ -81,6 +86,36 @@ static char serve_help_text[] = SOCK_LINE VERBOSE_LINE; + +static struct option listen_options[] = { + GETOPT_HELP, + GETOPT_ADDR, + GETOPT_REBIND_ADDR, + GETOPT_PORT, + GETOPT_REBIND_ADDR, + GETOPT_FILE, + GETOPT_SOCK, + GETOPT_DENY, + GETOPT_VERBOSE, + {0} +}; +static char listen_short_options[] = "hl:L:p:P:f:s:d" SOPT_VERBOSE; +static char listen_help_text[] = + "Usage: flexnbd " CMD_LISTEN " [*]\n\n" + "Listen for an incoming migration on ADDR:PORT, " + "then switch to REBIND_ADDR:REBIND_PORT on completion " + "to serve FILE.\n\n" + HELP_LINE + "\t--" OPT_ADDR ",-l \tThe address to listen on.\n" + "\t--" OPT_REBIND_ADDR ",-L \tThe address to switch to, if given.\n" + "\t--" OPT_PORT ",-p \tThe port to listen on.\n" + "\t--" OPT_REBIND_PORT ",-P \tThe port to switch to, if given..\n" + "\t--" OPT_FILE ",-f \tThe file to serve.\n" + "\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n" + SOCK_LINE + VERBOSE_LINE; + + static struct option read_options[] = { GETOPT_HELP, GETOPT_ADDR, diff --git a/src/parse.c b/src/parse.c index 790b72f..dea65f6 100644 --- a/src/parse.c +++ b/src/parse.c @@ -11,6 +11,9 @@ int atoi(const char *nptr); /* FIXME: should change this to return negative on error like everything else */ int parse_ip_to_sockaddr(struct sockaddr* out, char* src) { + NULLCHECK( out ); + NULLCHECK( src ); + char temp[64]; struct sockaddr_in *v4 = (struct sockaddr_in *) out; struct sockaddr_in6 *v6 = (struct sockaddr_in6 *) out; diff --git a/src/serve.c b/src/serve.c index ef2a38e..522abd0 100644 --- a/src/serve.c +++ b/src/serve.c @@ -44,10 +44,12 @@ struct server * server_create ( int default_deny, int acl_entries, char** s_acl_entries, - int max_nbd_clients) + int max_nbd_clients, + int has_control) { struct server * out; out = xmalloc( sizeof( struct server ) ); + out->has_control = has_control; out->max_nbd_clients = max_nbd_clients; out->nbd_client = xmalloc( max_nbd_clients * sizeof( struct client_tbl_entry ) ); @@ -56,6 +58,7 @@ struct server * server_create ( FATAL_IF_NULL(s_ip_address, "No IP address supplied"); FATAL_IF_NULL(s_port, "No port number supplied"); FATAL_IF_NULL(s_file, "No filename supplied"); + NULLCHECK( s_ip_address ); FATAL_IF_ZERO( parse_ip_to_sockaddr(&out->bind_to.generic, s_ip_address), "Couldn't parse server address '%s' (use 0 if " @@ -271,7 +274,7 @@ int cleanup_client_thread( struct client_tbl_entry * entry ) void cleanup_client_threads( struct client_tbl_entry * entries, size_t entries_len ) { - int i; + size_t i; for( i = 0; i < entries_len; i++ ) { cleanup_client_thread( &entries[i] ); } @@ -575,19 +578,13 @@ int server_accept( struct server * params ) } if ( FD_ISSET( params->server_fd, &fds ) ){ - client_fd = accept( params->server_fd, &client_address.generic, &socklen ); - debug("Accepted nbd client socket"); - accept_nbd_client(params, client_fd, &client_address); - - } else if( FD_ISSET( params->control_fd, &fds ) ) { - + } + else if( FD_ISSET( params->control_fd, &fds ) ) { client_fd = accept( params->control_fd, &client_address.generic, &socklen ); - debug("Accepted control client socket"); - accept_control_connection(params, client_fd, &client_address); } @@ -639,6 +636,17 @@ void serve_wait_for_close( struct server * serve ) } } +/* We've just had an ENTRUST/DISCONNECT pair, so we need to shut down + * and signal our listener that we can safely take over. + */ +void server_control_arrived( struct server *serve ) +{ + NULLCHECK( serve ); + + serve->has_control = 1; + serve_signal_close( serve ); +} + /** Closes sockets, frees memory and waits for all client threads to finish */ void serve_cleanup(struct server* params, @@ -676,15 +684,20 @@ void serve_cleanup(struct server* params, } /** Full lifecycle of the server */ -void do_serve(struct server* params) +int do_serve(struct server* params) { NULLCHECK( params ); + + int has_control; error_set_handler((cleanup_handler*) serve_cleanup, params); serve_open_server_socket(params); serve_open_control_socket(params); serve_init_allocation_map(params); serve_accept_loop(params); + has_control = params->has_control; serve_cleanup(params, 0); + + return has_control; } diff --git a/src/serve.h b/src/serve.h index 7382423..35227ed 100644 --- a/src/serve.h +++ b/src/serve.h @@ -82,6 +82,13 @@ struct server { int max_nbd_clients; struct client_tbl_entry *nbd_client; + + + /* Marker for whether or not this server has control over the + * data in the file, or if we're waiting to receive it from an + * inbound migration which hasn't yet finished. + */ + int has_control; }; struct server * server_create( @@ -92,7 +99,8 @@ struct server * server_create( int default_deny, int acl_entries, char** s_acl_entries, - int max_nbd_clients ); + int max_nbd_clients, + int has_control ); void server_destroy( struct server * ); int server_is_closed(struct server* serve); void server_dirty(struct server *serve, off64_t from, int len); @@ -101,7 +109,9 @@ void server_unlock_io( struct server* serve ); void serve_signal_close( struct server *serve ); void serve_wait_for_close( struct server * serve ); void server_replace_acl( struct server *serve, struct acl * acl); +void server_control_arrived( struct server *serve ); +int do_serve( struct server * ); struct mode_readwrite_params { union mysockaddr connect_to; diff --git a/tests/check_listen.c b/tests/check_listen.c new file mode 100644 index 0000000..1119152 --- /dev/null +++ b/tests/check_listen.c @@ -0,0 +1,55 @@ +#include "serve.h" +#include "listen.h" +#include "util.h" + +#include +#include + +START_TEST( test_defaults_main_serve_opts ) +{ + struct listen * listen = listen_create( "127.0.0.1", NULL, "4777", NULL, + "foo", "bar", 0, 0, NULL, 1 ); + NULLCHECK( listen ); + struct server *init_serve = listen->init_serve; + struct server *main_serve = listen->main_serve; + NULLCHECK( init_serve ); + NULLCHECK( main_serve ); + + fail_unless( 0 == memcmp(&init_serve->bind_to, + &main_serve->bind_to, + sizeof( union mysockaddr )), + "Main serve bind_to was not set" ); +} +END_TEST + + +Suite* listen_suite(void) +{ + Suite *s = suite_create("listen"); + TCase *tc_create = tcase_create("create"); + + tcase_add_test(tc_create, test_defaults_main_serve_opts); + + suite_add_tcase(s, tc_create); + + return s; +} + +#ifdef DEBUG +# define LOG_LEVEL 0 +#else +# define LOG_LEVEL 2 +#endif + +int main(void) +{ + log_level = LOG_LEVEL; + int number_failed; + Suite *s = listen_suite(); + SRunner *sr = srunner_create(s); + srunner_run_all(sr, CK_NORMAL); + number_failed = srunner_ntests_failed(sr); + srunner_free(sr); + return (number_failed == 0) ? 0 : 1; +} + diff --git a/tests/check_parse.c b/tests/check_parse.c new file mode 100644 index 0000000..b30ee6d --- /dev/null +++ b/tests/check_parse.c @@ -0,0 +1,47 @@ +#include "parse.h" +#include "util.h" + +#include + +START_TEST( test_can_parse_ip_address_twice ) +{ + char ip_address[] = "127.0.0.1"; + struct sockaddr saddr; + + parse_ip_to_sockaddr( &saddr, ip_address ); + parse_ip_to_sockaddr( &saddr, ip_address ); +} +END_TEST + + + +Suite* parse_suite(void) +{ + Suite *s = suite_create("parse"); + TCase *tc_create = tcase_create("ip_to_sockaddr"); + + tcase_add_test(tc_create, test_can_parse_ip_address_twice); + + suite_add_tcase(s, tc_create); + + return s; +} + +#ifdef DEBUG +# define LOG_LEVEL 0 +#else +# define LOG_LEVEL 2 +#endif + +int main(void) +{ + log_level = LOG_LEVEL; + int number_failed; + Suite *s = parse_suite(); + SRunner *sr = srunner_create(s); + srunner_run_all(sr, CK_NORMAL); + number_failed = srunner_ntests_failed(sr); + srunner_free(sr); + return (number_failed == 0) ? 0 : 1; +} +