From c265d7fe3f0505553f997e97018559a843d12fcf Mon Sep 17 00:00:00 2001 From: Michel Pollet Date: Wed, 12 Oct 2016 22:51:32 +0100 Subject: [PATCH 1/5] tools: holemap: new tool Tool to list how a file is 'sparse' and calculate how 'sparse' it /could/ be. Signed-off-by: Michel Pollet --- tools/Makefile | 15 +++ tools/holemap.c | 271 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 286 insertions(+) create mode 100644 tools/Makefile create mode 100644 tools/holemap.c diff --git a/tools/Makefile b/tools/Makefile new file mode 100644 index 0000000..15abde6 --- /dev/null +++ b/tools/Makefile @@ -0,0 +1,15 @@ + +CPPFLAGS = -Wall -I../src/server -I../src/common -I../src/proxy \ + -Doff64_t=__off64_t \ + -Wunused-const-variable=0 -Wformat=0 + +CFLAGS = $(CPPFLAGS) -g -O3 -std=gnu99 + +all: holemap + +# holemap requires libmhash-dev for md5 calculation +holemap: holemap.c ../src/server/*.h + $(CC) $(CFLAGS) -o $@ $^ ${shell pkg-config mhash --cflags --libs} + +clean: + rm -f holemap diff --git a/tools/holemap.c b/tools/holemap.c new file mode 100644 index 0000000..3b9c035 --- /dev/null +++ b/tools/holemap.c @@ -0,0 +1,271 @@ + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include "bitset.h" + +#include + +static int verbose = 0; + +static const char * fstr(const char *fname, const char *error) { + char * ret; + asprintf(&ret, "%s:%s", fname, error); + return strdup(ret); // leaks on purpose; it's before we exit() anyway +} + +static void +fill_check_pages( + uint8_t *map, + bitfield_p zmap, + uint64_t current, + uint64_t count, + uint64_t blksize) +{ + uint64_t * base = (uint64_t*)(map + (current * blksize)); + const int wordcount = blksize / sizeof(uint64_t); + + while (count--) { + int is_zero = 1; + + if (is_zero) { + uint64_t *cur = base; + for (int z = 0; z < wordcount && is_zero; z++) + is_zero = 0 == *cur++; + } + if (!is_zero) + bit_set(zmap, current); + base += wordcount; + current++; + } +} 
+ +static int +read_file_data_ranges( + int fd, + bitfield_p bmap, + struct stat st) +{ + off_t off = 0; + int whence = SEEK_DATA; + + while (off < st.st_size) { + if (verbose > 1) + printf("offset %llu\n", off); + + off_t res = lseek(fd, off, whence); + if (res == -1) { + // end of file is OK + if (errno == ENXIO) + return 0; + fprintf(stderr, "lseek fails at %llu/%llu\n", off, st.st_size); + return res; + } + if (res > off && verbose) + printf("%s from %llu to %llu\n", + whence == SEEK_DATA ? "hole" : "data", off, res); + if (whence != SEEK_DATA) { + if (res > off) { + uint64_t from = off / st.st_blksize, + cnt = ((res + st.st_blksize - 1) - off) / st.st_blksize; + bit_set_range(bmap, from, cnt); + if (verbose > 1) + printf("set bits from %u cnt %u\n", from, cnt); + } + } + whence = whence == SEEK_DATA ? SEEK_HOLE : SEEK_DATA; + off = res; + } + return 0; +} + +int main(int argc, const char *argv[]) +{ + int dohash = 0; + + for (int i = 1; i < argc; i++) { + if (argv[i][0] == '-') { + if (!strcmp(argv[i], "-v") || !strcmp(argv[i], "--verbose")) { + verbose++; + continue; + } else if (!strcmp(argv[i], "-h") || !strcmp(argv[i], "--hash")) { + dohash++; + continue; + } + + fprintf(stderr, "%s: invalid argument: %s\n", argv[0], argv[i]); + exit(1); + } + const char *fname = argv[i]; + struct stat st; + if (stat(fname, &st)) { + perror(fname); + exit(1); + } + int fd = open(fname, O_RDONLY); + if (fd == -1) { + perror(fname); + exit(1); + } + if (verbose) + printf("%s is %u (%u times %u)\n", fname, + st.st_size, st.st_blocks, st.st_blksize); + + /* number of used blocks for this file size, also number of bits in bitmaps */ + size_t maxblocks = (st.st_size + st.st_blksize -1) / st.st_blksize; + + /* this is the original bitmap made using the operating system */ + bitfield_p bmap = (bitfield_p)calloc( + 1 + (maxblocks / BITS_PER_WORD), BITFIELD_WORD_SIZE); + + if (read_file_data_ranges(fd, bmap, st)) { + perror(fstr(fname, "read_file_data_ranges")); + exit(1); + } + + 
if (verbose > 3) { + printf("original map: "); + for (int z = 0; z <= (maxblocks / BITS_PER_WORD); z++) + printf("%016llx ", bmap[z]); + printf("\n"); + } + + uint8_t * map = mmap(NULL, maxblocks * st.st_blksize, + PROT_READ, MAP_PRIVATE, + fd, 0); + if (map == MAP_FAILED) { + perror(fname); + exit(1); + } + /* This one bitmap will have a subset, or be a copy of the bmap one when + * we're done + */ + bitfield_p zmap = (bitfield_p)calloc( + 1 + (maxblocks / BITS_PER_WORD), BITFIELD_WORD_SIZE); + { + uint64_t current = 0, count; + int which; + + while (current < maxblocks && + (count = bit_run_count(bmap, current, maxblocks - current, &which)) > 0) { + if (verbose) + printf("map is %s from %u to %u\n", + which ? "set" : "zero", + current * st.st_blksize, + (current+count) * st.st_blksize); + /* if bits were sets in the extents, double check them */ + if (which) + fill_check_pages(map, zmap, current, count, st.st_blksize); + + current += count; + } + } + if (verbose > 2) { + printf("optimized map: "); + for (int z = 0; z <= (maxblocks / BITS_PER_WORD); z++) + printf("%016llx ", zmap[z]); + printf("\n"); + } + + { + bitfield_p b[2] = { bmap, zmap }; + uint64_t set[2] = {0}; + + for (int bs = 0; bs < 2; bs++) { + for (int z = 0; z <= (maxblocks / BITS_PER_WORD); z++) + set[bs] += __builtin_popcountll(b[bs][z]); + } + + printf("%s is %d%% sparse, saving %llu blocks (%lluMB)\n", fname, + 100 - ((set[0] * 100) / maxblocks), + 100 - (maxblocks - set[0]), + ((maxblocks - set[0]) * st.st_blksize) / (1024*1024)); + if (set[1] < set[0]) + printf("%s could be %d%% sparse, saving %llu blocks (%lluMB)\n", fname, + 100 - ((set[1] * 100) / maxblocks), + 100 - (maxblocks - set[1]), + ((maxblocks - set[1]) * st.st_blksize) / (1024*1024)); + } + + + if (dohash) { + const int hs = mhash_get_block_size(MHASH_MD5); + struct { + char h[hs + 1]; + } hash[3]; + MHASH td; + + if (verbose) + printf("%s: calculating hashes, be patient\n", argv[i]); + td = mhash_init(MHASH_MD5); + mhash(td, 
map, maxblocks * st.st_blksize); + mhash_deinit(td, hash[0].h); + + if (verbose) + printf("%s: hash 1/3 done\n", argv[i]); + + { + uint64_t current = 0, count; + int which; + + td = mhash_init(MHASH_MD5); + + while (current < maxblocks && + (count = bit_run_count(bmap, current, maxblocks - current, &which)) > 0) { + /* if bits were sets in the extents, double check them */ + if (which) + mhash(td, map + (current * st.st_blksize), + count * st.st_blksize); + current += count; + } + mhash_deinit(td, hash[1].h); + } + if (verbose) + printf("%s: hash 2/3 done\n", argv[i]); + + { + uint64_t current = 0, count; + int which; + + td = mhash_init(MHASH_MD5); + + while (current < maxblocks && + (count = bit_run_count(zmap, current, maxblocks - current, &which)) > 0) { + /* if bits were sets in the extents, double check them */ + if (which) + mhash(td, map + (current * st.st_blksize), + count * st.st_blksize); + current += count; + } + mhash_deinit(td, hash[2].h); + } + if (memcmp(hash[0].h, hash[1].h, hs) || memcmp(hash[0].h, hash[2].h, hs)) { + fprintf(stderr, "%s: hash mismatch: ", argv[i]); + for (int h = 0; h < 3; h++) { + for (int hb = 0; hb < hs; hb++) + fprintf(stderr, "%.2x", (uint8_t)hash[h].h[hb]); + fprintf(stderr, " "); + } + fprintf(stderr, "\n"); + } else if (verbose) { + fprintf(stdout, "%s: hash 3/3 match: ", argv[i]); + for (int hb = 0; hb < hs; hb++) + fprintf(stdout, "%.2x", (uint8_t)hash[0].h[hb]); + fprintf(stdout, "\n"); + } + } + munmap(map, maxblocks * st.st_blksize); + close(fd); + free(bmap); + free(zmap); + } + + +} From 9ab1af8dff3be6a8bcc7321f60ce9c3d8d70e699 Mon Sep 17 00:00:00 2001 From: Michel Pollet Date: Thu, 13 Oct 2016 17:41:29 +0100 Subject: [PATCH 2/5] tools: semtest: new tool Piece of code I used to validate that socketpair is a lot quicker than pthread_semaphores. 
/*
 * Latency test for pthread 'conditions' vs using a good ol'
 * pair of linked sockets.
 *   gcc -o semtest -O2 -g --std=c99 semtest.c -lpthread
 *
 * Usage: semtest socket | semtest semaphore
 * Each mode does TESTCOUNT ping-pong round trips between the main thread
 * ("dad") and one worker thread ("kid"), printing a dot every 100000 trips.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define TESTCOUNT 10000000

pthread_mutex_t mutex;
pthread_cond_t kidwake, dadwake;
/* 1 while a request from dad is pending, reset to 0 by the kid */
int flag_passed = 0;
/* set once, under mutex, when the kid is running */
static int kid_ready = 0;

/*
 * Kid side of the condition-variable test: announce that we are running,
 * then forever wait for flag_passed, clear it and wake dad.
 */
static void *thread_semaphore(void * param)
{
	(void)param;
	printf("kid started\n");
	pthread_mutex_lock(&mutex);
	/* a flag, not just a signal: dad may not be waiting yet */
	kid_ready = 1;
	pthread_cond_signal( &dadwake );
	pthread_mutex_unlock(&mutex);
	for (;;) {
		pthread_mutex_lock(&mutex);
		while (flag_passed == 0)
			pthread_cond_wait( &kidwake, &mutex );
		flag_passed = 0;
		pthread_cond_signal( &dadwake );
		pthread_mutex_unlock(&mutex);
	}
	return NULL;
}

/* sock[0] is dad's end of the socketpair, sock[1] the kid's */
int sock[2];

/*
 * Kid side of the socket test: echo one byte back for every byte received.
 * Any short read or write is fatal, the measurement would be meaningless.
 */
static void * thread_socket(void * param)
{
	(void)param;
	printf("kid started\n");
	for (;;) {
		char b = 0;

		if (read(sock[1], &b, 1) != 1) {
			perror("read");
			exit(1);
		}
		b = 0;
		if (write(sock[1], &b, 1) != 1) {	// tell dad
			perror("write");
			exit(1);
		}
	}
	return NULL;
}

/* Start the kid thread or die with a readable message. */
static void start_kid(void *(*entry)(void *))
{
	pthread_t t;
	const int err = pthread_create(&t, NULL, entry, NULL);

	if (err) {
		fprintf(stderr, "pthread_create: %s\n", strerror(err));
		exit(1);
	}
}

int main(int argc, char ** argv)
{
	int test = 0;
	for (int i = 1; i < argc; i++) {
		if (!strcmp(argv[i], "socket"))
			test = 1;
		else if (!strcmp(argv[i], "semaphore"))
			test = 2;
		else
			goto usage;
	}
	if (!test)
		goto usage;

	if (test == 1) {
		if (socketpair(PF_LOCAL, SOCK_STREAM, 0, sock)) {
			perror("socketpair");
			exit(1);
		}
		printf("Socket test start:\n");
		start_kid(thread_socket);

		for (int ti = 0; ti < TESTCOUNT; ti++) {
			char b = 1;

			if (write(sock[0], &b, 1) != 1) {	// tell the kid
				perror("write");
				exit(1);
			}
			if (read(sock[0], &b, 1) != 1) {
				perror("read");
				exit(1);
			}

			if (!(ti % 100000)) {
				printf("."); fflush(stdout);
			}
		}
	} else {
		pthread_mutex_init(&mutex, NULL);
		pthread_cond_init( &kidwake, NULL );
		pthread_cond_init( &dadwake, NULL );

		printf("Semaphore test start:\n");
		start_kid(thread_semaphore);

		pthread_mutex_lock(&mutex);
		printf("dad wait for kid\n");
		/* predicate loop: survives both an early signal and spurious wakeups */
		while (!kid_ready)
			pthread_cond_wait( &dadwake, &mutex );
		pthread_mutex_unlock(&mutex);
		printf("dad has kid ready\n");

		for (int ti = 0; ti < TESTCOUNT; ti++) {
			pthread_mutex_lock(&mutex);
			flag_passed = 1;
			pthread_cond_signal( &kidwake );
			while (flag_passed == 1)
				pthread_cond_wait( &dadwake, &mutex );
			pthread_mutex_unlock(&mutex);
			if (!(ti % 100000)) {
				printf("."); fflush(stdout);
			}
		}
	}
	printf("\nDone\n");
	exit(0);
usage:
	fprintf(stderr, "%s socket -- run semaphore latency test with sockets\n", argv[0]);
	fprintf(stderr, "%s semaphore -- run semaphore latency test with pthread conditions\n", argv[0]);
	exit(1);
}
changed, 321 insertions(+), 307 deletions(-) create mode 100644 src/server/bitstream.h diff --git a/src/common/ioutil.c b/src/common/ioutil.c index 6a8309a..93e7e60 100644 --- a/src/common/ioutil.c +++ b/src/common/ioutil.c @@ -9,7 +9,7 @@ #include #include "util.h" -#include "bitset.h" +#include "bitstream.h" #include "ioutil.h" diff --git a/src/server/bitset.h b/src/server/bitset.h index ed0ffd2..6b3ac72 100644 --- a/src/server/bitset.h +++ b/src/server/bitset.h @@ -130,309 +130,6 @@ static inline uint64_t bit_run_count(bitfield_p b, uint64_t from, uint64_t len, return count; } -enum bitset_stream_events { - BITSET_STREAM_UNSET = 0, - BITSET_STREAM_SET = 1, - BITSET_STREAM_ON = 2, - BITSET_STREAM_OFF = 3 -}; -#define BITSET_STREAM_EVENTS_ENUM_SIZE 4 - -struct bitset_stream_entry { - enum bitset_stream_events event; - uint64_t from; - uint64_t len; -}; - -/** Limit the stream size to 1MB for now. - * - * If this is too small, it'll cause requests to stall as the migration lags - * behind the changes made by those requests. - */ -#define BITSET_STREAM_SIZE ( ( 1024 * 1024 ) / sizeof( struct bitset_stream_entry ) ) - -struct bitset_stream { - struct bitset_stream_entry entries[BITSET_STREAM_SIZE]; - int in; - int out; - int size; - pthread_mutex_t mutex; - pthread_cond_t cond_not_full; - pthread_cond_t cond_not_empty; - uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE]; -}; - - -/** An application of a bitset - a bitset mapping represents a file of ''size'' - * broken down into ''resolution''-sized chunks. The bit set is assumed to - * represent one bit per chunk. We also bundle a lock so that the set can be - * written reliably by multiple threads. - */ -struct bitset { - pthread_mutex_t lock; - uint64_t size; - int resolution; - struct bitset_stream *stream; - int stream_enabled; - bitfield_word_t bits[]; -}; - -/** Allocate a bitset for a file of the given size, and chunks of the - * given resolution. 
- */ -static inline struct bitset *bitset_alloc( uint64_t size, int resolution ) -{ - // calculate a size to allocate that is a multiple of the size of the - // bitfield word - size_t bitfield_size = - BIT_WORDS_FOR_SIZE((( size + resolution - 1 ) / resolution)) * sizeof( bitfield_word_t ); - struct bitset *bitset = xmalloc(sizeof( struct bitset ) + ( bitfield_size / 8 ) ); - - bitset->size = size; - bitset->resolution = resolution; - /* don't actually need to call pthread_mutex_destroy '*/ - pthread_mutex_init(&bitset->lock, NULL); - bitset->stream = xmalloc( sizeof( struct bitset_stream ) ); - pthread_mutex_init( &bitset->stream->mutex, NULL ); - - /* Technically don't need to call pthread_cond_destroy either */ - pthread_cond_init( &bitset->stream->cond_not_full, NULL ); - pthread_cond_init( &bitset->stream->cond_not_empty, NULL ); - - return bitset; -} - -static inline void bitset_free( struct bitset * set ) -{ - /* TODO: free our mutex... */ - - free( set->stream ); - set->stream = NULL; - - free( set ); -} - -#define INT_FIRST_AND_LAST \ - uint64_t first = from/set->resolution, \ - last = ((from+len)-1)/set->resolution, \ - bitlen = (last-first)+1 - -#define BITSET_LOCK \ - FATAL_IF_NEGATIVE(pthread_mutex_lock(&set->lock), "Error locking bitset") - -#define BITSET_UNLOCK \ - FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset") - - -static inline void bitset_stream_enqueue( - struct bitset * set, - enum bitset_stream_events event, - uint64_t from, - uint64_t len -) -{ - struct bitset_stream * stream = set->stream; - - pthread_mutex_lock( &stream->mutex ); - - while ( stream->size == BITSET_STREAM_SIZE ) { - pthread_cond_wait( &stream->cond_not_full, &stream->mutex ); - } - - stream->entries[stream->in].event = event; - stream->entries[stream->in].from = from; - stream->entries[stream->in].len = len; - stream->queued_bytes[event] += len; - - stream->size++; - stream->in++; - stream->in %= BITSET_STREAM_SIZE; - - pthread_mutex_unlock( & 
stream->mutex ); - pthread_cond_signal( &stream->cond_not_empty ); - - return; -} - -static inline void bitset_stream_dequeue( - struct bitset * set, - struct bitset_stream_entry * out -) -{ - struct bitset_stream * stream = set->stream; - struct bitset_stream_entry * dequeued; - - pthread_mutex_lock( &stream->mutex ); - - while ( stream->size == 0 ) { - pthread_cond_wait( &stream->cond_not_empty, &stream->mutex ); - } - - dequeued = &stream->entries[stream->out]; - - if ( out != NULL ) { - out->event = dequeued->event; - out->from = dequeued->from; - out->len = dequeued->len; - } - - stream->queued_bytes[dequeued->event] -= dequeued->len; - stream->size--; - stream->out++; - stream->out %= BITSET_STREAM_SIZE; - - pthread_mutex_unlock( &stream->mutex ); - pthread_cond_signal( &stream->cond_not_full ); - - return; -} - -static inline size_t bitset_stream_size( struct bitset * set ) -{ - size_t size; - - pthread_mutex_lock( &set->stream->mutex ); - size = set->stream->size; - pthread_mutex_unlock( &set->stream->mutex ); - - return size; -} - -static inline uint64_t bitset_stream_queued_bytes( - struct bitset * set, - enum bitset_stream_events event -) -{ - uint64_t total; - - pthread_mutex_lock( &set->stream->mutex ); - total = set->stream->queued_bytes[event]; - pthread_mutex_unlock( &set->stream->mutex ); - - return total; -} - -static inline void bitset_enable_stream( struct bitset * set ) -{ - BITSET_LOCK; - set->stream_enabled = 1; - bitset_stream_enqueue( set, BITSET_STREAM_ON, 0, set->size ); - BITSET_UNLOCK; -} - -static inline void bitset_disable_stream( struct bitset * set ) -{ - BITSET_LOCK; - bitset_stream_enqueue( set, BITSET_STREAM_OFF, 0, set->size ); - set->stream_enabled = 0; - BITSET_UNLOCK; -} - -/** Set the bits in a bitset which correspond to the given bytes in the larger - * file. 
- */ -static inline void bitset_set_range( - struct bitset * set, - uint64_t from, - uint64_t len) -{ - INT_FIRST_AND_LAST; - BITSET_LOCK; - bit_set_range(set->bits, first, bitlen); - - if ( set->stream_enabled ) { - bitset_stream_enqueue( set, BITSET_STREAM_SET, from, len ); - } - - BITSET_UNLOCK; -} - - -/** Set every bit in the bitset. */ -static inline void bitset_set( struct bitset * set ) -{ - bitset_set_range(set, 0, set->size); -} - -/** Clear the bits in a bitset which correspond to the given bytes in the - * larger file. - */ -static inline void bitset_clear_range( - struct bitset * set, - uint64_t from, - uint64_t len) -{ - INT_FIRST_AND_LAST; - BITSET_LOCK; - bit_clear_range(set->bits, first, bitlen); - - if ( set->stream_enabled ) { - bitset_stream_enqueue( set, BITSET_STREAM_UNSET, from, len ); - } - - BITSET_UNLOCK; -} - - -/** Clear every bit in the bitset. */ -static inline void bitset_clear( struct bitset * set ) -{ - bitset_clear_range(set, 0, set->size); -} - -/** As per bitset_run_count but also tells you whether the run it found was set - * or unset, atomically. - */ -static inline uint64_t bitset_run_count_ex( - struct bitset * set, - uint64_t from, - uint64_t len, - int* run_is_set -) -{ - uint64_t run; - - /* Clip our requests to the end of the bitset, avoiding uint underflow. */ - if ( from > set->size ) { - return 0; - } - len = ( len + from ) > set->size ? ( set->size - from ) : len; - - INT_FIRST_AND_LAST; - - BITSET_LOCK; - run = bit_run_count(set->bits, first, bitlen, run_is_set) * set->resolution; - run -= (from % set->resolution); - BITSET_UNLOCK; - - return run; -} - -/** Counts the number of contiguous bytes that are represented as a run in - * the bit field. - */ -static inline uint64_t bitset_run_count( - struct bitset * set, - uint64_t from, - uint64_t len) -{ - return bitset_run_count_ex( set, from, len, NULL ); -} - -/** Tests whether the bit field is clear for the given file offset. 
- */ -static inline int bitset_is_clear_at( struct bitset * set, uint64_t at ) -{ - return bit_is_clear(set->bits, at/set->resolution); -} - -/** Tests whether the bit field is set for the given file offset. - */ -static inline int bitset_is_set_at( struct bitset * set, uint64_t at ) -{ - return bit_is_set(set->bits, at/set->resolution); -} - #endif diff --git a/src/server/bitstream.h b/src/server/bitstream.h new file mode 100644 index 0000000..ec45747 --- /dev/null +++ b/src/server/bitstream.h @@ -0,0 +1,317 @@ +/* + * bitstream.h + * + * Created on: 13 Oct 2016 + * Author: michel + */ + +#ifndef SRC_SERVER_BITSTREAM_H_ +#define SRC_SERVER_BITSTREAM_H_ + +#include "bitset.h" + +enum bitset_stream_events { + BITSET_STREAM_UNSET = 0, + BITSET_STREAM_SET = 1, + BITSET_STREAM_ON = 2, + BITSET_STREAM_OFF = 3 +}; +#define BITSET_STREAM_EVENTS_ENUM_SIZE 4 + +struct bitset_stream_entry { + enum bitset_stream_events event; + uint64_t from; + uint64_t len; +}; + +/** Limit the stream size to 1MB for now. + * + * If this is too small, it'll cause requests to stall as the migration lags + * behind the changes made by those requests. + */ +#define BITSET_STREAM_SIZE ( ( 1024 * 1024 ) / sizeof( struct bitset_stream_entry ) ) + +struct bitset_stream { + struct bitset_stream_entry entries[BITSET_STREAM_SIZE]; + int in; + int out; + int size; + pthread_mutex_t mutex; + pthread_cond_t cond_not_full; + pthread_cond_t cond_not_empty; + uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE]; +}; + + +/** An application of a bitset - a bitset mapping represents a file of ''size'' + * broken down into ''resolution''-sized chunks. The bit set is assumed to + * represent one bit per chunk. We also bundle a lock so that the set can be + * written reliably by multiple threads. 
 */
