From ab0dfb5ecabf93c7f25b95ee950aa404daa3d129 Mon Sep 17 00:00:00 2001 From: Matthew Bloch Date: Tue, 29 May 2012 04:03:28 +0100 Subject: [PATCH] Added mirror write barrier / final pass stuff & clean exit afterwards. Plenty of code documentation. --- bitset.h | 31 +++++++++++++ control.c | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++---- flexnbd.c | 28 ++++++++++-- ioutil.c | 2 + ioutil.h | 41 +++++++++++++++++ params.h | 27 ++++++++---- serve.c | 92 +++++++++++++++++++++++++++++++++++--- 7 files changed, 323 insertions(+), 28 deletions(-) diff --git a/bitset.h b/bitset.h index 51036a0..8fb1673 100644 --- a/bitset.h +++ b/bitset.h @@ -9,26 +9,32 @@ static inline char char_with_bit_set(int num) { return 1<<(num%8); } +/** Return 1 if the bit at ''idx'' in array ''b'' is set */ static inline int bit_is_set(char* b, int idx) { return (b[idx/8] & char_with_bit_set(idx)) != 0; } +/** Return 1 if the bit at ''idx'' in array ''b'' is clear */ static inline int bit_is_clear(char* b, int idx) { return !bit_is_set(b, idx); } +/** Tests whether the bit at ''idx'' in array ''b'' has value ''value'' */ static inline int bit_has_value(char* b, int idx, int value) { if (value) return bit_is_set(b, idx); else return bit_is_clear(b, idx); } +/** Sets the bit ''idx'' in array ''b'' */ static inline void bit_set(char* b, int idx) { b[idx/8] |= char_with_bit_set(idx); //__sync_fetch_and_or(b+(idx/8), char_with_bit_set(idx)); } +/** Clears the bit ''idx'' in array ''b'' */ static inline void bit_clear(char* b, int idx) { b[idx/8] &= ~char_with_bit_set(idx); //__sync_fetch_and_nand(b+(idx/8), char_with_bit_set(idx)); } +/** Sets ''len'' bits in array ''b'' starting at offset ''from'' */ static inline void bit_set_range(char* b, int from, int len) { for (; from%8 != 0 && len > 0; len--) bit_set(b, from++); @@ -37,6 +43,7 @@ static inline void bit_set_range(char* b, int from, int len) { for (; len > 0; len--) bit_set(b, from++); } +/** Clears ''len'' bits in array ''b'' starting at offset ''from'' */ static inline void bit_clear_range(char* b, int from, int len) { for (; from%8 != 0 && len > 0; len--) bit_clear(b, from++); @@ -46,6 +53,10 @@ static inline void bit_clear_range(char* b, int from, int len) { bit_clear(b, from++); } +/** Counts the number of contiguous bits in array ''b'', starting at ''from'' + * up to a maximum number of bits ''len''. Returns the number of contiguous + * bits that are the same as the first one specified. + */ static inline int bit_run_count(char* b, int from, int len) { int count; int first_value = bit_is_set(b, from); @@ -72,12 +83,19 @@ static inline int bit_run_count(char* b, int from, int len) { return count; } +/** An application of a bitset - a bitset mapping represents a file of ''size'' + * broken down into ''resolution''-sized chunks. The bit set is assumed to + * represent one bit per chunk. + */ struct bitset_mapping { uint64_t size; int resolution; char bits[]; }; +/** Allocate a bitset_mapping for a file of the given size, and chunks of the + * given resolution. + */ static inline struct bitset_mapping* bitset_alloc( uint64_t size, int resolution @@ -97,6 +115,9 @@ static inline struct bitset_mapping* bitset_alloc( last = (from+len-1)/set->resolution, \ bitlen = last-first+1 +/** Set the bits in a bitset which correspond to the given bytes in the larger + * file. + */ static inline void bitset_set_range( struct bitset_mapping* set, uint64_t from, @@ -106,6 +127,9 @@ static inline void bitset_set_range( bit_set_range(set->bits, first, bitlen); } +/** Clear the bits in a bitset which correspond to the given bytes in the + * larger file. + */ static inline void bitset_clear_range( struct bitset_mapping* set, uint64_t from, @@ -115,6 +139,9 @@ static inline void bitset_clear_range( bit_clear_range(set->bits, first, bitlen); } +/** Counts the number of contiguous bytes that are represented as a run in + * the bit field. + */ static inline int bitset_run_count( struct bitset_mapping* set, uint64_t from, @@ -124,6 +151,8 @@ static inline int bitset_run_count( return bit_run_count(set->bits, first, bitlen) * set->resolution; } +/** Tests whether the bit field is clear for the given file offset. + */ static inline int bitset_is_clear_at( struct bitset_mapping* set, uint64_t at @@ -132,6 +161,8 @@ static inline int bitset_is_clear_at( return bit_is_clear(set->bits, at/set->resolution); } +/** Tests whether the bit field is set for the given file offset. + */ static inline int bitset_is_set_at( struct bitset_mapping* set, uint64_t at diff --git a/control.c b/control.c index 80a30a2..79986fa 100644 --- a/control.c +++ b/control.c @@ -1,3 +1,30 @@ +/* FlexNBD server (C) Bytemark Hosting 2012 + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +/** The control server responds on a UNIX socket and services our "remote" + * commands which are used for changing the access control list, initiating + * a mirror process, or asking for status. The protocol is pretty simple - + * after connecting the client sends a series of LF-terminated lines, followed + * by a blank line (i.e. double LF). The first line is taken to be the command + * name to invoke, and the lines before the double LF are its arguments. + * + * These commands can be invoked remotely from the command line, with the + * client code to be found in remote.c + */ + #include "params.h" #include "util.h" #include "ioutil.h" @@ -10,33 +37,74 @@ #include #include -static const int longest_run = 8<<20; +/** The mirror code will split NBD writes, making them this long as a maximum */ +static const int mirror_longest_write = 8<<20; +/** If, during a mirror pass, we have sent this number of bytes or fewer, we + * go to freeze the I/O and finish it off. This is just a guess. + */ +static const int mirror_last_pass_after_bytes_written = 100<<20; + +/** The largest number of full passes we'll do - the last one will always + * cause the I/O to freeze, however many bytes are left to copy. + */ +static const int mirror_maximum_passes = 7; + +/** Thread launched to drive mirror process */ void* mirror_runner(void* serve_params_uncast) { - struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast; - - const int max_passes = 7; /* biblical */ + const int last_pass = mirror_maximum_passes-1; int pass; + struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast; struct bitset_mapping *map = serve->mirror->dirty_map; - for (pass=0; pass < max_passes; pass++) { + for (pass=0; pass < mirror_maximum_passes; pass++) { uint64_t current = 0; uint64_t written = 0; debug("mirror start pass=%d", pass); + if (pass == last_pass) { + /* last pass, stop everything else */ + SERVER_ERROR_ON_FAILURE( + pthread_mutex_lock(&serve->l_accept), + "Problem with accept lock" + ); + SERVER_ERROR_ON_FAILURE( + pthread_mutex_lock(&serve->l_io), + "Problem with I/O lock" + ); + } + while (current < serve->size) { int run; - run = bitset_run_count(map, current, longest_run); + run = bitset_run_count(map, current, mirror_longest_write); debug("mirror current=%ld, run=%d", current, run); + /* FIXME: we could avoid sending sparse areas of the + * disc here, and probably save a lot of bandwidth and + * time (if we know the destination starts off zeroed). + */ if (bitset_is_set_at(map, current)) { + /* We've found a dirty area, send it */ debug("^^^ writing"); - /* dirty area */ + /* We need to stop the main thread from working + * because it might corrupt the dirty map. This + * is likely to slow things down but will be + * safe. + */ + if (pass < last_pass) + SERVER_ERROR_ON_FAILURE( + pthread_mutex_lock(&serve->l_io), + "Problem with I/O lock" + ); + + /** FIXME: do something useful with bytes/second */ + + /** FIXME: error handling code here won't unlock */ socket_nbd_write( serve->mirror->client, current, @@ -45,21 +113,60 @@ void* mirror_runner(void* serve_params_uncast) serve->mirror->mapped + current ); + /* now mark it clean */ bitset_clear_range(map, current, run); + + if (pass < last_pass) + SERVER_ERROR_ON_FAILURE( + pthread_mutex_unlock(&serve->l_io), + "Problem with I/O unlock" + ); + written += run; } current += run; } - if (written == 0) - pass = max_passes-1; + /* if we've not written anything */ + if (written < mirror_last_pass_after_bytes_written) + pass = last_pass; } + switch (serve->mirror->action_at_finish) + { + case ACTION_PROXY: + debug("proxy!"); + serve->proxy_fd = serve->mirror->client; + /* don't close our file descriptor, we still need it! */ + break; + case ACTION_EXIT: + debug("exit!"); + write(serve->close_signal[1], serve, 1); /* any byte will do */ + /* fall through */ + case ACTION_NOTHING: + debug("nothing!"); + close(serve->mirror->client); + } + + free(serve->mirror->dirty_map); + free(serve->mirror); + serve->mirror = NULL; /* and we're gone */ + + SERVER_ERROR_ON_FAILURE( + pthread_mutex_unlock(&serve->l_accept), + "Problem with accept unlock" + ); + SERVER_ERROR_ON_FAILURE( + pthread_mutex_unlock(&serve->l_io), + "Problem with I/O unlock" + ); + return NULL; } #define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1) +/** Command parser to start mirror process from socket input */ int control_mirror(struct control_params* client, int linesc, char** lines) { off64_t size, remote_size; @@ -146,9 +253,12 @@ int control_mirror(struct control_params* client, int linesc, char** lines) "Failed to create mirror thread" ); + write_socket("0: mirror started"); + return 0; } +/** Command parser to alter access control list from socket input */ int control_acl(struct control_params* client, int linesc, char** lines) { int acl_entries = 0, parsed; @@ -173,11 +283,13 @@ int control_acl(struct control_params* client, int linesc, char** lines) return 0; } +/** FIXME: add some useful statistics */ int control_status(struct control_params* client, int linesc, char** lines) { return 0; } +/** Master command parser for control socket connections, delegates quickly */ void* control_serve(void* client_uncast) { struct control_params* client = (struct control_params*) client_uncast; diff --git a/flexnbd.c b/flexnbd.c index 520119e..6286114 100644 --- a/flexnbd.c +++ b/flexnbd.c @@ -1,3 +1,24 @@ +/* FlexNBD server (C) Bytemark Hosting 2012 + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +/** main() function for parsing and dispatching commands. Each mode has + * a corresponding structure which is filled in and passed to a do_ function + * elsewhere in the program. + */ + #include "params.h" #include "util.h" @@ -22,6 +43,7 @@ void syntax() " flexnbd write \n" " flexnbd acl [allowed connection addresses ...]\n" " flexnbd mirror \n" + " [bytes per second] [proxy|nothing|exit]" " flexnbd status \n" ); exit(1); @@ -185,12 +207,12 @@ void mode(char* mode, int argc, char **argv) int main(int argc, char** argv) { - signal(SIGPIPE, SIG_IGN); - error_init(); + signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */ + error_init(); if (argc < 2) syntax(); - mode(argv[1], argc-2, argv+2); + mode(argv[1], argc-2, argv+2); /* never returns */ return 0; } diff --git a/ioutil.c b/ioutil.c index a77f959..f61ff12 100644 --- a/ioutil.c +++ b/ioutil.c @@ -222,6 +222,8 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines) char line[max_line_length+1]; *lines = NULL; + memset(line, 0, max_line_length+1); + while (1) { if (read_until_newline(fd, line, max_line_length) < 0) return lines_count; diff --git a/ioutil.h b/ioutil.h index 5eb0e3f..1f3c566 100644 --- a/ioutil.h +++ b/ioutil.h @@ -4,13 +4,54 @@ #include "params.h" +/** Returns a bit field representing which blocks are allocated in file + * descriptor ''fd''. You must supply the size, and the resolution at which + * you want the bits to represent allocated blocks. If the OS represents + * allocated blocks at a finer resolution than you've asked for, any block + * or part block will count as "allocated" with the corresponding bit set. + */ char* build_allocation_map(int fd, off64_t size, int resolution); + +/** Repeat a write() operation that succeeds partially until ''size'' bytes + * are written, or an error is returned, when it returns -1 as usual. + */ int writeloop(int filedes, const void *buffer, size_t size); + +/** Repeat a read() operation that succeeds partially until ''size'' bytes + * are written, or an error is returned, when it returns -1 as usual. + */ int readloop(int filedes, void *buffer, size_t size); + +/** Repeat a sendfile() operation that succeeds partially until ''size'' bytes + * are written, or an error is returned, when it returns -1 as usual. + */ int sendfileloop(int out_fd, int in_fd, off64_t *offset, size_t count); + +/** Copy ''len'' bytes from ''fd_in'' to ''fd_out'' by creating a temporary + * pipe and using the Linux splice call repeatedly until it has transferred + * all the data. Returns -1 on error. + */ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len); + +/** Fill up to ''bufsize'' characters starting at ''buf'' with data from ''fd'' + * until an LF character is received, which is written to the buffer at a zero + * byte. Returns -1 on error, or the number of bytes written to the buffer. + */ int read_until_newline(int fd, char* buf, int bufsize); + +/** Read a number of lines using read_until_newline, until an empty line is + * received (i.e. the sequence LF LF). The data is read from ''fd'' and + * lines must be a maximum of ''max_line_length''. The set of lines is + * returned as an array of zero-terminated strings; you must pass an address + * ''lines'' in which you want the address of this array returned. + */ int read_lines_until_blankline(int fd, int max_line_length, char ***lines); + +/** Open the given ''filename'', determine its size, and mmap it in its + * entirety. The file descriptor is stored in ''out_fd'', the size in + * ''out_size'' and the address of the mmap in ''out_map''. If anything goes + * wrong, returns -1 setting errno, otherwise 0. + */ int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map); #endif diff --git a/params.h b/params.h index bcf2f6b..57df4f4 100644 --- a/params.h +++ b/params.h @@ -32,27 +32,36 @@ struct control_params { #define MAX_NBD_CLIENTS 16 struct mode_serve_params { - /* address/port to bind to */ + /** address/port to bind to */ union mysockaddr bind_to; - /* number of entries in current access control list*/ + /** number of entries in current access control list*/ int acl_entries; - /* pointer to access control list entries*/ + /** pointer to access control list entries*/ struct ip_and_mask (*acl)[0]; - /* file name to serve */ + /** file name to serve */ char* filename; - /* TCP backlog for listen() */ + /** TCP backlog for listen() */ int tcp_backlog; - /* file name of UNIX control socket (or NULL if none) */ + /** file name of UNIX control socket (or NULL if none) */ char* control_socket_name; - /* size of file */ + /** size of file */ off64_t size; /* NB dining philosophers if we ever mave more than one thread * that might need to pause the whole server. At the moment we only * have the one. */ - pthread_mutex_t l_accept; /* accept connections lock */ - pthread_mutex_t l_io ; /* read/write request lock */ + + /** Claimed around any accept/thread starting loop */ + pthread_mutex_t l_accept; + /** Claims around any I/O to this file */ + pthread_mutex_t l_io; + + /** set to non-zero to cause r/w requests to go via this fd */ + int proxy_fd; + + /** to interrupt accept loop and clients, write() to close_signal[1] */ + int close_signal[2]; struct mirror_status* mirror; int server; diff --git a/serve.c b/serve.c index a0e910d..17408f4 100644 --- a/serve.c +++ b/serve.c @@ -23,6 +23,14 @@ static inline void dirty(struct mode_serve_params *serve, off64_t from, int len) bitset_set_range(serve->mirror->dirty_map, from, len); } +int server_detect_closed(struct mode_serve_params* serve) +{ + int errno_old = errno; + int result = fcntl(serve->server, F_GETFD, 0) < 0; + errno = errno_old; + return result; +} + /** * So waiting on client->socket is len bytes of data, and we must write it all * to client->mapped. However while doing do we must consult the bitmap @@ -126,7 +134,17 @@ int client_serve_request(struct client_params* client) off64_t offset; struct nbd_request request; struct nbd_reply reply; + fd_set fds; + FD_ZERO(&fds); + FD_SET(client->socket, &fds); + FD_SET(client->serve->close_signal[0], &fds); + CLIENT_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds, NULL, NULL, NULL), + "select() failed"); + + if (FD_ISSET(client->serve->close_signal[0], &fds)) + return 1; + if (readloop(client->socket, &request, sizeof(request)) == -1) { if (errno == 0) { debug("EOF reading request"); @@ -176,6 +194,14 @@ int client_serve_request(struct client_params* client) "Problem with I/O lock" ); + if (server_detect_closed(client->serve)) { + CLIENT_ERROR_ON_FAILURE( + pthread_mutex_unlock(&client->serve->l_io), + "Problem with I/O unlock" + ); + return 1; + } + switch (be32toh(request.type)) { case REQUEST_READ: @@ -269,18 +295,20 @@ void* client_serve(void* client_uncast) "Couldn't close socket %d", client->socket ); - close(client->socket); close(client->fileno); munmap(client->mapped, client->serve->size); - free(client); + return NULL; } static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 }; +/** Test whether AF_INET or AF_INET6 sockaddr is included in the given access + * control list, returning 1 if it is, and 0 if not. + */ int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct sockaddr* test) { int i; @@ -335,6 +363,7 @@ int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct soc return 0; } +/** Prepares a listening socket for the NBD server, binding etc. */ void serve_open_server_socket(struct mode_serve_params* params) { int optval=1; @@ -362,6 +391,10 @@ void serve_open_server_socket(struct mode_serve_params* params) ); } +/** We can only accommodate MAX_NBD_CLIENTS connections at once. This function + * goes through the current list, waits for any threads that have finished + * and returns the next slot free (or -1 if there are none). + */ int cleanup_and_find_client_slot(struct mode_serve_params* params) { int slot=-1, i; @@ -399,6 +432,10 @@ int cleanup_and_find_client_slot(struct mode_serve_params* params) return slot; } +/** Dispatch function for accepting an NBD connection and starting a thread + * to handle it. Rejects the connection if there is an ACL, and the far end's + * address doesn't match, or if there are too many clients already connected. + */ void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address) { struct client_params* client_params; @@ -441,6 +478,7 @@ void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct s debug("nbd thread %d started (%s)", (int) params->nbd_client[slot].thread, s_client_address); } +/** Accept either an NBD or control socket connection, dispatch appropriately */ void serve_accept_loop(struct mode_serve_params* params) { while (1) { @@ -451,18 +489,19 @@ void serve_accept_loop(struct mode_serve_params* params) FD_ZERO(&fds); FD_SET(params->server, &fds); + FD_SET(params->close_signal[0], &fds); if (params->control_socket_name) FD_SET(params->control, &fds); - SERVER_ERROR_ON_FAILURE( - select(FD_SETSIZE, &fds, NULL, NULL, NULL), - "select() failed" - ); + SERVER_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds, + NULL, NULL, NULL), "select() failed"); + + if (FD_ISSET(params->close_signal[0], &fds)) + return; activity_fd = FD_ISSET(params->server, &fds) ? params->server : params->control; client_fd = accept(activity_fd, &client_address, &socklen); - SERVER_ERROR_ON_FAILURE(client_fd, "accept() failed"); SERVER_ERROR_ON_FAILURE( pthread_mutex_lock(¶ms->l_accept), @@ -481,6 +520,9 @@ void serve_accept_loop(struct mode_serve_params* params) } } +/** Initialisation function that sets up the initial allocation map, i.e. so + * we know which blocks of the file are allocated. + */ void serve_init_allocation_map(struct mode_serve_params* params) { int fd = open(params->filename, O_RDONLY); @@ -495,14 +537,50 @@ void serve_init_allocation_map(struct mode_serve_params* params) close(fd); } +/** Closes sockets, frees memory and waits for all client threads to finish */ +void serve_cleanup(struct mode_serve_params* params) +{ + int i; + + close(params->server); + close(params->control); + if (params->acl) + free(params->acl); + //free(params->filename); + if (params->control_socket_name) + //free(params->control_socket_name); + pthread_mutex_destroy(¶ms->l_accept); + pthread_mutex_destroy(¶ms->l_io); + if (params->proxy_fd); + close(params->proxy_fd); + close(params->close_signal[0]); + close(params->close_signal[1]); + free(params->block_allocation_map); + + if (params->mirror) + debug("mirror thread running! this should not happen!"); + + for (i=0; i < MAX_NBD_CLIENTS; i++) { + void* status; + + if (params->nbd_client[i].thread != 0) { + debug("joining thread %d", i); + pthread_join(params->nbd_client[i].thread, &status); + } + } +} + +/** Full lifecycle of the server */ void do_serve(struct mode_serve_params* params) { pthread_mutex_init(¶ms->l_accept, NULL); pthread_mutex_init(¶ms->l_io, NULL); + SERVER_ERROR_ON_FAILURE(pipe(params->close_signal) , "pipe failed"); serve_open_server_socket(params); serve_open_control_socket(params); serve_init_allocation_map(params); serve_accept_loop(params); + serve_cleanup(params); }