From c19901cf10d3fb3bb7618f4ec6331425873caf80 Mon Sep 17 00:00:00 2001 From: Michel Pollet Date: Sat, 1 Oct 2016 12:15:09 +0100 Subject: [PATCH] mbox: Simplified Removed the existing mbox that is used to pass transactions from one thread to the other, use a non-locking FIFO and a simple semaphore instead. Removed all notion of void* from the FIFO system, use the now generic FIFO. Also started to rework the mirror.c code to use typedefs for structs enums etc. DO NOT USE. check_mbox is borken and needs changing. Currently 'functional' otherwise like this, but requires more testing. Signed-off-by: Michel Pollet --- src/server/control.c | 20 ++++----- src/server/control.h | 4 +- src/server/mbox.c | 90 ++++++++++++++++++++--------------------- src/server/mbox.h | 46 ++++++++++----------- src/server/mirror.c | 81 +++++++++++++++++-------------------- src/server/mirror.h | 38 ++++++++--------- src/server/serve.h | 4 +- tests/unit/check_mbox.c | 13 +++--- 8 files changed, 139 insertions(+), 157 deletions(-) diff --git a/src/server/control.c b/src/server/control.c index 63f0720..7d47eab 100644 --- a/src/server/control.c +++ b/src/server/control.c @@ -83,7 +83,7 @@ void control_destroy( struct control * control ) struct control_client * control_client_create( struct flexnbd * flexnbd, int client_fd , - struct mbox * state_mbox ) + struct mbox_t * state_mbox ) { NULLCHECK( flexnbd ); @@ -256,7 +256,7 @@ void * control_runner( void * control_uncast ) #define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1) -void control_write_mirror_response( enum mirror_state mirror_state, int client_fd ) +void control_write_mirror_response( mirror_state_t mirror_state, int client_fd ) { switch (mirror_state) { case MS_INIT: @@ -290,22 +290,16 @@ void control_write_mirror_response( enum mirror_state mirror_state, int client_f /* Call this in the thread where you want to receive the mirror state */ -enum mirror_state control_client_mirror_wait( +mirror_state_t control_client_mirror_wait( struct control_client* client) { NULLCHECK( client ); NULLCHECK( client->mirror_state_mbox ); - struct mbox * mbox = client->mirror_state_mbox; - enum mirror_state mirror_state; - enum mirror_state * contents; + struct mbox_t * mbox = client->mirror_state_mbox; + mirror_state_t mirror_state; - contents = (enum mirror_state*)mbox_receive( mbox ); - NULLCHECK( contents ); - - mirror_state = *contents; - - free( contents ); + mirror_state = mbox_receive( mbox ).i; return mirror_state; } @@ -425,7 +419,7 @@ int control_mirror(struct control_client* client, int linesc, char** lines) ); debug("Control thread mirror super waiting"); - enum mirror_state state = + mirror_state_t state = control_client_mirror_wait( client ); debug("Control thread writing response"); control_write_mirror_response( state, client->socket ); diff --git a/src/server/control.h b/src/server/control.h index 977c565..bc35634 100644 --- a/src/server/control.h +++ b/src/server/control.h @@ -31,7 +31,7 @@ struct control { * process (and we can only have a mirror thread if the control * thread has started it). */ - struct mbox * mirror_state_mbox; + struct mbox_t * mirror_state_mbox; }; struct control_client{ @@ -41,7 +41,7 @@ struct control_client{ /* Passed in on creation. We know it's all right to do this * because we know there's only ever one control_client. */ - struct mbox * mirror_state_mbox; + struct mbox_t * mirror_state_mbox; }; struct control * control_create( diff --git a/src/server/mbox.c b/src/server/mbox.c index 3e96785..1f8beec 100644 --- a/src/server/mbox.c +++ b/src/server/mbox.c @@ -1,77 +1,73 @@ + #include "mbox.h" #include "util.h" +#include #include -struct mbox * mbox_create( void ) +DEFINE_FIFO(mbox_item_t, mbox_fifo); + +#define ARRAY_SIZE(w) (sizeof(w) / sizeof((w)[0])) + +mbox_p mbox_create( void ) { - struct mbox * mbox = xmalloc( sizeof( struct mbox ) ); - FATAL_UNLESS( 0 == pthread_cond_init( &mbox->filled_cond, NULL ), - "Failed to initialise a condition variable" ); - FATAL_UNLESS( 0 == pthread_cond_init( &mbox->emptied_cond, NULL ), - "Failed to initialise a condition variable" ); - FATAL_UNLESS( 0 == pthread_mutex_init( &mbox->mutex, NULL ), - "Failed to initialise a mutex" ); + mbox_p mbox = xmalloc( sizeof( struct mbox_t ) ); + + int sv[2]; + FATAL_UNLESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0, + "Failed to socketpair"); + mbox->signalw = sv[0]; + mbox->signalr = sv[1]; + return mbox; } -void mbox_post( struct mbox * mbox, void * contents ) +void mbox_post( mbox_p mbox, mbox_item_t item ) { - pthread_mutex_lock( &mbox->mutex ); + mbox_fifo_write(&mbox->fifo, item); { - if (mbox->full){ - pthread_cond_wait( &mbox->emptied_cond, &mbox->mutex ); - } - mbox->contents = contents; - mbox->full = 1; - while( 0 != pthread_cond_signal( &mbox->filled_cond ) ); + uint8_t w; + FATAL_UNLESS((write(mbox->signalw, &w, 1)) == 1, + "Write to socketpair"); } - pthread_mutex_unlock( &mbox->mutex ); } -void * mbox_contents( struct mbox * mbox ) +mbox_item_t mbox_contents( mbox_p mbox ) { - return mbox->contents; + const mbox_item_t zero = {0}; + + return mbox_fifo_isempty(&mbox->fifo) ? + zero : + mbox_fifo_read_at(&mbox->fifo, 0); } -int mbox_is_full( struct mbox * mbox ) +int mbox_is_full( mbox_p mbox ) { - return mbox->full; + return mbox_fifo_isfull(&mbox->fifo); } -void * mbox_receive( struct mbox * mbox ) -{ - NULLCHECK( mbox ); - void * result; - - pthread_mutex_lock( &mbox->mutex ); - { - if ( !mbox->full ) { - pthread_cond_wait( &mbox->filled_cond, &mbox->mutex ); - } - mbox->full = 0; - result = mbox->contents; - mbox->contents = NULL; - - while( 0 != pthread_cond_signal( &mbox->emptied_cond)); - } - pthread_mutex_unlock( &mbox->mutex ); - - return result; -} - - -void mbox_destroy( struct mbox * mbox ) +mbox_item_t mbox_receive( mbox_p mbox ) { NULLCHECK( mbox ); - while( 0 != pthread_cond_destroy( &mbox->emptied_cond ) ); - while( 0 != pthread_cond_destroy( &mbox->filled_cond ) ); + while (mbox_fifo_isempty(&mbox->fifo)) { + uint8_t w; + FATAL_UNLESS((read(mbox->signalr, &w, 1)) == 1, + "Read from socketpair"); + } - while( 0 != pthread_mutex_destroy( &mbox->mutex ) ); + return mbox_fifo_read(&mbox->fifo); +} + +void mbox_destroy( mbox_p mbox ) +{ + NULLCHECK( mbox ); + + close(mbox->signalw); + close(mbox->signalr); free( mbox ); } diff --git a/src/server/mbox.h b/src/server/mbox.h index 3af54d8..1352799 100644 --- a/src/server/mbox.h +++ b/src/server/mbox.h @@ -11,45 +11,43 @@ #include +#include +#include "fifo_declare.h" + +typedef union { + uint64_t i; + void * p; +} mbox_item_t; + +DECLARE_FIFO(mbox_item_t, mbox_fifo, 8); + +typedef struct mbox_t { + mbox_fifo_t fifo; + // socketpair() ends + int signalw, signalr; +} mbox_t, *mbox_p; -struct mbox { - void * contents; - - /** Marker to tell us if there's content in the box. - * Keeping this separate allows us to use NULL for the contents. - */ - int full; - - /** This gets signaled by mbox_post, and waited on by - * mbox_receive */ - pthread_cond_t filled_cond; - /** This is signaled by mbox_receive, and waited on by mbox_post */ - pthread_cond_t emptied_cond; - pthread_mutex_t mutex; -}; - - -/* Create an mbox. */ -struct mbox * mbox_create(void); +/* Create an mbox_t. */ +mbox_p mbox_create(void); /* Put something in the mbox, blocking if it's already full. * That something can be NULL if you want. */ -void mbox_post( struct mbox *, void *); +void mbox_post( mbox_p , mbox_item_t item); /* See what's in the mbox. This isn't thread-safe. */ -void * mbox_contents( struct mbox *); +mbox_item_t mbox_contents( mbox_p ); /* See if anything has been put into the mbox. This isn't thread-safe. * */ -int mbox_is_full( struct mbox *); +int mbox_is_full( mbox_p ); /* Get the contents from the mbox, blocking if there's nothing there. */ -void * mbox_receive( struct mbox *); +mbox_item_t mbox_receive( mbox_p ); /* Free the mbox and destroy the associated pthread bits. */ -void mbox_destroy( struct mbox *); +void mbox_destroy( mbox_p ); #endif diff --git a/src/server/mirror.c b/src/server/mirror.c index 708a67d..b4a7ea3 100644 --- a/src/server/mirror.c +++ b/src/server/mirror.c @@ -66,7 +66,7 @@ struct xfer { struct mirror_ctrl { struct server *serve; - struct mirror *mirror; + mirror_p mirror; /* libev stuff */ struct ev_loop *ev_loop; @@ -90,16 +90,16 @@ struct mirror_ctrl { }; -struct mirror * mirror_alloc( +mirror_p mirror_alloc( union mysockaddr * connect_to, union mysockaddr * connect_from, uint64_t max_Bps, - enum mirror_finish_action action_at_finish, - struct mbox * commit_signal) + mirror_finish_action_t action_at_finish, + mbox_p commit_signal) { - struct mirror * mirror; + mirror_p mirror; - mirror = xmalloc(sizeof(struct mirror)); + mirror = xmalloc(sizeof(mirror_t)); mirror->connect_to = connect_to; mirror->connect_from = connect_from; mirror->max_bytes_per_second = max_Bps; @@ -116,7 +116,7 @@ struct mirror * mirror_alloc( return mirror; } -void mirror_set_state_f( struct mirror * mirror, enum mirror_state state ) +void mirror_set_state_f( mirror_p mirror, mirror_state_t state ) { NULLCHECK( mirror ); mirror->commit_state = state; @@ -127,7 +127,7 @@ void mirror_set_state_f( struct mirror * mirror, enum mirror_state state ) mirror_set_state_f( mirror, state );\ } while(0) -enum mirror_state mirror_get_state( struct mirror * mirror ) +mirror_state_t mirror_get_state( mirror_p mirror ) { NULLCHECK( mirror ); return mirror->commit_state; @@ -136,7 +136,7 @@ enum mirror_state mirror_get_state( struct mirror * mirror ) #define mirror_state_is( mirror, state ) mirror_get_state( mirror ) == state -void mirror_init( struct mirror * mirror, const char * filename ) +void mirror_init( mirror_p mirror, const char * filename ) { int map_fd; uint64_t size; @@ -163,7 +163,7 @@ void mirror_init( struct mirror * mirror, const char * filename ) /* Call this before a mirror attempt. */ -void mirror_reset( struct mirror * mirror ) +void mirror_reset( mirror_p mirror ) { NULLCHECK( mirror ); mirror_set_state( mirror, MS_INIT ); @@ -176,16 +176,16 @@ void mirror_reset( struct mirror * mirror ) } -struct mirror * mirror_create( +mirror_p mirror_create( const char * filename, union mysockaddr * connect_to, union mysockaddr * connect_from, uint64_t max_Bps, int action_at_finish, - struct mbox * commit_signal) + mbox_p commit_signal) { /* FIXME: shouldn't map_fd get closed? */ - struct mirror * mirror; + mirror_p mirror; mirror = mirror_alloc( connect_to, connect_from, @@ -201,7 +201,7 @@ struct mirror * mirror_create( } -void mirror_destroy( struct mirror *mirror ) +void mirror_destroy( mirror_p mirror ) { NULLCHECK( mirror ); self_pipe_destroy( mirror->abandon_signal ); @@ -254,7 +254,7 @@ void mirror_cleanup( struct server * serve, int fatal __attribute__((unused))) { NULLCHECK( serve ); - struct mirror * mirror = serve->mirror; + mirror_p mirror = serve->mirror; NULLCHECK( mirror ); info( "Cleaning up mirror thread"); @@ -270,7 +270,7 @@ void mirror_cleanup( struct server * serve, } -int mirror_connect( struct mirror * mirror, uint64_t local_size ) +int mirror_connect( mirror_p mirror, uint64_t local_size ) { struct sockaddr * connect_from = NULL; int connected = 0; @@ -325,7 +325,7 @@ int mirror_connect( struct mirror * mirror, uint64_t local_size ) } -int mirror_should_quit( struct mirror * mirror ) +int mirror_should_quit( mirror_p mirror ) { switch( mirror->action_at_finish ) { case ACTION_EXIT: @@ -359,7 +359,7 @@ int mirror_should_wait( struct mirror_ctrl *ctrl ) * next transfer, then puts it together. */ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl ) { - struct mirror* mirror = ctrl->mirror; + mirror_p mirror = ctrl->mirror; struct server* serve = ctrl->serve; struct bitset_stream_entry e = { .event = BITSET_STREAM_UNSET }; uint64_t current = mirror->offset, run = 0, size = serve->size; @@ -508,7 +508,7 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents ) struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data; NULLCHECK( ctrl ); - struct mirror *m = ctrl->mirror; + mirror_p m = ctrl->mirror; NULLCHECK( m ); struct xfer *xfer = &ctrl->xfer; @@ -733,7 +733,7 @@ void mirror_run( struct server *serve ) NULLCHECK( serve ); NULLCHECK( serve->mirror ); - struct mirror *m = serve->mirror; + mirror_p m = serve->mirror; m->migration_started = monotonic_time_ms(); info("Starting mirror" ); @@ -849,18 +849,17 @@ void mirror_run( struct server *serve ) } -void mbox_post_mirror_state( struct mbox * mbox, enum mirror_state st ) +void mbox_post_mirror_state( mbox_p mbox, mirror_state_t st ) { NULLCHECK( mbox ); - enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) ); - *contents = st; + mbox_item_t ste = { .i = st }; - mbox_post( mbox, contents ); + mbox_post( mbox, ste ); } -void mirror_signal_commit( struct mirror * mirror ) +void mirror_signal_commit( mirror_p mirror ) { NULLCHECK( mirror ); @@ -887,7 +886,7 @@ void* mirror_runner(void* serve_params_uncast) NULLCHECK( serve ); NULLCHECK( serve->mirror ); - struct mirror * mirror = serve->mirror; + mirror_p mirror = serve->mirror; error_set_handler( (cleanup_handler *) mirror_cleanup, serve ); @@ -932,15 +931,15 @@ abandon_mirror: } -struct mirror_super * mirror_super_create( +mirror_super_p mirror_super_create( const char * filename, union mysockaddr * connect_to, union mysockaddr * connect_from, uint64_t max_Bps, - enum mirror_finish_action action_at_finish, - struct mbox * state_mbox) + mirror_finish_action_t action_at_finish, + mbox_p state_mbox) { - struct mirror_super * super = xmalloc( sizeof( struct mirror_super) ); + mirror_super_p super = xmalloc( sizeof( struct mirror_super_t) ); super->mirror = mirror_create( filename, connect_to, @@ -955,8 +954,8 @@ struct mirror_super * mirror_super_create( /* Post the current state of the mirror into super->state_mbox.*/ void mirror_super_signal_committed( - struct mirror_super * super , - enum mirror_state commit_state ) + mirror_super_p super , + mirror_state_t commit_state ) { NULLCHECK( super ); NULLCHECK( super->state_mbox ); @@ -967,7 +966,7 @@ void mirror_super_signal_committed( } -void mirror_super_destroy( struct mirror_super * super ) +void mirror_super_destroy( mirror_super_p super ) { NULLCHECK( super ); @@ -993,8 +992,8 @@ void * mirror_super_runner( void * serve_uncast ) int should_retry = 0; int success = 0, abandoned = 0; - struct mirror * mirror = serve->mirror; - struct mirror_super * super = serve->mirror_super; + mirror_p mirror = serve->mirror; + mirror_super_p super = serve->mirror_super; do { FATAL_IF( 0 != pthread_create( @@ -1005,8 +1004,8 @@ void * mirror_super_runner( void * serve_uncast ) "Failed to create mirror thread"); debug("Supervisor waiting for commit signal"); - enum mirror_state * commit_state = - mbox_receive( mirror->commit_signal ); + mirror_state_t commit_state = + mbox_receive( mirror->commit_signal ).i; debug( "Supervisor got commit signal" ); if ( first_pass ) { @@ -1015,18 +1014,14 @@ void * mirror_super_runner( void * serve_uncast ) * retry behind the scenes. This may race with migration completing * but since we "shouldn't retry" in that case either, that's fine */ - should_retry = *commit_state == MS_GO; + should_retry = commit_state == MS_GO; /* Only send this signal the first time */ mirror_super_signal_committed( super, - *commit_state); + commit_state); debug("Mirror supervisor committed"); } - /* We only care about the value of the commit signal on - * the first pass, so this is ok - */ - free( commit_state ); debug("Supervisor waiting for mirror thread" ); pthread_join( mirror->thread, NULL ); diff --git a/src/server/mirror.h b/src/server/mirror.h index d390fdf..cd38cf1 100644 --- a/src/server/mirror.h +++ b/src/server/mirror.h @@ -7,7 +7,7 @@ #include "bitset.h" #include "self_pipe.h" -enum mirror_state; + #include "serve.h" #include "mbox.h" @@ -57,14 +57,14 @@ enum mirror_state; #define MS_REQUEST_LIMIT_SECS 60 #define MS_REQUEST_LIMIT_SECS_F 60.0 -enum mirror_finish_action { - ACTION_EXIT, +typedef enum { + ACTION_EXIT = 0, ACTION_UNLINK, ACTION_NOTHING -}; +} mirror_finish_action_t; -enum mirror_state { - MS_UNKNOWN, +typedef enum { + MS_UNKNOWN = 0, MS_INIT, MS_GO, MS_ABANDONED, @@ -73,9 +73,9 @@ enum mirror_state { MS_FAIL_REJECTED, MS_FAIL_NO_HELLO, MS_FAIL_SIZE_MISMATCH -}; +} mirror_state_t; -struct mirror { +typedef struct mirror_t { pthread_t thread; /* Signal to this then join the thread if you want to abandon mirroring */ @@ -90,19 +90,19 @@ struct mirror { * over the network) are considered */ uint64_t max_bytes_per_second; - enum mirror_finish_action action_at_finish; + mirror_finish_action_t action_at_finish; char *mapped; /* We need to send every byte at least once; we do so by */ uint64_t offset; - enum mirror_state commit_state; + mirror_state_t commit_state; /* commit_signal is sent immediately after attempting to connect * and checking the remote size, whether successful or not. */ - struct mbox * commit_signal; + struct mbox_t * commit_signal; /* The time (from monotonic_time_ms()) the migration was started. Can be * used to calculate bps, etc. */ @@ -110,14 +110,14 @@ struct mirror { /* Running count of all bytes we've transferred */ uint64_t all_dirty; -}; +} mirror_t, *mirror_p; -struct mirror_super { - struct mirror * mirror; +typedef struct mirror_super_t { + mirror_p mirror; pthread_t thread; - struct mbox * state_mbox; -}; + struct mbox_t * state_mbox; +} mirror_super_t, *mirror_super_p; @@ -127,13 +127,13 @@ struct mirror_super { struct server; struct flexnbd; -struct mirror_super * mirror_super_create( +mirror_super_p mirror_super_create( const char * filename, union mysockaddr * connect_to, union mysockaddr * connect_from, uint64_t max_Bps, - enum mirror_finish_action action_at_finish, - struct mbox * state_mbox + mirror_finish_action_t action_at_finish, + struct mbox_t * state_mbox ); void * mirror_super_runner( void * serve_uncast ); diff --git a/src/server/serve.h b/src/server/serve.h index 5d04e14..252f3b5 100644 --- a/src/server/serve.h +++ b/src/server/serve.h @@ -53,8 +53,8 @@ struct server { * shutting down on a SIGTERM. */ struct flexthread_mutex * l_start_mirror; - struct mirror* mirror; - struct mirror_super * mirror_super; + struct mirror_t * mirror; + struct mirror_super_t * mirror_super; /* This is used to stop the mirror from starting after we * receive a SIGTERM */ int mirror_can_start; diff --git a/tests/unit/check_mbox.c b/tests/unit/check_mbox.c index 5399531..65a7827 100644 --- a/tests/unit/check_mbox.c +++ b/tests/unit/check_mbox.c @@ -6,7 +6,7 @@ START_TEST( test_allocs_cvar ) { - struct mbox * mbox = mbox_create(); + struct mbox_t * mbox = mbox_create(); fail_if( NULL == mbox, "Nothing allocated" ); pthread_cond_t cond_zero; @@ -22,7 +22,7 @@ END_TEST START_TEST( test_post_stores_value ) { - struct mbox * mbox = mbox_create(); + struct mbox_t * mbox = mbox_create(); void * deadbeef = (void *)0xDEADBEEF; mbox_post( mbox, deadbeef ); @@ -33,19 +33,18 @@ START_TEST( test_post_stores_value ) END_TEST -void * mbox_receive_runner( void * mbox_uncast ) +mbox_item_t mbox_receive_runner( void * mbox_uncast ) { - struct mbox * mbox = (struct mbox *)mbox_uncast; + struct mbox_t * mbox = (struct mbox_t *)mbox_uncast; void * contents = NULL; - contents = mbox_receive( mbox ); - return contents; + return mbox_receive( mbox ); } START_TEST( test_receive_blocks_until_post ) { - struct mbox * mbox = mbox_create(); + struct mbox_t * mbox = mbox_create(); pthread_t receiver; pthread_create( &receiver, NULL, mbox_receive_runner, mbox );