From 7eaf5c3fd345ac2e62e538cbab784d234c671a97 Mon Sep 17 00:00:00 2001 From: Matthew Bloch Date: Mon, 21 May 2012 04:03:17 +0100 Subject: [PATCH] Initial, untested mirror implementation and resolved some type confusion around struct ip_and_mask pointers (no idea how it worked before). Added a header for readwrite.h used in mirror implementation. --- params.h | 29 ++++++--- parse.c | 9 ++- parse.h | 2 +- readwrite.h | 11 ++++ serve.c | 170 ++++++++++++++++++++++++++++++++++++++++------------ 5 files changed, 172 insertions(+), 49 deletions(-) create mode 100644 readwrite.h diff --git a/params.h b/params.h index 7b629c5..6c50b7f 100644 --- a/params.h +++ b/params.h @@ -8,14 +8,31 @@ #include +struct mirror_status { + pthread_t thread; + int client; + char *filename; + off64_t max_bytes_per_second; + + char *mapped; + struct bitset_mapping *dirty_map; +}; + +struct control_params { + int socket; + struct mode_serve_params* serve; +}; + struct mode_serve_params { union mysockaddr bind_to; int acl_entries; - struct ip_and_mask *acl[0]; + struct ip_and_mask (*acl)[0]; char* filename; int tcp_backlog; char* control_socket_name; - + off64_t size; + + struct mirror_status* mirror; int server; int control; @@ -38,12 +55,8 @@ struct client_params { off64_t size; char* mapped; - char* block_allocation_map; -}; - -struct control_params { - int socket; - struct mode_serve_params* serve; + char* block_allocation_map; + struct mode_serve_params* serve; /* FIXME: remove above duplication */ }; union mode_params { diff --git a/parse.c b/parse.c index b5e2e86..5dcad9c 100644 --- a/parse.c +++ b/parse.c @@ -43,8 +43,9 @@ int parse_ip_to_sockaddr(struct sockaddr* out, char* src) return 0; } -int parse_acl(struct ip_and_mask *out[], int max, char **entries) +int parse_acl(struct ip_and_mask (**out)[], int max, char **entries) { + struct ip_and_mask (*list)[0]; int i; if (max == 0) { @@ -56,10 +57,12 @@ int parse_acl(struct ip_and_mask *out[], int max, char **entries) debug("acl alloc: %p", *out); } + list = *out; + for (i = 0; i < max; i++) { # define MAX_MASK_BITS (outentry->ip.family == AF_INET ? 32 : 128) int j; - struct ip_and_mask* outentry = *out+i; + struct ip_and_mask* outentry = list[i]; if (parse_ip_to_sockaddr(&outentry->ip.generic, entries[i]) == 0) return i; @@ -78,7 +81,7 @@ int parse_acl(struct ip_and_mask *out[], int max, char **entries) } for (i=0; i < max; i++) { - struct ip_and_mask* entry = *out+i; + struct ip_and_mask* entry = list[i]; debug("acl entry %d @ %p has mask %d", i, entry, entry->mask); } diff --git a/parse.h b/parse.h index 6019ec2..7588fab 100644 --- a/parse.h +++ b/parse.h @@ -18,7 +18,7 @@ struct ip_and_mask { }; int parse_ip_to_sockaddr(struct sockaddr* out, char* src); -int parse_acl(struct ip_and_mask *out[], int max, char **entries); +int parse_acl(struct ip_and_mask (**out)[0], int max, char **entries); #endif diff --git a/readwrite.h b/readwrite.h new file mode 100644 index 0000000..6b0d3c4 --- /dev/null +++ b/readwrite.h @@ -0,0 +1,11 @@ +#ifndef __READWRITE_H + +#define __READWRITE_H + +int socket_connect(struct sockaddr* to); +off64_t socket_nbd_read_hello(int fd); +void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf); +void socket_nbd_write(int fd, off64_t from, int len, int out_fd, void* out_buf); + +#endif + diff --git a/serve.c b/serve.c index 937d4bf..b8af350 100644 --- a/serve.c +++ b/serve.c @@ -4,6 +4,7 @@ #include "util.h" #include "bitset.h" #include "parse.h" +#include "readwrite.h" #include #include @@ -17,6 +18,12 @@ static const int block_allocation_resolution = 4096;//128<<10; +static inline void dirty(struct mode_serve_params *serve, off64_t from, int len) +{ + if (serve->mirror) + bitset_set_range(serve->mirror->dirty_map, from, len); +} + /** * So waiting on client->socket is len bytes of data, and we must write it all * to client->mapped. However while doing do we must consult the bitmap @@ -47,16 +54,11 @@ void write_not_zeroes(struct client_params* client, off64_t from, int len) int run = bit_run_count(map, first_bit, last_bit-first_bit) * block_allocation_resolution; + if (run > len) + run = len; + debug("write_not_zeroes: %ld+%d, first_bit=%d, last_bit=%d, run=%d", from, len, first_bit, last_bit, run); - - run -= from % block_allocation_resolution; /*start*/ - - if (first_bit == last_bit) - run -= block_allocation_resolution - - ((from+len) % block_allocation_resolution); /*end*/ - - debug("run adjusted to %d", run); #define DO_READ(dst, len) CLIENT_ERROR_ON_FAILURE( \ readloop( \ @@ -67,10 +69,11 @@ void write_not_zeroes(struct client_params* client, off64_t from, int len) "read failed %ld+%d", from, (len) \ ) - if (bit_is_set(map, from/8)) { - /*debug("writing the lot");*/ + if (bit_is_set(map, from/block_allocation_resolution)) { + debug("writing the lot"); /* already allocated, just write it all */ DO_READ(client->mapped + from, run); + dirty(client->serve, from, run); len -= run; from += run; } @@ -85,8 +88,8 @@ void write_not_zeroes(struct client_params* client, off64_t from, int len) if (blockrun > run) blockrun = run; - /*debug("writing partial: bit=%d, blockrun=%d", - bit, blockrun);*/ + debug("writing partial: bit=%d, blockrun=%d (run=%d)", + bit, blockrun, run); DO_READ(zerobuffer, blockrun); @@ -99,7 +102,8 @@ void write_not_zeroes(struct client_params* client, off64_t from, int len) memcmp(zerobuffer, zerobuffer + 1, blockrun)) { memcpy(dst, zerobuffer, blockrun); bit_set(map, bit); - /*debug("non-zero, copied and set bit %d", bit);*/ + dirty(client->serve, from, blockrun); + debug("non-zero, copied and set bit %d", bit); /* at this point we could choose to * short-cut the rest of the write for * faster I/O but by continuing to do it @@ -108,7 +112,7 @@ void write_not_zeroes(struct client_params* client, off64_t from, int len) */ } else { - /*debug("all zero, skip write");*/ + debug("all zero, skip write"); } len -= blockrun; run -= blockrun; @@ -208,6 +212,7 @@ int client_serve_request(struct client_params* client) be64toh(request.from), be32toh(request.len) ); + dirty(client->serve, be64toh(request.from), be32toh(request.len)); } write(client->socket, &reply, sizeof(reply)); @@ -216,21 +221,6 @@ int client_serve_request(struct client_params* client) return 0; } -void client_open_file(struct client_params* client) -{ - client->fileno = open(client->filename, O_RDWR|O_DIRECT|O_SYNC); - CLIENT_ERROR_ON_FAILURE(client->fileno, "Couldn't open %s", - client->filename); - client->size = lseek64(client->fileno, 0, SEEK_END); - CLIENT_ERROR_ON_FAILURE(client->fileno, "Couldn't seek to end of %s", - client->filename); - client->mapped = mmap64(NULL, client->size, PROT_READ|PROT_WRITE, - MAP_SHARED, client->fileno, 0); - CLIENT_ERROR_ON_FAILURE((long) client->mapped, "Couldn't map file %s", - client->filename); - debug("opened %s size %ld on fd %d @ %p", client->filename, client->size, client->fileno, client->mapped); -} - void client_send_hello(struct client_params* client) { struct nbd_init init; @@ -249,7 +239,16 @@ void* client_serve(void* client_uncast) { struct client_params* client = (struct client_params*) client_uncast; - client_open_file(client); + //client_open_file(client); + CLIENT_ERROR_ON_FAILURE( + open_and_mmap( + client->filename, + &client->fileno, + &client->size, + (void**) &client->mapped + ), + "Couldn't open/mmap file %s", client->filename + ); client_send_hello(client); while (client_serve_request(client) == 0) @@ -261,6 +260,11 @@ void* client_serve(void* client_uncast) client->socket ); + + close(client->socket); + close(client->fileno); + munmap(client->mapped, client->size); + free(client); return NULL; } @@ -269,12 +273,12 @@ void control_acl(struct control_params* client) { int acl_entries = 0, parsed; char** s_acl_entry = NULL; - struct ip_and_mask** acl, **old_acl; + struct ip_and_mask (*acl)[], (*old_acl)[]; while (1) { char entry[64]; int result = read_until_newline(client->socket, entry, 64); - if (result == -1 || result == 64) + if (result == -1) goto done; if (result == 1) /* blank line terminates */ break; @@ -310,15 +314,104 @@ done: if (acl_entries > 0) { } return; } + +static const int longest_run = 8<<20; + +void* mirror_runner(void* serve_params_uncast) +{ + struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast; + + int pass; + struct bitset_mapping *map = serve->mirror->dirty_map; + + for (pass=0; pass < 7 /* biblical */; pass++) { + uint64_t current = 0; + + while (current < serve->size) { + int run = bitset_run_count(map, current, longest_run); + + if (bitset_is_set_at(map, current)) { + /* dirty area */ + socket_nbd_write( + serve->mirror->client, + current, + run, + 0, + serve->mirror->mapped + current + ); + + bitset_clear_range(map, current, run); + } + current += run; + } + } + + return NULL; +} + void control_mirror(struct control_params* client) { + off64_t size; + int fd, map_fd; + struct mirror_status *mirror; + union mysockaddr connect_to; + char s_ip_address[64], s_port[8]; + + CLIENT_ERROR_ON_FAILURE( + read_until_newline(client->socket, s_ip_address, 64), + "Failed to read destination IP" + ); + CLIENT_ERROR_ON_FAILURE( + read_until_newline(client->socket, s_port, 8), + "Failed to read destination port" + ); + + if (parse_ip_to_sockaddr(&connect_to.generic, s_ip_address) == 0) + CLIENT_ERROR("Couldn't parse connection address '%s'", + s_ip_address); + + connect_to.v4.sin_port = atoi(s_port); + if (connect_to.v4.sin_port < 0 || connect_to.v4.sin_port > 65535) + CLIENT_ERROR("Port number must be >= 0 and <= 65535"); + connect_to.v4.sin_port = htobe16(connect_to.v4.sin_port); + + fd = socket_connect(&connect_to.generic); /* FIXME uses wrong error handler */ + + mirror = xmalloc(sizeof(struct mirror_status)); + mirror->client = fd; + mirror->max_bytes_per_second = 0; + + CLIENT_ERROR_ON_FAILURE( + open_and_mmap( + client->serve->filename, + &map_fd, + &size, + (void**) &mirror->mapped + ), + "Failed to open and mmap %s", + client->serve->filename + ); + + mirror->dirty_map = bitset_alloc(size, block_allocation_resolution); + bitset_set_range(mirror->dirty_map, 0, size); + + client->serve->mirror = mirror; + + CLIENT_ERROR_ON_FAILURE( /* FIXME should free mirror on error */ + pthread_create( + &mirror->thread, + NULL, + mirror_runner, + client->serve + ), + "Failed to create mirror thread" + ); } + void control_status(struct control_params* client) { } - - void* control_serve(void* client_uncast) { const int max = 256; @@ -350,12 +443,12 @@ void* control_serve(void* client_uncast) static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 }; -int is_included_in_acl(int list_length, struct ip_and_mask** list, struct sockaddr* test) +int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct sockaddr* test) { int i; for (i=0; i < list_length; i++) { - struct ip_and_mask *entry = list+i; + struct ip_and_mask *entry = &(*list)[i]; int testbits; char *raw_address1, *raw_address2; @@ -469,6 +562,7 @@ void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct s client_params->filename = params->filename; client_params->block_allocation_map = params->block_allocation_map; + client_params->serve = params; SERVER_ERROR_ON_FAILURE( pthread_create( @@ -513,7 +607,8 @@ void serve_accept_loop(struct mode_serve_params* params) FD_ZERO(&fds); FD_SET(params->server, &fds); - FD_SET(params->control, &fds); + if (params->control_socket_name) + FD_SET(params->control, &fds); SERVER_ERROR_ON_FAILURE( select(FD_SETSIZE, &fds, NULL, NULL, NULL), @@ -538,6 +633,7 @@ void serve_init_allocation_map(struct mode_serve_params* params) off64_t size; SERVER_ERROR_ON_FAILURE(fd, "Couldn't open %s", params->filename); size = lseek64(fd, 0, SEEK_END); + params->size = size; SERVER_ERROR_ON_FAILURE(size, "Couldn't find size of %s", params->filename); params->block_allocation_map =