diff --git a/bitset.h b/bitset.h
index 51036a0..8fb1673 100644
--- a/bitset.h
+++ b/bitset.h
@@ -9,26 +9,32 @@
static inline char char_with_bit_set(int num) { return 1<<(num%8); }
+/** Return 1 if the bit at index ''idx'' in byte array ''b'' is set, 0 otherwise */
static inline int bit_is_set(char* b, int idx) {
return (b[idx/8] & char_with_bit_set(idx)) != 0;
}
+/** Return 1 if the bit at index ''idx'' in byte array ''b'' is clear, 0 otherwise */
static inline int bit_is_clear(char* b, int idx) {
return !bit_is_set(b, idx);
}
+/** Return 1 if the bit at ''idx'' in array ''b'' matches ''value'' (any non-zero ''value'' means set), 0 otherwise */
static inline int bit_has_value(char* b, int idx, int value) {
if (value)
return bit_is_set(b, idx);
else
return bit_is_clear(b, idx);
}
+/** Sets the bit at index ''idx'' in array ''b'' using a plain |= (the atomic __sync variant is commented out) */
static inline void bit_set(char* b, int idx) {
b[idx/8] |= char_with_bit_set(idx);
//__sync_fetch_and_or(b+(idx/8), char_with_bit_set(idx));
}
+/** Clears the bit at index ''idx'' in array ''b'' using a plain &= (the atomic __sync variant is commented out) */
static inline void bit_clear(char* b, int idx) {
b[idx/8] &= ~char_with_bit_set(idx);
//__sync_fetch_and_nand(b+(idx/8), char_with_bit_set(idx));
}
+/** Sets ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_set_range(char* b, int from, int len) {
for (; from%8 != 0 && len > 0; len--)
bit_set(b, from++);
@@ -37,6 +43,7 @@ static inline void bit_set_range(char* b, int from, int len) {
for (; len > 0; len--)
bit_set(b, from++);
}
+/** Clears ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_clear_range(char* b, int from, int len) {
for (; from%8 != 0 && len > 0; len--)
bit_clear(b, from++);
@@ -46,6 +53,10 @@ static inline void bit_clear_range(char* b, int from, int len) {
bit_clear(b, from++);
}
+/** Counts the number of contiguous bits in array ''b'', starting at ''from''
+ * up to a maximum number of bits ''len''. Returns the number of contiguous
+ * bits that are the same as the first one specified.
+ */
static inline int bit_run_count(char* b, int from, int len) {
int count;
int first_value = bit_is_set(b, from);
@@ -72,12 +83,19 @@ static inline int bit_run_count(char* b, int from, int len) {
return count;
}
+/** An application of a bitset - a bitset mapping represents a file of ''size''
+ * bytes broken down into ''resolution''-sized chunks, with one bit in ''bits''
+ * per chunk (a byte offset is divided by ''resolution'' to find its bit).
+ */
struct bitset_mapping {
uint64_t size;
int resolution;
char bits[];
};
+/** Allocate a bitset_mapping for a file of the given size, and chunks of the
+ * given resolution.
+ */
static inline struct bitset_mapping* bitset_alloc(
uint64_t size,
int resolution
@@ -97,6 +115,9 @@ static inline struct bitset_mapping* bitset_alloc(
last = (from+len-1)/set->resolution, \
bitlen = last-first+1
+/** Set the bits in a bitset which correspond to the given bytes in the larger
+ * file.
+ */
static inline void bitset_set_range(
struct bitset_mapping* set,
uint64_t from,
@@ -106,6 +127,9 @@ static inline void bitset_set_range(
bit_set_range(set->bits, first, bitlen);
}
+/** Clear the bits in a bitset which correspond to the given bytes in the
+ * larger file.
+ */
static inline void bitset_clear_range(
struct bitset_mapping* set,
uint64_t from,
@@ -115,6 +139,9 @@ static inline void bitset_clear_range(
bit_clear_range(set->bits, first, bitlen);
}
+/** Counts the number of contiguous bytes that are represented as a run in
+ * the bit field.
+ */
static inline int bitset_run_count(
struct bitset_mapping* set,
uint64_t from,
@@ -124,6 +151,8 @@ static inline int bitset_run_count(
return bit_run_count(set->bits, first, bitlen) * set->resolution;
}
+/** Tests whether the bit field is clear for the given file offset.
+ */
static inline int bitset_is_clear_at(
struct bitset_mapping* set,
uint64_t at
@@ -132,6 +161,8 @@ static inline int bitset_is_clear_at(
return bit_is_clear(set->bits, at/set->resolution);
}
+/** Tests whether the bit field is set for the given file offset.
+ */
static inline int bitset_is_set_at(
struct bitset_mapping* set,
uint64_t at
diff --git a/control.c b/control.c
index 80a30a2..79986fa 100644
--- a/control.c
+++ b/control.c
@@ -1,3 +1,30 @@
+/* FlexNBD server (C) Bytemark Hosting 2012
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+/** The control server responds on a UNIX socket and services our "remote"
+ * commands which are used for changing the access control list, initiating
+ * a mirror process, or asking for status. The protocol is pretty simple -
+ * after connecting the client sends a series of LF-terminated lines, followed
+ * by a blank line (i.e. double LF). The first line is taken to be the command
+ * name to invoke, and any further lines before the blank line are its arguments.
+ *
+ * These commands can be invoked remotely from the command line, with the
+ * client code to be found in remote.c
+ */
+
#include "params.h"
#include "util.h"
#include "ioutil.h"
@@ -10,33 +37,74 @@
#include
#include
-static const int longest_run = 8<<20;
+/** The mirror code will split NBD writes, making them this long as a maximum */
+static const int mirror_longest_write = 8<<20;
+/** If, during a mirror pass, we have sent fewer than this number of bytes, we
+ * go to freeze the I/O and finish it off. This is just a guess.
+ */
+static const int mirror_last_pass_after_bytes_written = 100<<20;
+
+/** The largest number of full passes we'll do - the last one will always
+ * cause the I/O to freeze, however many bytes are left to copy.
+ */
+static const int mirror_maximum_passes = 7;
+
+/** Thread launched to drive mirror process */
void* mirror_runner(void* serve_params_uncast)
{
- struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast;
-
- const int max_passes = 7; /* biblical */
+ const int last_pass = mirror_maximum_passes-1;
int pass;
+ struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast;
struct bitset_mapping *map = serve->mirror->dirty_map;
- for (pass=0; pass < max_passes; pass++) {
+ for (pass=0; pass < mirror_maximum_passes; pass++) {
uint64_t current = 0;
uint64_t written = 0;
debug("mirror start pass=%d", pass);
+ if (pass == last_pass) {
+ /* last pass, stop everything else */
+ SERVER_ERROR_ON_FAILURE(
+ pthread_mutex_lock(&serve->l_accept),
+ "Problem with accept lock"
+ );
+ SERVER_ERROR_ON_FAILURE(
+ pthread_mutex_lock(&serve->l_io),
+ "Problem with I/O lock"
+ );
+ }
+
while (current < serve->size) {
int run;
- run = bitset_run_count(map, current, longest_run);
+ run = bitset_run_count(map, current, mirror_longest_write);
debug("mirror current=%ld, run=%d", current, run);
+ /* FIXME: we could avoid sending sparse areas of the
+ * disc here, and probably save a lot of bandwidth and
+ * time (if we know the destination starts off zeroed).
+ */
if (bitset_is_set_at(map, current)) {
+ /* We've found a dirty area, send it */
debug("^^^ writing");
- /* dirty area */
+ /* We need to stop the main thread from working
+ * because it might corrupt the dirty map. This
+ * is likely to slow things down but will be
+ * safe.
+ */
+ if (pass < last_pass)
+ SERVER_ERROR_ON_FAILURE(
+ pthread_mutex_lock(&serve->l_io),
+ "Problem with I/O lock"
+ );
+
+ /** FIXME: do something useful with bytes/second */
+
+ /** FIXME: error handling code here won't unlock */
socket_nbd_write(
serve->mirror->client,
current,
@@ -45,21 +113,60 @@ void* mirror_runner(void* serve_params_uncast)
serve->mirror->mapped + current
);
+ /* now mark it clean */
bitset_clear_range(map, current, run);
+
+ if (pass < last_pass)
+ SERVER_ERROR_ON_FAILURE(
+ pthread_mutex_unlock(&serve->l_io),
+ "Problem with I/O unlock"
+ );
+
written += run;
}
current += run;
}
- if (written == 0)
- pass = max_passes-1;
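+ /* fewer bytes than the threshold written. NOTE(review): pass++ follows, so the loop ends here and the locked last pass is skipped - confirm */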
+ if (written < mirror_last_pass_after_bytes_written)
+ pass = last_pass;
}
+ switch (serve->mirror->action_at_finish)
+ {
+ case ACTION_PROXY:
+ debug("proxy!");
+ serve->proxy_fd = serve->mirror->client;
+ /* don't close our file descriptor, we still need it! */
+ break;
+ case ACTION_EXIT:
+ debug("exit!");
+ write(serve->close_signal[1], serve, 1); /* any byte will do */
+ /* fall through */
+ case ACTION_NOTHING:
+ debug("nothing!");
+ close(serve->mirror->client);
+ }
+
+ free(serve->mirror->dirty_map);
+ free(serve->mirror);
+ serve->mirror = NULL; /* and we're gone */
+
+ SERVER_ERROR_ON_FAILURE(
+ pthread_mutex_unlock(&serve->l_accept),
+ "Problem with accept unlock"
+ );
+ SERVER_ERROR_ON_FAILURE(
+ pthread_mutex_unlock(&serve->l_io),
+ "Problem with I/O unlock"
+ );
+
return NULL;
}
#define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1)
+/** Command parser to start mirror process from socket input */
int control_mirror(struct control_params* client, int linesc, char** lines)
{
off64_t size, remote_size;
@@ -146,9 +253,12 @@ int control_mirror(struct control_params* client, int linesc, char** lines)
"Failed to create mirror thread"
);
+ write_socket("0: mirror started");
+
return 0;
}
+/** Command parser to alter access control list from socket input */
int control_acl(struct control_params* client, int linesc, char** lines)
{
int acl_entries = 0, parsed;
@@ -173,11 +283,13 @@ int control_acl(struct control_params* client, int linesc, char** lines)
return 0;
}
+/** Stub for the control socket's status command: ignores its arguments and always returns 0. FIXME: add some useful statistics */
int control_status(struct control_params* client, int linesc, char** lines)
{
return 0;
}
+/** Master command parser for control socket connections, delegates quickly */
void* control_serve(void* client_uncast)
{
struct control_params* client = (struct control_params*) client_uncast;
diff --git a/flexnbd.c b/flexnbd.c
index 520119e..6286114 100644
--- a/flexnbd.c
+++ b/flexnbd.c
@@ -1,3 +1,24 @@
+/* FlexNBD server (C) Bytemark Hosting 2012
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+/** main() function for parsing and dispatching commands. Each mode has
+ * a corresponding structure which is filled in and passed to a do_ function
+ * elsewhere in the program.
+ */
+
#include "params.h"
#include "util.h"
@@ -22,6 +43,7 @@ void syntax()
" flexnbd write \n"
" flexnbd acl [allowed connection addresses ...]\n"
" flexnbd mirror \n"
+ " [bytes per second] [proxy|nothing|exit]"
" flexnbd status \n"
);
exit(1);
@@ -185,12 +207,12 @@ void mode(char* mode, int argc, char **argv)
int main(int argc, char** argv)
{
- signal(SIGPIPE, SIG_IGN);
- error_init();
+ signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */
+ error_init();
if (argc < 2)
syntax();
- mode(argv[1], argc-2, argv+2);
+ mode(argv[1], argc-2, argv+2); /* never returns */
return 0;
}
diff --git a/ioutil.c b/ioutil.c
index a77f959..f61ff12 100644
--- a/ioutil.c
+++ b/ioutil.c
@@ -222,6 +222,8 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines)
char line[max_line_length+1];
*lines = NULL;
+ memset(line, 0, max_line_length+1);
+
while (1) {
if (read_until_newline(fd, line, max_line_length) < 0)
return lines_count;
diff --git a/ioutil.h b/ioutil.h
index 5eb0e3f..1f3c566 100644
--- a/ioutil.h
+++ b/ioutil.h
@@ -4,13 +4,54 @@
#include "params.h"
+/** Returns a bit field representing which blocks are allocated in file
+ * descriptor ''fd''. You must supply the size, and the resolution at which
+ * you want the bits to represent allocated blocks. If the OS represents
+ * allocated blocks at a finer resolution than you've asked for, any block
+ * or part block will count as "allocated" with the corresponding bit set.
+ */
char* build_allocation_map(int fd, off64_t size, int resolution);
+
+/** Repeat a write() operation that succeeds partially until ''size'' bytes
+ * are written, or an error occurs, in which case it returns -1 as usual.
+ */
int writeloop(int filedes, const void *buffer, size_t size);
+
+/** Repeat a read() operation that succeeds partially until ''size'' bytes
+ * are read, or an error occurs, in which case it returns -1 as usual.
+ */
int readloop(int filedes, void *buffer, size_t size);
+
+/** Repeat a sendfile() operation that succeeds partially until ''count'' bytes
+ * are written, or an error occurs, in which case it returns -1 as usual.
+ */
int sendfileloop(int out_fd, int in_fd, off64_t *offset, size_t count);
+
+/** Copy ''len'' bytes from ''fd_in'' to ''fd_out'' by creating a temporary
+ * pipe and using the Linux splice call repeatedly until it has transferred
+ * all the data. Returns -1 on error.
+ */
int splice_via_pipe_loop(int fd_in, int fd_out, size_t len);
+
+/** Fill up to ''bufsize'' characters starting at ''buf'' with data from ''fd''
+ * until an LF character is received, which is written to the buffer as a zero
+ * byte. Returns -1 on error, or the number of bytes written to the buffer.
+ */
int read_until_newline(int fd, char* buf, int bufsize);
+
+/** Read a number of lines using read_until_newline, until an empty line is
+ * received (i.e. the sequence LF LF). The data is read from ''fd'' and
+ * lines must be a maximum of ''max_line_length''. The set of lines is
+ * returned as an array of zero-terminated strings; you must pass an address
+ * ''lines'' in which you want the address of this array returned.
+ */
int read_lines_until_blankline(int fd, int max_line_length, char ***lines);
+
+/** Open the given ''filename'', determine its size, and mmap it in its
+ * entirety. The file descriptor is stored in ''out_fd'', the size in
+ * ''out_size'' and the address of the mmap in ''out_map''. If anything goes
+ * wrong, returns -1 setting errno, otherwise 0.
+ */
int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map);
#endif
diff --git a/params.h b/params.h
index bcf2f6b..57df4f4 100644
--- a/params.h
+++ b/params.h
@@ -32,27 +32,36 @@ struct control_params {
#define MAX_NBD_CLIENTS 16
struct mode_serve_params {
- /* address/port to bind to */
+ /** address/port to bind to */
union mysockaddr bind_to;
- /* number of entries in current access control list*/
+ /** number of entries in current access control list*/
int acl_entries;
- /* pointer to access control list entries*/
+ /** pointer to access control list entries*/
struct ip_and_mask (*acl)[0];
- /* file name to serve */
+ /** file name to serve */
char* filename;
- /* TCP backlog for listen() */
+ /** TCP backlog for listen() */
int tcp_backlog;
- /* file name of UNIX control socket (or NULL if none) */
+ /** file name of UNIX control socket (or NULL if none) */
char* control_socket_name;
- /* size of file */
+ /** size of file */
off64_t size;
/* NB dining philosophers if we ever mave more than one thread
* that might need to pause the whole server. At the moment we only
* have the one.
*/
- pthread_mutex_t l_accept; /* accept connections lock */
- pthread_mutex_t l_io ; /* read/write request lock */
+
+ /** Claimed around any accept/thread starting loop */
+ pthread_mutex_t l_accept;
+ /** Claimed around any I/O to this file */
+ pthread_mutex_t l_io;
+
+ /** set to non-zero to cause r/w requests to go via this fd */
+ int proxy_fd;
+
+ /** to interrupt accept loop and clients, write() to close_signal[1] */
+ int close_signal[2];
struct mirror_status* mirror;
int server;
diff --git a/serve.c b/serve.c
index a0e910d..17408f4 100644
--- a/serve.c
+++ b/serve.c
@@ -23,6 +23,14 @@ static inline void dirty(struct mode_serve_params *serve, off64_t from, int len)
bitset_set_range(serve->mirror->dirty_map, from, len);
}
+/** Returns 1 if fcntl(F_GETFD) on the server socket fails (i.e. it has been closed), else 0; the caller's errno is preserved. */
+int server_detect_closed(struct mode_serve_params* serve) {
+	const int saved_errno = errno;
+	const int is_closed = fcntl(serve->server, F_GETFD, 0) < 0;
+	errno = saved_errno;
+	return is_closed;
+}
+
/**
* So waiting on client->socket is len bytes of data, and we must write it all
* to client->mapped. However while doing do we must consult the bitmap
@@ -126,7 +134,17 @@ int client_serve_request(struct client_params* client)
off64_t offset;
struct nbd_request request;
struct nbd_reply reply;
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(client->socket, &fds);
+ FD_SET(client->serve->close_signal[0], &fds);
+ CLIENT_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds, NULL, NULL, NULL),
+ "select() failed");
+
+ if (FD_ISSET(client->serve->close_signal[0], &fds))
+ return 1;
+
if (readloop(client->socket, &request, sizeof(request)) == -1) {
if (errno == 0) {
debug("EOF reading request");
@@ -176,6 +194,14 @@ int client_serve_request(struct client_params* client)
"Problem with I/O lock"
);
+ if (server_detect_closed(client->serve)) {
+ CLIENT_ERROR_ON_FAILURE(
+ pthread_mutex_unlock(&client->serve->l_io),
+ "Problem with I/O unlock"
+ );
+ return 1;
+ }
+
switch (be32toh(request.type))
{
case REQUEST_READ:
@@ -269,18 +295,20 @@ void* client_serve(void* client_uncast)
"Couldn't close socket %d",
client->socket
);
-
close(client->socket);
close(client->fileno);
munmap(client->mapped, client->serve->size);
-
free(client);
+
return NULL;
}
static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 };
+/** Test whether AF_INET or AF_INET6 sockaddr is included in the given access
+ * control list, returning 1 if it is, and 0 if not.
+ */
int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct sockaddr* test)
{
int i;
@@ -335,6 +363,7 @@ int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct soc
return 0;
}
+/** Prepares a listening socket for the NBD server, binding etc. */
void serve_open_server_socket(struct mode_serve_params* params)
{
int optval=1;
@@ -362,6 +391,10 @@ void serve_open_server_socket(struct mode_serve_params* params)
);
}
+/** We can only accommodate MAX_NBD_CLIENTS connections at once. This function
+ * goes through the current list, waits for any threads that have finished
+ * and returns the next slot free (or -1 if there are none).
+ */
int cleanup_and_find_client_slot(struct mode_serve_params* params)
{
int slot=-1, i;
@@ -399,6 +432,10 @@ int cleanup_and_find_client_slot(struct mode_serve_params* params)
return slot;
}
+/** Dispatch function for accepting an NBD connection and starting a thread
+ * to handle it. Rejects the connection if there is an ACL, and the far end's
+ * address doesn't match, or if there are too many clients already connected.
+ */
void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address)
{
struct client_params* client_params;
@@ -441,6 +478,7 @@ void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct s
debug("nbd thread %d started (%s)", (int) params->nbd_client[slot].thread, s_client_address);
}
+/** Accept either an NBD or control socket connection, dispatch appropriately */
void serve_accept_loop(struct mode_serve_params* params)
{
while (1) {
@@ -451,18 +489,19 @@ void serve_accept_loop(struct mode_serve_params* params)
FD_ZERO(&fds);
FD_SET(params->server, &fds);
+ FD_SET(params->close_signal[0], &fds);
if (params->control_socket_name)
FD_SET(params->control, &fds);
- SERVER_ERROR_ON_FAILURE(
- select(FD_SETSIZE, &fds, NULL, NULL, NULL),
- "select() failed"
- );
+ SERVER_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds,
+ NULL, NULL, NULL), "select() failed");
+
+ if (FD_ISSET(params->close_signal[0], &fds))
+ return;
activity_fd = FD_ISSET(params->server, &fds) ? params->server :
params->control;
client_fd = accept(activity_fd, &client_address, &socklen);
- SERVER_ERROR_ON_FAILURE(client_fd, "accept() failed");
SERVER_ERROR_ON_FAILURE(
pthread_mutex_lock(¶ms->l_accept),
@@ -481,6 +520,9 @@ void serve_accept_loop(struct mode_serve_params* params)
}
}
+/** Initialisation function that sets up the initial allocation map, i.e. so
+ * we know which blocks of the file are allocated.
+ */
void serve_init_allocation_map(struct mode_serve_params* params)
{
int fd = open(params->filename, O_RDONLY);
@@ -495,14 +537,50 @@ void serve_init_allocation_map(struct mode_serve_params* params)
close(fd);
}
+/** Closes sockets, waits for all client threads to finish, then frees memory */
+void serve_cleanup(struct mode_serve_params* params)
+{
+	int i;
+
+	/* client threads check server_detect_closed(), so close this first */
+	close(params->server);
+	/* the control socket is only used when a name was configured */
+	if (params->control_socket_name)
+		close(params->control);
+	if (params->mirror)
+		debug("mirror thread running! this should not happen!");
+
+	/* join clients before destroying the locks and pipe they still use */
+	for (i=0; i < MAX_NBD_CLIENTS; i++) {
+		if (params->nbd_client[i].thread != 0) {
+			debug("joining thread %d", i);
+			pthread_join(params->nbd_client[i].thread, NULL);
+		}
+	}
+
+	pthread_mutex_destroy(&params->l_accept);
+	pthread_mutex_destroy(&params->l_io);
+	if (params->proxy_fd) /* zero means "no proxy fd" */
+		close(params->proxy_fd);
+	close(params->close_signal[0]);
+	close(params->close_signal[1]);
+	free(params->acl);
+	free(params->block_allocation_map);
+	/* NOTE(review): filename and control_socket_name are not freed here;
+	 * confirm who owns them */
+}
+
+/** Full lifecycle of the server: initialise locks and the close-signal pipe, open sockets, run the accept loop until it returns, then clean up */
void do_serve(struct mode_serve_params* params)
{
pthread_mutex_init(&params->l_accept, NULL);
pthread_mutex_init(&params->l_io, NULL);
+ SERVER_ERROR_ON_FAILURE(pipe(params->close_signal) , "pipe failed");
serve_open_server_socket(params);
serve_open_control_socket(params);
serve_init_allocation_map(params);
serve_accept_loop(params);
+ serve_cleanup(params);
}