diff --git a/src/bitset.h b/src/bitset.h index 599c367..81d68ca 100644 --- a/src/bitset.h +++ b/src/bitset.h @@ -120,6 +120,17 @@ static inline void bitset_set_range( bit_set_range(set->bits, first, bitlen); } + +/** Set every bit in the bitset. */ +static inline void bitset_set( + struct bitset_mapping* set +) +{ + bitset_set_range(set, 0, set->size); +} + + + /** Clear the bits in a bitset which correspond to the given bytes in the * larger file. */ @@ -132,6 +143,16 @@ static inline void bitset_clear_range( bit_clear_range(set->bits, first, bitlen); } + +/** Clear every bit in the bitset. */ +static inline void bitset_clear( + struct bitset_mapping *set +) +{ + bitset_clear_range(set, 0, set->size); +} + + /** Counts the number of contiguous bytes that are represented as a run in * the bit field. */ diff --git a/src/client.c b/src/client.c index 6a1c26d..205317b 100644 --- a/src/client.c +++ b/src/client.c @@ -170,12 +170,25 @@ int client_read_request( struct client * client , struct nbd_request *out_reques struct nbd_request_raw request_raw; fd_set fds; + struct timeval tv = {CLIENT_MAX_WAIT_SECS, 0}; + struct timeval * ptv; + int fd_count; + + /* We want a timeout if this is an inbound migration, but not + * otherwise + */ + ptv = server_is_in_control( client->serve ) ? NULL : &tv; FD_ZERO(&fds); FD_SET(client->socket, &fds); self_pipe_fd_set( client->stop_signal, &fds ); - FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds, NULL, NULL, NULL), - "select() failed"); + fd_count = select(FD_SETSIZE, &fds, NULL, NULL, ptv); + if ( fd_count == 0 ) { + /* This "can't ever happen" */ + if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); } + else { error("Timed out waiting for I/O"); } + } + else if ( fd_count < 0 ) { fatal( "Select failed" ); } if ( self_pipe_fd_isset( client->stop_signal, &fds ) ){ debug("Client received stop signal."); @@ -187,8 +200,14 @@ int client_read_request( struct client * client , struct nbd_request *out_reques debug("EOF reading request"); return 0; /* neat point to close the socket */ } - else { - fatal("Error reading request"); + else { + /* FIXME: I've seen this happen, but I couldn't + * reproduce it so I'm leaving it here with a + * better debug output in the hope it'll + * spontaneously happen again. It should + * *probably* be an error() call, but I want to + * be sure. */ + fatal("Error reading request: %s", strerror( errno )); } } diff --git a/src/client.h b/src/client.h index 087f091..29871a2 100644 --- a/src/client.h +++ b/src/client.h @@ -1,6 +1,14 @@ #ifndef CLIENT_H #define CLIENT_H +/** CLIENT_MAX_WAIT_SECS + * This is the length of time an inbound migration will wait for a fresh + * write before assuming the source has Gone Away. Note: it is *not* + * the time from one write to the next, it is the gap between the end of + * one write and the start of the next. + */ +#define CLIENT_MAX_WAIT_SECS 5 + struct client { /* When we call pthread_join, if the thread is already dead diff --git a/src/control.c b/src/control.c index 88dcd32..b0ffae1 100644 --- a/src/control.c +++ b/src/control.c @@ -25,6 +25,7 @@ * client code to be found in remote.c */ +#include "control.h" #include "serve.h" #include "util.h" #include "ioutil.h" @@ -33,43 +34,116 @@ #include "bitset.h" #include "self_pipe.h" #include "acl.h" +#include "status.h" #include #include #include #include -struct mirror_status * mirror_status_create( - struct server * serve, - int fd, +struct mirror_status * mirror_status_alloc( + union mysockaddr * connect_to, + union mysockaddr * connect_from, int max_Bps, - int action_at_finish) + int action_at_finish, + struct self_pipe * commit_signal, + enum mirror_state * out_commit_state) { - /* FIXME: shouldn't map_fd get closed? */ - int map_fd; - off64_t size; struct mirror_status * mirror; - NULLCHECK( serve ); - mirror = xmalloc(sizeof(struct mirror_status)); - mirror->client = fd; + mirror->connect_to = connect_to; + mirror->connect_from = connect_from; mirror->max_bytes_per_second = max_Bps; mirror->action_at_finish = action_at_finish; - + mirror->commit_signal = commit_signal; + mirror->commit_state = out_commit_state; + + return mirror; +} + +void mirror_set_state_f( struct mirror_status * mirror, enum mirror_state state ) +{ + NULLCHECK( mirror ); + if ( mirror->commit_state ){ + *mirror->commit_state = state; + } +} + +#define mirror_set_state( mirror, state ) do{\ + debug( "Mirror state => " #state );\ + mirror_set_state_f( mirror, state );\ +} while(0) + +enum mirror_state mirror_get_state( struct mirror_status * mirror ) +{ + NULLCHECK( mirror ); + if ( mirror->commit_state ){ + return *mirror->commit_state; + } else { + return MS_UNKNOWN; + } +} + + +void mirror_status_init( struct mirror_status * mirror, char * filename ) +{ + int map_fd; + off64_t size; + + NULLCHECK( mirror ); + NULLCHECK( filename ); + FATAL_IF_NEGATIVE( open_and_mmap( - serve->filename, + filename, &map_fd, &size, (void**) &mirror->mapped ), "Failed to open and mmap %s", - serve->filename + filename ); mirror->dirty_map = bitset_alloc(size, 4096); - bitset_set_range(mirror->dirty_map, 0, size); + +} + + +/* Call this before a mirror attempt. */ +void mirror_status_reset( struct mirror_status * mirror ) +{ + NULLCHECK( mirror ); + NULLCHECK( mirror->dirty_map ); + mirror_set_state( mirror, MS_INIT ); + bitset_set(mirror->dirty_map); +} + + +struct mirror_status * mirror_status_create( + struct server * serve, + union mysockaddr * connect_to, + union mysockaddr * connect_from, + int max_Bps, + int action_at_finish, + struct self_pipe * commit_signal, + enum mirror_state * out_commit_state) +{ + /* FIXME: shouldn't map_fd get closed? */ + struct mirror_status * mirror; + + NULLCHECK( serve ); + + mirror = mirror_status_alloc( connect_to, + connect_from, + max_Bps, + action_at_finish, + commit_signal, + out_commit_state ); + + mirror_status_init( mirror, serve->filename ); + mirror_status_reset( mirror ); + return mirror; } @@ -78,7 +152,8 @@ struct mirror_status * mirror_status_create( void mirror_status_destroy( struct mirror_status *mirror ) { NULLCHECK( mirror ); - close(mirror->client); + free(mirror->connect_to); + free(mirror->connect_from); free(mirror->dirty_map); free(mirror); } @@ -155,7 +230,7 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written) } -void mirror_transfer_control( struct mirror_status * mirror ) +void mirror_give_control( struct mirror_status * mirror ) { /* TODO: set up an error handler to clean up properly on ERROR. */ @@ -196,7 +271,8 @@ void mirror_on_exit( struct server * serve ) * and already-connected clients don't get needlessly * disconnected. */ - mirror_transfer_control( serve->mirror ); + debug( "mirror_give_control"); + mirror_give_control( serve->mirror ); /* If we're still here, the transfer of control went ok, and the * remote is listening (or will be shortly). We can shut the @@ -205,6 +281,7 @@ void mirror_on_exit( struct server * serve ) * It doesn't matter if we get new client connections before * now, the IO lock will stop them from doing anything. */ + debug("serve_signal_close"); serve_signal_close( serve ); /* We have to wait until the server is closed before unlocking @@ -214,34 +291,96 @@ void mirror_on_exit( struct server * serve ) * guarantee the server thread will win the race and we risk the * clients seeing a "successful" write to a dead disc image. */ + debug("serve_wait_for_close"); serve_wait_for_close( serve ); + info("Mirror sent."); } -/** Thread launched to drive mirror process */ -void* mirror_runner(void* serve_params_uncast) +void mirror_cleanup( struct mirror_status * mirror, + int fatal __attribute__((unused))) { - int pass; - struct server *serve = (struct server*) serve_params_uncast; - uint64_t written; + NULLCHECK( mirror ); + info( "Cleaning up mirror thread"); + if( mirror->client && mirror->client > 0 ){ + close( mirror->client ); + } + mirror->client = -1; +} + + + +int mirror_status_connect( struct mirror_status * mirror, off64_t local_size ) +{ + struct sockaddr * connect_from = NULL; + if ( mirror->connect_from ) { + connect_from = &mirror->connect_from->generic; + } + + NULLCHECK( mirror->connect_to ); + + mirror->client = socket_connect(&mirror->connect_to->generic, connect_from); + if ( 0 < mirror->client ) { + fd_set fds; + struct timeval tv = { MS_HELLO_TIME_SECS, 0}; + FD_ZERO( &fds ); + FD_SET( mirror->client, &fds ); + + FATAL_UNLESS( 0 <= select( FD_SETSIZE, &fds, NULL, NULL, &tv ), + "Select failed." ); + + if( FD_ISSET( mirror->client, &fds ) ){ + off64_t remote_size; + if ( socket_nbd_read_hello( mirror->client, &remote_size ) ) { + if( remote_size == local_size ){ + mirror_set_state( mirror, MS_GO ); + } + else { + warn("Remote size (%d) doesn't match local (%d)", + remote_size, local_size ); + mirror_set_state( mirror, MS_FAIL_SIZE_MISMATCH ); + } + } + else { + warn( "Mirror attempt rejected." ); + mirror_set_state( mirror, MS_FAIL_REJECTED ); + } + } + else { + warn( "No NBD Hello received." ); + mirror_set_state( mirror, MS_FAIL_NO_HELLO ); + } + } + else { + warn( "Mirror failed to connect."); + mirror_set_state( mirror, MS_FAIL_CONNECT ); + } + + return mirror_get_state(mirror) == MS_GO; +} + + + +void server_run_mirror( struct server *serve ) +{ NULLCHECK( serve ); NULLCHECK( serve->mirror ); - NULLCHECK( serve->mirror->dirty_map ); - debug("Starting mirror" ); - + int pass; + uint64_t written; + + info("Starting mirror" ); for (pass=0; pass < mirror_maximum_passes-1; pass++) { + debug("mirror start pass=%d", pass); - - if ( !mirror_pass( serve, 1, &written ) ){ - goto abandon_mirror; - } + if ( !mirror_pass( serve, 1, &written ) ){ return; } /* if we've not written anything */ if (written < mirror_last_pass_after_bytes_written) { break; } } + mirror_set_state( serve->mirror, MS_FINALISE ); server_lock_io( serve ); { if ( mirror_pass( serve, 0, &written ) && @@ -253,30 +392,256 @@ void* mirror_runner(void* serve_params_uncast) } } server_unlock_io( serve ); +} -abandon_mirror: - mirror_status_destroy( serve->mirror ); - serve->mirror = NULL; /* and we're gone */ + +void mirror_signal_commit( struct mirror_status * mirror ) +{ + NULLCHECK( mirror ); + + self_pipe_signal( mirror->commit_signal ); +} + +/** Thread launched to drive mirror process */ +void* mirror_runner(void* serve_params_uncast) +{ + /* The supervisor thread relies on there not being any ERROR + * calls until after the mirror_signal_commit() call in this + * function. + * However, *after* that, we should call ERROR_* instead of + * FATAL_* wherever possible. + */ + struct server *serve = (struct server*) serve_params_uncast; + + NULLCHECK( serve ); + NULLCHECK( serve->mirror ); + struct mirror_status * mirror = serve->mirror; + NULLCHECK( mirror->dirty_map ); + + error_set_handler( (cleanup_handler *) mirror_cleanup, mirror ); + + info( "Connecting to mirror" ); + time_t start_time = time(NULL); + int connected = mirror_status_connect( mirror, serve->size ); + mirror_signal_commit( mirror ); + if ( !connected ) { goto abandon_mirror; } + + /* After this point, if we see a failure we need to disconnect + * and retry everything from mirror_set_state(_, MS_INIT), but + * *without* signaling the commit or abandoning the mirror. + * */ + + if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){ + /* If we get here, then we managed to connect but the + * control thread feeding status back to the user will + * have gone away, leaving the user without meaningful + * feedback. In this instance, they have to assume a + * failure, so we can't afford to let the mirror happen. + * We have to set the state to avoid a race. + */ + mirror_set_state( mirror, MS_FAIL_CONNECT ); + warn( "Mirror connected, but too slowly" ); + goto abandon_mirror; + } + + server_run_mirror( serve ); + + mirror_set_state( mirror, MS_DONE ); +abandon_mirror: + return NULL; +} + + +struct mirror_super * mirror_super_create( + struct server * serve, + union mysockaddr * connect_to, + union mysockaddr * connect_from, + int max_Bps, + int action_at_finish, + enum mirror_state * out_commit_state) +{ + struct mirror_super * super = xmalloc( sizeof( struct mirror_super) ); + super->mirror = mirror_status_create( serve, + connect_to, + connect_from, + max_Bps, + action_at_finish, + self_pipe_create(), + out_commit_state ); + super->commit_signal = self_pipe_create(); + + return super; +} + +void mirror_super_signal_committed( struct mirror_super * super ) +{ + NULLCHECK( super ); + self_pipe_signal( super->commit_signal ); +} + + +void mirror_super_destroy( struct mirror_super * super ) +{ + NULLCHECK( super ); + + mirror_status_destroy( super->mirror ); + self_pipe_destroy( super->commit_signal ); +} + + +/* The mirror supervisor thread. Responsible for kicking off retries if + * the mirror thread fails. + * The mirror_status and mirror_super objects are never freed, and the + * mirror_super_runner thread is never joined. + */ +void * mirror_super_runner( void * serve_uncast ) +{ + struct server * serve = (struct server *) serve_uncast; + NULLCHECK( serve ); + NULLCHECK( serve->mirror ); + NULLCHECK( serve->mirror_super ); + + int should_retry = 0; + int success = 0; + fd_set fds; + int fd_count; + + struct mirror_status * mirror = serve->mirror; + struct mirror_super * super = serve->mirror_super; + + do { + if ( should_retry ) { + /* We don't want to hammer the destination too + * hard, so if this is a retry, insert a delay. */ + sleep( MS_RETRY_DELAY_SECS ); + + /* We also have to reset the bitmap to be sure + * we transfer everything */ + mirror_status_reset( mirror ); + } + + FATAL_IF( 0 != pthread_create( + &mirror->thread, + NULL, + mirror_runner, + serve), + "Failed to create mirror thread"); + + debug("Supervisor waiting for commit signal"); + FD_ZERO( &fds ); + self_pipe_fd_set( mirror->commit_signal, &fds ); + /* There's no timeout on this select. This means that + * the mirror thread *must* signal then abort itself if + * it passes the timeout, and it *must* always signal, + * no matter what. + */ + fd_count = select( FD_SETSIZE, &fds, NULL, NULL, NULL ); + if ( 1 == fd_count ) { + debug( "Supervisor got commit signal" ); + if ( 0 == should_retry ) { + should_retry = 1; + /* Only send this signal the first time */ + mirror_super_signal_committed(super); + debug("Mirror supervisor committed"); + } + } + else { fatal( "Select failed." ); } + + debug("Supervisor waiting for mirror thread" ); + pthread_join( mirror->thread, NULL ); + debug( "Clearing the commit signal. If this blocks," + " it's fatal but we can't check in advance." ); + self_pipe_signal_clear( mirror->commit_signal ); + debug( "Commit signal cleared." ); + + success = MS_DONE == mirror_get_state( mirror ); + + if( success ){ info( "Mirror supervisor success, exiting" ); } + else if (should_retry){ + warn( "Mirror failed, retrying" ); + } + else { warn( "Mirror failed before commit, giving up" ); } + } + while ( should_retry && !success ); + + serve->mirror = NULL; + serve->mirror_super = NULL; + + mirror_super_destroy( super ); + debug( "Mirror supervisor done." ); + return NULL; } #define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1) +/* We have to pass the mirror_state pointer and the commit_signal + * separately from the mirror itself because the mirror might have been + * freed by the time we get to check it */ +void mirror_watch_startup( struct control_params * client, + struct self_pipe * commit_signal, + enum mirror_state *mirror_state ) +{ + NULLCHECK( client ); + struct server * serve = client->serve; + NULLCHECK( serve ); + struct mirror_status * mirror = serve->mirror; + NULLCHECK( mirror ); + + fd_set fds; + /* This gives a 61 second timeout for the mirror thread to + * either fail or succeed to connect. + */ + struct timeval tv = {MS_CONNECT_TIME_SECS+1,0}; + FD_ZERO( &fds ); + self_pipe_fd_set( commit_signal, &fds ); + ERROR_IF_NEGATIVE( select( FD_SETSIZE, &fds, NULL, NULL, &tv ), "Select failed."); + + if ( self_pipe_fd_isset( commit_signal, &fds ) ){ + switch (*mirror_state) { + case MS_INIT: + case MS_UNKNOWN: + write_socket( "1: Mirror failed to initialise" ); + fatal( "Impossible mirror state: %d", *mirror_state ); + case MS_FAIL_CONNECT: + write_socket( "1: Mirror failed to connect"); + break; + case MS_FAIL_REJECTED: + write_socket( "1: Mirror was rejected" ); + break; + case MS_FAIL_NO_HELLO: + write_socket( "1: Remote server failed to respond"); + break; + case MS_FAIL_SIZE_MISMATCH: + write_socket( "1: Remote size does not match local size" ); + break; + case MS_GO: + case MS_FINALISE: + case MS_DONE: /* Yes, I know we know better, but it's simpler this way */ + write_socket( "0: Mirror started" ); + break; + default: + fatal( "Unhandled mirror state: %d", *mirror_state ); + } + } + else { + /* Timeout. Badness. This "should never happen". */ + write_socket( "1: Mirror timed out connecting to remote host" ); + } +} + /** Command parser to start mirror process from socket input */ int control_mirror(struct control_params* client, int linesc, char** lines) { NULLCHECK( client ); - off64_t remote_size; struct server * serve = client->serve; - int fd; - struct mirror_status *mirror; - union mysockaddr connect_to; - union mysockaddr connect_from; + union mysockaddr *connect_to = xmalloc( sizeof( union mysockaddr ) ); + union mysockaddr *connect_from = NULL; int use_connect_from = 0; - uint64_t max_Bps; + uint64_t max_Bps = 0; int action_at_finish; int raw_port; @@ -286,7 +651,7 @@ int control_mirror(struct control_params* client, int linesc, char** lines) return -1; } - if (parse_ip_to_sockaddr(&connect_to.generic, lines[0]) == 0) { + if (parse_ip_to_sockaddr(&connect_to->generic, lines[0]) == 0) { write_socket("1: bad IP address"); return -1; } @@ -296,20 +661,18 @@ int control_mirror(struct control_params* client, int linesc, char** lines) write_socket("1: bad IP port number"); return -1; } - connect_to.v4.sin_port = htobe16(raw_port); + connect_to->v4.sin_port = htobe16(raw_port); if (linesc > 2) { - if (parse_ip_to_sockaddr(&connect_from.generic, lines[2]) == 0) { + connect_from = xmalloc( sizeof( union mysockaddr ) ); + if (parse_ip_to_sockaddr(&connect_from->generic, lines[2]) == 0) { write_socket("1: bad bind address"); return -1; } use_connect_from = 1; } - max_Bps = 0; - if (linesc > 3) { - max_Bps = atoi(lines[2]); - } + if (linesc > 3) { max_Bps = atoi(lines[2]); } action_at_finish = ACTION_EXIT; if (linesc > 4) { @@ -330,35 +693,27 @@ int control_mirror(struct control_params* client, int linesc, char** lines) return -1; } - /** I don't like use_connect_from but socket_connect doesn't take *mysockaddr :( */ - struct sockaddr *afrom = use_connect_from ? &connect_from.generic : NULL; - fd = socket_connect(&connect_to.generic, afrom); - - remote_size = socket_nbd_read_hello(fd); - if( remote_size != (off64_t)serve->size ){ - warn("Remote size (%d) doesn't match local (%d)", remote_size, serve->size ); - write_socket( "1: remote size (%d) doesn't match local (%d)"); - close(fd); - return -1; - } - - mirror = mirror_status_create( serve, - fd, + enum mirror_state mirror_state; + serve->mirror_super = mirror_super_create( serve, + connect_to, + connect_from, max_Bps , - action_at_finish ); - serve->mirror = mirror; + action_at_finish, + &mirror_state ); + serve->mirror = serve->mirror_super->mirror; FATAL_IF( /* FIXME should free mirror on error */ 0 != pthread_create( - &mirror->thread, + &serve->mirror_super->thread, NULL, - mirror_runner, + mirror_super_runner, serve ), "Failed to create mirror thread" ); - write_socket("0: mirror started"); + mirror_watch_startup( client, serve->mirror_super->commit_signal, &mirror_state ); + debug( "Control thread going away." ); return 0; } @@ -388,11 +743,19 @@ int control_acl(struct control_params* client, int linesc, char** lines) /** FIXME: add some useful statistics */ int control_status( - struct control_params* client __attribute__ ((unused)), + struct control_params* client, int linesc __attribute__ ((unused)), char** lines __attribute__((unused)) ) { + NULLCHECK( client ); + NULLCHECK( client->serve ); + struct status * status = status_create( client->serve ); + + write( client->socket, "0: ", 3 ); + status_write( status, client->socket ); + status_destroy( status ); + return 0; } @@ -423,16 +786,19 @@ void* control_serve(void* client_uncast) /* ignore failure */ } else if (strcmp(lines[0], "acl") == 0) { + info("acl command received" ); if (control_acl(client, linesc-1, lines+1) < 0) { finished = 1; } } else if (strcmp(lines[0], "mirror") == 0) { + info("mirror command received" ); if (control_mirror(client, linesc-1, lines+1) < 0) { finished = 1; } } else if (strcmp(lines[0], "status") == 0) { + info("status command received" ); if (control_status(client, linesc-1, lines+1) < 0) { finished = 1; } @@ -449,6 +815,7 @@ void* control_serve(void* client_uncast) } control_cleanup(client, 0); + debug("control command handled" ); return NULL; } diff --git a/src/control.h b/src/control.h index 2f70bb8..b5c2491 100644 --- a/src/control.h +++ b/src/control.h @@ -1,5 +1,28 @@ -#ifndef __CONTROL_H -#define __CONTROL_H +#ifndef CONTROL_H +#define CONTROL_H + +/* MS_CONNECT_TIME_SECS + * The length of time after which the sender will assume a connect() to + * the destination has failed. + */ +#define MS_CONNECT_TIME_SECS 60 + +/* MS_HELLO_TIME_SECS + * The length of time the sender will wait for the NBD hello message + * after connect() before aborting the connection attempt. + */ +#define MS_HELLO_TIME_SECS 5 + + +/* MS_RETRY_DELAY_SECS + * The delay after a failed migration attempt before launching another + * thread to try again. + */ +#define MS_RETRY_DELAY_SECS 1 + + +#include "parse.h" +#include "serve.h" void accept_control_connection(struct server* params, int client_fd, union mysockaddr* client_address); void serve_open_control_socket(struct server* params); diff --git a/src/listen.c b/src/listen.c index 4c6da57..cc96d82 100644 --- a/src/listen.c +++ b/src/listen.c @@ -18,8 +18,9 @@ struct listen * listen_create( { struct listen * listen; - listen = (struct listen *)xmalloc( sizeof( listen ) ); - listen->init_serve = server_create( s_ip_address, + listen = (struct listen *)xmalloc( sizeof( struct listen ) ); + listen->init_serve = server_create( + s_ip_address, s_port, s_file, s_ctrl_sock, @@ -29,7 +30,7 @@ struct listen * listen_create( 1, 0); listen->main_serve = server_create( s_rebind_ip_address ? s_rebind_ip_address : s_ip_address, - s_rebind_port ? s_rebind_port : s_port, + s_rebind_port ? s_rebind_port : s_port, s_file, s_ctrl_sock, default_deny, diff --git a/src/readwrite.c b/src/readwrite.c index aab1dad..86be4e4 100644 --- a/src/readwrite.c +++ b/src/readwrite.c @@ -10,33 +10,50 @@ int socket_connect(struct sockaddr* to, struct sockaddr* from) { int fd = socket(to->sa_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0); - FATAL_IF_NEGATIVE(fd, "Couldn't create client socket"); - - if (NULL != from) { - FATAL_IF_NEGATIVE( - bind(fd, from, sizeof(struct sockaddr_in6)), - "bind() failed" - ); + if( fd < 0 ){ + warn( "Couldn't create client socket"); + return -1; + } + + if (NULL != from) { + if ( 0 > bind(fd, from, sizeof(struct sockaddr_in6)) ){ + warn( "bind() failed"); + close( fd ); + return -1; + } + } + + if ( 0 > connect(fd, to, sizeof(struct sockaddr_in6)) ) { + warn( "connect failed" ); + close( fd ); + return -1; } - FATAL_IF_NEGATIVE( - connect(fd, to, sizeof(struct sockaddr_in6)),"connect failed" - ); return fd; } -off64_t socket_nbd_read_hello(int fd) +int socket_nbd_read_hello(int fd, off64_t * out_size) { struct nbd_init init; - FATAL_IF_NEGATIVE(readloop(fd, &init, sizeof(init)), - "Couldn't read init"); + if ( 0 > readloop(fd, &init, sizeof(init)) ) { + warn( "Couldn't read init" ); + goto fail; + } if (strncmp(init.passwd, INIT_PASSWD, 8) != 0) { - fatal("wrong passwd"); + warn("wrong passwd"); + goto fail; } if (be64toh(init.magic) != INIT_MAGIC) { - fatal("wrong magic (%x)", be64toh(init.magic)); + warn("wrong magic (%x)", be64toh(init.magic)); + goto fail; } - return be64toh(init.size); + if ( NULL != out_size ) { + *out_size = be64toh(init.size); + } + + return 1; +fail: + return 0; } void fill_request(struct nbd_request *request, int type, int from, int len) @@ -92,15 +109,15 @@ void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf) struct nbd_reply reply; fill_request(&request, REQUEST_WRITE, from, len); - FATAL_IF_NEGATIVE(writeloop(fd, &request, sizeof(request)), + ERROR_IF_NEGATIVE(writeloop(fd, &request, sizeof(request)), "Couldn't write request"); if (in_buf) { - FATAL_IF_NEGATIVE(writeloop(fd, in_buf, len), + ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len), "Write failed"); } else { - FATAL_IF_NEGATIVE( + ERROR_IF_NEGATIVE( splice_via_pipe_loop(in_fd, fd, len), "Splice failed" ); @@ -137,17 +154,25 @@ int socket_nbd_disconnect( int fd ) } #define CHECK_RANGE(error_type) { \ - off64_t size = socket_nbd_read_hello(params->client); \ - if (params->from < 0 || (params->from + params->len) > size) {\ - fatal(error_type \ - " request %d+%d is out of range given size %d", \ - params->from, params->len, size\ - ); }\ + off64_t size;\ + int success = socket_nbd_read_hello(params->client, &size); \ + if ( success ) {\ + if (params->from < 0 || (params->from + params->len) > size) {\ + fatal(error_type \ + " request %d+%d is out of range given size %d", \ + params->from, params->len, size\ + );\ + }\ + }\ + else {\ + fatal( error_type " connection failed." );\ + }\ } void do_read(struct mode_readwrite_params* params) { params->client = socket_connect(¶ms->connect_to.generic, ¶ms->connect_from.generic); + FATAL_IF_NEGATIVE( params->client, "Couldn't connect." ); CHECK_RANGE("read"); socket_nbd_read(params->client, params->from, params->len, params->data_fd, NULL); @@ -157,6 +182,7 @@ void do_read(struct mode_readwrite_params* params) void do_write(struct mode_readwrite_params* params) { params->client = socket_connect(¶ms->connect_to.generic, ¶ms->connect_from.generic); + FATAL_IF_NEGATIVE( params->client, "Couldn't connect." ); CHECK_RANGE("write"); socket_nbd_write(params->client, params->from, params->len, params->data_fd, NULL); diff --git a/src/readwrite.h b/src/readwrite.h index fb1736a..cb07e68 100644 --- a/src/readwrite.h +++ b/src/readwrite.h @@ -6,7 +6,7 @@ #include int socket_connect(struct sockaddr* to, struct sockaddr* from); -off64_t socket_nbd_read_hello(int fd); +int socket_nbd_read_hello(int fd, off64_t * size); void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf); void socket_nbd_write(int fd, off64_t from, int len, int out_fd, void* out_buf); void socket_nbd_entrust(int fd); diff --git a/src/remote.c b/src/remote.c index 7ffbe48..dc5801b 100644 --- a/src/remote.c +++ b/src/remote.c @@ -6,11 +6,27 @@ static const int max_response=1024; +void print_response( const char * response ) +{ + char * response_text; + FILE * out; + int exit_status; + + NULLCHECK( response ); + + exit_status = atoi(response); + response_text = strchr( response, ':' ) + 2; + + NULLCHECK( response_text ); + + out = exit_status > 0 ? stderr : stdout; + fprintf(out, "%s\n", response_text ); +} + void do_remote_command(char* command, char* socket_name, int argc, char** argv) { char newline=10; int i; - int exit_status; int remote = socket(AF_UNIX, SOCK_STREAM, 0); struct sockaddr_un address; char response[max_response]; @@ -42,11 +58,8 @@ void do_remote_command(char* command, char* socket_name, int argc, char** argv) "Couldn't read response from %s", socket_name ); - exit_status = atoi(response); - if (exit_status > 0) { - fprintf(stderr, "%s\n", strchr(response, ':')+2); - } - + print_response( response ); + exit(atoi(response)); close(remote); diff --git a/src/self_pipe.c b/src/self_pipe.c index 1cdb8ed..3fcee37 100644 --- a/src/self_pipe.c +++ b/src/self_pipe.c @@ -31,11 +31,11 @@ void self_pipe_server_error( int err, char *msg ) { - char errbuf[1024]; + char errbuf[1024] = {0}; strerror_r( err, errbuf, 1024 ); - fatal( "%s\t%s", msg, errbuf ); + fatal( "%s\t%d (%s)", msg, err, errbuf ); } /** @@ -87,6 +87,10 @@ struct self_pipe * self_pipe_create(void) */ int self_pipe_signal( struct self_pipe * sig ) { + NULLCHECK( sig ); + FATAL_IF( 1 == sig->write_fd, "Shouldn't be writing to stdout" ); + FATAL_IF( 2 == sig->write_fd, "Shouldn't be writing to stderr" ); + int written = write( sig->write_fd, "X", 1 ); if ( written != 1 ) { self_pipe_server_error( errno, ERR_MSG_WRITE ); diff --git a/src/serve.c b/src/serve.c index 522abd0..0798ab5 100644 --- a/src/serve.c +++ b/src/serve.c @@ -240,8 +240,8 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre strerror(join_errno) ); } else { - debug("nbd thread %p exited (%s) with status %ld", - (int) entry->thread, + debug("nbd thread %016x exited (%s) with status %ld", + entry->thread, s_client_address, (uint64_t)status); client_destroy( entry->client ); @@ -339,7 +339,6 @@ int server_acl_accepts( struct server *params, union mysockaddr * client_address int server_should_accept_client( struct server * params, - int client_fd, union mysockaddr * client_address, char *s_client_address, size_t s_client_address_len ) @@ -351,17 +350,15 @@ int server_should_accept_client( if (inet_ntop(client_address->generic.sa_family, sockaddr_address_data(&client_address->generic), s_client_address, s_client_address_len ) == NULL) { - debug( "Rejecting client %s: Bad client_address", s_client_address ); - write(client_fd, "Bad client_address", 18); + warn( "Rejecting client %s: Bad client_address", s_client_address ); return 0; } if ( !server_acl_accepts( params, client_address ) ) { - debug( "Rejecting client %s: Access control error", s_client_address ); + warn( "Rejecting client %s: Access control error", s_client_address ); debug( "We %s have an acl, and default_deny is %s", (params->acl ? "do" : "do not"), (params->acl->default_deny ? "true" : "false") ); - write(client_fd, "Access control error", 20); return 0; } @@ -428,7 +425,7 @@ void accept_nbd_client( char s_client_address[64] = {0}; - if ( !server_should_accept_client( params, client_fd, client_address, s_client_address, 64 ) ) { + if ( !server_should_accept_client( params, client_address, s_client_address, 64 ) ) { close( client_fd ); return; } @@ -436,7 +433,6 @@ void accept_nbd_client( slot = cleanup_and_find_client_slot(params); if (slot < 0) { warn("too many clients to accept connection"); - write(client_fd, "Too many clients", 16); close(client_fd); return; } @@ -452,7 +448,6 @@ void accept_nbd_client( if (spawn_client_thread( client_params, params->vacuum_signal, thread ) != 0) { debug( "Thread creation problem." ); - write(client_fd, "Thread creation problem", 23); client_destroy( client_params ); close(client_fd); return; @@ -474,7 +469,7 @@ void server_audit_clients( struct server * serve) * won't have been audited against the later acl. This isn't a * problem though, because in order to update the acl * server_replace_acl must have been called, so the - * server_accept loop will see a second acl_updated signal as + * server_accept ioop will see a second acl_updated signal as * soon as it hits select, and a second audit will be run. */ for( i = 0; i < serve->max_nbd_clients; i++ ) { @@ -544,7 +539,7 @@ void server_replace_acl( struct server *serve, struct acl * new_acl ) int server_accept( struct server * params ) { NULLCHECK( params ); - info("accept loop starting"); + debug("accept loop starting"); int client_fd; union mysockaddr client_address; fd_set fds; @@ -595,6 +590,7 @@ int server_accept( struct server * params ) void serve_accept_loop(struct server* params) { + NULLCHECK( params ); while( server_accept( params ) ); } @@ -681,8 +677,17 @@ void serve_cleanup(struct server* params, pthread_join(thread_id, &status); } } + debug( "Cleanup done"); } + +int server_is_in_control( struct server *serve ) +{ + NULLCHECK( serve ); + return serve->has_control; +} + + /** Full lifecycle of the server */ int do_serve(struct server* params) { @@ -697,6 +702,7 @@ int do_serve(struct server* params) serve_accept_loop(params); has_control = params->has_control; serve_cleanup(params, 0); + debug("Server %s control.", has_control ? "has" : "does not have" ); return has_control; } diff --git a/src/serve.h b/src/serve.h index 35227ed..76f5f6b 100644 --- a/src/serve.h +++ b/src/serve.h @@ -15,10 +15,24 @@ enum mirror_finish_action { ACTION_NOTHING }; +enum mirror_state { + MS_UNKNOWN, + MS_INIT, + MS_GO, + MS_FINALISE, + MS_DONE, + MS_FAIL_CONNECT, + MS_FAIL_REJECTED, + MS_FAIL_NO_HELLO, + MS_FAIL_SIZE_MISMATCH +}; + struct mirror_status { pthread_t thread; /* set to 1, then join thread to make mirror terminate early */ int signal_abandon; + union mysockaddr * connect_to; + union mysockaddr * connect_from; int client; char *filename; off64_t max_bytes_per_second; @@ -26,8 +40,26 @@ struct mirror_status { char *mapped; struct bitset_mapping *dirty_map; + + /* Pass a commit state pointer, then it will be updated + * immediately before commit_signal is sent. + */ + enum mirror_state * commit_state; + + /* commit_signal is sent immediately after attempting to connect + * and checking the remote size, whether successful or not. + */ + struct self_pipe * commit_signal; }; + +struct mirror_super { + struct mirror_status * mirror; + struct self_pipe * commit_signal; + pthread_t thread; +}; + + struct control_params { int socket; struct server* serve; @@ -75,6 +107,7 @@ struct server { struct self_pipe * vacuum_signal; struct mirror_status* mirror; + struct mirror_super * mirror_super; int server_fd; int control_fd; @@ -110,6 +143,7 @@ void serve_signal_close( struct server *serve ); void serve_wait_for_close( struct server * serve ); void server_replace_acl( struct server *serve, struct acl * acl); void server_control_arrived( struct server *serve ); +int server_is_in_control( struct server *serve ); int do_serve( struct server * ); diff --git a/src/status.c b/src/status.c new file mode 100644 index 0000000..4365d99 --- /dev/null +++ b/src/status.c @@ -0,0 +1,34 @@ +#include "status.h" +#include "serve.h" +#include "util.h" + +struct status * status_create( struct server * serve ) +{ + NULLCHECK( serve ); + struct status * status; + + status = xmalloc( sizeof( struct status ) ); + status->has_control = serve->has_control; + status->is_mirroring = NULL != serve->mirror; + return status; + +} + +#define BOOL_S(var) (var ? "true" : "false" ) +#define PRINT_FIELD( var ) \ + do{dprintf( fd, #var "=%s ", BOOL_S( status->var ) );}while(0) + +int status_write( struct status * status, int fd ) +{ + PRINT_FIELD( is_mirroring ); + PRINT_FIELD( has_control ); + dprintf(fd, "\n"); + return 1; +} + + +void status_destroy( struct status * status ) +{ + NULLCHECK( status ); + free( status ); +} diff --git a/src/status.h b/src/status.h new file mode 100644 index 0000000..03d925a --- /dev/null +++ b/src/status.h @@ -0,0 +1,55 @@ +#ifndef STATUS_H +#define STATUS_H + + +/* Status reports + * + * The status will be reported by writing to a file descriptor. The + * status report will be on a single line. The status format will be: + * + * A=B C=D + * + * That is, a space-separated list of label,value pairs, each pair + * separated by an '=' character. Neither ' ' nor '=' will appear in + * either labels or values. + * + * Boolean values will appear as the strings "true" and "false". + * + * The following status fields are defined: + * + * has_control: + * This will be false when the server is listening for an incoming + * migration. It will switch to true when the end-of-migration + * handshake is successfully completed. + * If the server is started in "serve" mode, this will never be + * false. + * + * is_migrating: + * This will be false when the server is started in either "listen" + * or "serve" mode. It will become true when a server in "serve" + * mode starts a migration, and will become false again when the + * migration terminates, successfully or not. + * If the server is currently in "listen" mode, this will never b + * true. + */ + + +#include "serve.h" + +struct status { + int has_control; + int is_mirroring; +}; + +/** Create a status object for the given server. */ +struct status * status_create( struct server * ); + +/** Output the given status object to the given file descriptot */ +int status_write( struct status *, int fd ); + +/** Free the status object */ +void status_destroy( struct status * ); + + + +#endif diff --git a/src/util.h b/src/util.h index 0a42ccf..5426d0c 100644 --- a/src/util.h +++ b/src/util.h @@ -79,27 +79,31 @@ void error_handler(int fatal); /* mylog a line at the given level (0 being most verbose) */ void mylog(int line_level, const char* format, ...); +#define levstr(i) (i==0?'D':(i==1?'I':(i==2?'W':(i==3?'E':'F')))) + +#define myloglev(level, msg, ...) mylog( level, "%c:%d %p %s:%d: "msg, levstr(level), getpid(),pthread_self(), __FILE__, __LINE__, ##__VA_ARGS__ ) + #ifdef DEBUG -# define debug(msg, ...) mylog(0, "%s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__) +# define debug(msg, ...) myloglev(0, msg, ##__VA_ARGS__) #else # define debug(msg, ...) /* no-op */ #endif /* informational message, not expected to be compiled out */ -#define info(msg, ...) mylog(1, "%s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__) +#define info(msg, ...) myloglev(1, msg, ##__VA_ARGS__) /* messages that might indicate a problem */ -#define warn(msg, ...) mylog(2, "%s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__) +#define warn(msg, ...) myloglev(2, msg, ##__VA_ARGS__) /* mylog a message and invoke the error handler to recover */ #define error(msg, ...) do { \ - mylog(3, "*** %s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__); \ + myloglev(3, msg, ##__VA_ARGS__); \ error_handler(0); \ } while(0) /* mylog a message and invoke the error handler to kill the current thread */ #define fatal(msg, ...) do { \ - mylog(4, "*** %s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__); \ + myloglev(4, msg, ##__VA_ARGS__); \ error_handler(1); \ } while(0) diff --git a/tests/check_bitset.c b/tests/check_bitset.c index cf3418b..efb7258 100644 --- a/tests/check_bitset.c +++ b/tests/check_bitset.c @@ -140,17 +140,53 @@ START_TEST(test_bitset) } END_TEST + +START_TEST( test_bitset_set ) +{ + struct bitset_mapping* map; + uint64_t *num; + + map = bitset_alloc(64, 1); + num = (uint64_t*) map->bits; + + ck_assert_int_eq( 0x0000000000000000, *num ); + bitset_set( map ); + ck_assert_int_eq( 0xffffffffffffffff, *num ); +} +END_TEST + + +START_TEST( test_bitset_clear ) +{ + struct bitset_mapping* map; + uint64_t *num; + + map = bitset_alloc(64, 1); + num = (uint64_t*) map->bits; + + ck_assert_int_eq( 0x0000000000000000, *num ); + bitset_set( map ); + bitset_clear( map ); + ck_assert_int_eq( 0x0000000000000000, *num ); +} +END_TEST + + Suite* bitset_suite(void) { Suite *s = suite_create("bitset"); - TCase *tc_core = tcase_create("bitset"); - tcase_add_test(tc_core, test_bit_set); - tcase_add_test(tc_core, test_bit_clear); - tcase_add_test(tc_core, test_bit_tests); - tcase_add_test(tc_core, test_bit_ranges); - tcase_add_test(tc_core, test_bit_runs); - tcase_add_test(tc_core, test_bitset); - suite_add_tcase(s, tc_core); + TCase *tc_bit = tcase_create("bit"); + TCase *tc_bitset = tcase_create("bitset"); + tcase_add_test(tc_bit, test_bit_set); + tcase_add_test(tc_bit, test_bit_clear); + tcase_add_test(tc_bit, test_bit_tests); + tcase_add_test(tc_bit, test_bit_ranges); + tcase_add_test(tc_bit, test_bit_runs); + tcase_add_test(tc_bitset, test_bitset); + tcase_add_test(tc_bitset, test_bitset_set); + tcase_add_test(tc_bitset, test_bitset_clear); + suite_add_tcase(s, tc_bit); + suite_add_tcase(s, tc_bitset); return s; } diff --git a/tests/check_status.c b/tests/check_status.c new file mode 100644 index 0000000..04dea93 --- /dev/null +++ b/tests/check_status.c @@ -0,0 +1,139 @@ +#include "status.h" +#include "serve.h" +#include "ioutil.h" +#include "util.h" + +#include + +START_TEST( test_status_create ) +{ + struct server server; + struct status *status = NULL; + + status = status_create( &server ); + + fail_if( NULL == status, "Status wasn't allocated" ); + status_destroy( status ); +} +END_TEST + +START_TEST( test_gets_has_control ) +{ + struct server server; + struct status * status; + + server.has_control = 1; + status = status_create( &server ); + + fail_unless( status->has_control == 1, "has_control wasn't copied" ); + status_destroy( status ); +} +END_TEST + + +START_TEST( test_gets_is_mirroring ) +{ + struct server server; + struct status * status; + + server.mirror = NULL; + status = status_create( &server ); + fail_if( status->is_mirroring, "is_mirroring was set" ); + status_destroy( status ); + + server.mirror = (struct mirror_status *)xmalloc( sizeof( struct mirror_status ) ); + status = status_create( &server ); + fail_unless( status->is_mirroring, "is_mirroring wasn't set" ); + status_destroy( status ); +} +END_TEST + + +START_TEST( test_renders_has_control ) +{ + struct status status; + int fds[2]; + pipe(fds); + char buf[1024] = {0}; + + status.has_control = 1; + status_write( &status, fds[1] ); + + fail_unless( read_until_newline( fds[0], buf, 1024 ) > 0, + "Couldn't read the result" ); + + char *found = strstr( buf, "has_control=true" ); + fail_if( NULL == found, "has_control=true not found" ); + + status.has_control = 0; + status_write( &status, fds[1] ); + + fail_unless( read_until_newline( fds[0], buf, 1024 ) > 0, + "Couldn't read the result" ); + found = strstr( buf, "has_control=false" ); + fail_if( NULL == found, "has_control=false not found" ); + +} +END_TEST + + +START_TEST( test_renders_is_mirroring ) +{ + struct status status; + int fds[2]; + pipe(fds); + char buf[1024] = {0}; + + status.is_mirroring = 1; + status_write( &status, fds[1] ); + + fail_unless( read_until_newline( fds[0], buf, 1024 ) > 0, + "Couldn't read the result" ); + + char *found = strstr( buf, "is_mirroring=true" ); + fail_if( NULL == found, "is_mirroring=true not found" ); + + status.is_mirroring = 0; + status_write( &status, fds[1] ); + + fail_unless( read_until_newline( fds[0], buf, 1024 ) > 0, + "Couldn't read the result" ); + found = strstr( buf, "is_mirroring=false" ); + fail_if( NULL == found, "is_mirroring=false not found" ); + +} +END_TEST + + +Suite *status_suite(void) +{ + Suite *s = suite_create("status"); + TCase *tc_create = tcase_create("create"); + TCase *tc_render = tcase_create("render"); + + tcase_add_test(tc_create, test_status_create); + tcase_add_test(tc_create, test_gets_has_control); + tcase_add_test(tc_create, test_gets_is_mirroring); + + tcase_add_test(tc_render, test_renders_has_control); + tcase_add_test(tc_render, test_renders_is_mirroring); + + suite_add_tcase(s, tc_create); + suite_add_tcase(s, tc_render); + + return s; +} + +int main(void) +{ + int number_failed; + + Suite *s = status_suite(); + SRunner *sr = srunner_create(s); + srunner_run_all(sr, CK_NORMAL); + number_failed = srunner_ntests_failed(sr); + srunner_free(sr); + return (number_failed == 0) ? 0 : 1; +} + + diff --git a/tests/fakes/dest/close_after_hello.rb b/tests/fakes/dest/close_after_hello.rb new file mode 100755 index 0000000..3d11c75 --- /dev/null +++ b/tests/fakes/dest/close_after_hello.rb @@ -0,0 +1,47 @@ +#!/usr/bin/env ruby + +# Wait for a sender connection, send a correct hello, then disconnect. +# Simulate a server which crashes after sending the hello. We then +# reopen the server socket to check that the sender retries: since the +# command-line has gone away, and can't feed an error back to the +# user, we have to keep trying. + +addr, port = *ARGV + +require 'socket' +require 'timeout' + +sock = TCPServer.new( addr, port ) +client_sock = nil + +begin + Timeout.timeout(2) do + client_sock = sock.accept + end +rescue Timeout::Error + $stderr.puts "Timed out waiting for a connection" + exit 1 +end + + +client_sock.write( "NBDMAGIC" ) +client_sock.write( "\x00\x00\x42\x02\x81\x86\x12\x53" ) +client_sock.write( "\x00\x00\x00\x00\x00\x00\x10\x00" ) +client_sock.write( "\x00" * 128 ) + +client_sock.close + +new_sock = nil +begin + Timeout.timeout(60) do + new_sock = sock.accept + end +rescue Timeout::Error + $stderr.puts "Timed out waiting for a reconnection" + exit 1 +end + +new_sock.close +sock.close + +exit 0 diff --git a/tests/fakes/dest/hang_after_connect.rb b/tests/fakes/dest/hang_after_connect.rb new file mode 100755 index 0000000..a769c97 --- /dev/null +++ b/tests/fakes/dest/hang_after_connect.rb @@ -0,0 +1,34 @@ +#!/usr/bin/env ruby + +# Will open a server, accept a single connection, then sleep for 5 +# seconds. After that time, the client should have disconnected, +# which we can can't effectively check. +# +# This allows the test runner to check that the command-line sees the +# right error message after the timeout time. + +require 'socket' +require 'timeout' +require 'flexnbd/constants' + +addr, port = *ARGV + +serve_sock = TCPServer.new( addr, port ) +client_sock = nil + +begin + # A failure here means a more serious issue with flexnbd + Timeout.timeout( 2 ) do + client_sock = serve_sock.accept + end +rescue Timeout::Error + $stderr.puts "Client didn't make connection" + exit 1 +end + +# Sleep for one second past the timeout (a bit of slop in case ruby +# doesn't launch things quickly) +sleep(FlexNBD::MS_HELLO_TIME_SECS + 1) + +client_sock.close if client_sock +serve_sock.close diff --git a/tests/fakes/dest/hello_wrong_magic.rb b/tests/fakes/dest/hello_wrong_magic.rb new file mode 100755 index 0000000..c2f86bd --- /dev/null +++ b/tests/fakes/dest/hello_wrong_magic.rb @@ -0,0 +1,46 @@ +#!/usr/bin/env ruby + +# Simulate a destination which sends the wrong magic. +# We expect the sender to disconnect and reconnect. + +addr, port = *ARGV + +require 'socket' +require 'timeout' +require 'flexnbd/constants' + +sock = TCPServer.new( addr, port ) +client_sock = nil + +begin + Timeout.timeout(2) do + client_sock = sock.accept + end +rescue Timeout::Error + $stderr.puts "Timed out waiting for a connection" + exit 1 +end + + +t = Thread.new do + begin + Timeout.timeout(FlexNBD::MS_RETRY_DELAY_SECS + 1) do + client_sock2 = sock.accept + end + rescue Timeout::Error + $stderr.puts "Timed out waiting for a reconnection" + exit 1 + end +end + +client_sock.write( "NBDMAGIC" ) +# We're off in the last byte, should be \x53 +client_sock.write( "\x00\x00\x42\x02\x81\x86\x12\x52" ) +# 4096 is the right size; this is defined in nbd_scenarios +client_sock.write( "\x00\x00\x00\x00\x00\x00\x10\x00" ) +client_sock.write( "\x00" * 128 ) + + +t.join + +exit 0 diff --git a/tests/fakes/dest/hello_wrong_size.rb b/tests/fakes/dest/hello_wrong_size.rb new file mode 100755 index 0000000..b33174b --- /dev/null +++ b/tests/fakes/dest/hello_wrong_size.rb @@ -0,0 +1,44 @@ +#!/usr/bin/env ruby + +# Simulate a server which has a disc of the wrong size attached: send +# a valid NBD hello with a random size, then check that we have see an +# EOF on read. + +addr, port = *ARGV + +require 'socket' +require 'timeout' +require 'flexnbd/constants' + +sock = TCPServer.new( addr, port ) +client_sock = nil + +begin + Timeout.timeout(2) do + client_sock = sock.accept + end +rescue Timeout::Error + $stderr.puts "Timed out waiting for a connection" + exit 1 +end + +t = Thread.new do + begin + Timeout.timeout(FlexNBD::MS_RETRY_DELAY_SECS + 1) do + client_sock2 = sock.accept + end + rescue Timeout::Error + $stderr.puts "Timed out waiting for a reconnection" + exit 1 + end +end + +client_sock.write( "NBDMAGIC" ) +client_sock.write( "\x00\x00\x42\x02\x81\x86\x12\x53" ) +8.times do client_sock.write rand(256).chr end +client_sock.write( "\x00" * 128 ) + + +t.join + +exit 0 diff --git a/tests/fakes/dest/reject_acl.rb b/tests/fakes/dest/reject_acl.rb new file mode 100755 index 0000000..a781266 --- /dev/null +++ b/tests/fakes/dest/reject_acl.rb @@ -0,0 +1,22 @@ +#!/usr/bin/env ruby + +# Accept a connection, then immediately close it. This simulates an ACL rejection. + +addr, port = *ARGV +require 'socket' +require 'timeout' + +serve_sock = TCPServer.open( addr, port ) + +begin + Timeout.timeout( 2 ) do + serve_sock.accept.close + end +rescue Timeout::Error + $stderr.puts "Timed out waiting for a connection" + exit 1 +end + +serve_sock.close + +exit(0) diff --git a/tests/fakes/source/close_after_connect.rb b/tests/fakes/source/close_after_connect.rb new file mode 100755 index 0000000..1145992 --- /dev/null +++ b/tests/fakes/source/close_after_connect.rb @@ -0,0 +1,28 @@ +#!/usr/bin/env ruby + +# Connects to the destination server, then immediately disconnects, +# simulating a source crash. +# +# It then connects again, to check that the destination is still +# listening. + +addr, port = *ARGV +require 'socket' +require 'timeout' + +begin + Timeout.timeout( 2 ) do + sock = TCPSocket.open( addr, port.to_i ) + sock.close + end +rescue Timeout::Error + $stderr.puts "Failed to connect" + exit 1 +end + +Timeout.timeout( 2 ) do + sock = TCPSocket.open( addr, port.to_i ) + sock.close +end + +exit 0 diff --git a/tests/fakes/source/close_after_hello.rb b/tests/fakes/source/close_after_hello.rb new file mode 100755 index 0000000..dc0acd1 --- /dev/null +++ b/tests/fakes/source/close_after_hello.rb @@ -0,0 +1,49 @@ +#!/usr/bin/env ruby + +# Connect, read the hello, then immediately disconnect. This +# simulates a sender which dislikes something in the hello message - a +# wrong size, for instance. + +# After the disconnect, we reconnect to be sure that the destination +# is still alive. + + +require 'socket' +require "timeout" +require 'flexnbd/constants' + +addr, port = *ARGV + +client_sock = nil +begin + Timeout.timeout(2) do + client_sock = TCPSocket.open( addr, port ) + end +rescue Timeout::Error + $stderr.puts "Timed out connecting." + exit 1 +end + +begin + Timeout.timeout( FlexNBD::MS_HELLO_TIME_SECS ) do + fail "No hello." unless (hello = client_sock.read( 152 )) && + hello.length==152 + end +rescue Timeout::Error + $stderr.puts "Timed out waiting for hello." + exit 1 +end + +client_sock.close + +sleep(0.2) +begin + Timeout.timeout(2) do + client_sock = TCPSocket.open( addr, port ) + end +rescue Timeout::Error + $stderr.puts "Timed out reconnecting." + exit 1 +end + +exit(0) diff --git a/tests/fakes/source/hang_after_hello.rb b/tests/fakes/source/hang_after_hello.rb new file mode 100755 index 0000000..9070d9c --- /dev/null +++ b/tests/fakes/source/hang_after_hello.rb @@ -0,0 +1,69 @@ +#!/usr/bin/env ruby + +# Simulate the hello message going astray, or the source hanging after +# receiving it. +# +# We then connect again, to confirm that the destination is still +# listening for an incoming migration. + +addr, port = *ARGV +require 'socket' +require 'timeout' + +require "flexnbd/constants" + + +client_sock=nil +begin + Timeout.timeout(2) do + client_sock = TCPSocket.open( addr, port ) + end +rescue Timeout::Error + $stderr.puts "Timed out connecting" + exit 1 +end + +begin + Timeout.timeout(FlexNBD::MS_HELLO_TIME_SECS) do + client_sock.read(152) + end +rescue Timeout::Error + $stderr.puts "Timed out reading hello" + exit 1 +end + +# Now we do two things: + +# - In the parent process, we sleep for CLIENT_MAX_WAIT_SECS+5, which +# will make the destination give up and close the connection. +# - In the child process, we sleep for CLIENT_MAX_WAIT_SECS+1, which +# should be able to reconnect despite the parent process not having +# closed its end yet. + +kidpid = fork do + client_sock.close + new_sock = nil + sleep( FlexNBD::CLIENT_MAX_WAIT_SECS + 1 ) + begin + Timeout.timeout( 2 ) do + new_sock = TCPSocket.open( addr, port ) + end + Timeout.timeout( FlexNBD::MS_HELLO_TIME_SECS ) do + fail "No hello." unless (hello = new_sock.read( 152 )) && + hello.length==152 + end + new_sock.close + rescue Timeout::Error + $stderr.puts "Timed out reconnecting" + exit 1 + end + exit 0 +end + +# Sleep for longer than the child, to give the flexnbd process a bit +# of slop +sleep( FlexNBD::CLIENT_MAX_WAIT_SECS + 3 ) +client_sock.close + +_,status = Process.waitpid2( kidpid ) +exit status.exitstatus diff --git a/tests/flexnbd.rb b/tests/flexnbd.rb index ce13607..fc3ef97 100644 --- a/tests/flexnbd.rb +++ b/tests/flexnbd.rb @@ -1,5 +1,7 @@ require 'socket' require 'thread' +require 'open3' +require 'timeout' require 'rexml/document' require 'rexml/streamlistener' @@ -138,7 +140,7 @@ class ValgrindExecutor def launch_watch_thread(pid, io_r) Thread.start do io_source = REXML::IOSource.new( io_r ) - listener = ErrorListener.new( self ) + listener = DebugErrorListener.new( self ) REXML::Document.parse_stream( io_source, listener ) end end @@ -190,6 +192,17 @@ class FlexNBD end + def listen_cmd( file, acl ) + "#{@bin} listen "\ + "--addr #{ip} "\ + "--port #{port} "\ + "--file #{file} "\ + "--sock #{ctrl} "\ + "#{@debug} "\ + "#{acl.join(' ')}" + end + + def read_cmd( offset, length ) "#{bin} read "\ "--addr #{ip} "\ @@ -218,9 +231,16 @@ class FlexNBD "#{@debug} " end - def serve(file, *acl) + + def status_cmd + "#{@bin} status "\ + "--sock #{ctrl} "\ + "#{@debug}" + end + + + def run_serve_cmd(cmd) File.unlink(ctrl) if File.exists?(ctrl) - cmd =serve_cmd( file, acl ) debug( cmd ) @pid = @executor.run( cmd ) @@ -233,33 +253,49 @@ class FlexNBD end at_exit { kill } end + private :run_serve_cmd + + + def serve( file, *acl) + run_serve_cmd( serve_cmd( file, acl ) ) + end + + def listen(file, *acl) + run_serve_cmd( listen_cmd( file, acl ) ) + end + def start_wait_thread( pid ) - Thread.start do - Process.waitpid2( pid ) + @wait_thread = Thread.start do + _, status = Process.waitpid2( pid ) + if @kill - fail "flexnbd quit with a bad status #{$?.exitstatus}" unless - $?.exitstatus == @kill + fail "flexnbd quit with a bad status: #{status.to_i}" unless + @kill.include? status.to_i else - $stderr.puts "flexnbd quit" - fail "flexnbd quit early" + $stderr.puts "flexnbd #{self.pid} quit" + fail "flexnbd #{self.pid} quit early with status #{status.to_i}" end end end - def can_die(status=0) + def can_die(*status) + status << 0 if status.empty? @kill = status end def kill - can_die() - begin - Process.kill("INT", @pid) - rescue Errno::ESRCH => e - # already dead. Presumably this means it went away after a - # can_die() call. + can_die(2) + if @pid + begin + Process.kill("INT", @pid) + rescue Errno::ESRCH => e + # already dead. Presumably this means it went away after a + # can_die() call. + end end + @wait_thread.join if @wait_thread end def read(offset, length) @@ -284,21 +320,55 @@ class FlexNBD nil end - def mirror(dest_ip, dest_port, bandwidth=nil, action=nil) + + def mirror_unchecked( dest_ip, dest_port, bandwidth=nil, action=nil, timeout=nil ) cmd = mirror_cmd( dest_ip, dest_port) debug( cmd ) - system cmd - raise IOError.new( "Migrate command failed") unless $?.success? - nil + + maybe_timeout( cmd, timeout ) + end + + def maybe_timeout(cmd, timeout=nil ) + stdout, stderr = "","" + run = Proc.new do + Open3.popen3( cmd ) do |io_in, io_out, io_err| + io_in.close + stdout.replace io_out.read + stderr.replace io_err.read + end + end + + if timeout + Timeout.timeout(timeout, &run) + else + run.call + end + + [stdout, stderr] + end + + + def mirror(dest_ip, dest_port, bandwidth=nil, action=nil) + stdout, stderr = mirror_unchecked( dest_ip, dest_port, bandwidth, action ) + raise IOError.new( "Migrate command failed\n" + stderr) unless $?.success? + + stdout end def acl(*acl) control_command("acl", *acl) end - def status + + def status( timeout = nil ) + cmd = status_cmd() + debug( cmd ) + + maybe_timeout( cmd, timeout ) + end + protected def control_command(*args) raise "Server not running" unless @pid diff --git a/tests/flexnbd/constants.rb b/tests/flexnbd/constants.rb new file mode 100644 index 0000000..1989732 --- /dev/null +++ b/tests/flexnbd/constants.rb @@ -0,0 +1,36 @@ +# encoding: utf-8 + +module FlexNBD + + # eeevil is his one and only name... + def self.read_constants + parents = [] + current = File.expand_path(".") + while current != "/" + parents << current + current = File.expand_path( File.join( current, ".." ) ) + end + + source_root = parents.find do |dirname| + File.directory?( File.join( dirname, "src" ) ) + end + + fail "No source root!" unless source_root + + headers = Dir[File.join( source_root, "src", "*.h" ) ] + + headers.each do |header_filename| + txt_lines = File.readlines( header_filename ) + txt_lines.each do |line| + if line =~ /^#\s*define\s+([A-Z0-9_]+)\s+(\d+)\s*$/ + const_set($1, $2.to_i) + end + end + end + + end + + read_constants() +end # module FlexNBD + + diff --git a/tests/nbd_scenarios b/tests/nbd_scenarios index 8e44bf5..972f617 100644 --- a/tests/nbd_scenarios +++ b/tests/nbd_scenarios @@ -4,88 +4,25 @@ require 'test/unit' require 'flexnbd' require 'test_file_writer' -class NBDScenarios < Test::Unit::TestCase - def setup +class Environment + attr_reader( :blocksize, :filename1, :filename2, :ip, + :port1, :port2, :nbd1, :nbd2, :file1, :file2 ) + + def initialize @blocksize = 1024 - @filename1 = ".flexnbd.test.#{$$}.#{Time.now.to_i}.1" - @filename2 = ".flexnbd.test.#{$$}.#{Time.now.to_i}.2" + @filename1 = "/tmp/.flexnbd.test.#{$$}.#{Time.now.to_i}.1" + @filename2 = "/tmp/.flexnbd.test.#{$$}.#{Time.now.to_i}.2" @ip = "127.0.0.1" @available_ports = [*40000..41000] - listening_ports @port1 = @available_ports.shift @port2 = @available_ports.shift @nbd1 = FlexNBD.new("../build/flexnbd", @ip, @port1) @nbd2 = FlexNBD.new("../build/flexnbd", @ip, @port2) - end - def teardown - [@filename1, @filename2].each do |f| - File.unlink(f) if File.exists?(f) - end - end - - def test_read1 - writefile1("f"*64) - serve1 - - [0, 12, 63].each do |num| - - assert_equal( - @nbd1.read(num*@blocksize, @blocksize), - @file1.read(num*@blocksize, @blocksize) - ) - end - - [124, 1200, 10028, 25488].each do |num| - assert_equal(@nbd1.read(num, 4), @file1.read(num, 4)) - end - end - - # Check that we're not - # - def test_writeread1 - writefile1("0"*64) - serve1 - - [0, 12, 63].each do |num| - data = "X"*@blocksize - @nbd1.write(num*@blocksize, data) - assert_equal(data, @file1.read(num*@blocksize, data.size)) - assert_equal(data, @nbd1.read(num*@blocksize, data.size)) - end - end - - # Check that we're not overstepping or understepping where our writes end - # up. - # - def test_writeread2 - writefile1("0"*1024) - serve1 - - d0 = "\0"*@blocksize - d1 = "X"*@blocksize - (0..63).each do |num| - @nbd1.write(num*@blocksize*2, d1) - end - (0..63).each do |num| - assert_equal(d0, @nbd1.read(((2*num)+1)*@blocksize, d0.size)) - end + @fake_pid = nil end - def test_mirror - writefile1( "f"*4 ) - serve1 - - writefile2( "0"*4 ) - serve2 - - @nbd1.can_die - mirror12 - assert_equal(@file1.read_original( 0, @blocksize ), - @file2.read( 0, @blocksize ) ) - end - - protected def serve1(*acl) @nbd1.serve(@filename1, *acl) end @@ -94,10 +31,60 @@ class NBDScenarios < Test::Unit::TestCase @nbd2.serve(@filename2, *acl) end + + def listen1( *acl ) + @nbd1.listen( @filename1, *acl ) + end + + def listen2( *acl ) + @nbd2.listen( @filename2, *acl ) + end + + + def parse_status( status ) + hsh = {} + + status.split(" ").each do |part| + next if part.strip.empty? + a,b = part.split("=") + b.strip! + b = true if b == "true" + b = false if b == "false" + + hsh[a.strip] = b + end + + hsh + end + + + def status( nbd ) + stdout, stderr = nbd.status + [parse_status(stdout), stderr] + end + + def status1 + status( @nbd1 ) + end + + def status2 + puts "Getting status" + result = status( @nbd2 ) + puts "Got status" + return result + end + + + def mirror12 @nbd1.mirror( @nbd2.ip, @nbd2.port ) end + def mirror12_unchecked + @nbd1.mirror_unchecked( @nbd2.ip, @nbd2.port, nil, nil, 10 ) + end + + def writefile1(data) @file1 = TestFileWriter.new(@filename1, @blocksize).write(data) end @@ -114,5 +101,232 @@ class NBDScenarios < Test::Unit::TestCase map { |x| x.split(/\s+/) }[2..-1]. map { |l| l[3].split(":")[-1].to_i } end + + + def cleanup + if @fake_pid + begin + Process.waitpid2( @fake_pid ) + rescue Errno::ESRCH + end + end + + + @nbd1.kill + @nbd2.kill + + [@filename1, @filename2].each do |f| + File.unlink(f) if File.exists?(f) + end + end + + + def run_fake( name, addr, port ) + fakedir = File.join( File.dirname( __FILE__ ), "fakes" ) + fake = Dir[File.join( fakedir, name ) + "*"].sort.find { |fn| + File.executable?( fn ) + } + + raise "no fake executable" unless fake + raise "no addr" unless addr + raise "no port" unless port + @fake_pid = fork do + exec fake + " " + addr.to_s + " " + port.to_s + end + sleep(0.5) + end + + + def fake_reports_success + _,status = Process.waitpid2( @fake_pid ) + @fake_pid = nil + status.success? + end + + +end # class Environment + + +class NBDScenarios < Test::Unit::TestCase + def setup + @env = Environment.new + end + + def teardown + @env.cleanup + end + + + def test_read1 + @env.writefile1("f"*64) + @env.serve1 + + [0, 12, 63].each do |num| + + assert_equal( + @env.nbd1.read(num*@env.blocksize, @env.blocksize), + @env.file1.read(num*@env.blocksize, @env.blocksize) + ) + end + + [124, 1200, 10028, 25488].each do |num| + assert_equal(@env.nbd1.read(num, 4), @env.file1.read(num, 4)) + end + end + + # Check that we're not + # + def test_writeread1 + @env.writefile1("0"*64) + @env.serve1 + + [0, 12, 63].each do |num| + data = "X"*@env.blocksize + @env.nbd1.write(num*@env.blocksize, data) + assert_equal(data, @env.file1.read(num*@env.blocksize, data.size)) + assert_equal(data, @env.nbd1.read(num*@env.blocksize, data.size)) + end + end + + # Check that we're not overstepping or understepping where our writes end + # up. + # + def test_writeread2 + @env.writefile1("0"*1024) + @env.serve1 + + d0 = "\0"*@env.blocksize + d1 = "X"*@env.blocksize + (0..63).each do |num| + @env.nbd1.write(num*@env.blocksize*2, d1) + end + (0..63).each do |num| + assert_equal(d0, @env.nbd1.read(((2*num)+1)*@env.blocksize, d0.size)) + end + end + + + def test_mirror + @env.writefile1( "f"*4 ) + @env.serve1 + + @env.writefile2( "0"*4 ) + @env.listen2 + + @env.nbd1.can_die + stdout, stderr = @env.mirror12 + + assert_equal(@env.file1.read_original( 0, @env.blocksize ), + @env.file2.read( 0, @env.blocksize ) ) + assert @env.status2['has_control'], "destination didn't take control" + end + end + +class NBDConnectSourceFailureScenarios < Test::Unit::TestCase + def setup + @env = Environment.new + @env.writefile1( "f" * 4 ) + @env.serve1 + + end + + def teardown + @env.cleanup + end + + + def test_failure_to_connect_reported_in_mirror_cmd_response + stdout, stderr = @env.mirror12_unchecked + assert_match( /failed to connect/, stderr ) + end + + + def test_destination_hangs_after_connect_reports_error_at_source + @env.run_fake( "dest/hang_after_connect", @env.ip, @env.port2 ) + + stdout, stderr = @env.mirror12_unchecked + assert_match( /Remote server failed to respond/, stderr ) + assert @env.fake_reports_success + end + + + def test_destination_rejects_connection_reports_error_at_source + @env.run_fake( "dest/reject_acl", @env.ip, @env.port2 ) + + stdout, stderr = @env.mirror12_unchecked + assert_match /Mirror was rejected/, stderr + assert @env.fake_reports_success + end + + def test_wrong_size_causes_disconnect + @env.run_fake( "dest/hello_wrong_size", @env.ip, @env.port2 ) + stdout, stderr = @env.mirror12_unchecked + assert_match /Remote size does not match local size/, stderr + assert @env.fake_reports_success + end + + + def test_wrong_magic_causes_disconnect + @env.run_fake( "dest/hello_wrong_magic", @env.ip, @env.port2 ) + stdout, stderr = @env.mirror12_unchecked + assert_match /Mirror was rejected/, stderr + assert @env.fake_reports_success, "dest/hello_wrong_magic fake failed" + end + + + def test_disconnect_after_hello_causes_retry + @env.run_fake( "dest/close_after_hello", @env.ip, @env.port2 ) + stdout, stderr = @env.mirror12_unchecked + assert_match( /Mirror started/, stdout ) + + assert @env.fake_reports_success + end +end + + +class NBDConnectDestFailureScenarios < Test::Unit::TestCase + + def setup + @env = Environment.new + @env.writefile1( "0" * 4 ) + @env.listen1 + end + + def teardown + @env.cleanup + end + + + def test_hello_blocked_by_disconnect_causes_error_not_fatal + run_fake( "source/close_after_connect" ) + assert_no_control + end + + + def test_hello_goes_astray_causes_timeout_error + run_fake( "source/hang_after_hello" ) + assert_no_control + end + + + def test_disconnect_after_hello_causes_error_not_fatal + run_fake( "source/close_after_hello" ) + assert_no_control + end + + + private + def run_fake( name ) + @env.run_fake( name, @env.ip, @env.port1 ) + assert @env.fake_reports_success + end + + def assert_no_control + status, stderr = @env.status1 + assert !status['has_control'], "Thought it had control" + end + + +end # class NBDConnectDestFailureScenarios