struct bitset {
	pthread_mutex_t lock;            /* taken by BITSET_LOCK / BITSET_UNLOCK */
	uint64_t size;                   /* size of the represented file, in bytes */
	int resolution;                  /* bytes covered by one bit */
	struct bitset_stream *stream;    /* event queue, allocated in bitset_alloc */
	int stream_enabled;              /* non-zero: set/clear also enqueue events */
	bitfield_word_t bits[];          /* the bit field itself */
};

/** Allocate a bitset for a file of the given size, and chunks of the
 * given resolution. Also allocates the stream and initialises the bitset
 * mutex, the stream mutex and both stream condition variables.
 *
 * NOTE(review): stream_enabled and the stream's in/out/size/queued_bytes
 * are not assigned here; this presumably relies on xmalloc returning
 * zeroed memory - confirm against xmalloc's definition.
 */
static inline struct bitset *bitset_alloc( uint64_t size, int resolution )
{
	// calculate a size to allocate that is a multiple of the size of the
	// bitfield word
	size_t bitfield_size =
		BIT_WORDS_FOR_SIZE((( size + resolution - 1 ) / resolution)) * sizeof( bitfield_word_t );
	/* NOTE(review): bitfield_size is divided by 8 here although it was just
	 * multiplied by the word size; whether that is enough room depends on
	 * what BIT_WORDS_FOR_SIZE (bitset.h) returns - confirm. */
	struct bitset *bitset = xmalloc(sizeof( struct bitset ) + ( bitfield_size / 8 ) );

	bitset->size = size;
	bitset->resolution = resolution;
	/* don't actually need to call pthread_mutex_destroy */
	pthread_mutex_init(&bitset->lock, NULL);
	bitset->stream = xmalloc( sizeof( struct bitset_stream ) );
	pthread_mutex_init( &bitset->stream->mutex, NULL );

	/* Technically don't need to call pthread_cond_destroy either */
	pthread_cond_init( &bitset->stream->cond_not_full, NULL );
	pthread_cond_init( &bitset->stream->cond_not_empty, NULL );

	return bitset;
}

