From 5930f250347f4a43f3c382fcf393dfcc0bcb54e4 Mon Sep 17 00:00:00 2001 From: Alex Young Date: Thu, 7 Jun 2012 14:25:30 +0100 Subject: [PATCH] Use client stop signals for thread stopping --- Rakefile | 19 +++++++--- src/client.c | 7 ++-- src/serve.c | 82 ++++++++++++++++++++++++++++++++------------ src/util.c | 16 ++++----- src/util.h | 10 +++--- tests/check_client.c | 34 ++++++++++++++---- 6 files changed, 121 insertions(+), 47 deletions(-) diff --git a/Rakefile b/Rakefile index 8eb0bec..7872aad 100644 --- a/Rakefile +++ b/Rakefile @@ -25,18 +25,29 @@ task :flexnbd => 'build/flexnbd' task :build => :flexnbd task :default => :flexnbd +def check(m) + "build/tests/check_#{m}" +end + namespace "test" do desc "Run all tests" task 'run' => ["unit", "scenarios"] desc "Build C tests" - task 'build' => TEST_MODULES.map { |n| "build/tests/check_#{n}" } + task 'build' => TEST_MODULES.map { |n| check n} + + TEST_MODULES.each do |m| + desc "Run tests for #{m}" + task "check_#{m}" => check(m) do + sh check m + end + end desc "Run C tests" task 'unit' => 'build' do TEST_MODULES.each do |n| ENV['EF_DISABLE_BANNER'] = '1' - sh "build/tests/check_#{n}" + sh check n end end @@ -62,7 +73,7 @@ rule 'build/flexnbd' => OBJECTS do |t| end -file "build/tests/check_client" => +file check("client") => %w{tests/check_client.c build/self_pipe.o build/nbdtypes.o @@ -83,7 +94,7 @@ end deps << maybe_obj_name if OBJECTS.include?( maybe_obj_name ) - file "build/tests/check_#{m}" => deps do |t| + file check( m ) => deps do |t| gcc_link(t.name, deps + [LIBCHECK]) end end diff --git a/src/client.c b/src/client.c index 6fc845f..d6dbf95 100644 --- a/src/client.c +++ b/src/client.c @@ -150,16 +150,19 @@ void write_not_zeroes(struct client* client, off64_t from, int len) * try to honour. 0 otherwise. */ int client_read_request( struct client * client , struct nbd_request *out_request ) { + NULLCHECK( client ); + NULLCHECK( out_request ); + struct nbd_request_raw request_raw; fd_set fds; FD_ZERO(&fds); FD_SET(client->socket, &fds); - self_pipe_fd_set( client->serve->close_signal, &fds ); + self_pipe_fd_set( client->stop_signal, &fds ); CLIENT_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds, NULL, NULL, NULL), "select() failed"); - if ( self_pipe_fd_isset( client->serve->close_signal, &fds ) ) + if ( self_pipe_fd_isset( client->stop_signal, &fds ) ) return 0; if (readloop(client->socket, &request_raw, sizeof(request_raw)) == -1) { diff --git a/src/serve.c b/src/serve.c index 289fe2f..f162309 100644 --- a/src/serve.c +++ b/src/serve.c @@ -159,22 +159,11 @@ void serve_open_server_socket(struct server* params) ); } -/** - * Check to see if a client thread has finished, and if so, tidy up - * after it. - * Returns 1 if the thread was cleaned up and the slot freed, 0 - * otherwise. - * - * It's important that client_destroy gets called in the same thread - * which signals the client threads to stop. This avoids the - * possibility of sending a stop signal via a signal which has already - * been destroyed. However, it means that stopped client threads, - * including their signal pipes, won't be cleaned up until the next new - * client connection attempt. - */ -int cleanup_client_thread( struct client_tbl_entry * entry ) +int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthread_t, void **) ) { + NULLCHECK( entry ); + NULLCHECK( joinfunc ); int was_closed = 0; void * status; @@ -189,7 +178,7 @@ int cleanup_client_thread( struct client_tbl_entry * entry ) s_client_address, 64 ); - if (pthread_tryjoin_np(entry->thread, &status) < 0) { + if (joinfunc(entry->thread, &status) != 0) { if (errno != EBUSY) SERVER_ERROR_ON_FAILURE(-1, "Problem with joining thread"); } @@ -208,6 +197,35 @@ int cleanup_client_thread( struct client_tbl_entry * entry ) } +/** + * Check to see if a client thread has finished, and if so, tidy up + * after it. + * Returns 1 if the thread was cleaned up and the slot freed, 0 + * otherwise. + * + * It's important that client_destroy gets called in the same thread + * which signals the client threads to stop. This avoids the + * possibility of sending a stop signal via a signal which has already + * been destroyed. However, it means that stopped client threads, + * including their signal pipes, won't be cleaned up until the next new + * client connection attempt. + */ +int cleanup_client_thread( struct client_tbl_entry * entry ) +{ + return tryjoin_client_thread( entry, pthread_tryjoin_np ); +} + + +/** + * Join a client thread after having sent a stop signal to it. + * This function will not return until pthread_join has returned, so + * ensures that the client thread is dead. + */ +int join_client_thread( struct client_tbl_entry *entry ) +{ + return tryjoin_client_thread( entry, pthread_join ); +} + /** We can only accommodate MAX_NBD_CLIENTS connections at once. This function * goes through the current list, waits for any threads that have finished * and returns the next slot free (or -1 if there are none). @@ -296,7 +314,7 @@ void accept_nbd_client( NULLCHECK(client_address); struct client* client_params; - int slot = cleanup_and_find_client_slot(params); + int slot; char s_client_address[64] = {0}; @@ -305,6 +323,7 @@ void accept_nbd_client( return; } + slot = cleanup_and_find_client_slot(params); if (slot < 0) { write(client_fd, "Too many clients", 16); close(client_fd); @@ -313,7 +332,10 @@ void accept_nbd_client( debug( "Client %s accepted.", s_client_address ); client_params = client_create( params, client_fd ); + params->nbd_client[slot].client = client_params; + memcpy(¶ms->nbd_client[slot].address, client_address, + sizeof(union mysockaddr)); if (pthread_create(¶ms->nbd_client[slot].thread, NULL, client_serve, client_params) < 0) { debug( "Thread creation problem." ); @@ -323,9 +345,6 @@ void accept_nbd_client( return; } - memcpy(¶ms->nbd_client[slot].address, client_address, - sizeof(union mysockaddr)); - debug("nbd thread %d started (%s)", (int) params->nbd_client[slot].thread, s_client_address); } @@ -337,6 +356,26 @@ int server_is_closed(struct server* serve) } +void server_close_clients( struct server *params ) +{ + NULLCHECK(params); + + int i, j; + struct client_tbl_entry *entry; + + for( i = 0; i < MAX_NBD_CLIENTS; i++ ) { + entry = ¶ms->nbd_client[i]; + + if ( entry->thread != 0 ) { + client_signal_stop( entry->client ); + } + } + for( j = 0; j < MAX_NBD_CLIENTS; j++ ) { + join_client_thread( ¶ms->nbd_client[i] ); + } +} + + /** Accept either an NBD or control socket connection, dispatch appropriately */ void serve_accept_loop(struct server* params) { @@ -356,14 +395,15 @@ void serve_accept_loop(struct server* params) SERVER_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds, NULL, NULL, NULL), "select() failed"); - if ( self_pipe_fd_isset( params->close_signal, &fds) ) + if ( self_pipe_fd_isset( params->close_signal, &fds ) ){ + server_close_clients( params ); return; + } activity_fd = FD_ISSET(params->server_fd, &fds) ? params->server_fd: params->control_fd; client_fd = accept(activity_fd, &client_address.generic, &socklen); - if (activity_fd == params->server_fd) { debug("Accepted nbd client"); accept_nbd_client(params, client_fd, &client_address); diff --git a/src/util.c b/src/util.c index 77b117c..32729b4 100644 --- a/src/util.c +++ b/src/util.c @@ -17,7 +17,7 @@ void error_init() main_thread = pthread_self(); } -void error(int consult_errno, int close_socket, pthread_mutex_t* unlock, const char* format, ...) +void error(int consult_errno, int fatal, int close_socket, pthread_mutex_t* unlock, const char* format, ...) { va_list argptr; @@ -31,18 +31,18 @@ void error(int consult_errno, int close_socket, pthread_mutex_t* unlock, const c fprintf(stderr, " (errno=%d, %s)", errno, strerror(errno)); } - if (close_socket) - close(close_socket); - - if (unlock) - pthread_mutex_unlock(unlock); + if (close_socket) { close(close_socket); } + if (unlock) { pthread_mutex_unlock(unlock); } fprintf(stderr, "\n"); - if (pthread_equal(pthread_self(), main_thread)) + if (fatal || pthread_equal(pthread_self(), main_thread)) { exit(1); - else + } + else { + fprintf(stderr, "Killing Thread\n"); pthread_exit((void*) 1); + } } void* xrealloc(void* ptr, size_t size) diff --git a/src/util.h b/src/util.h index 634d99b..6f1a39f 100644 --- a/src/util.h +++ b/src/util.h @@ -6,7 +6,7 @@ void error_init(); -void error(int consult_errno, int close_socket, pthread_mutex_t* unlock, const char* format, ...); +void error(int consult_errno, int fatal, int close_socket, pthread_mutex_t* unlock, const char* format, ...); void* xrealloc(void* ptr, size_t size); @@ -21,14 +21,14 @@ void debug(const char*msg, ...); #endif #define CLIENT_ERROR(msg, ...) \ - error(0, client->socket, &client->serve->l_io, msg, ##__VA_ARGS__) + error(0, 0, client->socket, &client->serve->l_io, msg, ##__VA_ARGS__) #define CLIENT_ERROR_ON_FAILURE(test, msg, ...) \ - if (test < 0) { error(1, client->socket, &client->serve->l_io, msg, ##__VA_ARGS__); } + if (test < 0) { error(1, 0, client->socket, &client->serve->l_io, msg, ##__VA_ARGS__); } #define SERVER_ERROR(msg, ...) \ - error(0, 0, NULL, msg, ##__VA_ARGS__) + error(0, 1, 0, NULL, msg, ##__VA_ARGS__) #define SERVER_ERROR_ON_FAILURE(test, msg, ...) \ - if (test < 0) { error(1, 0, NULL, msg, ##__VA_ARGS__); } + if (test < 0) { error(1, 1, 0, NULL, msg, ##__VA_ARGS__); } #define NULLCHECK(x); if ( NULL == (x) ) { SERVER_ERROR( "Null " #x "." ); } diff --git a/tests/check_client.c b/tests/check_client.c index 50fd1cd..e545363 100644 --- a/tests/check_client.c +++ b/tests/check_client.c @@ -1,15 +1,18 @@ #include #include "self_pipe.h" +#include "nbdtypes.h" #include "client.h" +#define FAKE_SERVER ((struct server *)23) +#define FAKE_SOCKET (42) START_TEST( test_assigns_socket ) { struct client * c; - c = client_create( NULL, 42 ); + c = client_create( FAKE_SERVER, FAKE_SOCKET ); fail_unless( 42 == c->socket, "Socket wasn't assigned." ); } @@ -22,11 +25,9 @@ START_TEST( test_assigns_server ) /* can't predict the storage size so we can't allocate one on * the stack */ - struct server * s = (struct server *)42; + c = client_create( FAKE_SERVER, FAKE_SOCKET ); - c = client_create( (struct server *)s, 0 ); - - fail_unless( s == c->serve, "Serve wasn't assigned." ); + fail_unless( FAKE_SERVER == c->serve, "Serve wasn't assigned." ); } END_TEST @@ -34,7 +35,7 @@ END_TEST START_TEST( test_opens_stop_signal ) { - struct client *c = client_create( NULL, 0 ); + struct client *c = client_create( FAKE_SERVER, FAKE_SOCKET ); client_signal_stop( c ); @@ -47,7 +48,7 @@ END_TEST START_TEST( test_closes_stop_signal ) { - struct client *c = client_create( NULL, 0 ); + struct client *c = client_create( FAKE_SERVER, FAKE_SOCKET ); int read_fd = c->stop_signal->read_fd; int write_fd = c->stop_signal->write_fd; @@ -59,6 +60,24 @@ START_TEST( test_closes_stop_signal ) END_TEST +START_TEST( test_read_request_quits_on_stop_signal ) +{ + int fds[2]; + struct nbd_request nbdr; + pipe( fds ); + struct client *c = client_create( FAKE_SERVER, fds[0] ); + + client_signal_stop( c ); + + int client_read_request( struct client *, struct nbd_request *); + fail_unless( 0 == client_read_request( c, &nbdr ), "Didn't quit on stop." ); + + close( fds[0] ); + close( fds[1] ); +} +END_TEST + + Suite *client_suite() { Suite *s = suite_create("client"); @@ -71,6 +90,7 @@ Suite *client_suite() tcase_add_test(tc_create, test_assigns_server); tcase_add_test(tc_signal, test_opens_stop_signal); + tcase_add_test(tc_signal, test_read_request_quits_on_stop_signal); tcase_add_test( tc_destroy, test_closes_stop_signal );