From ccbfce1075479ab4556c86a71a6e50202a2188b6 Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 20 Sep 2012 13:37:48 +0100 Subject: [PATCH 1/2] Whitespace --- src/client.c | 103 ++++++++++++++++++++++++++------------------------- src/ioutil.c | 49 ++++++++++++------------ src/ioutil.h | 8 ++-- src/mirror.c | 69 +++++++++++++++++----------------- src/serve.c | 102 +++++++++++++++++++++++++------------------------- 5 files changed, 166 insertions(+), 165 deletions(-) diff --git a/src/client.c b/src/client.c index 273fc91..dcb76ad 100644 --- a/src/client.c +++ b/src/client.c @@ -61,13 +61,13 @@ void client_destroy( struct client *client ) * So waiting on client->socket is len bytes of data, and we must write it all * to client->mapped. However while doing do we must consult the bitmap * client->block_allocation_map, which is a bitmap where one bit represents - * block_allocation_resolution bytes. Where a bit isn't set, there are no + * block_allocation_resolution bytes. Where a bit isn't set, there are no * disc blocks allocated for that portion of the file, and we'd like to keep - * it that way. + * it that way. * * If the bitmap shows that every block in our prospective write is already - * allocated, we can proceed as normal and make one call to writeloop. - * + * allocated, we can proceed as normal and make one call to writeloop. + * */ void write_not_zeroes(struct client* client, uint64_t from, int len) { @@ -83,30 +83,30 @@ void write_not_zeroes(struct client* client, uint64_t from, int len) * how many blocks our write covers, then cut off the start * and end to get the exact number of bytes. */ - + int run = bitset_run_count(map, from, len); - + debug("write_not_zeroes: from=%ld, len=%d, run=%d", from, len, run); - + if (run > len) { run = len; debug("(run adjusted to %d)", run); } - + if (0) /* useful but expensive */ { uint64_t i; fprintf(stderr, "full map resolution=%d: ", map->resolution); for (i=0; iserve->size; i+=map->resolution) { int here = (from >= i && from < i+map->resolution); - + if (here) { fprintf(stderr, ">"); } fprintf(stderr, bitset_is_set_at(map, i) ? "1" : "0"); if (here) { fprintf(stderr, "<"); } } fprintf(stderr, "\n"); } - + #define DO_READ(dst, len) ERROR_IF_NEGATIVE( \ readloop( \ client->socket, \ @@ -115,7 +115,7 @@ void write_not_zeroes(struct client* client, uint64_t from, int len) ), \ "read failed %ld+%d", from, (len) \ ) - + if (bitset_is_set_at(map, from)) { debug("writing the lot: from=%ld, run=%d", from, run); /* already allocated, just write it all */ @@ -128,19 +128,19 @@ void write_not_zeroes(struct client* client, uint64_t from, int len) char zerobuffer[block_allocation_resolution]; /* not allocated, read in block_allocation_resoution */ while (run > 0) { - int blockrun = block_allocation_resolution - + int blockrun = block_allocation_resolution - (from % block_allocation_resolution); if (blockrun > run) blockrun = run; - + DO_READ(zerobuffer, blockrun); - - /* This reads the buffer twice in the worst case + + /* This reads the buffer twice in the worst case * but we're leaning on memcmp failing early * and memcpy being fast, rather than try to * hand-optimized something specific. */ - if (zerobuffer[0] != 0 || + if (zerobuffer[0] != 0 || memcmp(zerobuffer, zerobuffer + 1, blockrun - 1)) { memcpy(client->mapped+from, zerobuffer, blockrun); bitset_set_range(map, from, blockrun); @@ -148,7 +148,7 @@ void write_not_zeroes(struct client* client, uint64_t from, int len) /* at this point we could choose to * short-cut the rest of the write for * faster I/O but by continuing to do it - * the slow way we preserve as much + * the slow way we preserve as much * sparseness as possible. */ } @@ -185,18 +185,18 @@ int client_read_request( struct client * client , struct nbd_request *out_reques * otherwise */ ptv = server_is_in_control( client->serve ) ? NULL : &tv; - + FD_ZERO(&fds); FD_SET(client->socket, &fds); self_pipe_fd_set( client->stop_signal, &fds ); fd_count = select(FD_SETSIZE, &fds, NULL, NULL, ptv); if ( fd_count == 0 ) { /* This "can't ever happen" */ - if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); } + if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); } else { error("Timed out waiting for I/O"); } } else if ( fd_count < 0 ) { fatal( "Select failed" ); } - + if ( self_pipe_fd_isset( client->stop_signal, &fds ) ){ debug("Client received stop signal."); return 0; @@ -220,9 +220,9 @@ int client_read_request( struct client * client , struct nbd_request *out_reques * again. It should *probably* be an * error() call, but I want to be sure. * */ - fatal("Error reading request: %d, %s", - errno, - strerror( errno )); + fatal("Error reading request: %d, %s", + errno, + strerror( errno )); } } @@ -299,7 +299,7 @@ void client_write_init( struct client * client, uint64_t size ) void client_flush( struct client * client, size_t len ) { int devnull = open("/dev/null", O_WRONLY); - FATAL_IF_NEGATIVE( devnull, + FATAL_IF_NEGATIVE( devnull, "Couldn't open /dev/null: %s", strerror(errno)); int pipes[2]; pipe( pipes ); @@ -308,9 +308,9 @@ void client_flush( struct client * client, size_t len ) size_t spliced = 0; while ( spliced < len ) { - ssize_t received = splice( + ssize_t received = splice( client->socket, NULL, - pipes[1], NULL, + pipes[1], NULL, len-spliced, flags ); FATAL_IF_NEGATIVE( received, "splice error: %s", @@ -318,9 +318,9 @@ void client_flush( struct client * client, size_t len ) ssize_t junked = 0; while( junked < received ) { ssize_t junk; - junk = splice( + junk = splice( pipes[0], NULL, - devnull, NULL, + devnull, NULL, received, flags ); FATAL_IF_NEGATIVE( junk, "splice error: %s", @@ -341,15 +341,15 @@ void client_flush( struct client * client, size_t len ) * request_err is set to 0 if the client sent a bad request, in which * case we drop the connection. */ -int client_request_needs_reply( struct client * client, +int client_request_needs_reply( struct client * client, struct nbd_request request ) { debug("request type %d", request.type); - + if (request.magic != REQUEST_MAGIC) { fatal("Bad magic %08x", request.magic); } - + switch (request.type) { case REQUEST_READ: @@ -358,7 +358,7 @@ int client_request_needs_reply( struct client * client, /* check it's not out of range */ if ( request.from+request.len > client->serve->size) { warn("write request %d+%d out of range", - request.from, + request.from, request.len ); client_write_reply( client, &request, 1 ); @@ -367,12 +367,12 @@ int client_request_needs_reply( struct client * client, return 0; } break; - + case REQUEST_DISCONNECT: debug("request disconnect"); client->disconnect = 1; return 0; - + default: fatal("Unknown request %08x", request.type); } @@ -394,9 +394,9 @@ void client_reply_to_read( struct client* client, struct nbd_request request ) */ ERROR_IF_NEGATIVE( sendfileloop( - client->socket, - client->fileno, - &offset, + client->socket, + client->fileno, + &offset, request.len), "sendfile failed from=%ld, len=%d", offset, @@ -420,7 +420,7 @@ void client_reply_to_write( struct client* client, struct nbd_request request ) request.len), "reading write data failed from=%ld, len=%d", request.from, - request.len + request.len ); server_dirty(client->serve, request.from, request.len); } @@ -432,8 +432,8 @@ void client_reply_to_write( struct client* client, struct nbd_request request ) uint64_t len_rounded = request.len + (request.from - from_rounded); FATAL_IF_NEGATIVE( - msync( client->mapped + from_rounded, - len_rounded, + msync( client->mapped + from_rounded, + len_rounded, MS_SYNC), "msync failed %ld %ld", request.from, request.len ); @@ -466,7 +466,7 @@ int client_serve_request(struct client* client) if ( disconnected ) { return stop; } if ( !client_request_needs_reply( client, request ) ) { return client->disconnect; - } + } server_lock_io( client->serve ); { @@ -486,12 +486,12 @@ void client_send_hello(struct client* client) client_write_init( client, client->serve->size ); } -void client_cleanup(struct client* client, +void client_cleanup(struct client* client, int fatal __attribute__ ((unused)) ) { info("client cleanup for client %p", client); - - if (client->socket) { + + if (client->socket) { FATAL_IF_NEGATIVE( close(client->socket), "Error closing client socket %d", client->socket ); @@ -501,7 +501,7 @@ void client_cleanup(struct client* client, if (client->mapped) { munmap(client->mapped, client->serve->size); } - if (client->fileno) { + if (client->fileno) { FATAL_IF_NEGATIVE( close(client->fileno), "Error closing file %d", client->fileno ); @@ -517,15 +517,15 @@ void client_cleanup(struct client* client, void* client_serve(void* client_uncast) { struct client* client = (struct client*) client_uncast; - + error_set_handler((cleanup_handler*) client_cleanup, client); - + info("client: mmaping file"); FATAL_IF_NEGATIVE( open_and_mmap( client->serve->filename, &client->fileno, - NULL, + NULL, (void**) &client->mapped ), "Couldn't open/mmap file %s: %s", client->serve->filename, strerror( errno ) @@ -533,13 +533,13 @@ void* client_serve(void* client_uncast) debug( "Opened client file fd %d", client->fileno); debug("client: sending hello"); client_send_hello(client); - + debug("client: serving requests"); while (client_serve_request(client) == 0) ; debug("client: stopped serving requests"); client->stopped = 1; - + if ( client->disconnect ){ debug("client: control arrived" ); server_control_arrived( client->serve ); @@ -548,6 +548,7 @@ void* client_serve(void* client_uncast) debug("Cleaning client %p up normally in thread %p", client, pthread_self()); client_cleanup(client, 0); debug("Client thread done" ); - + return NULL; } + diff --git a/src/ioutil.c b/src/ioutil.c index 7982e4d..5abafce 100644 --- a/src/ioutil.c +++ b/src/ioutil.c @@ -34,10 +34,10 @@ struct bitset_mapping* build_allocation_map(int fd, uint64_t size, int resolutio /* Resize fiemap to allow us to read in the extents */ fiemap = (struct fiemap*)xmalloc( sizeof(struct fiemap) + ( - sizeof(struct fiemap_extent) * + sizeof(struct fiemap_extent) * fiemap_count->fm_mapped_extents ) - ); + ); /* realloc makes valgrind complain a lot */ memcpy(fiemap, fiemap_count, sizeof(struct fiemap)); @@ -46,15 +46,15 @@ struct bitset_mapping* build_allocation_map(int fd, uint64_t size, int resolutio fiemap->fm_extent_count = fiemap->fm_mapped_extents; fiemap->fm_mapped_extents = 0; - if (ioctl(fd, FS_IOC_FIEMAP, fiemap) < 0) { + if (ioctl(fd, FS_IOC_FIEMAP, fiemap) < 0) { debug( "Couldn't get fiemap, returning no allocation_map" ); goto no_map; } for (i=0;ifm_mapped_extents;i++) { bitset_set_range( - allocation_map, - fiemap->fm_extents[i].fe_logical, + allocation_map, + fiemap->fm_extents[i].fe_logical, fiemap->fm_extents[i].fe_length ); } @@ -94,7 +94,7 @@ no_map: int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **out_map) { off64_t size; - + /* O_DIRECT seems to be intermittently supported. Leaving it as * a compile-time option for now. */ #ifdef DIRECT_IO @@ -107,7 +107,7 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o warn("open(%s) failed: does it exist?", filename); return *out_fd; } - + size = lseek64(*out_fd, 0, SEEK_END); if (size < 0) { warn("lseek64() failed"); @@ -116,9 +116,9 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o if (out_size) { *out_size = size; } - + if (out_map) { - *out_map = mmap64(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, + *out_map = mmap64(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, *out_fd, 0); if (((long) *out_map) == -1) { warn("mmap64() failed"); @@ -126,7 +126,7 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o } } debug("opened %s size %ld on fd %d @ %p", filename, size, *out_fd, *out_map); - + return 0; } @@ -175,16 +175,16 @@ ssize_t spliceloop(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_ { const unsigned int flags = SPLICE_F_MORE|SPLICE_F_MOVE|flags2; size_t spliced=0; - + //debug("spliceloop(%d, %ld, %d, %ld, %ld)", fd_in, off_in ? *off_in : 0, fd_out, off_out ? *off_out : 0, len); - + while (spliced < len) { ssize_t result = splice(fd_in, off_in, fd_out, off_out, len, flags); if (result < 0) { //debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len); if (errno == EAGAIN && (flags & SPLICE_F_NONBLOCK) ) { return spliced; - } + } else { return -1; } @@ -193,7 +193,7 @@ ssize_t spliceloop(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_ //debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len); } } - + return spliced; } @@ -202,25 +202,25 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len) int pipefd[2]; /* read end, write end */ size_t spliced=0; - + if (pipe(pipefd) == -1) { return -1; } - + while (spliced < len) { ssize_t run = len-spliced; ssize_t s2, s1 = spliceloop(fd_in, NULL, pipefd[1], NULL, run, SPLICE_F_NONBLOCK); /*if (run > 65535) run = 65535;*/ if (s1 < 0) { break; } - + s2 = spliceloop(pipefd[0], NULL, fd_out, NULL, s1, 0); if (s2 < 0) { break; } spliced += s2; } close(pipefd[0]); close(pipefd[1]); - + return spliced < len ? -1 : 0; } @@ -234,16 +234,16 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len) int read_until_newline(int fd, char* buf, int bufsize) { int cur; - + for (cur=0; cur < bufsize; cur++) { int result = read(fd, buf+cur, 1); if (result <= 0) { return -1; } - if (buf[cur] == 10) { + if (buf[cur] == 10) { buf[cur] = '\0'; - break; + break; } } - + return cur+1; } @@ -252,9 +252,9 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines) int lines_count = 0; char line[max_line_length+1]; *lines = NULL; - + memset(line, 0, max_line_length+1); - + while (1) { int readden = read_until_newline(fd, line, max_line_length); /* readden will be: @@ -280,3 +280,4 @@ int fd_is_closed( int fd_in ) errno = errno_old; return result; } + diff --git a/src/ioutil.h b/src/ioutil.h index 0687cea..85bcb55 100644 --- a/src/ioutil.h +++ b/src/ioutil.h @@ -4,7 +4,7 @@ #include "serve.h" struct bitset_mapping; /* don't need whole of bitset.h here */ -/** Returns a bit field representing which blocks are allocated in file +/** Returns a bit field representing which blocks are allocated in file * descriptor ''fd''. You must supply the size, and the resolution at which * you want the bits to represent allocated blocks. If the OS represents * allocated blocks at a finer resolution than you've asked for, any block @@ -43,15 +43,15 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len); int read_until_newline(int fd, char* buf, int bufsize); /** Read a number of lines using read_until_newline, until an empty line is - * received (i.e. the sequence LF LF). The data is read from ''fd'' and + * received (i.e. the sequence LF LF). The data is read from ''fd'' and * lines must be a maximum of ''max_line_length''. The set of lines is * returned as an array of zero-terminated strings; you must pass an address * ''lines'' in which you want the address of this array returned. */ int read_lines_until_blankline(int fd, int max_line_length, char ***lines); -/** Open the given ''filename'', determine its size, and mmap it in its - * entirety. The file descriptor is stored in ''out_fd'', the size in +/** Open the given ''filename'', determine its size, and mmap it in its + * entirety. The file descriptor is stored in ''out_fd'', the size in * ''out_size'' and the address of the mmap in ''out_map''. If anything goes * wrong, returns -1 setting errno, otherwise 0. */ diff --git a/src/mirror.c b/src/mirror.c index d5fb9a9..4265d83 100644 --- a/src/mirror.c +++ b/src/mirror.c @@ -79,15 +79,15 @@ void mirror_init( struct mirror * mirror, const char * filename ) FATAL_IF_NEGATIVE( open_and_mmap( - filename, + filename, &map_fd, - &size, + &size, (void**) &mirror->mapped ), "Failed to open and mmap %s", filename ); - + mirror->dirty_map = bitset_alloc(size, 4096); } @@ -119,7 +119,7 @@ struct mirror * mirror_create( max_Bps, action_at_finish, commit_signal); - + mirror_init( mirror, filename ); mirror_reset( mirror ); @@ -146,7 +146,7 @@ static const int mirror_longest_write = 8<<20; */ static const unsigned int mirror_last_pass_after_bytes_written = 100<<20; -/** The largest number of full passes we'll do - the last one will always +/** The largest number of full passes we'll do - the last one will always * cause the I/O to freeze, however many bytes are left to copy. */ static const int mirror_maximum_passes = 7; @@ -166,15 +166,15 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written) debug("mirror current=%ld, run=%d", current, run); - /* FIXME: we could avoid sending sparse areas of the + /* FIXME: we could avoid sending sparse areas of the * disc here, and probably save a lot of bandwidth and * time (if we know the destination starts off zeroed). - */ + */ if (bitset_is_set_at(map, current)) { /* We've found a dirty area, send it */ debug("^^^ writing"); - /* We need to stop the main thread from working + /* We need to stop the main thread from working * because it might corrupt the dirty map. This * is likely to slow things down but will be * safe. @@ -185,7 +185,7 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written) /** FIXME: do something useful with bytes/second */ /** FIXME: error handling code here won't unlock */ - socket_nbd_write( serve->mirror->client, + socket_nbd_write( serve->mirror->client, current, run, 0, @@ -292,7 +292,7 @@ int mirror_connect( struct mirror * mirror, off64_t local_size ) mirror_set_state( mirror, MS_GO ); } else { - warn("Remote size (%d) doesn't match local (%d)", + warn("Remote size (%d) doesn't match local (%d)", remote_size, local_size ); mirror_set_state( mirror, MS_FAIL_SIZE_MISMATCH ); } @@ -341,11 +341,11 @@ void mirror_run( struct server *serve ) for (pass=0; pass < mirror_maximum_passes-1; pass++) { debug("mirror start pass=%d", pass); - if ( !mirror_pass( serve, 1, &written ) ){ + if ( !mirror_pass( serve, 1, &written ) ){ debug("Failed mirror pass state is %d", mirror_get_state( serve->mirror ) ); debug("pass failed, giving up"); return; } - + /* if we've not written anything */ if (written < mirror_last_pass_after_bytes_written) { break; } } @@ -358,7 +358,7 @@ void mirror_run( struct server *serve ) mirror_on_exit( serve ); info("Server closed, quitting " "after successful migration"); - } + } } server_unlock_io( serve ); } @@ -379,11 +379,11 @@ void mirror_signal_commit( struct mirror * mirror ) { NULLCHECK( mirror ); - mbox_post_mirror_state( mirror->commit_signal, + mbox_post_mirror_state( mirror->commit_signal, mirror_get_state( mirror ) ); } -/** Thread launched to drive mirror process +/** Thread launched to drive mirror process * This is needed for two reasons: firstly, it decouples the mirroring * from the control thread (although that's less valid with mboxes * passing state back and forth) and to provide an error context so that @@ -408,7 +408,7 @@ void* mirror_runner(void* serve_params_uncast) error_set_handler( (cleanup_handler *) mirror_cleanup, serve ); info( "Connecting to mirror" ); - + time_t start_time = time(NULL); int connected = mirror_connect( mirror, serve->size ); mirror_signal_commit( mirror ); @@ -418,8 +418,8 @@ void* mirror_runner(void* serve_params_uncast) * and retry everything from mirror_set_state(_, MS_INIT), but * *without* signaling the commit or abandoning the mirror. * */ - - if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){ + + if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){ /* If we get here, then we managed to connect but the * control thread feeding status back to the user will * have gone away, leaving the user without meaningful @@ -449,11 +449,11 @@ struct mirror_super * mirror_super_create( struct mbox * state_mbox) { struct mirror_super * super = xmalloc( sizeof( struct mirror_super) ); - super->mirror = mirror_create( - filename, - connect_to, - connect_from, - max_Bps, + super->mirror = mirror_create( + filename, + connect_to, + connect_from, + max_Bps, action_at_finish, mbox_create() ) ; super->state_mbox = state_mbox; @@ -462,15 +462,15 @@ struct mirror_super * mirror_super_create( /* Post the current state of the mirror into super->state_mbox.*/ -void mirror_super_signal_committed( +void mirror_super_signal_committed( struct mirror_super * super , enum mirror_state commit_state ) { NULLCHECK( super ); NULLCHECK( super->state_mbox ); - mbox_post_mirror_state( - super->state_mbox, + mbox_post_mirror_state( + super->state_mbox, commit_state ); } @@ -506,14 +506,14 @@ void * mirror_super_runner( void * serve_uncast ) do { FATAL_IF( 0 != pthread_create( - &mirror->thread, - NULL, - mirror_runner, + &mirror->thread, + NULL, + mirror_runner, serve), "Failed to create mirror thread"); debug("Supervisor waiting for commit signal"); - enum mirror_state * commit_state = + enum mirror_state * commit_state = mbox_receive( mirror->commit_signal ); debug( "Supervisor got commit signal" ); @@ -526,7 +526,7 @@ void * mirror_super_runner( void * serve_uncast ) should_retry = *commit_state == MS_GO; /* Only send this signal the first time */ mirror_super_signal_committed( - super, + super, *commit_state); debug("Mirror supervisor committed"); } @@ -540,7 +540,7 @@ void * mirror_super_runner( void * serve_uncast ) success = MS_DONE == mirror_get_state( mirror ); - if( success ){ + if( success ){ info( "Mirror supervisor success, exiting" ); } else if ( mirror->signal_abandon ) { info( "Mirror abandoned" ); @@ -553,7 +553,7 @@ void * mirror_super_runner( void * serve_uncast ) first_pass = 0; - if ( should_retry ) { + if ( should_retry ) { /* We don't want to hammer the destination too * hard, so if this is a retry, insert a delay. */ sleep( MS_RETRY_DELAY_SECS ); @@ -563,7 +563,7 @@ void * mirror_super_runner( void * serve_uncast ) mirror_reset( mirror ); } - } + } while ( should_retry && !success ); serve->mirror = NULL; @@ -575,4 +575,3 @@ void * mirror_super_runner( void * serve_uncast ) return NULL; } - diff --git a/src/serve.c b/src/serve.c index 777a3f6..91f2569 100644 --- a/src/serve.c +++ b/src/serve.c @@ -26,7 +26,7 @@ static inline void* sockaddr_address_data(struct sockaddr* sockaddr) struct sockaddr_in* in = (struct sockaddr_in*) sockaddr; struct sockaddr_in6* in6 = (struct sockaddr_in6*) sockaddr; - + if (sockaddr->sa_family == AF_INET) { return &in->sin_addr; } @@ -64,7 +64,7 @@ struct server * server_create ( FATAL_IF_ZERO( parse_ip_to_sockaddr(&out->bind_to.generic, s_ip_address), "Couldn't parse server address '%s' (use 0 if " - "you want to bind to all IPs)", + "you want to bind to all IPs)", s_ip_address ); @@ -103,7 +103,7 @@ void server_destroy( struct server * serve ) flexthread_mutex_destroy( serve->l_acl ); flexthread_mutex_destroy( serve->l_io ); - if ( serve->acl ) { + if ( serve->acl ) { acl_destroy( serve->acl ); serve->acl = NULL; } @@ -220,20 +220,20 @@ void serve_bind( struct server * serve ) do { bind_result = bind( - serve->server_fd, + serve->server_fd, &serve->bind_to.generic, sizeof(serve->bind_to)); if ( 0 == bind_result ) { - info( "Bound to %s port %d", - s_address, + info( "Bound to %s port %d", + s_address, ntohs(serve->bind_to.v4.sin_port)); break; } else { - warn( "Couldn't bind to %s port %d: %s", - s_address, + warn( "Couldn't bind to %s port %d: %s", + s_address, ntohs(serve->bind_to.v4.sin_port), strerror( errno ) ); @@ -242,10 +242,10 @@ void serve_bind( struct server * serve ) * EADDRINUSE, EADDRNOTAVAIL, EBADF, * EINVAL or ENOTSOCK. * - * Any of these other than EACCES, + * Any of these other than EACCES, * EADDRINUSE or EADDRNOTAVAIL signify * that there's a logic error somewhere. - * + * * EADDRINUSE is fatal: if there's * something already where we want to be * listening, we have no guarantees that @@ -258,7 +258,7 @@ void serve_bind( struct server * serve ) continue; case EADDRINUSE: fatal( "%s port %d in use, giving up.", - s_address, + s_address, ntohs(serve->bind_to.v4.sin_port)); default: fatal( "Giving up" ); @@ -275,11 +275,11 @@ void serve_open_server_socket(struct server* params) NULLCHECK( params ); int optval=1; - - params->server_fd= socket(params->bind_to.generic.sa_family == AF_INET ? + + params->server_fd= socket(params->bind_to.generic.sa_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0); - - FATAL_IF_NEGATIVE(params->server_fd, + + FATAL_IF_NEGATIVE(params->server_fd, "Couldn't create server socket"); /* We need SO_REUSEADDR so that when we switch from listening to @@ -308,7 +308,7 @@ void serve_open_server_socket(struct server* params) * squatting on our ip/port combo, or the ip isn't yet * configured. Ideally we want to retry this. */ serve_bind(params); - + FATAL_IF_NEGATIVE( listen(params->server_fd, params->tcp_backlog), "Couldn't listen on server socket" @@ -332,9 +332,9 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre memset(s_client_address, 0, 64); strcpy(s_client_address, "???"); - inet_ntop( entry->address.generic.sa_family, - sockaddr_address_data(&entry->address.generic), - s_client_address, + inet_ntop( entry->address.generic.sa_family, + sockaddr_address_data(&entry->address.generic), + s_client_address, 64 ); debug( "%s(%p,...)", joinfunc == pthread_join ? "joining" : "tryjoining", entry->thread ); @@ -343,15 +343,15 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre * already dead, but the client still needs tidying up. */ if (join_errno != 0 && !entry->client->stopped ) { debug( "join_errno was %s, stopped was %d", strerror( join_errno ), entry->client->stopped ); - FATAL_UNLESS( join_errno == EBUSY, - "Problem with joining thread %p: %s", + FATAL_UNLESS( join_errno == EBUSY, + "Problem with joining thread %p: %s", entry->thread, strerror(join_errno) ); } else if ( join_errno == 0 ) { - debug("nbd thread %016x exited (%s) with status %ld", - entry->thread, - s_client_address, + debug("nbd thread %016x exited (%s) with status %ld", + entry->thread, + s_client_address, (uint64_t)status); client_destroy( entry->client ); entry->client = NULL; @@ -447,8 +447,8 @@ int server_acl_accepts( struct server *params, union mysockaddr * client_address } -int server_should_accept_client( - struct server * params, +int server_should_accept_client( + struct server * params, union mysockaddr * client_address, char *s_client_address, size_t s_client_address_len ) @@ -466,7 +466,7 @@ int server_should_accept_client( if ( !server_acl_accepts( params, client_address ) ) { warn( "Rejecting client %s: Access control error", s_client_address ); - debug( "We %s have an acl, and default_deny is %s", + debug( "We %s have an acl, and default_deny is %s", (params->acl ? "do" : "do not"), (params->acl->default_deny ? "true" : "false") ); return 0; @@ -477,8 +477,8 @@ int server_should_accept_client( -int spawn_client_thread( - struct client * client_params, +int spawn_client_thread( + struct client * client_params, pthread_t *out_thread) { int result = pthread_create(out_thread, NULL, client_serve, client_params); @@ -492,8 +492,8 @@ int spawn_client_thread( * address doesn't match, or if there are too many clients already connected. */ void accept_nbd_client( - struct server* params, - int client_fd, + struct server* params, + int client_fd, union mysockaddr* client_address) { NULLCHECK(params); @@ -511,7 +511,7 @@ void accept_nbd_client( return; } - slot = cleanup_and_find_client_slot(params); + slot = cleanup_and_find_client_slot(params); if (slot < 0) { warn("too many clients to accept connection"); FATAL_IF_NEGATIVE( close( client_fd ), @@ -519,14 +519,14 @@ void accept_nbd_client( debug("Closed client socket fd %d", client_fd); return; } - + info( "Client %s accepted on fd %d.", s_client_address, client_fd ); client_params = client_create( params, client_fd ); params->nbd_client[slot].client = client_params; - memcpy(¶ms->nbd_client[slot].address, client_address, + memcpy(¶ms->nbd_client[slot].address, client_address, sizeof(union mysockaddr)); - + pthread_t * thread = ¶ms->nbd_client[slot].thread; if ( 0 != spawn_client_thread( client_params, thread ) ) { @@ -537,7 +537,7 @@ void accept_nbd_client( debug("Closed client socket fd %d", client_fd); return; } - + debug("nbd thread %p started (%s)", params->nbd_client[slot].thread, s_client_address); } @@ -576,7 +576,7 @@ int server_is_closed(struct server* serve) void server_close_clients( struct server *params ) { NULLCHECK(params); - + info("closing all clients"); int i; /* , j; */ @@ -587,7 +587,7 @@ void server_close_clients( struct server *params ) if ( entry->thread != 0 ) { debug( "Stop signaling client %p", entry->client ); - client_signal_stop( entry->client ); + client_signal_stop( entry->client ); } } /* We don't join the clients here. When we enter the final @@ -648,7 +648,7 @@ int server_accept( struct server * params ) self_pipe_fd_set( params->close_signal, &fds ); self_pipe_fd_set( params->acl_updated_signal, &fds ); - FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds, + FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds, NULL, NULL, NULL), "select() failed"); if ( self_pipe_fd_isset( params->close_signal, &fds ) ){ @@ -672,13 +672,13 @@ int server_accept( struct server * params ) client_fd = accept( params->server_fd, &client_address.generic, &socklen ); debug("Accepted nbd client socket fd %d", client_fd); accept_nbd_client(params, client_fd, &client_address); - } + } return 1; } -void serve_accept_loop(struct server* params) +void serve_accept_loop(struct server* params) { NULLCHECK( params ); while( server_accept( params ) ); @@ -697,9 +697,9 @@ void serve_init_allocation_map(struct server* params) FATAL_IF_NEGATIVE(fd, "Couldn't open %s", params->filename); size = lseek64(fd, 0, SEEK_END); params->size = size; - FATAL_IF_NEGATIVE(size, "Couldn't find size of %s", + FATAL_IF_NEGATIVE(size, "Couldn't find size of %s", params->filename); - params->allocation_map = + params->allocation_map = build_allocation_map(fd, size, block_allocation_resolution); close(fd); } @@ -737,29 +737,29 @@ void server_control_arrived( struct server *serve ) /** Closes sockets, frees memory and waits for all client threads to finish */ -void serve_cleanup(struct server* params, +void serve_cleanup(struct server* params, int fatal __attribute__ ((unused)) ) { NULLCHECK( params ); - + info("cleaning up"); int i; - + if (params->server_fd){ close(params->server_fd); } if (params->allocation_map) { free(params->allocation_map); } - + if ( server_is_mirroring( params ) ) { server_abandon_mirror( params ); } - + for (i=0; i < params->max_nbd_clients; i++) { void* status; pthread_t thread_id = params->nbd_client[i].thread; - + if (thread_id != 0) { debug("joining thread %p", thread_id); pthread_join(thread_id, &status); @@ -790,7 +790,7 @@ void server_abandon_mirror( struct server * serve ) { NULLCHECK( serve ); if ( serve->mirror_super ) { - /* FIXME: AWOOGA! RACE! + /* FIXME: AWOOGA! RACE! * We can set signal_abandon after mirror_super has * checked it, but before the reset. This would lead to * a hang. However, mirror_reset doesn't change the @@ -814,7 +814,7 @@ int do_serve(struct server* params) NULLCHECK( params ); int has_control; - + error_set_handler((cleanup_handler*) serve_cleanup, params); serve_open_server_socket(params); serve_init_allocation_map(params); From 32cae67a75434ef5487c70b9f8f42e233f8ba31d Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 25 Sep 2012 11:47:44 +0100 Subject: [PATCH 2/2] flexnbd: Move building the allocation map to before server socket bind() Building the allocation map takes time, which scales with the size of the disc being presented. By building that map in the space between bind() and accept(), we leave the process in a useless state after the only good signal we have for "we are ready" and the state where it is actually ready. This was breaking migrations of large files. --- src/serve.c | 2 +- tests/acceptance/flexnbd.rb | 17 ++++++++++++- tests/acceptance/flexnbd/fake_source.rb | 32 ++++++++++++++++++++----- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/src/serve.c b/src/serve.c index 91f2569..e720be0 100644 --- a/src/serve.c +++ b/src/serve.c @@ -816,8 +816,8 @@ int do_serve(struct server* params) int has_control; error_set_handler((cleanup_handler*) serve_cleanup, params); - serve_open_server_socket(params); serve_init_allocation_map(params); + serve_open_server_socket(params); serve_accept_loop(params); has_control = params->has_control; serve_cleanup(params, 0); diff --git a/tests/acceptance/flexnbd.rb b/tests/acceptance/flexnbd.rb index 1262982..887e7d8 100644 --- a/tests/acceptance/flexnbd.rb +++ b/tests/acceptance/flexnbd.rb @@ -314,9 +314,16 @@ module FlexNBD while !File.socket?(ctrl) pid, status = Process.wait2(@pid, Process::WNOHANG) - raise "server did not start (#{cmd})" if pid + raise "server did not start (#{cmd}) - UNIX socket didn't appear" if pid sleep 0.1 end + + while !socket_open? + pid, status = Process.wait2(@pid, Process::WNOHANG) + raise "server did not start (#{cmd}) - TCP socket didn't appear" if pid + sleep 0.1 + end + at_exit { kill } end private :run_serve_cmd @@ -511,7 +518,15 @@ module FlexNBD hsh end + def socket_open? + sock = (TCPSocket.new(@ip, @port) rescue nil) + !!sock + ensure + sock.close rescue nil if sock + end + end end + diff --git a/tests/acceptance/flexnbd/fake_source.rb b/tests/acceptance/flexnbd/fake_source.rb index 1f14719..5d4c91a 100644 --- a/tests/acceptance/flexnbd/fake_source.rb +++ b/tests/acceptance/flexnbd/fake_source.rb @@ -8,16 +8,35 @@ module FlexNBD class FakeSource def initialize( addr, port, err_msg, source_addr=nil, source_port=0 ) - timing_out( 2, err_msg ) do - @sock = if source_addr - TCPSocket.new( addr, port, source_addr, source_port ) - else - TCPSocket.new( addr, port ) - end + timing_out( 10, err_msg ) { + @sock = wait_for_server_socket( addr, port, source_addr, source_port ) + } + end + + + def wait_for_server_socket(addr, port, saddr = nil, sport = 0) + sock = nil + + loop do + sock = try_get_server_socket( addr, port, saddr, sport ) + break if sock + sleep 0.1 + end + + sock + end + + def try_get_server_socket(addr, port, saddr = nil, sport = 0) + if saddr + TCPSocket.new( addr, port, saddr, sport ) rescue nil + else + TCPSocket.new( addr, port ) rescue nil end end + + def close @sock.close end @@ -137,3 +156,4 @@ module FlexNBD end # class FakeSource end # module FlexNBD +