From 94b4fa887c8752f49a811b0e352402c17b206364 Mon Sep 17 00:00:00 2001 From: Alex Young Date: Wed, 27 Jun 2012 15:45:33 +0100 Subject: [PATCH] Add mboxes --- Rakefile | 53 +- src/acl.c | 5 + src/acl.h | 3 + src/client.c | 21 +- src/control.c | 862 ++++++---------------- src/control.h | 44 +- src/flexnbd.c | 742 ++++++------------- src/flexnbd.h | 75 ++ src/ioutil.c | 5 +- src/ioutil.h | 2 +- src/listen.c | 45 +- src/listen.h | 6 +- src/main.c | 19 + src/mbox.c | 77 ++ src/mbox.h | 55 ++ src/mirror.c | 598 +++++++++++++++ src/mode.c | 711 ++++++++++++++++++ src/mode.h | 76 ++ src/options.h | 215 ------ src/remote.c | 1 + src/self_pipe.h | 3 + src/serve.c | 83 +-- src/serve.h | 72 +- src/util.c | 8 + src/util.h | 10 +- tests/check_client.c | 5 +- tests/check_control.c | 42 ++ tests/check_flexnbd.c | 47 ++ tests/check_listen.c | 8 +- tests/check_mbox.c | 104 +++ tests/check_serve.c | 20 +- tests/fakes/source/close_after_connect.rb | 6 +- tests/flexnbd.rb | 73 +- tests/nbd_scenarios | 37 +- 34 files changed, 2534 insertions(+), 1599 deletions(-) create mode 100644 src/flexnbd.h create mode 100644 src/main.c create mode 100644 src/mbox.c create mode 100644 src/mbox.h create mode 100644 src/mirror.c create mode 100644 src/mode.c create mode 100644 src/mode.h delete mode 100644 src/options.h create mode 100644 tests/check_control.c create mode 100644 tests/check_flexnbd.c create mode 100644 tests/check_mbox.c diff --git a/Rakefile b/Rakefile index 31cdd05..1d96e82 100644 --- a/Rakefile +++ b/Rakefile @@ -2,6 +2,8 @@ $: << '../rake_utils/lib' require 'rake_utils/debian' include RakeUtils::DSL +CC=ENV['CC'] || "gcc" + DEBUG = ENV.has_key?('DEBUG') && %w|yes y ok 1 true t|.include?(ENV['DEBUG']) @@ -71,13 +73,13 @@ end def gcc_compile( target, source ) FileUtils.mkdir_p File.dirname( target ) - sh "gcc -Isrc -c #{CCFLAGS.join(' ')} -o #{target} #{source} " + sh "#{CC} -Isrc -c #{CCFLAGS.join(' ')} -o #{target} #{source} " end def gcc_link(target, objects) FileUtils.mkdir_p File.dirname( target ) - sh "gcc #{LDFLAGS.join(' ')} "+ + sh "#{CC} #{LDFLAGS.join(' ')} "+ LIBS.map { |l| "-l#{l}" }.join(" ")+ " -Isrc " + " -o #{target} "+ @@ -85,7 +87,7 @@ def gcc_link(target, objects) end def headers(c) - `gcc -Isrc -MM #{c}`.gsub("\\\n", " ").split(" ")[2..-1] + `#{CC} -Isrc -MM #{c}`.gsub("\\\n", " ").split(" ")[2..-1] end rule 'build/flexnbd' => OBJECTS do |t| @@ -97,6 +99,8 @@ file check("client") => %w{build/tests/check_client.o build/self_pipe.o build/nbdtypes.o + build/listen.o + build/flexnbd.o build/control.o build/readwrite.o build/parse.o @@ -104,6 +108,9 @@ file check("client") => build/serve.o build/acl.o build/ioutil.o + build/mbox.o + build/mirror.o + build/status.o build/util.o} do |t| gcc_link t.name, t.prerequisites + [LIBCHECK] end @@ -132,7 +139,12 @@ file check("serve") => build/parse.o build/client.o build/serve.o + build/flexnbd.o + build/mirror.o + build/status.o + build/listen.o build/acl.o + build/mbox.o build/ioutil.o build/util.o} do |t| gcc_link t.name, t.prerequisites + [LIBCHECK] @@ -147,7 +159,12 @@ file check("readwrite") => build/parse.o build/acl.o build/control.o + build/flexnbd.o + build/mirror.o + build/status.o + build/listen.o build/nbdtypes.o + build/mbox.o build/ioutil.o build/util.o} do |t| gcc_link t.name, t.prerequisites + [LIBCHECK] @@ -156,6 +173,10 @@ end file check("listen") => %w{build/tests/check_listen.o build/listen.o + build/flexnbd.o + build/status.o + build/mbox.o + build/mirror.o build/self_pipe.o build/nbdtypes.o build/control.o @@ -169,8 +190,32 @@ file check("listen") => gcc_link t.name, t.prerequisites + [LIBCHECK] end +file check("flexnbd") => +%w{build/tests/check_flexnbd.o + build/flexnbd.o + build/ioutil.o + build/util.o + build/control.o + build/listen.o + build/mbox.o + build/status.o + build/self_pipe.o + build/client.o + build/acl.o + build/parse.o + build/nbdtypes.o + build/readwrite.o + build/mirror.o + build/serve.o} do |t| + gcc_link t.name, t.prerequisites + [LIBCHECK] +end -(TEST_MODULES- %w{acl client serve readwrite listen util}).each do |m| +file check("control") => + %w{build/tests/check_control.o} + OBJECTS - ["build/main.o"] do |t| + gcc_link t.name, t.prerequisites + [LIBCHECK] +end + +(TEST_MODULES- %w{control flexnbd acl client serve readwrite listen util}).each do |m| tgt = "build/tests/check_#{m}.o" maybe_obj_name = "build/#{m}.o" # Take it out in case we're testing util.o or ioutil.o diff --git a/src/acl.c b/src/acl.c index acd66c0..587263d 100644 --- a/src/acl.c +++ b/src/acl.c @@ -89,6 +89,11 @@ int acl_includes( struct acl * acl, union mysockaddr * addr ) } } +int acl_default_deny( struct acl * acl ) +{ + NULLCHECK( acl ); + return acl->default_deny; +} void acl_destroy( struct acl * acl ) { diff --git a/src/acl.h b/src/acl.h index 1fd2b77..8c4af3c 100644 --- a/src/acl.h +++ b/src/acl.h @@ -25,6 +25,9 @@ struct acl * acl_create( int len, char **lines, int default_deny ); */ int acl_includes( struct acl *, union mysockaddr *); +/** Get the default_deny status */ +int acl_default_deny( struct acl * ); + /** Free the acl structure and the internal acl entries table. */ diff --git a/src/client.c b/src/client.c index 205317b..a19b67f 100644 --- a/src/client.c +++ b/src/client.c @@ -27,21 +27,24 @@ struct client *client_create( struct server *serve, int socket ) c->entrusted = 0; + debug( "Alloced client %p (%d, %d)", c, c->stop_signal->read_fd, c->stop_signal->write_fd ); return c; } -void client_signal_stop( struct client *client ) +void client_signal_stop( struct client *c) { - NULLCHECK( client ); + NULLCHECK( c); - self_pipe_signal( client->stop_signal ); + debug("client %p: signal stop (%d, %d)", c,c->stop_signal->read_fd, c->stop_signal->write_fd ); + self_pipe_signal( c->stop_signal ); } void client_destroy( struct client *client ) { NULLCHECK( client ); + debug( "Destroying stop signal for client %p", client ); self_pipe_destroy( client->stop_signal ); free( client ); } @@ -256,7 +259,7 @@ void client_write_init( struct client * client, uint64_t size ) nbd_h2r_init( &init, &init_raw ); - FATAL_IF_NEGATIVE( + ERROR_IF_NEGATIVE( writeloop(client->socket, &init_raw, sizeof(init_raw)), "Couldn't send hello" ); @@ -437,7 +440,7 @@ void client_send_hello(struct client* client) void client_cleanup(struct client* client, int fatal __attribute__ ((unused)) ) { - info("client cleanup"); + info("client cleanup for client %p", client); if (client->socket) { close(client->socket); } if (client->mapped) { @@ -452,6 +455,7 @@ void* client_serve(void* client_uncast) error_set_handler((cleanup_handler*) client_cleanup, client); + debug("client: mmap"); FATAL_IF_NEGATIVE( open_and_mmap( client->serve->filename, @@ -461,13 +465,17 @@ void* client_serve(void* client_uncast) ), "Couldn't open/mmap file %s", client->serve->filename ); + debug("client: sending hello"); client_send_hello(client); + debug("client: serving requests"); while (client_serve_request(client) == 0) ; + debug("client: stopped serving requests"); client->stopped = 1; if ( client->entrusted ){ + debug("client: control arrived" ); server_control_arrived( client->serve ); } @@ -477,8 +485,9 @@ void* client_serve(void* client_uncast) client->socket ); - debug("Cleaning up normally in thread %p", pthread_self()); + debug("Cleaning client %p up normally in thread %p", client, pthread_self()); client_cleanup(client, 0); + debug("Client thread done" ); return NULL; } diff --git a/src/control.c b/src/control.c index b0ffae1..3a9802c 100644 --- a/src/control.c +++ b/src/control.c @@ -26,6 +26,7 @@ */ #include "control.h" +#include "mirror.h" #include "serve.h" #include "util.h" #include "ioutil.h" @@ -35,609 +36,222 @@ #include "self_pipe.h" #include "acl.h" #include "status.h" +#include "mbox.h" #include #include #include #include -struct mirror_status * mirror_status_alloc( - union mysockaddr * connect_to, - union mysockaddr * connect_from, - int max_Bps, - int action_at_finish, - struct self_pipe * commit_signal, - enum mirror_state * out_commit_state) + +struct control * control_create(struct flexnbd * flexnbd, const char * csn) { - struct mirror_status * mirror; + struct control * control = xmalloc( sizeof( struct control ) ); - mirror = xmalloc(sizeof(struct mirror_status)); - mirror->connect_to = connect_to; - mirror->connect_from = connect_from; - mirror->max_bytes_per_second = max_Bps; - mirror->action_at_finish = action_at_finish; - mirror->commit_signal = commit_signal; - mirror->commit_state = out_commit_state; + NULLCHECK( csn ); - return mirror; + control->flexnbd = flexnbd; + control->socket_name = csn; + control->close_signal = self_pipe_create(); + + return control; } -void mirror_set_state_f( struct mirror_status * mirror, enum mirror_state state ) + +void control_signal_close( struct control * control) { - NULLCHECK( mirror ); - if ( mirror->commit_state ){ - *mirror->commit_state = state; + + NULLCHECK( control ); + self_pipe_signal( control->close_signal ); +} + + +void control_destroy( struct control * control ) +{ + NULLCHECK( control ); + + self_pipe_destroy( control->close_signal ); + free( control ); +} + +struct control_client * control_client_create( struct flexnbd * flexnbd, int client_fd ) +{ + NULLCHECK( flexnbd ); + + struct control_client * control_client = + xmalloc( sizeof( struct control_client ) ); + + control_client->socket = client_fd; + control_client->flexnbd = flexnbd; + return control_client; +} + + + +void control_client_destroy( struct control_client * client ) +{ + NULLCHECK( client ); + free( client ); +} + + +void control_respond(struct control_client * client); + +void control_handle_client( struct control * control, int client_fd ) +{ + NULLCHECK( control ); + NULLCHECK( control->flexnbd ); + struct control_client * control_client = + control_client_create( control->flexnbd, client_fd ); + + /* We intentionally don't spawn a thread for the client here. + * This is to avoid having more than one thread potentially + * waiting on the migration commit status. + */ + control_respond( control_client ); +} + + +void control_accept_client( struct control * control ) +{ + + int client_fd; + union mysockaddr client_address; + socklen_t addrlen = sizeof( union mysockaddr ); + + client_fd = accept( control->control_fd, &client_address.generic, &addrlen ); + FATAL_IF( -1 == client_fd, "control accept failed" ); + + control_handle_client( control, client_fd ); +} + +int control_accept( struct control * control ) +{ + NULLCHECK( control ); + + fd_set fds; + + FD_ZERO( &fds ); + FD_SET( control->control_fd, &fds ); + self_pipe_fd_set( control->close_signal, &fds ); + FATAL_UNLESS( 0 < select( FD_SETSIZE, &fds, NULL, NULL, NULL ), + "Control select failed." ); + + if ( self_pipe_fd_isset( control->close_signal, &fds ) ){ + return 0; } -} - -#define mirror_set_state( mirror, state ) do{\ - debug( "Mirror state => " #state );\ - mirror_set_state_f( mirror, state );\ -} while(0) - -enum mirror_state mirror_get_state( struct mirror_status * mirror ) -{ - NULLCHECK( mirror ); - if ( mirror->commit_state ){ - return *mirror->commit_state; - } else { - return MS_UNKNOWN; + + if ( FD_ISSET( control->control_fd, &fds ) ) { + control_accept_client( control ); } + return 1; } -void mirror_status_init( struct mirror_status * mirror, char * filename ) +void control_accept_loop( struct control * control ) { - int map_fd; - off64_t size; + while( control_accept( control ) ); +} - NULLCHECK( mirror ); - NULLCHECK( filename ); +int open_control_socket( const char * socket_name ) +{ + struct sockaddr_un bind_address; + int control_fd; + + if (!socket_name) { + fatal( "Tried to open a control socket without a socket name" ); + } + + control_fd = socket(AF_UNIX, SOCK_STREAM, 0); + FATAL_IF_NEGATIVE(control_fd , + "Couldn't create control socket"); + + memset(&bind_address, 0, sizeof(struct sockaddr_un)); + bind_address.sun_family = AF_UNIX; + strncpy(bind_address.sun_path, socket_name, sizeof(bind_address.sun_path)-1); + + unlink(socket_name); /* ignore failure */ + FATAL_IF_NEGATIVE( - open_and_mmap( - filename, - &map_fd, - &size, - (void**) &mirror->mapped - ), - "Failed to open and mmap %s", - filename + bind(control_fd , &bind_address, sizeof(bind_address)), + "Couldn't bind control socket to %s", + socket_name ); - mirror->dirty_map = bitset_alloc(size, 4096); - + FATAL_IF_NEGATIVE( + listen(control_fd , 5), + "Couldn't listen on control socket" + ); + return control_fd; } -/* Call this before a mirror attempt. */ -void mirror_status_reset( struct mirror_status * mirror ) +void control_listen(struct control* control) { - NULLCHECK( mirror ); - NULLCHECK( mirror->dirty_map ); - mirror_set_state( mirror, MS_INIT ); - bitset_set(mirror->dirty_map); + NULLCHECK( control ); + control->control_fd = open_control_socket( control->socket_name ); } -struct mirror_status * mirror_status_create( - struct server * serve, - union mysockaddr * connect_to, - union mysockaddr * connect_from, - int max_Bps, - int action_at_finish, - struct self_pipe * commit_signal, - enum mirror_state * out_commit_state) +void control_serve( struct control * control ) { - /* FIXME: shouldn't map_fd get closed? */ - struct mirror_status * mirror; - - NULLCHECK( serve ); - - mirror = mirror_status_alloc( connect_to, - connect_from, - max_Bps, - action_at_finish, - commit_signal, - out_commit_state ); - - mirror_status_init( mirror, serve->filename ); - mirror_status_reset( mirror ); - - - return mirror; + NULLCHECK( control ); + control_listen( control ); + while( control_accept( control ) ); } -void mirror_status_destroy( struct mirror_status *mirror ) +void * control_runner( void * control_uncast ) { - NULLCHECK( mirror ); - free(mirror->connect_to); - free(mirror->connect_from); - free(mirror->dirty_map); - free(mirror); + NULLCHECK( control_uncast ); + struct control * control = (struct control *)control_uncast; + + control_serve( control ); + return NULL; } -/** The mirror code will split NBD writes, making them this long as a maximum */ -static const int mirror_longest_write = 8<<20; +#define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1) -/** If, during a mirror pass, we have sent this number of bytes or fewer, we - * go to freeze the I/O and finish it off. This is just a guess. - */ -static const unsigned int mirror_last_pass_after_bytes_written = 100<<20; - -/** The largest number of full passes we'll do - the last one will always - * cause the I/O to freeze, however many bytes are left to copy. - */ -static const int mirror_maximum_passes = 7; - -/* A single mirror pass over the disc, optionally locking IO around the - * transfer. - */ -int mirror_pass(struct server * serve, int should_lock, uint64_t *written) +void control_write_mirror_response( enum mirror_state mirror_state, int client_fd ) { - uint64_t current = 0; - int success = 1; - struct bitset_mapping *map = serve->mirror->dirty_map; - *written = 0; - - while (current < serve->size) { - int run = bitset_run_count(map, current, mirror_longest_write); - - debug("mirror current=%ld, run=%d", current, run); - - /* FIXME: we could avoid sending sparse areas of the - * disc here, and probably save a lot of bandwidth and - * time (if we know the destination starts off zeroed). - */ - if (bitset_is_set_at(map, current)) { - /* We've found a dirty area, send it */ - debug("^^^ writing"); - - /* We need to stop the main thread from working - * because it might corrupt the dirty map. This - * is likely to slow things down but will be - * safe. - */ - if (should_lock) { server_lock_io( serve ); } - { - /** FIXME: do something useful with bytes/second */ - - /** FIXME: error handling code here won't unlock */ - socket_nbd_write( serve->mirror->client, - current, - run, - 0, - serve->mirror->mapped + current); - - /* now mark it clean */ - bitset_clear_range(map, current, run); - } - if (should_lock) { server_unlock_io( serve ); } - - *written += run; - } - current += run; - - if (serve->mirror->signal_abandon) { - success = 0; + switch (mirror_state) { + case MS_INIT: + case MS_UNKNOWN: + write_socket( "1: Mirror failed to initialise" ); + fatal( "Impossible mirror state: %d", mirror_state ); + case MS_FAIL_CONNECT: + write_socket( "1: Mirror failed to connect"); break; - } + case MS_FAIL_REJECTED: + write_socket( "1: Mirror was rejected" ); + break; + case MS_FAIL_NO_HELLO: + write_socket( "1: Remote server failed to respond"); + break; + case MS_FAIL_SIZE_MISMATCH: + write_socket( "1: Remote size does not match local size" ); + break; + case MS_GO: + case MS_FINALISE: + case MS_DONE: /* Yes, I know we know better, but it's simpler this way */ + write_socket( "0: Mirror started" ); + break; + default: + fatal( "Unhandled mirror state: %d", mirror_state ); } - - return success; -} - - -void mirror_give_control( struct mirror_status * mirror ) -{ - /* TODO: set up an error handler to clean up properly on ERROR. - */ - - /* A transfer of control is expressed as a 3-way handshake. - * First, We send a REQUEST_ENTRUST. If this fails to be - * received, this thread will simply block until the server is - * restarted. If the remote end doesn't understand it, it'll - * disconnect us, and an ERROR *should* bomb this thread. - * FIXME: make the ERROR work. - * If we get an explicit error back from the remote end, then - * again, this thread will bomb out. - * On receiving a valid response, we send a REQUEST_DISCONNECT, - * and we quit without checking for a response. This is the - * remote server's signal to assume control of the file. The - * reason we don't check for a response is the state we end up - * in if the final message goes astray: if we lose the - * REQUEST_DISCONNECT, the sender has quit and the receiver - * hasn't had a signal to take over yet, so the data is safe. - * If we were to wait for a response to the REQUEST_DISCONNECT, - * the sender and receiver would *both* be servicing write - * requests while the response was in flight, and if the - * response went astray we'd have two servers claiming - * responsibility for the same data. - */ - socket_nbd_entrust( mirror->client ); - socket_nbd_disconnect( mirror->client ); -} - - -/* THIS FUNCTION MUST ONLY BE CALLED WITH THE SERVER'S IO LOCKED. */ -void mirror_on_exit( struct server * serve ) -{ - /* Send an explicit entrust and disconnect. After this - * point we cannot allow any reads or writes to the local file. - * We do this *before* trying to shut down the server so that if - * the transfer of control fails, we haven't stopped the server - * and already-connected clients don't get needlessly - * disconnected. - */ - debug( "mirror_give_control"); - mirror_give_control( serve->mirror ); - - /* If we're still here, the transfer of control went ok, and the - * remote is listening (or will be shortly). We can shut the - * server down. - * - * It doesn't matter if we get new client connections before - * now, the IO lock will stop them from doing anything. - */ - debug("serve_signal_close"); - serve_signal_close( serve ); - - /* We have to wait until the server is closed before unlocking - * IO. This is because the client threads check to see if the - * server is still open before reading or writing inside their - * own locks. If we don't wait for the close, there's no way to - * guarantee the server thread will win the race and we risk the - * clients seeing a "successful" write to a dead disc image. - */ - debug("serve_wait_for_close"); - serve_wait_for_close( serve ); - info("Mirror sent."); -} - - -void mirror_cleanup( struct mirror_status * mirror, - int fatal __attribute__((unused))) -{ - NULLCHECK( mirror ); - info( "Cleaning up mirror thread"); - - if( mirror->client && mirror->client > 0 ){ - close( mirror->client ); - } - mirror->client = -1; -} - - - -int mirror_status_connect( struct mirror_status * mirror, off64_t local_size ) -{ - struct sockaddr * connect_from = NULL; - if ( mirror->connect_from ) { - connect_from = &mirror->connect_from->generic; - } - - NULLCHECK( mirror->connect_to ); - - mirror->client = socket_connect(&mirror->connect_to->generic, connect_from); - if ( 0 < mirror->client ) { - fd_set fds; - struct timeval tv = { MS_HELLO_TIME_SECS, 0}; - FD_ZERO( &fds ); - FD_SET( mirror->client, &fds ); - - FATAL_UNLESS( 0 <= select( FD_SETSIZE, &fds, NULL, NULL, &tv ), - "Select failed." ); - - if( FD_ISSET( mirror->client, &fds ) ){ - off64_t remote_size; - if ( socket_nbd_read_hello( mirror->client, &remote_size ) ) { - if( remote_size == local_size ){ - mirror_set_state( mirror, MS_GO ); - } - else { - warn("Remote size (%d) doesn't match local (%d)", - remote_size, local_size ); - mirror_set_state( mirror, MS_FAIL_SIZE_MISMATCH ); - } - } - else { - warn( "Mirror attempt rejected." ); - mirror_set_state( mirror, MS_FAIL_REJECTED ); - } - } - else { - warn( "No NBD Hello received." ); - mirror_set_state( mirror, MS_FAIL_NO_HELLO ); - } - } - else { - warn( "Mirror failed to connect."); - mirror_set_state( mirror, MS_FAIL_CONNECT ); - } - - return mirror_get_state(mirror) == MS_GO; -} - - - -void server_run_mirror( struct server *serve ) -{ - NULLCHECK( serve ); - NULLCHECK( serve->mirror ); - - int pass; - uint64_t written; - - info("Starting mirror" ); - for (pass=0; pass < mirror_maximum_passes-1; pass++) { - - debug("mirror start pass=%d", pass); - if ( !mirror_pass( serve, 1, &written ) ){ return; } - - /* if we've not written anything */ - if (written < mirror_last_pass_after_bytes_written) { break; } - } - - mirror_set_state( serve->mirror, MS_FINALISE ); - server_lock_io( serve ); - { - if ( mirror_pass( serve, 0, &written ) && - ACTION_EXIT == serve->mirror->action_at_finish) { - debug("exit!"); - mirror_on_exit( serve ); - info("Server closed, quitting " - "after successful migration"); - } - } - server_unlock_io( serve ); -} - - -void mirror_signal_commit( struct mirror_status * mirror ) -{ - NULLCHECK( mirror ); - - self_pipe_signal( mirror->commit_signal ); -} - -/** Thread launched to drive mirror process */ -void* mirror_runner(void* serve_params_uncast) -{ - /* The supervisor thread relies on there not being any ERROR - * calls until after the mirror_signal_commit() call in this - * function. - * However, *after* that, we should call ERROR_* instead of - * FATAL_* wherever possible. - */ - struct server *serve = (struct server*) serve_params_uncast; - - NULLCHECK( serve ); - NULLCHECK( serve->mirror ); - struct mirror_status * mirror = serve->mirror; - NULLCHECK( mirror->dirty_map ); - - error_set_handler( (cleanup_handler *) mirror_cleanup, mirror ); - - info( "Connecting to mirror" ); - - time_t start_time = time(NULL); - int connected = mirror_status_connect( mirror, serve->size ); - mirror_signal_commit( mirror ); - if ( !connected ) { goto abandon_mirror; } - - /* After this point, if we see a failure we need to disconnect - * and retry everything from mirror_set_state(_, MS_INIT), but - * *without* signaling the commit or abandoning the mirror. - * */ - - if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){ - /* If we get here, then we managed to connect but the - * control thread feeding status back to the user will - * have gone away, leaving the user without meaningful - * feedback. In this instance, they have to assume a - * failure, so we can't afford to let the mirror happen. - * We have to set the state to avoid a race. - */ - mirror_set_state( mirror, MS_FAIL_CONNECT ); - warn( "Mirror connected, but too slowly" ); - goto abandon_mirror; - } - - server_run_mirror( serve ); - - mirror_set_state( mirror, MS_DONE ); -abandon_mirror: - return NULL; -} - - -struct mirror_super * mirror_super_create( - struct server * serve, - union mysockaddr * connect_to, - union mysockaddr * connect_from, - int max_Bps, - int action_at_finish, - enum mirror_state * out_commit_state) -{ - struct mirror_super * super = xmalloc( sizeof( struct mirror_super) ); - super->mirror = mirror_status_create( serve, - connect_to, - connect_from, - max_Bps, - action_at_finish, - self_pipe_create(), - out_commit_state ); - super->commit_signal = self_pipe_create(); - - return super; -} - -void mirror_super_signal_committed( struct mirror_super * super ) -{ - NULLCHECK( super ); - self_pipe_signal( super->commit_signal ); -} - - -void mirror_super_destroy( struct mirror_super * super ) -{ - NULLCHECK( super ); - - mirror_status_destroy( super->mirror ); - self_pipe_destroy( super->commit_signal ); -} - - -/* The mirror supervisor thread. Responsible for kicking off retries if - * the mirror thread fails. - * The mirror_status and mirror_super objects are never freed, and the - * mirror_super_runner thread is never joined. - */ -void * mirror_super_runner( void * serve_uncast ) -{ - struct server * serve = (struct server *) serve_uncast; - NULLCHECK( serve ); - NULLCHECK( serve->mirror ); - NULLCHECK( serve->mirror_super ); - - int should_retry = 0; - int success = 0; - fd_set fds; - int fd_count; - - struct mirror_status * mirror = serve->mirror; - struct mirror_super * super = serve->mirror_super; - - do { - if ( should_retry ) { - /* We don't want to hammer the destination too - * hard, so if this is a retry, insert a delay. */ - sleep( MS_RETRY_DELAY_SECS ); - - /* We also have to reset the bitmap to be sure - * we transfer everything */ - mirror_status_reset( mirror ); - } - - FATAL_IF( 0 != pthread_create( - &mirror->thread, - NULL, - mirror_runner, - serve), - "Failed to create mirror thread"); - - debug("Supervisor waiting for commit signal"); - FD_ZERO( &fds ); - self_pipe_fd_set( mirror->commit_signal, &fds ); - /* There's no timeout on this select. This means that - * the mirror thread *must* signal then abort itself if - * it passes the timeout, and it *must* always signal, - * no matter what. - */ - fd_count = select( FD_SETSIZE, &fds, NULL, NULL, NULL ); - if ( 1 == fd_count ) { - debug( "Supervisor got commit signal" ); - if ( 0 == should_retry ) { - should_retry = 1; - /* Only send this signal the first time */ - mirror_super_signal_committed(super); - debug("Mirror supervisor committed"); - } - } - else { fatal( "Select failed." ); } - - debug("Supervisor waiting for mirror thread" ); - pthread_join( mirror->thread, NULL ); - debug( "Clearing the commit signal. If this blocks," - " it's fatal but we can't check in advance." ); - self_pipe_signal_clear( mirror->commit_signal ); - debug( "Commit signal cleared." ); - - success = MS_DONE == mirror_get_state( mirror ); - - if( success ){ info( "Mirror supervisor success, exiting" ); } - else if (should_retry){ - warn( "Mirror failed, retrying" ); - } - else { warn( "Mirror failed before commit, giving up" ); } - } - while ( should_retry && !success ); - - serve->mirror = NULL; - serve->mirror_super = NULL; - - mirror_super_destroy( super ); - debug( "Mirror supervisor done." ); - - return NULL; } +#undef write_socket #define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1) - -/* We have to pass the mirror_state pointer and the commit_signal - * separately from the mirror itself because the mirror might have been - * freed by the time we get to check it */ -void mirror_watch_startup( struct control_params * client, - struct self_pipe * commit_signal, - enum mirror_state *mirror_state ) -{ - NULLCHECK( client ); - struct server * serve = client->serve; - NULLCHECK( serve ); - struct mirror_status * mirror = serve->mirror; - NULLCHECK( mirror ); - - fd_set fds; - /* This gives a 61 second timeout for the mirror thread to - * either fail or succeed to connect. - */ - struct timeval tv = {MS_CONNECT_TIME_SECS+1,0}; - FD_ZERO( &fds ); - self_pipe_fd_set( commit_signal, &fds ); - ERROR_IF_NEGATIVE( select( FD_SETSIZE, &fds, NULL, NULL, &tv ), "Select failed."); - - if ( self_pipe_fd_isset( commit_signal, &fds ) ){ - switch (*mirror_state) { - case MS_INIT: - case MS_UNKNOWN: - write_socket( "1: Mirror failed to initialise" ); - fatal( "Impossible mirror state: %d", *mirror_state ); - case MS_FAIL_CONNECT: - write_socket( "1: Mirror failed to connect"); - break; - case MS_FAIL_REJECTED: - write_socket( "1: Mirror was rejected" ); - break; - case MS_FAIL_NO_HELLO: - write_socket( "1: Remote server failed to respond"); - break; - case MS_FAIL_SIZE_MISMATCH: - write_socket( "1: Remote size does not match local size" ); - break; - case MS_GO: - case MS_FINALISE: - case MS_DONE: /* Yes, I know we know better, but it's simpler this way */ - write_socket( "0: Mirror started" ); - break; - default: - fatal( "Unhandled mirror state: %d", *mirror_state ); - } - } - else { - /* Timeout. Badness. This "should never happen". */ - write_socket( "1: Mirror timed out connecting to remote host" ); - } -} - /** Command parser to start mirror process from socket input */ -int control_mirror(struct control_params* client, int linesc, char** lines) +int control_mirror(struct control_client* client, int linesc, char** lines) { NULLCHECK( client ); - struct server * serve = client->serve; + struct flexnbd * flexnbd = client->flexnbd; union mysockaddr *connect_to = xmalloc( sizeof( union mysockaddr ) ); union mysockaddr *connect_from = NULL; int use_connect_from = 0; @@ -693,65 +307,79 @@ int control_mirror(struct control_params* client, int linesc, char** lines) return -1; } - enum mirror_state mirror_state; - serve->mirror_super = mirror_super_create( serve, - connect_to, - connect_from, - max_Bps , - action_at_finish, - &mirror_state ); - serve->mirror = serve->mirror_super->mirror; - - FATAL_IF( /* FIXME should free mirror on error */ - 0 != pthread_create( - &serve->mirror_super->thread, - NULL, - mirror_super_runner, - serve - ), - "Failed to create mirror thread" - ); - - mirror_watch_startup( client, serve->mirror_super->commit_signal, &mirror_state ); + /* In theory, we should never have to worry about the switch + * lock here, since we should never be able to start more than + * one mirror at a time. This is enforced by only accepting a + * single client at a time on the control socket. + */ + flexnbd_switch_lock( flexnbd ); + { + struct server * serve = flexnbd_server(flexnbd); + serve->mirror_super = mirror_super_create( + serve->filename, + connect_to, + connect_from, + max_Bps , + action_at_finish); + serve->mirror = serve->mirror_super->mirror; + + FATAL_IF( 0 != pthread_create( + &serve->mirror_super->thread, + NULL, + mirror_super_runner, + serve + ), + "Failed to create mirror thread" + ); + + enum mirror_state state = mirror_super_wait( serve->mirror_super ); + control_write_mirror_response( state, client->socket ); + } + flexnbd_switch_unlock( flexnbd ); debug( "Control thread going away." ); return 0; } +#undef write_socket + + /** Command parser to alter access control list from socket input */ -int control_acl(struct control_params* client, int linesc, char** lines) +int control_acl(struct control_client* client, int linesc, char** lines) { NULLCHECK( client ); + NULLCHECK( client->flexnbd ); + struct flexnbd * flexnbd = client->flexnbd; + + int default_deny = flexnbd_default_deny( flexnbd ); + struct acl * new_acl = acl_create( linesc, lines, default_deny ); - struct acl * old_acl = client->serve->acl; - struct acl * new_acl = acl_create( linesc, lines, old_acl ? old_acl->default_deny : 0 ); - if (new_acl->len != linesc) { write(client->socket, "1: bad spec: ", 13); write(client->socket, lines[new_acl->len], - strlen(lines[new_acl->len])); + strlen(lines[new_acl->len])); write(client->socket, "\n", 1); acl_destroy( new_acl ); } else { - server_replace_acl( client->serve, new_acl ); - write_socket("0: updated"); + flexnbd_replace_acl( flexnbd, new_acl ); + write( client->socket, "0: updated", 10); } - + return 0; } /** FIXME: add some useful statistics */ int control_status( - struct control_params* client, + struct control_client* client, int linesc __attribute__ ((unused)), char** lines __attribute__((unused)) ) { NULLCHECK( client ); - NULLCHECK( client->serve ); - struct status * status = status_create( client->serve ); - + NULLCHECK( client->flexnbd ); + struct status * status = flexnbd_status_create( client->flexnbd ); + write( client->socket, "0: ", 3 ); status_write( status, client->socket ); status_destroy( status ); @@ -759,17 +387,16 @@ int control_status( return 0; } -void control_cleanup(struct control_params* client, +void control_cleanup(struct control_client* client, int fatal __attribute__ ((unused)) ) { if (client->socket) { close(client->socket); } - free(client); + control_client_destroy( client ); } /** Master command parser for control socket connections, delegates quickly */ -void* control_serve(void* client_uncast) +void control_respond(struct control_client * client) { - struct control_params* client = (struct control_params*) client_uncast; char **lines = NULL; int finished=0; @@ -816,62 +443,5 @@ void* control_serve(void* client_uncast) control_cleanup(client, 0); debug("control command handled" ); - - return NULL; -} - -void accept_control_connection(struct server* params, int client_fd, - union mysockaddr* client_address __attribute__ ((unused)) ) -{ - pthread_t control_thread; - struct control_params* control_params; - - control_params = xmalloc(sizeof(struct control_params)); - control_params->socket = client_fd; - control_params->serve = params; - - FATAL_IF( - 0 != pthread_create( - &control_thread, - NULL, - control_serve, - control_params - ), - "Failed to create client thread" - ); - - /* FIXME: This thread *really* shouldn't detach - * Since it can see the server object, if listen switches mode - * while this is live, Bad Things Could Happen. - */ - pthread_detach( control_thread ); -} - -void serve_open_control_socket(struct server* params) -{ - struct sockaddr_un bind_address; - - if (!params->control_socket_name) { return; } - - params->control_fd = socket(AF_UNIX, SOCK_STREAM, 0); - FATAL_IF_NEGATIVE(params->control_fd , - "Couldn't create control socket"); - - memset(&bind_address, 0, sizeof(bind_address)); - bind_address.sun_family = AF_UNIX; - strncpy(bind_address.sun_path, params->control_socket_name, sizeof(bind_address.sun_path)-1); - - unlink(params->control_socket_name); /* ignore failure */ - - FATAL_IF_NEGATIVE( - bind(params->control_fd , &bind_address, sizeof(bind_address)), - "Couldn't bind control socket to %s", - params->control_socket_name - ); - - FATAL_IF_NEGATIVE( - listen(params->control_fd , 5), - "Couldn't listen on control socket" - ); } diff --git a/src/control.h b/src/control.h index b5c2491..8a8821c 100644 --- a/src/control.h +++ b/src/control.h @@ -1,28 +1,32 @@ #ifndef CONTROL_H #define CONTROL_H -/* MS_CONNECT_TIME_SECS - * The length of time after which the sender will assume a connect() to - * the destination has failed. - */ -#define MS_CONNECT_TIME_SECS 60 - -/* MS_HELLO_TIME_SECS - * The length of time the sender will wait for the NBD hello message - * after connect() before aborting the connection attempt. - */ -#define MS_HELLO_TIME_SECS 5 - - -/* MS_RETRY_DELAY_SECS - * The delay after a failed migration attempt before launching another - * thread to try again. - */ -#define MS_RETRY_DELAY_SECS 1 - #include "parse.h" -#include "serve.h" +#include "mirror.h" +#include "control.h" +#include "flexnbd.h" + +struct control { + struct flexnbd * flexnbd; + int control_fd; + const char * socket_name; + + pthread_t thread; + + struct self_pipe * close_signal; +}; + +struct control_client{ + int socket; + struct flexnbd * flexnbd; +}; + +struct control * control_create(struct flexnbd *, const char * control_socket_name); +void control_signal_close( struct control * ); +void control_destroy( struct control * ); + +void * control_runner( void * ); void accept_control_connection(struct server* params, int client_fd, union mysockaddr* client_address); void serve_open_control_socket(struct server* params); diff --git a/src/flexnbd.c b/src/flexnbd.c index aeff257..8165189 100644 --- a/src/flexnbd.c +++ b/src/flexnbd.c @@ -19,10 +19,12 @@ * elsewhere in the program. */ - +#include "flexnbd.h" #include "serve.h" #include "listen.h" #include "util.h" +#include "control.h" +#include "status.h" #include #include @@ -30,571 +32,275 @@ #include #include +#include #include #include #include #include -#include "options.h" #include "acl.h" -void exit_err( char *msg ) +int flexnbd_build_signal_fd(void) { - fprintf( stderr, msg ); - exit( 1 ); + sigset_t mask; + int sfd; + + sigemptyset( &mask ); + sigaddset( &mask, SIGTERM ); + sigaddset( &mask, SIGQUIT ); + sigaddset( &mask, SIGINT ); + + FATAL_UNLESS( 0 == pthread_sigmask( SIG_BLOCK, &mask, NULL ), + "Signal blocking failed" ); + + sfd = signalfd( -1, &mask, 0 ); + FATAL_IF( -1 == sfd, "Failed to get a signal fd" ); + + return sfd; } -/* TODO: Separate this function. - * It should be: - * params_read( struct mode_readwrite_params* out, - * char *s_ip_address, - * char *s_port, - * char *s_from, - * char *s_length ) - * params_write( struct mode_readwrite_params* out, - * char *s_ip_address, - * char *s_port, - * char *s_from, - * char *s_length, - * char *s_filename ) - */ -void params_readwrite( - int write_not_read, - struct mode_readwrite_params* out, +void flexnbd_create_shared( + struct flexnbd * flexnbd, + const char * s_ctrl_sock) +{ + NULLCHECK( flexnbd ); + if ( s_ctrl_sock ){ + flexnbd->control = + control_create( flexnbd, s_ctrl_sock ); + } + else { + flexnbd->control = NULL; + } + + flexnbd->signal_fd = flexnbd_build_signal_fd(); + + pthread_mutex_init( &flexnbd->switch_mutex, NULL ); +} + + +struct flexnbd * flexnbd_create_serving( char* s_ip_address, char* s_port, - char* s_bind_address, - char* s_from, - char* s_length_or_filename -) + char* s_file, + char *s_ctrl_sock, + int default_deny, + int acl_entries, + char** s_acl_entries, + int max_nbd_clients) { - FATAL_IF_NULL(s_ip_address, "No IP address supplied"); - FATAL_IF_NULL(s_port, "No port number supplied"); - FATAL_IF_NULL(s_from, "No from supplied"); - FATAL_IF_NULL(s_length_or_filename, "No length supplied"); - - FATAL_IF_ZERO( - parse_ip_to_sockaddr(&out->connect_to.generic, s_ip_address), - "Couldn't parse connection address '%s'", - s_ip_address - ); - - if (s_bind_address != NULL && - parse_ip_to_sockaddr(&out->connect_from.generic, s_bind_address) == 0) { - fatal("Couldn't parse bind address '%s'", s_bind_address); - } - - parse_port( s_port, &out->connect_to.v4 ); - - out->from = atol(s_from); - - if (write_not_read) { - if (s_length_or_filename[0]-48 < 10) { - out->len = atol(s_length_or_filename); - out->data_fd = 0; - } - else { - out->data_fd = open( - s_length_or_filename, O_RDONLY); - FATAL_IF_NEGATIVE(out->data_fd, - "Couldn't open %s", s_length_or_filename); - out->len = lseek64(out->data_fd, 0, SEEK_END); - FATAL_IF_NEGATIVE(out->len, - "Couldn't find length of %s", s_length_or_filename); - FATAL_IF_NEGATIVE( - lseek64(out->data_fd, 0, SEEK_SET), - "Couldn't rewind %s", s_length_or_filename - ); - } - } - else { - out->len = atol(s_length_or_filename); - out->data_fd = 1; - } -} - -int do_serve(struct server* params); -void do_read(struct mode_readwrite_params* params); -void do_write(struct mode_readwrite_params* params); -void do_remote_command(char* command, char* mode, int argc, char** argv); - -void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char **sock, int *default_deny ) - -{ - switch(c){ - case 'h': - fprintf(stdout, serve_help_text ); - exit( 0 ); - break; - case 'l': - *ip_addr = optarg; - break; - case 'p': - *ip_port = optarg; - break; - case 'f': - *file = optarg; - break; - case 's': - *sock = optarg; - break; - case 'd': - *default_deny = 1; - break; - case 'v': - log_level = 0; - break; - default: - exit_err( serve_help_text ); - break; - } + struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) ); + flexnbd->serve = server_create( + flexnbd, + s_ip_address, + s_port, + s_file, + default_deny, + acl_entries, + s_acl_entries, + max_nbd_clients, + 1); + flexnbd_create_shared( flexnbd, s_ctrl_sock ); + return flexnbd; } -void read_listen_param( int c, - char **ip_addr, - char **rebind_ip_addr, - char **ip_port, - char **rebind_ip_port, - char **file, - char **sock, - int *default_deny ) +struct flexnbd * flexnbd_create_listening( + char* s_ip_address, + char* s_rebind_ip_address, + char* s_port, + char* s_rebind_port, + char* s_file, + char *s_ctrl_sock, + int default_deny, + int acl_entries, + char** s_acl_entries, + int max_nbd_clients ) { - switch(c){ - case 'h': - fprintf(stdout, listen_help_text ); - exit(0); - break; - case 'l': - *ip_addr = optarg; - break; - case 'L': - *rebind_ip_addr = optarg; - break; - case 'p': - *ip_port = optarg; - break; - case 'P': - *rebind_ip_port = optarg; - break; - case 'f': - *file = optarg; - break; - case 's': - *sock = optarg; - break; - case 'd': - *default_deny = 1; - break; - case 'v': - log_level = 0; - break; - default: - exit_err( listen_help_text ); - break; - } -} - -void read_readwrite_param( int c, char **ip_addr, char **ip_port, char **bind_addr, char **from, char **size) -{ - switch(c){ - case 'h': - fprintf(stdout, read_help_text ); - exit( 0 ); - break; - case 'l': - *ip_addr = optarg; - break; - case 'p': - *ip_port = optarg; - break; - case 'F': - *from = optarg; - break; - case 'S': - *size = optarg; - break; - case 'b': - *bind_addr = optarg; - break; - case 'v': - log_level = 0; - break; - default: - exit_err( read_help_text ); - break; - } -} - -void read_sock_param( int c, char **sock, char *help_text ) -{ - switch(c){ - case 'h': - fprintf( stdout, help_text ); - exit( 0 ); - break; - case 's': - *sock = optarg; - break; - case 'v': - log_level = 0; - break; - default: - exit_err( help_text ); - break; - } -} - -void read_acl_param( int c, char **sock ) -{ - read_sock_param( c, sock, acl_help_text ); -} - -void read_mirror_param( int c, char **sock, char **ip_addr, char **ip_port, char **bind_addr ) -{ - switch( c ){ - case 'h': - fprintf( stdout, mirror_help_text ); - exit( 0 ); - break; - case 's': - *sock = optarg; - break; - case 'l': - *ip_addr = optarg; - break; - case 'p': - *ip_port = optarg; - break; - case 'b': - *bind_addr = optarg; - case 'v': - log_level = 0; - break; - default: - exit_err( mirror_help_text ); - break; - } -} - -void read_status_param( int c, char **sock ) -{ - read_sock_param( c, sock, status_help_text ); -} - -int mode_serve( int argc, char *argv[] ) -{ - int c; - char *ip_addr = NULL; - char *ip_port = NULL; - char *file = NULL; - char *sock = NULL; - int default_deny = 0; // not on by default - int err = 0; - - struct server * serve; - - while (1) { - c = getopt_long(argc, argv, serve_short_options, serve_options, NULL); - if ( c == -1 ) { break; } - - read_serve_param( c, &ip_addr, &ip_port, &file, &sock, &default_deny ); - } - - if ( NULL == ip_addr || NULL == ip_port ) { - err = 1; - fprintf( stderr, "both --addr and --port are required.\n" ); - } - if ( NULL == file ) { - err = 1; - fprintf( stderr, "--file is required\n" ); - } - if ( err ) { exit_err( serve_help_text ); } - - serve = server_create( ip_addr, ip_port, file, sock, default_deny, argc - optind, argv + optind, MAX_NBD_CLIENTS, 1 ); - if ( 0 == do_serve( serve ) ) { - info( "Control transfered."); - } - server_destroy( serve ); - - return 0; + struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) ); + flexnbd->listen = listen_create( + flexnbd, + s_ip_address, + s_rebind_ip_address, + s_port, + s_rebind_port, + s_file, + default_deny, + acl_entries, + s_acl_entries, + max_nbd_clients); + flexnbd->serve = flexnbd->listen->init_serve; + flexnbd_create_shared( flexnbd, s_ctrl_sock ); + return flexnbd; } -int mode_listen( int argc, char *argv[] ) +void flexnbd_spawn_control(struct flexnbd * flexnbd ) { - int c; - char *ip_addr = NULL; - char *rebind_ip_addr = NULL; - char *ip_port = NULL; - char *rebind_ip_port = NULL; - char *file = NULL; - char *sock = NULL; - int default_deny = 0; // not on by default - int err = 0; + NULLCHECK( flexnbd ); + NULLCHECK( flexnbd->control ); - struct listen * listen; + pthread_t * control_thread = &flexnbd->control->thread; - while (1) { - c = getopt_long(argc, argv, listen_short_options, listen_options, NULL); - if ( c == -1 ) { break; } + FATAL_UNLESS( 0 == pthread_create( + control_thread, + NULL, + control_runner, + flexnbd->control ), + "Couldn't create the control thread" ); +} - read_listen_param( c, &ip_addr, &rebind_ip_addr, &ip_port, &rebind_ip_port, - &file, &sock, &default_deny ); - } - - if ( NULL == ip_addr || NULL == ip_port ) { - err = 1; - fprintf( stderr, "both --addr and --port are required.\n" ); - } - if ( NULL == file ) { - err = 1; - fprintf( stderr, "--file is required\n" ); - } - if ( err ) { exit_err( listen_help_text ); } - - listen = listen_create( ip_addr, rebind_ip_addr, - ip_port, rebind_ip_port, - file, sock, default_deny, - argc - optind, argv + optind, MAX_NBD_CLIENTS ); - do_listen( listen ); - listen_destroy( listen ); - - return 0; +void flexnbd_stop_control( struct flexnbd * flexnbd ) +{ + NULLCHECK( flexnbd ); + NULLCHECK( flexnbd->control ); + control_signal_close( flexnbd->control ); + FATAL_UNLESS( 0 == pthread_join( flexnbd->control->thread, NULL ), + "Failed joining the control thread" ); } -int mode_read( int argc, char *argv[] ) +int flexnbd_signal_fd( struct flexnbd * flexnbd ) { - int c; - char *ip_addr = NULL; - char *ip_port = NULL; - char *bind_addr = NULL; - char *from = NULL; - char *size = NULL; - int err = 0; - - struct mode_readwrite_params readwrite; - - while (1){ - c = getopt_long(argc, argv, read_short_options, read_options, NULL); - - if ( c == -1 ) { break; } - - read_readwrite_param( c, &ip_addr, &ip_port, &bind_addr, &from, &size ); - } - - if ( NULL == ip_addr || NULL == ip_port ) { - err = 1; - fprintf( stderr, "both --addr and --port are required.\n" ); - } - if ( NULL == from || NULL == size ) { - err = 1; - fprintf( stderr, "both --from and --size are required.\n" ); - } - if ( err ) { exit_err( read_help_text ); } - - memset( &readwrite, 0, sizeof( readwrite ) ); - params_readwrite( 0, &readwrite, ip_addr, ip_port, bind_addr, from, size ); - do_read( &readwrite ); - return 0; + NULLCHECK( flexnbd ); + return flexnbd->signal_fd; } -int mode_write( int argc, char *argv[] ) +void flexnbd_destroy( struct flexnbd * flexnbd ) { - int c; - char *ip_addr = NULL; - char *ip_port = NULL; - char *bind_addr = NULL; - char *from = NULL; - char *size = NULL; - int err = 0; - - struct mode_readwrite_params readwrite; - - while (1){ - c = getopt_long(argc, argv, write_short_options, write_options, NULL); - if ( c == -1 ) { break; } - - read_readwrite_param( c, &ip_addr, &ip_port, &bind_addr, &from, &size ); + NULLCHECK( flexnbd ); + if ( flexnbd->control ) { + control_destroy( flexnbd->control ); + } + if ( flexnbd->listen ) { + listen_destroy( flexnbd->listen ); } - if ( NULL == ip_addr || NULL == ip_port ) { - err = 1; - fprintf( stderr, "both --addr and --port are required.\n" ); - } - if ( NULL == from || NULL == size ) { - err = 1; - fprintf( stderr, "both --from and --size are required.\n" ); - } - if ( err ) { exit_err( write_help_text ); } - - memset( &readwrite, 0, sizeof( readwrite ) ); - params_readwrite( 1, &readwrite, ip_addr, ip_port, bind_addr, from, size ); - do_write( &readwrite ); - return 0; -} - -int mode_acl( int argc, char *argv[] ) -{ - int c; - char *sock = NULL; - - while (1) { - c = getopt_long( argc, argv, acl_short_options, acl_options, NULL ); - if ( c == -1 ) { break; } - read_acl_param( c, &sock ); - } - - if ( NULL == sock ){ - fprintf( stderr, "--sock is required.\n" ); - exit_err( acl_help_text ); - } - - /* Don't use the CMD_ACL macro here, "acl" is the remote command - * name, not the cli option - */ - do_remote_command( "acl", sock, argc - optind, argv + optind ); - - return 0; + close( flexnbd->signal_fd ); + free( flexnbd ); } -int mode_mirror( int argc, char *argv[] ) +/* THOU SHALT NOT DEREFERENCE flexnbd->serve OUTSIDE A SWITCH LOCK + */ +void flexnbd_switch_lock( struct flexnbd * flexnbd ) { - int c; - char *sock = NULL; - char *remote_argv[4] = {0}; - int err = 0; + NULLCHECK( flexnbd ); + pthread_mutex_lock( &flexnbd->switch_mutex ); +} - while (1) { - c = getopt_long( argc, argv, mirror_short_options, mirror_options, NULL); - if ( -1 == c ) { break; } - read_mirror_param( c, &sock, &remote_argv[0], &remote_argv[1], &remote_argv[2] ); - } +void flexnbd_switch_unlock( struct flexnbd * flexnbd ) +{ + NULLCHECK( flexnbd ); + pthread_mutex_unlock( &flexnbd->switch_mutex ); +} - if ( NULL == sock ){ - fprintf( stderr, "--sock is required.\n" ); - err = 1; +struct server * flexnbd_server( struct flexnbd * flexnbd ) +{ + NULLCHECK( flexnbd ); + return flexnbd->serve; +} + + +void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl ) +{ + NULLCHECK( flexnbd ); + flexnbd_switch_lock( flexnbd ); + { + server_replace_acl( flexnbd_server(flexnbd), acl ); } - if ( NULL == remote_argv[0] || NULL == remote_argv[1] ) { - fprintf( stderr, "both --addr and --port are required.\n"); - err = 1; - } - if ( err ) { exit_err( mirror_help_text ); } + flexnbd_switch_unlock( flexnbd ); +} + + +struct status * flexnbd_status_create( struct flexnbd * flexnbd ) +{ + NULLCHECK( flexnbd ); + struct status * status; - if (remote_argv[2] == NULL) { - do_remote_command( "mirror", sock, 2, remote_argv ); + flexnbd_switch_lock( flexnbd ); + { + status = status_create( flexnbd_server( flexnbd ) ); + } + flexnbd_switch_unlock( flexnbd ); + return status; +} + +/** THOU SHALT *ONLY* CALL THIS FROM INSIDE A SWITCH LOCK + */ +void flexnbd_set_server( struct flexnbd * flexnbd, struct server * serve ) +{ + NULLCHECK( flexnbd ); + flexnbd->serve = serve; +} + + +/* Calls the given callback to exchange server objects, then sets + * flexnbd->server so everything else can see it. */ +void flexnbd_switch( struct flexnbd * flexnbd, struct server *(listen_cb)(struct listen *) ) +{ + NULLCHECK( flexnbd ); + NULLCHECK( flexnbd->listen ); + + flexnbd_switch_lock( flexnbd ); + { + struct server * new_server = listen_cb( flexnbd->listen ); + NULLCHECK( new_server ); + flexnbd_set_server( flexnbd, new_server ); + } + flexnbd_switch_unlock( flexnbd ); + +} + +/* Get the default_deny of the current server object. This takes the + * switch_lock to avoid nastiness if the server switches and gets freed + * in the dereference chain. + * This means that this function must not be called if the switch lock + * is already held. + */ +int flexnbd_default_deny( struct flexnbd * flexnbd ) +{ + int result; + + NULLCHECK( flexnbd ); + flexnbd_switch_lock( flexnbd ); + { + result = server_default_deny( flexnbd->serve ); + } + flexnbd_switch_unlock( flexnbd ); + return result; +} + + +int flexnbd_serve( struct flexnbd * flexnbd ) +{ + NULLCHECK( flexnbd ); + int success; + + if ( flexnbd->control ){ + debug( "Spawning control thread" ); + flexnbd_spawn_control( flexnbd ); + } + + if ( flexnbd->listen ){ + success = do_listen( flexnbd->listen ); } else { - do_remote_command( "mirror", sock, 3, remote_argv ); + do_serve( flexnbd->serve ); + /* We can't tell here what the intent was. We can + * legitimately exit either in control or not. + */ + success = 1; } - return 0; -} - - -int mode_status( int argc, char *argv[] ) -{ - int c; - char *sock = NULL; - - while (1) { - c = getopt_long( argc, argv, status_short_options, status_options, NULL ); - if ( -1 == c ) { break; } - read_status_param( c, &sock ); - } - - if ( NULL == sock ){ - fprintf( stderr, "--sock is required.\n" ); - exit_err( acl_help_text ); - } - - do_remote_command( "status", sock, argc - optind, argv + optind ); - - return 0; -} - - -int mode_help( int argc, char *argv[] ) -{ - char *cmd; - char *help_text; - - if ( argc < 1 ){ - help_text = help_help_text; - } else { - cmd = argv[0]; - if (IS_CMD( CMD_SERVE, cmd ) ) { - help_text = serve_help_text; - } else if ( IS_CMD( CMD_LISTEN, cmd ) ) { - help_text = listen_help_text; - } else if ( IS_CMD( CMD_READ, cmd ) ) { - help_text = read_help_text; - } else if ( IS_CMD( CMD_WRITE, cmd ) ) { - help_text = write_help_text; - } else if ( IS_CMD( CMD_ACL, cmd ) ) { - help_text = acl_help_text; - } else if ( IS_CMD( CMD_MIRROR, cmd ) ) { - help_text = mirror_help_text; - } else if ( IS_CMD( CMD_STATUS, cmd ) ) { - help_text = status_help_text; - } else { exit_err( help_help_text ); } - } - - fprintf( stdout, help_text ); - return 0; -} - - -void mode(char* mode, int argc, char **argv) -{ - if ( IS_CMD( CMD_SERVE, mode ) ) { - mode_serve( argc, argv ); - } - else if ( IS_CMD( CMD_LISTEN, mode ) ) { - mode_listen( argc, argv ); - } - else if ( IS_CMD( CMD_READ, mode ) ) { - mode_read( argc, argv ); - } - else if ( IS_CMD( CMD_WRITE, mode ) ) { - mode_write( argc, argv ); - } - else if ( IS_CMD( CMD_ACL, mode ) ) { - mode_acl( argc, argv ); - } - else if ( IS_CMD( CMD_MIRROR, mode ) ) { - mode_mirror( argc, argv ); - } - else if ( IS_CMD( CMD_STATUS, mode ) ) { - mode_status( argc, argv ); - } - else if ( IS_CMD( CMD_HELP, mode ) ) { - mode_help( argc-1, argv+1 ); - } - else { - mode_help( argc-1, argv+1 ); - exit( 1 ); - } - exit(0); -} - - -int main(int argc, char** argv) -{ - signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */ - error_init(); - - if (argc < 2) { - exit_err( help_help_text ); - } - mode(argv[1], argc-1, argv+1); /* never returns */ - - return 0; + if ( flexnbd->control ) { + debug( "Stopping control thread" ); + flexnbd_stop_control( flexnbd ); + } + + return success; } diff --git a/src/flexnbd.h b/src/flexnbd.h new file mode 100644 index 0000000..6878523 --- /dev/null +++ b/src/flexnbd.h @@ -0,0 +1,75 @@ +#ifndef FLEXNBD_H +#define FLEXNBD_H + +#include "acl.h" +#include "mirror.h" +#include "serve.h" +#include "listen.h" +#include "self_pipe.h" +#include "mbox.h" +#include "control.h" + +/* Carries the "globals". */ +struct flexnbd { + + /* We always have a serve pointer, but it should never be + * dereferenced outside a flexnbd_switch_lock/unlock pair. + */ + struct server * serve; + /* We only have a listen object if the process was started in + * listen mode. + */ + struct listen * listen; + /* We only have a control object if a control socket name was + * passed on the command line. + */ + struct control * control; + + /* switch_mutex is the lock around dereferencing the serve + * pointer. + */ + pthread_mutex_t switch_mutex; + + /* File descriptor for a signalfd(2) signal stream. */ + int signal_fd; +}; + +struct flexnbd * flexnbd_create(void); +struct flexnbd * flexnbd_create_serving( + char* s_ip_address, + char* s_port, + char* s_file, + char *s_ctrl_sock, + int default_deny, + int acl_entries, + char** s_acl_entries, + int max_nbd_clients); + +struct flexnbd * flexnbd_create_listening( + char* s_ip_address, + char* s_rebind_ip_address, + char* s_port, + char* s_rebind_port, + char* s_file, + char *s_ctrl_sock, + int default_deny, + int acl_entries, + char** s_acl_entries, + int max_nbd_clients ); + +void flexnbd_destroy( struct flexnbd * ); +enum mirror_state; +enum mirror_state flexnbd_get_mirror_state( struct flexnbd * ); +void flexnbd_switch_lock( struct flexnbd * ); +void flexnbd_switch_unlock( struct flexnbd * ); +int flexnbd_default_deny( struct flexnbd * ); +void flexnbd_set_server( struct flexnbd * flexnbd, struct server * serve ); +void flexnbd_switch( struct flexnbd * flexnbd, struct server *(listen_cb)(struct listen *) ); +int flexnbd_signal_fd( struct flexnbd * flexnbd ); + + +int flexnbd_serve( struct flexnbd * flexnbd ); +struct server * flexnbd_server( struct flexnbd * flexnbd ); +void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl ); +struct status * flexnbd_status_create( struct flexnbd * flexnbd ); +#endif diff --git a/src/ioutil.c b/src/ioutil.c index 72c516e..6aec0be 100644 --- a/src/ioutil.c +++ b/src/ioutil.c @@ -75,7 +75,7 @@ struct bitset_mapping* build_allocation_map(int fd, uint64_t size, int resolutio return allocation_map; } -int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map) +int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **out_map) { off64_t size; @@ -237,7 +237,6 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines) * -1 for a read error */ if (readden <= 1) { return lines_count; } - fprintf(stderr, "Mallocing for %s\n", line ); *lines = xrealloc(*lines, (lines_count+1) * sizeof(char*)); (*lines)[lines_count] = strdup(line); if ((*lines)[lines_count][0] == 0) { @@ -251,7 +250,7 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines) int fd_is_closed( int fd_in ) { int errno_old = errno; - int result = fcntl( fd_in, F_GETFD, 9 ) < 0; + int result = fcntl( fd_in, F_GETFL ) < 0; errno = errno_old; return result; } diff --git a/src/ioutil.h b/src/ioutil.h index 3f4b727..09552d9 100644 --- a/src/ioutil.h +++ b/src/ioutil.h @@ -52,7 +52,7 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines); * ''out_size'' and the address of the mmap in ''out_map''. If anything goes * wrong, returns -1 setting errno, otherwise 0. */ -int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map); +int open_and_mmap( const char* filename, int* out_fd, off64_t *out_size, void **out_map); /** Check to see whether the given file descriptor is closed. diff --git a/src/listen.c b/src/listen.c index cc96d82..001d34e 100644 --- a/src/listen.c +++ b/src/listen.c @@ -1,38 +1,41 @@ #include "listen.h" #include "serve.h" #include "util.h" +#include "flexnbd.h" #include struct listen * listen_create( + struct flexnbd * flexnbd, char* s_ip_address, char* s_rebind_ip_address, char* s_port, char* s_rebind_port, char* s_file, - char *s_ctrl_sock, int default_deny, int acl_entries, char** s_acl_entries, int max_nbd_clients ) { + NULLCHECK( flexnbd ); struct listen * listen; listen = (struct listen *)xmalloc( sizeof( struct listen ) ); + listen->flexnbd = flexnbd; listen->init_serve = server_create( + flexnbd, s_ip_address, s_port, s_file, - s_ctrl_sock, default_deny, acl_entries, s_acl_entries, 1, 0); listen->main_serve = server_create( + flexnbd, s_rebind_ip_address ? s_rebind_ip_address : s_ip_address, s_rebind_port ? s_rebind_port : s_port, s_file, - s_ctrl_sock, default_deny, acl_entries, s_acl_entries, @@ -48,12 +51,17 @@ void listen_destroy( struct listen * listen ) } -void listen_switch( struct listen * listen ) +struct server *listen_switch( struct listen * listen ) { NULLCHECK( listen ); /* TODO: Copy acl from init_serve to main_serve */ /* TODO: rename underlying file from foo.INCOMPLETE to foo */ + + server_destroy( listen->init_serve ); + listen->init_serve = NULL; + info( "Switched to the main server, serving." ); + return listen->main_serve; } @@ -61,28 +69,47 @@ void listen_cleanup( void * unused __attribute__((unused)) ) { } -void do_listen( struct listen * listen ) +int do_listen( struct listen * listen ) { NULLCHECK( listen ); int have_control = 0; + flexnbd_switch_lock( listen->flexnbd ); + { + flexnbd_set_server( listen->flexnbd, listen->init_serve ); + } + flexnbd_switch_unlock( listen->flexnbd ); + + /* WATCH FOR RACES HERE: flexnbd->serve is set, but the server + * isn't running yet and the switch lock is released. + */ have_control = do_serve( listen->init_serve ); + if( have_control ) { info( "Taking control."); - listen_switch( listen ); - server_destroy( listen->init_serve ); - info( "Switched to the main server, serving." ); + flexnbd_switch( listen->flexnbd, listen_switch ); + /* WATCH FOR RACES HERE: the server hasn't been + * restarted before we release the flexnbd switch lock. + * do_serve doesn't return, so there's not a lot of + * choice about that. + */ do_serve( listen->main_serve ); } else { warn("Failed to take control, giving up."); server_destroy( listen->init_serve ); + listen->init_serve = NULL; } + /* TODO: here we must signal the control thread to stop before + * it tries to */ server_destroy( listen->main_serve ); + listen->main_serve = NULL; - info("Listen done."); + debug("Listen done, cleaning up"); listen_cleanup( listen ); + + return have_control; } diff --git a/src/listen.h b/src/listen.h index b340e61..55a2bd4 100644 --- a/src/listen.h +++ b/src/listen.h @@ -1,26 +1,28 @@ #ifndef LISTEN_H #define LISTEN_H +#include "flexnbd.h" #include "serve.h" struct listen { + struct flexnbd * flexnbd; struct server * init_serve; struct server * main_serve; }; struct listen * listen_create( + struct flexnbd * flexnbd, char* s_ip_address, char* s_rebind_ip_address, char* s_port, char* s_rebind_port, char* s_file, - char *s_ctrl_sock, int default_deny, int acl_entries, char** s_acl_entries, int max_nbd_clients ); void listen_destroy( struct listen* ); -void do_listen( struct listen * ); +int do_listen( struct listen * ); #endif diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..b8f52ba --- /dev/null +++ b/src/main.c @@ -0,0 +1,19 @@ +#include "util.h" +#include "mode.h" + +#include + + +int main(int argc, char** argv) +{ + signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */ + error_init(); + + if (argc < 2) { + exit_err( help_help_text ); + } + mode(argv[1], argc-1, argv+1); /* never returns */ + + return 0; +} + diff --git a/src/mbox.c b/src/mbox.c new file mode 100644 index 0000000..a175e41 --- /dev/null +++ b/src/mbox.c @@ -0,0 +1,77 @@ +#include "mbox.h" +#include "util.h" + +#include + +struct mbox * mbox_create( void ) +{ + struct mbox * mbox = xmalloc( sizeof( struct mbox ) ); + FATAL_UNLESS( 0 == pthread_cond_init( &mbox->filled_cond, NULL ), + "Failed to initialise a condition variable" ); + FATAL_UNLESS( 0 == pthread_cond_init( &mbox->emptied_cond, NULL ), + "Failed to initialise a condition variable" ); + FATAL_UNLESS( 0 == pthread_mutex_init( &mbox->mutex, NULL ), + "Failed to initialise a mutex" ); + return mbox; +} + +void mbox_post( struct mbox * mbox, void * contents ) +{ + pthread_mutex_lock( &mbox->mutex ); + { + if (mbox->full){ + pthread_cond_wait( &mbox->emptied_cond, &mbox->mutex ); + } + mbox->contents = contents; + mbox->full = 1; + while( 0 != pthread_cond_signal( &mbox->filled_cond ) ); + } + pthread_mutex_unlock( &mbox->mutex ); +} + + +void * mbox_contents( struct mbox * mbox ) +{ + return mbox->contents; +} + + +int mbox_is_full( struct mbox * mbox ) +{ + return mbox->full; +} + + +void * mbox_receive( struct mbox * mbox ) +{ + NULLCHECK( mbox ); + void * result; + + pthread_mutex_lock( &mbox->mutex ); + { + if ( !mbox->full ) { + pthread_cond_wait( &mbox->filled_cond, &mbox->mutex ); + } + mbox->full = 0; + result = mbox->contents; + mbox->contents = NULL; + while( 0 != pthread_cond_signal( &mbox->emptied_cond)); + } + pthread_mutex_unlock( &mbox->mutex ); + + + return result; +} + + +void mbox_destroy( struct mbox * mbox ) +{ + NULLCHECK( mbox ); + + while( 0 != pthread_cond_destroy( &mbox->emptied_cond ) ); + while( 0 != pthread_cond_destroy( &mbox->filled_cond ) ); + + while( 0 != pthread_mutex_destroy( &mbox->mutex ) ); + + free( mbox ); +} diff --git a/src/mbox.h b/src/mbox.h new file mode 100644 index 0000000..3af54d8 --- /dev/null +++ b/src/mbox.h @@ -0,0 +1,55 @@ +#ifndef MBOX_H +#define MBOX_H + +/** mbox + * A thread sync object. Put a void * into the mbox in one thread, and + * get it out in another. The receiving thread will block if there's + * nothing in the mbox, and the sending thread will block if there is. + * The mbox doesn't assume any responsibility for the pointer it's + * passed - you must free it yourself if it's malloced. + */ + + +#include + + +struct mbox { + void * contents; + + /** Marker to tell us if there's content in the box. + * Keeping this separate allows us to use NULL for the contents. + */ + int full; + + /** This gets signaled by mbox_post, and waited on by + * mbox_receive */ + pthread_cond_t filled_cond; + /** This is signaled by mbox_receive, and waited on by mbox_post */ + pthread_cond_t emptied_cond; + pthread_mutex_t mutex; +}; + + +/* Create an mbox. */ +struct mbox * mbox_create(void); + +/* Put something in the mbox, blocking if it's already full. + * That something can be NULL if you want. + */ +void mbox_post( struct mbox *, void *); + +/* See what's in the mbox. This isn't thread-safe. */ +void * mbox_contents( struct mbox *); + +/* See if anything has been put into the mbox. This isn't thread-safe. + * */ +int mbox_is_full( struct mbox *); + +/* Get the contents from the mbox, blocking if there's nothing there. */ +void * mbox_receive( struct mbox *); + +/* Free the mbox and destroy the associated pthread bits. */ +void mbox_destroy( struct mbox *); + + +#endif diff --git a/src/mirror.c b/src/mirror.c new file mode 100644 index 0000000..71faf6f --- /dev/null +++ b/src/mirror.c @@ -0,0 +1,598 @@ +/* FlexNBD server (C) Bytemark Hosting 2012 + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + + +#include "mirror.h" +#include "serve.h" +#include "util.h" +#include "ioutil.h" +#include "parse.h" +#include "readwrite.h" +#include "bitset.h" +#include "self_pipe.h" +#include "acl.h" +#include "status.h" + + +#include +#include +#include +#include + +struct mirror_status * mirror_status_alloc( + union mysockaddr * connect_to, + union mysockaddr * connect_from, + int max_Bps, + int action_at_finish, + struct self_pipe * commit_signal) +{ + struct mirror_status * mirror; + + mirror = xmalloc(sizeof(struct mirror_status)); + mirror->connect_to = connect_to; + mirror->connect_from = connect_from; + mirror->max_bytes_per_second = max_Bps; + mirror->action_at_finish = action_at_finish; + mirror->commit_signal = commit_signal; + mirror->commit_state = MS_UNKNOWN; + + return mirror; +} + +void mirror_set_state_f( struct mirror_status * mirror, enum mirror_state state ) +{ + NULLCHECK( mirror ); + mirror->commit_state = state; +} + +#define mirror_set_state( mirror, state ) do{\ + debug( "Mirror state => " #state );\ + mirror_set_state_f( mirror, state );\ +} while(0) + +enum mirror_state mirror_get_state( struct mirror_status * mirror ) +{ + NULLCHECK( mirror ); + return mirror->commit_state; +} + + +void mirror_status_init( struct mirror_status * mirror, const char * filename ) +{ + int map_fd; + off64_t size; + + NULLCHECK( mirror ); + NULLCHECK( filename ); + + FATAL_IF_NEGATIVE( + open_and_mmap( + filename, + &map_fd, + &size, + (void**) &mirror->mapped + ), + "Failed to open and mmap %s", + filename + ); + + mirror->dirty_map = bitset_alloc(size, 4096); + +} + + +/* Call this before a mirror attempt. */ +void mirror_status_reset( struct mirror_status * mirror ) +{ + NULLCHECK( mirror ); + NULLCHECK( mirror->dirty_map ); + mirror_set_state( mirror, MS_INIT ); + bitset_set(mirror->dirty_map); +} + + +struct mirror_status * mirror_status_create( + const char * filename, + union mysockaddr * connect_to, + union mysockaddr * connect_from, + int max_Bps, + int action_at_finish, + struct self_pipe * commit_signal) +{ + /* FIXME: shouldn't map_fd get closed? */ + struct mirror_status * mirror; + + mirror = mirror_status_alloc( connect_to, + connect_from, + max_Bps, + action_at_finish, + commit_signal); + + mirror_status_init( mirror, filename ); + mirror_status_reset( mirror ); + + + return mirror; +} + + +void mirror_status_destroy( struct mirror_status *mirror ) +{ + NULLCHECK( mirror ); + free(mirror->connect_to); + free(mirror->connect_from); + free(mirror->dirty_map); + free(mirror); +} + + +/** The mirror code will split NBD writes, making them this long as a maximum */ +static const int mirror_longest_write = 8<<20; + +/** If, during a mirror pass, we have sent this number of bytes or fewer, we + * go to freeze the I/O and finish it off. This is just a guess. + */ +static const unsigned int mirror_last_pass_after_bytes_written = 100<<20; + +/** The largest number of full passes we'll do - the last one will always + * cause the I/O to freeze, however many bytes are left to copy. + */ +static const int mirror_maximum_passes = 7; + +/* A single mirror pass over the disc, optionally locking IO around the + * transfer. + */ +int mirror_pass(struct server * serve, int should_lock, uint64_t *written) +{ + uint64_t current = 0; + int success = 1; + struct bitset_mapping *map = serve->mirror->dirty_map; + *written = 0; + + while (current < serve->size) { + int run = bitset_run_count(map, current, mirror_longest_write); + + debug("mirror current=%ld, run=%d", current, run); + + /* FIXME: we could avoid sending sparse areas of the + * disc here, and probably save a lot of bandwidth and + * time (if we know the destination starts off zeroed). + */ + if (bitset_is_set_at(map, current)) { + /* We've found a dirty area, send it */ + debug("^^^ writing"); + + /* We need to stop the main thread from working + * because it might corrupt the dirty map. This + * is likely to slow things down but will be + * safe. + */ + if (should_lock) { server_lock_io( serve ); } + { + /** FIXME: do something useful with bytes/second */ + + /** FIXME: error handling code here won't unlock */ + socket_nbd_write( serve->mirror->client, + current, + run, + 0, + serve->mirror->mapped + current); + + /* now mark it clean */ + bitset_clear_range(map, current, run); + } + if (should_lock) { server_unlock_io( serve ); } + + *written += run; + } + current += run; + + if (serve->mirror->signal_abandon) { + debug("Abandon message received" ); + success = 0; + break; + } + } + + return success; +} + + +void mirror_give_control( struct mirror_status * mirror ) +{ + debug( "mirror: entrusting and disconnecting" ); + /* TODO: set up an error handler to clean up properly on ERROR. + */ + + /* A transfer of control is expressed as a 3-way handshake. + * First, We send a REQUEST_ENTRUST. If this fails to be + * received, this thread will simply block until the server is + * restarted. If the remote end doesn't understand it, it'll + * disconnect us, and an ERROR *should* bomb this thread. + * FIXME: make the ERROR work. + * If we get an explicit error back from the remote end, then + * again, this thread will bomb out. + * On receiving a valid response, we send a REQUEST_DISCONNECT, + * and we quit without checking for a response. This is the + * remote server's signal to assume control of the file. The + * reason we don't check for a response is the state we end up + * in if the final message goes astray: if we lose the + * REQUEST_DISCONNECT, the sender has quit and the receiver + * hasn't had a signal to take over yet, so the data is safe. + * If we were to wait for a response to the REQUEST_DISCONNECT, + * the sender and receiver would *both* be servicing write + * requests while the response was in flight, and if the + * response went astray we'd have two servers claiming + * responsibility for the same data. + */ + socket_nbd_entrust( mirror->client ); + socket_nbd_disconnect( mirror->client ); +} + + +/* THIS FUNCTION MUST ONLY BE CALLED WITH THE SERVER'S IO LOCKED. */ +void mirror_on_exit( struct server * serve ) +{ + /* Send an explicit entrust and disconnect. After this + * point we cannot allow any reads or writes to the local file. + * We do this *before* trying to shut down the server so that if + * the transfer of control fails, we haven't stopped the server + * and already-connected clients don't get needlessly + * disconnected. + */ + debug( "mirror_give_control"); + mirror_give_control( serve->mirror ); + + /* If we're still here, the transfer of control went ok, and the + * remote is listening (or will be shortly). We can shut the + * server down. + * + * It doesn't matter if we get new client connections before + * now, the IO lock will stop them from doing anything. + */ + debug("serve_signal_close"); + serve_signal_close( serve ); + + /* We have to wait until the server is closed before unlocking + * IO. This is because the client threads check to see if the + * server is still open before reading or writing inside their + * own locks. If we don't wait for the close, there's no way to + * guarantee the server thread will win the race and we risk the + * clients seeing a "successful" write to a dead disc image. + */ + debug("serve_wait_for_close"); + serve_wait_for_close( serve ); + info("Mirror sent."); +} + + +void mirror_cleanup( struct mirror_status * mirror, + int fatal __attribute__((unused))) +{ + NULLCHECK( mirror ); + info( "Cleaning up mirror thread"); + + if( mirror->client && mirror->client > 0 ){ + close( mirror->client ); + } + mirror->client = -1; +} + + + +int mirror_status_connect( struct mirror_status * mirror, off64_t local_size ) +{ + struct sockaddr * connect_from = NULL; + if ( mirror->connect_from ) { + connect_from = &mirror->connect_from->generic; + } + + NULLCHECK( mirror->connect_to ); + + mirror->client = socket_connect(&mirror->connect_to->generic, connect_from); + if ( 0 < mirror->client ) { + fd_set fds; + struct timeval tv = { MS_HELLO_TIME_SECS, 0}; + FD_ZERO( &fds ); + FD_SET( mirror->client, &fds ); + + FATAL_UNLESS( 0 <= select( FD_SETSIZE, &fds, NULL, NULL, &tv ), + "Select failed." ); + + if( FD_ISSET( mirror->client, &fds ) ){ + off64_t remote_size; + if ( socket_nbd_read_hello( mirror->client, &remote_size ) ) { + if( remote_size == local_size ){ + mirror_set_state( mirror, MS_GO ); + } + else { + warn("Remote size (%d) doesn't match local (%d)", + remote_size, local_size ); + mirror_set_state( mirror, MS_FAIL_SIZE_MISMATCH ); + } + } + else { + warn( "Mirror attempt rejected." ); + mirror_set_state( mirror, MS_FAIL_REJECTED ); + } + } + else { + warn( "No NBD Hello received." ); + mirror_set_state( mirror, MS_FAIL_NO_HELLO ); + } + } + else { + warn( "Mirror failed to connect."); + mirror_set_state( mirror, MS_FAIL_CONNECT ); + } + + return MS_GO == mirror_get_state(mirror); +} + + + +void mirror_run( struct server *serve ) +{ + NULLCHECK( serve ); + NULLCHECK( serve->mirror ); + + int pass; + uint64_t written; + + info("Starting mirror" ); + for (pass=0; pass < mirror_maximum_passes-1; pass++) { + + debug("mirror start pass=%d", pass); + if ( !mirror_pass( serve, 1, &written ) ){ + debug("Failed mirror pass state is %d", mirror_get_state( serve->mirror ) ); + debug("pass failed, giving up"); + return; } + + /* if we've not written anything */ + if (written < mirror_last_pass_after_bytes_written) { break; } + } + + mirror_set_state( serve->mirror, MS_FINALISE ); + server_lock_io( serve ); + { + if ( mirror_pass( serve, 0, &written ) && + ACTION_EXIT == serve->mirror->action_at_finish) { + debug("exit!"); + mirror_on_exit( serve ); + info("Server closed, quitting " + "after successful migration"); + } + } + server_unlock_io( serve ); +} + + +void mirror_signal_commit( struct mirror_status * mirror ) +{ + NULLCHECK( mirror ); + + self_pipe_signal( mirror->commit_signal ); +} + +/** Thread launched to drive mirror process */ +void* mirror_runner(void* serve_params_uncast) +{ + /* The supervisor thread relies on there not being any ERROR + * calls until after the mirror_signal_commit() call in this + * function. + * However, *after* that, we should call ERROR_* instead of + * FATAL_* wherever possible. + */ + struct server *serve = (struct server*) serve_params_uncast; + + NULLCHECK( serve ); + NULLCHECK( serve->mirror ); + struct mirror_status * mirror = serve->mirror; + NULLCHECK( mirror->dirty_map ); + + error_set_handler( (cleanup_handler *) mirror_cleanup, mirror ); + + info( "Connecting to mirror" ); + + time_t start_time = time(NULL); + int connected = mirror_status_connect( mirror, serve->size ); + mirror_signal_commit( mirror ); + if ( !connected ) { goto abandon_mirror; } + + /* After this point, if we see a failure we need to disconnect + * and retry everything from mirror_set_state(_, MS_INIT), but + * *without* signaling the commit or abandoning the mirror. + * */ + + if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){ + /* If we get here, then we managed to connect but the + * control thread feeding status back to the user will + * have gone away, leaving the user without meaningful + * feedback. In this instance, they have to assume a + * failure, so we can't afford to let the mirror happen. + * We have to set the state to avoid a race. + */ + mirror_set_state( mirror, MS_FAIL_CONNECT ); + warn( "Mirror connected, but too slowly" ); + goto abandon_mirror; + } + + mirror_run( serve ); + + mirror_set_state( mirror, MS_DONE ); +abandon_mirror: + return NULL; +} + + +struct mirror_super * mirror_super_create( + const char * filename, + union mysockaddr * connect_to, + union mysockaddr * connect_from, + int max_Bps, + int action_at_finish) +{ + struct mirror_super * super = xmalloc( sizeof( struct mirror_super) ); + super->mirror = mirror_status_create( + filename, + connect_to, + connect_from, + max_Bps, + action_at_finish, + self_pipe_create()) ; + super->state_mbox = mbox_create(); + return super; +} + + +/* Post the current state of the mirror into super->state_mbox */ +void mirror_super_signal_committed( struct mirror_super * super ) +{ + NULLCHECK( super ); + NULLCHECK( super->state_mbox ); + enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) ); + + *contents = mirror_get_state( super->mirror ); + + mbox_post( super->state_mbox, contents ); +} + + +void mirror_super_destroy( struct mirror_super * super ) +{ + NULLCHECK( super ); + + mbox_destroy( super->state_mbox ); + self_pipe_destroy( super->mirror->commit_signal ); + mirror_status_destroy( super->mirror ); + free( super ); +} + + +/* The mirror supervisor thread. Responsible for kicking off retries if + * the mirror thread fails. + * The mirror_status and mirror_super objects are never freed, and the + * mirror_super_runner thread is never joined. + */ +void * mirror_super_runner( void * serve_uncast ) +{ + struct server * serve = (struct server *) serve_uncast; + NULLCHECK( serve ); + NULLCHECK( serve->mirror ); + NULLCHECK( serve->mirror_super ); + + int should_retry = 0; + int success = 0; + fd_set fds; + int fd_count; + + struct mirror_status * mirror = serve->mirror; + struct mirror_super * super = serve->mirror_super; + + do { + if ( should_retry ) { + /* We don't want to hammer the destination too + * hard, so if this is a retry, insert a delay. */ + sleep( MS_RETRY_DELAY_SECS ); + + /* We also have to reset the bitmap to be sure + * we transfer everything */ + mirror_status_reset( mirror ); + } + + FATAL_IF( 0 != pthread_create( + &mirror->thread, + NULL, + mirror_runner, + serve), + "Failed to create mirror thread"); + + + debug("Supervisor waiting for commit signal"); + FD_ZERO( &fds ); + self_pipe_fd_set( mirror->commit_signal, &fds ); + /* There's no timeout on this select. This means that + * the mirror thread *must* signal then abort itself if + * it passes the timeout, and it *must* always signal, + * no matter what. + */ + fd_count = select( FD_SETSIZE, &fds, NULL, NULL, NULL ); + if ( 1 == fd_count ) { + debug( "Supervisor got commit signal" ); + if ( 0 == should_retry ) { + should_retry = 1; + /* Only send this signal the first time */ + mirror_super_signal_committed(super); + debug("Mirror supervisor committed"); + } + } + else { fatal( "Select failed." ); } + + + debug("Supervisor waiting for mirror thread" ); + pthread_join( mirror->thread, NULL ); + debug( "Clearing the commit signal. If this blocks," + " it's fatal but we can't check in advance." ); + self_pipe_signal_clear( mirror->commit_signal ); + debug( "Commit signal cleared." ); + + success = MS_DONE == mirror_get_state( mirror ); + + + if( success ){ info( "Mirror supervisor success, exiting" ); } + else if ( mirror->signal_abandon ) { + info( "Mirror abandoned" ); + should_retry = 0; + } + else if (should_retry){ + warn( "Mirror failed, retrying" ); + } + else { warn( "Mirror failed before commit, giving up" ); } + } + while ( should_retry && !success ); + + serve->mirror = NULL; + serve->mirror_super = NULL; + + mirror_super_destroy( super ); + debug( "Mirror supervisor done." ); + + return NULL; +} + + +#define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1) + +/* Call this in the thread where you want to receive the mirror state */ +enum mirror_state mirror_super_wait( struct mirror_super * super ) +{ + NULLCHECK( super ); + NULLCHECK( super->state_mbox ); + + struct mbox * mbox = super->state_mbox; + enum mirror_state mirror_state; + enum mirror_state * contents; + + contents = (enum mirror_state*)mbox_receive( mbox ); + NULLCHECK( contents ); + + mirror_state = *contents; + + free(contents); + + return mirror_state; +} + diff --git a/src/mode.c b/src/mode.c new file mode 100644 index 0000000..1d538d8 --- /dev/null +++ b/src/mode.c @@ -0,0 +1,711 @@ +#include "mode.h" +#include "flexnbd.h" + +#include +#include +#include +#include + + +static struct option serve_options[] = { + GETOPT_HELP, + GETOPT_ADDR, + GETOPT_PORT, + GETOPT_FILE, + GETOPT_SOCK, + GETOPT_DENY, + GETOPT_VERBOSE, + {0} +}; +static char serve_short_options[] = "hl:p:f:s:d" SOPT_VERBOSE; +static char serve_help_text[] = + "Usage: flexnbd " CMD_SERVE " [*]\n\n" + "Serve FILE from ADDR:PORT, with an optional control socket at SOCK.\n\n" + HELP_LINE + "\t--" OPT_ADDR ",-l \tThe address to serve on.\n" + "\t--" OPT_PORT ",-p \tThe port to serve on.\n" + "\t--" OPT_FILE ",-f \tThe file to serve.\n" + "\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n" + SOCK_LINE + VERBOSE_LINE; + + +static struct option listen_options[] = { + GETOPT_HELP, + GETOPT_ADDR, + GETOPT_REBIND_ADDR, + GETOPT_PORT, + GETOPT_REBIND_ADDR, + GETOPT_FILE, + GETOPT_SOCK, + GETOPT_DENY, + GETOPT_VERBOSE, + {0} +}; +static char listen_short_options[] = "hl:L:p:P:f:s:d" SOPT_VERBOSE; +static char listen_help_text[] = + "Usage: flexnbd " CMD_LISTEN " [*]\n\n" + "Listen for an incoming migration on ADDR:PORT, " + "then switch to REBIND_ADDR:REBIND_PORT on completion " + "to serve FILE.\n\n" + HELP_LINE + "\t--" OPT_ADDR ",-l \tThe address to listen on.\n" + "\t--" OPT_REBIND_ADDR ",-L \tThe address to switch to, if given.\n" + "\t--" OPT_PORT ",-p \tThe port to listen on.\n" + "\t--" OPT_REBIND_PORT ",-P \tThe port to switch to, if given..\n" + "\t--" OPT_FILE ",-f \tThe file to serve.\n" + "\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n" + SOCK_LINE + VERBOSE_LINE; + + +static struct option read_options[] = { + GETOPT_HELP, + GETOPT_ADDR, + GETOPT_PORT, + GETOPT_FROM, + GETOPT_SIZE, + GETOPT_BIND, + GETOPT_VERBOSE, + {0} +}; +static char read_short_options[] = "hl:p:F:S:b:" SOPT_VERBOSE; +static char read_help_text[] = + "Usage: flexnbd " CMD_READ " \n\n" + "Read SIZE bytes from a server at ADDR:PORT to stdout, starting at OFFSET.\n\n" + HELP_LINE + "\t--" OPT_ADDR ",-l \tThe address to read from.\n" + "\t--" OPT_PORT ",-p \tThe port to read from.\n" + "\t--" OPT_FROM ",-F \tByte offset to read from.\n" + "\t--" OPT_SIZE ",-S \tBytes to read.\n" + BIND_LINE + VERBOSE_LINE; + + +static struct option *write_options = read_options; +static char *write_short_options = read_short_options; +static char write_help_text[] = + "Usage: flexnbd " CMD_WRITE" \n\n" + "Write SIZE bytes from stdin to a server at ADDR:PORT, starting at OFFSET.\n\n" + HELP_LINE + "\t--" OPT_ADDR ",-l \tThe address to write to.\n" + "\t--" OPT_PORT ",-p \tThe port to write to.\n" + "\t--" OPT_FROM ",-F \tByte offset to write from.\n" + "\t--" OPT_SIZE ",-S \tBytes to write.\n" + BIND_LINE + VERBOSE_LINE; + +static struct option acl_options[] = { + GETOPT_HELP, + GETOPT_SOCK, + GETOPT_VERBOSE, + {0} +}; +static char acl_short_options[] = "hs:" SOPT_VERBOSE; +static char acl_help_text[] = + "Usage: flexnbd " CMD_ACL " [+]\n\n" + "Set the access control list for a server with control socket SOCK.\n\n" + HELP_LINE + SOCK_LINE + VERBOSE_LINE; + +static struct option mirror_options[] = { + GETOPT_HELP, + GETOPT_SOCK, + GETOPT_ADDR, + GETOPT_PORT, + GETOPT_BIND, + GETOPT_VERBOSE, + {0} +}; +static char mirror_short_options[] = "hs:l:p:b:" SOPT_VERBOSE; +static char mirror_help_text[] = + "Usage: flexnbd " CMD_MIRROR " \n\n" + "Start mirroring from the server with control socket SOCK to one at ADDR:PORT.\n\n" + HELP_LINE + "\t--" OPT_ADDR ",-l \tThe address to mirror to.\n" + "\t--" OPT_PORT ",-p \tThe port to mirror to.\n" + SOCK_LINE + BIND_LINE + VERBOSE_LINE; + + +static struct option status_options[] = { + GETOPT_HELP, + GETOPT_SOCK, + GETOPT_VERBOSE, + {0} +}; +static char status_short_options[] = "hs:" SOPT_VERBOSE; +static char status_help_text[] = + "Usage: flexnbd " CMD_STATUS " \n\n" + "Get the status for a server with control socket SOCK.\n\n" + HELP_LINE + SOCK_LINE + VERBOSE_LINE; + +char help_help_text_arr[] = + "Usage: flexnbd [cmd options]\n\n" + "Commands:\n" + "\tflexnbd serve\n" + "\tflexnbd read\n" + "\tflexnbd write\n" + "\tflexnbd acl\n" + "\tflexnbd mirror\n" + "\tflexnbd status\n" + "\tflexnbd help\n\n" + "See flexnbd help for further info\n"; +/* Slightly odd array/pointer pair to stop the compiler from complaining + * about symbol sizes + */ +char * help_help_text = help_help_text_arr; + + + +int do_serve(struct server* params); +void do_read(struct mode_readwrite_params* params); +void do_write(struct mode_readwrite_params* params); +void do_remote_command(char* command, char* mode, int argc, char** argv); + + +void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char **sock, int *default_deny ) +{ + switch(c){ + case 'h': + fprintf(stdout, "%s\n", serve_help_text ); + exit( 0 ); + break; + case 'l': + *ip_addr = optarg; + break; + case 'p': + *ip_port = optarg; + break; + case 'f': + *file = optarg; + break; + case 's': + *sock = optarg; + break; + case 'd': + *default_deny = 1; + break; + case 'v': + log_level = 0; + break; + default: + exit_err( serve_help_text ); + break; + } +} + + +void read_listen_param( int c, + char **ip_addr, + char **rebind_ip_addr, + char **ip_port, + char **rebind_ip_port, + char **file, + char **sock, + int *default_deny ) +{ + switch(c){ + case 'h': + fprintf(stdout, "%s\n", listen_help_text ); + exit(0); + break; + case 'l': + *ip_addr = optarg; + break; + case 'L': + *rebind_ip_addr = optarg; + break; + case 'p': + *ip_port = optarg; + break; + case 'P': + *rebind_ip_port = optarg; + break; + case 'f': + *file = optarg; + break; + case 's': + *sock = optarg; + break; + case 'd': + *default_deny = 1; + break; + case 'v': + log_level = 0; + break; + default: + exit_err( listen_help_text ); + break; + } +} + +void read_readwrite_param( int c, char **ip_addr, char **ip_port, char **bind_addr, char **from, char **size) +{ + switch(c){ + case 'h': + fprintf(stdout, "%s\n", read_help_text ); + exit( 0 ); + break; + case 'l': + *ip_addr = optarg; + break; + case 'p': + *ip_port = optarg; + break; + case 'F': + *from = optarg; + break; + case 'S': + *size = optarg; + break; + case 'b': + *bind_addr = optarg; + break; + case 'v': + log_level = 0; + break; + default: + exit_err( read_help_text ); + break; + } +} + +void read_sock_param( int c, char **sock, char *help_text ) +{ + switch(c){ + case 'h': + fprintf( stdout, "%s\n", help_text ); + exit( 0 ); + break; + case 's': + *sock = optarg; + break; + case 'v': + log_level = 0; + break; + default: + exit_err( help_text ); + break; + } +} + +void read_acl_param( int c, char **sock ) +{ + read_sock_param( c, sock, acl_help_text ); +} + +void read_mirror_param( int c, char **sock, char **ip_addr, char **ip_port, char **bind_addr ) +{ + switch( c ){ + case 'h': + fprintf( stdout, "%s\n", mirror_help_text ); + exit( 0 ); + break; + case 's': + *sock = optarg; + break; + case 'l': + *ip_addr = optarg; + break; + case 'p': + *ip_port = optarg; + break; + case 'b': + *bind_addr = optarg; + case 'v': + log_level = 0; + break; + default: + exit_err( mirror_help_text ); + break; + } +} + +void read_status_param( int c, char **sock ) +{ + read_sock_param( c, sock, status_help_text ); +} + +int mode_serve( int argc, char *argv[] ) +{ + int c; + char *ip_addr = NULL; + char *ip_port = NULL; + char *file = NULL; + char *sock = NULL; + int default_deny = 0; // not on by default + int err = 0; + + int success; + + struct flexnbd * flexnbd; + + while (1) { + c = getopt_long(argc, argv, serve_short_options, serve_options, NULL); + if ( c == -1 ) { break; } + + read_serve_param( c, &ip_addr, &ip_port, &file, &sock, &default_deny ); + } + + if ( NULL == ip_addr || NULL == ip_port ) { + err = 1; + fprintf( stderr, "both --addr and --port are required.\n" ); + } + if ( NULL == file ) { + err = 1; + fprintf( stderr, "--file is required\n" ); + } + if ( err ) { exit_err( serve_help_text ); } + + flexnbd = flexnbd_create_serving( ip_addr, ip_port, file, sock, default_deny, argc - optind, argv + optind, MAX_NBD_CLIENTS ); + success = flexnbd_serve( flexnbd ); + flexnbd_destroy( flexnbd ); + + return 0; +} + + +int mode_listen( int argc, char *argv[] ) +{ + int c; + char *ip_addr = NULL; + char *rebind_ip_addr = NULL; + char *ip_port = NULL; + char *rebind_ip_port = NULL; + char *file = NULL; + char *sock = NULL; + int default_deny = 0; // not on by default + int err = 0; + + int success; + + struct flexnbd * flexnbd; + + while (1) { + c = getopt_long(argc, argv, listen_short_options, listen_options, NULL); + if ( c == -1 ) { break; } + + read_listen_param( c, &ip_addr, &rebind_ip_addr, &ip_port, &rebind_ip_port, + &file, &sock, &default_deny ); + } + + if ( NULL == ip_addr || NULL == ip_port ) { + err = 1; + fprintf( stderr, "both --addr and --port are required.\n" ); + } + if ( NULL == file ) { + err = 1; + fprintf( stderr, "--file is required\n" ); + } + if ( err ) { exit_err( listen_help_text ); } + + flexnbd = flexnbd_create_listening( + ip_addr, + rebind_ip_addr, + ip_port, + rebind_ip_port, + file, + sock, + default_deny, + argc - optind, + argv + optind, + MAX_NBD_CLIENTS ); + success = flexnbd_serve( flexnbd ); + flexnbd_destroy( flexnbd ); + + return success ? 0 : 1; +} + + +/* TODO: Separate this function. + * It should be: + * params_read( struct mode_readwrite_params* out, + * char *s_ip_address, + * char *s_port, + * char *s_from, + * char *s_length ) + * params_write( struct mode_readwrite_params* out, + * char *s_ip_address, + * char *s_port, + * char *s_from, + * char *s_length, + * char *s_filename ) + */ +void params_readwrite( + int write_not_read, + struct mode_readwrite_params* out, + char* s_ip_address, + char* s_port, + char* s_bind_address, + char* s_from, + char* s_length_or_filename +) +{ + FATAL_IF_NULL(s_ip_address, "No IP address supplied"); + FATAL_IF_NULL(s_port, "No port number supplied"); + FATAL_IF_NULL(s_from, "No from supplied"); + FATAL_IF_NULL(s_length_or_filename, "No length supplied"); + + FATAL_IF_ZERO( + parse_ip_to_sockaddr(&out->connect_to.generic, s_ip_address), + "Couldn't parse connection address '%s'", + s_ip_address + ); + + if (s_bind_address != NULL && + parse_ip_to_sockaddr(&out->connect_from.generic, s_bind_address) == 0) { + fatal("Couldn't parse bind address '%s'", s_bind_address); + } + + parse_port( s_port, &out->connect_to.v4 ); + + out->from = atol(s_from); + + if (write_not_read) { + if (s_length_or_filename[0]-48 < 10) { + out->len = atol(s_length_or_filename); + out->data_fd = 0; + } + else { + out->data_fd = open( + s_length_or_filename, O_RDONLY); + FATAL_IF_NEGATIVE(out->data_fd, + "Couldn't open %s", s_length_or_filename); + out->len = lseek64(out->data_fd, 0, SEEK_END); + FATAL_IF_NEGATIVE(out->len, + "Couldn't find length of %s", s_length_or_filename); + FATAL_IF_NEGATIVE( + lseek64(out->data_fd, 0, SEEK_SET), + "Couldn't rewind %s", s_length_or_filename + ); + } + } + else { + out->len = atol(s_length_or_filename); + out->data_fd = 1; + } +} + + +int mode_read( int argc, char *argv[] ) +{ + int c; + char *ip_addr = NULL; + char *ip_port = NULL; + char *bind_addr = NULL; + char *from = NULL; + char *size = NULL; + int err = 0; + + struct mode_readwrite_params readwrite; + + while (1){ + c = getopt_long(argc, argv, read_short_options, read_options, NULL); + + if ( c == -1 ) { break; } + + read_readwrite_param( c, &ip_addr, &ip_port, &bind_addr, &from, &size ); + } + + if ( NULL == ip_addr || NULL == ip_port ) { + err = 1; + fprintf( stderr, "both --addr and --port are required.\n" ); + } + if ( NULL == from || NULL == size ) { + err = 1; + fprintf( stderr, "both --from and --size are required.\n" ); + } + if ( err ) { exit_err( read_help_text ); } + + memset( &readwrite, 0, sizeof( readwrite ) ); + params_readwrite( 0, &readwrite, ip_addr, ip_port, bind_addr, from, size ); + do_read( &readwrite ); + return 0; +} + +int mode_write( int argc, char *argv[] ) +{ + int c; + char *ip_addr = NULL; + char *ip_port = NULL; + char *bind_addr = NULL; + char *from = NULL; + char *size = NULL; + int err = 0; + + struct mode_readwrite_params readwrite; + + while (1){ + c = getopt_long(argc, argv, write_short_options, write_options, NULL); + if ( c == -1 ) { break; } + + read_readwrite_param( c, &ip_addr, &ip_port, &bind_addr, &from, &size ); + } + + if ( NULL == ip_addr || NULL == ip_port ) { + err = 1; + fprintf( stderr, "both --addr and --port are required.\n" ); + } + if ( NULL == from || NULL == size ) { + err = 1; + fprintf( stderr, "both --from and --size are required.\n" ); + } + if ( err ) { exit_err( write_help_text ); } + + memset( &readwrite, 0, sizeof( readwrite ) ); + params_readwrite( 1, &readwrite, ip_addr, ip_port, bind_addr, from, size ); + do_write( &readwrite ); + return 0; +} + +int mode_acl( int argc, char *argv[] ) +{ + int c; + char *sock = NULL; + + while (1) { + c = getopt_long( argc, argv, acl_short_options, acl_options, NULL ); + if ( c == -1 ) { break; } + read_acl_param( c, &sock ); + } + + if ( NULL == sock ){ + fprintf( stderr, "--sock is required.\n" ); + exit_err( acl_help_text ); + } + + /* Don't use the CMD_ACL macro here, "acl" is the remote command + * name, not the cli option + */ + do_remote_command( "acl", sock, argc - optind, argv + optind ); + + return 0; +} + + +int mode_mirror( int argc, char *argv[] ) +{ + int c; + char *sock = NULL; + char *remote_argv[4] = {0}; + int err = 0; + + while (1) { + c = getopt_long( argc, argv, mirror_short_options, mirror_options, NULL); + if ( -1 == c ) { break; } + read_mirror_param( c, &sock, &remote_argv[0], &remote_argv[1], &remote_argv[2] ); + } + + if ( NULL == sock ){ + fprintf( stderr, "--sock is required.\n" ); + err = 1; + } + if ( NULL == remote_argv[0] || NULL == remote_argv[1] ) { + fprintf( stderr, "both --addr and --port are required.\n"); + err = 1; + } + if ( err ) { exit_err( mirror_help_text ); } + + if (remote_argv[2] == NULL) { + do_remote_command( "mirror", sock, 2, remote_argv ); + } + else { + do_remote_command( "mirror", sock, 3, remote_argv ); + } + + return 0; +} + + +int mode_status( int argc, char *argv[] ) +{ + int c; + char *sock = NULL; + + while (1) { + c = getopt_long( argc, argv, status_short_options, status_options, NULL ); + if ( -1 == c ) { break; } + read_status_param( c, &sock ); + } + + if ( NULL == sock ){ + fprintf( stderr, "--sock is required.\n" ); + exit_err( acl_help_text ); + } + + do_remote_command( "status", sock, argc - optind, argv + optind ); + + return 0; +} + + +int mode_help( int argc, char *argv[] ) +{ + char *cmd; + char *help_text = NULL; + + if ( argc < 1 ){ + help_text = help_help_text; + } else { + cmd = argv[0]; + if (IS_CMD( CMD_SERVE, cmd ) ) { + help_text = serve_help_text; + } else if ( IS_CMD( CMD_LISTEN, cmd ) ) { + help_text = listen_help_text; + } else if ( IS_CMD( CMD_READ, cmd ) ) { + help_text = read_help_text; + } else if ( IS_CMD( CMD_WRITE, cmd ) ) { + help_text = write_help_text; + } else if ( IS_CMD( CMD_ACL, cmd ) ) { + help_text = acl_help_text; + } else if ( IS_CMD( CMD_MIRROR, cmd ) ) { + help_text = mirror_help_text; + } else if ( IS_CMD( CMD_STATUS, cmd ) ) { + help_text = status_help_text; + } else { exit_err( help_help_text ); } + } + + fprintf( stdout, "%s\n", help_text ); + return 0; +} + + +void mode(char* mode, int argc, char **argv) +{ + if ( IS_CMD( CMD_SERVE, mode ) ) { + exit( mode_serve( argc, argv ) ); + } + else if ( IS_CMD( CMD_LISTEN, mode ) ) { + exit( mode_listen( argc, argv ) ); + } + else if ( IS_CMD( CMD_READ, mode ) ) { + mode_read( argc, argv ); + } + else if ( IS_CMD( CMD_WRITE, mode ) ) { + mode_write( argc, argv ); + } + else if ( IS_CMD( CMD_ACL, mode ) ) { + mode_acl( argc, argv ); + } + else if ( IS_CMD( CMD_MIRROR, mode ) ) { + mode_mirror( argc, argv ); + } + else if ( IS_CMD( CMD_STATUS, mode ) ) { + mode_status( argc, argv ); + } + else if ( IS_CMD( CMD_HELP, mode ) ) { + mode_help( argc-1, argv+1 ); + } + else { + mode_help( argc-1, argv+1 ); + exit( 1 ); + } + exit(0); +} + + diff --git a/src/mode.h b/src/mode.h new file mode 100644 index 0000000..46d3c92 --- /dev/null +++ b/src/mode.h @@ -0,0 +1,76 @@ +#ifndef MODE_H +#define MODE_H + + +void mode(char* mode, int argc, char **argv); + + +#include + +#define GETOPT_ARG(x,s) {(x), 1, 0, (s)} +#define GETOPT_FLAG(x,v) {(x), 0, 0, (v)} + +#define OPT_HELP "help" +#define OPT_ADDR "addr" +#define OPT_REBIND_ADDR "rebind-addr" +#define OPT_BIND "bind" +#define OPT_PORT "port" +#define OPT_REBIND_PORT "rebind-port" +#define OPT_FILE "file" +#define OPT_SOCK "sock" +#define OPT_FROM "from" +#define OPT_SIZE "size" +#define OPT_DENY "default-deny" + +#define CMD_SERVE "serve" +#define CMD_LISTEN "listen" +#define CMD_READ "read" +#define CMD_WRITE "write" +#define CMD_ACL "acl" +#define CMD_MIRROR "mirror" +#define CMD_STATUS "status" +#define CMD_HELP "help" +#define LEN_CMD_MAX 7 + +#define PATH_LEN_MAX 1024 +#define ADDR_LEN_MAX 64 + + +#define IS_CMD(x,c) (strncmp((x),(c),(LEN_CMD_MAX)) == 0) + +#define GETOPT_HELP GETOPT_FLAG( OPT_HELP, 'h' ) +#define GETOPT_DENY GETOPT_FLAG( OPT_DENY, 'd' ) + +#define GETOPT_ADDR GETOPT_ARG( OPT_ADDR, 'l' ) +#define GETOPT_REBIND_ADDR GETOPT_ARG( OPT_REBIND_ADDR, 'L') +#define GETOPT_PORT GETOPT_ARG( OPT_PORT, 'p' ) +#define GETOPT_REBIND_PORT GETOPT_ARG( OPT_REBIND_PORT, 'P') +#define GETOPT_FILE GETOPT_ARG( OPT_FILE, 'f' ) +#define GETOPT_SOCK GETOPT_ARG( OPT_SOCK, 's' ) +#define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' ) +#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' ) +#define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' ) + +#ifdef DEBUG +# define OPT_VERBOSE "verbose" +# define SOPT_VERBOSE "v" +# define GETOPT_VERBOSE GETOPT_FLAG( OPT_VERBOSE, 'v' ) +# define VERBOSE_LINE \ + "\t--" OPT_VERBOSE ",-" SOPT_VERBOSE "\t\tOutput debug information.\n" +#else +# define GETOPT_VERBOSE {0} +# define VERBOSE_LINE "" +# define SOPT_VERBOSE "" +#endif + +#define HELP_LINE \ + "\t--" OPT_HELP ",-h \tThis text.\n" +#define SOCK_LINE \ + "\t--" OPT_SOCK ",-s \tPath to the control socket.\n" +#define BIND_LINE \ + "\t--" OPT_BIND ",-b \tBind the local socket to a particular IP address.\n" + + +char * help_help_text; + +#endif diff --git a/src/options.h b/src/options.h deleted file mode 100644 index 7c7b035..0000000 --- a/src/options.h +++ /dev/null @@ -1,215 +0,0 @@ -#define OPTIONS_H - -#define GETOPT_ARG(x,s) {(x), 1, 0, (s)} -#define GETOPT_FLAG(x,v) {(x), 0, 0, (v)} - -#define OPT_HELP "help" -#define OPT_ADDR "addr" -#define OPT_REBIND_ADDR "rebind-addr" -#define OPT_BIND "bind" -#define OPT_PORT "port" -#define OPT_REBIND_PORT "rebind-port" -#define OPT_FILE "file" -#define OPT_SOCK "sock" -#define OPT_FROM "from" -#define OPT_SIZE "size" -#define OPT_DENY "default-deny" - -#define CMD_SERVE "serve" -#define CMD_LISTEN "listen" -#define CMD_READ "read" -#define CMD_WRITE "write" -#define CMD_ACL "acl" -#define CMD_MIRROR "mirror" -#define CMD_STATUS "status" -#define CMD_HELP "help" -#define LEN_CMD_MAX 7 - -#define PATH_LEN_MAX 1024 -#define ADDR_LEN_MAX 64 - - -#define IS_CMD(x,c) (strncmp((x),(c),(LEN_CMD_MAX)) == 0) - -#define GETOPT_HELP GETOPT_FLAG( OPT_HELP, 'h' ) -#define GETOPT_DENY GETOPT_FLAG( OPT_DENY, 'd' ) - -#define GETOPT_ADDR GETOPT_ARG( OPT_ADDR, 'l' ) -#define GETOPT_REBIND_ADDR GETOPT_ARG( OPT_REBIND_ADDR, 'L') -#define GETOPT_PORT GETOPT_ARG( OPT_PORT, 'p' ) -#define GETOPT_REBIND_PORT GETOPT_ARG( OPT_REBIND_PORT, 'P') -#define GETOPT_FILE GETOPT_ARG( OPT_FILE, 'f' ) -#define GETOPT_SOCK GETOPT_ARG( OPT_SOCK, 's' ) -#define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' ) -#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' ) -#define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' ) - -#ifdef DEBUG -# define OPT_VERBOSE "verbose" -# define SOPT_VERBOSE "v" -# define GETOPT_VERBOSE GETOPT_FLAG( OPT_VERBOSE, 'v' ) -# define VERBOSE_LINE \ - "\t--" OPT_VERBOSE ",-" SOPT_VERBOSE "\t\tOutput debug information.\n" -#else -# define GETOPT_VERBOSE {0} -# define VERBOSE_LINE "" -# define SOPT_VERBOSE "" -#endif - -#define HELP_LINE \ - "\t--" OPT_HELP ",-h \tThis text.\n" -#define SOCK_LINE \ - "\t--" OPT_SOCK ",-s \tPath to the control socket.\n" -#define BIND_LINE \ - "\t--" OPT_BIND ",-b \tBind the local socket to a particular IP address.\n" - - -static struct option serve_options[] = { - GETOPT_HELP, - GETOPT_ADDR, - GETOPT_PORT, - GETOPT_FILE, - GETOPT_SOCK, - GETOPT_DENY, - GETOPT_VERBOSE, - {0} -}; -static char serve_short_options[] = "hl:p:f:s:d" SOPT_VERBOSE; -static char serve_help_text[] = - "Usage: flexnbd " CMD_SERVE " [*]\n\n" - "Serve FILE from ADDR:PORT, with an optional control socket at SOCK.\n\n" - HELP_LINE - "\t--" OPT_ADDR ",-l \tThe address to serve on.\n" - "\t--" OPT_PORT ",-p \tThe port to serve on.\n" - "\t--" OPT_FILE ",-f \tThe file to serve.\n" - "\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n" - SOCK_LINE - VERBOSE_LINE; - - -static struct option listen_options[] = { - GETOPT_HELP, - GETOPT_ADDR, - GETOPT_REBIND_ADDR, - GETOPT_PORT, - GETOPT_REBIND_ADDR, - GETOPT_FILE, - GETOPT_SOCK, - GETOPT_DENY, - GETOPT_VERBOSE, - {0} -}; -static char listen_short_options[] = "hl:L:p:P:f:s:d" SOPT_VERBOSE; -static char listen_help_text[] = - "Usage: flexnbd " CMD_LISTEN " [*]\n\n" - "Listen for an incoming migration on ADDR:PORT, " - "then switch to REBIND_ADDR:REBIND_PORT on completion " - "to serve FILE.\n\n" - HELP_LINE - "\t--" OPT_ADDR ",-l \tThe address to listen on.\n" - "\t--" OPT_REBIND_ADDR ",-L \tThe address to switch to, if given.\n" - "\t--" OPT_PORT ",-p \tThe port to listen on.\n" - "\t--" OPT_REBIND_PORT ",-P \tThe port to switch to, if given..\n" - "\t--" OPT_FILE ",-f \tThe file to serve.\n" - "\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n" - SOCK_LINE - VERBOSE_LINE; - - -static struct option read_options[] = { - GETOPT_HELP, - GETOPT_ADDR, - GETOPT_PORT, - GETOPT_FROM, - GETOPT_SIZE, - GETOPT_BIND, - GETOPT_VERBOSE, - {0} -}; -static char read_short_options[] = "hl:p:F:S:b:" SOPT_VERBOSE; -static char read_help_text[] = - "Usage: flexnbd " CMD_READ " \n\n" - "Read SIZE bytes from a server at ADDR:PORT to stdout, starting at OFFSET.\n\n" - HELP_LINE - "\t--" OPT_ADDR ",-l \tThe address to read from.\n" - "\t--" OPT_PORT ",-p \tThe port to read from.\n" - "\t--" OPT_FROM ",-F \tByte offset to read from.\n" - "\t--" OPT_SIZE ",-S \tBytes to read.\n" - BIND_LINE - VERBOSE_LINE; - - -static struct option *write_options = read_options; -static char *write_short_options = read_short_options; -static char write_help_text[] = - "Usage: flexnbd " CMD_WRITE" \n\n" - "Write SIZE bytes from stdin to a server at ADDR:PORT, starting at OFFSET.\n\n" - HELP_LINE - "\t--" OPT_ADDR ",-l \tThe address to write to.\n" - "\t--" OPT_PORT ",-p \tThe port to write to.\n" - "\t--" OPT_FROM ",-F \tByte offset to write from.\n" - "\t--" OPT_SIZE ",-S \tBytes to write.\n" - BIND_LINE - VERBOSE_LINE; - -struct option acl_options[] = { - GETOPT_HELP, - GETOPT_SOCK, - GETOPT_VERBOSE, - {0} -}; -static char acl_short_options[] = "hs:" SOPT_VERBOSE; -static char acl_help_text[] = - "Usage: flexnbd " CMD_ACL " [+]\n\n" - "Set the access control list for a server with control socket SOCK.\n\n" - HELP_LINE - SOCK_LINE - VERBOSE_LINE; - -struct option mirror_options[] = { - GETOPT_HELP, - GETOPT_SOCK, - GETOPT_ADDR, - GETOPT_PORT, - GETOPT_BIND, - GETOPT_VERBOSE, - {0} -}; -static char mirror_short_options[] = "hs:l:p:b:" SOPT_VERBOSE; -static char mirror_help_text[] = - "Usage: flexnbd " CMD_MIRROR " \n\n" - "Start mirroring from the server with control socket SOCK to one at ADDR:PORT.\n\n" - HELP_LINE - "\t--" OPT_ADDR ",-l \tThe address to mirror to.\n" - "\t--" OPT_PORT ",-p \tThe port to mirror to.\n" - SOCK_LINE - BIND_LINE - VERBOSE_LINE; - - -struct option status_options[] = { - GETOPT_HELP, - GETOPT_SOCK, - GETOPT_VERBOSE, - {0} -}; -static char status_short_options[] = "hs:" SOPT_VERBOSE; -static char status_help_text[] = - "Usage: flexnbd " CMD_STATUS " \n\n" - "Get the status for a server with control socket SOCK.\n\n" - HELP_LINE - SOCK_LINE - VERBOSE_LINE; - -static char help_help_text[] = - "Usage: flexnbd [cmd options]\n\n" - "Commands:\n" - "\tflexnbd serve\n" - "\tflexnbd read\n" - "\tflexnbd write\n" - "\tflexnbd acl\n" - "\tflexnbd mirror\n" - "\tflexnbd status\n" - "\tflexnbd help\n\n" - "See flexnbd help for further info\n"; - diff --git a/src/remote.c b/src/remote.c index dc5801b..778c6dc 100644 --- a/src/remote.c +++ b/src/remote.c @@ -27,6 +27,7 @@ void do_remote_command(char* command, char* socket_name, int argc, char** argv) { char newline=10; int i; + debug( "connecting to run remote command %s", command ); int remote = socket(AF_UNIX, SOCK_STREAM, 0); struct sockaddr_un address; char response[max_response]; diff --git a/src/self_pipe.h b/src/self_pipe.h index 17b3269..76576df 100644 --- a/src/self_pipe.h +++ b/src/self_pipe.h @@ -1,3 +1,4 @@ +#ifndef SELF_PIPE_H #define SELF_PIPE_H #include @@ -14,3 +15,5 @@ int self_pipe_signal_clear( struct self_pipe *sig ); int self_pipe_destroy( struct self_pipe * sig ); int self_pipe_fd_set( struct self_pipe * sig, fd_set * fds ); int self_pipe_fd_isset( struct self_pipe *sig, fd_set *fds ); + +#endif diff --git a/src/serve.c b/src/serve.c index 0798ab5..c7ec0b7 100644 --- a/src/serve.c +++ b/src/serve.c @@ -37,18 +37,20 @@ static inline void* sockaddr_address_data(struct sockaddr* sockaddr) } struct server * server_create ( + struct flexnbd * flexnbd, char* s_ip_address, char* s_port, char* s_file, - char *s_ctrl_sock, int default_deny, int acl_entries, char** s_acl_entries, int max_nbd_clients, int has_control) { + NULLCHECK( flexnbd ); struct server * out; out = xmalloc( sizeof( struct server ) ); + out->flexnbd = flexnbd; out->has_control = has_control; out->max_nbd_clients = max_nbd_clients; out->nbd_client = xmalloc( max_nbd_clients * sizeof( struct client_tbl_entry ) ); @@ -66,9 +68,6 @@ struct server * server_create ( s_ip_address ); - /* control_socket_name is optional. It just won't get created if - * we pass NULL. */ - out->control_socket_name = s_ctrl_sock; out->acl = acl_create( acl_entries, s_acl_entries, default_deny ); if (out->acl && out->acl->len != acl_entries) { @@ -87,7 +86,6 @@ struct server * server_create ( out->close_signal = self_pipe_create(); out->acl_updated_signal = self_pipe_create(); - out->vacuum_signal = self_pipe_create(); NULLCHECK( out->close_signal ); NULLCHECK( out->acl_updated_signal ); @@ -101,8 +99,6 @@ void server_destroy( struct server * serve ) serve->acl_updated_signal = NULL; self_pipe_destroy( serve->close_signal ); serve->close_signal = NULL; - self_pipe_destroy( serve->vacuum_signal ); - serve->vacuum_signal = NULL; pthread_mutex_destroy( &serve->l_acl ); pthread_mutex_destroy( &serve->l_io ); @@ -230,6 +226,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre s_client_address, 64 ); + debug( "%s(%p,...)", joinfunc == pthread_join ? "joining" : "tryjoining", entry->thread ); join_errno = joinfunc(entry->thread, &status); /* join_errno can legitimately be ESRCH if the thread is * already dead, but the client still needs tidying up. */ @@ -245,6 +242,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre s_client_address, (uint64_t)status); client_destroy( entry->client ); + entry->client = NULL; entry->thread = 0; was_closed = 1; } @@ -366,44 +364,13 @@ int server_should_accept_client( } -struct client_cleanup { - pthread_t client_thread; - struct self_pipe * vacuum_signal; -}; - -void *client_vacuum( void * cleanup_uncast ) -{ - pthread_detach( pthread_self() ); - NULLCHECK( cleanup_uncast ); - struct client_cleanup *cleanup = (struct client_cleanup *)cleanup_uncast; - - pthread_join( cleanup->client_thread, NULL ); - self_pipe_signal( cleanup->vacuum_signal ); - free( cleanup ); - return NULL; -} - - -/* Why do we need this rather odd arrangement? Because if we don't have - * it, dead threads don't get tidied up until the next incoming - * connection happens. - */ int spawn_client_thread( struct client * client_params, - struct self_pipe * vacuum_signal, pthread_t *out_thread) { - struct client_cleanup * cleanup = xmalloc( sizeof( struct client_cleanup ) ); - cleanup->vacuum_signal = vacuum_signal; - int result = pthread_create(&cleanup->client_thread, NULL, client_serve, client_params); + int result = pthread_create(out_thread, NULL, client_serve, client_params); - if ( 0 == result ){ - pthread_t watcher; - pthread_create( &watcher, NULL, client_vacuum, cleanup ); - - *out_thread = cleanup->client_thread; - } return result; } @@ -446,7 +413,7 @@ void accept_nbd_client( pthread_t * thread = ¶ms->nbd_client[slot].thread; - if (spawn_client_thread( client_params, params->vacuum_signal, thread ) != 0) { + if ( 0 != spawn_client_thread( client_params, thread ) ) { debug( "Thread creation problem." ); client_destroy( client_params ); close(client_fd); @@ -501,6 +468,7 @@ void server_close_clients( struct server *params ) entry = ¶ms->nbd_client[i]; if ( entry->thread != 0 ) { + debug( "Stop signaling client %p", entry->client ); client_signal_stop( entry->client ); } } @@ -547,12 +515,9 @@ int server_accept( struct server * params ) FD_ZERO(&fds); FD_SET(params->server_fd, &fds); + FD_SET(flexnbd_signal_fd( params->flexnbd ), &fds); self_pipe_fd_set( params->close_signal, &fds ); self_pipe_fd_set( params->acl_updated_signal, &fds ); - self_pipe_fd_set( params->vacuum_signal, &fds ); - if (params->control_socket_name) { - FD_SET(params->control_fd, &fds); - } FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds, NULL, NULL, NULL), "select() failed"); @@ -562,11 +527,13 @@ int server_accept( struct server * params ) return 0; } - if ( self_pipe_fd_isset( params->vacuum_signal, &fds ) ) { - cleanup_client_threads( params->nbd_client, params->max_nbd_clients ); - self_pipe_signal_clear( params->vacuum_signal ); + if ( FD_ISSET( flexnbd_signal_fd( params->flexnbd ), &fds ) ){ + debug( "Stop signal received." ); + server_close_clients( params ); + return 0; } + if ( self_pipe_fd_isset( params->acl_updated_signal, &fds ) ) { self_pipe_signal_clear( params->acl_updated_signal ); server_audit_clients( params ); @@ -577,12 +544,6 @@ int server_accept( struct server * params ) debug("Accepted nbd client socket"); accept_nbd_client(params, client_fd, &client_address); } - else if( FD_ISSET( params->control_fd, &fds ) ) { - client_fd = accept( params->control_fd, &client_address.generic, &socklen ); - debug("Accepted control client socket"); - accept_control_connection(params, client_fd, &client_address); - } - return 1; } @@ -655,17 +616,16 @@ void serve_cleanup(struct server* params, int i; if (params->server_fd){ close(params->server_fd); } - if (params->control_fd){ close(params->control_fd); } - if (params->control_socket_name){ ; } if (params->allocation_map) { free(params->allocation_map); } - if (params->mirror) { - pthread_t mirror_t = params->mirror->thread; + if (params->mirror_super) { + /* AWOOGA! RACE! */ + pthread_t mirror_t = params->mirror_super->thread; params->mirror->signal_abandon = 1; - pthread_join(mirror_t, NULL); + pthread_join( mirror_t, NULL ); } for (i=0; i < params->max_nbd_clients; i++) { @@ -687,6 +647,11 @@ int server_is_in_control( struct server *serve ) return serve->has_control; } +int server_default_deny( struct server * serve ) +{ + NULLCHECK( serve ); + return acl_default_deny( serve->acl ); +} /** Full lifecycle of the server */ int do_serve(struct server* params) @@ -697,12 +662,10 @@ int do_serve(struct server* params) error_set_handler((cleanup_handler*) serve_cleanup, params); serve_open_server_socket(params); - serve_open_control_socket(params); serve_init_allocation_map(params); serve_accept_loop(params); has_control = params->has_control; serve_cleanup(params, 0); - debug("Server %s control.", has_control ? "has" : "does not have" ); return has_control; } diff --git a/src/serve.h b/src/serve.h index 76f5f6b..8620440 100644 --- a/src/serve.h +++ b/src/serve.h @@ -4,66 +4,13 @@ #include #include +#include "flexnbd.h" #include "parse.h" #include "acl.h" static const int block_allocation_resolution = 4096;//128<<10; -enum mirror_finish_action { - ACTION_EXIT, - ACTION_NOTHING -}; - -enum mirror_state { - MS_UNKNOWN, - MS_INIT, - MS_GO, - MS_FINALISE, - MS_DONE, - MS_FAIL_CONNECT, - MS_FAIL_REJECTED, - MS_FAIL_NO_HELLO, - MS_FAIL_SIZE_MISMATCH -}; - -struct mirror_status { - pthread_t thread; - /* set to 1, then join thread to make mirror terminate early */ - int signal_abandon; - union mysockaddr * connect_to; - union mysockaddr * connect_from; - int client; - char *filename; - off64_t max_bytes_per_second; - enum mirror_finish_action action_at_finish; - - char *mapped; - struct bitset_mapping *dirty_map; - - /* Pass a commit state pointer, then it will be updated - * immediately before commit_signal is sent. - */ - enum mirror_state * commit_state; - - /* commit_signal is sent immediately after attempting to connect - * and checking the remote size, whether successful or not. - */ - struct self_pipe * commit_signal; -}; - - -struct mirror_super { - struct mirror_status * mirror; - struct self_pipe * commit_signal; - pthread_t thread; -}; - - -struct control_params { - int socket; - struct server* serve; -}; struct client_tbl_entry { pthread_t thread; @@ -74,6 +21,9 @@ struct client_tbl_entry { #define MAX_NBD_CLIENTS 16 struct server { + /* The flexnbd wrapper this server is attached to */ + struct flexnbd * flexnbd; + /** address/port to bind to */ union mysockaddr bind_to; /** (static) file name to serve */ @@ -101,11 +51,6 @@ struct server { struct self_pipe * acl_updated_signal; pthread_mutex_t l_acl; - /** vacuum_signal will be sent when client threads terminate. - * This is mainly to keep valgrind happy - */ - struct self_pipe * vacuum_signal; - struct mirror_status* mirror; struct mirror_super * mirror_super; int server_fd; @@ -117,18 +62,18 @@ struct server { struct client_tbl_entry *nbd_client; - /* Marker for whether or not this server has control over the - * data in the file, or if we're waiting to receive it from an - * inbound migration which hasn't yet finished. + /* Marker for whether this server has control over the data in + * the file, or if we're waiting to receive it from an inbound + * migration which hasn't yet finished. */ int has_control; }; struct server * server_create( + struct flexnbd * flexnbd, char* s_ip_address, char* s_port, char* s_file, - char *s_ctrl_sock, int default_deny, int acl_entries, char** s_acl_entries, @@ -144,6 +89,7 @@ void serve_wait_for_close( struct server * serve ); void server_replace_acl( struct server *serve, struct acl * acl); void server_control_arrived( struct server *serve ); int server_is_in_control( struct server *serve ); +int server_default_deny( struct server * serve ); int do_serve( struct server * ); diff --git a/src/util.c b/src/util.c index a98aa98..96e1391 100644 --- a/src/util.c +++ b/src/util.c @@ -33,6 +33,14 @@ void error_handler(int fatal __attribute__ ((unused)) ) } +void exit_err( const char *msg ) +{ + fprintf( stderr, "%s\n", msg ); + exit( 1 ); +} + + + void mylog(int line_level, const char* format, ...) { va_list argptr; diff --git a/src/util.h b/src/util.h index 5426d0c..6d0f57b 100644 --- a/src/util.h +++ b/src/util.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include void* xrealloc(void* ptr, size_t size); void* xmalloc(size_t size); @@ -18,6 +20,9 @@ extern int log_level; /* set up the error globals */ void error_init(void); + +void exit_err( const char * ); + /* error_set_handler must be a macro not a function due to setjmp stack rules */ #include @@ -52,8 +57,9 @@ extern pthread_key_t cleanup_handler_key; case 0: /* setup time */ \ if (old) { free(old); }\ if( EINVAL == pthread_setspecific(cleanup_handler_key, context) ) { \ - fprintf(stderr, "Tried to set an error handler" \ - " without calling error_init().\n");\ + fprintf(stderr, "Tried to set an error handler at %s:%d" \ + " without calling error_init().\n", \ + __FILE__, __LINE__ );\ abort();\ }\ break; \ diff --git a/tests/check_client.c b/tests/check_client.c index 06985a1..3df7206 100644 --- a/tests/check_client.c +++ b/tests/check_client.c @@ -1,13 +1,16 @@ #include +#include #include "self_pipe.h" #include "nbdtypes.h" +#include "serve.h" #include "client.h" #include -#define FAKE_SERVER ((struct server *)23) +struct server fake_server = {0}; +#define FAKE_SERVER &fake_server #define FAKE_SOCKET (42) START_TEST( test_assigns_socket ) diff --git a/tests/check_control.c b/tests/check_control.c new file mode 100644 index 0000000..fbb2ec9 --- /dev/null +++ b/tests/check_control.c @@ -0,0 +1,42 @@ +#include "control.h" +#include "flexnbd.h" + +#include + + +START_TEST( test_assigns_sock_name ) +{ + struct flexnbd flexnbd = {0}; + char csn[] = "foobar"; + + struct control * control = control_create(&flexnbd, csn ); + + fail_unless( csn == control->socket_name, "Socket name not assigned" ); +} +END_TEST + + +Suite *control_suite(void) +{ + Suite *s = suite_create("control"); + + TCase *tc_create = tcase_create("create"); + + tcase_add_test(tc_create, test_assigns_sock_name); + suite_add_tcase( s, tc_create ); + + return s; +} + +int main(void) +{ + int number_failed; + + Suite *s = control_suite(); + SRunner *sr = srunner_create(s); + srunner_run_all(sr, CK_NORMAL); + number_failed = srunner_ntests_failed(sr); + srunner_free(sr); + return (number_failed == 0) ? 0 : 1; +} + diff --git a/tests/check_flexnbd.c b/tests/check_flexnbd.c new file mode 100644 index 0000000..aa64bbe --- /dev/null +++ b/tests/check_flexnbd.c @@ -0,0 +1,47 @@ +#include "flexnbd.h" + +#include + + +START_TEST( test_listening_assigns_sock ) +{ + struct flexnbd * flexnbd = flexnbd_create_listening( + "127.0.0.1", + NULL, + "4777", + NULL, + "fakefile", + "fakesock", + 0, + 0, + NULL, + 1 ); + fail_if( NULL == flexnbd->control->socket_name, "No socket was copied" ); +} +END_TEST + + +Suite *flexnbd_suite(void) +{ + Suite *s = suite_create("flexnbd"); + + TCase *tc_create = tcase_create("create"); + + tcase_add_test(tc_create, test_listening_assigns_sock); + suite_add_tcase( s, tc_create ); + + return s; +} + +int main(void) +{ + int number_failed; + + Suite *s = flexnbd_suite(); + SRunner *sr = srunner_create(s); + srunner_run_all(sr, CK_NORMAL); + number_failed = srunner_ntests_failed(sr); + srunner_free(sr); + return (number_failed == 0) ? 0 : 1; +} + diff --git a/tests/check_listen.c b/tests/check_listen.c index 1119152..90a54d0 100644 --- a/tests/check_listen.c +++ b/tests/check_listen.c @@ -1,14 +1,16 @@ #include "serve.h" #include "listen.h" #include "util.h" +#include "flexnbd.h" #include #include START_TEST( test_defaults_main_serve_opts ) { - struct listen * listen = listen_create( "127.0.0.1", NULL, "4777", NULL, - "foo", "bar", 0, 0, NULL, 1 ); + struct flexnbd flexnbd; + struct listen * listen = listen_create( &flexnbd, "127.0.0.1", NULL, "4777", NULL, + "foo", 0, 0, NULL, 1 ); NULLCHECK( listen ); struct server *init_serve = listen->init_serve; struct server *main_serve = listen->main_serve; @@ -28,7 +30,7 @@ Suite* listen_suite(void) Suite *s = suite_create("listen"); TCase *tc_create = tcase_create("create"); - tcase_add_test(tc_create, test_defaults_main_serve_opts); + tcase_add_exit_test(tc_create, test_defaults_main_serve_opts, 0); suite_add_tcase(s, tc_create); diff --git a/tests/check_mbox.c b/tests/check_mbox.c new file mode 100644 index 0000000..34ff07c --- /dev/null +++ b/tests/check_mbox.c @@ -0,0 +1,104 @@ +#include "mbox.h" +#include "util.h" + +#include +#include + +START_TEST( test_allocs_cvar ) +{ + struct mbox * mbox = mbox_create(); + fail_if( NULL == mbox, "Nothing allocated" ); + + pthread_cond_t cond_zero; + /* A freshly inited pthread_cond_t is set to {0} */ + memset( &cond_zero, 'X', sizeof( cond_zero ) ); + fail_if( memcmp( &cond_zero, &mbox->filled_cond, sizeof( cond_zero ) ) == 0 , + "Condition variable not allocated" ); + fail_if( memcmp( &cond_zero, &mbox->emptied_cond, sizeof( cond_zero ) ) == 0 , + "Condition variable not allocated" ); +} +END_TEST + + +START_TEST( test_post_stores_value ) +{ + struct mbox * mbox = mbox_create(); + + void * deadbeef = (void *)0xDEADBEEF; + mbox_post( mbox, deadbeef ); + + fail_unless( deadbeef == mbox_contents( mbox ), + "Contents were not posted" ); +} +END_TEST + + +void * mbox_receive_runner( void * mbox_uncast ) +{ + struct mbox * mbox = (struct mbox *)mbox_uncast; + void * contents = NULL; + + contents = mbox_receive( mbox ); + return contents; +} + + +START_TEST( test_receive_blocks_until_post ) +{ + struct mbox * mbox = mbox_create(); + pthread_t receiver; + pthread_create( &receiver, NULL, mbox_receive_runner, mbox ); + + void * deadbeef = (void *)0xDEADBEEF; + void * retval =NULL; + usleep(10000); + fail_unless( EBUSY == pthread_tryjoin_np( receiver, &retval ), + "Receiver thread wasn't blocked"); + + mbox_post( mbox, deadbeef ); + fail_unless( 0 == pthread_join( receiver, &retval ), + "Failed to join the receiver thread" ); + fail_unless( retval == deadbeef, + "Return value was wrong" ); + + +} +END_TEST + + +Suite* acl_suite(void) +{ + Suite *s = suite_create("acl"); + TCase *tc_create = tcase_create("create"); + TCase *tc_post = tcase_create("post"); + + tcase_add_test(tc_create, test_allocs_cvar); + + tcase_add_test( tc_post, test_post_stores_value ); + tcase_add_test( tc_post, test_receive_blocks_until_post); + + suite_add_tcase(s, tc_create); + suite_add_tcase(s, tc_post); + + return s; +} + + + +int main(void) +{ +#ifdef DEBUG + log_level = 0; +#else + log_level = 2; +#endif + int number_failed; + Suite *s = acl_suite(); + SRunner *sr = srunner_create(s); + srunner_run_all(sr, CK_NORMAL); + log_level = 0; + number_failed = srunner_ntests_failed(sr); + srunner_free(sr); + return (number_failed == 0) ? 0 : 1; +} + diff --git a/tests/check_serve.c b/tests/check_serve.c index e711f90..e58073d 100644 --- a/tests/check_serve.c +++ b/tests/check_serve.c @@ -3,6 +3,7 @@ #include "self_pipe.h" #include "client.h" +#include "flexnbd.h" #include #include @@ -61,7 +62,8 @@ void teardown( void ) START_TEST( test_replaces_acl ) { - struct server * s = server_create( "127.0.0.1", "0", dummy_file, NULL, 0, 0, NULL, 1); + struct flexnbd flexnbd; + struct server * s = server_create( &flexnbd, "127.0.0.1", "0", dummy_file, 0, 0, NULL, 1, 1 ); struct acl * new_acl = acl_create( 0, NULL, 0 ); server_replace_acl( s, new_acl ); @@ -74,7 +76,8 @@ END_TEST START_TEST( test_signals_acl_updated ) { - struct server * s = server_create( "127.0.0.1", "0", dummy_file, NULL, 0, 0, NULL, 1 ); + struct flexnbd flexnbd; + struct server * s = server_create( &flexnbd, "127.0.0.1", "0", dummy_file, 0, 0, NULL, 1, 1 ); struct acl * new_acl = acl_create( 0, NULL, 0 ); server_replace_acl( s, new_acl ); @@ -141,7 +144,8 @@ START_TEST( test_acl_update_closes_bad_client ) * and socket out of the server structure, we should be testing * a client socket. */ - struct server * s = server_create( "127.0.0.7", "0", dummy_file, NULL, 0, 0, NULL, 1 ); + struct flexnbd flexnbd; + struct server * s = server_create( &flexnbd, "127.0.0.7", "0", dummy_file, 0, 0, NULL, 1, 1 ); struct acl * new_acl = acl_create( 0, NULL, 1 ); struct client * c; struct client_tbl_entry * entry; @@ -168,12 +172,12 @@ START_TEST( test_acl_update_closes_bad_client ) server_replace_acl( s, new_acl ); + /* accept again, so that we can react to the acl replacement signal */ server_accept( s ); - pthread_join( entry->thread, NULL ); + /* Fail if we time out here */ + while( !fd_is_closed( server_fd ) ); - myfail_unless( fd_is_closed(server_fd), - "Client socket wasn't closed." ); close( client_fd ); server_close_clients( s ); server_destroy( s ); @@ -183,7 +187,8 @@ END_TEST START_TEST( test_acl_update_leaves_good_client ) { - struct server * s = server_create( "127.0.0.7", "0", dummy_file, NULL, 0, 0, NULL, 1 ); + struct flexnbd flexnbd; + struct server * s = server_create( &flexnbd, "127.0.0.7", "0", dummy_file, 0, 0, NULL, 1, 1 ); char *lines[] = {"127.0.0.1"}; struct acl * new_acl = acl_create( 1, lines, 1 ); @@ -243,6 +248,7 @@ Suite* serve_suite(void) int main(void) { log_level = LOG_LEVEL; + error_init(); int number_failed; Suite *s = serve_suite(); SRunner *sr = srunner_create(s); diff --git a/tests/fakes/source/close_after_connect.rb b/tests/fakes/source/close_after_connect.rb index 1145992..4ed2966 100755 --- a/tests/fakes/source/close_after_connect.rb +++ b/tests/fakes/source/close_after_connect.rb @@ -20,7 +20,11 @@ rescue Timeout::Error exit 1 end -Timeout.timeout( 2 ) do +Timeout.timeout( 3 ) do + # Sleep to be sure we don't try to connect too soon. That wouldn't + # be a problem for the destination, but it would prevent us from + # determining success or failure here. + sleep 0.5 sock = TCPSocket.open( addr, port.to_i ) sock.close end diff --git a/tests/flexnbd.rb b/tests/flexnbd.rb index fc3ef97..5c49345 100644 --- a/tests/flexnbd.rb +++ b/tests/flexnbd.rb @@ -12,7 +12,7 @@ class Executor attr_reader :pid def run( cmd ) - @pid = fork do exec @cmd end + @pid = fork do exec cmd end end end # class Executor @@ -20,6 +20,15 @@ end # class Executor class ValgrindExecutor attr_reader :pid + def run( cmd ) + @pid = fork do exec "valgrind --track-origins=yes #{cmd}" end + end +end # class ValgrindExecutor + + +class ValgrindKillingExecutor + attr_reader :pid + class Error attr_accessor :what, :kind, :pid attr_reader :backtrace @@ -137,10 +146,15 @@ class ValgrindExecutor private + + def pick_listener + ENV['DEBUG'] ? DebugErrorListener : ErrorListener + end + def launch_watch_thread(pid, io_r) Thread.start do io_source = REXML::IOSource.new( io_r ) - listener = DebugErrorListener.new( self ) + listener = pick_listener.new( self ) REXML::Document.parse_stream( io_source, listener ) end end @@ -160,15 +174,28 @@ class FlexNBD end end + def pick_executor + kls = if ENV['VALGRIND'] + if ENV['VALGRIND'] =~ /kill/ + ValgrindKillingExecutor + else + ValgrindExecutor + end + else + Executor + end + end + + def initialize(bin, ip, port) @bin = bin - @debug = `#{@bin} serve --help` =~ /--verbose/ ? "--verbose" : "" + @debug = (ENV['DEBUG'] && `#{@bin} serve --help` =~ /--verbose/) ? "--verbose" : "" raise "#{bin} not executable" unless File.executable?(bin) - @executor = ENV['VALGRIND'] ? ValgrindExecutor.new : Executor.new + @executor = pick_executor.new @ctrl = "/tmp/.flexnbd.ctrl.#{Time.now.to_i}.#{rand}" @ip = ip @port = port - @kill = false + @kill = [] end @@ -193,7 +220,7 @@ class FlexNBD def listen_cmd( file, acl ) - "#{@bin} listen "\ + "#{bin} listen "\ "--addr #{ip} "\ "--port #{port} "\ "--file #{file} "\ @@ -270,8 +297,8 @@ class FlexNBD _, status = Process.waitpid2( pid ) if @kill - fail "flexnbd quit with a bad status: #{status.to_i}" unless - @kill.include? status.to_i + fail "flexnbd quit with a bad status: #{status.exitstatus}" unless + @kill.include? status.exitstatus else $stderr.puts "flexnbd #{self.pid} quit" fail "flexnbd #{self.pid} quit early with status #{status.to_i}" @@ -281,8 +308,8 @@ class FlexNBD def can_die(*status) - status << 0 if status.empty? - @kill = status + status = [0] if status.empty? + @kill += status end def kill @@ -321,6 +348,10 @@ class FlexNBD end + def join + @wait_thread.join + end + def mirror_unchecked( dest_ip, dest_port, bandwidth=nil, action=nil, timeout=nil ) cmd = mirror_cmd( dest_ip, dest_port) debug( cmd ) @@ -364,8 +395,9 @@ class FlexNBD cmd = status_cmd() debug( cmd ) - maybe_timeout( cmd, timeout ) + o,e = maybe_timeout( cmd, timeout ) + [parse_status(o), e] end @@ -379,5 +411,24 @@ class FlexNBD return [code, message] end end + + + def parse_status( status ) + hsh = {} + + status.split(" ").each do |part| + next if part.strip.empty? + a,b = part.split("=") + b.strip! + b = true if b == "true" + b = false if b == "false" + + hsh[a.strip] = b + end + + hsh + end + + end diff --git a/tests/nbd_scenarios b/tests/nbd_scenarios index 972f617..96a0714 100644 --- a/tests/nbd_scenarios +++ b/tests/nbd_scenarios @@ -41,37 +41,12 @@ class Environment end - def parse_status( status ) - hsh = {} - - status.split(" ").each do |part| - next if part.strip.empty? - a,b = part.split("=") - b.strip! - b = true if b == "true" - b = false if b == "false" - - hsh[a.strip] = b - end - - hsh - end - - - def status( nbd ) - stdout, stderr = nbd.status - [parse_status(stdout), stderr] - end - def status1 - status( @nbd1 ) + @nbd1.status.first end def status2 - puts "Getting status" - result = status( @nbd2 ) - puts "Got status" - return result + @nbd2.status.first end @@ -153,6 +128,8 @@ class NBDScenarios < Test::Unit::TestCase end def teardown + @env.nbd1.can_die(0) + @env.nbd2.can_die(0) @env.cleanup end @@ -216,6 +193,8 @@ class NBDScenarios < Test::Unit::TestCase @env.nbd1.can_die stdout, stderr = @env.mirror12 + @env.nbd1.join + assert_equal(@env.file1.read_original( 0, @env.blocksize ), @env.file2.read( 0, @env.blocksize ) ) assert @env.status2['has_control'], "destination didn't take control" @@ -233,6 +212,7 @@ class NBDConnectSourceFailureScenarios < Test::Unit::TestCase end def teardown + @env.nbd1.can_die(0) @env.cleanup end @@ -300,18 +280,21 @@ class NBDConnectDestFailureScenarios < Test::Unit::TestCase def test_hello_blocked_by_disconnect_causes_error_not_fatal + @env.nbd1.can_die(1) run_fake( "source/close_after_connect" ) assert_no_control end def test_hello_goes_astray_causes_timeout_error + @env.nbd1.can_die(1) run_fake( "source/hang_after_hello" ) assert_no_control end def test_disconnect_after_hello_causes_error_not_fatal + @env.nbd1.can_die(1) run_fake( "source/close_after_hello" ) assert_no_control end