diff --git a/src/bitset.h b/src/bitset.h index 3697e5c..76d2a5e 100644 --- a/src/bitset.h +++ b/src/bitset.h @@ -110,6 +110,37 @@ static inline uint64_t bit_run_count(char* b, uint64_t from, uint64_t len, int * return count; } +enum bitset_stream_events { + BITSET_STREAM_UNSET = 0, + BITSET_STREAM_SET = 1, + BITSET_STREAM_ON = 2, + BITSET_STREAM_OFF = 3 +}; + +struct bitset_stream_entry { + enum bitset_stream_events event; + uint64_t from; + uint64_t len; +}; + +/** Limit the stream size to 1MB for now. + * + * If this is too small, it'll cause requests to stall as the migration lags + * behind the changes made by those requests. + */ +#define BITSET_STREAM_SIZE ( ( 1024 * 1024 ) / sizeof( struct bitset_stream_entry ) ) + +struct bitset_stream { + struct bitset_stream_entry entries[BITSET_STREAM_SIZE]; + int in; + int out; + int size; + pthread_mutex_t mutex; + pthread_cond_t cond_not_full; + pthread_cond_t cond_not_empty; +}; + + /** An application of a bitset - a bitset mapping represents a file of ''size'' * broken down into ''resolution''-sized chunks. The bit set is assumed to * represent one bit per chunk. We also bundle a lock so that the set can be @@ -119,6 +150,8 @@ struct bitset_mapping { pthread_mutex_t lock; uint64_t size; int resolution; + struct bitset_stream *stream; + int stream_enabled; char bits[]; }; @@ -138,12 +171,23 @@ static inline struct bitset_mapping* bitset_alloc( bitset->resolution = resolution; /* don't actually need to call pthread_mutex_destroy '*/ pthread_mutex_init(&bitset->lock, NULL); + bitset->stream = xmalloc( sizeof( struct bitset_stream ) ); + pthread_mutex_init( &bitset->stream->mutex, NULL ); + + /* Technically don't need to call pthread_cond_destroy either */ + pthread_cond_init( &bitset->stream->cond_not_full, NULL ); + pthread_cond_init( &bitset->stream->cond_not_empty, NULL ); + return bitset; } static inline void bitset_free( struct bitset_mapping * set ) { /* TODO: free our mutex... */ + + free( set->stream ); + set->stream = NULL; + free( set ); } @@ -159,6 +203,80 @@ static inline void bitset_free( struct bitset_mapping * set ) FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset") +static inline void bitset_stream_enqueue( + struct bitset_mapping * set, + enum bitset_stream_events event, + uint64_t from, + uint64_t len +) +{ + struct bitset_stream * stream = set->stream; + + pthread_mutex_lock( &stream->mutex ); + + while ( stream->size == BITSET_STREAM_SIZE ) { + pthread_cond_wait( &stream->cond_not_full, &stream->mutex ); + } + + stream->entries[stream->in].event = event; + stream->entries[stream->in].from = from; + stream->entries[stream->in].len = len; + + stream->size++; + stream->in++; + stream->in %= BITSET_STREAM_SIZE; + + pthread_mutex_unlock( & stream->mutex ); + pthread_cond_broadcast( &stream->cond_not_empty ); + + return; +} + +static inline void bitset_stream_dequeue( + struct bitset_mapping * set, + struct bitset_stream_entry * out +) +{ + struct bitset_stream * stream = set->stream; + + pthread_mutex_lock( &stream->mutex ); + + while ( stream->size == 0 ) { + pthread_cond_wait( &stream->cond_not_empty, &stream->mutex ); + } + + if ( out != NULL ) { + out->event = stream->entries[stream->out].event; + out->from = stream->entries[stream->out].from; + out->len = stream->entries[stream->out].len; + } + + stream->size--; + stream->out++; + stream->out %= BITSET_STREAM_SIZE; + + pthread_mutex_unlock( &stream->mutex ); + pthread_cond_broadcast( &stream->cond_not_full ); + + return; +} + +static inline void bitset_stream_on( struct bitset_mapping * set ) +{ + BITSET_LOCK; + set->stream_enabled = 1; + bitset_stream_enqueue( set, BITSET_STREAM_ON, 0, set->size ); + BITSET_UNLOCK; +} + +static inline void bitset_stream_off( struct bitset_mapping * set ) +{ + BITSET_LOCK; + bitset_stream_enqueue( set, BITSET_STREAM_OFF, 0, set->size ); + set->stream_enabled = 0; + BITSET_UNLOCK; +} + /** Set the bits in a bitset which correspond to the given bytes in the larger * file. */ @@ -170,6 +288,11 @@ static inline void bitset_set_range( INT_FIRST_AND_LAST; BITSET_LOCK; bit_set_range(set->bits, first, bitlen); + + if ( set->stream_enabled ) { + bitset_stream_enqueue( set, BITSET_STREAM_SET, from, len ); + } + BITSET_UNLOCK; } @@ -191,6 +314,11 @@ static inline void bitset_clear_range( INT_FIRST_AND_LAST; BITSET_LOCK; bit_clear_range(set->bits, first, bitlen); + + if ( set->stream_enabled ) { + bitset_stream_enqueue( set, BITSET_STREAM_UNSET, from, len ); + } + BITSET_UNLOCK; } diff --git a/tests/unit/check_bitset.c b/tests/unit/check_bitset.c index e3401d3..07e6575 100644 --- a/tests/unit/check_bitset.c +++ b/tests/unit/check_bitset.c @@ -306,25 +306,141 @@ START_TEST( test_bitset_run_count ) } END_TEST +START_TEST( test_bitset_set_range_doesnt_push_to_stream ) +{ + struct bitset_mapping *map = bitset_alloc( 64, 1 ); + bitset_set_range( map, 0, 64 ); + ck_assert_int_eq( map->stream->size, 0 ); + bitset_free( map ); +} +END_TEST + +START_TEST( test_bitset_clear_range_doesnt_push_to_stream ) +{ + struct bitset_mapping *map = bitset_alloc( 64, 1 ); + bitset_clear_range( map, 0, 64 ); + ck_assert_int_eq( map->stream->size, 0 ); + bitset_free( map ); +} +END_TEST + +START_TEST(test_bitset_stream_on) +{ + struct bitset_mapping *map = bitset_alloc( 64, 1 ); + struct bitset_stream_entry result; + memset( &result, 0, sizeof( result ) ); + + bitset_stream_on( map ); + + ck_assert_int_eq( 1, map->stream_enabled ); + + bitset_stream_dequeue( map, &result ); + + ck_assert_int_eq( BITSET_STREAM_ON, result.event ); + ck_assert_int_eq( 0, result.from ); + ck_assert_int_eq( 64, result.len ); + + bitset_free( map ); +} +END_TEST + +START_TEST(test_bitset_stream_off) +{ + struct bitset_mapping *map = bitset_alloc( 64, 1 ); + struct bitset_stream_entry result; + memset( &result, 0, sizeof( result ) ); + + bitset_stream_on( map ); + bitset_stream_off( map ); + + ck_assert_int_eq( 0, map->stream_enabled ); + ck_assert_int_eq( 2, map->stream->size ); + + bitset_stream_dequeue( map, NULL ); // ON + bitset_stream_dequeue( map, &result ); // OFF + + ck_assert_int_eq( BITSET_STREAM_OFF, result.event ); + ck_assert_int_eq( 0, result.from ); + ck_assert_int_eq( 64, result.len ); + + bitset_free( map ); +} +END_TEST + +START_TEST(test_bitset_stream_with_set_range) +{ + struct bitset_mapping *map = bitset_alloc( 64, 1 ); + struct bitset_stream_entry result; + memset( &result, 0, sizeof( result ) ); + + bitset_stream_on( map ); + bitset_set_range( map, 0, 32 ); + + ck_assert_int_eq( 2, map->stream->size ); + + bitset_stream_dequeue( map, NULL ); // ON + bitset_stream_dequeue( map, &result ); // SET + + ck_assert_int_eq( BITSET_STREAM_SET, result.event ); + ck_assert_int_eq( 0, result.from ); + ck_assert_int_eq( 32, result.len ); + + bitset_free( map ); +} +END_TEST + +START_TEST(test_bitset_stream_with_clear_range) +{ + struct bitset_mapping *map = bitset_alloc( 64, 1 ); + struct bitset_stream_entry result; + memset( &result, 0, sizeof( result ) ); + + bitset_stream_on( map ); + bitset_clear_range( map, 0, 32 ); + ck_assert_int_eq( 2, map->stream->size ); + + bitset_stream_dequeue( map, NULL ); // ON + bitset_stream_dequeue( map, &result ); // UNSET + + ck_assert_int_eq( BITSET_STREAM_UNSET, result.event ); + ck_assert_int_eq( 0, result.from ); + ck_assert_int_eq( 32, result.len ); + + bitset_free( map ); +} +END_TEST + Suite* bitset_suite(void) { Suite *s = suite_create("bitset"); + TCase *tc_bit = tcase_create("bit"); - TCase *tc_bitset = tcase_create("bitset"); tcase_add_test(tc_bit, test_bit_set); tcase_add_test(tc_bit, test_bit_clear); tcase_add_test(tc_bit, test_bit_tests); tcase_add_test(tc_bit, test_bit_ranges); tcase_add_test(tc_bit, test_bit_runs); + suite_add_tcase(s, tc_bit); + + TCase *tc_bitset = tcase_create("bitset"); tcase_add_test(tc_bitset, test_bitset); tcase_add_test(tc_bitset, test_bitset_set); tcase_add_test(tc_bitset, test_bitset_clear); tcase_add_test(tc_bitset, test_bitset_run_count); tcase_add_test(tc_bitset, test_bitset_set_range); tcase_add_test(tc_bitset, test_bitset_clear_range); - suite_add_tcase(s, tc_bit); + tcase_add_test(tc_bitset, test_bitset_set_range_doesnt_push_to_stream); + tcase_add_test(tc_bitset, test_bitset_clear_range_doesnt_push_to_stream); suite_add_tcase(s, tc_bitset); + + TCase *tc_bitset_stream = tcase_create("bitset_stream"); + tcase_add_test(tc_bitset_stream, test_bitset_stream_on); + tcase_add_test(tc_bitset_stream, test_bitset_stream_off); + tcase_add_test(tc_bitset_stream, test_bitset_stream_with_set_range); + tcase_add_test(tc_bitset_stream, test_bitset_stream_with_clear_range); + suite_add_tcase(s, tc_bitset_stream); + return s; }