Move the mirror commit state mbox to struct control
The mirror_super signals the commit state to the control thread via an mbox, and this mbox is moved to control. It was owned by mirror_super, but the problem with that is that mirror_super can free the mbox before the control client has been scheduled to receive the message. If it's owned by the control object, that can't happen.
This commit is contained in:
@@ -44,7 +44,9 @@
|
||||
#include <unistd.h>
|
||||
|
||||
|
||||
struct control * control_create(struct flexnbd * flexnbd, const char * csn)
|
||||
struct control * control_create(
|
||||
struct flexnbd * flexnbd,
|
||||
const char * csn)
|
||||
{
|
||||
struct control * control = xmalloc( sizeof( struct control ) );
|
||||
|
||||
@@ -53,6 +55,7 @@ struct control * control_create(struct flexnbd * flexnbd, const char * csn)
|
||||
control->flexnbd = flexnbd;
|
||||
control->socket_name = csn;
|
||||
control->close_signal = self_pipe_create();
|
||||
control->mirror_state_mbox = mbox_create();
|
||||
|
||||
return control;
|
||||
}
|
||||
@@ -70,11 +73,15 @@ void control_destroy( struct control * control )
|
||||
{
|
||||
NULLCHECK( control );
|
||||
|
||||
mbox_destroy( control->mirror_state_mbox );
|
||||
self_pipe_destroy( control->close_signal );
|
||||
free( control );
|
||||
}
|
||||
|
||||
struct control_client * control_client_create( struct flexnbd * flexnbd, int client_fd )
|
||||
struct control_client * control_client_create(
|
||||
struct flexnbd * flexnbd,
|
||||
int client_fd ,
|
||||
struct mbox * state_mbox )
|
||||
{
|
||||
NULLCHECK( flexnbd );
|
||||
|
||||
@@ -83,6 +90,7 @@ struct control_client * control_client_create( struct flexnbd * flexnbd, int cli
|
||||
|
||||
control_client->socket = client_fd;
|
||||
control_client->flexnbd = flexnbd;
|
||||
control_client->mirror_state_mbox = state_mbox;
|
||||
return control_client;
|
||||
}
|
||||
|
||||
@@ -102,7 +110,10 @@ void control_handle_client( struct control * control, int client_fd )
|
||||
NULLCHECK( control );
|
||||
NULLCHECK( control->flexnbd );
|
||||
struct control_client * control_client =
|
||||
control_client_create( control->flexnbd, client_fd );
|
||||
control_client_create(
|
||||
control->flexnbd,
|
||||
client_fd ,
|
||||
control->mirror_state_mbox);
|
||||
|
||||
/* We intentionally don't spawn a thread for the client here.
|
||||
* This is to avoid having more than one thread potentially
|
||||
@@ -260,6 +271,28 @@ void control_write_mirror_response( enum mirror_state mirror_state, int client_f
|
||||
#undef write_socket
|
||||
|
||||
|
||||
/* Call this in the thread where you want to receive the mirror state */
|
||||
enum mirror_state control_client_mirror_wait(
|
||||
struct control_client* client)
|
||||
{
|
||||
NULLCHECK( client );
|
||||
NULLCHECK( client->mirror_state_mbox );
|
||||
|
||||
struct mbox * mbox = client->mirror_state_mbox;
|
||||
enum mirror_state mirror_state;
|
||||
enum mirror_state * contents;
|
||||
|
||||
contents = (enum mirror_state*)mbox_receive( mbox );
|
||||
NULLCHECK( contents );
|
||||
|
||||
mirror_state = *contents;
|
||||
|
||||
free( contents );
|
||||
|
||||
return mirror_state;
|
||||
}
|
||||
|
||||
|
||||
#define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1)
|
||||
/** Command parser to start mirror process from socket input */
|
||||
int control_mirror(struct control_client* client, int linesc, char** lines)
|
||||
@@ -333,7 +366,8 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
|
||||
connect_to,
|
||||
connect_from,
|
||||
max_Bps ,
|
||||
action_at_finish);
|
||||
action_at_finish,
|
||||
client->mirror_state_mbox );
|
||||
serve->mirror = serve->mirror_super->mirror;
|
||||
|
||||
FATAL_IF( 0 != pthread_create(
|
||||
@@ -346,7 +380,8 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
|
||||
);
|
||||
|
||||
debug("Control thread mirror super waiting");
|
||||
enum mirror_state state = mirror_super_wait( serve->mirror_super );
|
||||
enum mirror_state state =
|
||||
control_client_mirror_wait( client );
|
||||
debug("Control thread writing response");
|
||||
control_write_mirror_response( state, client->socket );
|
||||
}
|
||||
|
@@ -6,6 +6,7 @@
|
||||
#include "mirror.h"
|
||||
#include "control.h"
|
||||
#include "flexnbd.h"
|
||||
#include "mbox.h"
|
||||
|
||||
struct control {
|
||||
struct flexnbd * flexnbd;
|
||||
@@ -15,14 +16,32 @@ struct control {
|
||||
pthread_t thread;
|
||||
|
||||
struct self_pipe * close_signal;
|
||||
|
||||
/* This is owned by the control object, and used by a
|
||||
* mirror_super to communicate the state of a mirror attempt as
|
||||
* early as feasible. It can't be owned by the mirror_super
|
||||
* object because the mirror_super object can be freed at any
|
||||
* time (including while the control_client is waiting on it),
|
||||
* whereas the control object lasts for the lifetime of the
|
||||
* process (and we can only have a mirror thread if the control
|
||||
* thread has started it).
|
||||
*/
|
||||
struct mbox * mirror_state_mbox;
|
||||
};
|
||||
|
||||
struct control_client{
|
||||
int socket;
|
||||
struct flexnbd * flexnbd;
|
||||
|
||||
/* Passed in on creation. We know it's all right to do this
|
||||
* because we know there's only ever one control_client.
|
||||
*/
|
||||
struct mbox * mirror_state_mbox;
|
||||
};
|
||||
|
||||
struct control * control_create(struct flexnbd *, const char * control_socket_name);
|
||||
struct control * control_create(
|
||||
struct flexnbd *,
|
||||
const char * control_socket_name );
|
||||
void control_signal_close( struct control * );
|
||||
void control_destroy( struct control * );
|
||||
|
||||
|
@@ -55,11 +55,11 @@ void * mbox_receive( struct mbox * mbox )
|
||||
mbox->full = 0;
|
||||
result = mbox->contents;
|
||||
mbox->contents = NULL;
|
||||
|
||||
while( 0 != pthread_cond_signal( &mbox->emptied_cond));
|
||||
}
|
||||
pthread_mutex_unlock( &mbox->mutex );
|
||||
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
30
src/mirror.c
30
src/mirror.c
@@ -481,7 +481,8 @@ struct mirror_super * mirror_super_create(
|
||||
union mysockaddr * connect_to,
|
||||
union mysockaddr * connect_from,
|
||||
int max_Bps,
|
||||
int action_at_finish)
|
||||
int action_at_finish,
|
||||
struct mbox * state_mbox)
|
||||
{
|
||||
struct mirror_super * super = xmalloc( sizeof( struct mirror_super) );
|
||||
super->mirror = mirror_create(
|
||||
@@ -491,12 +492,12 @@ struct mirror_super * mirror_super_create(
|
||||
max_Bps,
|
||||
action_at_finish,
|
||||
mbox_create() ) ;
|
||||
super->state_mbox = mbox_create();
|
||||
super->state_mbox = state_mbox;
|
||||
return super;
|
||||
}
|
||||
|
||||
|
||||
/* Post the current state of the mirror into super->state_mbox */
|
||||
/* Post the current state of the mirror into super->state_mbox.*/
|
||||
void mirror_super_signal_committed(
|
||||
struct mirror_super * super ,
|
||||
enum mirror_state commit_state )
|
||||
@@ -514,7 +515,6 @@ void mirror_super_destroy( struct mirror_super * super )
|
||||
{
|
||||
NULLCHECK( super );
|
||||
|
||||
mbox_destroy( super->state_mbox );
|
||||
mbox_destroy( super->mirror->commit_signal );
|
||||
mirror_destroy( super->mirror );
|
||||
free( super );
|
||||
@@ -612,25 +612,3 @@ void * mirror_super_runner( void * serve_uncast )
|
||||
}
|
||||
|
||||
|
||||
#define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1)
|
||||
|
||||
/* Call this in the thread where you want to receive the mirror state */
|
||||
enum mirror_state mirror_super_wait( struct mirror_super * super )
|
||||
{
|
||||
NULLCHECK( super );
|
||||
NULLCHECK( super->state_mbox );
|
||||
|
||||
struct mbox * mbox = super->state_mbox;
|
||||
enum mirror_state mirror_state;
|
||||
enum mirror_state * contents;
|
||||
|
||||
contents = (enum mirror_state*)mbox_receive( mbox );
|
||||
NULLCHECK( contents );
|
||||
|
||||
mirror_state = *contents;
|
||||
|
||||
free(contents);
|
||||
|
||||
return mirror_state;
|
||||
}
|
||||
|
||||
|
@@ -100,7 +100,8 @@ struct mirror_super * mirror_super_create(
|
||||
union mysockaddr * connect_to,
|
||||
union mysockaddr * connect_from,
|
||||
int max_Bps,
|
||||
int action_at_finish);
|
||||
int action_at_finish,
|
||||
struct mbox * state_mbox
|
||||
);
|
||||
void * mirror_super_runner( void * serve_uncast );
|
||||
enum mirror_state mirror_super_wait( struct mirror_super * );
|
||||
#endif
|
||||
|
@@ -4,6 +4,9 @@
|
||||
# seconds. After that time, the client should have disconnected,
|
||||
# which we can can't effectively check.
|
||||
#
|
||||
# We also expect the client *not* to reconnect, since it could feed back
|
||||
# an error.
|
||||
#
|
||||
# This allows the test runner to check that the command-line sees the
|
||||
# right error message after the timeout time.
|
||||
|
||||
@@ -19,4 +22,14 @@ client = server.accept( "Client didn't make a connection" )
|
||||
sleep(FlexNBD::MS_HELLO_TIME_SECS + 1)
|
||||
|
||||
client.close
|
||||
|
||||
# Invert the sense of the timeout exception, since we *don't* want a
|
||||
# connection attempt
|
||||
begin
|
||||
server.accept( "Expected timeout" )
|
||||
fail "Unexpected reconnection"
|
||||
rescue Timeout::Error
|
||||
# expected
|
||||
end
|
||||
|
||||
server.close
|
||||
|
Reference in New Issue
Block a user