From a90f84972b2c7d4c0cd264e65eb9c5280060fd20 Mon Sep 17 00:00:00 2001 From: Alex Young Date: Thu, 7 Jun 2012 11:44:19 +0100 Subject: [PATCH] Add stop signals to client threads --- Rakefile | 20 +++- src/client.c | 37 +++++++- src/client.h | 5 + src/flexnbd.c | 5 +- src/ioutil.c | 8 ++ src/ioutil.h | 5 + src/self_pipe.c | 8 +- src/serve.c | 215 ++++++++++++++++++++++++++++++------------- src/serve.h | 10 +- src/util.c | 11 ++- src/util.h | 3 + tests/check_client.c | 95 +++++++++++++++++++ tests/flexnbd.rb | 95 ++++++++++++++----- 13 files changed, 417 insertions(+), 100 deletions(-) create mode 100644 tests/check_client.c diff --git a/Rakefile b/Rakefile index 359ebad..8eb0bec 100644 --- a/Rakefile +++ b/Rakefile @@ -61,8 +61,24 @@ rule 'build/flexnbd' => OBJECTS do |t| gcc_link(t.name, t.sources) end -TEST_MODULES.each do |m| - deps = ["tests/check_#{m}.c", "build/util.o"] + +file "build/tests/check_client" => +%w{tests/check_client.c + build/self_pipe.o + build/nbdtypes.o + build/control.o + build/readwrite.o + build/parse.o + build/client.o + build/serve.o + build/ioutil.o + build/util.o} do |t| + gcc_link t.name, t.prerequisites + [LIBCHECK] +end + + +(TEST_MODULES-["client"]).each do |m| + deps = ["tests/check_#{m}.c", "build/ioutil.o", "build/util.o"] maybe_obj_name = "build/#{m}.o" deps << maybe_obj_name if OBJECTS.include?( maybe_obj_name ) diff --git a/src/client.c b/src/client.c index 20e6468..6fc845f 100644 --- a/src/client.c +++ b/src/client.c @@ -12,6 +12,39 @@ #include +struct client *client_create( struct server *serve, int socket ) +{ + NULLCHECK( serve ); + + struct client *c; + + c = xmalloc( sizeof( struct server ) ); + c->socket = socket; + c->serve = serve; + + c->stop_signal = self_pipe_create(); + + return c; +} + + +void client_signal_stop( struct client *client ) +{ + NULLCHECK( client ); + + self_pipe_signal( client->stop_signal ); +} + +void client_destroy( struct client *client ) +{ + NULLCHECK( client ); + + self_pipe_destroy( client->stop_signal ); + free( client ); +} + + + /** * So waiting on client->socket is len bytes of data, and we must write it all * to client->mapped. However while doing do we must consult the bitmap @@ -26,6 +59,8 @@ */ void write_not_zeroes(struct client* client, off64_t from, int len) { + NULLCHECK( client ); + char *map = client->serve->block_allocation_map; while (len > 0) { @@ -349,10 +384,8 @@ void* client_serve(void* client_uncast) client->socket ); - close(client->socket); close(client->fileno); munmap(client->mapped, client->serve->size); - free(client); return NULL; } diff --git a/src/client.h b/src/client.h index 3823b97..90a6bee 100644 --- a/src/client.h +++ b/src/client.h @@ -6,11 +6,16 @@ struct client { int fileno; char* mapped; + + struct self_pipe * stop_signal; struct server* serve; /* FIXME: remove above duplication */ }; void* client_serve(void* client_uncast); +struct client * client_create( struct server * serve, int socket ); +void client_destroy( struct client * client ); +void client_signal_stop( struct client * client ); #endif diff --git a/src/flexnbd.c b/src/flexnbd.c index f0ae799..e616438 100644 --- a/src/flexnbd.c +++ b/src/flexnbd.c @@ -190,11 +190,12 @@ void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char case 's': *sock = optarg; break; - case 'v': - set_debug(1); case 'd': *default_deny = 1; break; + case 'v': + set_debug(1); + break; default: exit_err( serve_help_text ); break; diff --git a/src/ioutil.c b/src/ioutil.c index f61ff12..d6c7e3c 100644 --- a/src/ioutil.c +++ b/src/ioutil.c @@ -235,3 +235,11 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines) } } + +int fd_is_closed( int fd_in ) +{ + int errno_old = errno; + int result = fcntl( fd_in, F_GETFD, 9 ) < 0; + errno = errno_old; + return result; +} diff --git a/src/ioutil.h b/src/ioutil.h index ef09df7..30fe66c 100644 --- a/src/ioutil.h +++ b/src/ioutil.h @@ -54,5 +54,10 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines); */ int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map); + +/** Check to see whether the given file descriptor is closed. + */ +int fd_is_closed( int fd_in ); + #endif diff --git a/src/self_pipe.c b/src/self_pipe.c index 992fa31..71861fe 100644 --- a/src/self_pipe.c +++ b/src/self_pipe.c @@ -100,21 +100,25 @@ int self_pipe_signal( struct self_pipe * sig ) /** * Clear a received signal from the pipe. Every signal sent must be * cleared by one (and only one) recipient when they return from select(). + * Returns the number of bytes read, which will be 1 on success and 0 if + * there was no signal. */ int self_pipe_signal_clear( struct self_pipe *sig ) { char buf[1]; - read( sig->read_fd, buf, 1 ); - return 1; + return read( sig->read_fd, buf, 1 ); } + /** * Close the pipe and free the self_pipe. Do not try to use the * self_pipe struct after calling this, the innards are mush. */ int self_pipe_destroy( struct self_pipe * sig ) { + NULLCHECK(sig); + while( close( sig->read_fd ) == -1 && errno == EINTR ); while( close( sig->write_fd ) == -1 && errno == EINTR ); diff --git a/src/serve.c b/src/serve.c index f45dfaa..289fe2f 100644 --- a/src/serve.c +++ b/src/serve.c @@ -22,6 +22,8 @@ static inline void* sockaddr_address_data(struct sockaddr* sockaddr) { + NULLCHECK( sockaddr ); + struct sockaddr_in* in = (struct sockaddr_in*) sockaddr; struct sockaddr_in6* in6 = (struct sockaddr_in6*) sockaddr; @@ -34,12 +36,16 @@ static inline void* sockaddr_address_data(struct sockaddr* sockaddr) void server_dirty(struct server *serve, off64_t from, int len) { + NULLCHECK( serve ); + if (serve->mirror) bitset_set_range(serve->mirror->dirty_map, from, len); } int server_lock_io( struct server * serve) { + NULLCHECK( serve ); + SERVER_ERROR_ON_FAILURE( pthread_mutex_lock(&serve->l_io), "Problem with I/O lock" @@ -51,6 +57,8 @@ int server_lock_io( struct server * serve) void server_unlock_io( struct server* serve ) { + NULLCHECK( serve ); + SERVER_ERROR_ON_FAILURE( pthread_mutex_unlock(&serve->l_io), "Problem with I/O unlock" @@ -64,6 +72,8 @@ static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 }; */ int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], union mysockaddr* test) { + NULLCHECK( test ); + int i; for (i=0; i < list_length; i++) { @@ -117,6 +127,8 @@ int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], union myso /** Prepares a listening socket for the NBD server, binding etc. */ void serve_open_server_socket(struct server* params) { + NULLCHECK( params ); + int optval=1; params->server_fd= socket(params->bind_to.generic.sa_family == AF_INET ? @@ -147,46 +159,130 @@ void serve_open_server_socket(struct server* params) ); } +/** + * Check to see if a client thread has finished, and if so, tidy up + * after it. + * Returns 1 if the thread was cleaned up and the slot freed, 0 + * otherwise. + * + * It's important that client_destroy gets called in the same thread + * which signals the client threads to stop. This avoids the + * possibility of sending a stop signal via a signal which has already + * been destroyed. However, it means that stopped client threads, + * including their signal pipes, won't be cleaned up until the next new + * client connection attempt. + */ +int cleanup_client_thread( struct client_tbl_entry * entry ) +{ + NULLCHECK( entry ); + + int was_closed = 0; + void * status; + + if (entry->thread != 0) { + char s_client_address[64]; + + memset(s_client_address, 0, 64); + strcpy(s_client_address, "???"); + inet_ntop( entry->address.generic.sa_family, + sockaddr_address_data(&entry->address.generic), + s_client_address, + 64 ); + + if (pthread_tryjoin_np(entry->thread, &status) < 0) { + if (errno != EBUSY) + SERVER_ERROR_ON_FAILURE(-1, "Problem with joining thread"); + } + else { + debug("nbd thread %p exited (%s) with status %ld", + (int) entry->thread, + s_client_address, + (uint64_t)status); + client_destroy( entry->client ); + entry->thread = 0; + was_closed = 1; + } + } + + return was_closed; +} + + /** We can only accommodate MAX_NBD_CLIENTS connections at once. This function * goes through the current list, waits for any threads that have finished * and returns the next slot free (or -1 if there are none). */ int cleanup_and_find_client_slot(struct server* params) { - int slot=-1, i; - - for (i=0; i < MAX_NBD_CLIENTS; i++) { - void* status; - - if (params->nbd_client[i].thread != 0) { - char s_client_address[64]; - - memset(s_client_address, 0, 64); - strcpy(s_client_address, "???"); - inet_ntop( - params->nbd_client[i].address.generic.sa_family, - sockaddr_address_data(¶ms->nbd_client[i].address.generic), - s_client_address, - 64 - ); - - if (pthread_tryjoin_np(params->nbd_client[i].thread, &status) < 0) { - if (errno != EBUSY) - SERVER_ERROR_ON_FAILURE(-1, "Problem with joining thread"); - } - else { - params->nbd_client[i].thread = 0; - debug("nbd thread %d exited (%s) with status %ld", (int) params->nbd_client[i].thread, s_client_address, (uint64_t)status); - } + NULLCHECK( params ); + + int slot=-1, i,j; + + for ( i = 0; i < MAX_NBD_CLIENTS; i++ ) { + cleanup_client_thread( ¶ms->nbd_client[i] ); + } + + for ( j = 0; j < MAX_NBD_CLIENTS; j++ ) { + if( params->nbd_client[j].thread == 0 && slot == -1 ){ + slot = j; + break; } - - if (params->nbd_client[i].thread == 0 && slot == -1) - slot = i; } + if ( -1 == slot ) { debug( "No client slot found." ); } + return slot; } + +int server_acl_accepts( struct server *params, union mysockaddr * client_address ) +{ + NULLCHECK( params ); + NULLCHECK( client_address ); + + if (params->acl) { + if (is_included_in_acl(params->acl_entries, params->acl, client_address)) + return 1; + } else { + if (!params->default_deny) + return 1; + } + return 0; +} + + +int server_should_accept_client( + struct server * params, + int client_fd, + union mysockaddr * client_address, + char *s_client_address, + size_t s_client_address_len ) +{ + NULLCHECK( params ); + NULLCHECK( client_address ); + NULLCHECK( s_client_address ); + + if (inet_ntop(client_address->generic.sa_family, + sockaddr_address_data(&client_address->generic), + s_client_address, s_client_address_len ) == NULL) { + debug( "Rejecting client %s: Bad client_address", s_client_address ); + write(client_fd, "Bad client_address", 18); + return 0; + } + + if ( !server_acl_accepts( params, client_address ) ) { + debug( "Rejecting client %s: Access control error", s_client_address ); + debug( "We %s have an acl, and default_deny is %s", + (params->acl ? "do" : "do not"), + (params->default_deny ? "true" : "false") ); + write(client_fd, "Access control error", 20); + return 0; + } + + return 1; +} + + /** Dispatch function for accepting an NBD connection and starting a thread * to handle it. Rejects the connection if there is an ACL, and the far end's * address doesn't match, or if there are too many clients already connected. @@ -196,49 +292,33 @@ void accept_nbd_client( int client_fd, union mysockaddr* client_address) { + NULLCHECK(params); + NULLCHECK(client_address); + struct client* client_params; int slot = cleanup_and_find_client_slot(params); - char s_client_address[64]; - int acl_passed = 0; + char s_client_address[64] = {0}; - - if (inet_ntop(client_address->generic.sa_family, - sockaddr_address_data(&client_address->generic), - s_client_address, 64) == NULL) { - write(client_fd, "Bad client_address", 18); - close(client_fd); + + if ( !server_should_accept_client( params, client_fd, client_address, s_client_address, 64 ) ) { + close( client_fd ); return; } - - if (params->acl) { - if (is_included_in_acl(params->acl_entries, params->acl, client_address)) - acl_passed = 1; - } else { - if (!params->default_deny) - acl_passed = 1; - } - - if (!acl_passed) { - write(client_fd, "Access control error", 20); - close(client_fd); - return; - } - - if (slot < 0) { write(client_fd, "Too many clients", 16); close(client_fd); return; } - client_params = xmalloc(sizeof(struct client)); - client_params->socket = client_fd; - client_params->serve = params; + debug( "Client %s accepted.", s_client_address ); + client_params = client_create( params, client_fd ); + params->nbd_client[slot].client = client_params; if (pthread_create(¶ms->nbd_client[slot].thread, NULL, client_serve, client_params) < 0) { + debug( "Thread creation problem." ); write(client_fd, "Thread creation problem", 23); - free(client_params); + client_destroy( client_params ); close(client_fd); return; } @@ -252,16 +332,15 @@ void accept_nbd_client( int server_is_closed(struct server* serve) { - int errno_old = errno; - int result = fcntl(serve->server_fd, F_GETFD, 0) < 0; - errno = errno_old; - return result; + NULLCHECK( serve ); + return fd_is_closed( serve->server_fd ); } /** Accept either an NBD or control socket connection, dispatch appropriately */ void serve_accept_loop(struct server* params) { + NULLCHECK( params ); while (1) { int activity_fd, client_fd; union mysockaddr client_address; @@ -285,10 +364,14 @@ void serve_accept_loop(struct server* params) client_fd = accept(activity_fd, &client_address.generic, &socklen); - if (activity_fd == params->server_fd) + if (activity_fd == params->server_fd) { + debug("Accepted nbd client"); accept_nbd_client(params, client_fd, &client_address); - if (activity_fd == params->control_fd) + } + if (activity_fd == params->control_fd) { + debug("Accepted control client"); accept_control_connection(params, client_fd, &client_address); + } } } @@ -298,8 +381,11 @@ void serve_accept_loop(struct server* params) */ void serve_init_allocation_map(struct server* params) { + NULLCHECK( params ); + int fd = open(params->filename, O_RDONLY); off64_t size; + SERVER_ERROR_ON_FAILURE(fd, "Couldn't open %s", params->filename); size = lseek64(fd, 0, SEEK_END); params->size = size; @@ -314,6 +400,7 @@ void serve_init_allocation_map(struct server* params) /* Tell the server to close all the things. */ void serve_signal_close( struct server * serve ) { + NULLCHECK( serve ); self_pipe_signal( serve->close_signal ); } @@ -321,6 +408,8 @@ void serve_signal_close( struct server * serve ) /** Closes sockets, frees memory and waits for all client threads to finish */ void serve_cleanup(struct server* params) { + NULLCHECK( params ); + int i; close(params->server_fd); @@ -354,6 +443,8 @@ void serve_cleanup(struct server* params) /** Full lifecycle of the server */ void do_serve(struct server* params) { + NULLCHECK( params ); + pthread_mutex_init(¶ms->l_io, NULL); params->close_signal = self_pipe_create(); diff --git a/src/serve.h b/src/serve.h index 9cb4b48..852584e 100644 --- a/src/serve.h +++ b/src/serve.h @@ -35,6 +35,13 @@ struct control_params { struct server* serve; }; +struct client_tbl_entry { + pthread_t thread; + union mysockaddr address; + struct client * client; +}; + + #define MAX_NBD_CLIENTS 16 struct server { /** address/port to bind to */ @@ -71,8 +78,7 @@ struct server { char* block_allocation_map; - struct { pthread_t thread; union mysockaddr address; } - nbd_client[MAX_NBD_CLIENTS]; + struct client_tbl_entry nbd_client[MAX_NBD_CLIENTS]; }; int server_is_closed(struct server* serve); diff --git a/src/util.c b/src/util.c index 05be379..77b117c 100644 --- a/src/util.c +++ b/src/util.c @@ -70,13 +70,16 @@ void set_debug(int value) { # include void debug(const char *msg, ...) { - va_list argp; + va_list argp; + va_start( argp, msg ); if ( global_debug ) { - fprintf(stderr, "%08x %4d: ", (int) pthread_self(), (int) clock() ); - fprintf(stderr, msg, argp); - fprintf(stderr, "\n"); + fprintf(stderr, "%08x %4d: ", (int) pthread_self(), (int) clock() ); + vfprintf(stderr, msg, argp); + fprintf(stderr, "\n"); } + + va_end( argp ); } #endif diff --git a/src/util.h b/src/util.h index a8e4e55..634d99b 100644 --- a/src/util.h +++ b/src/util.h @@ -30,5 +30,8 @@ void debug(const char*msg, ...); #define SERVER_ERROR_ON_FAILURE(test, msg, ...) \ if (test < 0) { error(1, 0, NULL, msg, ##__VA_ARGS__); } + +#define NULLCHECK(x); if ( NULL == (x) ) { SERVER_ERROR( "Null " #x "." ); } + #endif diff --git a/tests/check_client.c b/tests/check_client.c new file mode 100644 index 0000000..50fd1cd --- /dev/null +++ b/tests/check_client.c @@ -0,0 +1,95 @@ +#include + +#include "self_pipe.h" + +#include "client.h" + + +START_TEST( test_assigns_socket ) +{ + struct client * c; + + c = client_create( NULL, 42 ); + + fail_unless( 42 == c->socket, "Socket wasn't assigned." ); +} +END_TEST + + +START_TEST( test_assigns_server ) +{ + struct client * c; + /* can't predict the storage size so we can't allocate one on + * the stack + */ + struct server * s = (struct server *)42; + + c = client_create( (struct server *)s, 0 ); + + fail_unless( s == c->serve, "Serve wasn't assigned." ); + +} +END_TEST + + +START_TEST( test_opens_stop_signal ) +{ + struct client *c = client_create( NULL, 0 ); + + client_signal_stop( c ); + + fail_unless( 1 == self_pipe_signal_clear( c->stop_signal ), + "No signal was sent." ); + +} +END_TEST + + +START_TEST( test_closes_stop_signal ) +{ + struct client *c = client_create( NULL, 0 ); + int read_fd = c->stop_signal->read_fd; + int write_fd = c->stop_signal->write_fd; + + client_destroy( c ); + + fail_unless( fd_is_closed( read_fd ), "Stop signal wasn't destroyed." ); + fail_unless( fd_is_closed( write_fd ), "Stop signal wasn't destroyed." ); +} +END_TEST + + +Suite *client_suite() +{ + Suite *s = suite_create("client"); + + TCase *tc_create = tcase_create("create"); + TCase *tc_signal = tcase_create("signal"); + TCase *tc_destroy = tcase_create("destroy"); + + tcase_add_test(tc_create, test_assigns_socket); + tcase_add_test(tc_create, test_assigns_server); + + tcase_add_test(tc_signal, test_opens_stop_signal); + + tcase_add_test( tc_destroy, test_closes_stop_signal ); + + suite_add_tcase(s, tc_create); + suite_add_tcase(s, tc_signal); + suite_add_tcase(s, tc_destroy); + + return s; +} + +int main(void) +{ + int number_failed; + + Suite *s = client_suite(); + SRunner *sr = srunner_create(s); + srunner_run_all(sr, CK_NORMAL); + number_failed = srunner_ntests_failed(sr); + srunner_free(sr); + return (number_failed == 0) ? 0 : 1; +} + diff --git a/tests/flexnbd.rb b/tests/flexnbd.rb index 66801cb..a7eb6de 100644 --- a/tests/flexnbd.rb +++ b/tests/flexnbd.rb @@ -1,4 +1,7 @@ require 'socket' +require 'thread' + +Thread.abort_on_exception = true # Noddy test class to exercise FlexNBD from the outside for testing. # @@ -7,27 +10,64 @@ class FlexNBD def initialize(bin, ip, port) @bin = bin - @debug = `#{@bin} serve --help` =~ /--debug/ ? "--debug" : "" + @debug = `#{@bin} serve --help` =~ /--verbose/ ? "--verbose" : "" @valgrind = ENV['VALGRIND'] ? "valgrind " : "" @bin = "#{@valgrind}#{@bin}" raise "#{bin} not executable" unless File.executable?(bin) @ctrl = "/tmp/.flexnbd.ctrl.#{Time.now.to_i}.#{rand}" @ip = ip @port = port + @kill = false end + def debug? + !@debug.empty? + end + + def debug( msg ) + $stderr.puts msg if debug? + end + + + def serve_cmd( file, acl ) + "#{@bin} serve "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--file #{file} "\ + "--sock #{ctrl} "\ + "#{@debug} "\ + "#{acl.join(' ')}" + end + + + def read_cmd( offset, length ) + "#{@bin} read "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--from #{offset} "\ + "#{@debug} "\ + "--size #{length}" + end + + + def write_cmd( offset, data ) + "#{@bin} write "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--from #{offset} "\ + "#{@debug} "\ + "--size #{data.length}" + end + + def serve(file, *acl) File.unlink(ctrl) if File.exists?(ctrl) - cmd ="#{@bin} serve "\ - "--addr #{ip} "\ - "--port #{port} "\ - "--file #{file} "\ - "--sock #{ctrl} "\ - "#{@debug} "\ - "#{acl.join(' ')}" - @pid = fork do - exec(cmd) - end + cmd =serve_cmd( file, acl ) + debug( cmd ) + + @pid = fork do exec(cmd) end + start_wait_thread( @pid ) + while !File.socket?(ctrl) pid, status = Process.wait2(@pid, Process::WNOHANG) raise "server did not start (#{cmd})" if pid @@ -36,30 +76,37 @@ class FlexNBD at_exit { kill } end + def start_wait_thread( pid ) + Thread.start do + Process.waitpid2( pid ) + unless @kill + $stderr.puts "flexnbd quit" + fail "flexnbd quit early" + end + end + end + + def kill + @kill = true Process.kill("INT", @pid) - Process.wait(@pid) end def read(offset, length) - IO.popen("#{@bin} read "\ - "--addr #{ip} "\ - "--port #{port} "\ - "--from #{offset} "\ - "#{@debug} "\ - "--size #{length}","r") do |fh| + cmd = read_cmd( offset, length ) + debug( cmd ) + + IO.popen(cmd) do |fh| return fh.read end raise "read failed" unless $?.success? end def write(offset, data) - IO.popen("#{@bin} write "\ - "--addr #{ip} "\ - "--port #{port} "\ - "--from #{offset} "\ - "#{@debug} "\ - "--size #{data.length}","w") do |fh| + cmd = write_cmd( offset, data ) + debug( cmd ) + + IO.popen(cmd, "w") do |fh| fh.write(data) end raise "write failed" unless $?.success?