connect failure scenarios

This commit is contained in:
Alex Young
2012-06-22 10:05:41 +01:00
parent 80f298f6cd
commit 2078d17053
28 changed files with 1674 additions and 225 deletions

View File

@@ -120,6 +120,17 @@ static inline void bitset_set_range(
bit_set_range(set->bits, first, bitlen);
}
/** Set every bit in the bitset. */
static inline void bitset_set(
struct bitset_mapping* set
)
{
bitset_set_range(set, 0, set->size);
}
/** Clear the bits in a bitset which correspond to the given bytes in the
* larger file.
*/
@@ -132,6 +143,16 @@ static inline void bitset_clear_range(
bit_clear_range(set->bits, first, bitlen);
}
/** Clear every bit in the bitset. */
static inline void bitset_clear(
struct bitset_mapping *set
)
{
bitset_clear_range(set, 0, set->size);
}
/** Counts the number of contiguous bytes that are represented as a run in
* the bit field.
*/

View File

@@ -170,12 +170,25 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
struct nbd_request_raw request_raw;
fd_set fds;
struct timeval tv = {CLIENT_MAX_WAIT_SECS, 0};
struct timeval * ptv;
int fd_count;
/* We want a timeout if this is an inbound migration, but not
* otherwise
*/
ptv = server_is_in_control( client->serve ) ? NULL : &tv;
FD_ZERO(&fds);
FD_SET(client->socket, &fds);
self_pipe_fd_set( client->stop_signal, &fds );
FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds, NULL, NULL, NULL),
"select() failed");
fd_count = select(FD_SETSIZE, &fds, NULL, NULL, ptv);
if ( fd_count == 0 ) {
/* This "can't ever happen" */
if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); }
else { error("Timed out waiting for I/O"); }
}
else if ( fd_count < 0 ) { fatal( "Select failed" ); }
if ( self_pipe_fd_isset( client->stop_signal, &fds ) ){
debug("Client received stop signal.");
@@ -187,8 +200,14 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
debug("EOF reading request");
return 0; /* neat point to close the socket */
}
else {
fatal("Error reading request");
else {
/* FIXME: I've seen this happen, but I couldn't
* reproduce it so I'm leaving it here with a
* better debug output in the hope it'll
* spontaneously happen again. It should
* *probably* be an error() call, but I want to
* be sure. */
fatal("Error reading request: %s", strerror( errno ));
}
}

View File

