bitset: Add an event stream implementation
Nothing is using it yet
This commit is contained in:
128
src/bitset.h
128
src/bitset.h
@@ -110,6 +110,37 @@ static inline uint64_t bit_run_count(char* b, uint64_t from, uint64_t len, int *
|
|||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum bitset_stream_events {
|
||||||
|
BITSET_STREAM_UNSET = 0,
|
||||||
|
BITSET_STREAM_SET = 1,
|
||||||
|
BITSET_STREAM_ON = 2,
|
||||||
|
BITSET_STREAM_OFF = 3
|
||||||
|
};
|
||||||
|
|
||||||
|
struct bitset_stream_entry {
|
||||||
|
enum bitset_stream_events event;
|
||||||
|
uint64_t from;
|
||||||
|
uint64_t len;
|
||||||
|
};
|
||||||
|
|
||||||
|
/** Limit the stream size to 1MB for now.
|
||||||
|
*
|
||||||
|
* If this is too small, it'll cause requests to stall as the migration lags
|
||||||
|
* behind the changes made by those requests.
|
||||||
|
*/
|
||||||
|
#define BITSET_STREAM_SIZE ( ( 1024 * 1024 ) / sizeof( struct bitset_stream_entry ) )
|
||||||
|
|
||||||
|
struct bitset_stream {
|
||||||
|
struct bitset_stream_entry entries[BITSET_STREAM_SIZE];
|
||||||
|
int in;
|
||||||
|
int out;
|
||||||
|
int size;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
pthread_cond_t cond_not_full;
|
||||||
|
pthread_cond_t cond_not_empty;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
/** An application of a bitset - a bitset mapping represents a file of ''size''
|
/** An application of a bitset - a bitset mapping represents a file of ''size''
|
||||||
* broken down into ''resolution''-sized chunks. The bit set is assumed to
|
* broken down into ''resolution''-sized chunks. The bit set is assumed to
|
||||||
* represent one bit per chunk. We also bundle a lock so that the set can be
|
* represent one bit per chunk. We also bundle a lock so that the set can be
|
||||||
@@ -119,6 +150,8 @@ struct bitset_mapping {
|
|||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
uint64_t size;
|
uint64_t size;
|
||||||
int resolution;
|
int resolution;
|
||||||
|
struct bitset_stream *stream;
|
||||||
|
int stream_enabled;
|
||||||
char bits[];
|
char bits[];
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -138,12 +171,23 @@ static inline struct bitset_mapping* bitset_alloc(
|
|||||||
bitset->resolution = resolution;
|
bitset->resolution = resolution;
|
||||||
/* don't actually need to call pthread_mutex_destroy '*/
|
/* don't actually need to call pthread_mutex_destroy '*/
|
||||||
pthread_mutex_init(&bitset->lock, NULL);
|
pthread_mutex_init(&bitset->lock, NULL);
|
||||||
|
bitset->stream = xmalloc( sizeof( struct bitset_stream ) );
|
||||||
|
pthread_mutex_init( &bitset->stream->mutex, NULL );
|
||||||
|
|
||||||
|
/* Technically don't need to call pthread_cond_destroy either */
|
||||||
|
pthread_cond_init( &bitset->stream->cond_not_full, NULL );
|
||||||
|
pthread_cond_init( &bitset->stream->cond_not_empty, NULL );
|
||||||
|
|
||||||
return bitset;
|
return bitset;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void bitset_free( struct bitset_mapping * set )
|
static inline void bitset_free( struct bitset_mapping * set )
|
||||||
{
|
{
|
||||||
/* TODO: free our mutex... */
|
/* TODO: free our mutex... */
|
||||||
|
|
||||||
|
free( set->stream );
|
||||||
|
set->stream = NULL;
|
||||||
|
|
||||||
free( set );
|
free( set );
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -159,6 +203,80 @@ static inline void bitset_free( struct bitset_mapping * set )
|
|||||||
FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset")
|
FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset")
|
||||||
|
|
||||||
|
|
||||||
|
static inline void bitset_stream_enqueue(
|
||||||
|
struct bitset_mapping * set,
|
||||||
|
enum bitset_stream_events event,
|
||||||
|
uint64_t from,
|
||||||
|
uint64_t len
|
||||||
|
)
|
||||||
|
{
|
||||||
|
struct bitset_stream * stream = set->stream;
|
||||||
|
|
||||||
|
pthread_mutex_lock( &stream->mutex );
|
||||||
|
|
||||||
|
while ( stream->size == BITSET_STREAM_SIZE ) {
|
||||||
|
pthread_cond_wait( &stream->cond_not_full, &stream->mutex );
|
||||||
|
}
|
||||||
|
|
||||||
|
stream->entries[stream->in].event = event;
|
||||||
|
stream->entries[stream->in].from = from;
|
||||||
|
stream->entries[stream->in].len = len;
|
||||||
|
|
||||||
|
stream->size++;
|
||||||
|
stream->in++;
|
||||||
|
stream->in %= BITSET_STREAM_SIZE;
|
||||||
|
|
||||||
|
pthread_mutex_unlock( & stream->mutex );
|
||||||
|
pthread_cond_broadcast( &stream->cond_not_empty );
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void bitset_stream_dequeue(
|
||||||
|
struct bitset_mapping * set,
|
||||||
|
struct bitset_stream_entry * out
|
||||||
|
)
|
||||||
|
{
|
||||||
|
struct bitset_stream * stream = set->stream;
|
||||||
|
|
||||||
|
pthread_mutex_lock( &stream->mutex );
|
||||||
|
|
||||||
|
while ( stream->size == 0 ) {
|
||||||
|
pthread_cond_wait( &stream->cond_not_empty, &stream->mutex );
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( out != NULL ) {
|
||||||
|
out->event = stream->entries[stream->out].event;
|
||||||
|
out->from = stream->entries[stream->out].from;
|
||||||
|
out->len = stream->entries[stream->out].len;
|
||||||
|
}
|
||||||
|
|
||||||
|
stream->size--;
|
||||||
|
stream->out++;
|
||||||
|
stream->out %= BITSET_STREAM_SIZE;
|
||||||
|
|
||||||
|
pthread_mutex_unlock( &stream->mutex );
|
||||||
|
pthread_cond_broadcast( &stream->cond_not_full );
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void bitset_stream_on( struct bitset_mapping * set )
|
||||||
|
{
|
||||||
|
BITSET_LOCK;
|
||||||
|
set->stream_enabled = 1;
|
||||||
|
bitset_stream_enqueue( set, BITSET_STREAM_ON, 0, set->size );
|
||||||
|
BITSET_UNLOCK;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void bitset_stream_off( struct bitset_mapping * set )
|
||||||
|
{
|
||||||
|
BITSET_LOCK;
|
||||||
|
bitset_stream_enqueue( set, BITSET_STREAM_OFF, 0, set->size );
|
||||||
|
set->stream_enabled = 0;
|
||||||
|
BITSET_UNLOCK;
|
||||||
|
}
|
||||||
|
|
||||||
/** Set the bits in a bitset which correspond to the given bytes in the larger
|
/** Set the bits in a bitset which correspond to the given bytes in the larger
|
||||||
* file.
|
* file.
|
||||||
*/
|
*/
|
||||||
@@ -170,6 +288,11 @@ static inline void bitset_set_range(
|
|||||||
INT_FIRST_AND_LAST;
|
INT_FIRST_AND_LAST;
|
||||||
BITSET_LOCK;
|
BITSET_LOCK;
|
||||||
bit_set_range(set->bits, first, bitlen);
|
bit_set_range(set->bits, first, bitlen);
|
||||||
|
|
||||||
|
if ( set->stream_enabled ) {
|
||||||
|
bitset_stream_enqueue( set, BITSET_STREAM_SET, from, len );
|
||||||
|
}
|
||||||
|
|
||||||
BITSET_UNLOCK;
|
BITSET_UNLOCK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,6 +314,11 @@ static inline void bitset_clear_range(
|
|||||||
INT_FIRST_AND_LAST;
|
INT_FIRST_AND_LAST;
|
||||||
BITSET_LOCK;
|
BITSET_LOCK;
|
||||||
bit_clear_range(set->bits, first, bitlen);
|
bit_clear_range(set->bits, first, bitlen);
|
||||||
|
|
||||||
|
if ( set->stream_enabled ) {
|
||||||
|
bitset_stream_enqueue( set, BITSET_STREAM_UNSET, from, len );
|
||||||
|
}
|
||||||
|
|
||||||
BITSET_UNLOCK;
|
BITSET_UNLOCK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -306,25 +306,141 @@ START_TEST( test_bitset_run_count )
|
|||||||
}
|
}
|
||||||
END_TEST
|
END_TEST
|
||||||
|
|
||||||
|
START_TEST( test_bitset_set_range_doesnt_push_to_stream )
|
||||||
|
{
|
||||||
|
struct bitset_mapping *map = bitset_alloc( 64, 1 );
|
||||||
|
bitset_set_range( map, 0, 64 );
|
||||||
|
ck_assert_int_eq( map->stream->size, 0 );
|
||||||
|
bitset_free( map );
|
||||||
|
}
|
||||||
|
END_TEST
|
||||||
|
|
||||||
|
START_TEST( test_bitset_clear_range_doesnt_push_to_stream )
|
||||||
|
{
|
||||||
|
struct bitset_mapping *map = bitset_alloc( 64, 1 );
|
||||||
|
bitset_clear_range( map, 0, 64 );
|
||||||
|
ck_assert_int_eq( map->stream->size, 0 );
|
||||||
|
bitset_free( map );
|
||||||
|
}
|
||||||
|
END_TEST
|
||||||
|
|
||||||
|
START_TEST(test_bitset_stream_on)
|
||||||
|
{
|
||||||
|
struct bitset_mapping *map = bitset_alloc( 64, 1 );
|
||||||
|
struct bitset_stream_entry result;
|
||||||
|
memset( &result, 0, sizeof( result ) );
|
||||||
|
|
||||||
|
bitset_stream_on( map );
|
||||||
|
|
||||||
|
ck_assert_int_eq( 1, map->stream_enabled );
|
||||||
|
|
||||||
|
bitset_stream_dequeue( map, &result );
|
||||||
|
|
||||||
|
ck_assert_int_eq( BITSET_STREAM_ON, result.event );
|
||||||
|
ck_assert_int_eq( 0, result.from );
|
||||||
|
ck_assert_int_eq( 64, result.len );
|
||||||
|
|
||||||
|
bitset_free( map );
|
||||||
|
}
|
||||||
|
END_TEST
|
||||||
|
|
||||||
|
START_TEST(test_bitset_stream_off)
|
||||||
|
{
|
||||||
|
struct bitset_mapping *map = bitset_alloc( 64, 1 );
|
||||||
|
struct bitset_stream_entry result;
|
||||||
|
memset( &result, 0, sizeof( result ) );
|
||||||
|
|
||||||
|
bitset_stream_on( map );
|
||||||
|
bitset_stream_off( map );
|
||||||
|
|
||||||
|
ck_assert_int_eq( 0, map->stream_enabled );
|
||||||
|
ck_assert_int_eq( 2, map->stream->size );
|
||||||
|
|
||||||
|
bitset_stream_dequeue( map, NULL ); // ON
|
||||||
|
bitset_stream_dequeue( map, &result ); // OFF
|
||||||
|
|
||||||
|
ck_assert_int_eq( BITSET_STREAM_OFF, result.event );
|
||||||
|
ck_assert_int_eq( 0, result.from );
|
||||||
|
ck_assert_int_eq( 64, result.len );
|
||||||
|
|
||||||
|
bitset_free( map );
|
||||||
|
}
|
||||||
|
END_TEST
|
||||||
|
|
||||||
|
START_TEST(test_bitset_stream_with_set_range)
|
||||||
|
{
|
||||||
|
struct bitset_mapping *map = bitset_alloc( 64, 1 );
|
||||||
|
struct bitset_stream_entry result;
|
||||||
|
memset( &result, 0, sizeof( result ) );
|
||||||
|
|
||||||
|
bitset_stream_on( map );
|
||||||
|
bitset_set_range( map, 0, 32 );
|
||||||
|
|
||||||
|
ck_assert_int_eq( 2, map->stream->size );
|
||||||
|
|
||||||
|
bitset_stream_dequeue( map, NULL ); // ON
|
||||||
|
bitset_stream_dequeue( map, &result ); // SET
|
||||||
|
|
||||||
|
ck_assert_int_eq( BITSET_STREAM_SET, result.event );
|
||||||
|
ck_assert_int_eq( 0, result.from );
|
||||||
|
ck_assert_int_eq( 32, result.len );
|
||||||
|
|
||||||
|
bitset_free( map );
|
||||||
|
}
|
||||||
|
END_TEST
|
||||||
|
|
||||||
|
START_TEST(test_bitset_stream_with_clear_range)
|
||||||
|
{
|
||||||
|
struct bitset_mapping *map = bitset_alloc( 64, 1 );
|
||||||
|
struct bitset_stream_entry result;
|
||||||
|
memset( &result, 0, sizeof( result ) );
|
||||||
|
|
||||||
|
bitset_stream_on( map );
|
||||||
|
bitset_clear_range( map, 0, 32 );
|
||||||
|
ck_assert_int_eq( 2, map->stream->size );
|
||||||
|
|
||||||
|
bitset_stream_dequeue( map, NULL ); // ON
|
||||||
|
bitset_stream_dequeue( map, &result ); // UNSET
|
||||||
|
|
||||||
|
ck_assert_int_eq( BITSET_STREAM_UNSET, result.event );
|
||||||
|
ck_assert_int_eq( 0, result.from );
|
||||||
|
ck_assert_int_eq( 32, result.len );
|
||||||
|
|
||||||
|
bitset_free( map );
|
||||||
|
}
|
||||||
|
END_TEST
|
||||||
|
|
||||||
Suite* bitset_suite(void)
|
Suite* bitset_suite(void)
|
||||||
{
|
{
|
||||||
Suite *s = suite_create("bitset");
|
Suite *s = suite_create("bitset");
|
||||||
|
|
||||||
TCase *tc_bit = tcase_create("bit");
|
TCase *tc_bit = tcase_create("bit");
|
||||||
TCase *tc_bitset = tcase_create("bitset");
|
|
||||||
tcase_add_test(tc_bit, test_bit_set);
|
tcase_add_test(tc_bit, test_bit_set);
|
||||||
tcase_add_test(tc_bit, test_bit_clear);
|
tcase_add_test(tc_bit, test_bit_clear);
|
||||||
tcase_add_test(tc_bit, test_bit_tests);
|
tcase_add_test(tc_bit, test_bit_tests);
|
||||||
tcase_add_test(tc_bit, test_bit_ranges);
|
tcase_add_test(tc_bit, test_bit_ranges);
|
||||||
tcase_add_test(tc_bit, test_bit_runs);
|
tcase_add_test(tc_bit, test_bit_runs);
|
||||||
|
suite_add_tcase(s, tc_bit);
|
||||||
|
|
||||||
|
TCase *tc_bitset = tcase_create("bitset");
|
||||||
tcase_add_test(tc_bitset, test_bitset);
|
tcase_add_test(tc_bitset, test_bitset);
|
||||||
tcase_add_test(tc_bitset, test_bitset_set);
|
tcase_add_test(tc_bitset, test_bitset_set);
|
||||||
tcase_add_test(tc_bitset, test_bitset_clear);
|
tcase_add_test(tc_bitset, test_bitset_clear);
|
||||||
tcase_add_test(tc_bitset, test_bitset_run_count);
|
tcase_add_test(tc_bitset, test_bitset_run_count);
|
||||||
tcase_add_test(tc_bitset, test_bitset_set_range);
|
tcase_add_test(tc_bitset, test_bitset_set_range);
|
||||||
tcase_add_test(tc_bitset, test_bitset_clear_range);
|
tcase_add_test(tc_bitset, test_bitset_clear_range);
|
||||||
suite_add_tcase(s, tc_bit);
|
tcase_add_test(tc_bitset, test_bitset_set_range_doesnt_push_to_stream);
|
||||||
|
tcase_add_test(tc_bitset, test_bitset_clear_range_doesnt_push_to_stream);
|
||||||
suite_add_tcase(s, tc_bitset);
|
suite_add_tcase(s, tc_bitset);
|
||||||
|
|
||||||
|
|
||||||
|
TCase *tc_bitset_stream = tcase_create("bitset_stream");
|
||||||
|
tcase_add_test(tc_bitset_stream, test_bitset_stream_on);
|
||||||
|
tcase_add_test(tc_bitset_stream, test_bitset_stream_off);
|
||||||
|
tcase_add_test(tc_bitset_stream, test_bitset_stream_with_set_range);
|
||||||
|
tcase_add_test(tc_bitset_stream, test_bitset_stream_with_clear_range);
|
||||||
|
suite_add_tcase(s, tc_bitset_stream);
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user