This commit is contained in:
Alex Young
2012-10-04 14:51:54 +01:00
7 changed files with 208 additions and 172 deletions

View File

@@ -61,13 +61,13 @@ void client_destroy( struct client *client )
* So waiting on client->socket is len bytes of data, and we must write it all
* to client->mapped. However while doing do we must consult the bitmap
* client->block_allocation_map, which is a bitmap where one bit represents
* block_allocation_resolution bytes. Where a bit isn't set, there are no
* block_allocation_resolution bytes. Where a bit isn't set, there are no
* disc blocks allocated for that portion of the file, and we'd like to keep
* it that way.
* it that way.
*
* If the bitmap shows that every block in our prospective write is already
* allocated, we can proceed as normal and make one call to writeloop.
*
* allocated, we can proceed as normal and make one call to writeloop.
*
*/
void write_not_zeroes(struct client* client, uint64_t from, int len)
{
@@ -83,30 +83,30 @@ void write_not_zeroes(struct client* client, uint64_t from, int len)
* how many blocks our write covers, then cut off the start
* and end to get the exact number of bytes.
*/
int run = bitset_run_count(map, from, len);
debug("write_not_zeroes: from=%ld, len=%d, run=%d", from, len, run);
if (run > len) {
run = len;
debug("(run adjusted to %d)", run);
}
if (0) /* useful but expensive */
{
uint64_t i;
fprintf(stderr, "full map resolution=%d: ", map->resolution);
for (i=0; i<client->serve->size; i+=map->resolution) {
int here = (from >= i && from < i+map->resolution);
if (here) { fprintf(stderr, ">"); }
fprintf(stderr, bitset_is_set_at(map, i) ? "1" : "0");
if (here) { fprintf(stderr, "<"); }
}
fprintf(stderr, "\n");
}
#define DO_READ(dst, len) ERROR_IF_NEGATIVE( \
readloop( \
client->socket, \
@@ -115,7 +115,7 @@ void write_not_zeroes(struct client* client, uint64_t from, int len)
), \
"read failed %ld+%d", from, (len) \
)
if (bitset_is_set_at(map, from)) {
debug("writing the lot: from=%ld, run=%d", from, run);
/* already allocated, just write it all */
@@ -128,19 +128,19 @@ void write_not_zeroes(struct client* client, uint64_t from, int len)
char zerobuffer[block_allocation_resolution];
/* not allocated, read in block_allocation_resoution */
while (run > 0) {
int blockrun = block_allocation_resolution -
int blockrun = block_allocation_resolution -
(from % block_allocation_resolution);
if (blockrun > run)
blockrun = run;
DO_READ(zerobuffer, blockrun);
/* This reads the buffer twice in the worst case
/* This reads the buffer twice in the worst case
* but we're leaning on memcmp failing early
* and memcpy being fast, rather than try to
* hand-optimized something specific.
*/
if (zerobuffer[0] != 0 ||
if (zerobuffer[0] != 0 ||
memcmp(zerobuffer, zerobuffer + 1, blockrun - 1)) {
memcpy(client->mapped+from, zerobuffer, blockrun);
bitset_set_range(map, from, blockrun);
@@ -148,7 +148,7 @@ void write_not_zeroes(struct client* client, uint64_t from, int len)
/* at this point we could choose to
* short-cut the rest of the write for
* faster I/O but by continuing to do it
* the slow way we preserve as much
* the slow way we preserve as much
* sparseness as possible.
*/
}
@@ -185,18 +185,18 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
* otherwise
*/
ptv = server_is_in_control( client->serve ) ? NULL : &tv;
FD_ZERO(&fds);
FD_SET(client->socket, &fds);
self_pipe_fd_set( client->stop_signal, &fds );
fd_count = select(FD_SETSIZE, &fds, NULL, NULL, ptv);
if ( fd_count == 0 ) {
/* This "can't ever happen" */
if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); }
if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); }
else { error("Timed out waiting for I/O"); }
}
else if ( fd_count < 0 ) { fatal( "Select failed" ); }
if ( self_pipe_fd_isset( client->stop_signal, &fds ) ){
debug("Client received stop signal.");
return 0;
@@ -220,9 +220,9 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
* again. It should *probably* be an
* error() call, but I want to be sure.
* */
fatal("Error reading request: %d, %s",
errno,
strerror( errno ));
fatal("Error reading request: %d, %s",
errno,
strerror( errno ));
}
}
@@ -299,7 +299,7 @@ void client_write_init( struct client * client, uint64_t size )
void client_flush( struct client * client, size_t len )
{
int devnull = open("/dev/null", O_WRONLY);
FATAL_IF_NEGATIVE( devnull,
FATAL_IF_NEGATIVE( devnull,
"Couldn't open /dev/null: %s", strerror(errno));
int pipes[2];
pipe( pipes );
@@ -308,9 +308,9 @@ void client_flush( struct client * client, size_t len )
size_t spliced = 0;
while ( spliced < len ) {
ssize_t received = splice(
ssize_t received = splice(
client->socket, NULL,
pipes[1], NULL,
pipes[1], NULL,
len-spliced, flags );
FATAL_IF_NEGATIVE( received,
"splice error: %s",
@@ -318,9 +318,9 @@ void client_flush( struct client * client, size_t len )
ssize_t junked = 0;
while( junked < received ) {
ssize_t junk;
junk = splice(
junk = splice(
pipes[0], NULL,
devnull, NULL,
devnull, NULL,
received, flags );
FATAL_IF_NEGATIVE( junk,
"splice error: %s",
@@ -341,15 +341,15 @@ void client_flush( struct client * client, size_t len )
* request_err is set to 0 if the client sent a bad request, in which
* case we drop the connection.
*/
int client_request_needs_reply( struct client * client,
int client_request_needs_reply( struct client * client,
struct nbd_request request )
{
debug("request type %d", request.type);
if (request.magic != REQUEST_MAGIC) {
fatal("Bad magic %08x", request.magic);
}
switch (request.type)
{
case REQUEST_READ:
@@ -358,7 +358,7 @@ int client_request_needs_reply( struct client * client,
/* check it's not out of range */
if ( request.from+request.len > client->serve->size) {
warn("write request %d+%d out of range",
request.from,
request.from,
request.len
);
client_write_reply( client, &request, 1 );
@@ -367,12 +367,12 @@ int client_request_needs_reply( struct client * client,
return 0;
}
break;
case REQUEST_DISCONNECT:
debug("request disconnect");
client->disconnect = 1;
return 0;
default:
fatal("Unknown request %08x", request.type);
}
@@ -394,9 +394,9 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
*/
ERROR_IF_NEGATIVE(
sendfileloop(
client->socket,
client->fileno,
&offset,
client->socket,
client->fileno,
&offset,
request.len),
"sendfile failed from=%ld, len=%d",
offset,
@@ -420,7 +420,7 @@ void client_reply_to_write( struct client* client, struct nbd_request request )
request.len),
"reading write data failed from=%ld, len=%d",
request.from,
request.len
request.len
);
server_dirty(client->serve, request.from, request.len);
}
@@ -432,8 +432,8 @@ void client_reply_to_write( struct client* client, struct nbd_request request )
uint64_t len_rounded = request.len + (request.from - from_rounded);
FATAL_IF_NEGATIVE(
msync( client->mapped + from_rounded,
len_rounded,
msync( client->mapped + from_rounded,
len_rounded,
MS_SYNC),
"msync failed %ld %ld", request.from, request.len
);
@@ -466,7 +466,7 @@ int client_serve_request(struct client* client)
if ( disconnected ) { return stop; }
if ( !client_request_needs_reply( client, request ) ) {
return client->disconnect;
}
}
server_lock_io( client->serve );
{
@@ -486,12 +486,12 @@ void client_send_hello(struct client* client)
client_write_init( client, client->serve->size );
}
void client_cleanup(struct client* client,
void client_cleanup(struct client* client,
int fatal __attribute__ ((unused)) )
{
info("client cleanup for client %p", client);
if (client->socket) {
if (client->socket) {
FATAL_IF_NEGATIVE( close(client->socket),
"Error closing client socket %d",
client->socket );
@@ -501,7 +501,7 @@ void client_cleanup(struct client* client,
if (client->mapped) {
munmap(client->mapped, client->serve->size);
}
if (client->fileno) {
if (client->fileno) {
FATAL_IF_NEGATIVE( close(client->fileno),
"Error closing file %d",
client->fileno );
@@ -517,15 +517,15 @@ void client_cleanup(struct client* client,
void* client_serve(void* client_uncast)
{
struct client* client = (struct client*) client_uncast;
error_set_handler((cleanup_handler*) client_cleanup, client);
info("client: mmaping file");
FATAL_IF_NEGATIVE(
open_and_mmap(
client->serve->filename,
&client->fileno,
NULL,
NULL,
(void**) &client->mapped
),
"Couldn't open/mmap file %s: %s", client->serve->filename, strerror( errno )
@@ -533,13 +533,13 @@ void* client_serve(void* client_uncast)
debug( "Opened client file fd %d", client->fileno);
debug("client: sending hello");
client_send_hello(client);
debug("client: serving requests");
while (client_serve_request(client) == 0)
;
debug("client: stopped serving requests");
client->stopped = 1;
if ( client->disconnect ){
debug("client: control arrived" );
server_control_arrived( client->serve );
@@ -548,6 +548,7 @@ void* client_serve(void* client_uncast)
debug("Cleaning client %p up normally in thread %p", client, pthread_self());
client_cleanup(client, 0);
debug("Client thread done" );
return NULL;
}

View File

@@ -34,10 +34,10 @@ struct bitset_mapping* build_allocation_map(int fd, uint64_t size, int resolutio
/* Resize fiemap to allow us to read in the extents */
fiemap = (struct fiemap*)xmalloc(
sizeof(struct fiemap) + (
sizeof(struct fiemap_extent) *
sizeof(struct fiemap_extent) *
fiemap_count->fm_mapped_extents
)
);
);
/* realloc makes valgrind complain a lot */
memcpy(fiemap, fiemap_count, sizeof(struct fiemap));
@@ -46,15 +46,15 @@ struct bitset_mapping* build_allocation_map(int fd, uint64_t size, int resolutio
fiemap->fm_extent_count = fiemap->fm_mapped_extents;
fiemap->fm_mapped_extents = 0;
if (ioctl(fd, FS_IOC_FIEMAP, fiemap) < 0) {
if (ioctl(fd, FS_IOC_FIEMAP, fiemap) < 0) {
debug( "Couldn't get fiemap, returning no allocation_map" );
goto no_map;
}
for (i=0;i<fiemap->fm_mapped_extents;i++) {
bitset_set_range(
allocation_map,
fiemap->fm_extents[i].fe_logical,
allocation_map,
fiemap->fm_extents[i].fe_logical,
fiemap->fm_extents[i].fe_length
);
}
@@ -94,7 +94,7 @@ no_map:
int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **out_map)
{
off64_t size;
/* O_DIRECT seems to be intermittently supported. Leaving it as
* a compile-time option for now. */
#ifdef DIRECT_IO
@@ -107,7 +107,7 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o
warn("open(%s) failed: does it exist?", filename);
return *out_fd;
}
size = lseek64(*out_fd, 0, SEEK_END);
if (size < 0) {
warn("lseek64() failed");
@@ -116,9 +116,9 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o
if (out_size) {
*out_size = size;
}
if (out_map) {
*out_map = mmap64(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED,
*out_map = mmap64(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED,
*out_fd, 0);
if (((long) *out_map) == -1) {
warn("mmap64() failed");
@@ -126,7 +126,7 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o
}
}
debug("opened %s size %ld on fd %d @ %p", filename, size, *out_fd, *out_map);
return 0;
}
@@ -175,16 +175,16 @@ ssize_t spliceloop(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_
{
const unsigned int flags = SPLICE_F_MORE|SPLICE_F_MOVE|flags2;
size_t spliced=0;
//debug("spliceloop(%d, %ld, %d, %ld, %ld)", fd_in, off_in ? *off_in : 0, fd_out, off_out ? *off_out : 0, len);
while (spliced < len) {
ssize_t result = splice(fd_in, off_in, fd_out, off_out, len, flags);
if (result < 0) {
//debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len);
if (errno == EAGAIN && (flags & SPLICE_F_NONBLOCK) ) {
return spliced;
}
}
else {
return -1;
}
@@ -193,7 +193,7 @@ ssize_t spliceloop(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_
//debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len);
}
}
return spliced;
}
@@ -202,25 +202,25 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
int pipefd[2]; /* read end, write end */
size_t spliced=0;
if (pipe(pipefd) == -1) {
return -1;
}
while (spliced < len) {
ssize_t run = len-spliced;
ssize_t s2, s1 = spliceloop(fd_in, NULL, pipefd[1], NULL, run, SPLICE_F_NONBLOCK);
/*if (run > 65535)
run = 65535;*/
if (s1 < 0) { break; }
s2 = spliceloop(pipefd[0], NULL, fd_out, NULL, s1, 0);
if (s2 < 0) { break; }
spliced += s2;
}
close(pipefd[0]);
close(pipefd[1]);
return spliced < len ? -1 : 0;
}
@@ -234,16 +234,16 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
int read_until_newline(int fd, char* buf, int bufsize)
{
int cur;
for (cur=0; cur < bufsize; cur++) {
int result = read(fd, buf+cur, 1);
if (result <= 0) { return -1; }
if (buf[cur] == 10) {
if (buf[cur] == 10) {
buf[cur] = '\0';
break;
break;
}
}
return cur+1;
}
@@ -252,9 +252,9 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines)
int lines_count = 0;
char line[max_line_length+1];
*lines = NULL;
memset(line, 0, max_line_length+1);
while (1) {
int readden = read_until_newline(fd, line, max_line_length);
/* readden will be:
@@ -280,3 +280,4 @@ int fd_is_closed( int fd_in )
errno = errno_old;
return result;
}

View File

@@ -4,7 +4,7 @@
#include "serve.h"
struct bitset_mapping; /* don't need whole of bitset.h here */
/** Returns a bit field representing which blocks are allocated in file
/** Returns a bit field representing which blocks are allocated in file
* descriptor ''fd''. You must supply the size, and the resolution at which
* you want the bits to represent allocated blocks. If the OS represents
* allocated blocks at a finer resolution than you've asked for, any block
@@ -43,15 +43,15 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len);
int read_until_newline(int fd, char* buf, int bufsize);
/** Read a number of lines using read_until_newline, until an empty line is
* received (i.e. the sequence LF LF). The data is read from ''fd'' and
* received (i.e. the sequence LF LF). The data is read from ''fd'' and
* lines must be a maximum of ''max_line_length''. The set of lines is
* returned as an array of zero-terminated strings; you must pass an address
* ''lines'' in which you want the address of this array returned.
*/
int read_lines_until_blankline(int fd, int max_line_length, char ***lines);
/** Open the given ''filename'', determine its size, and mmap it in its
* entirety. The file descriptor is stored in ''out_fd'', the size in
/** Open the given ''filename'', determine its size, and mmap it in its
* entirety. The file descriptor is stored in ''out_fd'', the size in
* ''out_size'' and the address of the mmap in ''out_map''. If anything goes
* wrong, returns -1 setting errno, otherwise 0.
*/

View File

@@ -79,15 +79,15 @@ void mirror_init( struct mirror * mirror, const char * filename )
FATAL_IF_NEGATIVE(
open_and_mmap(
filename,
filename,
&map_fd,
&size,
&size,
(void**) &mirror->mapped
),
"Failed to open and mmap %s",
filename
);
mirror->dirty_map = bitset_alloc(size, 4096);
}
@@ -119,7 +119,7 @@ struct mirror * mirror_create(
max_Bps,
action_at_finish,
commit_signal);
mirror_init( mirror, filename );
mirror_reset( mirror );
@@ -146,7 +146,7 @@ static const int mirror_longest_write = 8<<20;
*/
static const unsigned int mirror_last_pass_after_bytes_written = 100<<20;
/** The largest number of full passes we'll do - the last one will always
/** The largest number of full passes we'll do - the last one will always
* cause the I/O to freeze, however many bytes are left to copy.
*/
static const int mirror_maximum_passes = 7;
@@ -166,15 +166,15 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written)
debug("mirror current=%ld, run=%d", current, run);
/* FIXME: we could avoid sending sparse areas of the
/* FIXME: we could avoid sending sparse areas of the
* disc here, and probably save a lot of bandwidth and
* time (if we know the destination starts off zeroed).
*/
*/
if (bitset_is_set_at(map, current)) {
/* We've found a dirty area, send it */
debug("^^^ writing");
/* We need to stop the main thread from working
/* We need to stop the main thread from working
* because it might corrupt the dirty map. This
* is likely to slow things down but will be
* safe.
@@ -185,7 +185,7 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written)
/** FIXME: do something useful with bytes/second */
/** FIXME: error handling code here won't unlock */
socket_nbd_write( serve->mirror->client,
socket_nbd_write( serve->mirror->client,
current,
run,
0,
@@ -292,7 +292,7 @@ int mirror_connect( struct mirror * mirror, off64_t local_size )
mirror_set_state( mirror, MS_GO );
}
else {
warn("Remote size (%d) doesn't match local (%d)",
warn("Remote size (%d) doesn't match local (%d)",
remote_size, local_size );
mirror_set_state( mirror, MS_FAIL_SIZE_MISMATCH );
}
@@ -341,11 +341,11 @@ void mirror_run( struct server *serve )
for (pass=0; pass < mirror_maximum_passes-1; pass++) {
debug("mirror start pass=%d", pass);
if ( !mirror_pass( serve, 1, &written ) ){
if ( !mirror_pass( serve, 1, &written ) ){
debug("Failed mirror pass state is %d", mirror_get_state( serve->mirror ) );
debug("pass failed, giving up");
return; }
/* if we've not written anything */
if (written < mirror_last_pass_after_bytes_written) { break; }
}
@@ -358,7 +358,7 @@ void mirror_run( struct server *serve )
mirror_on_exit( serve );
info("Server closed, quitting "
"after successful migration");
}
}
}
server_unlock_io( serve );
}
@@ -379,11 +379,11 @@ void mirror_signal_commit( struct mirror * mirror )
{
NULLCHECK( mirror );
mbox_post_mirror_state( mirror->commit_signal,
mbox_post_mirror_state( mirror->commit_signal,
mirror_get_state( mirror ) );
}
/** Thread launched to drive mirror process
/** Thread launched to drive mirror process
* This is needed for two reasons: firstly, it decouples the mirroring
* from the control thread (although that's less valid with mboxes
* passing state back and forth) and to provide an error context so that
@@ -408,7 +408,7 @@ void* mirror_runner(void* serve_params_uncast)
error_set_handler( (cleanup_handler *) mirror_cleanup, serve );
info( "Connecting to mirror" );
time_t start_time = time(NULL);
int connected = mirror_connect( mirror, serve->size );
mirror_signal_commit( mirror );
@@ -418,8 +418,8 @@ void* mirror_runner(void* serve_params_uncast)
* and retry everything from mirror_set_state(_, MS_INIT), but
* *without* signaling the commit or abandoning the mirror.
* */
if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){
if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){
/* If we get here, then we managed to connect but the
* control thread feeding status back to the user will
* have gone away, leaving the user without meaningful
@@ -449,11 +449,11 @@ struct mirror_super * mirror_super_create(
struct mbox * state_mbox)
{
struct mirror_super * super = xmalloc( sizeof( struct mirror_super) );
super->mirror = mirror_create(
filename,
connect_to,
connect_from,
max_Bps,
super->mirror = mirror_create(
filename,
connect_to,
connect_from,
max_Bps,
action_at_finish,
mbox_create() ) ;
super->state_mbox = state_mbox;
@@ -462,15 +462,15 @@ struct mirror_super * mirror_super_create(
/* Post the current state of the mirror into super->state_mbox.*/
void mirror_super_signal_committed(
void mirror_super_signal_committed(
struct mirror_super * super ,
enum mirror_state commit_state )
{
NULLCHECK( super );
NULLCHECK( super->state_mbox );
mbox_post_mirror_state(
super->state_mbox,
mbox_post_mirror_state(
super->state_mbox,
commit_state );
}
@@ -506,14 +506,14 @@ void * mirror_super_runner( void * serve_uncast )
do {
FATAL_IF( 0 != pthread_create(
&mirror->thread,
NULL,
mirror_runner,
&mirror->thread,
NULL,
mirror_runner,
serve),
"Failed to create mirror thread");
debug("Supervisor waiting for commit signal");
enum mirror_state * commit_state =
enum mirror_state * commit_state =
mbox_receive( mirror->commit_signal );
debug( "Supervisor got commit signal" );
@@ -526,7 +526,7 @@ void * mirror_super_runner( void * serve_uncast )
should_retry = *commit_state == MS_GO;
/* Only send this signal the first time */
mirror_super_signal_committed(
super,
super,
*commit_state);
debug("Mirror supervisor committed");
}
@@ -540,7 +540,7 @@ void * mirror_super_runner( void * serve_uncast )
success = MS_DONE == mirror_get_state( mirror );
if( success ){
if( success ){
info( "Mirror supervisor success, exiting" ); }
else if ( mirror->signal_abandon ) {
info( "Mirror abandoned" );
@@ -553,7 +553,7 @@ void * mirror_super_runner( void * serve_uncast )
first_pass = 0;
if ( should_retry ) {
if ( should_retry ) {
/* We don't want to hammer the destination too
* hard, so if this is a retry, insert a delay. */
sleep( MS_RETRY_DELAY_SECS );
@@ -563,7 +563,7 @@ void * mirror_super_runner( void * serve_uncast )
mirror_reset( mirror );
}
}
}
while ( should_retry && !success );
serve->mirror = NULL;
@@ -576,4 +576,3 @@ void * mirror_super_runner( void * serve_uncast )
return NULL;
}

View File

@@ -26,7 +26,7 @@ static inline void* sockaddr_address_data(struct sockaddr* sockaddr)
struct sockaddr_in* in = (struct sockaddr_in*) sockaddr;
struct sockaddr_in6* in6 = (struct sockaddr_in6*) sockaddr;
if (sockaddr->sa_family == AF_INET) {
return &in->sin_addr;
}
@@ -64,7 +64,7 @@ struct server * server_create (
FATAL_IF_ZERO(
parse_ip_to_sockaddr(&out->bind_to.generic, s_ip_address),
"Couldn't parse server address '%s' (use 0 if "
"you want to bind to all IPs)",
"you want to bind to all IPs)",
s_ip_address
);
@@ -107,7 +107,7 @@ void server_destroy( struct server * serve )
flexthread_mutex_destroy( serve->l_acl );
flexthread_mutex_destroy( serve->l_io );
if ( serve->acl ) {
if ( serve->acl ) {
acl_destroy( serve->acl );
serve->acl = NULL;
}
@@ -246,20 +246,20 @@ void serve_bind( struct server * serve )
do {
bind_result = bind(
serve->server_fd,
serve->server_fd,
&serve->bind_to.generic,
sizeof(serve->bind_to));
if ( 0 == bind_result ) {
info( "Bound to %s port %d",
s_address,
info( "Bound to %s port %d",
s_address,
ntohs(serve->bind_to.v4.sin_port));
break;
}
else {
warn( "Couldn't bind to %s port %d: %s",
s_address,
warn( "Couldn't bind to %s port %d: %s",
s_address,
ntohs(serve->bind_to.v4.sin_port),
strerror( errno ) );
@@ -268,10 +268,10 @@ void serve_bind( struct server * serve )
* EADDRINUSE, EADDRNOTAVAIL, EBADF,
* EINVAL or ENOTSOCK.
*
* Any of these other than EACCES,
* Any of these other than EACCES,
* EADDRINUSE or EADDRNOTAVAIL signify
* that there's a logic error somewhere.
*
*
* EADDRINUSE is fatal: if there's
* something already where we want to be
* listening, we have no guarantees that
@@ -284,7 +284,7 @@ void serve_bind( struct server * serve )
continue;
case EADDRINUSE:
fatal( "%s port %d in use, giving up.",
s_address,
s_address,
ntohs(serve->bind_to.v4.sin_port));
default:
fatal( "Giving up" );
@@ -301,11 +301,11 @@ void serve_open_server_socket(struct server* params)
NULLCHECK( params );
int optval=1;
params->server_fd= socket(params->bind_to.generic.sa_family == AF_INET ?
params->server_fd= socket(params->bind_to.generic.sa_family == AF_INET ?
PF_INET : PF_INET6, SOCK_STREAM, 0);
FATAL_IF_NEGATIVE(params->server_fd,
FATAL_IF_NEGATIVE(params->server_fd,
"Couldn't create server socket");
/* We need SO_REUSEADDR so that when we switch from listening to
@@ -334,7 +334,7 @@ void serve_open_server_socket(struct server* params)
* squatting on our ip/port combo, or the ip isn't yet
* configured. Ideally we want to retry this. */
serve_bind(params);
FATAL_IF_NEGATIVE(
listen(params->server_fd, params->tcp_backlog),
"Couldn't listen on server socket"
@@ -358,9 +358,9 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
memset(s_client_address, 0, 64);
strcpy(s_client_address, "???");
inet_ntop( entry->address.generic.sa_family,
sockaddr_address_data(&entry->address.generic),
s_client_address,
inet_ntop( entry->address.generic.sa_family,
sockaddr_address_data(&entry->address.generic),
s_client_address,
64 );
debug( "%s(%p,...)", joinfunc == pthread_join ? "joining" : "tryjoining", entry->thread );
@@ -369,15 +369,15 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
* already dead, but the client still needs tidying up. */
if (join_errno != 0 && !entry->client->stopped ) {
debug( "join_errno was %s, stopped was %d", strerror( join_errno ), entry->client->stopped );
FATAL_UNLESS( join_errno == EBUSY,
"Problem with joining thread %p: %s",
FATAL_UNLESS( join_errno == EBUSY,
"Problem with joining thread %p: %s",
entry->thread,
strerror(join_errno) );
}
else if ( join_errno == 0 ) {
debug("nbd thread %016x exited (%s) with status %ld",
entry->thread,
s_client_address,
debug("nbd thread %016x exited (%s) with status %ld",
entry->thread,
s_client_address,
(uint64_t)status);
client_destroy( entry->client );
entry->client = NULL;
@@ -473,8 +473,8 @@ int server_acl_accepts( struct server *params, union mysockaddr * client_address
}
int server_should_accept_client(
struct server * params,
int server_should_accept_client(
struct server * params,
union mysockaddr * client_address,
char *s_client_address,
size_t s_client_address_len )
@@ -492,7 +492,7 @@ int server_should_accept_client(
if ( !server_acl_accepts( params, client_address ) ) {
warn( "Rejecting client %s: Access control error", s_client_address );
debug( "We %s have an acl, and default_deny is %s",
debug( "We %s have an acl, and default_deny is %s",
(params->acl ? "do" : "do not"),
(params->acl->default_deny ? "true" : "false") );
return 0;
@@ -503,8 +503,8 @@ int server_should_accept_client(
int spawn_client_thread(
struct client * client_params,
int spawn_client_thread(
struct client * client_params,
pthread_t *out_thread)
{
int result = pthread_create(out_thread, NULL, client_serve, client_params);
@@ -518,8 +518,8 @@ int spawn_client_thread(
* address doesn't match, or if there are too many clients already connected.
*/
void accept_nbd_client(
struct server* params,
int client_fd,
struct server* params,
int client_fd,
union mysockaddr* client_address)
{
NULLCHECK(params);
@@ -537,7 +537,7 @@ void accept_nbd_client(
return;
}
slot = cleanup_and_find_client_slot(params);
slot = cleanup_and_find_client_slot(params);
if (slot < 0) {
warn("too many clients to accept connection");
FATAL_IF_NEGATIVE( close( client_fd ),
@@ -545,14 +545,14 @@ void accept_nbd_client(
debug("Closed client socket fd %d", client_fd);
return;
}
info( "Client %s accepted on fd %d.", s_client_address, client_fd );
client_params = client_create( params, client_fd );
params->nbd_client[slot].client = client_params;
memcpy(&params->nbd_client[slot].address, client_address,
memcpy(&params->nbd_client[slot].address, client_address,
sizeof(union mysockaddr));
pthread_t * thread = &params->nbd_client[slot].thread;
if ( 0 != spawn_client_thread( client_params, thread ) ) {
@@ -563,7 +563,7 @@ void accept_nbd_client(
debug("Closed client socket fd %d", client_fd);
return;
}
debug("nbd thread %p started (%s)", params->nbd_client[slot].thread, s_client_address);
}
@@ -602,7 +602,7 @@ int server_is_closed(struct server* serve)
void server_close_clients( struct server *params )
{
NULLCHECK(params);
info("closing all clients");
int i; /* , j; */
@@ -613,7 +613,7 @@ void server_close_clients( struct server *params )
if ( entry->thread != 0 ) {
debug( "Stop signaling client %p", entry->client );
client_signal_stop( entry->client );
client_signal_stop( entry->client );
}
}
/* We don't join the clients here. When we enter the final
@@ -717,7 +717,7 @@ int server_accept( struct server * params )
self_pipe_fd_set( params->close_signal, &fds );
self_pipe_fd_set( params->acl_updated_signal, &fds );
FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds,
FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds,
NULL, NULL, NULL), "select() failed");
if ( self_pipe_fd_isset( params->close_signal, &fds ) ){
@@ -745,13 +745,13 @@ int server_accept( struct server * params )
client_fd = accept( params->server_fd, &client_address.generic, &socklen );
debug("Accepted nbd client socket fd %d", client_fd);
accept_nbd_client(params, client_fd, &client_address);
}
}
return 1;
}
void serve_accept_loop(struct server* params)
void serve_accept_loop(struct server* params)
{
NULLCHECK( params );
while( server_accept( params ) );
@@ -770,9 +770,9 @@ void serve_init_allocation_map(struct server* params)
FATAL_IF_NEGATIVE(fd, "Couldn't open %s", params->filename);
size = lseek64(fd, 0, SEEK_END);
params->size = size;
FATAL_IF_NEGATIVE(size, "Couldn't find size of %s",
FATAL_IF_NEGATIVE(size, "Couldn't find size of %s",
params->filename);
params->allocation_map =
params->allocation_map =
build_allocation_map(fd, size, block_allocation_resolution);
close(fd);
}
@@ -810,15 +810,15 @@ void server_control_arrived( struct server *serve )
/** Closes sockets, frees memory and waits for all client threads to finish */
void serve_cleanup(struct server* params,
void serve_cleanup(struct server* params,
int fatal __attribute__ ((unused)) )
{
NULLCHECK( params );
info("cleaning up");
int i;
if (params->server_fd){ close(params->server_fd); }
if (params->allocation_map) {
@@ -837,11 +837,11 @@ void serve_cleanup(struct server* params,
}
if ( need_mirror_lock ) { server_unlock_start_mirror( params ); }
for (i=0; i < params->max_nbd_clients; i++) {
void* status;
pthread_t thread_id = params->nbd_client[i].thread;
if (thread_id != 0) {
debug("joining thread %p", thread_id);
pthread_join(thread_id, &status);
@@ -877,7 +877,7 @@ void server_abandon_mirror( struct server * serve )
{
NULLCHECK( serve );
if ( serve->mirror_super ) {
/* FIXME: AWOOGA! RACE!
/* FIXME: AWOOGA! RACE!
* We can set signal_abandon after mirror_super has
* checked it, but before the reset. This would lead to
* a hang. However, mirror_reset doesn't change the
@@ -901,10 +901,10 @@ int do_serve(struct server* params)
NULLCHECK( params );
int has_control;
error_set_handler((cleanup_handler*) serve_cleanup, params);
serve_open_server_socket(params);
serve_init_allocation_map(params);
serve_open_server_socket(params);
serve_accept_loop(params);
has_control = params->has_control;
serve_cleanup(params, 0);

View File

@@ -314,9 +314,16 @@ module FlexNBD
while !File.socket?(ctrl)
pid, status = Process.wait2(@pid, Process::WNOHANG)
raise "server did not start (#{cmd})" if pid
raise "server did not start (#{cmd}) - UNIX socket didn't appear" if pid
sleep 0.1
end
while !socket_open?
pid, status = Process.wait2(@pid, Process::WNOHANG)
raise "server did not start (#{cmd}) - TCP socket didn't appear" if pid
sleep 0.1
end
at_exit { kill }
end
private :run_serve_cmd
@@ -512,7 +519,15 @@ module FlexNBD
hsh
end
def socket_open?
sock = (TCPSocket.new(@ip, @port) rescue nil)
!!sock
ensure
sock.close rescue nil if sock
end
end
end

View File

@@ -8,16 +8,35 @@ module FlexNBD
class FakeSource
def initialize( addr, port, err_msg, source_addr=nil, source_port=0 )
timing_out( 2, err_msg ) do
@sock = if source_addr
TCPSocket.new( addr, port, source_addr, source_port )
else
TCPSocket.new( addr, port )
end
timing_out( 10, err_msg ) {
@sock = wait_for_server_socket( addr, port, source_addr, source_port )
}
end
def wait_for_server_socket(addr, port, saddr = nil, sport = 0)
sock = nil
loop do
sock = try_get_server_socket( addr, port, saddr, sport )
break if sock
sleep 0.1
end
sock
end
def try_get_server_socket(addr, port, saddr = nil, sport = 0)
if saddr
TCPSocket.new( addr, port, saddr, sport ) rescue nil
else
TCPSocket.new( addr, port ) rescue nil
end
end
def close
@sock.close
end
@@ -137,3 +156,4 @@ module FlexNBD
end # class FakeSource
end # module FlexNBD