@@ -1,6 +1,14 @@
#ifndef CLIENT_H
#define CLIENT_H
/** CLIENT_MAX_WAIT_SECS
* This is the length of time an inbound migration will wait for a fresh
* write before assuming the source has Gone Away. Note: it is *not*
* the time from one write to the next, it is the gap between the end of
* one write and the start of the next.
*/
#define CLIENT_MAX_WAIT_SECS 5
struct client {
/* When we call pthread_join, if the thread is already dead

View File

@@ -25,6 +25,7 @@
* client code to be found in remote.c
*/
#include "control.h"
#include "serve.h"
#include "util.h"
#include "ioutil.h"
@@ -33,43 +34,116 @@
#include "bitset.h"
#include "self_pipe.h"
#include "acl.h"
#include "status.h"
#include <stdlib.h>
#include <string.h>
#include <sys/un.h>
#include <unistd.h>
struct mirror_status * mirror_status_create(
struct server * serve,
int fd,
struct mirror_status * mirror_status_alloc(
union mysockaddr * connect_to,
union mysockaddr * connect_from,
int max_Bps,
int action_at_finish)
int action_at_finish,
struct self_pipe * commit_signal,
enum mirror_state * out_commit_state)
{
/* FIXME: shouldn't map_fd get closed? */
int map_fd;
off64_t size;
struct mirror_status * mirror;
NULLCHECK( serve );
mirror = xmalloc(sizeof(struct mirror_status));
mirror->client = fd;
mirror->connect_to = connect_to;
mirror->connect_from = connect_from;
mirror->max_bytes_per_second = max_Bps;
mirror->action_at_finish = action_at_finish;
mirror->commit_signal = commit_signal;
mirror->commit_state = out_commit_state;
return mirror;
}
void mirror_set_state_f( struct mirror_status * mirror, enum mirror_state state )
{
NULLCHECK( mirror );
if ( mirror->commit_state ){
*mirror->commit_state = state;
}
}
#define mirror_set_state( mirror, state ) do{\
debug( "Mirror state => " #state );\
mirror_set_state_f( mirror, state );\
} while(0)
enum mirror_state mirror_get_state( struct mirror_status * mirror )
{
NULLCHECK( mirror );
if ( mirror->commit_state ){
return *mirror->commit_state;
} else {
return MS_UNKNOWN;
}
}
void mirror_status_init( struct mirror_status * mirror, char * filename )
{
int map_fd;
off64_t size;
NULLCHECK( mirror );
NULLCHECK( filename );
FATAL_IF_NEGATIVE(
open_and_mmap(
serve->filename,
filename,
&map_fd,
&size,
(void**) &mirror->mapped
),
"Failed to open and mmap %s",
serve->filename
filename
);
mirror->dirty_map = bitset_alloc(size, 4096);
bitset_set_range(mirror->dirty_map, 0, size);
}
/* Call this before a mirror attempt. */
void mirror_status_reset( struct mirror_status * mirror )
{
NULLCHECK( mirror );
NULLCHECK( mirror->dirty_map );
mirror_set_state( mirror, MS_INIT );
bitset_set(mirror->dirty_map);
}
struct mirror_status * mirror_status_create(
struct server * serve,
union mysockaddr * connect_to,
union mysockaddr * connect_from,
int max_Bps,
int action_at_finish,
struct self_pipe * commit_signal,
enum mirror_state * out_commit_state)
{
/* FIXME: shouldn't map_fd get closed? */
struct mirror_status * mirror;
NULLCHECK( serve );
mirror = mirror_status_alloc( connect_to,
connect_from,
max_Bps,
action_at_finish,
commit_signal,
out_commit_state );
mirror_status_init( mirror, serve->filename );
mirror_status_reset( mirror );
return mirror;
}
@@ -78,7 +152,8 @@ struct mirror_status * mirror_status_create(
void mirror_status_destroy( struct mirror_status *mirror )
{
NULLCHECK( mirror );
close(mirror->client);
free(mirror->connect_to);
free(mirror->connect_from);
free(mirror->dirty_map);
free(mirror);
}
@@ -155,7 +230,7 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written)
}
void mirror_transfer_control( struct mirror_status * mirror )
void mirror_give_control( struct mirror_status * mirror )
{
/* TODO: set up an error handler to clean up properly on ERROR.
*/
@@ -196,7 +271,8 @@ void mirror_on_exit( struct server * serve )
* and already-connected clients don't get needlessly
* disconnected.
*/
mirror_transfer_control( serve->mirror );
debug( "mirror_give_control");
mirror_give_control( serve->mirror );
/* If we're still here, the transfer of control went ok, and the
* remote is listening (or will be shortly). We can shut the
@@ -205,6 +281,7 @@ void mirror_on_exit( struct server * serve )
* It doesn't matter if we get new client connections before
* now, the IO lock will stop them from doing anything.
*/
debug("serve_signal_close");
serve_signal_close( serve );
/* We have to wait until the server is closed before unlocking
@@ -214,34 +291,96 @@ void mirror_on_exit( struct server * serve )
* guarantee the server thread will win the race and we risk the
* clients seeing a "successful" write to a dead disc image.
*/
debug("serve_wait_for_close");
serve_wait_for_close( serve );
info("Mirror sent.");
}
/** Thread launched to drive mirror process */
void* mirror_runner(void* serve_params_uncast)
void mirror_cleanup( struct mirror_status * mirror,
int fatal __attribute__((unused)))
{
int pass;
struct server *serve = (struct server*) serve_params_uncast;
uint64_t written;
NULLCHECK( mirror );
info( "Cleaning up mirror thread");
if( mirror->client && mirror->client > 0 ){
close( mirror->client );
}
mirror->client = -1;
}
int mirror_status_connect( struct mirror_status * mirror, off64_t local_size )
{
struct sockaddr * connect_from = NULL;
if ( mirror->connect_from ) {
connect_from = &mirror->connect_from->generic;
}
NULLCHECK( mirror->connect_to );
mirror->client = socket_connect(&mirror->connect_to->generic, connect_from);
if ( 0 < mirror->client ) {
fd_set fds;
struct timeval tv = { MS_HELLO_TIME_SECS, 0};
FD_ZERO( &fds );
FD_SET( mirror->client, &fds );
FATAL_UNLESS( 0 <= select( FD_SETSIZE, &fds, NULL, NULL, &tv ),
"Select failed." );
if( FD_ISSET( mirror->client, &fds ) ){
off64_t remote_size;
if ( socket_nbd_read_hello( mirror->client, &remote_size ) ) {
if( remote_size == local_size ){
mirror_set_state( mirror, MS_GO );
}
else {
warn("Remote size (%d) doesn't match local (%d)",
remote_size, local_size );
mirror_set_state( mirror, MS_FAIL_SIZE_MISMATCH );
}
}
else {
warn( "Mirror attempt rejected." );
mirror_set_state( mirror, MS_FAIL_REJECTED );
}
}
else {
warn( "No NBD Hello received." );
mirror_set_state( mirror, MS_FAIL_NO_HELLO );
}
}
else {
warn( "Mirror failed to connect.");
mirror_set_state( mirror, MS_FAIL_CONNECT );
}
return mirror_get_state(mirror) == MS_GO;
}
void server_run_mirror( struct server *serve )
{
NULLCHECK( serve );
NULLCHECK( serve->mirror );
NULLCHECK( serve->mirror->dirty_map );
debug("Starting mirror" );
int pass;
uint64_t written;
info("Starting mirror" );
for (pass=0; pass < mirror_maximum_passes-1; pass++) {
debug("mirror start pass=%d", pass);
if ( !mirror_pass( serve, 1, &written ) ){
goto abandon_mirror;
}
if ( !mirror_pass( serve, 1, &written ) ){ return; }
/* if we've not written anything */
if (written < mirror_last_pass_after_bytes_written) { break; }
}
mirror_set_state( serve->mirror, MS_FINALISE );
server_lock_io( serve );
{
if ( mirror_pass( serve, 0, &written ) &&
@@ -253,30 +392,256 @@ void* mirror_runner(void* serve_params_uncast)
}
}
server_unlock_io( serve );
}
abandon_mirror:
mirror_status_destroy( serve->mirror );
serve->mirror = NULL; /* and we're gone */
void mirror_signal_commit( struct mirror_status * mirror )
{
NULLCHECK( mirror );
self_pipe_signal( mirror->commit_signal );
}
/** Thread launched to drive mirror process */
void* mirror_runner(void* serve_params_uncast)
{
/* The supervisor thread relies on there not being any ERROR
* calls until after the mirror_signal_commit() call in this
* function.
* However, *after* that, we should call ERROR_* instead of
* FATAL_* wherever possible.
*/
struct server *serve = (struct server*) serve_params_uncast;
NULLCHECK( serve );
NULLCHECK( serve->mirror );
struct mirror_status * mirror = serve->mirror;
NULLCHECK( mirror->dirty_map );
error_set_handler( (cleanup_handler *) mirror_cleanup, mirror );
info( "Connecting to mirror" );
time_t start_time = time(NULL);
int connected = mirror_status_connect( mirror, serve->size );
mirror_signal_commit( mirror );
if ( !connected ) { goto abandon_mirror; }
/* After this point, if we see a failure we need to disconnect
* and retry everything from mirror_set_state(_, MS_INIT), but
* *without* signaling the commit or abandoning the mirror.
* */
if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){
/* If we get here, then we managed to connect but the
* control thread feeding status back to the user will
* have gone away, leaving the user without meaningful
* feedback. In this instance, they have to assume a
* failure, so we can't afford to let the mirror happen.
* We have to set the state to avoid a race.
*/
mirror_set_state( mirror, MS_FAIL_CONNECT );
warn( "Mirror connected, but too slowly" );
goto abandon_mirror;
}
server_run_mirror( serve );
mirror_set_state( mirror, MS_DONE );
abandon_mirror:
return NULL;
}
struct mirror_super * mirror_super_create(
struct server * serve,
union mysockaddr * connect_to,
union mysockaddr * connect_from,
int max_Bps,
int action_at_finish,
enum mirror_state * out_commit_state)
{
struct mirror_super * super = xmalloc( sizeof( struct mirror_super) );
super->mirror = mirror_status_create( serve,
connect_to,
connect_from,
max_Bps,
action_at_finish,
self_pipe_create(),
out_commit_state );
super->commit_signal = self_pipe_create();
return super;
}
void mirror_super_signal_committed( struct mirror_super * super )
{
NULLCHECK( super );
self_pipe_signal( super->commit_signal );
}
void mirror_super_destroy( struct mirror_super * super )
{
NULLCHECK( super );
mirror_status_destroy( super->mirror );
self_pipe_destroy( super->commit_signal );
}
/* The mirror supervisor thread. Responsible for kicking off retries if
* the mirror thread fails.
* The mirror_status and mirror_super objects are never freed, and the
* mirror_super_runner thread is never joined.
*/
void * mirror_super_runner( void * serve_uncast )
{
struct server * serve = (struct server *) serve_uncast;
NULLCHECK( serve );
NULLCHECK( serve->mirror );
NULLCHECK( serve->mirror_super );
int should_retry = 0;
int success = 0;
fd_set fds;
int fd_count;
struct mirror_status * mirror = serve->mirror;
struct mirror_super * super = serve->mirror_super;
do {
if ( should_retry ) {
/* We don't want to hammer the destination too
* hard, so if this is a retry, insert a delay. */
sleep( MS_RETRY_DELAY_SECS );
/* We also have to reset the bitmap to be sure
* we transfer everything */
mirror_status_reset( mirror );
}
FATAL_IF( 0 != pthread_create(
&mirror->thread,
NULL,
mirror_runner,
serve),
"Failed to create mirror thread");
debug("Supervisor waiting for commit signal");
FD_ZERO( &fds );
self_pipe_fd_set( mirror->commit_signal, &fds );
/* There's no timeout on this select. This means that
* the mirror thread *must* signal then abort itself if
* it passes the timeout, and it *must* always signal,
* no matter what.
*/
fd_count = select( FD_SETSIZE, &fds, NULL, NULL, NULL );
if ( 1 == fd_count ) {
debug( "Supervisor got commit signal" );
if ( 0 == should_retry ) {
should_retry = 1;
/* Only send this signal the first time */
mirror_super_signal_committed(super);
debug("Mirror supervisor committed");
}
}
else { fatal( "Select failed." ); }
debug("Supervisor waiting for mirror thread" );
pthread_join( mirror->thread, NULL );
debug( "Clearing the commit signal. If this blocks,"
" it's fatal but we can't check in advance." );
self_pipe_signal_clear( mirror->commit_signal );
debug( "Commit signal cleared." );
success = MS_DONE == mirror_get_state( mirror );
if( success ){ info( "Mirror supervisor success, exiting" ); }
else if (should_retry){
warn( "Mirror failed, retrying" );
}
else { warn( "Mirror failed before commit, giving up" ); }
}
while ( should_retry && !success );
serve->mirror = NULL;
serve->mirror_super = NULL;
mirror_super_destroy( super );
debug( "Mirror supervisor done." );
return NULL;
}
#define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1)
/* We have to pass the mirror_state pointer and the commit_signal
* separately from the mirror itself because the mirror might have been
* freed by the time we get to check it */
void mirror_watch_startup( struct control_params * client,
struct self_pipe * commit_signal,
enum mirror_state *mirror_state )
{
NULLCHECK( client );
struct server * serve = client->serve;
NULLCHECK( serve );
struct mirror_status * mirror = serve->mirror;
NULLCHECK( mirror );
fd_set fds;
/* This gives a 61 second timeout for the mirror thread to
* either fail or succeed to connect.
*/
struct timeval tv = {MS_CONNECT_TIME_SECS+1,0};
FD_ZERO( &fds );
self_pipe_fd_set( commit_signal, &fds );
ERROR_IF_NEGATIVE( select( FD_SETSIZE, &fds, NULL, NULL, &tv ), "Select failed.");
if ( self_pipe_fd_isset( commit_signal, &fds ) ){
switch (*mirror_state) {
case MS_INIT:
case MS_UNKNOWN:
write_socket( "1: Mirror failed to initialise" );
fatal( "Impossible mirror state: %d", *mirror_state );
case MS_FAIL_CONNECT:
write_socket( "1: Mirror failed to connect");
break;
case MS_FAIL_REJECTED:
write_socket( "1: Mirror was rejected" );
break;
case MS_FAIL_NO_HELLO:
write_socket( "1: Remote server failed to respond");
break;
case MS_FAIL_SIZE_MISMATCH:
write_socket( "1: Remote size does not match local size" );
break;
case MS_GO:
case MS_FINALISE:
case MS_DONE: /* Yes, I know we know better, but it's simpler this way */
write_socket( "0: Mirror started" );
break;
default:
fatal( "Unhandled mirror state: %d", *mirror_state );
}
}
else {
/* Timeout. Badness. This "should never happen". */
write_socket( "1: Mirror timed out connecting to remote host" );
}
}
/** Command parser to start mirror process from socket input */
int control_mirror(struct control_params* client, int linesc, char** lines)
{
NULLCHECK( client );
off64_t remote_size;
struct server * serve = client->serve;
int fd;
struct mirror_status *mirror;
union mysockaddr connect_to;
union mysockaddr connect_from;
union mysockaddr *connect_to = xmalloc( sizeof( union mysockaddr ) );
union mysockaddr *connect_from = NULL;
int use_connect_from = 0;
uint64_t max_Bps;
uint64_t max_Bps = 0;
int action_at_finish;
int raw_port;
@@ -286,7 +651,7 @@ int control_mirror(struct control_params* client, int linesc, char** lines)
return -1;
}
if (parse_ip_to_sockaddr(&connect_to.generic, lines[0]) == 0) {
if (parse_ip_to_sockaddr(&connect_to->generic, lines[0]) == 0) {
write_socket("1: bad IP address");
return -1;
}
@@ -296,20 +661,18 @@ int control_mirror(struct control_params* client, int linesc, char** lines)
write_socket("1: bad IP port number");
return -1;
}
connect_to.v4.sin_port = htobe16(raw_port);
connect_to->v4.sin_port = htobe16(raw_port);
if (linesc > 2) {
if (parse_ip_to_sockaddr(&connect_from.generic, lines[2]) == 0) {
connect_from = xmalloc( sizeof( union mysockaddr ) );
if (parse_ip_to_sockaddr(&connect_from->generic, lines[2]) == 0) {
write_socket("1: bad bind address");
return -1;
}
use_connect_from = 1;
}
max_Bps = 0;
if (linesc > 3) {
max_Bps = atoi(lines[2]);
}
if (linesc > 3) { max_Bps = atoi(lines[2]); }
action_at_finish = ACTION_EXIT;
if (linesc > 4) {
@@ -330,35 +693,27 @@ int control_mirror(struct control_params* client, int linesc, char** lines)
return -1;
}
/** I don't like use_connect_from but socket_connect doesn't take *mysockaddr :( */
struct sockaddr *afrom = use_connect_from ? &connect_from.generic : NULL;
fd = socket_connect(&connect_to.generic, afrom);
remote_size = socket_nbd_read_hello(fd);
if( remote_size != (off64_t)serve->size ){
warn("Remote size (%d) doesn't match local (%d)", remote_size, serve->size );
write_socket( "1: remote size (%d) doesn't match local (%d)");
close(fd);
return -1;
}
mirror = mirror_status_create( serve,
fd,
enum mirror_state mirror_state;
serve->mirror_super = mirror_super_create( serve,
connect_to,
connect_from,
max_Bps ,
action_at_finish );
serve->mirror = mirror;
action_at_finish,
&mirror_state );
serve->mirror = serve->mirror_super->mirror;
FATAL_IF( /* FIXME should free mirror on error */
0 != pthread_create(
&mirror->thread,
&serve->mirror_super->thread,
NULL,
mirror_runner,
mirror_super_runner,
serve
),
"Failed to create mirror thread"
);
write_socket("0: mirror started");
mirror_watch_startup( client, serve->mirror_super->commit_signal, &mirror_state );
debug( "Control thread going away." );
return 0;
}
@@ -388,11 +743,19 @@ int control_acl(struct control_params* client, int linesc, char** lines)
/** FIXME: add some useful statistics */
int control_status(
struct control_params* client __attribute__ ((unused)),
struct control_params* client,
int linesc __attribute__ ((unused)),
char** lines __attribute__((unused))
)
{
NULLCHECK( client );
NULLCHECK( client->serve );
struct status * status = status_create( client->serve );
write( client->socket, "0: ", 3 );
status_write( status, client->socket );
status_destroy( status );
return 0;
}
@@ -423,16 +786,19 @@ void* control_serve(void* client_uncast)
/* ignore failure */
}
else if (strcmp(lines[0], "acl") == 0) {
info("acl command received" );
if (control_acl(client, linesc-1, lines+1) < 0) {
finished = 1;
}
}
else if (strcmp(lines[0], "mirror") == 0) {
info("mirror command received" );
if (control_mirror(client, linesc-1, lines+1) < 0) {
finished = 1;
}
}
else if (strcmp(lines[0], "status") == 0) {
info("status command received" );
if (control_status(client, linesc-1, lines+1) < 0) {
finished = 1;
}
@@ -449,6 +815,7 @@ void* control_serve(void* client_uncast)
}
control_cleanup(client, 0);
debug("control command handled" );
return NULL;
}

