From 90e8b13df59dbde62bc77943014d03555df8cc23 Mon Sep 17 00:00:00 2001 From: Michel Pollet Date: Thu, 13 Oct 2016 16:55:41 +0100 Subject: [PATCH] fifo: Split the bitset.h And fix all dependencies Signed-off-by: Michel Pollet --- src/common/ioutil.c | 2 +- src/server/bitset.h | 303 --------------------------------------- src/server/bitstream.h | 317 +++++++++++++++++++++++++++++++++++++++++ src/server/client.c | 2 +- src/server/mirror.c | 2 +- src/server/serve.c | 2 +- 6 files changed, 321 insertions(+), 307 deletions(-) create mode 100644 src/server/bitstream.h diff --git a/src/common/ioutil.c b/src/common/ioutil.c index 6a8309a..93e7e60 100644 --- a/src/common/ioutil.c +++ b/src/common/ioutil.c @@ -9,7 +9,7 @@ #include #include "util.h" -#include "bitset.h" +#include "bitstream.h" #include "ioutil.h" diff --git a/src/server/bitset.h b/src/server/bitset.h index ed0ffd2..6b3ac72 100644 --- a/src/server/bitset.h +++ b/src/server/bitset.h @@ -130,309 +130,6 @@ static inline uint64_t bit_run_count(bitfield_p b, uint64_t from, uint64_t len, return count; } -enum bitset_stream_events { - BITSET_STREAM_UNSET = 0, - BITSET_STREAM_SET = 1, - BITSET_STREAM_ON = 2, - BITSET_STREAM_OFF = 3 -}; -#define BITSET_STREAM_EVENTS_ENUM_SIZE 4 - -struct bitset_stream_entry { - enum bitset_stream_events event; - uint64_t from; - uint64_t len; -}; - -/** Limit the stream size to 1MB for now. - * - * If this is too small, it'll cause requests to stall as the migration lags - * behind the changes made by those requests. - */ -#define BITSET_STREAM_SIZE ( ( 1024 * 1024 ) / sizeof( struct bitset_stream_entry ) ) - -struct bitset_stream { - struct bitset_stream_entry entries[BITSET_STREAM_SIZE]; - int in; - int out; - int size; - pthread_mutex_t mutex; - pthread_cond_t cond_not_full; - pthread_cond_t cond_not_empty; - uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE]; -}; - - -/** An application of a bitset - a bitset mapping represents a file of ''size'' - * broken down into ''resolution''-sized chunks. The bit set is assumed to - * represent one bit per chunk. We also bundle a lock so that the set can be - * written reliably by multiple threads. - */ -struct bitset { - pthread_mutex_t lock; - uint64_t size; - int resolution; - struct bitset_stream *stream; - int stream_enabled; - bitfield_word_t bits[]; -}; - -/** Allocate a bitset for a file of the given size, and chunks of the - * given resolution. - */ -static inline struct bitset *bitset_alloc( uint64_t size, int resolution ) -{ - // calculate a size to allocate that is a multiple of the size of the - // bitfield word - size_t bitfield_size = - BIT_WORDS_FOR_SIZE((( size + resolution - 1 ) / resolution)) * sizeof( bitfield_word_t ); - struct bitset *bitset = xmalloc(sizeof( struct bitset ) + ( bitfield_size / 8 ) ); - - bitset->size = size; - bitset->resolution = resolution; - /* don't actually need to call pthread_mutex_destroy '*/ - pthread_mutex_init(&bitset->lock, NULL); - bitset->stream = xmalloc( sizeof( struct bitset_stream ) ); - pthread_mutex_init( &bitset->stream->mutex, NULL ); - - /* Technically don't need to call pthread_cond_destroy either */ - pthread_cond_init( &bitset->stream->cond_not_full, NULL ); - pthread_cond_init( &bitset->stream->cond_not_empty, NULL ); - - return bitset; -} - -static inline void bitset_free( struct bitset * set ) -{ - /* TODO: free our mutex... */ - - free( set->stream ); - set->stream = NULL; - - free( set ); -} - -#define INT_FIRST_AND_LAST \ - uint64_t first = from/set->resolution, \ - last = ((from+len)-1)/set->resolution, \ - bitlen = (last-first)+1 - -#define BITSET_LOCK \ - FATAL_IF_NEGATIVE(pthread_mutex_lock(&set->lock), "Error locking bitset") - -#define BITSET_UNLOCK \ - FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset") - - -static inline void bitset_stream_enqueue( - struct bitset * set, - enum bitset_stream_events event, - uint64_t from, - uint64_t len -) -{ - struct bitset_stream * stream = set->stream; - - pthread_mutex_lock( &stream->mutex ); - - while ( stream->size == BITSET_STREAM_SIZE ) { - pthread_cond_wait( &stream->cond_not_full, &stream->mutex ); - } - - stream->entries[stream->in].event = event; - stream->entries[stream->in].from = from; - stream->entries[stream->in].len = len; - stream->queued_bytes[event] += len; - - stream->size++; - stream->in++; - stream->in %= BITSET_STREAM_SIZE; - - pthread_mutex_unlock( & stream->mutex ); - pthread_cond_signal( &stream->cond_not_empty ); - - return; -} - -static inline void bitset_stream_dequeue( - struct bitset * set, - struct bitset_stream_entry * out -) -{ - struct bitset_stream * stream = set->stream; - struct bitset_stream_entry * dequeued; - - pthread_mutex_lock( &stream->mutex ); - - while ( stream->size == 0 ) { - pthread_cond_wait( &stream->cond_not_empty, &stream->mutex ); - } - - dequeued = &stream->entries[stream->out]; - - if ( out != NULL ) { - out->event = dequeued->event; - out->from = dequeued->from; - out->len = dequeued->len; - } - - stream->queued_bytes[dequeued->event] -= dequeued->len; - stream->size--; - stream->out++; - stream->out %= BITSET_STREAM_SIZE; - - pthread_mutex_unlock( &stream->mutex ); - pthread_cond_signal( &stream->cond_not_full ); - - return; -} - -static inline size_t bitset_stream_size( struct bitset * set ) -{ - size_t size; - - pthread_mutex_lock( &set->stream->mutex ); - size = set->stream->size; - pthread_mutex_unlock( &set->stream->mutex ); - - return size; -} - -static inline uint64_t bitset_stream_queued_bytes( - struct bitset * set, - enum bitset_stream_events event -) -{ - uint64_t total; - - pthread_mutex_lock( &set->stream->mutex ); - total = set->stream->queued_bytes[event]; - pthread_mutex_unlock( &set->stream->mutex ); - - return total; -} - -static inline void bitset_enable_stream( struct bitset * set ) -{ - BITSET_LOCK; - set->stream_enabled = 1; - bitset_stream_enqueue( set, BITSET_STREAM_ON, 0, set->size ); - BITSET_UNLOCK; -} - -static inline void bitset_disable_stream( struct bitset * set ) -{ - BITSET_LOCK; - bitset_stream_enqueue( set, BITSET_STREAM_OFF, 0, set->size ); - set->stream_enabled = 0; - BITSET_UNLOCK; -} - -/** Set the bits in a bitset which correspond to the given bytes in the larger - * file. - */ -static inline void bitset_set_range( - struct bitset * set, - uint64_t from, - uint64_t len) -{ - INT_FIRST_AND_LAST; - BITSET_LOCK; - bit_set_range(set->bits, first, bitlen); - - if ( set->stream_enabled ) { - bitset_stream_enqueue( set, BITSET_STREAM_SET, from, len ); - } - - BITSET_UNLOCK; -} - - -/** Set every bit in the bitset. */ -static inline void bitset_set( struct bitset * set ) -{ - bitset_set_range(set, 0, set->size); -} - -/** Clear the bits in a bitset which correspond to the given bytes in the - * larger file. - */ -static inline void bitset_clear_range( - struct bitset * set, - uint64_t from, - uint64_t len) -{ - INT_FIRST_AND_LAST; - BITSET_LOCK; - bit_clear_range(set->bits, first, bitlen); - - if ( set->stream_enabled ) { - bitset_stream_enqueue( set, BITSET_STREAM_UNSET, from, len ); - } - - BITSET_UNLOCK; -} - - -/** Clear every bit in the bitset. */ -static inline void bitset_clear( struct bitset * set ) -{ - bitset_clear_range(set, 0, set->size); -} - -/** As per bitset_run_count but also tells you whether the run it found was set - * or unset, atomically. - */ -static inline uint64_t bitset_run_count_ex( - struct bitset * set, - uint64_t from, - uint64_t len, - int* run_is_set -) -{ - uint64_t run; - - /* Clip our requests to the end of the bitset, avoiding uint underflow. */ - if ( from > set->size ) { - return 0; - } - len = ( len + from ) > set->size ? ( set->size - from ) : len; - - INT_FIRST_AND_LAST; - - BITSET_LOCK; - run = bit_run_count(set->bits, first, bitlen, run_is_set) * set->resolution; - run -= (from % set->resolution); - BITSET_UNLOCK; - - return run; -} - -/** Counts the number of contiguous bytes that are represented as a run in - * the bit field. - */ -static inline uint64_t bitset_run_count( - struct bitset * set, - uint64_t from, - uint64_t len) -{ - return bitset_run_count_ex( set, from, len, NULL ); -} - -/** Tests whether the bit field is clear for the given file offset. - */ -static inline int bitset_is_clear_at( struct bitset * set, uint64_t at ) -{ - return bit_is_clear(set->bits, at/set->resolution); -} - -/** Tests whether the bit field is set for the given file offset. - */ -static inline int bitset_is_set_at( struct bitset * set, uint64_t at ) -{ - return bit_is_set(set->bits, at/set->resolution); -} - #endif diff --git a/src/server/bitstream.h b/src/server/bitstream.h new file mode 100644 index 0000000..ec45747 --- /dev/null +++ b/src/server/bitstream.h @@ -0,0 +1,317 @@ +/* + * bitstream.h + * + * Created on: 13 Oct 2016 + * Author: michel + */ + +#ifndef SRC_SERVER_BITSTREAM_H_ +#define SRC_SERVER_BITSTREAM_H_ + +#include "bitset.h" + +enum bitset_stream_events { + BITSET_STREAM_UNSET = 0, + BITSET_STREAM_SET = 1, + BITSET_STREAM_ON = 2, + BITSET_STREAM_OFF = 3 +}; +#define BITSET_STREAM_EVENTS_ENUM_SIZE 4 + +struct bitset_stream_entry { + enum bitset_stream_events event; + uint64_t from; + uint64_t len; +}; + +/** Limit the stream size to 1MB for now. + * + * If this is too small, it'll cause requests to stall as the migration lags + * behind the changes made by those requests. + */ +#define BITSET_STREAM_SIZE ( ( 1024 * 1024 ) / sizeof( struct bitset_stream_entry ) ) + +struct bitset_stream { + struct bitset_stream_entry entries[BITSET_STREAM_SIZE]; + int in; + int out; + int size; + pthread_mutex_t mutex; + pthread_cond_t cond_not_full; + pthread_cond_t cond_not_empty; + uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE]; +}; + + +/** An application of a bitset - a bitset mapping represents a file of ''size'' + * broken down into ''resolution''-sized chunks. The bit set is assumed to + * represent one bit per chunk. We also bundle a lock so that the set can be + * written reliably by multiple threads. + */ +struct bitset { + pthread_mutex_t lock; + uint64_t size; + int resolution; + struct bitset_stream *stream; + int stream_enabled; + bitfield_word_t bits[]; +}; + +/** Allocate a bitset for a file of the given size, and chunks of the + * given resolution. + */ +static inline struct bitset *bitset_alloc( uint64_t size, int resolution ) +{ + // calculate a size to allocate that is a multiple of the size of the + // bitfield word + size_t bitfield_size = + BIT_WORDS_FOR_SIZE((( size + resolution - 1 ) / resolution)) * sizeof( bitfield_word_t ); + struct bitset *bitset = xmalloc(sizeof( struct bitset ) + ( bitfield_size / 8 ) ); + + bitset->size = size; + bitset->resolution = resolution; + /* don't actually need to call pthread_mutex_destroy '*/ + pthread_mutex_init(&bitset->lock, NULL); + bitset->stream = xmalloc( sizeof( struct bitset_stream ) ); + pthread_mutex_init( &bitset->stream->mutex, NULL ); + + /* Technically don't need to call pthread_cond_destroy either */ + pthread_cond_init( &bitset->stream->cond_not_full, NULL ); + pthread_cond_init( &bitset->stream->cond_not_empty, NULL ); + + return bitset; +} + +static inline void bitset_free( struct bitset * set ) +{ + /* TODO: free our mutex... */ + + free( set->stream ); + set->stream = NULL; + + free( set ); +} + +#define INT_FIRST_AND_LAST \ + uint64_t first = from/set->resolution, \ + last = ((from+len)-1)/set->resolution, \ + bitlen = (last-first)+1 + +#define BITSET_LOCK \ + FATAL_IF_NEGATIVE(pthread_mutex_lock(&set->lock), "Error locking bitset") + +#define BITSET_UNLOCK \ + FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset") + + +static inline void bitset_stream_enqueue( + struct bitset * set, + enum bitset_stream_events event, + uint64_t from, + uint64_t len +) +{ + struct bitset_stream * stream = set->stream; + + pthread_mutex_lock( &stream->mutex ); + + while ( stream->size == BITSET_STREAM_SIZE ) { + pthread_cond_wait( &stream->cond_not_full, &stream->mutex ); + } + + stream->entries[stream->in].event = event; + stream->entries[stream->in].from = from; + stream->entries[stream->in].len = len; + stream->queued_bytes[event] += len; + + stream->size++; + stream->in++; + stream->in %= BITSET_STREAM_SIZE; + + pthread_mutex_unlock( & stream->mutex ); + pthread_cond_signal( &stream->cond_not_empty ); + + return; +} + +static inline void bitset_stream_dequeue( + struct bitset * set, + struct bitset_stream_entry * out +) +{ + struct bitset_stream * stream = set->stream; + struct bitset_stream_entry * dequeued; + + pthread_mutex_lock( &stream->mutex ); + + while ( stream->size == 0 ) { + pthread_cond_wait( &stream->cond_not_empty, &stream->mutex ); + } + + dequeued = &stream->entries[stream->out]; + + if ( out != NULL ) { + out->event = dequeued->event; + out->from = dequeued->from; + out->len = dequeued->len; + } + + stream->queued_bytes[dequeued->event] -= dequeued->len; + stream->size--; + stream->out++; + stream->out %= BITSET_STREAM_SIZE; + + pthread_mutex_unlock( &stream->mutex ); + pthread_cond_signal( &stream->cond_not_full ); + + return; +} + +static inline size_t bitset_stream_size( struct bitset * set ) +{ + size_t size; + + pthread_mutex_lock( &set->stream->mutex ); + size = set->stream->size; + pthread_mutex_unlock( &set->stream->mutex ); + + return size; +} + +static inline uint64_t bitset_stream_queued_bytes( + struct bitset * set, + enum bitset_stream_events event +) +{ + uint64_t total; + + pthread_mutex_lock( &set->stream->mutex ); + total = set->stream->queued_bytes[event]; + pthread_mutex_unlock( &set->stream->mutex ); + + return total; +} + +static inline void bitset_enable_stream( struct bitset * set ) +{ + BITSET_LOCK; + set->stream_enabled = 1; + bitset_stream_enqueue( set, BITSET_STREAM_ON, 0, set->size ); + BITSET_UNLOCK; +} + +static inline void bitset_disable_stream( struct bitset * set ) +{ + BITSET_LOCK; + bitset_stream_enqueue( set, BITSET_STREAM_OFF, 0, set->size ); + set->stream_enabled = 0; + BITSET_UNLOCK; +} + +/** Set the bits in a bitset which correspond to the given bytes in the larger + * file. + */ +static inline void bitset_set_range( + struct bitset * set, + uint64_t from, + uint64_t len) +{ + INT_FIRST_AND_LAST; + BITSET_LOCK; + bit_set_range(set->bits, first, bitlen); + + if ( set->stream_enabled ) { + bitset_stream_enqueue( set, BITSET_STREAM_SET, from, len ); + } + + BITSET_UNLOCK; +} + + +/** Set every bit in the bitset. */ +static inline void bitset_set( struct bitset * set ) +{ + bitset_set_range(set, 0, set->size); +} + +/** Clear the bits in a bitset which correspond to the given bytes in the + * larger file. + */ +static inline void bitset_clear_range( + struct bitset * set, + uint64_t from, + uint64_t len) +{ + INT_FIRST_AND_LAST; + BITSET_LOCK; + bit_clear_range(set->bits, first, bitlen); + + if ( set->stream_enabled ) { + bitset_stream_enqueue( set, BITSET_STREAM_UNSET, from, len ); + } + + BITSET_UNLOCK; +} + + +/** Clear every bit in the bitset. */ +static inline void bitset_clear( struct bitset * set ) +{ + bitset_clear_range(set, 0, set->size); +} + +/** As per bitset_run_count but also tells you whether the run it found was set + * or unset, atomically. + */ +static inline uint64_t bitset_run_count_ex( + struct bitset * set, + uint64_t from, + uint64_t len, + int* run_is_set +) +{ + uint64_t run; + + /* Clip our requests to the end of the bitset, avoiding uint underflow. */ + if ( from > set->size ) { + return 0; + } + len = ( len + from ) > set->size ? ( set->size - from ) : len; + + INT_FIRST_AND_LAST; + + BITSET_LOCK; + run = bit_run_count(set->bits, first, bitlen, run_is_set) * set->resolution; + run -= (from % set->resolution); + BITSET_UNLOCK; + + return run; +} + +/** Counts the number of contiguous bytes that are represented as a run in + * the bit field. + */ +static inline uint64_t bitset_run_count( + struct bitset * set, + uint64_t from, + uint64_t len) +{ + return bitset_run_count_ex( set, from, len, NULL ); +} + +/** Tests whether the bit field is clear for the given file offset. + */ +static inline int bitset_is_clear_at( struct bitset * set, uint64_t at ) +{ + return bit_is_clear(set->bits, at/set->resolution); +} + +/** Tests whether the bit field is set for the given file offset. + */ +static inline int bitset_is_set_at( struct bitset * set, uint64_t at ) +{ + return bit_is_set(set->bits, at/set->resolution); +} + + +#endif /* SRC_SERVER_BITSTREAM_H_ */ diff --git a/src/server/client.c b/src/server/client.c index 8b57b47..2c6cd39 100644 --- a/src/server/client.c +++ b/src/server/client.c @@ -3,7 +3,7 @@ #include "ioutil.h" #include "sockutil.h" #include "util.h" -#include "bitset.h" +#include "bitstream.h" #include "nbdtypes.h" #include "self_pipe.h" diff --git a/src/server/mirror.c b/src/server/mirror.c index b3d22d5..708a67d 100644 --- a/src/server/mirror.c +++ b/src/server/mirror.c @@ -22,7 +22,7 @@ #include "sockutil.h" #include "parse.h" #include "readwrite.h" -#include "bitset.h" +#include "bitstream.h" #include "self_pipe.h" #include "status.h" diff --git a/src/server/serve.c b/src/server/serve.c index de6b871..d0856f2 100644 --- a/src/server/serve.c +++ b/src/server/serve.c @@ -4,7 +4,7 @@ #include "ioutil.h" #include "sockutil.h" #include "util.h" -#include "bitset.h" +#include "bitstream.h" #include "control.h" #include "self_pipe.h"