/** Release the stream and then the bitset itself. The mutexes and
 * condition variables are not destroyed.
 */
static inline void bitset_free( struct bitset * set )
{
	/* TODO: free our mutex... */

	free( set->stream );
	set->stream = NULL;

	free( set );
}

/* Declares first / last / bitlen: the bit indexes covering the byte range
 * [from, from+len) of the enclosing function, at set->resolution.
 * NOTE(review): with from == 0 and len == 0, (from+len)-1 wraps around. */
#define INT_FIRST_AND_LAST \
	uint64_t first = from/set->resolution, \
		last = ((from+len)-1)/set->resolution, \
		bitlen = (last-first)+1

/* NOTE(review): POSIX pthread_mutex_lock/unlock report errors as positive
 * numbers, so a "negative" test looks like it can never fire - confirm
 * against FATAL_IF_NEGATIVE's definition. */
#define BITSET_LOCK \
	FATAL_IF_NEGATIVE(pthread_mutex_lock(&set->lock), "Error locking bitset")

#define BITSET_UNLOCK \
	FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset")


/** Append one event to the stream's circular buffer, blocking on
 * cond_not_full while the buffer holds BITSET_STREAM_SIZE entries.
 * Adds len to queued_bytes[event] and signals cond_not_empty afterwards.
 */