View File

@@ -1,5 +1,28 @@
#ifndef __CONTROL_H
#define __CONTROL_H
#ifndef CONTROL_H
#define CONTROL_H
/* MS_CONNECT_TIME_SECS
* The length of time after which the sender will assume a connect() to
* the destination has failed.
*/
#define MS_CONNECT_TIME_SECS 60
/* MS_HELLO_TIME_SECS
* The length of time the sender will wait for the NBD hello message
* after connect() before aborting the connection attempt.
*/
#define MS_HELLO_TIME_SECS 5
/* MS_RETRY_DELAY_SECS
* The delay after a failed migration attempt before launching another
* thread to try again.
*/
#define MS_RETRY_DELAY_SECS 1
#include "parse.h"
#include "serve.h"
void accept_control_connection(struct server* params, int client_fd, union mysockaddr* client_address);
void serve_open_control_socket(struct server* params);

View File

@@ -18,8 +18,9 @@ struct listen * listen_create(
{
struct listen * listen;
listen = (struct listen *)xmalloc( sizeof( listen ) );
listen->init_serve = server_create( s_ip_address,
listen = (struct listen *)xmalloc( sizeof( struct listen ) );
listen->init_serve = server_create(
s_ip_address,
s_port,
s_file,
s_ctrl_sock,
@@ -29,7 +30,7 @@ struct listen * listen_create(
1, 0);
listen->main_serve = server_create(
s_rebind_ip_address ? s_rebind_ip_address : s_ip_address,
s_rebind_port ? s_rebind_port : s_port,
s_rebind_port ? s_rebind_port : s_port,
s_file,
s_ctrl_sock,
default_deny,

View File

@@ -10,33 +10,50 @@
int socket_connect(struct sockaddr* to, struct sockaddr* from)
{
int fd = socket(to->sa_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0);
FATAL_IF_NEGATIVE(fd, "Couldn't create client socket");
if (NULL != from) {
FATAL_IF_NEGATIVE(
bind(fd, from, sizeof(struct sockaddr_in6)),
"bind() failed"
);
if( fd < 0 ){
warn( "Couldn't create client socket");
return -1;
}
if (NULL != from) {
if ( 0 > bind(fd, from, sizeof(struct sockaddr_in6)) ){
warn( "bind() failed");
close( fd );
return -1;
}
}
if ( 0 > connect(fd, to, sizeof(struct sockaddr_in6)) ) {
warn( "connect failed" );
close( fd );
return -1;
}
FATAL_IF_NEGATIVE(
connect(fd, to, sizeof(struct sockaddr_in6)),"connect failed"
);
return fd;
}
off64_t socket_nbd_read_hello(int fd)
int socket_nbd_read_hello(int fd, off64_t * out_size)
{
struct nbd_init init;
FATAL_IF_NEGATIVE(readloop(fd, &init, sizeof(init)),
"Couldn't read init");
if ( 0 > readloop(fd, &init, sizeof(init)) ) {
warn( "Couldn't read init" );
goto fail;
}
if (strncmp(init.passwd, INIT_PASSWD, 8) != 0) {
fatal("wrong passwd");
warn("wrong passwd");
goto fail;
}
if (be64toh(init.magic) != INIT_MAGIC) {
fatal("wrong magic (%x)", be64toh(init.magic));
warn("wrong magic (%x)", be64toh(init.magic));
goto fail;
}
return be64toh(init.size);
if ( NULL != out_size ) {
*out_size = be64toh(init.size);
}
return 1;
fail:
return 0;
}
void fill_request(struct nbd_request *request, int type, int from, int len)
@@ -92,15 +109,15 @@ void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf)
struct nbd_reply reply;
fill_request(&request, REQUEST_WRITE, from, len);
FATAL_IF_NEGATIVE(writeloop(fd, &request, sizeof(request)),
ERROR_IF_NEGATIVE(writeloop(fd, &request, sizeof(request)),
"Couldn't write request");
if (in_buf) {
FATAL_IF_NEGATIVE(writeloop(fd, in_buf, len),
ERROR_IF_NEGATIVE(writeloop(fd, in_buf, len),
"Write failed");
}
else {
FATAL_IF_NEGATIVE(
ERROR_IF_NEGATIVE(
splice_via_pipe_loop(in_fd, fd, len),
"Splice failed"
);
@@ -137,17 +154,25 @@ int socket_nbd_disconnect( int fd )
}
#define CHECK_RANGE(error_type) { \
off64_t size = socket_nbd_read_hello(params->client); \
if (params->from < 0 || (params->from + params->len) > size) {\
fatal(error_type \
" request %d+%d is out of range given size %d", \
params->from, params->len, size\
); }\
off64_t size;\
int success = socket_nbd_read_hello(params->client, &size); \
if ( success ) {\
if (params->from < 0 || (params->from + params->len) > size) {\
fatal(error_type \
" request %d+%d is out of range given size %d", \
params->from, params->len, size\
);\
}\
}\
else {\
fatal( error_type " connection failed." );\
}\
}
void do_read(struct mode_readwrite_params* params)
{
params->client = socket_connect(&params->connect_to.generic, &params->connect_from.generic);
FATAL_IF_NEGATIVE( params->client, "Couldn't connect." );
CHECK_RANGE("read");
socket_nbd_read(params->client, params->from, params->len,
params->data_fd, NULL);
@@ -157,6 +182,7 @@ void do_read(struct mode_readwrite_params* params)
void do_write(struct mode_readwrite_params* params)
{
params->client = socket_connect(&params->connect_to.generic, &params->connect_from.generic);
FATAL_IF_NEGATIVE( params->client, "Couldn't connect." );
CHECK_RANGE("write");
socket_nbd_write(params->client, params->from, params->len,
params->data_fd, NULL);

View File

@@ -6,7 +6,7 @@
#include <sys/socket.h>
int socket_connect(struct sockaddr* to, struct sockaddr* from);
off64_t socket_nbd_read_hello(int fd);
int socket_nbd_read_hello(int fd, off64_t * size);
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf);
void socket_nbd_write(int fd, off64_t from, int len, int out_fd, void* out_buf);
void socket_nbd_entrust(int fd);

View File

@@ -6,11 +6,27 @@
static const int max_response=1024;
void print_response( const char * response )
{
char * response_text;
FILE * out;
int exit_status;
NULLCHECK( response );
exit_status = atoi(response);
response_text = strchr( response, ':' ) + 2;
NULLCHECK( response_text );
out = exit_status > 0 ? stderr : stdout;
fprintf(out, "%s\n", response_text );
}
void do_remote_command(char* command, char* socket_name, int argc, char** argv)
{
char newline=10;
int i;
int exit_status;
int remote = socket(AF_UNIX, SOCK_STREAM, 0);
struct sockaddr_un address;
char response[max_response];
@@ -42,11 +58,8 @@ void do_remote_command(char* command, char* socket_name, int argc, char** argv)
"Couldn't read response from %s", socket_name
);
exit_status = atoi(response);
if (exit_status > 0) {
fprintf(stderr, "%s\n", strchr(response, ':')+2);
}
print_response( response );
exit(atoi(response));
close(remote);

View File

@@ -31,11 +31,11 @@
void self_pipe_server_error( int err, char *msg )
{
char errbuf[1024];
char errbuf[1024] = {0};
strerror_r( err, errbuf, 1024 );
fatal( "%s\t%s", msg, errbuf );
fatal( "%s\t%d (%s)", msg, err, errbuf );
}
/**
@@ -87,6 +87,10 @@ struct self_pipe * self_pipe_create(void)
*/
int self_pipe_signal( struct self_pipe * sig )
{
NULLCHECK( sig );
FATAL_IF( 1 == sig->write_fd, "Shouldn't be writing to stdout" );
FATAL_IF( 2 == sig->write_fd, "Shouldn't be writing to stderr" );
int written = write( sig->write_fd, "X", 1 );
if ( written != 1 ) {
self_pipe_server_error( errno, ERR_MSG_WRITE );

View File

@@ -240,8 +240,8 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
strerror(join_errno) );
}
else {
debug("nbd thread %p exited (%s) with status %ld",
(int) entry->thread,
debug("nbd thread %016x exited (%s) with status %ld",
entry->thread,
s_client_address,
(uint64_t)status);
client_destroy( entry->client );
@@ -339,7 +339,6 @@ int server_acl_accepts( struct server *params, union mysockaddr * client_address
int server_should_accept_client(
struct server * params,
int client_fd,
union mysockaddr * client_address,
char *s_client_address,
size_t s_client_address_len )
@@ -351,17 +350,15 @@ int server_should_accept_client(
if (inet_ntop(client_address->generic.sa_family,
sockaddr_address_data(&client_address->generic),
s_client_address, s_client_address_len ) == NULL) {
debug( "Rejecting client %s: Bad client_address", s_client_address );
write(client_fd, "Bad client_address", 18);
warn( "Rejecting client %s: Bad client_address", s_client_address );
return 0;
}
if ( !server_acl_accepts( params, client_address ) ) {
debug( "Rejecting client %s: Access control error", s_client_address );
warn( "Rejecting client %s: Access control error", s_client_address );
debug( "We %s have an acl, and default_deny is %s",
(params->acl ? "do" : "do not"),
(params->acl->default_deny ? "true" : "false") );
write(client_fd, "Access control error", 20);
return 0;
}
@@ -428,7 +425,7 @@ void accept_nbd_client(
char s_client_address[64] = {0};
if ( !server_should_accept_client( params, client_fd, client_address, s_client_address, 64 ) ) {
if ( !server_should_accept_client( params, client_address, s_client_address, 64 ) ) {
close( client_fd );
return;
}
@@ -436,7 +433,6 @@ void accept_nbd_client(
slot = cleanup_and_find_client_slot(params);
if (slot < 0) {
warn("too many clients to accept connection");
write(client_fd, "Too many clients", 16);
close(client_fd);
return;
}
@@ -452,7 +448,6 @@ void accept_nbd_client(
if (spawn_client_thread( client_params, params->vacuum_signal, thread ) != 0) {
debug( "Thread creation problem." );
write(client_fd, "Thread creation problem", 23);
client_destroy( client_params );
close(client_fd);
return;
@@ -474,7 +469,7 @@ void server_audit_clients( struct server * serve)
* won't have been audited against the later acl. This isn't a
* problem though, because in order to update the acl
* server_replace_acl must have been called, so the
* server_accept loop will see a second acl_updated signal as
* server_accept ioop will see a second acl_updated signal as
* soon as it hits select, and a second audit will be run.
*/
for( i = 0; i < serve->max_nbd_clients; i++ ) {
@@ -544,7 +539,7 @@ void server_replace_acl( struct server *serve, struct acl * new_acl )
int server_accept( struct server * params )
{
NULLCHECK( params );
info("accept loop starting");
debug("accept loop starting");
int client_fd;
union mysockaddr client_address;
fd_set fds;
@@ -595,6 +590,7 @@ int server_accept( struct server * params )
void serve_accept_loop(struct server* params)
{
NULLCHECK( params );
while( server_accept( params ) );
}
@@ -681,8 +677,17 @@ void serve_cleanup(struct server* params,
pthread_join(thread_id, &status);
}
}
debug( "Cleanup done");
}
int server_is_in_control( struct server *serve )
{
NULLCHECK( serve );
return serve->has_control;
}
/** Full lifecycle of the server */
int do_serve(struct server* params)
{
@@ -697,6 +702,7 @@ int do_serve(struct server* params)
serve_accept_loop(params);
has_control = params->has_control;
serve_cleanup(params, 0);
debug("Server %s control.", has_control ? "has" : "does not have" );
return has_control;
}

View File

@@ -15,10 +15,24 @@ enum mirror_finish_action {
ACTION_NOTHING
};
enum mirror_state {
MS_UNKNOWN,
MS_INIT,
MS_GO,
MS_FINALISE,
MS_DONE,
MS_FAIL_CONNECT,
MS_FAIL_REJECTED,
MS_FAIL_NO_HELLO,
MS_FAIL_SIZE_MISMATCH
};
struct mirror_status {
pthread_t thread;
/* set to 1, then join thread to make mirror terminate early */
int signal_abandon;
union mysockaddr * connect_to;
union mysockaddr * connect_from;
int client;
char *filename;
off64_t max_bytes_per_second;
@@ -26,8 +40,26 @@ struct mirror_status {
char *mapped;
struct bitset_mapping *dirty_map;
/* Pass a commit state pointer, then it will be updated
* immediately before commit_signal is sent.
*/
enum mirror_state * commit_state;
/* commit_signal is sent immediately after attempting to connect
* and checking the remote size, whether successful or not.
*/
struct self_pipe * commit_signal;
};
struct mirror_super {
struct mirror_status * mirror;
struct self_pipe * commit_signal;
pthread_t thread;
};
struct control_params {
int socket;
struct server* serve;
@@ -75,6 +107,7 @@ struct server {
struct self_pipe * vacuum_signal;
struct mirror_status* mirror;
struct mirror_super * mirror_super;
int server_fd;
int control_fd;
@@ -110,6 +143,7 @@ void serve_signal_close( struct server *serve );
void serve_wait_for_close( struct server * serve );
void server_replace_acl( struct server *serve, struct acl * acl);
void server_control_arrived( struct server *serve );
int server_is_in_control( struct server *serve );
int do_serve( struct server * );

34
src/status.c Normal file
View File

@@ -0,0 +1,34 @@
#include "status.h"
#include "serve.h"
#include "util.h"
struct status * status_create( struct server * serve )
{
NULLCHECK( serve );
struct status * status;
status = xmalloc( sizeof( struct status ) );
status->has_control = serve->has_control;
status->is_mirroring = NULL != serve->mirror;
return status;
}
#define BOOL_S(var) (var ? "true" : "false" )
#define PRINT_FIELD( var ) \
do{dprintf( fd, #var "=%s ", BOOL_S( status->var ) );}while(0)
int status_write( struct status * status, int fd )
{
PRINT_FIELD( is_mirroring );
PRINT_FIELD( has_control );
dprintf(fd, "\n");
return 1;
}
void status_destroy( struct status * status )
{
NULLCHECK( status );
free( status );
}

55
src/status.h Normal file
View File

@@ -0,0 +1,55 @@
#ifndef STATUS_H
#define STATUS_H
/* Status reports
*
* The status will be reported by writing to a file descriptor. The
* status report will be on a single line. The status format will be:
*
* A=B C=D
*
* That is, a space-separated list of label,value pairs, each pair
* separated by an '=' character. Neither ' ' nor '=' will appear in
* either labels or values.
*
* Boolean values will appear as the strings "true" and "false".
*
* The following status fields are defined:
*
* has_control:
* This will be false when the server is listening for an incoming
* migration. It will switch to true when the end-of-migration
* handshake is successfully completed.
* If the server is started in "serve" mode, this will never be
* false.
*
* is_migrating:
* This will be false when the server is started in either "listen"
* or "serve" mode. It will become true when a server in "serve"
* mode starts a migration, and will become false again when the
* migration terminates, successfully or not.
* If the server is currently in "listen" mode, this will never b
* true.
*/
#include "serve.h"
struct status {
int has_control;
int is_mirroring;
};
/** Create a status object for the given server. */
struct status * status_create( struct server * );
/** Output the given status object to the given file descriptot */
int status_write( struct status *, int fd );
/** Free the status object */
void status_destroy( struct status * );
#endif

View File

@@ -79,27 +79,31 @@ void error_handler(int fatal);
/* mylog a line at the given level (0 being most verbose) */
void mylog(int line_level, const char* format, ...);
#define levstr(i) (i==0?'D':(i==1?'I':(i==2?'W':(i==3?'E':'F'))))
#define myloglev(level, msg, ...) mylog( level, "%c:%d %p %s:%d: "msg, levstr(level), getpid(),pthread_self(), __FILE__, __LINE__, ##__VA_ARGS__ )
#ifdef DEBUG
# define debug(msg, ...) mylog(0, "%s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__)
# define debug(msg, ...) myloglev(0, msg, ##__VA_ARGS__)
#else
# define debug(msg, ...) /* no-op */
#endif
/* informational message, not expected to be compiled out */
#define info(msg, ...) mylog(1, "%s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__)
#define info(msg, ...) myloglev(1, msg, ##__VA_ARGS__)
/* messages that might indicate a problem */
#define warn(msg, ...) mylog(2, "%s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__)
#define warn(msg, ...) myloglev(2, msg, ##__VA_ARGS__)
/* mylog a message and invoke the error handler to recover */
#define error(msg, ...) do { \
mylog(3, "*** %s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__); \
myloglev(3, msg, ##__VA_ARGS__); \
error_handler(0); \
} while(0)
/* mylog a message and invoke the error handler to kill the current thread */
#define fatal(msg, ...) do { \
mylog(4, "*** %s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__); \
myloglev(4, msg, ##__VA_ARGS__); \
error_handler(1); \
} while(0)

View File

@@ -140,17 +140,53 @@ START_TEST(test_bitset)
}
END_TEST
START_TEST( test_bitset_set )
{
struct bitset_mapping* map;
uint64_t *num;
map = bitset_alloc(64, 1);
num = (uint64_t*) map->bits;
ck_assert_int_eq( 0x0000000000000000, *num );
bitset_set( map );
ck_assert_int_eq( 0xffffffffffffffff, *num );
}
END_TEST
START_TEST( test_bitset_clear )
{
struct bitset_mapping* map;
uint64_t *num;
map = bitset_alloc(64, 1);
num = (uint64_t*) map->bits;
ck_assert_int_eq( 0x0000000000000000, *num );
bitset_set( map );
bitset_clear( map );
ck_assert_int_eq( 0x0000000000000000, *num );
}
END_TEST
Suite* bitset_suite(void)
{
Suite *s = suite_create("bitset");
TCase *tc_core = tcase_create("bitset");
tcase_add_test(tc_core, test_bit_set);
tcase_add_test(tc_core, test_bit_clear);
tcase_add_test(tc_core, test_bit_tests);
tcase_add_test(tc_core, test_bit_ranges);
tcase_add_test(tc_core, test_bit_runs);
tcase_add_test(tc_core, test_bitset);
suite_add_tcase(s, tc_core);
TCase *tc_bit = tcase_create("bit");
TCase *tc_bitset = tcase_create("bitset");
tcase_add_test(tc_bit, test_bit_set);
tcase_add_test(tc_bit, test_bit_clear);
tcase_add_test(tc_bit, test_bit_tests);
tcase_add_test(tc_bit, test_bit_ranges);
tcase_add_test(tc_bit, test_bit_runs);
tcase_add_test(tc_bitset, test_bitset);
tcase_add_test(tc_bitset, test_bitset_set);
tcase_add_test(tc_bitset, test_bitset_clear);
suite_add_tcase(s, tc_bit);
suite_add_tcase(s, tc_bitset);
return s;
}

139
tests/check_status.c Normal file
View File

@@ -0,0 +1,139 @@
#include "status.h"
#include "serve.h"
#include "ioutil.h"
#include "util.h"
#include <check.h>
START_TEST( test_status_create )
{
struct server server;
struct status *status = NULL;
status = status_create( &server );
fail_if( NULL == status, "Status wasn't allocated" );
status_destroy( status );
}
END_TEST
START_TEST( test_gets_has_control )
{
struct server server;
struct status * status;
server.has_control = 1;
status = status_create( &server );
fail_unless( status->has_control == 1, "has_control wasn't copied" );
status_destroy( status );
}
END_TEST
START_TEST( test_gets_is_mirroring )
{
struct server server;
struct status * status;
server.mirror = NULL;
status = status_create( &server );
fail_if( status->is_mirroring, "is_mirroring was set" );
status_destroy( status );
server.mirror = (struct mirror_status *)xmalloc( sizeof( struct mirror_status ) );
status = status_create( &server );
fail_unless( status->is_mirroring, "is_mirroring wasn't set" );
status_destroy( status );
}
END_TEST
START_TEST( test_renders_has_control )
{
struct status status;
int fds[2];
pipe(fds);
char buf[1024] = {0};
status.has_control = 1;
status_write( &status, fds[1] );
fail_unless( read_until_newline( fds[0], buf, 1024 ) > 0,
"Couldn't read the result" );
char *found = strstr( buf, "has_control=true" );
fail_if( NULL == found, "has_control=true not found" );
status.has_control = 0;
status_write( &status, fds[1] );
fail_unless( read_until_newline( fds[0], buf, 1024 ) > 0,
"Couldn't read the result" );
found = strstr( buf, "has_control=false" );
fail_if( NULL == found, "has_control=false not found" );
}
END_TEST
START_TEST( test_renders_is_mirroring )
{
struct status status;
int fds[2];
pipe(fds);
char buf[1024] = {0};
status.is_mirroring = 1;
status_write( &status, fds[1] );
fail_unless( read_until_newline( fds[0], buf, 1024 ) > 0,
"Couldn't read the result" );
char *found = strstr( buf, "is_mirroring=true" );
fail_if( NULL == found, "is_mirroring=true not found" );
status.is_mirroring = 0;
status_write( &status, fds[1] );
fail_unless( read_until_newline( fds[0], buf, 1024 ) > 0,
"Couldn't read the result" );
found = strstr( buf, "is_mirroring=false" );
fail_if( NULL == found, "is_mirroring=false not found" );
}
END_TEST
Suite *status_suite(void)
{
Suite *s = suite_create("status");
TCase *tc_create = tcase_create("create");
TCase *tc_render = tcase_create("render");
tcase_add_test(tc_create, test_status_create);
tcase_add_test(tc_create, test_gets_has_control);
tcase_add_test(tc_create, test_gets_is_mirroring);
tcase_add_test(tc_render, test_renders_has_control);
tcase_add_test(tc_render, test_renders_is_mirroring);
suite_add_tcase(s, tc_create);
suite_add_tcase(s, tc_render);
return s;
}
int main(void)
{
int number_failed;
Suite *s = status_suite();
SRunner *sr = srunner_create(s);
srunner_run_all(sr, CK_NORMAL);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? 0 : 1;
}

View File

@@ -0,0 +1,47 @@
#!/usr/bin/env ruby
# Wait for a sender connection, send a correct hello, then disconnect.
# Simulate a server which crashes after sending the hello. We then
# reopen the server socket to check that the sender retries: since the
# command-line has gone away, and can't feed an error back to the
# user, we have to keep trying.
addr, port = *ARGV
require 'socket'
require 'timeout'
sock = TCPServer.new( addr, port )
client_sock = nil
begin
Timeout.timeout(2) do
client_sock = sock.accept
end
rescue Timeout::Error
$stderr.puts "Timed out waiting for a connection"
exit 1
end
client_sock.write( "NBDMAGIC" )
client_sock.write( "\x00\x00\x42\x02\x81\x86\x12\x53" )
client_sock.write( "\x00\x00\x00\x00\x00\x00\x10\x00" )
client_sock.write( "\x00" * 128 )
client_sock.close
new_sock = nil
begin
Timeout.timeout(60) do
new_sock = sock.accept
end
rescue Timeout::Error
$stderr.puts "Timed out waiting for a reconnection"
exit 1
end
new_sock.close
sock.close
exit 0

View File

@@ -0,0 +1,34 @@
#!/usr/bin/env ruby
# Will open a server, accept a single connection, then sleep for 5
# seconds. After that time, the client should have disconnected,
# which we can can't effectively check.
#
# This allows the test runner to check that the command-line sees the
# right error message after the timeout time.
require 'socket'
require 'timeout'
require 'flexnbd/constants'
addr, port = *ARGV
serve_sock = TCPServer.new( addr, port )
client_sock = nil
begin
# A failure here means a more serious issue with flexnbd
Timeout.timeout( 2 ) do
client_sock = serve_sock.accept
end
rescue Timeout::Error
$stderr.puts "Client didn't make connection"
exit 1
end
# Sleep for one second past the timeout (a bit of slop in case ruby
# doesn't launch things quickly)
sleep(FlexNBD::MS_HELLO_TIME_SECS + 1)
client_sock.close if client_sock
serve_sock.close

View File

@@ -0,0 +1,46 @@
#!/usr/bin/env ruby
# Simulate a destination which sends the wrong magic.
# We expect the sender to disconnect and reconnect.
addr, port = *ARGV
require 'socket'
require 'timeout'
require 'flexnbd/constants'
sock = TCPServer.new( addr, port )
client_sock = nil
begin
Timeout.timeout(2) do
client_sock = sock.accept
end
rescue Timeout::Error
$stderr.puts "Timed out waiting for a connection"
exit 1
end
t = Thread.new do
begin
Timeout.timeout(FlexNBD::MS_RETRY_DELAY_SECS + 1) do
client_sock2 = sock.accept
end
rescue Timeout::Error
$stderr.puts "Timed out waiting for a reconnection"
exit 1
end
end
client_sock.write( "NBDMAGIC" )
# We're off in the last byte, should be \x53
client_sock.write( "\x00\x00\x42\x02\x81\x86\x12\x52" )
# 4096 is the right size; this is defined in nbd_scenarios
client_sock.write( "\x00\x00\x00\x00\x00\x00\x10\x00" )
client_sock.write( "\x00" * 128 )
t.join
exit 0

View File

@@ -0,0 +1,44 @@
#!/usr/bin/env ruby
# Simulate a server which has a disc of the wrong size attached: send
# a valid NBD hello with a random size, then check that we have see an
# EOF on read.
addr, port = *ARGV
require 'socket'
require 'timeout'
require 'flexnbd/constants'
sock = TCPServer.new( addr, port )
client_sock = nil
begin
Timeout.timeout(2) do
client_sock = sock.accept
end
rescue Timeout::Error
$stderr.puts "Timed out waiting for a connection"
exit 1
end
t = Thread.new do
begin
Timeout.timeout(FlexNBD::MS_RETRY_DELAY_SECS + 1) do
client_sock2 = sock.accept
end
rescue Timeout::Error
$stderr.puts "Timed out waiting for a reconnection"
exit 1
end
end
client_sock.write( "NBDMAGIC" )
client_sock.write( "\x00\x00\x42\x02\x81\x86\x12\x53" )
8.times do client_sock.write rand(256).chr end
client_sock.write( "\x00" * 128 )
t.join
exit 0

22
tests/fakes/dest/reject_acl.rb Executable file
View File

@@ -0,0 +1,22 @@
#!/usr/bin/env ruby
# Accept a connection, then immediately close it. This simulates an ACL rejection.
addr, port = *ARGV
require 'socket'
require 'timeout'
serve_sock = TCPServer.open( addr, port )
begin
Timeout.timeout( 2 ) do
serve_sock.accept.close
end
rescue Timeout::Error
$stderr.puts "Timed out waiting for a connection"
exit 1
end
serve_sock.close
exit(0)

View File

@@ -0,0 +1,28 @@
#!/usr/bin/env ruby
# Connects to the destination server, then immediately disconnects,
# simulating a source crash.
#
# It then connects again, to check that the destination is still
# listening.
addr, port = *ARGV
require 'socket'
require 'timeout'
begin
Timeout.timeout( 2 ) do
sock = TCPSocket.open( addr, port.to_i )
sock.close
end
rescue Timeout::Error
$stderr.puts "Failed to connect"
exit 1
end
Timeout.timeout( 2 ) do
sock = TCPSocket.open( addr, port.to_i )
sock.close
end
exit 0

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env ruby
# Connect, read the hello, then immediately disconnect. This
# simulates a sender which dislikes something in the hello message - a
# wrong size, for instance.
# After the disconnect, we reconnect to be sure that the destination
# is still alive.
require 'socket'
require "timeout"
require 'flexnbd/constants'
addr, port = *ARGV
client_sock = nil
begin
Timeout.timeout(2) do
client_sock = TCPSocket.open( addr, port )
end
rescue Timeout::Error
$stderr.puts "Timed out connecting."
exit 1
end
begin
Timeout.timeout( FlexNBD::MS_HELLO_TIME_SECS ) do
fail "No hello." unless (hello = client_sock.read( 152 )) &&
hello.length==152
end
rescue Timeout::Error
$stderr.puts "Timed out waiting for hello."
exit 1
end
client_sock.close
sleep(0.2)
begin
Timeout.timeout(2) do
client_sock = TCPSocket.open( addr, port )
end
rescue Timeout::Error
$stderr.puts "Timed out reconnecting."
exit 1
end
exit(0)

View File

@@ -0,0 +1,69 @@
#!/usr/bin/env ruby
# Simulate the hello message going astray, or the source hanging after
# receiving it.
#
# We then connect again, to confirm that the destination is still
# listening for an incoming migration.
addr, port = *ARGV
require 'socket'
require 'timeout'
require "flexnbd/constants"
client_sock=nil
begin
Timeout.timeout(2) do
client_sock = TCPSocket.open( addr, port )
end
rescue Timeout::Error
$stderr.puts "Timed out connecting"
exit 1
end
begin
Timeout.timeout(FlexNBD::MS_HELLO_TIME_SECS) do
client_sock.read(152)
end
rescue Timeout::Error
$stderr.puts "Timed out reading hello"
exit 1
end
# Now we do two things:
# - In the parent process, we sleep for CLIENT_MAX_WAIT_SECS+5, which
# will make the destination give up and close the connection.
# - In the child process, we sleep for CLIENT_MAX_WAIT_SECS+1, which
# should be able to reconnect despite the parent process not having
# closed its end yet.
kidpid = fork do
client_sock.close
new_sock = nil
sleep( FlexNBD::CLIENT_MAX_WAIT_SECS + 1 )
begin
Timeout.timeout( 2 ) do
new_sock = TCPSocket.open( addr, port )
end
Timeout.timeout( FlexNBD::MS_HELLO_TIME_SECS ) do
fail "No hello." unless (hello = new_sock.read( 152 )) &&
hello.length==152
end
new_sock.close
rescue Timeout::Error
$stderr.puts "Timed out reconnecting"
exit 1
end
exit 0
end
# Sleep for longer than the child, to give the flexnbd process a bit
# of slop
sleep( FlexNBD::CLIENT_MAX_WAIT_SECS + 3 )
client_sock.close
_,status = Process.waitpid2( kidpid )
exit status.exitstatus

View File

@@ -1,5 +1,7 @@
require 'socket'
require 'thread'
require 'open3'
require 'timeout'
require 'rexml/document'
require 'rexml/streamlistener'
@@ -138,7 +140,7 @@ class ValgrindExecutor
def launch_watch_thread(pid, io_r)
Thread.start do
io_source = REXML::IOSource.new( io_r )
listener = ErrorListener.new( self )
listener = DebugErrorListener.new( self )
REXML::Document.parse_stream( io_source, listener )
end
end
@@ -190,6 +192,17 @@ class FlexNBD
end
def listen_cmd( file, acl )
"#{@bin} listen "\
"--addr #{ip} "\
"--port #{port} "\
"--file #{file} "\
"--sock #{ctrl} "\
"#{@debug} "\
"#{acl.join(' ')}"
end
def read_cmd( offset, length )
"#{bin} read "\
"--addr #{ip} "\
@@ -218,9 +231,16 @@ class FlexNBD
"#{@debug} "
end
def serve(file, *acl)
def status_cmd
"#{@bin} status "\
"--sock #{ctrl} "\
"#{@debug}"
end
def run_serve_cmd(cmd)
File.unlink(ctrl) if File.exists?(ctrl)
cmd =serve_cmd( file, acl )
debug( cmd )
@pid = @executor.run( cmd )
@@ -233,33 +253,49 @@ class FlexNBD
end
at_exit { kill }
end
private :run_serve_cmd
def serve( file, *acl)
run_serve_cmd( serve_cmd( file, acl ) )
end
def listen(file, *acl)
run_serve_cmd( listen_cmd( file, acl ) )
end
def start_wait_thread( pid )
Thread.start do
Process.waitpid2( pid )
@wait_thread = Thread.start do
_, status = Process.waitpid2( pid )
if @kill
fail "flexnbd quit with a bad status #{$?.exitstatus}" unless
$?.exitstatus == @kill
fail "flexnbd quit with a bad status: #{status.to_i}" unless
@kill.include? status.to_i
else
$stderr.puts "flexnbd quit"
fail "flexnbd quit early"
$stderr.puts "flexnbd #{self.pid} quit"
fail "flexnbd #{self.pid} quit early with status #{status.to_i}"
end
end
end
def can_die(status=0)
def can_die(*status)
status << 0 if status.empty?
@kill = status
end
def kill
can_die()
begin
Process.kill("INT", @pid)
rescue Errno::ESRCH => e
# already dead. Presumably this means it went away after a
# can_die() call.
can_die(2)
if @pid
begin
Process.kill("INT", @pid)
rescue Errno::ESRCH => e
# already dead. Presumably this means it went away after a
# can_die() call.
end
end
@wait_thread.join if @wait_thread
end
def read(offset, length)
@@ -284,21 +320,55 @@ class FlexNBD
nil
end
def mirror(dest_ip, dest_port, bandwidth=nil, action=nil)
def mirror_unchecked( dest_ip, dest_port, bandwidth=nil, action=nil, timeout=nil )
cmd = mirror_cmd( dest_ip, dest_port)
debug( cmd )
system cmd
raise IOError.new( "Migrate command failed") unless $?.success?
nil
maybe_timeout( cmd, timeout )
end
def maybe_timeout(cmd, timeout=nil )
stdout, stderr = "",""
run = Proc.new do
Open3.popen3( cmd ) do |io_in, io_out, io_err|
io_in.close
stdout.replace io_out.read
stderr.replace io_err.read
end
end
if timeout
Timeout.timeout(timeout, &run)
else
run.call
end
[stdout, stderr]
end
def mirror(dest_ip, dest_port, bandwidth=nil, action=nil)
stdout, stderr = mirror_unchecked( dest_ip, dest_port, bandwidth, action )
raise IOError.new( "Migrate command failed\n" + stderr) unless $?.success?
stdout
end
def acl(*acl)
control_command("acl", *acl)
end
def status
def status( timeout = nil )
cmd = status_cmd()
debug( cmd )
maybe_timeout( cmd, timeout )
end
protected
def control_command(*args)
raise "Server not running" unless @pid

View File

@@ -0,0 +1,36 @@
# encoding: utf-8
module FlexNBD
# eeevil is his one and only name...
def self.read_constants
parents = []
current = File.expand_path(".")
while current != "/"
parents << current
current = File.expand_path( File.join( current, ".." ) )
end
source_root = parents.find do |dirname|
File.directory?( File.join( dirname, "src" ) )
end
fail "No source root!" unless source_root
headers = Dir[File.join( source_root, "src", "*.h" ) ]
headers.each do |header_filename|
txt_lines = File.readlines( header_filename )
txt_lines.each do |line|
if line =~ /^#\s*define\s+([A-Z0-9_]+)\s+(\d+)\s*$/
const_set($1, $2.to_i)
end
end
end
end
read_constants()
end # module FlexNBD

View File

@@ -4,88 +4,25 @@ require 'test/unit'
require 'flexnbd'
require 'test_file_writer'
class NBDScenarios < Test::Unit::TestCase
def setup
class Environment
attr_reader( :blocksize, :filename1, :filename2, :ip,
:port1, :port2, :nbd1, :nbd2, :file1, :file2 )
def initialize
@blocksize = 1024
@filename1 = ".flexnbd.test.#{$$}.#{Time.now.to_i}.1"
@filename2 = ".flexnbd.test.#{$$}.#{Time.now.to_i}.2"
@filename1 = "/tmp/.flexnbd.test.#{$$}.#{Time.now.to_i}.1"
@filename2 = "/tmp/.flexnbd.test.#{$$}.#{Time.now.to_i}.2"
@ip = "127.0.0.1"
@available_ports = [*40000..41000] - listening_ports
@port1 = @available_ports.shift
@port2 = @available_ports.shift
@nbd1 = FlexNBD.new("../build/flexnbd", @ip, @port1)
@nbd2 = FlexNBD.new("../build/flexnbd", @ip, @port2)
end
def teardown
[@filename1, @filename2].each do |f|
File.unlink(f) if File.exists?(f)
end
end
def test_read1
writefile1("f"*64)
serve1
[0, 12, 63].each do |num|
assert_equal(
@nbd1.read(num*@blocksize, @blocksize),
@file1.read(num*@blocksize, @blocksize)
)
end
[124, 1200, 10028, 25488].each do |num|
assert_equal(@nbd1.read(num, 4), @file1.read(num, 4))
end
end
# Check that we're not
#
def test_writeread1
writefile1("0"*64)
serve1
[0, 12, 63].each do |num|
data = "X"*@blocksize
@nbd1.write(num*@blocksize, data)
assert_equal(data, @file1.read(num*@blocksize, data.size))
assert_equal(data, @nbd1.read(num*@blocksize, data.size))
end
end
# Check that we're not overstepping or understepping where our writes end
# up.
#
def test_writeread2
writefile1("0"*1024)
serve1
d0 = "\0"*@blocksize
d1 = "X"*@blocksize
(0..63).each do |num|
@nbd1.write(num*@blocksize*2, d1)
end
(0..63).each do |num|
assert_equal(d0, @nbd1.read(((2*num)+1)*@blocksize, d0.size))
end
@fake_pid = nil
end
def test_mirror
writefile1( "f"*4 )
serve1
writefile2( "0"*4 )
serve2
@nbd1.can_die
mirror12
assert_equal(@file1.read_original( 0, @blocksize ),
@file2.read( 0, @blocksize ) )
end
protected
def serve1(*acl)
@nbd1.serve(@filename1, *acl)
end
@@ -94,10 +31,60 @@ class NBDScenarios < Test::Unit::TestCase
@nbd2.serve(@filename2, *acl)
end
def listen1( *acl )
@nbd1.listen( @filename1, *acl )
end
def listen2( *acl )
@nbd2.listen( @filename2, *acl )
end
def parse_status( status )
hsh = {}
status.split(" ").each do |part|
next if part.strip.empty?
a,b = part.split("=")
b.strip!
b = true if b == "true"
b = false if b == "false"
hsh[a.strip] = b
end
hsh
end
def status( nbd )
stdout, stderr = nbd.status
[parse_status(stdout), stderr]
end
def status1
status( @nbd1 )
end
def status2
puts "Getting status"
result = status( @nbd2 )
puts "Got status"
return result
end
def mirror12
@nbd1.mirror( @nbd2.ip, @nbd2.port )
end
def mirror12_unchecked
@nbd1.mirror_unchecked( @nbd2.ip, @nbd2.port, nil, nil, 10 )
end
def writefile1(data)
@file1 = TestFileWriter.new(@filename1, @blocksize).write(data)
end
@@ -114,5 +101,232 @@ class NBDScenarios < Test::Unit::TestCase
map { |x| x.split(/\s+/) }[2..-1].
map { |l| l[3].split(":")[-1].to_i }
end
def cleanup
if @fake_pid
begin
Process.waitpid2( @fake_pid )
rescue Errno::ESRCH
end
end
@nbd1.kill
@nbd2.kill
[@filename1, @filename2].each do |f|
File.unlink(f) if File.exists?(f)
end
end
def run_fake( name, addr, port )
fakedir = File.join( File.dirname( __FILE__ ), "fakes" )
fake = Dir[File.join( fakedir, name ) + "*"].sort.find { |fn|
File.executable?( fn )
}
raise "no fake executable" unless fake
raise "no addr" unless addr
raise "no port" unless port
@fake_pid = fork do
exec fake + " " + addr.to_s + " " + port.to_s
end
sleep(0.5)
end
def fake_reports_success
_,status = Process.waitpid2( @fake_pid )
@fake_pid = nil
status.success?
end
end # class Environment
class NBDScenarios < Test::Unit::TestCase
def setup
@env = Environment.new
end
def teardown
@env.cleanup
end
def test_read1
@env.writefile1("f"*64)
@env.serve1
[0, 12, 63].each do |num|
assert_equal(
@env.nbd1.read(num*@env.blocksize, @env.blocksize),
@env.file1.read(num*@env.blocksize, @env.blocksize)
)
end
[124, 1200, 10028, 25488].each do |num|
assert_equal(@env.nbd1.read(num, 4), @env.file1.read(num, 4))
end
end
# Check that we're not
#
def test_writeread1
@env.writefile1("0"*64)
@env.serve1
[0, 12, 63].each do |num|
data = "X"*@env.blocksize
@env.nbd1.write(num*@env.blocksize, data)
assert_equal(data, @env.file1.read(num*@env.blocksize, data.size))
assert_equal(data, @env.nbd1.read(num*@env.blocksize, data.size))
end
end
# Check that we're not overstepping or understepping where our writes end
# up.
#
def test_writeread2
@env.writefile1("0"*1024)
@env.serve1
d0 = "\0"*@env.blocksize
d1 = "X"*@env.blocksize
(0..63).each do |num|
@env.nbd1.write(num*@env.blocksize*2, d1)
end
(0..63).each do |num|
assert_equal(d0, @env.nbd1.read(((2*num)+1)*@env.blocksize, d0.size))
end
end
def test_mirror
@env.writefile1( "f"*4 )
@env.serve1
@env.writefile2( "0"*4 )
@env.listen2
@env.nbd1.can_die
stdout, stderr = @env.mirror12
assert_equal(@env.file1.read_original( 0, @env.blocksize ),
@env.file2.read( 0, @env.blocksize ) )
assert @env.status2['has_control'], "destination didn't take control"
end
end
class NBDConnectSourceFailureScenarios < Test::Unit::TestCase
def setup
@env = Environment.new
@env.writefile1( "f" * 4 )
@env.serve1
end
def teardown
@env.cleanup
end
def test_failure_to_connect_reported_in_mirror_cmd_response
stdout, stderr = @env.mirror12_unchecked
assert_match( /failed to connect/, stderr )
end
def test_destination_hangs_after_connect_reports_error_at_source
@env.run_fake( "dest/hang_after_connect", @env.ip, @env.port2 )
stdout, stderr = @env.mirror12_unchecked
assert_match( /Remote server failed to respond/, stderr )
assert @env.fake_reports_success
end
def test_destination_rejects_connection_reports_error_at_source
@env.run_fake( "dest/reject_acl", @env.ip, @env.port2 )
stdout, stderr = @env.mirror12_unchecked
assert_match /Mirror was rejected/, stderr
assert @env.fake_reports_success
end
def test_wrong_size_causes_disconnect
@env.run_fake( "dest/hello_wrong_size", @env.ip, @env.port2 )
stdout, stderr = @env.mirror12_unchecked
assert_match /Remote size does not match local size/, stderr
assert @env.fake_reports_success
end
def test_wrong_magic_causes_disconnect
@env.run_fake( "dest/hello_wrong_magic", @env.ip, @env.port2 )
stdout, stderr = @env.mirror12_unchecked
assert_match /Mirror was rejected/, stderr
assert @env.fake_reports_success, "dest/hello_wrong_magic fake failed"
end
def test_disconnect_after_hello_causes_retry
@env.run_fake( "dest/close_after_hello", @env.ip, @env.port2 )
stdout, stderr = @env.mirror12_unchecked
assert_match( /Mirror started/, stdout )
assert @env.fake_reports_success
end
end
class NBDConnectDestFailureScenarios < Test::Unit::TestCase
def setup
@env = Environment.new
@env.writefile1( "0" * 4 )
@env.listen1
end
def teardown
@env.cleanup
end
def test_hello_blocked_by_disconnect_causes_error_not_fatal
run_fake( "source/close_after_connect" )
assert_no_control
end
def test_hello_goes_astray_causes_timeout_error
run_fake( "source/hang_after_hello" )
assert_no_control
end
def test_disconnect_after_hello_causes_error_not_fatal
run_fake( "source/close_after_hello" )
assert_no_control
end
private
def run_fake( name )
@env.run_fake( name, @env.ip, @env.port1 )
assert @env.fake_reports_success
end
def assert_no_control
status, stderr = @env.status1
assert !status['has_control'], "Thought it had control"
end
end # class NBDConnectDestFailureScenarios