Add mboxes

This commit is contained in:
Alex Young
2012-06-27 15:45:33 +01:00
parent 2078d17053
commit 94b4fa887c
34 changed files with 2534 additions and 1599 deletions

View File

@@ -37,18 +37,20 @@ static inline void* sockaddr_address_data(struct sockaddr* sockaddr)
}
struct server * server_create (
struct flexnbd * flexnbd,
char* s_ip_address,
char* s_port,
char* s_file,
char *s_ctrl_sock,
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients,
int has_control)
{
NULLCHECK( flexnbd );
struct server * out;
out = xmalloc( sizeof( struct server ) );
out->flexnbd = flexnbd;
out->has_control = has_control;
out->max_nbd_clients = max_nbd_clients;
out->nbd_client = xmalloc( max_nbd_clients * sizeof( struct client_tbl_entry ) );
@@ -66,9 +68,6 @@ struct server * server_create (
s_ip_address
);
/* control_socket_name is optional. It just won't get created if
* we pass NULL. */
out->control_socket_name = s_ctrl_sock;
out->acl = acl_create( acl_entries, s_acl_entries, default_deny );
if (out->acl && out->acl->len != acl_entries) {
@@ -87,7 +86,6 @@ struct server * server_create (
out->close_signal = self_pipe_create();
out->acl_updated_signal = self_pipe_create();
out->vacuum_signal = self_pipe_create();
NULLCHECK( out->close_signal );
NULLCHECK( out->acl_updated_signal );
@@ -101,8 +99,6 @@ void server_destroy( struct server * serve )
serve->acl_updated_signal = NULL;
self_pipe_destroy( serve->close_signal );
serve->close_signal = NULL;
self_pipe_destroy( serve->vacuum_signal );
serve->vacuum_signal = NULL;
pthread_mutex_destroy( &serve->l_acl );
pthread_mutex_destroy( &serve->l_io );
@@ -230,6 +226,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
s_client_address,
64 );
debug( "%s(%p,...)", joinfunc == pthread_join ? "joining" : "tryjoining", entry->thread );
join_errno = joinfunc(entry->thread, &status);
/* join_errno can legitimately be ESRCH if the thread is
* already dead, but the client still needs tidying up. */
@@ -245,6 +242,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
s_client_address,
(uint64_t)status);
client_destroy( entry->client );
entry->client = NULL;
entry->thread = 0;
was_closed = 1;
}
@@ -366,44 +364,13 @@ int server_should_accept_client(
}
struct client_cleanup {
pthread_t client_thread;
struct self_pipe * vacuum_signal;
};
void *client_vacuum( void * cleanup_uncast )
{
pthread_detach( pthread_self() );
NULLCHECK( cleanup_uncast );
struct client_cleanup *cleanup = (struct client_cleanup *)cleanup_uncast;
pthread_join( cleanup->client_thread, NULL );
self_pipe_signal( cleanup->vacuum_signal );
free( cleanup );
return NULL;
}
/* Why do we need this rather odd arrangement? Because if we don't have
* it, dead threads don't get tidied up until the next incoming
* connection happens.
*/
int spawn_client_thread(
struct client * client_params,
struct self_pipe * vacuum_signal,
pthread_t *out_thread)
{
struct client_cleanup * cleanup = xmalloc( sizeof( struct client_cleanup ) );
cleanup->vacuum_signal = vacuum_signal;
int result = pthread_create(&cleanup->client_thread, NULL, client_serve, client_params);
int result = pthread_create(out_thread, NULL, client_serve, client_params);
if ( 0 == result ){
pthread_t watcher;
pthread_create( &watcher, NULL, client_vacuum, cleanup );
*out_thread = cleanup->client_thread;
}
return result;
}
@@ -446,7 +413,7 @@ void accept_nbd_client(
pthread_t * thread = &params->nbd_client[slot].thread;
if (spawn_client_thread( client_params, params->vacuum_signal, thread ) != 0) {
if ( 0 != spawn_client_thread( client_params, thread ) ) {
debug( "Thread creation problem." );
client_destroy( client_params );
close(client_fd);
@@ -501,6 +468,7 @@ void server_close_clients( struct server *params )
entry = &params->nbd_client[i];
if ( entry->thread != 0 ) {
debug( "Stop signaling client %p", entry->client );
client_signal_stop( entry->client );
}
}
@@ -547,12 +515,9 @@ int server_accept( struct server * params )
FD_ZERO(&fds);
FD_SET(params->server_fd, &fds);
FD_SET(flexnbd_signal_fd( params->flexnbd ), &fds);
self_pipe_fd_set( params->close_signal, &fds );
self_pipe_fd_set( params->acl_updated_signal, &fds );
self_pipe_fd_set( params->vacuum_signal, &fds );
if (params->control_socket_name) {
FD_SET(params->control_fd, &fds);
}
FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds,
NULL, NULL, NULL), "select() failed");
@@ -562,11 +527,13 @@ int server_accept( struct server * params )
return 0;
}
if ( self_pipe_fd_isset( params->vacuum_signal, &fds ) ) {
cleanup_client_threads( params->nbd_client, params->max_nbd_clients );
self_pipe_signal_clear( params->vacuum_signal );
if ( FD_ISSET( flexnbd_signal_fd( params->flexnbd ), &fds ) ){
debug( "Stop signal received." );
server_close_clients( params );
return 0;
}
if ( self_pipe_fd_isset( params->acl_updated_signal, &fds ) ) {
self_pipe_signal_clear( params->acl_updated_signal );
server_audit_clients( params );
@@ -577,12 +544,6 @@ int server_accept( struct server * params )
debug("Accepted nbd client socket");
accept_nbd_client(params, client_fd, &client_address);
}
else if( FD_ISSET( params->control_fd, &fds ) ) {
client_fd = accept( params->control_fd, &client_address.generic, &socklen );
debug("Accepted control client socket");
accept_control_connection(params, client_fd, &client_address);
}
return 1;
}
@@ -655,17 +616,16 @@ void serve_cleanup(struct server* params,
int i;
if (params->server_fd){ close(params->server_fd); }
if (params->control_fd){ close(params->control_fd); }
if (params->control_socket_name){ ; }
if (params->allocation_map) {
free(params->allocation_map);
}
if (params->mirror) {
pthread_t mirror_t = params->mirror->thread;
if (params->mirror_super) {
/* AWOOGA! RACE! */
pthread_t mirror_t = params->mirror_super->thread;
params->mirror->signal_abandon = 1;
pthread_join(mirror_t, NULL);
pthread_join( mirror_t, NULL );
}
for (i=0; i < params->max_nbd_clients; i++) {
@@ -687,6 +647,11 @@ int server_is_in_control( struct server *serve )
return serve->has_control;
}
int server_default_deny( struct server * serve )
{
NULLCHECK( serve );
return acl_default_deny( serve->acl );
}
/** Full lifecycle of the server */
int do_serve(struct server* params)
@@ -697,12 +662,10 @@ int do_serve(struct server* params)
error_set_handler((cleanup_handler*) serve_cleanup, params);
serve_open_server_socket(params);
serve_open_control_socket(params);
serve_init_allocation_map(params);
serve_accept_loop(params);
has_control = params->has_control;
serve_cleanup(params, 0);
debug("Server %s control.", has_control ? "has" : "does not have" );
return has_control;
}