static inline void bitset_stream_enqueue(
	struct bitset * set,
	enum bitset_stream_events event,
	uint64_t from,
	uint64_t len
)
{
	struct bitset_stream * stream = set->stream;

	pthread_mutex_lock( &stream->mutex );

	while ( stream->size == BITSET_STREAM_SIZE ) {
		pthread_cond_wait( &stream->cond_not_full, &stream->mutex );
	}

	stream->entries[stream->in].event = event;
	stream->entries[stream->in].from = from;
	stream->entries[stream->in].len = len;
	stream->queued_bytes[event] += len;

	stream->size++;
	stream->in++;
	stream->in %= BITSET_STREAM_SIZE;

	pthread_mutex_unlock( & stream->mutex );
	pthread_cond_signal( &stream->cond_not_empty );

	return;
}

/** Remove the oldest event from the stream, blocking on cond_not_empty
 * while the stream is empty. The event is copied to *out unless out is
 * NULL, in which case it is simply dropped. Signals cond_not_full.
 */
static inline void bitset_stream_dequeue(
	struct bitset * set,
	struct bitset_stream_entry * out
)
{
	struct bitset_stream * stream = set->stream;
	struct bitset_stream_entry * dequeued;

	pthread_mutex_lock( &stream->mutex );

	while ( stream->size == 0 ) {
		pthread_cond_wait( &stream->cond_not_empty, &stream->mutex );
	}

	dequeued = &stream->entries[stream->out];

	if ( out != NULL ) {
		out->event = dequeued->event;
		out->from = dequeued->from;
		out->len = dequeued->len;
	}

	stream->queued_bytes[dequeued->event] -= dequeued->len;
	stream->size--;
	stream->out++;
	stream->out %= BITSET_STREAM_SIZE;

	pthread_mutex_unlock( &stream->mutex );
	pthread_cond_signal( &stream->cond_not_full );

	return;
}

/** Number of events currently queued, read under the stream mutex. */
static inline size_t bitset_stream_size( struct bitset * set )
{
	size_t size;

	pthread_mutex_lock( &set->stream->mutex );
	size = set->stream->size;
	pthread_mutex_unlock( &set->stream->mutex );

	return size;
}

/** Sum of the len fields of the queued events of the given kind, read
 * under the stream mutex.
 */
static inline uint64_t bitset_stream_queued_bytes(
	struct bitset * set,
	enum bitset_stream_events event
)
{
	uint64_t total;

	pthread_mutex_lock( &set->stream->mutex );
	total = set->stream->queued_bytes[event];
	pthread_mutex_unlock( &set->stream->mutex );

	return total;
}

/** Turn event streaming on and enqueue a BITSET_STREAM_ON event covering
 * the whole file. The enqueue happens with the bitset lock held and can
 * block while the stream is full.
 */
static inline void bitset_enable_stream( struct bitset * set )
{
	BITSET_LOCK;
	set->stream_enabled = 1;
	bitset_stream_enqueue( set, BITSET_STREAM_ON, 0, set->size );
	BITSET_UNLOCK;
}

/** Enqueue a BITSET_STREAM_OFF event covering the whole file, then turn
 * event streaming off.
 */
static inline void bitset_disable_stream( struct bitset * set )
{
	BITSET_LOCK;
	bitset_stream_enqueue( set, BITSET_STREAM_OFF, 0, set->size );
	set->stream_enabled = 0;
	BITSET_UNLOCK;
}

/** Set the bits in a bitset which correspond to the given bytes in the larger
 * file. When streaming is enabled a BITSET_STREAM_SET event carrying the
 * byte range is enqueued under the same lock.
 */
static inline void bitset_set_range(
	struct bitset * set,
	uint64_t from,
	uint64_t len)
{
	INT_FIRST_AND_LAST;
	BITSET_LOCK;
	bit_set_range(set->bits, first, bitlen);

	if ( set->stream_enabled ) {
		bitset_stream_enqueue( set, BITSET_STREAM_SET, from, len );
	}

	BITSET_UNLOCK;
}


/** Set every bit in the bitset. */
static inline void bitset_set( struct bitset * set )
{
	bitset_set_range(set, 0, set->size);
}

/** Clear the bits in a bitset which correspond to the given bytes in the
 * larger file. When streaming is enabled a BITSET_STREAM_UNSET event
 * carrying the byte range is enqueued under the same lock.
 */
static inline void bitset_clear_range(
	struct bitset * set,
	uint64_t from,
	uint64_t len)
{
	INT_FIRST_AND_LAST;
	BITSET_LOCK;
	bit_clear_range(set->bits, first, bitlen);

	if ( set->stream_enabled ) {
		bitset_stream_enqueue( set, BITSET_STREAM_UNSET, from, len );
	}

	BITSET_UNLOCK;
}


/** Clear every bit in the bitset. */
static inline void bitset_clear( struct bitset * set )
{
	bitset_clear_range(set, 0, set->size);
}

/** As per bitset_run_count but also tells you whether the run it found was set
 * or unset, atomically. The result is in bytes; 0 is returned when from is
 * past the end of the file. run_is_set is handed to bit_run_count as is.
 */
