Whitespace
This commit is contained in:
103
src/client.c
103
src/client.c
@@ -61,13 +61,13 @@ void client_destroy( struct client *client )
|
||||
* So waiting on client->socket is len bytes of data, and we must write it all
|
||||
* to client->mapped. However while doing do we must consult the bitmap
|
||||
* client->block_allocation_map, which is a bitmap where one bit represents
|
||||
* block_allocation_resolution bytes. Where a bit isn't set, there are no
|
||||
* block_allocation_resolution bytes. Where a bit isn't set, there are no
|
||||
* disc blocks allocated for that portion of the file, and we'd like to keep
|
||||
* it that way.
|
||||
* it that way.
|
||||
*
|
||||
* If the bitmap shows that every block in our prospective write is already
|
||||
* allocated, we can proceed as normal and make one call to writeloop.
|
||||
*
|
||||
* allocated, we can proceed as normal and make one call to writeloop.
|
||||
*
|
||||
*/
|
||||
void write_not_zeroes(struct client* client, uint64_t from, int len)
|
||||
{
|
||||
@@ -83,30 +83,30 @@ void write_not_zeroes(struct client* client, uint64_t from, int len)
|
||||
* how many blocks our write covers, then cut off the start
|
||||
* and end to get the exact number of bytes.
|
||||
*/
|
||||
|
||||
|
||||
int run = bitset_run_count(map, from, len);
|
||||
|
||||
|
||||
debug("write_not_zeroes: from=%ld, len=%d, run=%d", from, len, run);
|
||||
|
||||
|
||||
if (run > len) {
|
||||
run = len;
|
||||
debug("(run adjusted to %d)", run);
|
||||
}
|
||||
|
||||
|
||||
if (0) /* useful but expensive */
|
||||
{
|
||||
uint64_t i;
|
||||
fprintf(stderr, "full map resolution=%d: ", map->resolution);
|
||||
for (i=0; i<client->serve->size; i+=map->resolution) {
|
||||
int here = (from >= i && from < i+map->resolution);
|
||||
|
||||
|
||||
if (here) { fprintf(stderr, ">"); }
|
||||
fprintf(stderr, bitset_is_set_at(map, i) ? "1" : "0");
|
||||
if (here) { fprintf(stderr, "<"); }
|
||||
}
|
||||
fprintf(stderr, "\n");
|
||||
}
|
||||
|
||||
|
||||
#define DO_READ(dst, len) ERROR_IF_NEGATIVE( \
|
||||
readloop( \
|
||||
client->socket, \
|
||||
@@ -115,7 +115,7 @@ void write_not_zeroes(struct client* client, uint64_t from, int len)
|
||||
), \
|
||||
"read failed %ld+%d", from, (len) \
|
||||
)
|
||||
|
||||
|
||||
if (bitset_is_set_at(map, from)) {
|
||||
debug("writing the lot: from=%ld, run=%d", from, run);
|
||||
/* already allocated, just write it all */
|
||||
@@ -128,19 +128,19 @@ void write_not_zeroes(struct client* client, uint64_t from, int len)
|
||||
char zerobuffer[block_allocation_resolution];
|
||||
/* not allocated, read in block_allocation_resoution */
|
||||
while (run > 0) {
|
||||
int blockrun = block_allocation_resolution -
|
||||
int blockrun = block_allocation_resolution -
|
||||
(from % block_allocation_resolution);
|
||||
if (blockrun > run)
|
||||
blockrun = run;
|
||||
|
||||
|
||||
DO_READ(zerobuffer, blockrun);
|
||||
|
||||
/* This reads the buffer twice in the worst case
|
||||
|
||||
/* This reads the buffer twice in the worst case
|
||||
* but we're leaning on memcmp failing early
|
||||
* and memcpy being fast, rather than try to
|
||||
* hand-optimized something specific.
|
||||
*/
|
||||
if (zerobuffer[0] != 0 ||
|
||||
if (zerobuffer[0] != 0 ||
|
||||
memcmp(zerobuffer, zerobuffer + 1, blockrun - 1)) {
|
||||
memcpy(client->mapped+from, zerobuffer, blockrun);
|
||||
bitset_set_range(map, from, blockrun);
|
||||
@@ -148,7 +148,7 @@ void write_not_zeroes(struct client* client, uint64_t from, int len)
|
||||
/* at this point we could choose to
|
||||
* short-cut the rest of the write for
|
||||
* faster I/O but by continuing to do it
|
||||
* the slow way we preserve as much
|
||||
* the slow way we preserve as much
|
||||
* sparseness as possible.
|
||||
*/
|
||||
}
|
||||
@@ -185,18 +185,18 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
|
||||
* otherwise
|
||||
*/
|
||||
ptv = server_is_in_control( client->serve ) ? NULL : &tv;
|
||||
|
||||
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(client->socket, &fds);
|
||||
self_pipe_fd_set( client->stop_signal, &fds );
|
||||
fd_count = select(FD_SETSIZE, &fds, NULL, NULL, ptv);
|
||||
if ( fd_count == 0 ) {
|
||||
/* This "can't ever happen" */
|
||||
if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); }
|
||||
if ( NULL == ptv ) { fatal( "No FDs selected, and no timeout!" ); }
|
||||
else { error("Timed out waiting for I/O"); }
|
||||
}
|
||||
else if ( fd_count < 0 ) { fatal( "Select failed" ); }
|
||||
|
||||
|
||||
if ( self_pipe_fd_isset( client->stop_signal, &fds ) ){
|
||||
debug("Client received stop signal.");
|
||||
return 0;
|
||||
@@ -220,9 +220,9 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
|
||||
* again. It should *probably* be an
|
||||
* error() call, but I want to be sure.
|
||||
* */
|
||||
fatal("Error reading request: %d, %s",
|
||||
errno,
|
||||
strerror( errno ));
|
||||
fatal("Error reading request: %d, %s",
|
||||
errno,
|
||||
strerror( errno ));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,7 +299,7 @@ void client_write_init( struct client * client, uint64_t size )
|
||||
void client_flush( struct client * client, size_t len )
|
||||
{
|
||||
int devnull = open("/dev/null", O_WRONLY);
|
||||
FATAL_IF_NEGATIVE( devnull,
|
||||
FATAL_IF_NEGATIVE( devnull,
|
||||
"Couldn't open /dev/null: %s", strerror(errno));
|
||||
int pipes[2];
|
||||
pipe( pipes );
|
||||
@@ -308,9 +308,9 @@ void client_flush( struct client * client, size_t len )
|
||||
size_t spliced = 0;
|
||||
|
||||
while ( spliced < len ) {
|
||||
ssize_t received = splice(
|
||||
ssize_t received = splice(
|
||||
client->socket, NULL,
|
||||
pipes[1], NULL,
|
||||
pipes[1], NULL,
|
||||
len-spliced, flags );
|
||||
FATAL_IF_NEGATIVE( received,
|
||||
"splice error: %s",
|
||||
@@ -318,9 +318,9 @@ void client_flush( struct client * client, size_t len )
|
||||
ssize_t junked = 0;
|
||||
while( junked < received ) {
|
||||
ssize_t junk;
|
||||
junk = splice(
|
||||
junk = splice(
|
||||
pipes[0], NULL,
|
||||
devnull, NULL,
|
||||
devnull, NULL,
|
||||
received, flags );
|
||||
FATAL_IF_NEGATIVE( junk,
|
||||
"splice error: %s",
|
||||
@@ -341,15 +341,15 @@ void client_flush( struct client * client, size_t len )
|
||||
* request_err is set to 0 if the client sent a bad request, in which
|
||||
* case we drop the connection.
|
||||
*/
|
||||
int client_request_needs_reply( struct client * client,
|
||||
int client_request_needs_reply( struct client * client,
|
||||
struct nbd_request request )
|
||||
{
|
||||
debug("request type %d", request.type);
|
||||
|
||||
|
||||
if (request.magic != REQUEST_MAGIC) {
|
||||
fatal("Bad magic %08x", request.magic);
|
||||
}
|
||||
|
||||
|
||||
switch (request.type)
|
||||
{
|
||||
case REQUEST_READ:
|
||||
@@ -358,7 +358,7 @@ int client_request_needs_reply( struct client * client,
|
||||
/* check it's not out of range */
|
||||
if ( request.from+request.len > client->serve->size) {
|
||||
warn("write request %d+%d out of range",
|
||||
request.from,
|
||||
request.from,
|
||||
request.len
|
||||
);
|
||||
client_write_reply( client, &request, 1 );
|
||||
@@ -367,12 +367,12 @@ int client_request_needs_reply( struct client * client,
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
|
||||
case REQUEST_DISCONNECT:
|
||||
debug("request disconnect");
|
||||
client->disconnect = 1;
|
||||
return 0;
|
||||
|
||||
|
||||
default:
|
||||
fatal("Unknown request %08x", request.type);
|
||||
}
|
||||
@@ -394,9 +394,9 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
|
||||
*/
|
||||
ERROR_IF_NEGATIVE(
|
||||
sendfileloop(
|
||||
client->socket,
|
||||
client->fileno,
|
||||
&offset,
|
||||
client->socket,
|
||||
client->fileno,
|
||||
&offset,
|
||||
request.len),
|
||||
"sendfile failed from=%ld, len=%d",
|
||||
offset,
|
||||
@@ -420,7 +420,7 @@ void client_reply_to_write( struct client* client, struct nbd_request request )
|
||||
request.len),
|
||||
"reading write data failed from=%ld, len=%d",
|
||||
request.from,
|
||||
request.len
|
||||
request.len
|
||||
);
|
||||
server_dirty(client->serve, request.from, request.len);
|
||||
}
|
||||
@@ -432,8 +432,8 @@ void client_reply_to_write( struct client* client, struct nbd_request request )
|
||||
uint64_t len_rounded = request.len + (request.from - from_rounded);
|
||||
|
||||
FATAL_IF_NEGATIVE(
|
||||
msync( client->mapped + from_rounded,
|
||||
len_rounded,
|
||||
msync( client->mapped + from_rounded,
|
||||
len_rounded,
|
||||
MS_SYNC),
|
||||
"msync failed %ld %ld", request.from, request.len
|
||||
);
|
||||
@@ -466,7 +466,7 @@ int client_serve_request(struct client* client)
|
||||
if ( disconnected ) { return stop; }
|
||||
if ( !client_request_needs_reply( client, request ) ) {
|
||||
return client->disconnect;
|
||||
}
|
||||
}
|
||||
|
||||
server_lock_io( client->serve );
|
||||
{
|
||||
@@ -486,12 +486,12 @@ void client_send_hello(struct client* client)
|
||||
client_write_init( client, client->serve->size );
|
||||
}
|
||||
|
||||
void client_cleanup(struct client* client,
|
||||
void client_cleanup(struct client* client,
|
||||
int fatal __attribute__ ((unused)) )
|
||||
{
|
||||
info("client cleanup for client %p", client);
|
||||
|
||||
if (client->socket) {
|
||||
|
||||
if (client->socket) {
|
||||
FATAL_IF_NEGATIVE( close(client->socket),
|
||||
"Error closing client socket %d",
|
||||
client->socket );
|
||||
@@ -501,7 +501,7 @@ void client_cleanup(struct client* client,
|
||||
if (client->mapped) {
|
||||
munmap(client->mapped, client->serve->size);
|
||||
}
|
||||
if (client->fileno) {
|
||||
if (client->fileno) {
|
||||
FATAL_IF_NEGATIVE( close(client->fileno),
|
||||
"Error closing file %d",
|
||||
client->fileno );
|
||||
@@ -517,15 +517,15 @@ void client_cleanup(struct client* client,
|
||||
void* client_serve(void* client_uncast)
|
||||
{
|
||||
struct client* client = (struct client*) client_uncast;
|
||||
|
||||
|
||||
error_set_handler((cleanup_handler*) client_cleanup, client);
|
||||
|
||||
|
||||
info("client: mmaping file");
|
||||
FATAL_IF_NEGATIVE(
|
||||
open_and_mmap(
|
||||
client->serve->filename,
|
||||
&client->fileno,
|
||||
NULL,
|
||||
NULL,
|
||||
(void**) &client->mapped
|
||||
),
|
||||
"Couldn't open/mmap file %s: %s", client->serve->filename, strerror( errno )
|
||||
@@ -533,13 +533,13 @@ void* client_serve(void* client_uncast)
|
||||
debug( "Opened client file fd %d", client->fileno);
|
||||
debug("client: sending hello");
|
||||
client_send_hello(client);
|
||||
|
||||
|
||||
debug("client: serving requests");
|
||||
while (client_serve_request(client) == 0)
|
||||
;
|
||||
debug("client: stopped serving requests");
|
||||
client->stopped = 1;
|
||||
|
||||
|
||||
if ( client->disconnect ){
|
||||
debug("client: control arrived" );
|
||||
server_control_arrived( client->serve );
|
||||
@@ -548,6 +548,7 @@ void* client_serve(void* client_uncast)
|
||||
debug("Cleaning client %p up normally in thread %p", client, pthread_self());
|
||||
client_cleanup(client, 0);
|
||||
debug("Client thread done" );
|
||||
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
49
src/ioutil.c
49
src/ioutil.c
@@ -34,10 +34,10 @@ struct bitset_mapping* build_allocation_map(int fd, uint64_t size, int resolutio
|
||||
/* Resize fiemap to allow us to read in the extents */
|
||||
fiemap = (struct fiemap*)xmalloc(
|
||||
sizeof(struct fiemap) + (
|
||||
sizeof(struct fiemap_extent) *
|
||||
sizeof(struct fiemap_extent) *
|
||||
fiemap_count->fm_mapped_extents
|
||||
)
|
||||
);
|
||||
);
|
||||
|
||||
/* realloc makes valgrind complain a lot */
|
||||
memcpy(fiemap, fiemap_count, sizeof(struct fiemap));
|
||||
@@ -46,15 +46,15 @@ struct bitset_mapping* build_allocation_map(int fd, uint64_t size, int resolutio
|
||||
fiemap->fm_extent_count = fiemap->fm_mapped_extents;
|
||||
fiemap->fm_mapped_extents = 0;
|
||||
|
||||
if (ioctl(fd, FS_IOC_FIEMAP, fiemap) < 0) {
|
||||
if (ioctl(fd, FS_IOC_FIEMAP, fiemap) < 0) {
|
||||
debug( "Couldn't get fiemap, returning no allocation_map" );
|
||||
goto no_map;
|
||||
}
|
||||
|
||||
for (i=0;i<fiemap->fm_mapped_extents;i++) {
|
||||
bitset_set_range(
|
||||
allocation_map,
|
||||
fiemap->fm_extents[i].fe_logical,
|
||||
allocation_map,
|
||||
fiemap->fm_extents[i].fe_logical,
|
||||
fiemap->fm_extents[i].fe_length
|
||||
);
|
||||
}
|
||||
@@ -94,7 +94,7 @@ no_map:
|
||||
int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **out_map)
|
||||
{
|
||||
off64_t size;
|
||||
|
||||
|
||||
/* O_DIRECT seems to be intermittently supported. Leaving it as
|
||||
* a compile-time option for now. */
|
||||
#ifdef DIRECT_IO
|
||||
@@ -107,7 +107,7 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o
|
||||
warn("open(%s) failed: does it exist?", filename);
|
||||
return *out_fd;
|
||||
}
|
||||
|
||||
|
||||
size = lseek64(*out_fd, 0, SEEK_END);
|
||||
if (size < 0) {
|
||||
warn("lseek64() failed");
|
||||
@@ -116,9 +116,9 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o
|
||||
if (out_size) {
|
||||
*out_size = size;
|
||||
}
|
||||
|
||||
|
||||
if (out_map) {
|
||||
*out_map = mmap64(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED,
|
||||
*out_map = mmap64(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED,
|
||||
*out_fd, 0);
|
||||
if (((long) *out_map) == -1) {
|
||||
warn("mmap64() failed");
|
||||
@@ -126,7 +126,7 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o
|
||||
}
|
||||
}
|
||||
debug("opened %s size %ld on fd %d @ %p", filename, size, *out_fd, *out_map);
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -175,16 +175,16 @@ ssize_t spliceloop(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_
|
||||
{
|
||||
const unsigned int flags = SPLICE_F_MORE|SPLICE_F_MOVE|flags2;
|
||||
size_t spliced=0;
|
||||
|
||||
|
||||
//debug("spliceloop(%d, %ld, %d, %ld, %ld)", fd_in, off_in ? *off_in : 0, fd_out, off_out ? *off_out : 0, len);
|
||||
|
||||
|
||||
while (spliced < len) {
|
||||
ssize_t result = splice(fd_in, off_in, fd_out, off_out, len, flags);
|
||||
if (result < 0) {
|
||||
//debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len);
|
||||
if (errno == EAGAIN && (flags & SPLICE_F_NONBLOCK) ) {
|
||||
return spliced;
|
||||
}
|
||||
}
|
||||
else {
|
||||
return -1;
|
||||
}
|
||||
@@ -193,7 +193,7 @@ ssize_t spliceloop(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_
|
||||
//debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return spliced;
|
||||
}
|
||||
|
||||
@@ -202,25 +202,25 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
|
||||
|
||||
int pipefd[2]; /* read end, write end */
|
||||
size_t spliced=0;
|
||||
|
||||
|
||||
if (pipe(pipefd) == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
while (spliced < len) {
|
||||
ssize_t run = len-spliced;
|
||||
ssize_t s2, s1 = spliceloop(fd_in, NULL, pipefd[1], NULL, run, SPLICE_F_NONBLOCK);
|
||||
/*if (run > 65535)
|
||||
run = 65535;*/
|
||||
if (s1 < 0) { break; }
|
||||
|
||||
|
||||
s2 = spliceloop(pipefd[0], NULL, fd_out, NULL, s1, 0);
|
||||
if (s2 < 0) { break; }
|
||||
spliced += s2;
|
||||
}
|
||||
close(pipefd[0]);
|
||||
close(pipefd[1]);
|
||||
|
||||
|
||||
return spliced < len ? -1 : 0;
|
||||
}
|
||||
|
||||
@@ -234,16 +234,16 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
|
||||
int read_until_newline(int fd, char* buf, int bufsize)
|
||||
{
|
||||
int cur;
|
||||
|
||||
|
||||
for (cur=0; cur < bufsize; cur++) {
|
||||
int result = read(fd, buf+cur, 1);
|
||||
if (result <= 0) { return -1; }
|
||||
if (buf[cur] == 10) {
|
||||
if (buf[cur] == 10) {
|
||||
buf[cur] = '\0';
|
||||
break;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return cur+1;
|
||||
}
|
||||
|
||||
@@ -252,9 +252,9 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines)
|
||||
int lines_count = 0;
|
||||
char line[max_line_length+1];
|
||||
*lines = NULL;
|
||||
|
||||
|
||||
memset(line, 0, max_line_length+1);
|
||||
|
||||
|
||||
while (1) {
|
||||
int readden = read_until_newline(fd, line, max_line_length);
|
||||
/* readden will be:
|
||||
@@ -280,3 +280,4 @@ int fd_is_closed( int fd_in )
|
||||
errno = errno_old;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@@ -4,7 +4,7 @@
|
||||
#include "serve.h"
|
||||
struct bitset_mapping; /* don't need whole of bitset.h here */
|
||||
|
||||
/** Returns a bit field representing which blocks are allocated in file
|
||||
/** Returns a bit field representing which blocks are allocated in file
|
||||
* descriptor ''fd''. You must supply the size, and the resolution at which
|
||||
* you want the bits to represent allocated blocks. If the OS represents
|
||||
* allocated blocks at a finer resolution than you've asked for, any block
|
||||
@@ -43,15 +43,15 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len);
|
||||
int read_until_newline(int fd, char* buf, int bufsize);
|
||||
|
||||
/** Read a number of lines using read_until_newline, until an empty line is
|
||||
* received (i.e. the sequence LF LF). The data is read from ''fd'' and
|
||||
* received (i.e. the sequence LF LF). The data is read from ''fd'' and
|
||||
* lines must be a maximum of ''max_line_length''. The set of lines is
|
||||
* returned as an array of zero-terminated strings; you must pass an address
|
||||
* ''lines'' in which you want the address of this array returned.
|
||||
*/
|
||||
int read_lines_until_blankline(int fd, int max_line_length, char ***lines);
|
||||
|
||||
/** Open the given ''filename'', determine its size, and mmap it in its
|
||||
* entirety. The file descriptor is stored in ''out_fd'', the size in
|
||||
/** Open the given ''filename'', determine its size, and mmap it in its
|
||||
* entirety. The file descriptor is stored in ''out_fd'', the size in
|
||||
* ''out_size'' and the address of the mmap in ''out_map''. If anything goes
|
||||
* wrong, returns -1 setting errno, otherwise 0.
|
||||
*/
|
||||
|
69
src/mirror.c
69
src/mirror.c
@@ -79,15 +79,15 @@ void mirror_init( struct mirror * mirror, const char * filename )
|
||||
|
||||
FATAL_IF_NEGATIVE(
|
||||
open_and_mmap(
|
||||
filename,
|
||||
filename,
|
||||
&map_fd,
|
||||
&size,
|
||||
&size,
|
||||
(void**) &mirror->mapped
|
||||
),
|
||||
"Failed to open and mmap %s",
|
||||
filename
|
||||
);
|
||||
|
||||
|
||||
mirror->dirty_map = bitset_alloc(size, 4096);
|
||||
|
||||
}
|
||||
@@ -119,7 +119,7 @@ struct mirror * mirror_create(
|
||||
max_Bps,
|
||||
action_at_finish,
|
||||
commit_signal);
|
||||
|
||||
|
||||
mirror_init( mirror, filename );
|
||||
mirror_reset( mirror );
|
||||
|
||||
@@ -146,7 +146,7 @@ static const int mirror_longest_write = 8<<20;
|
||||
*/
|
||||
static const unsigned int mirror_last_pass_after_bytes_written = 100<<20;
|
||||
|
||||
/** The largest number of full passes we'll do - the last one will always
|
||||
/** The largest number of full passes we'll do - the last one will always
|
||||
* cause the I/O to freeze, however many bytes are left to copy.
|
||||
*/
|
||||
static const int mirror_maximum_passes = 7;
|
||||
@@ -166,15 +166,15 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written)
|
||||
|
||||
debug("mirror current=%ld, run=%d", current, run);
|
||||
|
||||
/* FIXME: we could avoid sending sparse areas of the
|
||||
/* FIXME: we could avoid sending sparse areas of the
|
||||
* disc here, and probably save a lot of bandwidth and
|
||||
* time (if we know the destination starts off zeroed).
|
||||
*/
|
||||
*/
|
||||
if (bitset_is_set_at(map, current)) {
|
||||
/* We've found a dirty area, send it */
|
||||
debug("^^^ writing");
|
||||
|
||||
/* We need to stop the main thread from working
|
||||
/* We need to stop the main thread from working
|
||||
* because it might corrupt the dirty map. This
|
||||
* is likely to slow things down but will be
|
||||
* safe.
|
||||
@@ -185,7 +185,7 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written)
|
||||
/** FIXME: do something useful with bytes/second */
|
||||
|
||||
/** FIXME: error handling code here won't unlock */
|
||||
socket_nbd_write( serve->mirror->client,
|
||||
socket_nbd_write( serve->mirror->client,
|
||||
current,
|
||||
run,
|
||||
0,
|
||||
@@ -292,7 +292,7 @@ int mirror_connect( struct mirror * mirror, off64_t local_size )
|
||||
mirror_set_state( mirror, MS_GO );
|
||||
}
|
||||
else {
|
||||
warn("Remote size (%d) doesn't match local (%d)",
|
||||
warn("Remote size (%d) doesn't match local (%d)",
|
||||
remote_size, local_size );
|
||||
mirror_set_state( mirror, MS_FAIL_SIZE_MISMATCH );
|
||||
}
|
||||
@@ -341,11 +341,11 @@ void mirror_run( struct server *serve )
|
||||
for (pass=0; pass < mirror_maximum_passes-1; pass++) {
|
||||
|
||||
debug("mirror start pass=%d", pass);
|
||||
if ( !mirror_pass( serve, 1, &written ) ){
|
||||
if ( !mirror_pass( serve, 1, &written ) ){
|
||||
debug("Failed mirror pass state is %d", mirror_get_state( serve->mirror ) );
|
||||
debug("pass failed, giving up");
|
||||
return; }
|
||||
|
||||
|
||||
/* if we've not written anything */
|
||||
if (written < mirror_last_pass_after_bytes_written) { break; }
|
||||
}
|
||||
@@ -358,7 +358,7 @@ void mirror_run( struct server *serve )
|
||||
mirror_on_exit( serve );
|
||||
info("Server closed, quitting "
|
||||
"after successful migration");
|
||||
}
|
||||
}
|
||||
}
|
||||
server_unlock_io( serve );
|
||||
}
|
||||
@@ -379,11 +379,11 @@ void mirror_signal_commit( struct mirror * mirror )
|
||||
{
|
||||
NULLCHECK( mirror );
|
||||
|
||||
mbox_post_mirror_state( mirror->commit_signal,
|
||||
mbox_post_mirror_state( mirror->commit_signal,
|
||||
mirror_get_state( mirror ) );
|
||||
}
|
||||
|
||||
/** Thread launched to drive mirror process
|
||||
/** Thread launched to drive mirror process
|
||||
* This is needed for two reasons: firstly, it decouples the mirroring
|
||||
* from the control thread (although that's less valid with mboxes
|
||||
* passing state back and forth) and to provide an error context so that
|
||||
@@ -408,7 +408,7 @@ void* mirror_runner(void* serve_params_uncast)
|
||||
error_set_handler( (cleanup_handler *) mirror_cleanup, serve );
|
||||
|
||||
info( "Connecting to mirror" );
|
||||
|
||||
|
||||
time_t start_time = time(NULL);
|
||||
int connected = mirror_connect( mirror, serve->size );
|
||||
mirror_signal_commit( mirror );
|
||||
@@ -418,8 +418,8 @@ void* mirror_runner(void* serve_params_uncast)
|
||||
* and retry everything from mirror_set_state(_, MS_INIT), but
|
||||
* *without* signaling the commit or abandoning the mirror.
|
||||
* */
|
||||
|
||||
if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){
|
||||
|
||||
if ( (time(NULL) - start_time) > MS_CONNECT_TIME_SECS ){
|
||||
/* If we get here, then we managed to connect but the
|
||||
* control thread feeding status back to the user will
|
||||
* have gone away, leaving the user without meaningful
|
||||
@@ -449,11 +449,11 @@ struct mirror_super * mirror_super_create(
|
||||
struct mbox * state_mbox)
|
||||
{
|
||||
struct mirror_super * super = xmalloc( sizeof( struct mirror_super) );
|
||||
super->mirror = mirror_create(
|
||||
filename,
|
||||
connect_to,
|
||||
connect_from,
|
||||
max_Bps,
|
||||
super->mirror = mirror_create(
|
||||
filename,
|
||||
connect_to,
|
||||
connect_from,
|
||||
max_Bps,
|
||||
action_at_finish,
|
||||
mbox_create() ) ;
|
||||
super->state_mbox = state_mbox;
|
||||
@@ -462,15 +462,15 @@ struct mirror_super * mirror_super_create(
|
||||
|
||||
|
||||
/* Post the current state of the mirror into super->state_mbox.*/
|
||||
void mirror_super_signal_committed(
|
||||
void mirror_super_signal_committed(
|
||||
struct mirror_super * super ,
|
||||
enum mirror_state commit_state )
|
||||
{
|
||||
NULLCHECK( super );
|
||||
NULLCHECK( super->state_mbox );
|
||||
|
||||
mbox_post_mirror_state(
|
||||
super->state_mbox,
|
||||
mbox_post_mirror_state(
|
||||
super->state_mbox,
|
||||
commit_state );
|
||||
}
|
||||
|
||||
@@ -506,14 +506,14 @@ void * mirror_super_runner( void * serve_uncast )
|
||||
|
||||
do {
|
||||
FATAL_IF( 0 != pthread_create(
|
||||
&mirror->thread,
|
||||
NULL,
|
||||
mirror_runner,
|
||||
&mirror->thread,
|
||||
NULL,
|
||||
mirror_runner,
|
||||
serve),
|
||||
"Failed to create mirror thread");
|
||||
|
||||
debug("Supervisor waiting for commit signal");
|
||||
enum mirror_state * commit_state =
|
||||
enum mirror_state * commit_state =
|
||||
mbox_receive( mirror->commit_signal );
|
||||
|
||||
debug( "Supervisor got commit signal" );
|
||||
@@ -526,7 +526,7 @@ void * mirror_super_runner( void * serve_uncast )
|
||||
should_retry = *commit_state == MS_GO;
|
||||
/* Only send this signal the first time */
|
||||
mirror_super_signal_committed(
|
||||
super,
|
||||
super,
|
||||
*commit_state);
|
||||
debug("Mirror supervisor committed");
|
||||
}
|
||||
@@ -540,7 +540,7 @@ void * mirror_super_runner( void * serve_uncast )
|
||||
|
||||
success = MS_DONE == mirror_get_state( mirror );
|
||||
|
||||
if( success ){
|
||||
if( success ){
|
||||
info( "Mirror supervisor success, exiting" ); }
|
||||
else if ( mirror->signal_abandon ) {
|
||||
info( "Mirror abandoned" );
|
||||
@@ -553,7 +553,7 @@ void * mirror_super_runner( void * serve_uncast )
|
||||
|
||||
first_pass = 0;
|
||||
|
||||
if ( should_retry ) {
|
||||
if ( should_retry ) {
|
||||
/* We don't want to hammer the destination too
|
||||
* hard, so if this is a retry, insert a delay. */
|
||||
sleep( MS_RETRY_DELAY_SECS );
|
||||
@@ -563,7 +563,7 @@ void * mirror_super_runner( void * serve_uncast )
|
||||
mirror_reset( mirror );
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
while ( should_retry && !success );
|
||||
|
||||
serve->mirror = NULL;
|
||||
@@ -575,4 +575,3 @@ void * mirror_super_runner( void * serve_uncast )
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
102
src/serve.c
102
src/serve.c
@@ -26,7 +26,7 @@ static inline void* sockaddr_address_data(struct sockaddr* sockaddr)
|
||||
|
||||
struct sockaddr_in* in = (struct sockaddr_in*) sockaddr;
|
||||
struct sockaddr_in6* in6 = (struct sockaddr_in6*) sockaddr;
|
||||
|
||||
|
||||
if (sockaddr->sa_family == AF_INET) {
|
||||
return &in->sin_addr;
|
||||
}
|
||||
@@ -64,7 +64,7 @@ struct server * server_create (
|
||||
FATAL_IF_ZERO(
|
||||
parse_ip_to_sockaddr(&out->bind_to.generic, s_ip_address),
|
||||
"Couldn't parse server address '%s' (use 0 if "
|
||||
"you want to bind to all IPs)",
|
||||
"you want to bind to all IPs)",
|
||||
s_ip_address
|
||||
);
|
||||
|
||||
@@ -103,7 +103,7 @@ void server_destroy( struct server * serve )
|
||||
flexthread_mutex_destroy( serve->l_acl );
|
||||
flexthread_mutex_destroy( serve->l_io );
|
||||
|
||||
if ( serve->acl ) {
|
||||
if ( serve->acl ) {
|
||||
acl_destroy( serve->acl );
|
||||
serve->acl = NULL;
|
||||
}
|
||||
@@ -220,20 +220,20 @@ void serve_bind( struct server * serve )
|
||||
|
||||
do {
|
||||
bind_result = bind(
|
||||
serve->server_fd,
|
||||
serve->server_fd,
|
||||
&serve->bind_to.generic,
|
||||
sizeof(serve->bind_to));
|
||||
|
||||
if ( 0 == bind_result ) {
|
||||
info( "Bound to %s port %d",
|
||||
s_address,
|
||||
info( "Bound to %s port %d",
|
||||
s_address,
|
||||
ntohs(serve->bind_to.v4.sin_port));
|
||||
break;
|
||||
}
|
||||
else {
|
||||
|
||||
warn( "Couldn't bind to %s port %d: %s",
|
||||
s_address,
|
||||
warn( "Couldn't bind to %s port %d: %s",
|
||||
s_address,
|
||||
ntohs(serve->bind_to.v4.sin_port),
|
||||
strerror( errno ) );
|
||||
|
||||
@@ -242,10 +242,10 @@ void serve_bind( struct server * serve )
|
||||
* EADDRINUSE, EADDRNOTAVAIL, EBADF,
|
||||
* EINVAL or ENOTSOCK.
|
||||
*
|
||||
* Any of these other than EACCES,
|
||||
* Any of these other than EACCES,
|
||||
* EADDRINUSE or EADDRNOTAVAIL signify
|
||||
* that there's a logic error somewhere.
|
||||
*
|
||||
*
|
||||
* EADDRINUSE is fatal: if there's
|
||||
* something already where we want to be
|
||||
* listening, we have no guarantees that
|
||||
@@ -258,7 +258,7 @@ void serve_bind( struct server * serve )
|
||||
continue;
|
||||
case EADDRINUSE:
|
||||
fatal( "%s port %d in use, giving up.",
|
||||
s_address,
|
||||
s_address,
|
||||
ntohs(serve->bind_to.v4.sin_port));
|
||||
default:
|
||||
fatal( "Giving up" );
|
||||
@@ -275,11 +275,11 @@ void serve_open_server_socket(struct server* params)
|
||||
NULLCHECK( params );
|
||||
|
||||
int optval=1;
|
||||
|
||||
params->server_fd= socket(params->bind_to.generic.sa_family == AF_INET ?
|
||||
|
||||
params->server_fd= socket(params->bind_to.generic.sa_family == AF_INET ?
|
||||
PF_INET : PF_INET6, SOCK_STREAM, 0);
|
||||
|
||||
FATAL_IF_NEGATIVE(params->server_fd,
|
||||
|
||||
FATAL_IF_NEGATIVE(params->server_fd,
|
||||
"Couldn't create server socket");
|
||||
|
||||
/* We need SO_REUSEADDR so that when we switch from listening to
|
||||
@@ -308,7 +308,7 @@ void serve_open_server_socket(struct server* params)
|
||||
* squatting on our ip/port combo, or the ip isn't yet
|
||||
* configured. Ideally we want to retry this. */
|
||||
serve_bind(params);
|
||||
|
||||
|
||||
FATAL_IF_NEGATIVE(
|
||||
listen(params->server_fd, params->tcp_backlog),
|
||||
"Couldn't listen on server socket"
|
||||
@@ -332,9 +332,9 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
|
||||
|
||||
memset(s_client_address, 0, 64);
|
||||
strcpy(s_client_address, "???");
|
||||
inet_ntop( entry->address.generic.sa_family,
|
||||
sockaddr_address_data(&entry->address.generic),
|
||||
s_client_address,
|
||||
inet_ntop( entry->address.generic.sa_family,
|
||||
sockaddr_address_data(&entry->address.generic),
|
||||
s_client_address,
|
||||
64 );
|
||||
|
||||
debug( "%s(%p,...)", joinfunc == pthread_join ? "joining" : "tryjoining", entry->thread );
|
||||
@@ -343,15 +343,15 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
|
||||
* already dead, but the client still needs tidying up. */
|
||||
if (join_errno != 0 && !entry->client->stopped ) {
|
||||
debug( "join_errno was %s, stopped was %d", strerror( join_errno ), entry->client->stopped );
|
||||
FATAL_UNLESS( join_errno == EBUSY,
|
||||
"Problem with joining thread %p: %s",
|
||||
FATAL_UNLESS( join_errno == EBUSY,
|
||||
"Problem with joining thread %p: %s",
|
||||
entry->thread,
|
||||
strerror(join_errno) );
|
||||
}
|
||||
else if ( join_errno == 0 ) {
|
||||
debug("nbd thread %016x exited (%s) with status %ld",
|
||||
entry->thread,
|
||||
s_client_address,
|
||||
debug("nbd thread %016x exited (%s) with status %ld",
|
||||
entry->thread,
|
||||
s_client_address,
|
||||
(uint64_t)status);
|
||||
client_destroy( entry->client );
|
||||
entry->client = NULL;
|
||||
@@ -447,8 +447,8 @@ int server_acl_accepts( struct server *params, union mysockaddr * client_address
|
||||
}
|
||||
|
||||
|
||||
int server_should_accept_client(
|
||||
struct server * params,
|
||||
int server_should_accept_client(
|
||||
struct server * params,
|
||||
union mysockaddr * client_address,
|
||||
char *s_client_address,
|
||||
size_t s_client_address_len )
|
||||
@@ -466,7 +466,7 @@ int server_should_accept_client(
|
||||
|
||||
if ( !server_acl_accepts( params, client_address ) ) {
|
||||
warn( "Rejecting client %s: Access control error", s_client_address );
|
||||
debug( "We %s have an acl, and default_deny is %s",
|
||||
debug( "We %s have an acl, and default_deny is %s",
|
||||
(params->acl ? "do" : "do not"),
|
||||
(params->acl->default_deny ? "true" : "false") );
|
||||
return 0;
|
||||
@@ -477,8 +477,8 @@ int server_should_accept_client(
|
||||
|
||||
|
||||
|
||||
int spawn_client_thread(
|
||||
struct client * client_params,
|
||||
int spawn_client_thread(
|
||||
struct client * client_params,
|
||||
pthread_t *out_thread)
|
||||
{
|
||||
int result = pthread_create(out_thread, NULL, client_serve, client_params);
|
||||
@@ -492,8 +492,8 @@ int spawn_client_thread(
|
||||
* address doesn't match, or if there are too many clients already connected.
|
||||
*/
|
||||
void accept_nbd_client(
|
||||
struct server* params,
|
||||
int client_fd,
|
||||
struct server* params,
|
||||
int client_fd,
|
||||
union mysockaddr* client_address)
|
||||
{
|
||||
NULLCHECK(params);
|
||||
@@ -511,7 +511,7 @@ void accept_nbd_client(
|
||||
return;
|
||||
}
|
||||
|
||||
slot = cleanup_and_find_client_slot(params);
|
||||
slot = cleanup_and_find_client_slot(params);
|
||||
if (slot < 0) {
|
||||
warn("too many clients to accept connection");
|
||||
FATAL_IF_NEGATIVE( close( client_fd ),
|
||||
@@ -519,14 +519,14 @@ void accept_nbd_client(
|
||||
debug("Closed client socket fd %d", client_fd);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
info( "Client %s accepted on fd %d.", s_client_address, client_fd );
|
||||
client_params = client_create( params, client_fd );
|
||||
|
||||
params->nbd_client[slot].client = client_params;
|
||||
memcpy(¶ms->nbd_client[slot].address, client_address,
|
||||
memcpy(¶ms->nbd_client[slot].address, client_address,
|
||||
sizeof(union mysockaddr));
|
||||
|
||||
|
||||
pthread_t * thread = ¶ms->nbd_client[slot].thread;
|
||||
|
||||
if ( 0 != spawn_client_thread( client_params, thread ) ) {
|
||||
@@ -537,7 +537,7 @@ void accept_nbd_client(
|
||||
debug("Closed client socket fd %d", client_fd);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
debug("nbd thread %p started (%s)", params->nbd_client[slot].thread, s_client_address);
|
||||
}
|
||||
|
||||
@@ -576,7 +576,7 @@ int server_is_closed(struct server* serve)
|
||||
void server_close_clients( struct server *params )
|
||||
{
|
||||
NULLCHECK(params);
|
||||
|
||||
|
||||
info("closing all clients");
|
||||
|
||||
int i; /* , j; */
|
||||
@@ -587,7 +587,7 @@ void server_close_clients( struct server *params )
|
||||
|
||||
if ( entry->thread != 0 ) {
|
||||
debug( "Stop signaling client %p", entry->client );
|
||||
client_signal_stop( entry->client );
|
||||
client_signal_stop( entry->client );
|
||||
}
|
||||
}
|
||||
/* We don't join the clients here. When we enter the final
|
||||
@@ -648,7 +648,7 @@ int server_accept( struct server * params )
|
||||
self_pipe_fd_set( params->close_signal, &fds );
|
||||
self_pipe_fd_set( params->acl_updated_signal, &fds );
|
||||
|
||||
FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds,
|
||||
FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds,
|
||||
NULL, NULL, NULL), "select() failed");
|
||||
|
||||
if ( self_pipe_fd_isset( params->close_signal, &fds ) ){
|
||||
@@ -672,13 +672,13 @@ int server_accept( struct server * params )
|
||||
client_fd = accept( params->server_fd, &client_address.generic, &socklen );
|
||||
debug("Accepted nbd client socket fd %d", client_fd);
|
||||
accept_nbd_client(params, client_fd, &client_address);
|
||||
}
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
void serve_accept_loop(struct server* params)
|
||||
void serve_accept_loop(struct server* params)
|
||||
{
|
||||
NULLCHECK( params );
|
||||
while( server_accept( params ) );
|
||||
@@ -697,9 +697,9 @@ void serve_init_allocation_map(struct server* params)
|
||||
FATAL_IF_NEGATIVE(fd, "Couldn't open %s", params->filename);
|
||||
size = lseek64(fd, 0, SEEK_END);
|
||||
params->size = size;
|
||||
FATAL_IF_NEGATIVE(size, "Couldn't find size of %s",
|
||||
FATAL_IF_NEGATIVE(size, "Couldn't find size of %s",
|
||||
params->filename);
|
||||
params->allocation_map =
|
||||
params->allocation_map =
|
||||
build_allocation_map(fd, size, block_allocation_resolution);
|
||||
close(fd);
|
||||
}
|
||||
@@ -737,29 +737,29 @@ void server_control_arrived( struct server *serve )
|
||||
|
||||
|
||||
/** Closes sockets, frees memory and waits for all client threads to finish */
|
||||
void serve_cleanup(struct server* params,
|
||||
void serve_cleanup(struct server* params,
|
||||
int fatal __attribute__ ((unused)) )
|
||||
{
|
||||
NULLCHECK( params );
|
||||
|
||||
|
||||
info("cleaning up");
|
||||
|
||||
int i;
|
||||
|
||||
|
||||
if (params->server_fd){ close(params->server_fd); }
|
||||
|
||||
if (params->allocation_map) {
|
||||
free(params->allocation_map);
|
||||
}
|
||||
|
||||
|
||||
if ( server_is_mirroring( params ) ) {
|
||||
server_abandon_mirror( params );
|
||||
}
|
||||
|
||||
|
||||
for (i=0; i < params->max_nbd_clients; i++) {
|
||||
void* status;
|
||||
pthread_t thread_id = params->nbd_client[i].thread;
|
||||
|
||||
|
||||
if (thread_id != 0) {
|
||||
debug("joining thread %p", thread_id);
|
||||
pthread_join(thread_id, &status);
|
||||
@@ -790,7 +790,7 @@ void server_abandon_mirror( struct server * serve )
|
||||
{
|
||||
NULLCHECK( serve );
|
||||
if ( serve->mirror_super ) {
|
||||
/* FIXME: AWOOGA! RACE!
|
||||
/* FIXME: AWOOGA! RACE!
|
||||
* We can set signal_abandon after mirror_super has
|
||||
* checked it, but before the reset. This would lead to
|
||||
* a hang. However, mirror_reset doesn't change the
|
||||
@@ -814,7 +814,7 @@ int do_serve(struct server* params)
|
||||
NULLCHECK( params );
|
||||
|
||||
int has_control;
|
||||
|
||||
|
||||
error_set_handler((cleanup_handler*) serve_cleanup, params);
|
||||
serve_open_server_socket(params);
|
||||
serve_init_allocation_map(params);
|
||||
|
Reference in New Issue
Block a user