Removed proxying completely and fixed the pthread_join bug revealed in the process

This commit is contained in:
Alex Young
2012-06-12 15:08:07 +01:00
parent 2a71b4e7a4
commit c7525f87dc
7 changed files with 50 additions and 37 deletions

View File

@@ -19,6 +19,7 @@ struct client *client_create( struct server *serve, int socket )
struct client *c; struct client *c;
c = xmalloc( sizeof( struct server ) ); c = xmalloc( sizeof( struct server ) );
c->stopped = 0;
c->socket = socket; c->socket = socket;
c->serve = serve; c->serve = serve;
@@ -170,6 +171,7 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
"select() failed"); "select() failed");
if ( self_pipe_fd_isset( client->stop_signal, &fds ) ){ if ( self_pipe_fd_isset( client->stop_signal, &fds ) ){
debug("Client received stop signal.");
return 0; return 0;
} }
@@ -388,7 +390,6 @@ void* client_serve(void* client_uncast)
error_set_handler((cleanup_handler*) client_cleanup, client); error_set_handler((cleanup_handler*) client_cleanup, client);
//client_open_file(client);
FATAL_IF_NEGATIVE( FATAL_IF_NEGATIVE(
open_and_mmap( open_and_mmap(
client->serve->filename, client->serve->filename,
@@ -402,6 +403,7 @@ void* client_serve(void* client_uncast)
while (client_serve_request(client) == 0) while (client_serve_request(client) == 0)
; ;
client->stopped = 1;
FATAL_IF_NEGATIVE( FATAL_IF_NEGATIVE(
close(client->socket), close(client->socket),
@@ -409,6 +411,7 @@ void* client_serve(void* client_uncast)
client->socket client->socket
); );
debug("Cleaning up normally in thread %p", pthread_self());
client_cleanup(client, 0); client_cleanup(client, 0);
return NULL; return NULL;

View File

@@ -3,6 +3,14 @@
struct client { struct client {
/* When we call pthread_join, if the thread is already dead
* we can get an ESRCH. Since we have no other way to tell
* if that ESRCH is from a dead thread or a thread that never
* existed, we use a `stopped` flag to indicate a thread which
* did exist, but went away. Only check this after a
* pthread_join call.
*/
int stopped;
int socket; int socket;
int fileno; int fileno;

View File

@@ -133,11 +133,6 @@ void* mirror_runner(void* serve_params_uncast)
/* a successful finish ends here */ /* a successful finish ends here */
switch (serve->mirror->action_at_finish) switch (serve->mirror->action_at_finish)
{ {
case ACTION_PROXY:
debug("proxy!");
serve->proxy_fd = serve->mirror->client;
/* don't close our file descriptor, we still need it! */
break;
case ACTION_EXIT: case ACTION_EXIT:
debug("exit!"); debug("exit!");
close(serve->mirror->client); close(serve->mirror->client);
@@ -203,19 +198,16 @@ int control_mirror(struct control_params* client, int linesc, char** lines)
max_bytes_per_second = atoi(lines[2]); max_bytes_per_second = atoi(lines[2]);
} }
action_at_finish = ACTION_PROXY; action_at_finish = ACTION_EXIT;
if (linesc > 4) { if (linesc > 4) {
if (strcmp("proxy", lines[3]) == 0) { if (strcmp("exit", lines[3]) == 0) {
action_at_finish = ACTION_PROXY;
}
else if (strcmp("exit", lines[3]) == 0) {
action_at_finish = ACTION_EXIT; action_at_finish = ACTION_EXIT;
} }
else if (strcmp("nothing", lines[3]) == 0) { else if (strcmp("nothing", lines[3]) == 0) {
action_at_finish = ACTION_NOTHING; action_at_finish = ACTION_NOTHING;
} }
else { else {
write_socket("1: action must be one of 'proxy', 'exit' or 'nothing'"); write_socket("1: action must be 'exit' or 'nothing'");
return -1; return -1;
} }
} }

View File

@@ -201,6 +201,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
int was_closed = 0; int was_closed = 0;
void * status; void * status;
int join_errno;
if (entry->thread != 0) { if (entry->thread != 0) {
char s_client_address[64]; char s_client_address[64];
@@ -212,8 +213,14 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
s_client_address, s_client_address,
64 ); 64 );
if (joinfunc(entry->thread, &status) != 0) { join_errno = joinfunc(entry->thread, &status);
FATAL_UNLESS( errno == EBUSY, "Problem with joining thread" ); /* join_errno can legitimately be ESRCH if the thread is
* already dead, but the cluent still needs tidying up. */
if (join_errno != 0 && !entry->client->stopped ) {
FATAL_UNLESS( join_errno == EBUSY,
"Problem with joining thread %p: %s",
entry->thread,
strerror(join_errno) );
} }
else { else {
debug("nbd thread %p exited (%s) with status %ld", debug("nbd thread %p exited (%s) with status %ld",
@@ -384,7 +391,7 @@ void accept_nbd_client(
return; return;
} }
debug("nbd thread %d started (%s)", (int) params->nbd_client[slot].thread, s_client_address); debug("nbd thread %p started (%s)", params->nbd_client[slot].thread, s_client_address);
} }
@@ -436,7 +443,7 @@ void server_close_clients( struct server *params )
} }
} }
for( j = 0; j < MAX_NBD_CLIENTS; j++ ) { for( j = 0; j < MAX_NBD_CLIENTS; j++ ) {
join_client_thread( &params->nbd_client[i] ); join_client_thread( &params->nbd_client[j] );
} }
} }
@@ -566,7 +573,6 @@ void serve_cleanup(struct server* params,
if (params->server_fd){ close(params->server_fd); } if (params->server_fd){ close(params->server_fd); }
if (params->control_fd){ close(params->control_fd); } if (params->control_fd){ close(params->control_fd); }
if (params->control_socket_name){ ; } if (params->control_socket_name){ ; }
if (params->proxy_fd){ close(params->proxy_fd); }
if (params->close_signal) { if (params->close_signal) {
self_pipe_destroy( params->close_signal ); self_pipe_destroy( params->close_signal );
@@ -583,10 +589,11 @@ void serve_cleanup(struct server* params,
for (i=0; i < MAX_NBD_CLIENTS; i++) { for (i=0; i < MAX_NBD_CLIENTS; i++) {
void* status; void* status;
pthread_t thread_id = params->nbd_client[i].thread;
if (params->nbd_client[i].thread != 0) { if (thread_id != 0) {
debug("joining thread %d", i); debug("joining thread %p", thread_id);
pthread_join(params->nbd_client[i].thread, &status); pthread_join(thread_id, &status);
} }
} }
} }

View File

@@ -15,7 +15,6 @@
static const int block_allocation_resolution = 4096;//128<<10; static const int block_allocation_resolution = 4096;//128<<10;
enum mirror_finish_action { enum mirror_finish_action {
ACTION_PROXY,
ACTION_EXIT, ACTION_EXIT,
ACTION_NOTHING ACTION_NOTHING
}; };
@@ -63,9 +62,6 @@ struct server {
/** Claims around any I/O to this file */ /** Claims around any I/O to this file */
pthread_mutex_t l_io; pthread_mutex_t l_io;
/** set to non-zero to cause r/w requests to go via this fd */
int proxy_fd;
/** to interrupt accept loop and clients, write() to close_signal[1] */ /** to interrupt accept loop and clients, write() to close_signal[1] */
struct self_pipe * close_signal; struct self_pipe * close_signal;

View File

@@ -53,8 +53,9 @@ extern pthread_key_t cleanup_handler_key;
pthread_setspecific(cleanup_handler_key, context); \ pthread_setspecific(cleanup_handler_key, context); \
break; \ break; \
case 1: /* fatal error, terminate thread */ \ case 1: /* fatal error, terminate thread */ \
debug( "Fatal error in thread %p", pthread_self() ); \
context->handler(context->data, 1); \ context->handler(context->data, 1); \
pthread_exit((void*) 1); \ /*pthread_exit((void*) 1);*/ \
abort(); \ abort(); \
case 2: /* non-fatal error, return to context of error handler setup */ \ case 2: /* non-fatal error, return to context of error handler setup */ \
context->handler(context->data, 0); \ context->handler(context->data, 0); \

View File

@@ -13,6 +13,19 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
#ifdef DEBUG
# define LOG_LEVEL 0
#else
# define LOG_LEVEL 2
#endif
/* Need these because libcheck is braindead and doesn't
* run teardown after a failing test
*/
#define myfail( msg ) do { teardown(); fail(msg); } while (0)
#define myfail_if( tst, msg ) do { if( tst ) { myfail( msg ); } } while (0)
#define myfail_unless( tst, msg ) myfail_if( !(tst), msg )
char * dummy_file; char * dummy_file;
@@ -45,13 +58,6 @@ void teardown( void )
dummy_file = NULL; dummy_file = NULL;
} }
/* Need these because libcheck is braindead and doesn't
* run teardown after a failing test
*/
#define myfail( msg ) do { teardown(); fail(msg); } while (0)
#define myfail_if( tst, msg ) do { if( tst ) { myfail( msg ); } } while (0)
#define myfail_unless( tst, msg ) myfail_if( !(tst), msg )
START_TEST( test_replaces_acl ) START_TEST( test_replaces_acl )
{ {
@@ -221,22 +227,22 @@ Suite* serve_suite(void)
Suite *s = suite_create("serve"); Suite *s = suite_create("serve");
TCase *tc_acl_update = tcase_create("acl_update"); TCase *tc_acl_update = tcase_create("acl_update");
tcase_add_checked_fixture( tc_acl_update, setup, teardown ); tcase_add_checked_fixture( tc_acl_update, setup, NULL );
tcase_add_test(tc_acl_update, test_replaces_acl); tcase_add_test(tc_acl_update, test_replaces_acl);
tcase_add_test(tc_acl_update, test_signals_acl_updated); tcase_add_test(tc_acl_update, test_signals_acl_updated);
tcase_add_test(tc_acl_update, test_acl_update_closes_bad_client);
tcase_add_test(tc_acl_update, test_acl_update_leaves_good_client); tcase_add_exit_test(tc_acl_update, test_acl_update_closes_bad_client, 0);
tcase_add_exit_test(tc_acl_update, test_acl_update_leaves_good_client, 0);
suite_add_tcase(s, tc_acl_update); suite_add_tcase(s, tc_acl_update);
return s; return s;
} }
int main(void) int main(void)
{ {
log_level = 2; log_level = LOG_LEVEL;
int number_failed; int number_failed;
Suite *s = serve_suite(); Suite *s = serve_suite();
SRunner *sr = srunner_create(s); SRunner *sr = srunner_create(s);