static inline uint64_t bitset_run_count_ex(
	struct bitset * set,
	uint64_t from,
	uint64_t len,
	int* run_is_set
)
{
	uint64_t run;

	/* Clip our requests to the end of the bitset, avoiding uint underflow. */
	if ( from > set->size ) {
		return 0;
	}
	len = ( len + from ) > set->size ? ( set->size - from ) : len;

	INT_FIRST_AND_LAST;

	BITSET_LOCK;
	run = bit_run_count(set->bits, first, bitlen, run_is_set) * set->resolution;
	/* the run is counted from the start of first's chunk, not from `from` */
	run -= (from % set->resolution);
	BITSET_UNLOCK;

	return run;
}

/** Counts the number of contiguous bytes that are represented as a run in
 * the bit field. Calls bitset_run_count_ex with a NULL run_is_set.
 */
static inline uint64_t bitset_run_count(
	struct bitset * set,
	uint64_t from,
	uint64_t len)
{
	return bitset_run_count_ex( set, from, len, NULL );
}

/** Tests whether the bit field is clear for the given file offset.
 * The bitset lock is not taken.
 */
static inline int bitset_is_clear_at( struct bitset * set, uint64_t at )
{
	return bit_is_clear(set->bits, at/set->resolution);
}

/** Tests whether the bit field is set for the given file offset.
+ */ +static inline int bitset_is_set_at( struct bitset * set, uint64_t at ) +{ + return bit_is_set(set->bits, at/set->resolution); +} + + +#endif /* SRC_SERVER_BITSTREAM_H_ */ diff --git a/src/server/client.c b/src/server/client.c index 8b57b47..2c6cd39 100644 --- a/src/server/client.c +++ b/src/server/client.c @@ -3,7 +3,7 @@ #include "ioutil.h" #include "sockutil.h" #include "util.h" -#include "bitset.h" +#include "bitstream.h" #include "nbdtypes.h" #include "self_pipe.h" diff --git a/src/server/mirror.c b/src/server/mirror.c index b3d22d5..708a67d 100644 --- a/src/server/mirror.c +++ b/src/server/mirror.c @@ -22,7 +22,7 @@ #include "sockutil.h" #include "parse.h" #include "readwrite.h" -#include "bitset.h" +#include "bitstream.h" #include "self_pipe.h" #include "status.h" diff --git a/src/server/serve.c b/src/server/serve.c index de6b871..d0856f2 100644 --- a/src/server/serve.c +++ b/src/server/serve.c @@ -4,7 +4,7 @@ #include "ioutil.h" #include "sockutil.h" #include "util.h" -#include "bitset.h" +#include "bitstream.h" #include "control.h" #include "self_pipe.h" From 781a91fe3dbdccd727cd8ad7f88c5ea037554ee3 Mon Sep 17 00:00:00 2001 From: Michel Pollet Date: Thu, 13 Oct 2016 17:42:04 +0100 Subject: [PATCH 4/5] fifo: Add fifo_declare.h My own implementation, used in countless projects Signed-off-by: Michel Pollet --- src/common/fifo_declare.h | 189 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 src/common/fifo_declare.h diff --git a/src/common/fifo_declare.h b/src/common/fifo_declare.h new file mode 100644 index 0000000..8a3b2fb --- /dev/null +++ b/src/common/fifo_declare.h @@ -0,0 +1,189 @@ +/* + fido_declare.h + Copyright (C) 2003-2012 Michel Pollet + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. 
/*
	This library is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
	Lesser General Public License for more details.

	You should have received a copy of the GNU Lesser General Public
	License along with this library; if not, write to the Free Software
	Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/

/*
 * FIFO helpers, aka circular buffers
 *
 * these macros define accessories for FIFOs of any name, type and
 * any (power of two) size
 */

#ifndef __FIFO_DECLARE__
#define __FIFO_DECLARE__

