mbox: Simplified
Removed the existing mbox that is used to pass transactions from one thread to the other, use a non-locking FIFO and a simple semaphore instead. Removed all notion of void* from the FIFO system, use the now generic FIFO. Also started to rework the mirror.c code to use typedefs for structs enums etc. DO NOT USE. check_mbox is borken and needs changing. Currently 'functional' otherwise like this, but requires more testing. Signed-off-by: Michel Pollet <buserror@gmail.com>
This commit is contained in:
@@ -83,7 +83,7 @@ void control_destroy( struct control * control )
|
|||||||
struct control_client * control_client_create(
|
struct control_client * control_client_create(
|
||||||
struct flexnbd * flexnbd,
|
struct flexnbd * flexnbd,
|
||||||
int client_fd ,
|
int client_fd ,
|
||||||
struct mbox * state_mbox )
|
struct mbox_t * state_mbox )
|
||||||
{
|
{
|
||||||
NULLCHECK( flexnbd );
|
NULLCHECK( flexnbd );
|
||||||
|
|
||||||
@@ -256,7 +256,7 @@ void * control_runner( void * control_uncast )
|
|||||||
|
|
||||||
#define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1)
|
#define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1)
|
||||||
|
|
||||||
void control_write_mirror_response( enum mirror_state mirror_state, int client_fd )
|
void control_write_mirror_response( mirror_state_t mirror_state, int client_fd )
|
||||||
{
|
{
|
||||||
switch (mirror_state) {
|
switch (mirror_state) {
|
||||||
case MS_INIT:
|
case MS_INIT:
|
||||||
@@ -290,22 +290,16 @@ void control_write_mirror_response( enum mirror_state mirror_state, int client_f
|
|||||||
|
|
||||||
|
|
||||||
/* Call this in the thread where you want to receive the mirror state */
|
/* Call this in the thread where you want to receive the mirror state */
|
||||||
enum mirror_state control_client_mirror_wait(
|
mirror_state_t control_client_mirror_wait(
|
||||||
struct control_client* client)
|
struct control_client* client)
|
||||||
{
|
{
|
||||||
NULLCHECK( client );
|
NULLCHECK( client );
|
||||||
NULLCHECK( client->mirror_state_mbox );
|
NULLCHECK( client->mirror_state_mbox );
|
||||||
|
|
||||||
struct mbox * mbox = client->mirror_state_mbox;
|
struct mbox_t * mbox = client->mirror_state_mbox;
|
||||||
enum mirror_state mirror_state;
|
mirror_state_t mirror_state;
|
||||||
enum mirror_state * contents;
|
|
||||||
|
|
||||||
contents = (enum mirror_state*)mbox_receive( mbox );
|
mirror_state = mbox_receive( mbox ).i;
|
||||||
NULLCHECK( contents );
|
|
||||||
|
|
||||||
mirror_state = *contents;
|
|
||||||
|
|
||||||
free( contents );
|
|
||||||
|
|
||||||
return mirror_state;
|
return mirror_state;
|
||||||
}
|
}
|
||||||
@@ -425,7 +419,7 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
|
|||||||
);
|
);
|
||||||
|
|
||||||
debug("Control thread mirror super waiting");
|
debug("Control thread mirror super waiting");
|
||||||
enum mirror_state state =
|
mirror_state_t state =
|
||||||
control_client_mirror_wait( client );
|
control_client_mirror_wait( client );
|
||||||
debug("Control thread writing response");
|
debug("Control thread writing response");
|
||||||
control_write_mirror_response( state, client->socket );
|
control_write_mirror_response( state, client->socket );
|
||||||
|
@@ -31,7 +31,7 @@ struct control {
|
|||||||
* process (and we can only have a mirror thread if the control
|
* process (and we can only have a mirror thread if the control
|
||||||
* thread has started it).
|
* thread has started it).
|
||||||
*/
|
*/
|
||||||
struct mbox * mirror_state_mbox;
|
struct mbox_t * mirror_state_mbox;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct control_client{
|
struct control_client{
|
||||||
@@ -41,7 +41,7 @@ struct control_client{
|
|||||||
/* Passed in on creation. We know it's all right to do this
|
/* Passed in on creation. We know it's all right to do this
|
||||||
* because we know there's only ever one control_client.
|
* because we know there's only ever one control_client.
|
||||||
*/
|
*/
|
||||||
struct mbox * mirror_state_mbox;
|
struct mbox_t * mirror_state_mbox;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct control * control_create(
|
struct control * control_create(
|
||||||
|
@@ -1,77 +1,73 @@
|
|||||||
|
|
||||||
#include "mbox.h"
|
#include "mbox.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
|
|
||||||
|
#include <sys/socket.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
|
||||||
struct mbox * mbox_create( void )
|
DEFINE_FIFO(mbox_item_t, mbox_fifo);
|
||||||
|
|
||||||
|
#define ARRAY_SIZE(w) (sizeof(w) / sizeof((w)[0]))
|
||||||
|
|
||||||
|
mbox_p mbox_create( void )
|
||||||
{
|
{
|
||||||
struct mbox * mbox = xmalloc( sizeof( struct mbox ) );
|
mbox_p mbox = xmalloc( sizeof( struct mbox_t ) );
|
||||||
FATAL_UNLESS( 0 == pthread_cond_init( &mbox->filled_cond, NULL ),
|
|
||||||
"Failed to initialise a condition variable" );
|
int sv[2];
|
||||||
FATAL_UNLESS( 0 == pthread_cond_init( &mbox->emptied_cond, NULL ),
|
FATAL_UNLESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0,
|
||||||
"Failed to initialise a condition variable" );
|
"Failed to socketpair");
|
||||||
FATAL_UNLESS( 0 == pthread_mutex_init( &mbox->mutex, NULL ),
|
mbox->signalw = sv[0];
|
||||||
"Failed to initialise a mutex" );
|
mbox->signalr = sv[1];
|
||||||
|
|
||||||
return mbox;
|
return mbox;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mbox_post( struct mbox * mbox, void * contents )
|
void mbox_post( mbox_p mbox, mbox_item_t item )
|
||||||
{
|
{
|
||||||
pthread_mutex_lock( &mbox->mutex );
|
mbox_fifo_write(&mbox->fifo, item);
|
||||||
{
|
{
|
||||||
if (mbox->full){
|
uint8_t w;
|
||||||
pthread_cond_wait( &mbox->emptied_cond, &mbox->mutex );
|
FATAL_UNLESS((write(mbox->signalw, &w, 1)) == 1,
|
||||||
}
|
"Write to socketpair");
|
||||||
mbox->contents = contents;
|
|
||||||
mbox->full = 1;
|
|
||||||
while( 0 != pthread_cond_signal( &mbox->filled_cond ) );
|
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock( &mbox->mutex );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void * mbox_contents( struct mbox * mbox )
|
mbox_item_t mbox_contents( mbox_p mbox )
|
||||||
{
|
{
|
||||||
return mbox->contents;
|
const mbox_item_t zero = {0};
|
||||||
|
|
||||||
|
return mbox_fifo_isempty(&mbox->fifo) ?
|
||||||
|
zero :
|
||||||
|
mbox_fifo_read_at(&mbox->fifo, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int mbox_is_full( struct mbox * mbox )
|
int mbox_is_full( mbox_p mbox )
|
||||||
{
|
{
|
||||||
return mbox->full;
|
return mbox_fifo_isfull(&mbox->fifo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void * mbox_receive( struct mbox * mbox )
|
mbox_item_t mbox_receive( mbox_p mbox )
|
||||||
{
|
|
||||||
NULLCHECK( mbox );
|
|
||||||
void * result;
|
|
||||||
|
|
||||||
pthread_mutex_lock( &mbox->mutex );
|
|
||||||
{
|
|
||||||
if ( !mbox->full ) {
|
|
||||||
pthread_cond_wait( &mbox->filled_cond, &mbox->mutex );
|
|
||||||
}
|
|
||||||
mbox->full = 0;
|
|
||||||
result = mbox->contents;
|
|
||||||
mbox->contents = NULL;
|
|
||||||
|
|
||||||
while( 0 != pthread_cond_signal( &mbox->emptied_cond));
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock( &mbox->mutex );
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void mbox_destroy( struct mbox * mbox )
|
|
||||||
{
|
{
|
||||||
NULLCHECK( mbox );
|
NULLCHECK( mbox );
|
||||||
|
|
||||||
while( 0 != pthread_cond_destroy( &mbox->emptied_cond ) );
|
while (mbox_fifo_isempty(&mbox->fifo)) {
|
||||||
while( 0 != pthread_cond_destroy( &mbox->filled_cond ) );
|
uint8_t w;
|
||||||
|
FATAL_UNLESS((read(mbox->signalr, &w, 1)) == 1,
|
||||||
|
"Read from socketpair");
|
||||||
|
}
|
||||||
|
|
||||||
while( 0 != pthread_mutex_destroy( &mbox->mutex ) );
|
return mbox_fifo_read(&mbox->fifo);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void mbox_destroy( mbox_p mbox )
|
||||||
|
{
|
||||||
|
NULLCHECK( mbox );
|
||||||
|
|
||||||
|
close(mbox->signalw);
|
||||||
|
close(mbox->signalr);
|
||||||
free( mbox );
|
free( mbox );
|
||||||
}
|
}
|
||||||
|
@@ -11,45 +11,43 @@
|
|||||||
|
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include "fifo_declare.h"
|
||||||
|
|
||||||
|
typedef union {
|
||||||
|
uint64_t i;
|
||||||
|
void * p;
|
||||||
|
} mbox_item_t;
|
||||||
|
|
||||||
|
DECLARE_FIFO(mbox_item_t, mbox_fifo, 8);
|
||||||
|
|
||||||
|
typedef struct mbox_t {
|
||||||
|
mbox_fifo_t fifo;
|
||||||
|
// socketpair() ends
|
||||||
|
int signalw, signalr;
|
||||||
|
} mbox_t, *mbox_p;
|
||||||
|
|
||||||
|
|
||||||
struct mbox {
|
/* Create an mbox_t. */
|
||||||
void * contents;
|
mbox_p mbox_create(void);
|
||||||
|
|
||||||
/** Marker to tell us if there's content in the box.
|
|
||||||
* Keeping this separate allows us to use NULL for the contents.
|
|
||||||
*/
|
|
||||||
int full;
|
|
||||||
|
|
||||||
/** This gets signaled by mbox_post, and waited on by
|
|
||||||
* mbox_receive */
|
|
||||||
pthread_cond_t filled_cond;
|
|
||||||
/** This is signaled by mbox_receive, and waited on by mbox_post */
|
|
||||||
pthread_cond_t emptied_cond;
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
/* Create an mbox. */
|
|
||||||
struct mbox * mbox_create(void);
|
|
||||||
|
|
||||||
/* Put something in the mbox, blocking if it's already full.
|
/* Put something in the mbox, blocking if it's already full.
|
||||||
* That something can be NULL if you want.
|
* That something can be NULL if you want.
|
||||||
*/
|
*/
|
||||||
void mbox_post( struct mbox *, void *);
|
void mbox_post( mbox_p , mbox_item_t item);
|
||||||
|
|
||||||
/* See what's in the mbox. This isn't thread-safe. */
|
/* See what's in the mbox. This isn't thread-safe. */
|
||||||
void * mbox_contents( struct mbox *);
|
mbox_item_t mbox_contents( mbox_p );
|
||||||
|
|
||||||
/* See if anything has been put into the mbox. This isn't thread-safe.
|
/* See if anything has been put into the mbox. This isn't thread-safe.
|
||||||
* */
|
* */
|
||||||
int mbox_is_full( struct mbox *);
|
int mbox_is_full( mbox_p );
|
||||||
|
|
||||||
/* Get the contents from the mbox, blocking if there's nothing there. */
|
/* Get the contents from the mbox, blocking if there's nothing there. */
|
||||||
void * mbox_receive( struct mbox *);
|
mbox_item_t mbox_receive( mbox_p );
|
||||||
|
|
||||||
/* Free the mbox and destroy the associated pthread bits. */
|
/* Free the mbox and destroy the associated pthread bits. */
|
||||||
void mbox_destroy( struct mbox *);
|
void mbox_destroy( mbox_p );
|
||||||
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@@ -66,7 +66,7 @@ struct xfer {
|
|||||||
|
|
||||||
struct mirror_ctrl {
|
struct mirror_ctrl {
|
||||||
struct server *serve;
|
struct server *serve;
|
||||||
struct mirror *mirror;
|
mirror_p mirror;
|
||||||
|
|
||||||
/* libev stuff */
|
/* libev stuff */
|
||||||
struct ev_loop *ev_loop;
|
struct ev_loop *ev_loop;
|
||||||
@@ -90,16 +90,16 @@ struct mirror_ctrl {
|
|||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct mirror * mirror_alloc(
|
mirror_p mirror_alloc(
|
||||||
union mysockaddr * connect_to,
|
union mysockaddr * connect_to,
|
||||||
union mysockaddr * connect_from,
|
union mysockaddr * connect_from,
|
||||||
uint64_t max_Bps,
|
uint64_t max_Bps,
|
||||||
enum mirror_finish_action action_at_finish,
|
mirror_finish_action_t action_at_finish,
|
||||||
struct mbox * commit_signal)
|
mbox_p commit_signal)
|
||||||
{
|
{
|
||||||
struct mirror * mirror;
|
mirror_p mirror;
|
||||||
|
|
||||||
mirror = xmalloc(sizeof(struct mirror));
|
mirror = xmalloc(sizeof(mirror_t));
|
||||||
mirror->connect_to = connect_to;
|
mirror->connect_to = connect_to;
|
||||||
mirror->connect_from = connect_from;
|
mirror->connect_from = connect_from;
|
||||||
mirror->max_bytes_per_second = max_Bps;
|
mirror->max_bytes_per_second = max_Bps;
|
||||||
@@ -116,7 +116,7 @@ struct mirror * mirror_alloc(
|
|||||||
return mirror;
|
return mirror;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mirror_set_state_f( struct mirror * mirror, enum mirror_state state )
|
void mirror_set_state_f( mirror_p mirror, mirror_state_t state )
|
||||||
{
|
{
|
||||||
NULLCHECK( mirror );
|
NULLCHECK( mirror );
|
||||||
mirror->commit_state = state;
|
mirror->commit_state = state;
|
||||||
@@ -127,7 +127,7 @@ void mirror_set_state_f( struct mirror * mirror, enum mirror_state state )
|
|||||||
mirror_set_state_f( mirror, state );\
|
mirror_set_state_f( mirror, state );\
|
||||||
} while(0)
|
} while(0)
|
||||||
|
|
||||||
enum mirror_state mirror_get_state( struct mirror * mirror )
|
mirror_state_t mirror_get_state( mirror_p mirror )
|
||||||
{
|
{
|
||||||
NULLCHECK( mirror );
|
NULLCHECK( mirror );
|
||||||
return mirror->commit_state;
|
return mirror->commit_state;
|
||||||
@@ -136,7 +136,7 @@ enum mirror_state mirror_get_state( struct mirror * mirror )
|
|||||||
#define mirror_state_is( mirror, state ) mirror_get_state( mirror ) == state
|
#define mirror_state_is( mirror, state ) mirror_get_state( mirror ) == state
|
||||||
|
|
||||||
|
|
||||||
void mirror_init( struct mirror * mirror, const char * filename )
|
void mirror_init( mirror_p mirror, const char * filename )
|
||||||
{
|
{
|
||||||
int map_fd;
|
int map_fd;
|
||||||
uint64_t size;
|
uint64_t size;
|
||||||
@@ -163,7 +163,7 @@ void mirror_init( struct mirror * mirror, const char * filename )
|
|||||||
|
|
||||||
|
|
||||||
/* Call this before a mirror attempt. */
|
/* Call this before a mirror attempt. */
|
||||||
void mirror_reset( struct mirror * mirror )
|
void mirror_reset( mirror_p mirror )
|
||||||
{
|
{
|
||||||
NULLCHECK( mirror );
|
NULLCHECK( mirror );
|
||||||
mirror_set_state( mirror, MS_INIT );
|
mirror_set_state( mirror, MS_INIT );
|
||||||
@@ -176,16 +176,16 @@ void mirror_reset( struct mirror * mirror )
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
struct mirror * mirror_create(
|
mirror_p mirror_create(
|
||||||
const char * filename,
|
const char * filename,
|
||||||
union mysockaddr * connect_to,
|
union mysockaddr * connect_to,
|
||||||
union mysockaddr * connect_from,
|
union mysockaddr * connect_from,
|
||||||
uint64_t max_Bps,
|
uint64_t max_Bps,
|
||||||
int action_at_finish,
|
int action_at_finish,
|
||||||
struct mbox * commit_signal)
|
mbox_p commit_signal)
|
||||||
{
|
{
|
||||||
/* FIXME: shouldn't map_fd get closed? */
|
/* FIXME: shouldn't map_fd get closed? */
|
||||||
struct mirror * mirror;
|
mirror_p mirror;
|
||||||
|
|
||||||
mirror = mirror_alloc( connect_to,
|
mirror = mirror_alloc( connect_to,
|
||||||
connect_from,
|
connect_from,
|
||||||
@@ -201,7 +201,7 @@ struct mirror * mirror_create(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void mirror_destroy( struct mirror *mirror )
|
void mirror_destroy( mirror_p mirror )
|
||||||
{
|
{
|
||||||
NULLCHECK( mirror );
|
NULLCHECK( mirror );
|
||||||
self_pipe_destroy( mirror->abandon_signal );
|
self_pipe_destroy( mirror->abandon_signal );
|
||||||
@@ -254,7 +254,7 @@ void mirror_cleanup( struct server * serve,
|
|||||||
int fatal __attribute__((unused)))
|
int fatal __attribute__((unused)))
|
||||||
{
|
{
|
||||||
NULLCHECK( serve );
|
NULLCHECK( serve );
|
||||||
struct mirror * mirror = serve->mirror;
|
mirror_p mirror = serve->mirror;
|
||||||
NULLCHECK( mirror );
|
NULLCHECK( mirror );
|
||||||
info( "Cleaning up mirror thread");
|
info( "Cleaning up mirror thread");
|
||||||
|
|
||||||
@@ -270,7 +270,7 @@ void mirror_cleanup( struct server * serve,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int mirror_connect( struct mirror * mirror, uint64_t local_size )
|
int mirror_connect( mirror_p mirror, uint64_t local_size )
|
||||||
{
|
{
|
||||||
struct sockaddr * connect_from = NULL;
|
struct sockaddr * connect_from = NULL;
|
||||||
int connected = 0;
|
int connected = 0;
|
||||||
@@ -325,7 +325,7 @@ int mirror_connect( struct mirror * mirror, uint64_t local_size )
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int mirror_should_quit( struct mirror * mirror )
|
int mirror_should_quit( mirror_p mirror )
|
||||||
{
|
{
|
||||||
switch( mirror->action_at_finish ) {
|
switch( mirror->action_at_finish ) {
|
||||||
case ACTION_EXIT:
|
case ACTION_EXIT:
|
||||||
@@ -359,7 +359,7 @@ int mirror_should_wait( struct mirror_ctrl *ctrl )
|
|||||||
* next transfer, then puts it together. */
|
* next transfer, then puts it together. */
|
||||||
int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
|
int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
|
||||||
{
|
{
|
||||||
struct mirror* mirror = ctrl->mirror;
|
mirror_p mirror = ctrl->mirror;
|
||||||
struct server* serve = ctrl->serve;
|
struct server* serve = ctrl->serve;
|
||||||
struct bitset_stream_entry e = { .event = BITSET_STREAM_UNSET };
|
struct bitset_stream_entry e = { .event = BITSET_STREAM_UNSET };
|
||||||
uint64_t current = mirror->offset, run = 0, size = serve->size;
|
uint64_t current = mirror->offset, run = 0, size = serve->size;
|
||||||
@@ -508,7 +508,7 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
|
|||||||
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
||||||
NULLCHECK( ctrl );
|
NULLCHECK( ctrl );
|
||||||
|
|
||||||
struct mirror *m = ctrl->mirror;
|
mirror_p m = ctrl->mirror;
|
||||||
NULLCHECK( m );
|
NULLCHECK( m );
|
||||||
|
|
||||||
struct xfer *xfer = &ctrl->xfer;
|
struct xfer *xfer = &ctrl->xfer;
|
||||||
@@ -733,7 +733,7 @@ void mirror_run( struct server *serve )
|
|||||||
NULLCHECK( serve );
|
NULLCHECK( serve );
|
||||||
NULLCHECK( serve->mirror );
|
NULLCHECK( serve->mirror );
|
||||||
|
|
||||||
struct mirror *m = serve->mirror;
|
mirror_p m = serve->mirror;
|
||||||
|
|
||||||
m->migration_started = monotonic_time_ms();
|
m->migration_started = monotonic_time_ms();
|
||||||
info("Starting mirror" );
|
info("Starting mirror" );
|
||||||
@@ -849,18 +849,17 @@ void mirror_run( struct server *serve )
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void mbox_post_mirror_state( struct mbox * mbox, enum mirror_state st )
|
void mbox_post_mirror_state( mbox_p mbox, mirror_state_t st )
|
||||||
{
|
{
|
||||||
NULLCHECK( mbox );
|
NULLCHECK( mbox );
|
||||||
enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) );
|
|
||||||
|
|
||||||
*contents = st;
|
mbox_item_t ste = { .i = st };
|
||||||
|
|
||||||
mbox_post( mbox, contents );
|
mbox_post( mbox, ste );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void mirror_signal_commit( struct mirror * mirror )
|
void mirror_signal_commit( mirror_p mirror )
|
||||||
{
|
{
|
||||||
NULLCHECK( mirror );
|
NULLCHECK( mirror );
|
||||||
|
|
||||||
@@ -887,7 +886,7 @@ void* mirror_runner(void* serve_params_uncast)
|
|||||||
|
|
||||||
NULLCHECK( serve );
|
NULLCHECK( serve );
|
||||||
NULLCHECK( serve->mirror );
|
NULLCHECK( serve->mirror );
|
||||||
struct mirror * mirror = serve->mirror;
|
mirror_p mirror = serve->mirror;
|
||||||
|
|
||||||
error_set_handler( (cleanup_handler *) mirror_cleanup, serve );
|
error_set_handler( (cleanup_handler *) mirror_cleanup, serve );
|
||||||
|
|
||||||
@@ -932,15 +931,15 @@ abandon_mirror:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
struct mirror_super * mirror_super_create(
|
mirror_super_p mirror_super_create(
|
||||||
const char * filename,
|
const char * filename,
|
||||||
union mysockaddr * connect_to,
|
union mysockaddr * connect_to,
|
||||||
union mysockaddr * connect_from,
|
union mysockaddr * connect_from,
|
||||||
uint64_t max_Bps,
|
uint64_t max_Bps,
|
||||||
enum mirror_finish_action action_at_finish,
|
mirror_finish_action_t action_at_finish,
|
||||||
struct mbox * state_mbox)
|
mbox_p state_mbox)
|
||||||
{
|
{
|
||||||
struct mirror_super * super = xmalloc( sizeof( struct mirror_super) );
|
mirror_super_p super = xmalloc( sizeof( struct mirror_super_t) );
|
||||||
super->mirror = mirror_create(
|
super->mirror = mirror_create(
|
||||||
filename,
|
filename,
|
||||||
connect_to,
|
connect_to,
|
||||||
@@ -955,8 +954,8 @@ struct mirror_super * mirror_super_create(
|
|||||||
|
|
||||||
/* Post the current state of the mirror into super->state_mbox.*/
|
/* Post the current state of the mirror into super->state_mbox.*/
|
||||||
void mirror_super_signal_committed(
|
void mirror_super_signal_committed(
|
||||||
struct mirror_super * super ,
|
mirror_super_p super ,
|
||||||
enum mirror_state commit_state )
|
mirror_state_t commit_state )
|
||||||
{
|
{
|
||||||
NULLCHECK( super );
|
NULLCHECK( super );
|
||||||
NULLCHECK( super->state_mbox );
|
NULLCHECK( super->state_mbox );
|
||||||
@@ -967,7 +966,7 @@ void mirror_super_signal_committed(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void mirror_super_destroy( struct mirror_super * super )
|
void mirror_super_destroy( mirror_super_p super )
|
||||||
{
|
{
|
||||||
NULLCHECK( super );
|
NULLCHECK( super );
|
||||||
|
|
||||||
@@ -993,8 +992,8 @@ void * mirror_super_runner( void * serve_uncast )
|
|||||||
int should_retry = 0;
|
int should_retry = 0;
|
||||||
int success = 0, abandoned = 0;
|
int success = 0, abandoned = 0;
|
||||||
|
|
||||||
struct mirror * mirror = serve->mirror;
|
mirror_p mirror = serve->mirror;
|
||||||
struct mirror_super * super = serve->mirror_super;
|
mirror_super_p super = serve->mirror_super;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
FATAL_IF( 0 != pthread_create(
|
FATAL_IF( 0 != pthread_create(
|
||||||
@@ -1005,8 +1004,8 @@ void * mirror_super_runner( void * serve_uncast )
|
|||||||
"Failed to create mirror thread");
|
"Failed to create mirror thread");
|
||||||
|
|
||||||
debug("Supervisor waiting for commit signal");
|
debug("Supervisor waiting for commit signal");
|
||||||
enum mirror_state * commit_state =
|
mirror_state_t commit_state =
|
||||||
mbox_receive( mirror->commit_signal );
|
mbox_receive( mirror->commit_signal ).i;
|
||||||
|
|
||||||
debug( "Supervisor got commit signal" );
|
debug( "Supervisor got commit signal" );
|
||||||
if ( first_pass ) {
|
if ( first_pass ) {
|
||||||
@@ -1015,18 +1014,14 @@ void * mirror_super_runner( void * serve_uncast )
|
|||||||
* retry behind the scenes. This may race with migration completing
|
* retry behind the scenes. This may race with migration completing
|
||||||
* but since we "shouldn't retry" in that case either, that's fine
|
* but since we "shouldn't retry" in that case either, that's fine
|
||||||
*/
|
*/
|
||||||
should_retry = *commit_state == MS_GO;
|
should_retry = commit_state == MS_GO;
|
||||||
|
|
||||||
/* Only send this signal the first time */
|
/* Only send this signal the first time */
|
||||||
mirror_super_signal_committed(
|
mirror_super_signal_committed(
|
||||||
super,
|
super,
|
||||||
*commit_state);
|
commit_state);
|
||||||
debug("Mirror supervisor committed");
|
debug("Mirror supervisor committed");
|
||||||
}
|
}
|
||||||
/* We only care about the value of the commit signal on
|
|
||||||
* the first pass, so this is ok
|
|
||||||
*/
|
|
||||||
free( commit_state );
|
|
||||||
|
|
||||||
debug("Supervisor waiting for mirror thread" );
|
debug("Supervisor waiting for mirror thread" );
|
||||||
pthread_join( mirror->thread, NULL );
|
pthread_join( mirror->thread, NULL );
|
||||||
|
@@ -7,7 +7,7 @@
|
|||||||
|
|
||||||
#include "bitset.h"
|
#include "bitset.h"
|
||||||
#include "self_pipe.h"
|
#include "self_pipe.h"
|
||||||
enum mirror_state;
|
|
||||||
#include "serve.h"
|
#include "serve.h"
|
||||||
#include "mbox.h"
|
#include "mbox.h"
|
||||||
|
|
||||||
@@ -57,14 +57,14 @@ enum mirror_state;
|
|||||||
#define MS_REQUEST_LIMIT_SECS 60
|
#define MS_REQUEST_LIMIT_SECS 60
|
||||||
#define MS_REQUEST_LIMIT_SECS_F 60.0
|
#define MS_REQUEST_LIMIT_SECS_F 60.0
|
||||||
|
|
||||||
enum mirror_finish_action {
|
typedef enum {
|
||||||
ACTION_EXIT,
|
ACTION_EXIT = 0,
|
||||||
ACTION_UNLINK,
|
ACTION_UNLINK,
|
||||||
ACTION_NOTHING
|
ACTION_NOTHING
|
||||||
};
|
} mirror_finish_action_t;
|
||||||
|
|
||||||
enum mirror_state {
|
typedef enum {
|
||||||
MS_UNKNOWN,
|
MS_UNKNOWN = 0,
|
||||||
MS_INIT,
|
MS_INIT,
|
||||||
MS_GO,
|
MS_GO,
|
||||||
MS_ABANDONED,
|
MS_ABANDONED,
|
||||||
@@ -73,9 +73,9 @@ enum mirror_state {
|
|||||||
MS_FAIL_REJECTED,
|
MS_FAIL_REJECTED,
|
||||||
MS_FAIL_NO_HELLO,
|
MS_FAIL_NO_HELLO,
|
||||||
MS_FAIL_SIZE_MISMATCH
|
MS_FAIL_SIZE_MISMATCH
|
||||||
};
|
} mirror_state_t;
|
||||||
|
|
||||||
struct mirror {
|
typedef struct mirror_t {
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
|
|
||||||
/* Signal to this then join the thread if you want to abandon mirroring */
|
/* Signal to this then join the thread if you want to abandon mirroring */
|
||||||
@@ -90,19 +90,19 @@ struct mirror {
|
|||||||
* over the network) are considered */
|
* over the network) are considered */
|
||||||
uint64_t max_bytes_per_second;
|
uint64_t max_bytes_per_second;
|
||||||
|
|
||||||
enum mirror_finish_action action_at_finish;
|
mirror_finish_action_t action_at_finish;
|
||||||
|
|
||||||
char *mapped;
|
char *mapped;
|
||||||
|
|
||||||
/* We need to send every byte at least once; we do so by */
|
/* We need to send every byte at least once; we do so by */
|
||||||
uint64_t offset;
|
uint64_t offset;
|
||||||
|
|
||||||
enum mirror_state commit_state;
|
mirror_state_t commit_state;
|
||||||
|
|
||||||
/* commit_signal is sent immediately after attempting to connect
|
/* commit_signal is sent immediately after attempting to connect
|
||||||
* and checking the remote size, whether successful or not.
|
* and checking the remote size, whether successful or not.
|
||||||
*/
|
*/
|
||||||
struct mbox * commit_signal;
|
struct mbox_t * commit_signal;
|
||||||
|
|
||||||
/* The time (from monotonic_time_ms()) the migration was started. Can be
|
/* The time (from monotonic_time_ms()) the migration was started. Can be
|
||||||
* used to calculate bps, etc. */
|
* used to calculate bps, etc. */
|
||||||
@@ -110,14 +110,14 @@ struct mirror {
|
|||||||
|
|
||||||
/* Running count of all bytes we've transferred */
|
/* Running count of all bytes we've transferred */
|
||||||
uint64_t all_dirty;
|
uint64_t all_dirty;
|
||||||
};
|
} mirror_t, *mirror_p;
|
||||||
|
|
||||||
|
|
||||||
struct mirror_super {
|
typedef struct mirror_super_t {
|
||||||
struct mirror * mirror;
|
mirror_p mirror;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
struct mbox * state_mbox;
|
struct mbox_t * state_mbox;
|
||||||
};
|
} mirror_super_t, *mirror_super_p;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -127,13 +127,13 @@ struct mirror_super {
|
|||||||
struct server;
|
struct server;
|
||||||
struct flexnbd;
|
struct flexnbd;
|
||||||
|
|
||||||
struct mirror_super * mirror_super_create(
|
mirror_super_p mirror_super_create(
|
||||||
const char * filename,
|
const char * filename,
|
||||||
union mysockaddr * connect_to,
|
union mysockaddr * connect_to,
|
||||||
union mysockaddr * connect_from,
|
union mysockaddr * connect_from,
|
||||||
uint64_t max_Bps,
|
uint64_t max_Bps,
|
||||||
enum mirror_finish_action action_at_finish,
|
mirror_finish_action_t action_at_finish,
|
||||||
struct mbox * state_mbox
|
struct mbox_t * state_mbox
|
||||||
);
|
);
|
||||||
void * mirror_super_runner( void * serve_uncast );
|
void * mirror_super_runner( void * serve_uncast );
|
||||||
|
|
||||||
|
@@ -53,8 +53,8 @@ struct server {
|
|||||||
* shutting down on a SIGTERM. */
|
* shutting down on a SIGTERM. */
|
||||||
struct flexthread_mutex * l_start_mirror;
|
struct flexthread_mutex * l_start_mirror;
|
||||||
|
|
||||||
struct mirror* mirror;
|
struct mirror_t * mirror;
|
||||||
struct mirror_super * mirror_super;
|
struct mirror_super_t * mirror_super;
|
||||||
/* This is used to stop the mirror from starting after we
|
/* This is used to stop the mirror from starting after we
|
||||||
* receive a SIGTERM */
|
* receive a SIGTERM */
|
||||||
int mirror_can_start;
|
int mirror_can_start;
|
||||||
|
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
START_TEST( test_allocs_cvar )
|
START_TEST( test_allocs_cvar )
|
||||||
{
|
{
|
||||||
struct mbox * mbox = mbox_create();
|
struct mbox_t * mbox = mbox_create();
|
||||||
fail_if( NULL == mbox, "Nothing allocated" );
|
fail_if( NULL == mbox, "Nothing allocated" );
|
||||||
|
|
||||||
pthread_cond_t cond_zero;
|
pthread_cond_t cond_zero;
|
||||||
@@ -22,7 +22,7 @@ END_TEST
|
|||||||
|
|
||||||
START_TEST( test_post_stores_value )
|
START_TEST( test_post_stores_value )
|
||||||
{
|
{
|
||||||
struct mbox * mbox = mbox_create();
|
struct mbox_t * mbox = mbox_create();
|
||||||
|
|
||||||
void * deadbeef = (void *)0xDEADBEEF;
|
void * deadbeef = (void *)0xDEADBEEF;
|
||||||
mbox_post( mbox, deadbeef );
|
mbox_post( mbox, deadbeef );
|
||||||
@@ -33,19 +33,18 @@ START_TEST( test_post_stores_value )
|
|||||||
END_TEST
|
END_TEST
|
||||||
|
|
||||||
|
|
||||||
void * mbox_receive_runner( void * mbox_uncast )
|
mbox_item_t mbox_receive_runner( void * mbox_uncast )
|
||||||
{
|
{
|
||||||
struct mbox * mbox = (struct mbox *)mbox_uncast;
|
struct mbox_t * mbox = (struct mbox_t *)mbox_uncast;
|
||||||
void * contents = NULL;
|
void * contents = NULL;
|
||||||
|
|
||||||
contents = mbox_receive( mbox );
|
return mbox_receive( mbox );
|
||||||
return contents;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
START_TEST( test_receive_blocks_until_post )
|
START_TEST( test_receive_blocks_until_post )
|
||||||
{
|
{
|
||||||
struct mbox * mbox = mbox_create();
|
struct mbox_t * mbox = mbox_create();
|
||||||
pthread_t receiver;
|
pthread_t receiver;
|
||||||
pthread_create( &receiver, NULL, mbox_receive_runner, mbox );
|
pthread_create( &receiver, NULL, mbox_receive_runner, mbox );
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user