Squash valgrind errors by making sure client threads get joined on termination

This commit is contained in:
Alex Young
2012-06-21 17:11:12 +01:00
parent bafc3d3687
commit a3dc670939
6 changed files with 117 additions and 20 deletions

View File

@@ -422,6 +422,12 @@ void accept_control_connection(struct server* params, int client_fd,
), ),
"Failed to create client thread" "Failed to create client thread"
); );
/* FIXME: This thread *really* shouldn't detach
* Since it can see the server object, if listen switches mode
* while this is live, Bad Things Could Happen.
*/
pthread_detach( control_thread );
} }
void serve_open_control_socket(struct server* params) void serve_open_control_socket(struct server* params)

View File

@@ -81,6 +81,7 @@ struct server * server_create (
out->close_signal = self_pipe_create(); out->close_signal = self_pipe_create();
out->acl_updated_signal = self_pipe_create(); out->acl_updated_signal = self_pipe_create();
out->vacuum_signal = self_pipe_create();
NULLCHECK( out->close_signal ); NULLCHECK( out->close_signal );
NULLCHECK( out->acl_updated_signal ); NULLCHECK( out->acl_updated_signal );
@@ -94,6 +95,8 @@ void server_destroy( struct server * serve )
serve->acl_updated_signal = NULL; serve->acl_updated_signal = NULL;
self_pipe_destroy( serve->close_signal ); self_pipe_destroy( serve->close_signal );
serve->close_signal = NULL; serve->close_signal = NULL;
self_pipe_destroy( serve->vacuum_signal );
serve->vacuum_signal = NULL;
pthread_mutex_destroy( &serve->l_acl ); pthread_mutex_destroy( &serve->l_acl );
pthread_mutex_destroy( &serve->l_io ); pthread_mutex_destroy( &serve->l_io );
@@ -207,7 +210,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
NULLCHECK( joinfunc ); NULLCHECK( joinfunc );
int was_closed = 0; int was_closed = 0;
void * status; void * status=NULL;
int join_errno; int join_errno;
if (entry->thread != 0) { if (entry->thread != 0) {
@@ -222,7 +225,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
join_errno = joinfunc(entry->thread, &status); join_errno = joinfunc(entry->thread, &status);
/* join_errno can legitimately be ESRCH if the thread is /* join_errno can legitimately be ESRCH if the thread is
* already dead, but the cluent still needs tidying up. */ * already dead, but the client still needs tidying up. */
if (join_errno != 0 && !entry->client->stopped ) { if (join_errno != 0 && !entry->client->stopped ) {
FATAL_UNLESS( join_errno == EBUSY, FATAL_UNLESS( join_errno == EBUSY,
"Problem with joining thread %p: %s", "Problem with joining thread %p: %s",
@@ -262,6 +265,14 @@ int cleanup_client_thread( struct client_tbl_entry * entry )
return tryjoin_client_thread( entry, pthread_tryjoin_np ); return tryjoin_client_thread( entry, pthread_tryjoin_np );
} }
/**
 * Sweep the whole client table, handing each slot in turn to
 * cleanup_client_thread(). The per-slot result is not inspected.
 *
 * entries must point at an array of at least MAX_NBD_CLIENTS entries.
 */
void cleanup_client_threads( struct client_tbl_entry * entries )
{
	int slot = 0;

	while ( slot < MAX_NBD_CLIENTS ) {
		cleanup_client_thread( &entries[slot] );
		slot++;
	}
}
/** /**
* Join a client thread after having sent a stop signal to it. * Join a client thread after having sent a stop signal to it.
@@ -281,15 +292,13 @@ int cleanup_and_find_client_slot(struct server* params)
{ {
NULLCHECK( params ); NULLCHECK( params );
int slot=-1, i,j; int slot=-1, i;
cleanup_client_threads( params->nbd_client );
for ( i = 0; i < MAX_NBD_CLIENTS; i++ ) { for ( i = 0; i < MAX_NBD_CLIENTS; i++ ) {
cleanup_client_thread( &params->nbd_client[i] ); if( params->nbd_client[i].thread == 0 && slot == -1 ){
} slot = i;
for ( j = 0; j < MAX_NBD_CLIENTS; j++ ) {
if( params->nbd_client[j].thread == 0 && slot == -1 ){
slot = j;
break; break;
} }
} }
@@ -353,6 +362,48 @@ int server_should_accept_client(
} }
/* Hand-off record built by spawn_client_thread() and passed to
 * client_vacuum(), which frees it once it has finished with it.
 */
struct client_cleanup {
/* Thread that client_vacuum() waits on with pthread_join(). */
pthread_t client_thread;
/* Self-pipe that client_vacuum() signals after the join returns. */
struct self_pipe * vacuum_signal;
};
/**
 * Watcher thread entry point.
 *
 * cleanup_uncast is a heap-allocated struct client_cleanup. The watcher
 * detaches itself, blocks until the recorded client thread can be
 * joined, raises the vacuum signal, and then releases the record.
 * Always returns NULL.
 */
void *client_vacuum( void * cleanup_uncast )
{
	struct client_cleanup *job;

	/* Detach first, exactly as before: this happens even if the
	 * argument check below fails. */
	pthread_detach( pthread_self() );

	NULLCHECK( cleanup_uncast );
	job = cleanup_uncast;

	pthread_join( job->client_thread, NULL );
	self_pipe_signal( job->vacuum_signal );

	free( job );
	return NULL;
}
/**
 * Start a client_serve thread for client_params together with a watcher
 * thread (client_vacuum) that joins it and raises vacuum_signal when it
 * terminates.
 *
 * Why do we need this rather odd arrangement? Because if we don't have
 * it, dead threads don't get tidied up until the next incoming
 * connection happens.
 *
 * On success, returns 0 and stores the client thread's handle in
 * *out_thread. On failure to create the client thread, returns the
 * pthread_create() error number and leaves *out_thread untouched.
 *
 * Ownership: the cleanup record passes to the watcher, which frees it.
 * If no watcher ends up owning the record, it is freed here.
 */
int spawn_client_thread(
		struct client * client_params,
		struct self_pipe * vacuum_signal,
		pthread_t *out_thread)
{
	struct client_cleanup * cleanup = xmalloc( sizeof( struct client_cleanup ) );
	cleanup->vacuum_signal = vacuum_signal;

	int result = pthread_create(&cleanup->client_thread, NULL, client_serve, client_params);
	if ( 0 != result ) {
		/* No client thread means no watcher will ever free the record. */
		free( cleanup );
		return result;
	}

	/* Publish the handle BEFORE starting the watcher: the watcher frees
	 * cleanup as soon as the client thread has exited, which can happen
	 * before pthread_create() below even returns to us. */
	*out_thread = cleanup->client_thread;

	pthread_t watcher;
	if ( 0 != pthread_create( &watcher, NULL, client_vacuum, cleanup ) ) {
		/* The client thread is running regardless, so this is still a
		 * success for the caller; we only lose the prompt tidy-up.
		 * Nothing else holds the record, so release it here. */
		free( cleanup );
	}

	return result;
}
/** Dispatch function for accepting an NBD connection and starting a thread /** Dispatch function for accepting an NBD connection and starting a thread
* to handle it. Rejects the connection if there is an ACL, and the far end's * to handle it. Rejects the connection if there is an ACL, and the far end's
* address doesn't match, or if there are too many clients already connected. * address doesn't match, or if there are too many clients already connected.
@@ -390,7 +441,9 @@ void accept_nbd_client(
memcpy(&params->nbd_client[slot].address, client_address, memcpy(&params->nbd_client[slot].address, client_address,
sizeof(union mysockaddr)); sizeof(union mysockaddr));
if (pthread_create(&params->nbd_client[slot].thread, NULL, client_serve, client_params) != 0) { pthread_t * thread = &params->nbd_client[slot].thread;
if (spawn_client_thread( client_params, params->vacuum_signal, thread ) != 0) {
debug( "Thread creation problem." ); debug( "Thread creation problem." );
write(client_fd, "Thread creation problem", 23); write(client_fd, "Thread creation problem", 23);
client_destroy( client_params ); client_destroy( client_params );
@@ -479,6 +532,7 @@ void server_replace_acl( struct server *serve, struct acl * new_acl )
} }
/** Accept either an NBD or control socket connection, dispatch appropriately */ /** Accept either an NBD or control socket connection, dispatch appropriately */
int server_accept( struct server * params ) int server_accept( struct server * params )
{ {
@@ -493,6 +547,7 @@ int server_accept( struct server * params )
FD_SET(params->server_fd, &fds); FD_SET(params->server_fd, &fds);
self_pipe_fd_set( params->close_signal, &fds ); self_pipe_fd_set( params->close_signal, &fds );
self_pipe_fd_set( params->acl_updated_signal, &fds ); self_pipe_fd_set( params->acl_updated_signal, &fds );
self_pipe_fd_set( params->vacuum_signal, &fds );
if (params->control_socket_name) { if (params->control_socket_name) {
FD_SET(params->control_fd, &fds); FD_SET(params->control_fd, &fds);
} }
@@ -505,6 +560,11 @@ int server_accept( struct server * params )
return 0; return 0;
} }
if ( self_pipe_fd_isset( params->vacuum_signal, &fds ) ) {
cleanup_client_threads( params->nbd_client );
self_pipe_signal_clear( params->vacuum_signal );
}
if ( self_pipe_fd_isset( params->acl_updated_signal, &fds ) ) { if ( self_pipe_fd_isset( params->acl_updated_signal, &fds ) ) {
self_pipe_signal_clear( params->acl_updated_signal ); self_pipe_signal_clear( params->acl_updated_signal );
server_audit_clients( params ); server_audit_clients( params );

View File

@@ -69,6 +69,11 @@ struct server {
struct self_pipe * acl_updated_signal; struct self_pipe * acl_updated_signal;
pthread_mutex_t l_acl; pthread_mutex_t l_acl;
/** vacuum_signal will be sent when client threads terminate.
* This is mainly to keep valgrind happy
*/
struct self_pipe * vacuum_signal;
struct mirror_status* mirror; struct mirror_status* mirror;
int server_fd; int server_fd;
int control_fd; int control_fd;

View File

@@ -29,6 +29,7 @@ void error_handler(int fatal __attribute__ ((unused)) )
longjmp(context->jmp, 1); longjmp(context->jmp, 1);
} }
void mylog(int line_level, const char* format, ...) void mylog(int line_level, const char* format, ...)
{ {
va_list argptr; va_list argptr;

View File

@@ -59,6 +59,7 @@ extern pthread_key_t cleanup_handler_key;
abort(); \ abort(); \
case 2: /* non-fatal error, return to context of error handler setup */ \ case 2: /* non-fatal error, return to context of error handler setup */ \
context->handler(context->data, 0); \ context->handler(context->data, 0); \
pthread_exit((void *)1);\
default: \ default: \
abort(); \ abort(); \
} \ } \

View File

@@ -19,12 +19,13 @@ class ValgrindExecutor
attr_reader :pid attr_reader :pid
class Error class Error
attr_accessor :what, :kind attr_accessor :what, :kind, :pid
attr_reader :backtrace attr_reader :backtrace
def initialize def initialize
@backtrace=[] @backtrace=[]
@what = "" @what = ""
@kind = "" @kind = ""
@pid = ""
end end
def add_frame def add_frame
@@ -44,7 +45,7 @@ class ValgrindExecutor
end end
def to_s def to_s
([@what + " (#{@kind})"] + @backtrace.map{|h| "#{h[:file]}:#{h[:line]} #{h[:fn]}" }).join("\n") ([@what + " (#{@kind}) in #{@pid}"] + @backtrace.map{|h| "#{h[:file]}:#{h[:line]} #{h[:fn]}" }).join("\n")
end end
end # class Error end # class Error
@@ -54,6 +55,8 @@ class ValgrindExecutor
include REXML::StreamListener include REXML::StreamListener
def initialize( killer ) def initialize( killer )
@killer = killer @killer = killer
@error = Error.new
@found = false
end end
def text( text ) def text( text )
@@ -63,7 +66,7 @@ class ValgrindExecutor
def tag_start(tag, attrs) def tag_start(tag, attrs)
case tag.to_s case tag.to_s
when "error" when "error"
@error = Error.new @found = true
when "frame" when "frame"
@error.add_frame @error.add_frame
end end
@@ -72,23 +75,43 @@ class ValgrindExecutor
def tag_end(tag) def tag_end(tag)
case tag.to_s case tag.to_s
when "what" when "what"
@error.what = @text if @error @error.what = @text if @found
@text = "" @text = ""
when "kind" when "kind"
@error.kind = @text if @error @error.kind = @text if @found
when "file" when "file"
@error.add_file( @text ) if @error @error.add_file( @text ) if @found
when "fn" when "fn"
@error.add_fn( @text ) if @error @error.add_fn( @text ) if @found
when "line" when "line"
@error.add_line( @text ) if @error @error.add_line( @text ) if @found
when "error" when "error", "stack"
@killer.call( @error ) @killer.call( @error )
when "pid"
@error.pid=@text
end end
end end
end # class ErrorListener end # class ErrorListener
# ErrorListener that echoes the XML stream to standard output as it is
# parsed: text is printed verbatim and tags are printed as <tag> / </tag>.
# Each event is then passed on unchanged to ErrorListener.
class DebugErrorListener < ErrorListener
  def text( txt )
    $stdout.print( txt )
    super
  end

  def tag_start( tag, attrs )
    $stdout.print( "<#{tag}>" )
    super
  end

  def tag_end( tag )
    $stdout.print( "</#{tag}>" )
    super
  end
end
def initialize def initialize
@pid = nil @pid = nil
end end
@@ -107,6 +130,7 @@ class ValgrindExecutor
$stderr.puts "* Valgrind error spotted:" $stderr.puts "* Valgrind error spotted:"
$stderr.puts err.to_s.split("\n").map{|s| " #{s}"} $stderr.puts err.to_s.split("\n").map{|s| " #{s}"}
$stderr.puts "*"*72 $stderr.puts "*"*72
exit(1)
end end
@@ -214,7 +238,7 @@ class FlexNBD
Thread.start do Thread.start do
Process.waitpid2( pid ) Process.waitpid2( pid )
if @kill if @kill
fail "flexnbd quit with a bad status #{$?.exitstatus}" unless fail "flexnbd quit with a bad status #{$?.exitstatus}" unless
$?.exitstatus == @kill $?.exitstatus == @kill
else else
$stderr.puts "flexnbd quit" $stderr.puts "flexnbd quit"