#ifdef __cplusplus
extern "C" {
#endif

/*
	doing a :
	DECLARE_FIFO(uint8_t, myfifo, 128);

	will declare :
	enum : myfifo_overflow_f
	type : myfifo_t
	functions:
		// write a byte into the fifo, return 1 if there was room, 0 if there wasn't
		int myfifo_write(myfifo_t *c, uint8_t b);
		// reads a byte from the fifo, return 0 if empty. Use myfifo_isempty() to check beforehand
		uint8_t myfifo_read(myfifo_t *c);
		int myfifo_isfull(myfifo_t *c);
		int myfifo_isempty(myfifo_t *c);
		// returns number of items to read now
		uint16_t myfifo_get_read_size(myfifo_t *c);
		// returns number of items that can be written now
		uint16_t myfifo_get_write_size(myfifo_t *c);
		// read item at offset o from read cursor, no cursor advance
		uint8_t myfifo_read_at(myfifo_t *c, uint16_t o);
		// write b at offset o compared to current write cursor, no cursor advance
		void myfifo_write_at(myfifo_t *c, uint16_t o, uint8_t b);
		// advance the read / write cursor by o items
		void myfifo_read_offset(myfifo_t *c, uint16_t o);
		void myfifo_write_offset(myfifo_t *c, uint16_t o);
		// empty the fifo and clear its flags
		void myfifo_reset(myfifo_t *c);

	One slot is always kept free to tell "full" from "empty", so a fifo
	declared with size N holds at most N-1 items.

	In your .c you need to 'implement' the fifo (the trailing ';' is required):
	DEFINE_FIFO(uint8_t, myfifo);

	To use the fifo, you must declare at least one :
	myfifo_t fifo = FIFO_NULL;

	while (!myfifo_isfull(&fifo))
		myfifo_write(&fifo, 0xaa);
	....
	while (!myfifo_isempty(&fifo))
		b = myfifo_read(&fifo);
 */

#include <stdint.h>

#if defined(__AVR__)
#define FIFO_CURSOR_TYPE	uint8_t
#define FIFO_BOOL_TYPE	char
#define FIFO_INLINE
#define FIFO_SYNC
#endif

#ifndef FIFO_CURSOR_TYPE
#define FIFO_CURSOR_TYPE	uint16_t
#endif
#ifndef FIFO_BOOL_TYPE
#define FIFO_BOOL_TYPE	int
#endif
#ifndef FIFO_INLINE
#define FIFO_INLINE	inline
#endif

/* We should not need volatile */
#ifndef FIFO_VOLATILE
#define FIFO_VOLATILE
#endif
#ifndef FIFO_SYNC
#define FIFO_SYNC __sync_synchronize()
#endif

#ifndef FIFO_ZERO_INIT
#define FIFO_ZERO_INIT {0}
#endif
#define FIFO_NULL { FIFO_ZERO_INIT, 0, 0, 0 }

/* New compilers don't like unused static functions. However,
 * we do like 'static inlines' for these small accessors,
 * so we mark them as 'unused'. It stops it complaining */
#ifdef __GNUC__
#define FIFO_DECL static __attribute__ ((unused))
#else
#define FIFO_DECL static
#endif

/*
 * Declares the enums and the struct type for a fifo called __name holding
 * __size items of __type. The cursors are masked with (__size - 1), so
 * __size has to be a power of two: the *_is_pow2 enumerator below turns
 * any other value into a compile-time error (division by zero in a
 * constant expression).
 */
#define DECLARE_FIFO(__type, __name, __size) \
enum { __name##_overflow_f = (1 << 0) }; \
enum { __name##_fifo_size = (__size) }; \
enum { __name##_fifo_size_is_pow2 = \
	1 / (((__size) > 0 && (((__size) & ((__size) - 1)) == 0)) ? 1 : 0) }; \
typedef struct __name##_t {			\
	__type		buffer[__name##_fifo_size];		\
	FIFO_VOLATILE FIFO_CURSOR_TYPE	read;		\
	FIFO_VOLATILE FIFO_CURSOR_TYPE	write;		\
	FIFO_VOLATILE uint8_t	flags;		\
} __name##_t

/*
 * Defines the accessors for a fifo previously declared with DECLARE_FIFO.
 * In _write and _read the FIFO_SYNC barrier sits between the buffer access
 * and the cursor update. The expansion ends with a dangling
 * 'struct __name##_t' so that the invocation must be closed with a ';'.
 */
#define DEFINE_FIFO(__type, __name) \
FIFO_DECL FIFO_INLINE FIFO_BOOL_TYPE __name##_write(__name##_t * c, __type b)\
{\
	FIFO_CURSOR_TYPE now = c->write;\
	FIFO_CURSOR_TYPE next = (now + 1) & (__name##_fifo_size-1);\
	if (c->read != next) {	\
		c->buffer[now] = b;\
		FIFO_SYNC; \
		c->write = next;\
		return 1;\
	}\
	return 0;\
}\
FIFO_DECL FIFO_INLINE FIFO_BOOL_TYPE __name##_isfull(__name##_t *c)\
{\
	FIFO_CURSOR_TYPE next = (c->write + 1) & (__name##_fifo_size-1);\
	return c->read == next;\
}\
FIFO_DECL FIFO_INLINE FIFO_BOOL_TYPE __name##_isempty(__name##_t * c)\
{\
	return c->read == c->write;\
}\
FIFO_DECL FIFO_INLINE __type __name##_read(__name##_t * c)\
{\
	__type res = FIFO_ZERO_INIT; \
	FIFO_CURSOR_TYPE read = c->read;\
	if (read == c->write)\
		return res;\
	res = c->buffer[read];\
	FIFO_SYNC; \
	c->read = (read + 1) & (__name##_fifo_size-1);\
	return res;\
}\
FIFO_DECL FIFO_INLINE FIFO_CURSOR_TYPE __name##_get_read_size(__name##_t *c)\
{\
	return ((c->write + __name##_fifo_size) - c->read) & (__name##_fifo_size-1);\
}\
FIFO_DECL FIFO_INLINE FIFO_CURSOR_TYPE __name##_get_write_size(__name##_t *c)\
{\
	return (__name##_fifo_size-1) - __name##_get_read_size(c);\
}\
FIFO_DECL FIFO_INLINE void __name##_read_offset(__name##_t *c, FIFO_CURSOR_TYPE o)\
{\
	FIFO_SYNC; \
	c->read = (c->read + o) & (__name##_fifo_size-1);\
}\
FIFO_DECL FIFO_INLINE __type __name##_read_at(__name##_t *c, FIFO_CURSOR_TYPE o)\
{\
	return c->buffer[(c->read + o) & (__name##_fifo_size-1)];\
}\
FIFO_DECL FIFO_INLINE void __name##_write_at(__name##_t *c, FIFO_CURSOR_TYPE o, __type b)\
{\
	c->buffer[(c->write + o) & (__name##_fifo_size-1)] = b;\
}\
FIFO_DECL FIFO_INLINE void __name##_write_offset(__name##_t *c, FIFO_CURSOR_TYPE o)\
{\
	FIFO_SYNC; \
	c->write = (c->write + o) & (__name##_fifo_size-1);\
}\
FIFO_DECL FIFO_INLINE void __name##_reset(__name##_t *c)\
{\
	FIFO_SYNC; \
	c->read = c->write = c->flags = 0;\
}\
struct __name##_t

#ifdef __cplusplus
}
#endif

#endif
Signed-off-by: Michel Pollet --- src/server/control.c | 20 ++++----- src/server/control.h | 4 +- src/server/mbox.c | 90 ++++++++++++++++++++--------------------- src/server/mbox.h | 46 ++++++++++----------- src/server/mirror.c | 81 +++++++++++++++++-------------------- src/server/mirror.h | 38 ++++++++--------- src/server/serve.h | 4 +- tests/unit/check_mbox.c | 13 +++--- 8 files changed, 139 insertions(+), 157 deletions(-) diff --git a/src/server/control.c b/src/server/control.c index 63f0720..7d47eab 100644 --- a/src/server/control.c +++ b/src/server/control.c @@ -83,7 +83,7 @@ void control_destroy( struct control * control ) struct control_client * control_client_create( struct flexnbd * flexnbd, int client_fd , - struct mbox * state_mbox ) + struct mbox_t * state_mbox ) { NULLCHECK( flexnbd ); @@ -256,7 +256,7 @@ void * control_runner( void * control_uncast ) #define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1) -void control_write_mirror_response( enum mirror_state mirror_state, int client_fd ) +void control_write_mirror_response( mirror_state_t mirror_state, int client_fd ) { switch (mirror_state) { case MS_INIT: @@ -290,22 +290,16 @@ void control_write_mirror_response( enum mirror_state mirror_state, int client_f /* Call this in the thread where you want to receive the mirror state */ -enum mirror_state control_client_mirror_wait( +mirror_state_t control_client_mirror_wait( struct control_client* client) { NULLCHECK( client ); NULLCHECK( client->mirror_state_mbox ); - struct mbox * mbox = client->mirror_state_mbox; - enum mirror_state mirror_state; - enum mirror_state * contents; + struct mbox_t * mbox = client->mirror_state_mbox; + mirror_state_t mirror_state; - contents = (enum mirror_state*)mbox_receive( mbox ); - NULLCHECK( contents ); - - mirror_state = *contents; - - free( contents ); + mirror_state = mbox_receive( mbox ).i; return mirror_state; } @@ -425,7 +419,7 @@ int control_mirror(struct control_client* client, int linesc, char** 
lines) ); debug("Control thread mirror super waiting"); - enum mirror_state state = + mirror_state_t state = control_client_mirror_wait( client ); debug("Control thread writing response"); control_write_mirror_response( state, client->socket ); diff --git a/src/server/control.h b/src/server/control.h index 977c565..bc35634 100644 --- a/src/server/control.h +++ b/src/server/control.h @@ -31,7 +31,7 @@ struct control { * process (and we can only have a mirror thread if the control * thread has started it). */ - struct mbox * mirror_state_mbox; + struct mbox_t * mirror_state_mbox; }; struct control_client{ @@ -41,7 +41,7 @@ struct control_client{ /* Passed in on creation. We know it's all right to do this * because we know there's only ever one control_client. */ - struct mbox * mirror_state_mbox; + struct mbox_t * mirror_state_mbox; }; struct control * control_create( diff --git a/src/server/mbox.c b/src/server/mbox.c index 3e96785..1f8beec 100644 --- a/src/server/mbox.c +++ b/src/server/mbox.c @@ -1,77 +1,74 @@ + #include "mbox.h" #include "util.h" +#include #include -struct mbox * mbox_create( void ) +DEFINE_FIFO(mbox_item_t, mbox_fifo); + +#define ARRAY_SIZE(w) (sizeof(w) / sizeof((w)[0])) + +mbox_p mbox_create( void ) { - struct mbox * mbox = xmalloc( sizeof( struct mbox ) ); - FATAL_UNLESS( 0 == pthread_cond_init( &mbox->filled_cond, NULL ), - "Failed to initialise a condition variable" ); - FATAL_UNLESS( 0 == pthread_cond_init( &mbox->emptied_cond, NULL ), - "Failed to initialise a condition variable" ); - FATAL_UNLESS( 0 == pthread_mutex_init( &mbox->mutex, NULL ), - "Failed to initialise a mutex" ); + mbox_p mbox = xmalloc( sizeof( struct mbox_t ) ); + + int sv[2]; + FATAL_UNLESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0, + "Failed to socketpair"); + mbox->signalw = sv[0]; + mbox->signalr = sv[1]; + return mbox; } -void mbox_post( struct mbox * mbox, void * contents ) +/* Queue item for the receiver, then wake it by writing one byte to the socketpair. The FIFO cannot block, so overflowing it is fatal instead of silently dropping the item. */ +void mbox_post( mbox_p mbox, mbox_item_t item ) { - pthread_mutex_lock( &mbox->mutex ); 
+ FATAL_UNLESS(mbox_fifo_write(&mbox->fifo, item), "mbox FIFO is full, item would be lost"); { - if (mbox->full){ - pthread_cond_wait( &mbox->emptied_cond, &mbox->mutex ); - } - mbox->contents = contents; - mbox->full = 1; - while( 0 != pthread_cond_signal( &mbox->filled_cond ) ); + uint8_t w = 0; /* only the byte's arrival matters, but never send an indeterminate value */ + FATAL_UNLESS((write(mbox->signalw, &w, 1)) == 1, + "Write to socketpair"); } - pthread_mutex_unlock( &mbox->mutex ); } -void * mbox_contents( struct mbox * mbox ) +mbox_item_t mbox_contents( mbox_p mbox ) { - return mbox->contents; + const mbox_item_t zero = {0}; + + return mbox_fifo_isempty(&mbox->fifo) ? + zero : + mbox_fifo_read_at(&mbox->fifo, 0); } -int mbox_is_full( struct mbox * mbox ) +int mbox_is_full( mbox_p mbox ) { - return mbox->full; + return mbox_fifo_isfull(&mbox->fifo); } -void * mbox_receive( struct mbox * mbox ) -{ - NULLCHECK( mbox ); - void * result; - - pthread_mutex_lock( &mbox->mutex ); - { - if ( !mbox->full ) { - pthread_cond_wait( &mbox->filled_cond, &mbox->mutex ); - } - mbox->full = 0; - result = mbox->contents; - mbox->contents = NULL; - - while( 0 != pthread_cond_signal( &mbox->emptied_cond)); - } - pthread_mutex_unlock( &mbox->mutex ); - - return result; -} - - -void mbox_destroy( struct mbox * mbox ) +mbox_item_t mbox_receive( mbox_p mbox ) { NULLCHECK( mbox ); - while( 0 != pthread_cond_destroy( &mbox->emptied_cond ) ); - while( 0 != pthread_cond_destroy( &mbox->filled_cond ) ); + while (mbox_fifo_isempty(&mbox->fifo)) { + uint8_t w; + FATAL_UNLESS((read(mbox->signalr, &w, 1)) == 1, + "Read from socketpair"); + } - while( 0 != pthread_mutex_destroy( &mbox->mutex ) ); + return mbox_fifo_read(&mbox->fifo); +} + +void mbox_destroy( mbox_p mbox ) +{ + NULLCHECK( mbox ); + + close(mbox->signalw); + close(mbox->signalr); free( mbox ); } diff --git a/src/server/mbox.h b/src/server/mbox.h index 3af54d8..1352799 100644 --- a/src/server/mbox.h +++ b/src/server/mbox.h @@ -11,45 +11,43 @@ #include +#include +#include "fifo_declare.h" + +typedef union { + uint64_t i; + void * p; +} mbox_item_t; + 
+DECLARE_FIFO(mbox_item_t, mbox_fifo, 8); + +typedef struct mbox_t { + mbox_fifo_t fifo; + // socketpair() ends + int signalw, signalr; +} mbox_t, *mbox_p; -struct mbox { - void * contents; - - /** Marker to tell us if there's content in the box. - * Keeping this separate allows us to use NULL for the contents. - */ - int full; - - /** This gets signaled by mbox_post, and waited on by - * mbox_receive */ - pthread_cond_t filled_cond; - /** This is signaled by mbox_receive, and waited on by mbox_post */ - pthread_cond_t emptied_cond; - pthread_mutex_t mutex; -}; - - -/* Create an mbox. */ -struct mbox * mbox_create(void); +/* Create an mbox_t. */ +mbox_p mbox_create(void); /* Put something in the mbox, blocking if it's already full. * That something can be NULL if you want. */ -void mbox_post( struct mbox *, void *); +void mbox_post( mbox_p , mbox_item_t item); /* See what's in the mbox. This isn't thread-safe. */ -void * mbox_contents( struct mbox *); +mbox_item_t mbox_contents( mbox_p ); /* See if anything has been put into the mbox. This isn't thread-safe. * */ -int mbox_is_full( struct mbox *); +int mbox_is_full( mbox_p ); /* Get the contents from the mbox, blocking if there's nothing there. */ -void * mbox_receive( struct mbox *); +mbox_item_t mbox_receive( mbox_p ); /* Free the mbox and destroy the associated pthread bits. 
*/ -void mbox_destroy( struct mbox *); +void mbox_destroy( mbox_p ); #endif diff --git a/src/server/mirror.c b/src/server/mirror.c index 708a67d..b4a7ea3 100644 --- a/src/server/mirror.c +++ b/src/server/mirror.c @@ -66,7 +66,7 @@ struct xfer { struct mirror_ctrl { struct server *serve; - struct mirror *mirror; + mirror_p mirror; /* libev stuff */ struct ev_loop *ev_loop; @@ -90,16 +90,16 @@ struct mirror_ctrl { }; -struct mirror * mirror_alloc( +mirror_p mirror_alloc( union mysockaddr * connect_to, union mysockaddr * connect_from, uint64_t max_Bps, - enum mirror_finish_action action_at_finish, - struct mbox * commit_signal) + mirror_finish_action_t action_at_finish, + mbox_p commit_signal) { - struct mirror * mirror; + mirror_p mirror; - mirror = xmalloc(sizeof(struct mirror)); + mirror = xmalloc(sizeof(mirror_t)); mirror->connect_to = connect_to; mirror->connect_from = connect_from; mirror->max_bytes_per_second = max_Bps; @@ -116,7 +116,7 @@ struct mirror * mirror_alloc( return mirror; } -void mirror_set_state_f( struct mirror * mirror, enum mirror_state state ) +void mirror_set_state_f( mirror_p mirror, mirror_state_t state ) { NULLCHECK( mirror ); mirror->commit_state = state; @@ -127,7 +127,7 @@ void mirror_set_state_f( struct mirror * mirror, enum mirror_state state ) mirror_set_state_f( mirror, state );\ } while(0) -enum mirror_state mirror_get_state( struct mirror * mirror ) +mirror_state_t mirror_get_state( mirror_p mirror ) { NULLCHECK( mirror ); return mirror->commit_state; @@ -136,7 +136,7 @@ enum mirror_state mirror_get_state( struct mirror * mirror ) #define mirror_state_is( mirror, state ) mirror_get_state( mirror ) == state -void mirror_init( struct mirror * mirror, const char * filename ) +void mirror_init( mirror_p mirror, const char * filename ) { int map_fd; uint64_t size; @@ -163,7 +163,7 @@ void mirror_init( struct mirror * mirror, const char * filename ) /* Call this before a mirror attempt. 
*/ -void mirror_reset( struct mirror * mirror ) +void mirror_reset( mirror_p mirror ) { NULLCHECK( mirror ); mirror_set_state( mirror, MS_INIT ); @@ -176,16 +176,16 @@ void mirror_reset( struct mirror * mirror ) } -struct mirror * mirror_create( +mirror_p mirror_create( const char * filename, union mysockaddr * connect_to, union mysockaddr * connect_from, uint64_t max_Bps, int action_at_finish, - struct mbox * commit_signal) + mbox_p commit_signal) { /* FIXME: shouldn't map_fd get closed? */ - struct mirror * mirror; + mirror_p mirror; mirror = mirror_alloc( connect_to, connect_from, @@ -201,7 +201,7 @@ struct mirror * mirror_create( } -void mirror_destroy( struct mirror *mirror ) +void mirror_destroy( mirror_p mirror ) { NULLCHECK( mirror ); self_pipe_destroy( mirror->abandon_signal ); @@ -254,7 +254,7 @@ void mirror_cleanup( struct server * serve, int fatal __attribute__((unused))) { NULLCHECK( serve ); - struct mirror * mirror = serve->mirror; + mirror_p mirror = serve->mirror; NULLCHECK( mirror ); info( "Cleaning up mirror thread"); @@ -270,7 +270,7 @@ void mirror_cleanup( struct server * serve, } -int mirror_connect( struct mirror * mirror, uint64_t local_size ) +int mirror_connect( mirror_p mirror, uint64_t local_size ) { struct sockaddr * connect_from = NULL; int connected = 0; @@ -325,7 +325,7 @@ int mirror_connect( struct mirror * mirror, uint64_t local_size ) } -int mirror_should_quit( struct mirror * mirror ) +int mirror_should_quit( mirror_p mirror ) { switch( mirror->action_at_finish ) { case ACTION_EXIT: @@ -359,7 +359,7 @@ int mirror_should_wait( struct mirror_ctrl *ctrl ) * next transfer, then puts it together. 
*/ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl ) { - struct mirror* mirror = ctrl->mirror; + mirror_p mirror = ctrl->mirror; struct server* serve = ctrl->serve; struct bitset_stream_entry e = { .event = BITSET_STREAM_UNSET }; uint64_t current = mirror->offset, run = 0, size = serve->size; @@ -508,7 +508,7 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents ) struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data; NULLCHECK( ctrl ); - struct mirror *m = ctrl->mirror; + mirror_p m = ctrl->mirror; NULLCHECK( m ); struct xfer *xfer = &ctrl->xfer; @@ -733,7 +733,7 @@ void mirror_run( struct server *serve ) NULLCHECK( serve ); NULLCHECK( serve->mirror ); - struct mirror *m = serve->mirror; + mirror_p m = serve->mirror; m->migration_started = monotonic_time_ms(); info("Starting mirror" ); @@ -849,18 +849,17 @@ void mirror_run( struct server *serve ) } -void mbox_post_mirror_state( struct mbox * mbox, enum mirror_state st ) +void mbox_post_mirror_state( mbox_p mbox, mirror_state_t st ) { NULLCHECK( mbox ); - enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) ); - *contents = st; + mbox_item_t ste = { .i = st }; - mbox_post( mbox, contents ); + mbox_post( mbox, ste ); } -void mirror_signal_commit( struct mirror * mirror ) +void mirror_signal_commit( mirror_p mirror ) { NULLCHECK( mirror ); @@ -887,7 +886,7 @@ void* mirror_runner(void* serve_params_uncast) NULLCHECK( serve ); NULLCHECK( serve->mirror ); - struct mirror * mirror = serve->mirror; + mirror_p mirror = serve->mirror; error_set_handler( (cleanup_handler *) mirror_cleanup, serve ); @@ -932,15 +931,15 @@ abandon_mirror: } -struct mirror_super * mirror_super_create( +mirror_super_p mirror_super_create( const char * filename, union mysockaddr * connect_to, union mysockaddr * connect_from, uint64_t max_Bps, - enum mirror_finish_action action_at_finish, - struct mbox * state_mbox) + mirror_finish_action_t action_at_finish, + mbox_p state_mbox) { - struct mirror_super * 
super = xmalloc( sizeof( struct mirror_super) ); + mirror_super_p super = xmalloc( sizeof( struct mirror_super_t) ); super->mirror = mirror_create( filename, connect_to, @@ -955,8 +954,8 @@ struct mirror_super * mirror_super_create( /* Post the current state of the mirror into super->state_mbox.*/ void mirror_super_signal_committed( - struct mirror_super * super , - enum mirror_state commit_state ) + mirror_super_p super , + mirror_state_t commit_state ) { NULLCHECK( super ); NULLCHECK( super->state_mbox ); @@ -967,7 +966,7 @@ void mirror_super_signal_committed( } -void mirror_super_destroy( struct mirror_super * super ) +void mirror_super_destroy( mirror_super_p super ) { NULLCHECK( super ); @@ -993,8 +992,8 @@ void * mirror_super_runner( void * serve_uncast ) int should_retry = 0; int success = 0, abandoned = 0; - struct mirror * mirror = serve->mirror; - struct mirror_super * super = serve->mirror_super; + mirror_p mirror = serve->mirror; + mirror_super_p super = serve->mirror_super; do { FATAL_IF( 0 != pthread_create( @@ -1005,8 +1004,8 @@ void * mirror_super_runner( void * serve_uncast ) "Failed to create mirror thread"); debug("Supervisor waiting for commit signal"); - enum mirror_state * commit_state = - mbox_receive( mirror->commit_signal ); + mirror_state_t commit_state = + mbox_receive( mirror->commit_signal ).i; debug( "Supervisor got commit signal" ); if ( first_pass ) { @@ -1015,18 +1014,14 @@ void * mirror_super_runner( void * serve_uncast ) * retry behind the scenes. 
This may race with migration completing * but since we "shouldn't retry" in that case either, that's fine */ - should_retry = *commit_state == MS_GO; + should_retry = commit_state == MS_GO; /* Only send this signal the first time */ mirror_super_signal_committed( super, - *commit_state); + commit_state); debug("Mirror supervisor committed"); } - /* We only care about the value of the commit signal on - * the first pass, so this is ok - */ - free( commit_state ); debug("Supervisor waiting for mirror thread" ); pthread_join( mirror->thread, NULL ); diff --git a/src/server/mirror.h b/src/server/mirror.h index d390fdf..cd38cf1 100644 --- a/src/server/mirror.h +++ b/src/server/mirror.h @@ -7,7 +7,7 @@ #include "bitset.h" #include "self_pipe.h" -enum mirror_state; + #include "serve.h" #include "mbox.h" @@ -57,14 +57,14 @@ enum mirror_state; #define MS_REQUEST_LIMIT_SECS 60 #define MS_REQUEST_LIMIT_SECS_F 60.0 -enum mirror_finish_action { - ACTION_EXIT, +typedef enum { + ACTION_EXIT = 0, ACTION_UNLINK, ACTION_NOTHING -}; +} mirror_finish_action_t; -enum mirror_state { - MS_UNKNOWN, +typedef enum { + MS_UNKNOWN = 0, MS_INIT, MS_GO, MS_ABANDONED, @@ -73,9 +73,9 @@ enum mirror_state { MS_FAIL_REJECTED, MS_FAIL_NO_HELLO, MS_FAIL_SIZE_MISMATCH -}; +} mirror_state_t; -struct mirror { +typedef struct mirror_t { pthread_t thread; /* Signal to this then join the thread if you want to abandon mirroring */ @@ -90,19 +90,19 @@ struct mirror { * over the network) are considered */ uint64_t max_bytes_per_second; - enum mirror_finish_action action_at_finish; + mirror_finish_action_t action_at_finish; char *mapped; /* We need to send every byte at least once; we do so by */ uint64_t offset; - enum mirror_state commit_state; + mirror_state_t commit_state; /* commit_signal is sent immediately after attempting to connect * and checking the remote size, whether successful or not. 
*/ - struct mbox * commit_signal; + struct mbox_t * commit_signal; /* The time (from monotonic_time_ms()) the migration was started. Can be * used to calculate bps, etc. */ @@ -110,14 +110,14 @@ struct mirror { /* Running count of all bytes we've transferred */ uint64_t all_dirty; -}; +} mirror_t, *mirror_p; -struct mirror_super { - struct mirror * mirror; +typedef struct mirror_super_t { + mirror_p mirror; pthread_t thread; - struct mbox * state_mbox; -}; + struct mbox_t * state_mbox; +} mirror_super_t, *mirror_super_p; @@ -127,13 +127,13 @@ struct mirror_super { struct server; struct flexnbd; -struct mirror_super * mirror_super_create( +mirror_super_p mirror_super_create( const char * filename, union mysockaddr * connect_to, union mysockaddr * connect_from, uint64_t max_Bps, - enum mirror_finish_action action_at_finish, - struct mbox * state_mbox + mirror_finish_action_t action_at_finish, + struct mbox_t * state_mbox ); void * mirror_super_runner( void * serve_uncast ); diff --git a/src/server/serve.h b/src/server/serve.h index 5d04e14..252f3b5 100644 --- a/src/server/serve.h +++ b/src/server/serve.h @@ -53,8 +53,8 @@ struct server { * shutting down on a SIGTERM. 
*/ struct flexthread_mutex * l_start_mirror; - struct mirror* mirror; - struct mirror_super * mirror_super; + struct mirror_t * mirror; + struct mirror_super_t * mirror_super; /* This is used to stop the mirror from starting after we * receive a SIGTERM */ int mirror_can_start; diff --git a/tests/unit/check_mbox.c b/tests/unit/check_mbox.c index 5399531..65a7827 100644 --- a/tests/unit/check_mbox.c +++ b/tests/unit/check_mbox.c @@ -6,7 +6,7 @@ START_TEST( test_allocs_cvar ) { - struct mbox * mbox = mbox_create(); + struct mbox_t * mbox = mbox_create(); fail_if( NULL == mbox, "Nothing allocated" ); pthread_cond_t cond_zero; @@ -22,7 +22,7 @@ END_TEST START_TEST( test_post_stores_value ) { - struct mbox * mbox = mbox_create(); + struct mbox_t * mbox = mbox_create(); void * deadbeef = (void *)0xDEADBEEF; mbox_post( mbox, deadbeef ); @@ -33,19 +33,18 @@ START_TEST( test_post_stores_value ) END_TEST +/* Thread entry point for pthread_create(): it must keep the void *(*)(void *) shape, so the received item is handed back through its pointer member. */ void * mbox_receive_runner( void * mbox_uncast ) { - struct mbox * mbox = (struct mbox *)mbox_uncast; + struct mbox_t * mbox = (struct mbox_t *)mbox_uncast; - void * contents = NULL; - contents = mbox_receive( mbox ); - return contents; + return mbox_receive( mbox ).p; } START_TEST( test_receive_blocks_until_post ) { - struct mbox * mbox = mbox_create(); + struct mbox_t * mbox = mbox_create(); pthread_t receiver; pthread_create( &receiver, NULL, mbox_receive_runner, mbox );