Added mirror write barrier / final pass stuff & clean exit afterwards.

Plenty of code documentation.
This commit is contained in:
Matthew Bloch
2012-05-29 04:03:28 +01:00
parent dcb1633b8b
commit ab0dfb5eca
7 changed files with 323 additions and 28 deletions

View File

@@ -9,26 +9,32 @@
static inline char char_with_bit_set(int num) { return 1<<(num%8); }
/** Return 1 if the bit at ''idx'' in array ''b'' is set */
static inline int bit_is_set(char* b, int idx) {
return (b[idx/8] & char_with_bit_set(idx)) != 0;
}
/** Return 1 if the bit at ''idx'' in array ''b'' is clear */
static inline int bit_is_clear(char* b, int idx) {
return !bit_is_set(b, idx);
}
/** Tests whether the bit at ''idx'' in array ''b'' has value ''value'' */
static inline int bit_has_value(char* b, int idx, int value) {
if (value)
return bit_is_set(b, idx);
else
return bit_is_clear(b, idx);
}
/** Sets the bit ''idx'' in array ''b'' */
static inline void bit_set(char* b, int idx) {
b[idx/8] |= char_with_bit_set(idx);
//__sync_fetch_and_or(b+(idx/8), char_with_bit_set(idx));
}
/** Clears the bit ''idx'' in array ''b'' */
static inline void bit_clear(char* b, int idx) {
b[idx/8] &= ~char_with_bit_set(idx);
//__sync_fetch_and_nand(b+(idx/8), char_with_bit_set(idx));
}
/** Sets ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_set_range(char* b, int from, int len) {
for (; from%8 != 0 && len > 0; len--)
bit_set(b, from++);
@@ -37,6 +43,7 @@ static inline void bit_set_range(char* b, int from, int len) {
for (; len > 0; len--)
bit_set(b, from++);
}
/** Clears ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_clear_range(char* b, int from, int len) {
for (; from%8 != 0 && len > 0; len--)
bit_clear(b, from++);
@@ -46,6 +53,10 @@ static inline void bit_clear_range(char* b, int from, int len) {
bit_clear(b, from++);
}
/** Counts the number of contiguous bits in array ''b'', starting at ''from''
* up to a maximum number of bits ''len''. Returns the number of contiguous
* bits that are the same as the first one specified.
*/
static inline int bit_run_count(char* b, int from, int len) {
int count;
int first_value = bit_is_set(b, from);
@@ -72,12 +83,19 @@ static inline int bit_run_count(char* b, int from, int len) {
return count;
}
/** An application of a bitset - a bitset mapping represents a file of ''size''
* broken down into ''resolution''-sized chunks. The bit set is assumed to
* represent one bit per chunk.
*/
struct bitset_mapping {
uint64_t size;
int resolution;
char bits[];
};
/** Allocate a bitset_mapping for a file of the given size, and chunks of the
* given resolution.
*/
static inline struct bitset_mapping* bitset_alloc(
uint64_t size,
int resolution
@@ -97,6 +115,9 @@ static inline struct bitset_mapping* bitset_alloc(
last = (from+len-1)/set->resolution, \
bitlen = last-first+1
/** Set the bits in a bitset which correspond to the given bytes in the larger
* file.
*/
static inline void bitset_set_range(
struct bitset_mapping* set,
uint64_t from,
@@ -106,6 +127,9 @@ static inline void bitset_set_range(
bit_set_range(set->bits, first, bitlen);
}
/** Clear the bits in a bitset which correspond to the given bytes in the
* larger file.
*/
static inline void bitset_clear_range(
struct bitset_mapping* set,
uint64_t from,
@@ -115,6 +139,9 @@ static inline void bitset_clear_range(
bit_clear_range(set->bits, first, bitlen);
}
/** Counts the number of contiguous bytes that are represented as a run in
* the bit field.
*/
static inline int bitset_run_count(
struct bitset_mapping* set,
uint64_t from,
@@ -124,6 +151,8 @@ static inline int bitset_run_count(
return bit_run_count(set->bits, first, bitlen) * set->resolution;
}
/** Tests whether the bit field is clear for the given file offset.
*/
static inline int bitset_is_clear_at(
struct bitset_mapping* set,
uint64_t at
@@ -132,6 +161,8 @@ static inline int bitset_is_clear_at(
return bit_is_clear(set->bits, at/set->resolution);
}
/** Tests whether the bit field is set for the given file offset.
*/
static inline int bitset_is_set_at(
struct bitset_mapping* set,
uint64_t at

130
control.c
View File

@@ -1,3 +1,30 @@
/* FlexNBD server (C) Bytemark Hosting 2012
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/** The control server responds on a UNIX socket and services our "remote"
* commands which are used for changing the access control list, initiating
* a mirror process, or asking for status. The protocol is pretty simple -
* after connecting the client sends a series of LF-terminated lines, followed
* by a blank line (i.e. double LF). The first line is taken to be the command
* name to invoke, and the lines before the double LF are its arguments.
*
* These commands can be invoked remotely from the command line, with the
* client code to be found in remote.c
*/
#include "params.h"
#include "util.h"
#include "ioutil.h"
@@ -10,33 +37,74 @@
#include <sys/un.h>
#include <unistd.h>
static const int longest_run = 8<<20;
/** The mirror code will split NBD writes, making them this long as a maximum */
static const int mirror_longest_write = 8<<20;
/** If, during a mirror pass, we have sent this number of bytes or fewer, we
* go to freeze the I/O and finish it off. This is just a guess.
*/
static const int mirror_last_pass_after_bytes_written = 100<<20;
/** The largest number of full passes we'll do - the last one will always
* cause the I/O to freeze, however many bytes are left to copy.
*/
static const int mirror_maximum_passes = 7;
/** Thread launched to drive mirror process */
void* mirror_runner(void* serve_params_uncast)
{
struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast;
const int max_passes = 7; /* biblical */
const int last_pass = mirror_maximum_passes-1;
int pass;
struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast;
struct bitset_mapping *map = serve->mirror->dirty_map;
for (pass=0; pass < max_passes; pass++) {
for (pass=0; pass < mirror_maximum_passes; pass++) {
uint64_t current = 0;
uint64_t written = 0;
debug("mirror start pass=%d", pass);
if (pass == last_pass) {
/* last pass, stop everything else */
SERVER_ERROR_ON_FAILURE(
pthread_mutex_lock(&serve->l_accept),
"Problem with accept lock"
);
SERVER_ERROR_ON_FAILURE(
pthread_mutex_lock(&serve->l_io),
"Problem with I/O lock"
);
}
while (current < serve->size) {
int run;
run = bitset_run_count(map, current, longest_run);
run = bitset_run_count(map, current, mirror_longest_write);
debug("mirror current=%ld, run=%d", current, run);
/* FIXME: we could avoid sending sparse areas of the
* disc here, and probably save a lot of bandwidth and
* time (if we know the destination starts off zeroed).
*/
if (bitset_is_set_at(map, current)) {
/* We've found a dirty area, send it */
debug("^^^ writing");
/* dirty area */
/* We need to stop the main thread from working
* because it might corrupt the dirty map. This
* is likely to slow things down but will be
* safe.
*/
if (pass < last_pass)
SERVER_ERROR_ON_FAILURE(
pthread_mutex_lock(&serve->l_io),
"Problem with I/O lock"
);
/** FIXME: do something useful with bytes/second */
/** FIXME: error handling code here won't unlock */
socket_nbd_write(
serve->mirror->client,
current,
@@ -45,21 +113,60 @@ void* mirror_runner(void* serve_params_uncast)
serve->mirror->mapped + current
);
/* now mark it clean */
bitset_clear_range(map, current, run);
if (pass < last_pass)
SERVER_ERROR_ON_FAILURE(
pthread_mutex_unlock(&serve->l_io),
"Problem with I/O unlock"
);
written += run;
}
current += run;
}
if (written == 0)
pass = max_passes-1;
/* if we've not written anything */
if (written < mirror_last_pass_after_bytes_written)
pass = last_pass;
}
switch (serve->mirror->action_at_finish)
{
case ACTION_PROXY:
debug("proxy!");
serve->proxy_fd = serve->mirror->client;
/* don't close our file descriptor, we still need it! */
break;
case ACTION_EXIT:
debug("exit!");
write(serve->close_signal[1], serve, 1); /* any byte will do */
/* fall through */
case ACTION_NOTHING:
debug("nothing!");
close(serve->mirror->client);
}
free(serve->mirror->dirty_map);
free(serve->mirror);
serve->mirror = NULL; /* and we're gone */
SERVER_ERROR_ON_FAILURE(
pthread_mutex_unlock(&serve->l_accept),
"Problem with accept unlock"
);
SERVER_ERROR_ON_FAILURE(
pthread_mutex_unlock(&serve->l_io),
"Problem with I/O unlock"
);
return NULL;
}
#define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1)
/** Command parser to start mirror process from socket input */
int control_mirror(struct control_params* client, int linesc, char** lines)
{
off64_t size, remote_size;
@@ -146,9 +253,12 @@ int control_mirror(struct control_params* client, int linesc, char** lines)
"Failed to create mirror thread"
);
write_socket("0: mirror started");
return 0;
}
/** Command parser to alter access control list from socket input */
int control_acl(struct control_params* client, int linesc, char** lines)
{
int acl_entries = 0, parsed;
@@ -173,11 +283,13 @@ int control_acl(struct control_params* client, int linesc, char** lines)
return 0;
}
/** FIXME: add some useful statistics */
int control_status(struct control_params* client, int linesc, char** lines)
{
return 0;
}
/** Master command parser for control socket connections, delegates quickly */
void* control_serve(void* client_uncast)
{
struct control_params* client = (struct control_params*) client_uncast;

View File

@@ -1,3 +1,24 @@
/* FlexNBD server (C) Bytemark Hosting 2012
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/** main() function for parsing and dispatching commands. Each mode has
* a corresponding structure which is filled in and passed to a do_ function
* elsewhere in the program.
*/
#include "params.h"
#include "util.h"
@@ -22,6 +43,7 @@ void syntax()
" flexnbd write <IP address> <port> <offset> <file to write>\n"
" flexnbd acl <control socket> [allowed connection addresses ...]\n"
" flexnbd mirror <control socket> <dst IP address> <dst port>\n"
" [bytes per second] [proxy|nothing|exit]"
" flexnbd status <control socket>\n"
);
exit(1);
@@ -185,12 +207,12 @@ void mode(char* mode, int argc, char **argv)
int main(int argc, char** argv)
{
signal(SIGPIPE, SIG_IGN);
signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */
error_init();
if (argc < 2)
syntax();
mode(argv[1], argc-2, argv+2);
mode(argv[1], argc-2, argv+2); /* never returns */
return 0;
}

View File

@@ -222,6 +222,8 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines)
char line[max_line_length+1];
*lines = NULL;
memset(line, 0, max_line_length+1);
while (1) {
if (read_until_newline(fd, line, max_line_length) < 0)
return lines_count;

View File

@@ -4,13 +4,54 @@
#include "params.h"
/** Returns a bit field representing which blocks are allocated in file
* descriptor ''fd''. You must supply the size, and the resolution at which
* you want the bits to represent allocated blocks. If the OS represents
* allocated blocks at a finer resolution than you've asked for, any block
* or part block will count as "allocated" with the corresponding bit set.
*/
char* build_allocation_map(int fd, off64_t size, int resolution);
/** Repeat a write() operation that succeeds partially until ''size'' bytes
* are written, or an error is returned, when it returns -1 as usual.
*/
int writeloop(int filedes, const void *buffer, size_t size);
/** Repeat a read() operation that succeeds partially until ''size'' bytes
* are written, or an error is returned, when it returns -1 as usual.
*/
int readloop(int filedes, void *buffer, size_t size);
/** Repeat a sendfile() operation that succeeds partially until ''size'' bytes
* are written, or an error is returned, when it returns -1 as usual.
*/
int sendfileloop(int out_fd, int in_fd, off64_t *offset, size_t count);
/** Copy ''len'' bytes from ''fd_in'' to ''fd_out'' by creating a temporary
* pipe and using the Linux splice call repeatedly until it has transferred
* all the data. Returns -1 on error.
*/
int splice_via_pipe_loop(int fd_in, int fd_out, size_t len);
/** Fill up to ''bufsize'' characters starting at ''buf'' with data from ''fd''
* until an LF character is received, which is written to the buffer at a zero
* byte. Returns -1 on error, or the number of bytes written to the buffer.
*/
int read_until_newline(int fd, char* buf, int bufsize);
/** Read a number of lines using read_until_newline, until an empty line is
* received (i.e. the sequence LF LF). The data is read from ''fd'' and
* lines must be a maximum of ''max_line_length''. The set of lines is
* returned as an array of zero-terminated strings; you must pass an address
* ''lines'' in which you want the address of this array returned.
*/
int read_lines_until_blankline(int fd, int max_line_length, char ***lines);
/** Open the given ''filename'', determine its size, and mmap it in its
* entirety. The file descriptor is stored in ''out_fd'', the size in
* ''out_size'' and the address of the mmap in ''out_map''. If anything goes
* wrong, returns -1 setting errno, otherwise 0.
*/
int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map);
#endif

View File

@@ -32,27 +32,36 @@ struct control_params {
#define MAX_NBD_CLIENTS 16
struct mode_serve_params {
/* address/port to bind to */
/** address/port to bind to */
union mysockaddr bind_to;
/* number of entries in current access control list*/
/** number of entries in current access control list*/
int acl_entries;
/* pointer to access control list entries*/
/** pointer to access control list entries*/
struct ip_and_mask (*acl)[0];
/* file name to serve */
/** file name to serve */
char* filename;
/* TCP backlog for listen() */
/** TCP backlog for listen() */
int tcp_backlog;
/* file name of UNIX control socket (or NULL if none) */
/** file name of UNIX control socket (or NULL if none) */
char* control_socket_name;
/* size of file */
/** size of file */
off64_t size;
/* NB dining philosophers if we ever mave more than one thread
* that might need to pause the whole server. At the moment we only
* have the one.
*/
pthread_mutex_t l_accept; /* accept connections lock */
pthread_mutex_t l_io ; /* read/write request lock */
/** Claimed around any accept/thread starting loop */
pthread_mutex_t l_accept;
/** Claims around any I/O to this file */
pthread_mutex_t l_io;
/** set to non-zero to cause r/w requests to go via this fd */
int proxy_fd;
/** to interrupt accept loop and clients, write() to close_signal[1] */
int close_signal[2];
struct mirror_status* mirror;
int server;

92
serve.c
View File

@@ -23,6 +23,14 @@ static inline void dirty(struct mode_serve_params *serve, off64_t from, int len)
bitset_set_range(serve->mirror->dirty_map, from, len);
}
int server_detect_closed(struct mode_serve_params* serve)
{
int errno_old = errno;
int result = fcntl(serve->server, F_GETFD, 0) < 0;
errno = errno_old;
return result;
}
/**
* So waiting on client->socket is len bytes of data, and we must write it all
* to client->mapped. However while doing do we must consult the bitmap
@@ -126,6 +134,16 @@ int client_serve_request(struct client_params* client)
off64_t offset;
struct nbd_request request;
struct nbd_reply reply;
fd_set fds;
FD_ZERO(&fds);
FD_SET(client->socket, &fds);
FD_SET(client->serve->close_signal[0], &fds);
CLIENT_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds, NULL, NULL, NULL),
"select() failed");
if (FD_ISSET(client->serve->close_signal[0], &fds))
return 1;
if (readloop(client->socket, &request, sizeof(request)) == -1) {
if (errno == 0) {
@@ -176,6 +194,14 @@ int client_serve_request(struct client_params* client)
"Problem with I/O lock"
);
if (server_detect_closed(client->serve)) {
CLIENT_ERROR_ON_FAILURE(
pthread_mutex_unlock(&client->serve->l_io),
"Problem with I/O unlock"
);
return 1;
}
switch (be32toh(request.type))
{
case REQUEST_READ:
@@ -270,17 +296,19 @@ void* client_serve(void* client_uncast)
client->socket
);
close(client->socket);
close(client->fileno);
munmap(client->mapped, client->serve->size);
free(client);
return NULL;
}
static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 };
/** Test whether AF_INET or AF_INET6 sockaddr is included in the given access
* control list, returning 1 if it is, and 0 if not.
*/
int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct sockaddr* test)
{
int i;
@@ -335,6 +363,7 @@ int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct soc
return 0;
}
/** Prepares a listening socket for the NBD server, binding etc. */
void serve_open_server_socket(struct mode_serve_params* params)
{
int optval=1;
@@ -362,6 +391,10 @@ void serve_open_server_socket(struct mode_serve_params* params)
);
}
/** We can only accommodate MAX_NBD_CLIENTS connections at once. This function
* goes through the current list, waits for any threads that have finished
* and returns the next slot free (or -1 if there are none).
*/
int cleanup_and_find_client_slot(struct mode_serve_params* params)
{
int slot=-1, i;
@@ -399,6 +432,10 @@ int cleanup_and_find_client_slot(struct mode_serve_params* params)
return slot;
}
/** Dispatch function for accepting an NBD connection and starting a thread
* to handle it. Rejects the connection if there is an ACL, and the far end's
* address doesn't match, or if there are too many clients already connected.
*/
void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address)
{
struct client_params* client_params;
@@ -441,6 +478,7 @@ void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct s
debug("nbd thread %d started (%s)", (int) params->nbd_client[slot].thread, s_client_address);
}
/** Accept either an NBD or control socket connection, dispatch appropriately */
void serve_accept_loop(struct mode_serve_params* params)
{
while (1) {
@@ -451,18 +489,19 @@ void serve_accept_loop(struct mode_serve_params* params)
FD_ZERO(&fds);
FD_SET(params->server, &fds);
FD_SET(params->close_signal[0], &fds);
if (params->control_socket_name)
FD_SET(params->control, &fds);
SERVER_ERROR_ON_FAILURE(
select(FD_SETSIZE, &fds, NULL, NULL, NULL),
"select() failed"
);
SERVER_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds,
NULL, NULL, NULL), "select() failed");
if (FD_ISSET(params->close_signal[0], &fds))
return;
activity_fd = FD_ISSET(params->server, &fds) ? params->server :
params->control;
client_fd = accept(activity_fd, &client_address, &socklen);
SERVER_ERROR_ON_FAILURE(client_fd, "accept() failed");
SERVER_ERROR_ON_FAILURE(
pthread_mutex_lock(&params->l_accept),
@@ -481,6 +520,9 @@ void serve_accept_loop(struct mode_serve_params* params)
}
}
/** Initialisation function that sets up the initial allocation map, i.e. so
* we know which blocks of the file are allocated.
*/
void serve_init_allocation_map(struct mode_serve_params* params)
{
int fd = open(params->filename, O_RDONLY);
@@ -495,14 +537,50 @@ void serve_init_allocation_map(struct mode_serve_params* params)
close(fd);
}
/** Closes sockets, frees memory and waits for all client threads to finish */
void serve_cleanup(struct mode_serve_params* params)
{
int i;
close(params->server);
close(params->control);
if (params->acl)
free(params->acl);
//free(params->filename);
if (params->control_socket_name)
//free(params->control_socket_name);
pthread_mutex_destroy(&params->l_accept);
pthread_mutex_destroy(&params->l_io);
if (params->proxy_fd);
close(params->proxy_fd);
close(params->close_signal[0]);
close(params->close_signal[1]);
free(params->block_allocation_map);
if (params->mirror)
debug("mirror thread running! this should not happen!");
for (i=0; i < MAX_NBD_CLIENTS; i++) {
void* status;
if (params->nbd_client[i].thread != 0) {
debug("joining thread %d", i);
pthread_join(params->nbd_client[i].thread, &status);
}
}
}
/** Full lifecycle of the server */
void do_serve(struct mode_serve_params* params)
{
pthread_mutex_init(&params->l_accept, NULL);
pthread_mutex_init(&params->l_io, NULL);
SERVER_ERROR_ON_FAILURE(pipe(params->close_signal) , "pipe failed");
serve_open_server_socket(params);
serve_open_control_socket(params);
serve_init_allocation_map(params);
serve_accept_loop(params);
serve_cleanup(params);
}