diff --git a/src/control.c b/src/control.c index 3e6aadf..f88e0a9 100644 --- a/src/control.c +++ b/src/control.c @@ -422,6 +422,12 @@ void accept_control_connection(struct server* params, int client_fd, ), "Failed to create client thread" ); + + /* FIXME: This thread *really* shouldn't detach + * Since it can see the server object, if listen switches mode + * while this is live, Bad Things Could Happen. + */ + pthread_detach( control_thread ); } void serve_open_control_socket(struct server* params) diff --git a/src/serve.c b/src/serve.c index 2c319ff..f44bf84 100644 --- a/src/serve.c +++ b/src/serve.c @@ -81,6 +81,7 @@ struct server * server_create ( out->close_signal = self_pipe_create(); out->acl_updated_signal = self_pipe_create(); + out->vacuum_signal = self_pipe_create(); NULLCHECK( out->close_signal ); NULLCHECK( out->acl_updated_signal ); @@ -94,6 +95,8 @@ void server_destroy( struct server * serve ) serve->acl_updated_signal = NULL; self_pipe_destroy( serve->close_signal ); serve->close_signal = NULL; + self_pipe_destroy( serve->vacuum_signal ); + serve->vacuum_signal = NULL; pthread_mutex_destroy( &serve->l_acl ); pthread_mutex_destroy( &serve->l_io ); @@ -207,7 +210,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre NULLCHECK( joinfunc ); int was_closed = 0; - void * status; + void * status=NULL; int join_errno; if (entry->thread != 0) { @@ -222,7 +225,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre join_errno = joinfunc(entry->thread, &status); /* join_errno can legitimately be ESRCH if the thread is - * already dead, but the cluent still needs tidying up. */ + * already dead, but the client still needs tidying up. */ if (join_errno != 0 && !entry->client->stopped ) { FATAL_UNLESS( join_errno == EBUSY, "Problem with joining thread %p: %s", @@ -262,6 +265,14 @@ int cleanup_client_thread( struct client_tbl_entry * entry ) return tryjoin_client_thread( entry, pthread_tryjoin_np ); } +void cleanup_client_threads( struct client_tbl_entry * entries ) +{ + int i; + for( i = 0; i < MAX_NBD_CLIENTS; i++ ) { + cleanup_client_thread( &entries[i] ); + } +} + /** * Join a client thread after having sent a stop signal to it. @@ -281,15 +292,13 @@ int cleanup_and_find_client_slot(struct server* params) { NULLCHECK( params ); - int slot=-1, i,j; + int slot=-1, i; + + cleanup_client_threads( params->nbd_client ); for ( i = 0; i < MAX_NBD_CLIENTS; i++ ) { - cleanup_client_thread( ¶ms->nbd_client[i] ); - } - - for ( j = 0; j < MAX_NBD_CLIENTS; j++ ) { - if( params->nbd_client[j].thread == 0 && slot == -1 ){ - slot = j; + if( params->nbd_client[i].thread == 0 && slot == -1 ){ + slot = i; break; } } @@ -353,6 +362,48 @@ int server_should_accept_client( } +struct client_cleanup { + pthread_t client_thread; + struct self_pipe * vacuum_signal; +}; + + +void *client_vacuum( void * cleanup_uncast ) +{ + pthread_detach( pthread_self() ); + NULLCHECK( cleanup_uncast ); + struct client_cleanup *cleanup = (struct client_cleanup *)cleanup_uncast; + + pthread_join( cleanup->client_thread, NULL ); + self_pipe_signal( cleanup->vacuum_signal ); + free( cleanup ); + return NULL; +} + + +/* Why do we need this rather odd arrangement? Because if we don't have + * it, dead threads don't get tidied up until the next incoming + * connection happens. + */ +int spawn_client_thread( + struct client * client_params, + struct self_pipe * vacuum_signal, + pthread_t *out_thread) +{ + struct client_cleanup * cleanup = xmalloc( sizeof( struct client_cleanup ) ); + cleanup->vacuum_signal = vacuum_signal; + int result = pthread_create(&cleanup->client_thread, NULL, client_serve, client_params); + + if ( 0 == result ){ + pthread_t watcher; + pthread_create( &watcher, NULL, client_vacuum, cleanup ); + + *out_thread = cleanup->client_thread; + } + return result; +} + + /** Dispatch function for accepting an NBD connection and starting a thread * to handle it. Rejects the connection if there is an ACL, and the far end's * address doesn't match, or if there are too many clients already connected. @@ -390,7 +441,9 @@ void accept_nbd_client( memcpy(¶ms->nbd_client[slot].address, client_address, sizeof(union mysockaddr)); - if (pthread_create(¶ms->nbd_client[slot].thread, NULL, client_serve, client_params) != 0) { + pthread_t * thread = ¶ms->nbd_client[slot].thread; + + if (spawn_client_thread( client_params, params->vacuum_signal, thread ) != 0) { debug( "Thread creation problem." ); write(client_fd, "Thread creation problem", 23); client_destroy( client_params ); @@ -479,6 +532,7 @@ void server_replace_acl( struct server *serve, struct acl * new_acl ) } + /** Accept either an NBD or control socket connection, dispatch appropriately */ int server_accept( struct server * params ) { @@ -493,6 +547,7 @@ int server_accept( struct server * params ) FD_SET(params->server_fd, &fds); self_pipe_fd_set( params->close_signal, &fds ); self_pipe_fd_set( params->acl_updated_signal, &fds ); + self_pipe_fd_set( params->vacuum_signal, &fds ); if (params->control_socket_name) { FD_SET(params->control_fd, &fds); } @@ -505,6 +560,11 @@ int server_accept( struct server * params ) return 0; } + if ( self_pipe_fd_isset( params->vacuum_signal, &fds ) ) { + cleanup_client_threads( params->nbd_client ); + self_pipe_signal_clear( params->vacuum_signal ); + } + if ( self_pipe_fd_isset( params->acl_updated_signal, &fds ) ) { self_pipe_signal_clear( params->acl_updated_signal ); server_audit_clients( params ); diff --git a/src/serve.h b/src/serve.h index 72f6c2c..14a76fc 100644 --- a/src/serve.h +++ b/src/serve.h @@ -69,6 +69,11 @@ struct server { struct self_pipe * acl_updated_signal; pthread_mutex_t l_acl; + /** vacuum_signal will be sent when client threads terminate. + * This is mainly to keep valgrind happy + */ + struct self_pipe * vacuum_signal; + struct mirror_status* mirror; int server_fd; int control_fd; diff --git a/src/util.c b/src/util.c index 2e2481d..96736b8 100644 --- a/src/util.c +++ b/src/util.c @@ -29,6 +29,7 @@ void error_handler(int fatal __attribute__ ((unused)) ) longjmp(context->jmp, 1); } + void mylog(int line_level, const char* format, ...) { va_list argptr; diff --git a/src/util.h b/src/util.h index 52cec40..aa233f6 100644 --- a/src/util.h +++ b/src/util.h @@ -59,6 +59,7 @@ extern pthread_key_t cleanup_handler_key; abort(); \ case 2: /* non-fatal error, return to context of error handler setup */ \ context->handler(context->data, 0); \ + pthread_exit((void *)1);\ default: \ abort(); \ } \ diff --git a/tests/flexnbd.rb b/tests/flexnbd.rb index 64969a5..ce13607 100644 --- a/tests/flexnbd.rb +++ b/tests/flexnbd.rb @@ -19,12 +19,13 @@ class ValgrindExecutor attr_reader :pid class Error - attr_accessor :what, :kind + attr_accessor :what, :kind, :pid attr_reader :backtrace def initialize @backtrace=[] @what = "" @kind = "" + @pid = "" end def add_frame @@ -44,7 +45,7 @@ class ValgrindExecutor end def to_s - ([@what + " (#{@kind})"] + @backtrace.map{|h| "#{h[:file]}:#{h[:line]} #{h[:fn]}" }).join("\n") + ([@what + " (#{@kind}) in #{@pid}"] + @backtrace.map{|h| "#{h[:file]}:#{h[:line]} #{h[:fn]}" }).join("\n") end end # class Error @@ -54,6 +55,8 @@ class ValgrindExecutor include REXML::StreamListener def initialize( killer ) @killer = killer + @error = Error.new + @found = false end def text( text ) @@ -63,7 +66,7 @@ class ValgrindExecutor def tag_start(tag, attrs) case tag.to_s when "error" - @error = Error.new + @found = true when "frame" @error.add_frame end @@ -72,23 +75,43 @@ class ValgrindExecutor def tag_end(tag) case tag.to_s when "what" - @error.what = @text if @error + @error.what = @text if @found @text = "" when "kind" - @error.kind = @text if @error + @error.kind = @text if @found when "file" - @error.add_file( @text ) if @error + @error.add_file( @text ) if @found when "fn" - @error.add_fn( @text ) if @error + @error.add_fn( @text ) if @found when "line" - @error.add_line( @text ) if @error - when "error" + @error.add_line( @text ) if @found + when "error", "stack" @killer.call( @error ) + when "pid" + @error.pid=@text end end end # class ErrorListener + class DebugErrorListener < ErrorListener + def text( txt ) + print txt + super( txt ) + end + + def tag_start( tag, attrs ) + print "<#{tag}>" + super( tag, attrs ) + end + + def tag_end( tag ) + print "" + super( tag ) + end + end + + def initialize @pid = nil end @@ -107,6 +130,7 @@ class ValgrindExecutor $stderr.puts "* Valgrind error spotted:" $stderr.puts err.to_s.split("\n").map{|s| " #{s}"} $stderr.puts "*"*72 + exit(1) end @@ -214,7 +238,7 @@ class FlexNBD Thread.start do Process.waitpid2( pid ) if @kill - fail "flexnbd quit with a bad status #{$?.exitstatus}" unless + fail "flexnbd quit with a bad status #{$?.exitstatus}" unless $?.exitstatus == @kill else $stderr.puts "flexnbd quit"