Free all possibly held mutexes in error handlers
Now that we have 3 mutexes lying around, it's important that we check and free these if necessary if error() is called in any thread that can hold them. To do this, we now have flexthread.c, which defines a flexthread_mutex struct. This is a wrapper around a pthread_mutex_t and a pthread_t. The idea is that in the error handler, the thread can check whether it holds the mutex and can free it if and only if it does. This is important because pthread fast mutexes can be freed by *any* thread, not just the thread which holds them. Note: it is only ever safe for a thread to check if it holds the mutex itself. It is *never* safe to check if another thread holds a mutex without first locking that mutex, which makes the whole operation rather pointless.
This commit is contained in:
@@ -470,6 +470,10 @@ void client_cleanup(struct client* client,
|
||||
munmap(client->mapped, client->serve->size);
|
||||
}
|
||||
if (client->fileno) { close(client->fileno); }
|
||||
|
||||
if ( server_io_locked( client->serve ) ) { server_unlock_io( client->serve ); }
|
||||
if ( server_acl_locked( client->serve ) ) { server_unlock_acl( client->serve ); }
|
||||
|
||||
}
|
||||
|
||||
void* client_serve(void* client_uncast)
|
||||
|
@@ -205,6 +205,7 @@ void control_serve( struct control * control )
|
||||
|
||||
void * control_runner( void * control_uncast )
|
||||
{
|
||||
debug("Control thread");
|
||||
NULLCHECK( control_uncast );
|
||||
struct control * control = (struct control *)control_uncast;
|
||||
|
||||
@@ -313,7 +314,7 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
|
||||
* one mirror at a time. This is enforced by only accepting a
|
||||
* single client at a time on the control socket.
|
||||
*/
|
||||
flexnbd_switch_lock( flexnbd );
|
||||
flexnbd_lock_switch( flexnbd );
|
||||
{
|
||||
struct server * serve = flexnbd_server(flexnbd);
|
||||
serve->mirror_super = mirror_super_create(
|
||||
@@ -333,10 +334,13 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
|
||||
"Failed to create mirror thread"
|
||||
);
|
||||
|
||||
debug("Control thread mirror super waiting");
|
||||
enum mirror_state state = mirror_super_wait( serve->mirror_super );
|
||||
debug("Control thread writing response");
|
||||
control_write_mirror_response( state, client->socket );
|
||||
}
|
||||
flexnbd_switch_unlock( flexnbd );
|
||||
debug( "Control thread unlocking switch" );
|
||||
flexnbd_unlock_switch( flexnbd );
|
||||
debug( "Control thread going away." );
|
||||
|
||||
return 0;
|
||||
@@ -392,6 +396,12 @@ void control_cleanup(struct control_client* client,
|
||||
int fatal __attribute__ ((unused)) )
|
||||
{
|
||||
if (client->socket) { close(client->socket); }
|
||||
|
||||
/* This is wrongness */
|
||||
if ( server_io_locked( client->flexnbd->serve ) ) { server_unlock_io( client->flexnbd->serve ); }
|
||||
if ( server_acl_locked( client->flexnbd->serve ) ) { server_unlock_acl( client->flexnbd->serve ); }
|
||||
if ( flexnbd_switch_locked( client->flexnbd ) ) { flexnbd_unlock_switch( client->flexnbd ); }
|
||||
|
||||
control_client_destroy( client );
|
||||
}
|
||||
|
||||
|
@@ -77,7 +77,7 @@ void flexnbd_create_shared(
|
||||
|
||||
flexnbd->signal_fd = flexnbd_build_signal_fd();
|
||||
|
||||
pthread_mutex_init( &flexnbd->switch_mutex, NULL );
|
||||
flexnbd->switch_mutex = flexthread_mutex_create();
|
||||
}
|
||||
|
||||
|
||||
@@ -186,16 +186,22 @@ void flexnbd_destroy( struct flexnbd * flexnbd )
|
||||
|
||||
/* THOU SHALT NOT DEREFERENCE flexnbd->serve OUTSIDE A SWITCH LOCK
|
||||
*/
|
||||
void flexnbd_switch_lock( struct flexnbd * flexnbd )
|
||||
void flexnbd_lock_switch( struct flexnbd * flexnbd )
|
||||
{
|
||||
NULLCHECK( flexnbd );
|
||||
pthread_mutex_lock( &flexnbd->switch_mutex );
|
||||
flexthread_mutex_lock( flexnbd->switch_mutex );
|
||||
}
|
||||
|
||||
void flexnbd_switch_unlock( struct flexnbd * flexnbd )
|
||||
void flexnbd_unlock_switch( struct flexnbd * flexnbd )
|
||||
{
|
||||
NULLCHECK( flexnbd );
|
||||
pthread_mutex_unlock( &flexnbd->switch_mutex );
|
||||
flexthread_mutex_unlock( flexnbd->switch_mutex );
|
||||
}
|
||||
|
||||
int flexnbd_switch_locked( struct flexnbd * flexnbd )
|
||||
{
|
||||
NULLCHECK( flexnbd );
|
||||
return flexthread_mutex_held( flexnbd->switch_mutex );
|
||||
}
|
||||
|
||||
struct server * flexnbd_server( struct flexnbd * flexnbd )
|
||||
@@ -208,11 +214,11 @@ struct server * flexnbd_server( struct flexnbd * flexnbd )
|
||||
void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl )
|
||||
{
|
||||
NULLCHECK( flexnbd );
|
||||
flexnbd_switch_lock( flexnbd );
|
||||
flexnbd_lock_switch( flexnbd );
|
||||
{
|
||||
server_replace_acl( flexnbd_server(flexnbd), acl );
|
||||
}
|
||||
flexnbd_switch_unlock( flexnbd );
|
||||
flexnbd_unlock_switch( flexnbd );
|
||||
}
|
||||
|
||||
|
||||
@@ -221,11 +227,11 @@ struct status * flexnbd_status_create( struct flexnbd * flexnbd )
|
||||
NULLCHECK( flexnbd );
|
||||
struct status * status;
|
||||
|
||||
flexnbd_switch_lock( flexnbd );
|
||||
flexnbd_lock_switch( flexnbd );
|
||||
{
|
||||
status = status_create( flexnbd_server( flexnbd ) );
|
||||
}
|
||||
flexnbd_switch_unlock( flexnbd );
|
||||
flexnbd_unlock_switch( flexnbd );
|
||||
return status;
|
||||
}
|
||||
|
||||
@@ -245,13 +251,13 @@ void flexnbd_switch( struct flexnbd * flexnbd, struct server *(listen_cb)(struct
|
||||
NULLCHECK( flexnbd );
|
||||
NULLCHECK( flexnbd->listen );
|
||||
|
||||
flexnbd_switch_lock( flexnbd );
|
||||
flexnbd_lock_switch( flexnbd );
|
||||
{
|
||||
struct server * new_server = listen_cb( flexnbd->listen );
|
||||
NULLCHECK( new_server );
|
||||
flexnbd_set_server( flexnbd, new_server );
|
||||
}
|
||||
flexnbd_switch_unlock( flexnbd );
|
||||
flexnbd_unlock_switch( flexnbd );
|
||||
|
||||
}
|
||||
|
||||
@@ -266,11 +272,11 @@ int flexnbd_default_deny( struct flexnbd * flexnbd )
|
||||
int result;
|
||||
|
||||
NULLCHECK( flexnbd );
|
||||
flexnbd_switch_lock( flexnbd );
|
||||
flexnbd_lock_switch( flexnbd );
|
||||
{
|
||||
result = server_default_deny( flexnbd->serve );
|
||||
}
|
||||
flexnbd_switch_unlock( flexnbd );
|
||||
flexnbd_unlock_switch( flexnbd );
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@@ -8,6 +8,7 @@
|
||||
#include "self_pipe.h"
|
||||
#include "mbox.h"
|
||||
#include "control.h"
|
||||
#include "flexthread.h"
|
||||
|
||||
/* Carries the "globals". */
|
||||
struct flexnbd {
|
||||
@@ -28,7 +29,7 @@ struct flexnbd {
|
||||
/* switch_mutex is the lock around dereferencing the serve
|
||||
* pointer.
|
||||
*/
|
||||
pthread_mutex_t switch_mutex;
|
||||
struct flexthread_mutex * switch_mutex;
|
||||
|
||||
/* File descriptor for a signalfd(2) signal stream. */
|
||||
int signal_fd;
|
||||
@@ -60,8 +61,9 @@ struct flexnbd * flexnbd_create_listening(
|
||||
void flexnbd_destroy( struct flexnbd * );
|
||||
enum mirror_state;
|
||||
enum mirror_state flexnbd_get_mirror_state( struct flexnbd * );
|
||||
void flexnbd_switch_lock( struct flexnbd * );
|
||||
void flexnbd_switch_unlock( struct flexnbd * );
|
||||
void flexnbd_lock_switch( struct flexnbd * );
|
||||
void flexnbd_unlock_switch( struct flexnbd * );
|
||||
int flexnbd_switch_locked( struct flexnbd * );
|
||||
int flexnbd_default_deny( struct flexnbd * );
|
||||
void flexnbd_set_server( struct flexnbd * flexnbd, struct server * serve );
|
||||
void flexnbd_switch( struct flexnbd * flexnbd, struct server *(listen_cb)(struct listen *) );
|
||||
|
75
src/flexthread.c
Normal file
75
src/flexthread.c
Normal file
@@ -0,0 +1,75 @@
|
||||
#include "flexthread.h"
|
||||
#include "util.h"
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
|
||||
struct flexthread_mutex * flexthread_mutex_create(void)
|
||||
{
|
||||
struct flexthread_mutex * ftm =
|
||||
xmalloc( sizeof( struct flexthread_mutex ) );
|
||||
|
||||
FATAL_UNLESS( 0 == pthread_mutex_init( &ftm->mutex, NULL ),
|
||||
"Mutex initialisation failed" );
|
||||
return ftm;
|
||||
|
||||
}
|
||||
|
||||
|
||||
void flexthread_mutex_destroy( struct flexthread_mutex * ftm )
|
||||
{
|
||||
NULLCHECK( ftm );
|
||||
|
||||
if( flexthread_mutex_held( ftm ) ) {
|
||||
flexthread_mutex_unlock( ftm );
|
||||
}
|
||||
else if ( (pthread_t)NULL != ftm->holder ) {
|
||||
/* This "should never happen": if we can try to destroy
|
||||
* a mutex currently held by another thread, there's a
|
||||
* logic bug somewhere. I know the test here is racy,
|
||||
* but there's not a lot we can do about it at this
|
||||
* point.
|
||||
*/
|
||||
fatal( "Attempted to destroy a flexthread_mutex"\
|
||||
" held by another thread!" );
|
||||
}
|
||||
|
||||
FATAL_UNLESS( 0 == pthread_mutex_destroy( &ftm->mutex ),
|
||||
"Mutex destroy failed" );
|
||||
free( ftm );
|
||||
}
|
||||
|
||||
|
||||
int flexthread_mutex_lock( struct flexthread_mutex * ftm )
|
||||
{
|
||||
NULLCHECK( ftm );
|
||||
|
||||
int failure = pthread_mutex_lock( &ftm->mutex );
|
||||
if ( 0 == failure ) {
|
||||
ftm->holder = pthread_self();
|
||||
}
|
||||
|
||||
return failure;
|
||||
}
|
||||
|
||||
|
||||
int flexthread_mutex_unlock( struct flexthread_mutex * ftm )
|
||||
{
|
||||
NULLCHECK( ftm );
|
||||
|
||||
pthread_t orig = ftm->holder;
|
||||
ftm->holder = (pthread_t)NULL;
|
||||
int failure = pthread_mutex_unlock( &ftm->mutex );
|
||||
if ( 0 != failure ) {
|
||||
ftm->holder = orig;
|
||||
}
|
||||
return failure;
|
||||
}
|
||||
|
||||
|
||||
int flexthread_mutex_held( struct flexthread_mutex * ftm )
|
||||
{
|
||||
NULLCHECK( ftm );
|
||||
return pthread_self() == ftm->holder;
|
||||
}
|
||||
|
29
src/flexthread.h
Normal file
29
src/flexthread.h
Normal file
@@ -0,0 +1,29 @@
|
||||
#ifndef FLEXTHREAD_H
|
||||
#define FLEXTHREAD_H
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
/* Define a mutex wrapper object. This wrapper allows us to easily
|
||||
* track whether or not we currently hold the wrapped mutex. If we hold
|
||||
* the mutex when we destroy it, then we first release it.
|
||||
*
|
||||
* These are specifically for the case where an ERROR_* handler gets
|
||||
* called when we might (or might not) have a mutex held. The
|
||||
* flexthread_mutex_held() function will tell you if your thread
|
||||
* currently holds the given mutex. It's not safe to make any other
|
||||
* comparisons.
|
||||
*/
|
||||
|
||||
struct flexthread_mutex {
|
||||
pthread_mutex_t mutex;
|
||||
pthread_t holder;
|
||||
};
|
||||
|
||||
struct flexthread_mutex * flexthread_mutex_create(void);
|
||||
void flexthread_mutex_destroy( struct flexthread_mutex * );
|
||||
|
||||
int flexthread_mutex_lock( struct flexthread_mutex * );
|
||||
int flexthread_mutex_unlock( struct flexthread_mutex * );
|
||||
int flexthread_mutex_held( struct flexthread_mutex * );
|
||||
|
||||
#endif
|
11
src/listen.c
11
src/listen.c
@@ -65,8 +65,13 @@ struct server *listen_switch( struct listen * listen )
|
||||
}
|
||||
|
||||
|
||||
void listen_cleanup( void * unused __attribute__((unused)) )
|
||||
void listen_cleanup( struct listen * listen )
|
||||
{
|
||||
NULLCHECK( listen );
|
||||
|
||||
if ( flexnbd_switch_locked( listen->flexnbd ) ) {
|
||||
flexnbd_unlock_switch( listen->flexnbd );
|
||||
}
|
||||
}
|
||||
|
||||
int do_listen( struct listen * listen )
|
||||
@@ -75,11 +80,11 @@ int do_listen( struct listen * listen )
|
||||
|
||||
int have_control = 0;
|
||||
|
||||
flexnbd_switch_lock( listen->flexnbd );
|
||||
flexnbd_lock_switch( listen->flexnbd );
|
||||
{
|
||||
flexnbd_set_server( listen->flexnbd, listen->init_serve );
|
||||
}
|
||||
flexnbd_switch_unlock( listen->flexnbd );
|
||||
flexnbd_unlock_switch( listen->flexnbd );
|
||||
|
||||
/* WATCH FOR RACES HERE: flexnbd->serve is set, but the server
|
||||
* isn't running yet and the switch lock is released.
|
||||
|
11
src/mirror.c
11
src/mirror.c
@@ -23,7 +23,6 @@
|
||||
#include "readwrite.h"
|
||||
#include "bitset.h"
|
||||
#include "self_pipe.h"
|
||||
#include "acl.h"
|
||||
#include "status.h"
|
||||
|
||||
|
||||
@@ -182,6 +181,7 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written)
|
||||
*/
|
||||
if (should_lock) { server_lock_io( serve ); }
|
||||
{
|
||||
debug("in lock block");
|
||||
/** FIXME: do something useful with bytes/second */
|
||||
|
||||
/** FIXME: error handling code here won't unlock */
|
||||
@@ -194,6 +194,7 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written)
|
||||
|
||||
/* now mark it clean */
|
||||
bitset_clear_range(map, current, run);
|
||||
debug("leaving lock block");
|
||||
}
|
||||
if (should_lock) { server_unlock_io( serve ); }
|
||||
|
||||
@@ -280,9 +281,11 @@ void mirror_on_exit( struct server * serve )
|
||||
}
|
||||
|
||||
|
||||
void mirror_cleanup( struct mirror_status * mirror,
|
||||
void mirror_cleanup( struct server * serve,
|
||||
int fatal __attribute__((unused)))
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
struct mirror_status * mirror = serve->mirror;
|
||||
NULLCHECK( mirror );
|
||||
info( "Cleaning up mirror thread");
|
||||
|
||||
@@ -290,6 +293,8 @@ void mirror_cleanup( struct mirror_status * mirror,
|
||||
close( mirror->client );
|
||||
}
|
||||
mirror->client = -1;
|
||||
|
||||
if( server_io_locked( serve ) ){ server_unlock_io( serve ); }
|
||||
}
|
||||
|
||||
|
||||
@@ -404,7 +409,7 @@ void* mirror_runner(void* serve_params_uncast)
|
||||
struct mirror_status * mirror = serve->mirror;
|
||||
NULLCHECK( mirror->dirty_map );
|
||||
|
||||
error_set_handler( (cleanup_handler *) mirror_cleanup, mirror );
|
||||
error_set_handler( (cleanup_handler *) mirror_cleanup, serve );
|
||||
|
||||
info( "Connecting to mirror" );
|
||||
|
||||
|
@@ -39,8 +39,7 @@ void self_pipe_server_error( int err, char *msg )
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a struct self_pipe, opening the pipe and allocating a
|
||||
* pthread mutex.
|
||||
* Allocate a struct self_pipe, opening the pipe.
|
||||
*
|
||||
* Returns NULL if the pipe couldn't be opened or if we couldn't set it
|
||||
* non-blocking.
|
||||
|
40
src/serve.c
40
src/serve.c
@@ -81,8 +81,8 @@ struct server * server_create (
|
||||
strcpy(out->filename_incomplete, s_file);
|
||||
strcpy(out->filename_incomplete + strlen(s_file), ".INCOMPLETE");
|
||||
|
||||
pthread_mutex_init(&out->l_io, NULL);
|
||||
pthread_mutex_init(&out->l_acl, NULL);
|
||||
out->l_io = flexthread_mutex_create();
|
||||
out->l_acl= flexthread_mutex_create();
|
||||
|
||||
out->close_signal = self_pipe_create();
|
||||
out->acl_updated_signal = self_pipe_create();
|
||||
@@ -100,8 +100,8 @@ void server_destroy( struct server * serve )
|
||||
self_pipe_destroy( serve->close_signal );
|
||||
serve->close_signal = NULL;
|
||||
|
||||
pthread_mutex_destroy( &serve->l_acl );
|
||||
pthread_mutex_destroy( &serve->l_io );
|
||||
flexthread_mutex_destroy( serve->l_acl );
|
||||
flexthread_mutex_destroy( serve->l_io );
|
||||
|
||||
if ( serve->acl ) {
|
||||
acl_destroy( serve->acl );
|
||||
@@ -126,23 +126,39 @@ void server_dirty(struct server *serve, off64_t from, int len)
|
||||
|
||||
#define SERVER_LOCK( s, f, msg ) \
|
||||
do { NULLCHECK( s ); \
|
||||
FATAL_IF( 0 != pthread_mutex_lock( &s->f ), msg ); } while (0)
|
||||
FATAL_IF( 0 != flexthread_mutex_lock( s->f ), msg ); } while (0)
|
||||
#define SERVER_UNLOCK( s, f, msg ) \
|
||||
do { NULLCHECK( s ); \
|
||||
FATAL_IF( 0 != pthread_mutex_unlock( &s->f ), msg ); } while (0)
|
||||
FATAL_IF( 0 != flexthread_mutex_unlock( s->f ), msg ); } while (0)
|
||||
|
||||
void server_lock_io( struct server * serve)
|
||||
{
|
||||
debug("IO locking");
|
||||
|
||||
SERVER_LOCK( serve, l_io, "Problem with I/O lock" );
|
||||
}
|
||||
|
||||
void server_unlock_io( struct server* serve )
|
||||
{
|
||||
debug("IO unlocking");
|
||||
|
||||
SERVER_UNLOCK( serve, l_io, "Problem with I/O unlock" );
|
||||
}
|
||||
|
||||
|
||||
/* This is only to be called from error handlers. */
|
||||
int server_io_locked( struct server * serve )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
return flexthread_mutex_held( serve->l_io );
|
||||
}
|
||||
|
||||
|
||||
|
||||
void server_lock_acl( struct server *serve )
|
||||
{
|
||||
debug("ACL locking");
|
||||
|
||||
SERVER_LOCK( serve, l_acl, "Problem with ACL lock" );
|
||||
}
|
||||
|
||||
@@ -152,6 +168,13 @@ void server_unlock_acl( struct server *serve )
|
||||
}
|
||||
|
||||
|
||||
int server_acl_locked( struct server * serve )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
return flexthread_mutex_held( serve->l_acl );
|
||||
}
|
||||
|
||||
|
||||
/** Return the actual port the server bound to. This is used because we
|
||||
* are allowed to pass "0" on the command-line.
|
||||
*/
|
||||
@@ -640,6 +663,11 @@ void serve_cleanup(struct server* params,
|
||||
pthread_join(thread_id, &status);
|
||||
}
|
||||
}
|
||||
|
||||
if ( server_acl_locked( params ) ) {
|
||||
server_unlock_acl( params );
|
||||
}
|
||||
|
||||
debug( "Cleanup done");
|
||||
}
|
||||
|
||||
|
11
src/serve.h
11
src/serve.h
@@ -38,7 +38,7 @@ struct server {
|
||||
uint64_t size;
|
||||
|
||||
/** Claims around any I/O to this file */
|
||||
pthread_mutex_t l_io;
|
||||
struct flexthread_mutex * l_io;
|
||||
|
||||
/** to interrupt accept loop and clients, write() to close_signal[1] */
|
||||
struct self_pipe * close_signal;
|
||||
@@ -49,7 +49,9 @@ struct server {
|
||||
* has been replaced
|
||||
*/
|
||||
struct self_pipe * acl_updated_signal;
|
||||
pthread_mutex_t l_acl;
|
||||
|
||||
/* Claimed around any updates to the ACL. */
|
||||
struct flexthread_mutex * l_acl;
|
||||
|
||||
struct mirror_status* mirror;
|
||||
struct mirror_super * mirror_super;
|
||||
@@ -90,6 +92,11 @@ void server_replace_acl( struct server *serve, struct acl * acl);
|
||||
void server_control_arrived( struct server *serve );
|
||||
int server_is_in_control( struct server *serve );
|
||||
int server_default_deny( struct server * serve );
|
||||
int server_io_locked( struct server * serve );
|
||||
int server_acl_locked( struct server * serve );
|
||||
void server_lock_acl( struct server *serve );
|
||||
void server_unlock_acl( struct server *serve );
|
||||
|
||||
|
||||
int do_serve( struct server * );
|
||||
|
||||
|
Reference in New Issue
Block a user