Formatted all code using indent

This commit is contained in:
Patrick J Cherry
2018-02-20 10:05:35 +00:00
parent 19a1127bde
commit f47f56d4c4
59 changed files with 7631 additions and 7762 deletions

View File

@@ -6,103 +6,104 @@
#include "acl.h"
struct acl * acl_create( int len, char ** lines, int default_deny )
struct acl *acl_create(int len, char **lines, int default_deny)
{
struct acl * acl;
struct acl *acl;
acl = (struct acl *)xmalloc( sizeof( struct acl ) );
acl->len = parse_acl( &acl->entries, len, lines );
acl->default_deny = default_deny;
return acl;
acl = (struct acl *) xmalloc(sizeof(struct acl));
acl->len = parse_acl(&acl->entries, len, lines);
acl->default_deny = default_deny;
return acl;
}
static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 };
static int testmasks[9] = { 0, 128, 192, 224, 240, 248, 252, 254, 255 };
/** Test whether AF_INET or AF_INET6 sockaddr is included in the given access
* control list, returning 1 if it is, and 0 if not.
*/
static int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], union mysockaddr* test)
static int is_included_in_acl(int list_length,
struct ip_and_mask (*list)[],
union mysockaddr *test)
{
NULLCHECK( test );
NULLCHECK(test);
int i;
int i;
for (i=0; i < list_length; i++) {
struct ip_and_mask *entry = &(*list)[i];
int testbits;
unsigned char *raw_address1 = NULL, *raw_address2 = NULL;
for (i = 0; i < list_length; i++) {
struct ip_and_mask *entry = &(*list)[i];
int testbits;
unsigned char *raw_address1 = NULL, *raw_address2 = NULL;
debug("checking acl entry %d (%d/%d)", i, test->generic.sa_family, entry->ip.family);
debug("checking acl entry %d (%d/%d)", i, test->generic.sa_family,
entry->ip.family);
if (test->generic.sa_family != entry->ip.family) {
continue;
}
if (test->generic.sa_family == AF_INET) {
debug("it's an AF_INET");
raw_address1 = (unsigned char*) &test->v4.sin_addr;
raw_address2 = (unsigned char*) &entry->ip.v4.sin_addr;
}
else if (test->generic.sa_family == AF_INET6) {
debug("it's an AF_INET6");
raw_address1 = (unsigned char*) &test->v6.sin6_addr;
raw_address2 = (unsigned char*) &entry->ip.v6.sin6_addr;
}
else {
fatal( "Can't check an ACL for this address type." );
}
debug("testbits=%d", entry->mask);
for (testbits = entry->mask; testbits > 0; testbits -= 8) {
debug("testbits=%d, c1=%02x, c2=%02x", testbits, raw_address1[0], raw_address2[0]);
if (testbits >= 8) {
if (raw_address1[0] != raw_address2[0]) { goto no_match; }
}
else {
if ((raw_address1[0] & testmasks[testbits%8]) !=
(raw_address2[0] & testmasks[testbits%8]) ) {
goto no_match;
}
}
raw_address1++;
raw_address2++;
}
return 1;
no_match: ;
debug("no match");
if (test->generic.sa_family != entry->ip.family) {
continue;
}
return 0;
}
int acl_includes( struct acl * acl, union mysockaddr * addr )
{
NULLCHECK( acl );
if ( 0 == acl->len ) {
return !( acl->default_deny );
if (test->generic.sa_family == AF_INET) {
debug("it's an AF_INET");
raw_address1 = (unsigned char *) &test->v4.sin_addr;
raw_address2 = (unsigned char *) &entry->ip.v4.sin_addr;
} else if (test->generic.sa_family == AF_INET6) {
debug("it's an AF_INET6");
raw_address1 = (unsigned char *) &test->v6.sin6_addr;
raw_address2 = (unsigned char *) &entry->ip.v6.sin6_addr;
} else {
fatal("Can't check an ACL for this address type.");
}
else {
return is_included_in_acl( acl->len, acl->entries, addr );
debug("testbits=%d", entry->mask);
for (testbits = entry->mask; testbits > 0; testbits -= 8) {
debug("testbits=%d, c1=%02x, c2=%02x", testbits,
raw_address1[0], raw_address2[0]);
if (testbits >= 8) {
if (raw_address1[0] != raw_address2[0]) {
goto no_match;
}
} else {
if ((raw_address1[0] & testmasks[testbits % 8]) !=
(raw_address2[0] & testmasks[testbits % 8])) {
goto no_match;
}
}
raw_address1++;
raw_address2++;
}
return 1;
no_match:;
debug("no match");
}
return 0;
}
int acl_default_deny( struct acl * acl )
int acl_includes(struct acl *acl, union mysockaddr *addr)
{
NULLCHECK( acl );
return acl->default_deny;
NULLCHECK(acl);
if (0 == acl->len) {
return !(acl->default_deny);
} else {
return is_included_in_acl(acl->len, acl->entries, addr);
}
}
void acl_destroy( struct acl * acl )
int acl_default_deny(struct acl *acl)
{
free( acl->entries );
acl->len = 0;
acl->entries = NULL;
free( acl );
NULLCHECK(acl);
return acl->default_deny;
}
void acl_destroy(struct acl *acl)
{
free(acl->entries);
acl->len = 0;
acl->entries = NULL;
free(acl);
}

View File

@@ -4,9 +4,9 @@
#include "parse.h"
struct acl {
int len;
int default_deny;
struct ip_and_mask (*entries)[];
int len;
int default_deny;
struct ip_and_mask (*entries)[];
};
/** Allocate a new acl structure, parsing the given lines to sockaddr
@@ -17,21 +17,21 @@ struct acl {
* default_deny controls the behaviour of an empty list: if true, all
* requests will be denied. If true, all requests will be accepted.
*/
struct acl * acl_create( int len, char **lines, int default_deny );
struct acl *acl_create(int len, char **lines, int default_deny);
/** Check to see whether an address is allowed by an acl.
* See acl_create for how the default_deny setting affects this.
*/
int acl_includes( struct acl *, union mysockaddr *);
int acl_includes(struct acl *, union mysockaddr *);
/** Get the default_deny status */
int acl_default_deny( struct acl * );
int acl_default_deny(struct acl *);
/** Free the acl structure and the internal acl entries table.
*/
void acl_destroy( struct acl * );
void acl_destroy(struct acl *);
#endif

View File

@@ -12,8 +12,8 @@
* poking at the bits directly without using these
* accessors/macros
*/
typedef uint64_t bitfield_word_t;
typedef bitfield_word_t * bitfield_p;
typedef uint64_t bitfield_word_t;
typedef bitfield_word_t *bitfield_p;
#define BITFIELD_WORD_SIZE sizeof(bitfield_word_t)
#define BITS_PER_WORD (BITFIELD_WORD_SIZE * 8)
@@ -30,65 +30,78 @@ typedef bitfield_word_t * bitfield_p;
((_bytes + (BITFIELD_WORD_SIZE-1)) / BITFIELD_WORD_SIZE)
/** Return the bit value ''idx'' in array ''b'' */
static inline int bit_get(bitfield_p b, uint64_t idx) {
return (BIT_WORD(b, idx) >> (idx & (BITS_PER_WORD-1))) & 1;
static inline int bit_get(bitfield_p b, uint64_t idx)
{
return (BIT_WORD(b, idx) >> (idx & (BITS_PER_WORD - 1))) & 1;
}
/** Return 1 if the bit at ''idx'' in array ''b'' is set */
static inline int bit_is_set(bitfield_p b, uint64_t idx) {
return bit_get(b, idx);
static inline int bit_is_set(bitfield_p b, uint64_t idx)
{
return bit_get(b, idx);
}
/** Return 1 if the bit at ''idx'' in array ''b'' is clear */
static inline int bit_is_clear(bitfield_p b, uint64_t idx) {
return !bit_get(b, idx);
static inline int bit_is_clear(bitfield_p b, uint64_t idx)
{
return !bit_get(b, idx);
}
/** Tests whether the bit at ''idx'' in array ''b'' has value ''value'' */
static inline int bit_has_value(bitfield_p b, uint64_t idx, int value) {
return bit_get(b, idx) == !!value;
static inline int bit_has_value(bitfield_p b, uint64_t idx, int value)
{
return bit_get(b, idx) == ! !value;
}
/** Sets the bit ''idx'' in array ''b'' */
static inline void bit_set(bitfield_p b, uint64_t idx) {
BIT_WORD(b, idx) |= BIT_MASK(idx);
static inline void bit_set(bitfield_p b, uint64_t idx)
{
BIT_WORD(b, idx) |= BIT_MASK(idx);
}
/** Clears the bit ''idx'' in array ''b'' */
static inline void bit_clear(bitfield_p b, uint64_t idx) {
BIT_WORD(b, idx) &= ~BIT_MASK(idx);
static inline void bit_clear(bitfield_p b, uint64_t idx)
{
BIT_WORD(b, idx) &= ~BIT_MASK(idx);
}
/** Sets ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_set_range(bitfield_p b, uint64_t from, uint64_t len)
{
for ( ; (from % BITS_PER_WORD) != 0 && len > 0 ; len-- ) {
bit_set( b, from++ );
}
for (; (from % BITS_PER_WORD) != 0 && len > 0; len--) {
bit_set(b, from++);
}
if (len >= BITS_PER_WORD) {
memset(&BIT_WORD(b, from), 0xff, len / 8 );
from += len;
len = len % BITS_PER_WORD;
from -= len;
}
if (len >= BITS_PER_WORD) {
memset(&BIT_WORD(b, from), 0xff, len / 8);
from += len;
len = len % BITS_PER_WORD;
from -= len;
}
for ( ; len > 0 ; len-- ) {
bit_set( b, from++ );
}
for (; len > 0; len--) {
bit_set(b, from++);
}
}
/** Clears ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_clear_range(bitfield_p b, uint64_t from, uint64_t len)
static inline void bit_clear_range(bitfield_p b, uint64_t from,
uint64_t len)
{
for ( ; (from % BITS_PER_WORD) != 0 && len > 0 ; len-- ) {
bit_clear( b, from++ );
}
for (; (from % BITS_PER_WORD) != 0 && len > 0; len--) {
bit_clear(b, from++);
}
if (len >= BITS_PER_WORD) {
memset(&BIT_WORD(b, from), 0, len / 8 );
from += len;
len = len % BITS_PER_WORD;
from -= len;
}
if (len >= BITS_PER_WORD) {
memset(&BIT_WORD(b, from), 0, len / 8);
from += len;
len = len % BITS_PER_WORD;
from -= len;
}
for ( ; len > 0 ; len-- ) {
bit_clear( b, from++ );
}
for (; len > 0; len--) {
bit_clear(b, from++);
}
}
/** Counts the number of contiguous bits in array ''b'', starting at ''from''
@@ -96,52 +109,54 @@ static inline void bit_clear_range(bitfield_p b, uint64_t from, uint64_t len)
* bits that are the same as the first one specified. If ''run_is_set'' is
* non-NULL, the value of that bit is placed into it.
*/
static inline uint64_t bit_run_count(bitfield_p b, uint64_t from, uint64_t len, int *run_is_set) {
uint64_t count = 0;
int first_value = bit_get(b, from);
bitfield_word_t word_match = first_value ? -1 : 0;
static inline uint64_t bit_run_count(bitfield_p b, uint64_t from,
uint64_t len, int *run_is_set)
{
uint64_t count = 0;
int first_value = bit_get(b, from);
bitfield_word_t word_match = first_value ? -1 : 0;
if ( run_is_set != NULL ) {
*run_is_set = first_value;
if (run_is_set != NULL) {
*run_is_set = first_value;
}
for (; ((from + count) % BITS_PER_WORD) != 0 && len > 0; len--) {
if (bit_has_value(b, from + count, first_value)) {
count++;
} else {
return count;
}
}
for ( ; ((from + count) % BITS_PER_WORD) != 0 && len > 0; len--) {
if (bit_has_value(b, from + count, first_value)) {
count++;
} else {
return count;
}
for (; len >= BITS_PER_WORD; len -= BITS_PER_WORD) {
if (BIT_WORD(b, from + count) == word_match) {
count += BITS_PER_WORD;
} else {
break;
}
}
for ( ; len >= BITS_PER_WORD ; len -= BITS_PER_WORD ) {
if (BIT_WORD(b, from + count) == word_match) {
count += BITS_PER_WORD;
} else {
break;
}
for (; len > 0; len--) {
if (bit_has_value(b, from + count, first_value)) {
count++;
}
}
for ( ; len > 0; len-- ) {
if ( bit_has_value(b, from + count, first_value) ) {
count++;
}
}
return count;
return count;
}
enum bitset_stream_events {
BITSET_STREAM_UNSET = 0,
BITSET_STREAM_SET = 1,
BITSET_STREAM_ON = 2,
BITSET_STREAM_OFF = 3
BITSET_STREAM_UNSET = 0,
BITSET_STREAM_SET = 1,
BITSET_STREAM_ON = 2,
BITSET_STREAM_OFF = 3
};
#define BITSET_STREAM_EVENTS_ENUM_SIZE 4
struct bitset_stream_entry {
enum bitset_stream_events event;
uint64_t from;
uint64_t len;
enum bitset_stream_events event;
uint64_t from;
uint64_t len;
};
/** Limit the stream size to 1MB for now.
@@ -152,14 +167,14 @@ struct bitset_stream_entry {
#define BITSET_STREAM_SIZE ( ( 1024 * 1024 ) / sizeof( struct bitset_stream_entry ) )
struct bitset_stream {
struct bitset_stream_entry entries[BITSET_STREAM_SIZE];
int in;
int out;
int size;
pthread_mutex_t mutex;
pthread_cond_t cond_not_full;
pthread_cond_t cond_not_empty;
uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE];
struct bitset_stream_entry entries[BITSET_STREAM_SIZE];
int in;
int out;
int size;
pthread_mutex_t mutex;
pthread_cond_t cond_not_full;
pthread_cond_t cond_not_empty;
uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE];
};
@@ -169,47 +184,49 @@ struct bitset_stream {
* written reliably by multiple threads.
*/
struct bitset {
pthread_mutex_t lock;
uint64_t size;
int resolution;
struct bitset_stream *stream;
int stream_enabled;
bitfield_word_t bits[];
pthread_mutex_t lock;
uint64_t size;
int resolution;
struct bitset_stream *stream;
int stream_enabled;
bitfield_word_t bits[];
};
/** Allocate a bitset for a file of the given size, and chunks of the
* given resolution.
*/
static inline struct bitset *bitset_alloc( uint64_t size, int resolution )
static inline struct bitset *bitset_alloc(uint64_t size, int resolution)
{
// calculate a size to allocate that is a multiple of the size of the
// bitfield word
size_t bitfield_size =
BIT_WORDS_FOR_SIZE((( size + resolution - 1 ) / resolution)) * sizeof( bitfield_word_t );
struct bitset *bitset = xmalloc(sizeof( struct bitset ) + ( bitfield_size / 8 ) );
// calculate a size to allocate that is a multiple of the size of the
// bitfield word
size_t bitfield_size =
BIT_WORDS_FOR_SIZE(((size + resolution -
1) / resolution)) * sizeof(bitfield_word_t);
struct bitset *bitset =
xmalloc(sizeof(struct bitset) + (bitfield_size / 8));
bitset->size = size;
bitset->resolution = resolution;
/* don't actually need to call pthread_mutex_destroy '*/
pthread_mutex_init(&bitset->lock, NULL);
bitset->stream = xmalloc( sizeof( struct bitset_stream ) );
pthread_mutex_init( &bitset->stream->mutex, NULL );
bitset->size = size;
bitset->resolution = resolution;
/* don't actually need to call pthread_mutex_destroy ' */
pthread_mutex_init(&bitset->lock, NULL);
bitset->stream = xmalloc(sizeof(struct bitset_stream));
pthread_mutex_init(&bitset->stream->mutex, NULL);
/* Technically don't need to call pthread_cond_destroy either */
pthread_cond_init( &bitset->stream->cond_not_full, NULL );
pthread_cond_init( &bitset->stream->cond_not_empty, NULL );
/* Technically don't need to call pthread_cond_destroy either */
pthread_cond_init(&bitset->stream->cond_not_full, NULL);
pthread_cond_init(&bitset->stream->cond_not_empty, NULL);
return bitset;
return bitset;
}
static inline void bitset_free( struct bitset * set )
static inline void bitset_free(struct bitset *set)
{
/* TODO: free our mutex... */
/* TODO: free our mutex... */
free( set->stream );
set->stream = NULL;
free(set->stream);
set->stream = NULL;
free( set );
free(set);
}
#define INT_FIRST_AND_LAST \
@@ -224,215 +241,201 @@ static inline void bitset_free( struct bitset * set )
FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset")
static inline void bitset_stream_enqueue(
struct bitset * set,
enum bitset_stream_events event,
uint64_t from,
uint64_t len
)
static inline void bitset_stream_enqueue(struct bitset *set,
enum bitset_stream_events event,
uint64_t from, uint64_t len)
{
struct bitset_stream * stream = set->stream;
struct bitset_stream *stream = set->stream;
pthread_mutex_lock( &stream->mutex );
pthread_mutex_lock(&stream->mutex);
while ( stream->size == BITSET_STREAM_SIZE ) {
pthread_cond_wait( &stream->cond_not_full, &stream->mutex );
}
while (stream->size == BITSET_STREAM_SIZE) {
pthread_cond_wait(&stream->cond_not_full, &stream->mutex);
}
stream->entries[stream->in].event = event;
stream->entries[stream->in].from = from;
stream->entries[stream->in].len = len;
stream->queued_bytes[event] += len;
stream->entries[stream->in].event = event;
stream->entries[stream->in].from = from;
stream->entries[stream->in].len = len;
stream->queued_bytes[event] += len;
stream->size++;
stream->in++;
stream->in %= BITSET_STREAM_SIZE;
stream->size++;
stream->in++;
stream->in %= BITSET_STREAM_SIZE;
pthread_mutex_unlock( & stream->mutex );
pthread_cond_signal( &stream->cond_not_empty );
pthread_mutex_unlock(&stream->mutex);
pthread_cond_signal(&stream->cond_not_empty);
return;
return;
}
static inline void bitset_stream_dequeue(
struct bitset * set,
struct bitset_stream_entry * out
)
static inline void bitset_stream_dequeue(struct bitset *set,
struct bitset_stream_entry *out)
{
struct bitset_stream * stream = set->stream;
struct bitset_stream_entry * dequeued;
struct bitset_stream *stream = set->stream;
struct bitset_stream_entry *dequeued;
pthread_mutex_lock( &stream->mutex );
pthread_mutex_lock(&stream->mutex);
while ( stream->size == 0 ) {
pthread_cond_wait( &stream->cond_not_empty, &stream->mutex );
}
while (stream->size == 0) {
pthread_cond_wait(&stream->cond_not_empty, &stream->mutex);
}
dequeued = &stream->entries[stream->out];
dequeued = &stream->entries[stream->out];
if ( out != NULL ) {
out->event = dequeued->event;
out->from = dequeued->from;
out->len = dequeued->len;
}
if (out != NULL) {
out->event = dequeued->event;
out->from = dequeued->from;
out->len = dequeued->len;
}
stream->queued_bytes[dequeued->event] -= dequeued->len;
stream->size--;
stream->out++;
stream->out %= BITSET_STREAM_SIZE;
stream->queued_bytes[dequeued->event] -= dequeued->len;
stream->size--;
stream->out++;
stream->out %= BITSET_STREAM_SIZE;
pthread_mutex_unlock( &stream->mutex );
pthread_cond_signal( &stream->cond_not_full );
pthread_mutex_unlock(&stream->mutex);
pthread_cond_signal(&stream->cond_not_full);
return;
return;
}
static inline size_t bitset_stream_size( struct bitset * set )
static inline size_t bitset_stream_size(struct bitset *set)
{
size_t size;
size_t size;
pthread_mutex_lock( &set->stream->mutex );
size = set->stream->size;
pthread_mutex_unlock( &set->stream->mutex );
pthread_mutex_lock(&set->stream->mutex);
size = set->stream->size;
pthread_mutex_unlock(&set->stream->mutex);
return size;
return size;
}
static inline uint64_t bitset_stream_queued_bytes(
struct bitset * set,
enum bitset_stream_events event
)
static inline uint64_t bitset_stream_queued_bytes(struct bitset *set,
enum bitset_stream_events
event)
{
uint64_t total;
uint64_t total;
pthread_mutex_lock( &set->stream->mutex );
total = set->stream->queued_bytes[event];
pthread_mutex_unlock( &set->stream->mutex );
pthread_mutex_lock(&set->stream->mutex);
total = set->stream->queued_bytes[event];
pthread_mutex_unlock(&set->stream->mutex);
return total;
return total;
}
static inline void bitset_enable_stream( struct bitset * set )
static inline void bitset_enable_stream(struct bitset *set)
{
BITSET_LOCK;
set->stream_enabled = 1;
bitset_stream_enqueue( set, BITSET_STREAM_ON, 0, set->size );
BITSET_UNLOCK;
BITSET_LOCK;
set->stream_enabled = 1;
bitset_stream_enqueue(set, BITSET_STREAM_ON, 0, set->size);
BITSET_UNLOCK;
}
static inline void bitset_disable_stream( struct bitset * set )
static inline void bitset_disable_stream(struct bitset *set)
{
BITSET_LOCK;
bitset_stream_enqueue( set, BITSET_STREAM_OFF, 0, set->size );
set->stream_enabled = 0;
BITSET_UNLOCK;
BITSET_LOCK;
bitset_stream_enqueue(set, BITSET_STREAM_OFF, 0, set->size);
set->stream_enabled = 0;
BITSET_UNLOCK;
}
/** Set the bits in a bitset which correspond to the given bytes in the larger
* file.
*/
static inline void bitset_set_range(
struct bitset * set,
uint64_t from,
uint64_t len)
static inline void bitset_set_range(struct bitset *set,
uint64_t from, uint64_t len)
{
INT_FIRST_AND_LAST;
BITSET_LOCK;
bit_set_range(set->bits, first, bitlen);
INT_FIRST_AND_LAST;
BITSET_LOCK;
bit_set_range(set->bits, first, bitlen);
if ( set->stream_enabled ) {
bitset_stream_enqueue( set, BITSET_STREAM_SET, from, len );
}
if (set->stream_enabled) {
bitset_stream_enqueue(set, BITSET_STREAM_SET, from, len);
}
BITSET_UNLOCK;
BITSET_UNLOCK;
}
/** Set every bit in the bitset. */
static inline void bitset_set( struct bitset * set )
static inline void bitset_set(struct bitset *set)
{
bitset_set_range(set, 0, set->size);
bitset_set_range(set, 0, set->size);
}
/** Clear the bits in a bitset which correspond to the given bytes in the
* larger file.
*/
static inline void bitset_clear_range(
struct bitset * set,
uint64_t from,
uint64_t len)
static inline void bitset_clear_range(struct bitset *set,
uint64_t from, uint64_t len)
{
INT_FIRST_AND_LAST;
BITSET_LOCK;
bit_clear_range(set->bits, first, bitlen);
INT_FIRST_AND_LAST;
BITSET_LOCK;
bit_clear_range(set->bits, first, bitlen);
if ( set->stream_enabled ) {
bitset_stream_enqueue( set, BITSET_STREAM_UNSET, from, len );
}
if (set->stream_enabled) {
bitset_stream_enqueue(set, BITSET_STREAM_UNSET, from, len);
}
BITSET_UNLOCK;
BITSET_UNLOCK;
}
/** Clear every bit in the bitset. */
static inline void bitset_clear( struct bitset * set )
static inline void bitset_clear(struct bitset *set)
{
bitset_clear_range(set, 0, set->size);
bitset_clear_range(set, 0, set->size);
}
/** As per bitset_run_count but also tells you whether the run it found was set
* or unset, atomically.
*/
static inline uint64_t bitset_run_count_ex(
struct bitset * set,
uint64_t from,
uint64_t len,
int* run_is_set
)
static inline uint64_t bitset_run_count_ex(struct bitset *set,
uint64_t from,
uint64_t len, int *run_is_set)
{
uint64_t run;
uint64_t run;
/* Clip our requests to the end of the bitset, avoiding uint underflow. */
if ( from > set->size ) {
return 0;
}
len = ( len + from ) > set->size ? ( set->size - from ) : len;
/* Clip our requests to the end of the bitset, avoiding uint underflow. */
if (from > set->size) {
return 0;
}
len = (len + from) > set->size ? (set->size - from) : len;
INT_FIRST_AND_LAST;
INT_FIRST_AND_LAST;
BITSET_LOCK;
run = bit_run_count(set->bits, first, bitlen, run_is_set) * set->resolution;
run -= (from % set->resolution);
BITSET_UNLOCK;
BITSET_LOCK;
run =
bit_run_count(set->bits, first, bitlen,
run_is_set) * set->resolution;
run -= (from % set->resolution);
BITSET_UNLOCK;
return run;
return run;
}
/** Counts the number of contiguous bytes that are represented as a run in
* the bit field.
*/
static inline uint64_t bitset_run_count(
struct bitset * set,
uint64_t from,
uint64_t len)
static inline uint64_t bitset_run_count(struct bitset *set,
uint64_t from, uint64_t len)
{
return bitset_run_count_ex( set, from, len, NULL );
return bitset_run_count_ex(set, from, len, NULL);
}
/** Tests whether the bit field is clear for the given file offset.
*/
static inline int bitset_is_clear_at( struct bitset * set, uint64_t at )
static inline int bitset_is_clear_at(struct bitset *set, uint64_t at)
{
return bit_is_clear(set->bits, at/set->resolution);
return bit_is_clear(set->bits, at / set->resolution);
}
/** Tests whether the bit field is set for the given file offset.
*/
static inline int bitset_is_set_at( struct bitset * set, uint64_t at )
static inline int bitset_is_set_at(struct bitset *set, uint64_t at)
{
return bit_is_set(set->bits, at/set->resolution);
return bit_is_set(set->bits, at / set->resolution);
}
#endif

File diff suppressed because it is too large Load Diff

View File

@@ -19,41 +19,40 @@
struct client {
/* When we call pthread_join, if the thread is already dead
* we can get an ESRCH. Since we have no other way to tell
* if that ESRCH is from a dead thread or a thread that never
* existed, we use a `stopped` flag to indicate a thread which
* did exist, but went away. Only check this after a
* pthread_join call.
*/
int stopped;
int socket;
/* When we call pthread_join, if the thread is already dead
* we can get an ESRCH. Since we have no other way to tell
* if that ESRCH is from a dead thread or a thread that never
* existed, we use a `stopped` flag to indicate a thread which
* did exist, but went away. Only check this after a
* pthread_join call.
*/
int stopped;
int socket;
int fileno;
char* mapped;
int fileno;
char *mapped;
uint64_t mapped_size;
uint64_t mapped_size;
struct self_pipe * stop_signal;
struct self_pipe *stop_signal;
struct server* serve; /* FIXME: remove above duplication */
struct server *serve; /* FIXME: remove above duplication */
/* Have we seen a REQUEST_DISCONNECT message? */
int disconnect;
/* Have we seen a REQUEST_DISCONNECT message? */
int disconnect;
/* kill the whole server if a request has been outstanding too long,
* assuming use_killswitch is set in serve
*/
timer_t killswitch;
/* kill the whole server if a request has been outstanding too long,
* assuming use_killswitch is set in serve
*/
timer_t killswitch;
};
void client_killswitch_hit(int signal, siginfo_t *info, void *ptr);
void client_killswitch_hit(int signal, siginfo_t * info, void *ptr);
void* client_serve(void* client_uncast);
struct client * client_create( struct server * serve, int socket );
void client_destroy( struct client * client );
void client_signal_stop( struct client * client );
void *client_serve(void *client_uncast);
struct client *client_create(struct server *serve, int socket);
void client_destroy(struct client *client);
void client_signal_stop(struct client *client);
#endif

View File

@@ -44,590 +44,570 @@
#include <unistd.h>
struct control * control_create(
struct flexnbd * flexnbd,
const char * csn)
struct control *control_create(struct flexnbd *flexnbd, const char *csn)
{
struct control * control = xmalloc( sizeof( struct control ) );
struct control *control = xmalloc(sizeof(struct control));
NULLCHECK( csn );
NULLCHECK(csn);
control->flexnbd = flexnbd;
control->socket_name = csn;
control->open_signal = self_pipe_create();
control->close_signal = self_pipe_create();
control->mirror_state_mbox = mbox_create();
control->flexnbd = flexnbd;
control->socket_name = csn;
control->open_signal = self_pipe_create();
control->close_signal = self_pipe_create();
control->mirror_state_mbox = mbox_create();
return control;
return control;
}
void control_signal_close( struct control * control)
void control_signal_close(struct control *control)
{
NULLCHECK( control );
self_pipe_signal( control->close_signal );
NULLCHECK(control);
self_pipe_signal(control->close_signal);
}
void control_destroy( struct control * control )
void control_destroy(struct control *control)
{
NULLCHECK( control );
NULLCHECK(control);
mbox_destroy( control->mirror_state_mbox );
self_pipe_destroy( control->close_signal );
self_pipe_destroy( control->open_signal );
free( control );
mbox_destroy(control->mirror_state_mbox);
self_pipe_destroy(control->close_signal);
self_pipe_destroy(control->open_signal);
free(control);
}
struct control_client * control_client_create(
struct flexnbd * flexnbd,
int client_fd ,
struct mbox * state_mbox )
struct control_client *control_client_create(struct flexnbd *flexnbd,
int client_fd,
struct mbox *state_mbox)
{
NULLCHECK( flexnbd );
NULLCHECK(flexnbd);
struct control_client * control_client =
xmalloc( sizeof( struct control_client ) );
struct control_client *control_client =
xmalloc(sizeof(struct control_client));
control_client->socket = client_fd;
control_client->flexnbd = flexnbd;
control_client->mirror_state_mbox = state_mbox;
return control_client;
control_client->socket = client_fd;
control_client->flexnbd = flexnbd;
control_client->mirror_state_mbox = state_mbox;
return control_client;
}
void control_client_destroy( struct control_client * client )
void control_client_destroy(struct control_client *client)
{
NULLCHECK( client );
free( client );
NULLCHECK(client);
free(client);
}
void control_respond(struct control_client * client);
void control_respond(struct control_client *client);
void control_handle_client( struct control * control, int client_fd )
void control_handle_client(struct control *control, int client_fd)
{
NULLCHECK( control );
NULLCHECK( control->flexnbd );
struct control_client * control_client =
control_client_create(
control->flexnbd,
client_fd ,
control->mirror_state_mbox);
NULLCHECK(control);
NULLCHECK(control->flexnbd);
struct control_client *control_client =
control_client_create(control->flexnbd,
client_fd,
control->mirror_state_mbox);
/* We intentionally don't spawn a thread for the client here.
* This is to avoid having more than one thread potentially
* waiting on the migration commit status.
*/
control_respond( control_client );
/* We intentionally don't spawn a thread for the client here.
* This is to avoid having more than one thread potentially
* waiting on the migration commit status.
*/
control_respond(control_client);
}
void control_accept_client( struct control * control )
void control_accept_client(struct control *control)
{
int client_fd;
union mysockaddr client_address;
socklen_t addrlen = sizeof( union mysockaddr );
int client_fd;
union mysockaddr client_address;
socklen_t addrlen = sizeof(union mysockaddr);
client_fd = accept( control->control_fd, &client_address.generic, &addrlen );
FATAL_IF( -1 == client_fd, "control accept failed" );
client_fd =
accept(control->control_fd, &client_address.generic, &addrlen);
FATAL_IF(-1 == client_fd, "control accept failed");
control_handle_client( control, client_fd );
control_handle_client(control, client_fd);
}
int control_accept( struct control * control )
int control_accept(struct control *control)
{
NULLCHECK( control );
NULLCHECK(control);
fd_set fds;
fd_set fds;
FD_ZERO( &fds );
FD_SET( control->control_fd, &fds );
self_pipe_fd_set( control->close_signal, &fds );
debug("Control thread selecting");
FATAL_UNLESS( 0 < select( FD_SETSIZE, &fds, NULL, NULL, NULL ),
"Control select failed." );
FD_ZERO(&fds);
FD_SET(control->control_fd, &fds);
self_pipe_fd_set(control->close_signal, &fds);
debug("Control thread selecting");
FATAL_UNLESS(0 < select(FD_SETSIZE, &fds, NULL, NULL, NULL),
"Control select failed.");
if ( self_pipe_fd_isset( control->close_signal, &fds ) ){
return 0;
}
if (self_pipe_fd_isset(control->close_signal, &fds)) {
return 0;
}
if ( FD_ISSET( control->control_fd, &fds ) ) {
control_accept_client( control );
}
return 1;
if (FD_ISSET(control->control_fd, &fds)) {
control_accept_client(control);
}
return 1;
}
void control_accept_loop( struct control * control )
void control_accept_loop(struct control *control)
{
while( control_accept( control ) );
while (control_accept(control));
}
int open_control_socket( const char * socket_name )
int open_control_socket(const char *socket_name)
{
struct sockaddr_un bind_address;
int control_fd;
struct sockaddr_un bind_address;
int control_fd;
if (!socket_name) {
fatal( "Tried to open a control socket without a socket name" );
}
if (!socket_name) {
fatal("Tried to open a control socket without a socket name");
}
control_fd = socket(AF_UNIX, SOCK_STREAM, 0);
FATAL_IF_NEGATIVE(control_fd ,
"Couldn't create control socket");
control_fd = socket(AF_UNIX, SOCK_STREAM, 0);
FATAL_IF_NEGATIVE(control_fd, "Couldn't create control socket");
memset(&bind_address, 0, sizeof(struct sockaddr_un));
bind_address.sun_family = AF_UNIX;
strncpy(bind_address.sun_path, socket_name, sizeof(bind_address.sun_path)-1);
memset(&bind_address, 0, sizeof(struct sockaddr_un));
bind_address.sun_family = AF_UNIX;
strncpy(bind_address.sun_path, socket_name,
sizeof(bind_address.sun_path) - 1);
//unlink(socket_name); /* ignore failure */
//unlink(socket_name); /* ignore failure */
FATAL_IF_NEGATIVE(
bind(control_fd , &bind_address, sizeof(bind_address)),
"Couldn't bind control socket to %s: %s",
socket_name, strerror( errno )
FATAL_IF_NEGATIVE(bind
(control_fd, &bind_address, sizeof(bind_address)),
"Couldn't bind control socket to %s: %s",
socket_name, strerror(errno)
);
FATAL_IF_NEGATIVE(
listen(control_fd , 5),
"Couldn't listen on control socket"
);
return control_fd;
FATAL_IF_NEGATIVE(listen(control_fd, 5),
"Couldn't listen on control socket");
return control_fd;
}
void control_listen(struct control* control)
void control_listen(struct control *control)
{
NULLCHECK( control );
control->control_fd = open_control_socket( control->socket_name );
NULLCHECK(control);
control->control_fd = open_control_socket(control->socket_name);
}
void control_wait_for_open_signal( struct control * control )
void control_wait_for_open_signal(struct control *control)
{
fd_set fds;
FD_ZERO( &fds );
self_pipe_fd_set( control->open_signal, &fds );
FATAL_IF_NEGATIVE( select( FD_SETSIZE, &fds, NULL, NULL, NULL ),
"select() failed" );
fd_set fds;
FD_ZERO(&fds);
self_pipe_fd_set(control->open_signal, &fds);
FATAL_IF_NEGATIVE(select(FD_SETSIZE, &fds, NULL, NULL, NULL),
"select() failed");
self_pipe_signal_clear( control->open_signal );
self_pipe_signal_clear(control->open_signal);
}
void control_serve( struct control * control )
void control_serve(struct control *control)
{
NULLCHECK( control );
NULLCHECK(control);
control_wait_for_open_signal( control );
control_listen( control );
while( control_accept( control ) );
control_wait_for_open_signal(control);
control_listen(control);
while (control_accept(control));
}
void control_cleanup(
struct control * control,
int fatal __attribute__((unused)) )
void control_cleanup(struct control *control,
int fatal __attribute__ ((unused)))
{
NULLCHECK( control );
unlink( control->socket_name );
close( control->control_fd );
NULLCHECK(control);
unlink(control->socket_name);
close(control->control_fd);
}
void * control_runner( void * control_uncast )
void *control_runner(void *control_uncast)
{
debug("Control thread");
NULLCHECK( control_uncast );
struct control * control = (struct control *)control_uncast;
debug("Control thread");
NULLCHECK(control_uncast);
struct control *control = (struct control *) control_uncast;
error_set_handler( (cleanup_handler*)control_cleanup, control );
error_set_handler((cleanup_handler *) control_cleanup, control);
control_serve( control );
control_serve(control);
control_cleanup( control, 0 );
pthread_exit( NULL );
control_cleanup(control, 0);
pthread_exit(NULL);
}
#define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1)
void control_write_mirror_response( enum mirror_state mirror_state, int client_fd )
void control_write_mirror_response(enum mirror_state mirror_state,
int client_fd)
{
switch (mirror_state) {
case MS_INIT:
case MS_UNKNOWN:
write_socket( "1: Mirror failed to initialise" );
fatal( "Impossible mirror state: %d", mirror_state );
case MS_FAIL_CONNECT:
write_socket( "1: Mirror failed to connect");
break;
case MS_FAIL_REJECTED:
write_socket( "1: Mirror was rejected" );
break;
case MS_FAIL_NO_HELLO:
write_socket( "1: Remote server failed to respond");
break;
case MS_FAIL_SIZE_MISMATCH:
write_socket( "1: Remote size does not match local size" );
break;
case MS_ABANDONED:
write_socket( "1: Mirroring abandoned" );
break;
case MS_GO:
case MS_DONE: /* Yes, I know we know better, but it's simpler this way */
write_socket( "0: Mirror started" );
break;
default:
fatal( "Unhandled mirror state: %d", mirror_state );
}
switch (mirror_state) {
case MS_INIT:
case MS_UNKNOWN:
write_socket("1: Mirror failed to initialise");
fatal("Impossible mirror state: %d", mirror_state);
case MS_FAIL_CONNECT:
write_socket("1: Mirror failed to connect");
break;
case MS_FAIL_REJECTED:
write_socket("1: Mirror was rejected");
break;
case MS_FAIL_NO_HELLO:
write_socket("1: Remote server failed to respond");
break;
case MS_FAIL_SIZE_MISMATCH:
write_socket("1: Remote size does not match local size");
break;
case MS_ABANDONED:
write_socket("1: Mirroring abandoned");
break;
case MS_GO:
case MS_DONE: /* Yes, I know we know better, but it's simpler this way */
write_socket("0: Mirror started");
break;
default:
fatal("Unhandled mirror state: %d", mirror_state);
}
}
#undef write_socket
/* Call this in the thread where you want to receive the mirror state */
enum mirror_state control_client_mirror_wait(
struct control_client* client)
enum mirror_state control_client_mirror_wait(struct control_client *client)
{
NULLCHECK( client );
NULLCHECK( client->mirror_state_mbox );
NULLCHECK(client);
NULLCHECK(client->mirror_state_mbox);
struct mbox * mbox = client->mirror_state_mbox;
enum mirror_state mirror_state;
enum mirror_state * contents;
struct mbox *mbox = client->mirror_state_mbox;
enum mirror_state mirror_state;
enum mirror_state *contents;
contents = (enum mirror_state*)mbox_receive( mbox );
NULLCHECK( contents );
contents = (enum mirror_state *) mbox_receive(mbox);
NULLCHECK(contents);
mirror_state = *contents;
mirror_state = *contents;
free( contents );
free(contents);
return mirror_state;
return mirror_state;
}
#define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1)
/** Command parser to start mirror process from socket input */
int control_mirror(struct control_client* client, int linesc, char** lines)
int control_mirror(struct control_client *client, int linesc, char **lines)
{
NULLCHECK( client );
NULLCHECK(client);
struct flexnbd * flexnbd = client->flexnbd;
union mysockaddr *connect_to = xmalloc( sizeof( union mysockaddr ) );
union mysockaddr *connect_from = NULL;
uint64_t max_Bps = UINT64_MAX;
int action_at_finish;
int raw_port;
struct flexnbd *flexnbd = client->flexnbd;
union mysockaddr *connect_to = xmalloc(sizeof(union mysockaddr));
union mysockaddr *connect_from = NULL;
uint64_t max_Bps = UINT64_MAX;
int action_at_finish;
int raw_port;
if (linesc < 2) {
write_socket("1: mirror takes at least two parameters");
return -1;
if (linesc < 2) {
write_socket("1: mirror takes at least two parameters");
return -1;
}
if (parse_ip_to_sockaddr(&connect_to->generic, lines[0]) == 0) {
write_socket("1: bad IP address");
return -1;
}
raw_port = atoi(lines[1]);
if (raw_port < 0 || raw_port > 65535) {
write_socket("1: bad IP port number");
return -1;
}
connect_to->v4.sin_port = htobe16(raw_port);
action_at_finish = ACTION_EXIT;
if (linesc > 2) {
if (strcmp("exit", lines[2]) == 0) {
action_at_finish = ACTION_EXIT;
} else if (strcmp("unlink", lines[2]) == 0) {
action_at_finish = ACTION_UNLINK;
} else if (strcmp("nothing", lines[2]) == 0) {
action_at_finish = ACTION_NOTHING;
} else {
write_socket("1: action must be 'exit' or 'nothing'");
return -1;
}
}
if (linesc > 3) {
connect_from = xmalloc(sizeof(union mysockaddr));
if (parse_ip_to_sockaddr(&connect_from->generic, lines[3]) == 0) {
write_socket("1: bad bind address");
return -1;
}
}
if (linesc > 4) {
errno = 0;
max_Bps = strtoull(lines[4], NULL, 10);
if (errno == ERANGE) {
write_socket("1: max_bps out of range");
return -1;
} else if (errno != 0) {
write_socket("1: max_bps couldn't be parsed");
return -1;
}
}
if (linesc > 5) {
write_socket("1: unrecognised parameters to mirror");
return -1;
}
struct server *serve = flexnbd_server(flexnbd);
server_lock_start_mirror(serve);
{
if (server_mirror_can_start(serve)) {
serve->mirror_super = mirror_super_create(serve->filename,
connect_to,
connect_from,
max_Bps,
action_at_finish,
client->
mirror_state_mbox);
serve->mirror = serve->mirror_super->mirror;
server_prevent_mirror_start(serve);
} else {
if (serve->mirror_super) {
warn("Tried to start a second mirror run");
write_socket("1: mirror already running");
} else {
warn("Cannot start mirroring, shutting down");
write_socket("1: shutting down");
}
}
if (parse_ip_to_sockaddr(&connect_to->generic, lines[0]) == 0) {
write_socket("1: bad IP address");
return -1;
}
}
server_unlock_start_mirror(serve);
raw_port = atoi(lines[1]);
if (raw_port < 0 || raw_port > 65535) {
write_socket("1: bad IP port number");
return -1;
}
connect_to->v4.sin_port = htobe16(raw_port);
/* Do this outside the lock to minimise the length of time the
* sighandler can block the serve thread
*/
if (serve->mirror_super) {
FATAL_IF(0 != pthread_create(&serve->mirror_super->thread,
NULL,
mirror_super_runner,
serve),
"Failed to create mirror thread");
action_at_finish = ACTION_EXIT;
if (linesc > 2) {
if (strcmp("exit", lines[2]) == 0) {
action_at_finish = ACTION_EXIT;
}
else if (strcmp( "unlink", lines[2]) == 0 ) {
action_at_finish = ACTION_UNLINK;
}
else if (strcmp("nothing", lines[2]) == 0) {
action_at_finish = ACTION_NOTHING;
}
else {
write_socket("1: action must be 'exit' or 'nothing'");
return -1;
}
}
debug("Control thread mirror super waiting");
enum mirror_state state = control_client_mirror_wait(client);
debug("Control thread writing response");
control_write_mirror_response(state, client->socket);
}
if (linesc > 3) {
connect_from = xmalloc( sizeof( union mysockaddr ) );
if (parse_ip_to_sockaddr(&connect_from->generic, lines[3]) == 0) {
write_socket("1: bad bind address");
return -1;
}
}
debug("Control thread going away.");
if (linesc > 4) {
errno = 0;
max_Bps = strtoull( lines[4], NULL, 10 );
if ( errno == ERANGE ) {
write_socket( "1: max_bps out of range" );
return -1;
} else if ( errno != 0 ) {
write_socket( "1: max_bps couldn't be parsed" );
return -1;
}
}
if (linesc > 5) {
write_socket("1: unrecognised parameters to mirror");
return -1;
}
struct server * serve = flexnbd_server(flexnbd);
server_lock_start_mirror( serve );
{
if ( server_mirror_can_start( serve ) ) {
serve->mirror_super = mirror_super_create(
serve->filename,
connect_to,
connect_from,
max_Bps ,
action_at_finish,
client->mirror_state_mbox );
serve->mirror = serve->mirror_super->mirror;
server_prevent_mirror_start( serve );
} else {
if ( serve->mirror_super ) {
warn( "Tried to start a second mirror run" );
write_socket( "1: mirror already running" );
} else {
warn( "Cannot start mirroring, shutting down" );
write_socket( "1: shutting down" );
}
}
}
server_unlock_start_mirror( serve );
/* Do this outside the lock to minimise the length of time the
* sighandler can block the serve thread
*/
if ( serve->mirror_super ) {
FATAL_IF( 0 != pthread_create(
&serve->mirror_super->thread,
NULL,
mirror_super_runner,
serve
),
"Failed to create mirror thread"
);
debug("Control thread mirror super waiting");
enum mirror_state state =
control_client_mirror_wait( client );
debug("Control thread writing response");
control_write_mirror_response( state, client->socket );
}
debug( "Control thread going away." );
return 0;
return 0;
}
int control_mirror_max_bps( struct control_client* client, int linesc, char** lines )
int control_mirror_max_bps(struct control_client *client, int linesc,
char **lines)
{
NULLCHECK( client );
NULLCHECK( client->flexnbd );
NULLCHECK(client);
NULLCHECK(client->flexnbd);
struct server* serve = flexnbd_server( client->flexnbd );
uint64_t max_Bps;
struct server *serve = flexnbd_server(client->flexnbd);
uint64_t max_Bps;
if ( !serve->mirror_super ) {
write_socket( "1: Not currently mirroring" );
return -1;
}
if (!serve->mirror_super) {
write_socket("1: Not currently mirroring");
return -1;
}
if ( linesc != 1 ) {
write_socket( "1: Bad format" );
return -1;
}
if (linesc != 1) {
write_socket("1: Bad format");
return -1;
}
errno = 0;
max_Bps = strtoull( lines[0], NULL, 10 );
if ( errno == ERANGE ) {
write_socket( "1: max_bps out of range" );
return -1;
} else if ( errno != 0 ) {
write_socket( "1: max_bps couldn't be parsed" );
return -1;
}
errno = 0;
max_Bps = strtoull(lines[0], NULL, 10);
if (errno == ERANGE) {
write_socket("1: max_bps out of range");
return -1;
} else if (errno != 0) {
write_socket("1: max_bps couldn't be parsed");
return -1;
}
serve->mirror->max_bytes_per_second = max_Bps;
write_socket( "0: updated" );
serve->mirror->max_bytes_per_second = max_Bps;
write_socket("0: updated");
return 0;
return 0;
}
#undef write_socket
/** Command parser to alter access control list from socket input */
int control_acl(struct control_client* client, int linesc, char** lines)
int control_acl(struct control_client *client, int linesc, char **lines)
{
NULLCHECK( client );
NULLCHECK( client->flexnbd );
struct flexnbd * flexnbd = client->flexnbd;
NULLCHECK(client);
NULLCHECK(client->flexnbd);
struct flexnbd *flexnbd = client->flexnbd;
int default_deny = flexnbd_default_deny( flexnbd );
struct acl * new_acl = acl_create( linesc, lines, default_deny );
int default_deny = flexnbd_default_deny(flexnbd);
struct acl *new_acl = acl_create(linesc, lines, default_deny);
if (new_acl->len != linesc) {
warn("Bad ACL spec: %s", lines[new_acl->len] );
write(client->socket, "1: bad spec: ", 13);
write(client->socket, lines[new_acl->len],
strlen(lines[new_acl->len]));
write(client->socket, "\n", 1);
acl_destroy( new_acl );
}
else {
flexnbd_replace_acl( flexnbd, new_acl );
info("ACL set");
write( client->socket, "0: updated\n", 11);
}
if (new_acl->len != linesc) {
warn("Bad ACL spec: %s", lines[new_acl->len]);
write(client->socket, "1: bad spec: ", 13);
write(client->socket, lines[new_acl->len],
strlen(lines[new_acl->len]));
write(client->socket, "\n", 1);
acl_destroy(new_acl);
} else {
flexnbd_replace_acl(flexnbd, new_acl);
info("ACL set");
write(client->socket, "0: updated\n", 11);
}
return 0;
return 0;
}
int control_break(
struct control_client* client,
int linesc __attribute__ ((unused)),
char** lines __attribute__((unused))
)
int control_break(struct control_client *client,
int linesc __attribute__ ((unused)),
char **lines __attribute__ ((unused))
)
{
NULLCHECK( client );
NULLCHECK( client->flexnbd );
NULLCHECK(client);
NULLCHECK(client->flexnbd);
int result = 0;
struct flexnbd* flexnbd = client->flexnbd;
int result = 0;
struct flexnbd *flexnbd = client->flexnbd;
struct server * serve = flexnbd_server( flexnbd );
struct server *serve = flexnbd_server(flexnbd);
server_lock_start_mirror( serve );
{
if ( server_is_mirroring( serve ) ) {
server_lock_start_mirror(serve);
{
if (server_is_mirroring(serve)) {
info( "Signaling to abandon mirror" );
server_abandon_mirror( serve );
debug( "Abandon signaled" );
info("Signaling to abandon mirror");
server_abandon_mirror(serve);
debug("Abandon signaled");
if ( server_is_closed( serve ) ) {
info( "Mirror completed while canceling" );
write( client->socket,
"1: mirror completed\n", 20 );
}
else {
info( "Mirror successfully stopped." );
write( client->socket,
"0: mirror stopped\n", 18 );
result = 1;
}
if (server_is_closed(serve)) {
info("Mirror completed while canceling");
write(client->socket, "1: mirror completed\n", 20);
} else {
info("Mirror successfully stopped.");
write(client->socket, "0: mirror stopped\n", 18);
result = 1;
}
} else {
warn( "Not mirroring." );
write( client->socket, "1: not mirroring\n", 17 );
}
} else {
warn("Not mirroring.");
write(client->socket, "1: not mirroring\n", 17);
}
server_unlock_start_mirror( serve );
}
server_unlock_start_mirror(serve);
return result;
return result;
}
/** FIXME: add some useful statistics */
int control_status(
struct control_client* client,
int linesc __attribute__ ((unused)),
char** lines __attribute__((unused))
)
int control_status(struct control_client *client,
int linesc __attribute__ ((unused)),
char **lines __attribute__ ((unused))
)
{
NULLCHECK( client );
NULLCHECK( client->flexnbd );
struct status * status = flexnbd_status_create( client->flexnbd );
NULLCHECK(client);
NULLCHECK(client->flexnbd);
struct status *status = flexnbd_status_create(client->flexnbd);
write( client->socket, "0: ", 3 );
status_write( status, client->socket );
status_destroy( status );
write(client->socket, "0: ", 3);
status_write(status, client->socket);
status_destroy(status);
return 0;
return 0;
}
void control_client_cleanup(struct control_client* client,
int fatal __attribute__ ((unused)) )
void control_client_cleanup(struct control_client *client,
int fatal __attribute__ ((unused)))
{
if (client->socket) { close(client->socket); }
if (client->socket) {
close(client->socket);
}
/* This is wrongness */
if ( server_acl_locked( client->flexnbd->serve ) ) { server_unlock_acl( client->flexnbd->serve ); }
/* This is wrongness */
if (server_acl_locked(client->flexnbd->serve)) {
server_unlock_acl(client->flexnbd->serve);
}
control_client_destroy( client );
control_client_destroy(client);
}
/** Master command parser for control socket connections, delegates quickly */
void control_respond(struct control_client * client)
void control_respond(struct control_client *client)
{
char **lines = NULL;
char **lines = NULL;
error_set_handler((cleanup_handler*) control_client_cleanup, client);
error_set_handler((cleanup_handler *) control_client_cleanup, client);
int i, linesc;
linesc = read_lines_until_blankline(client->socket, 256, &lines);
int i, linesc;
linesc = read_lines_until_blankline(client->socket, 256, &lines);
if (linesc < 1)
{
write(client->socket, "9: missing command\n", 19);
/* ignore failure */
if (linesc < 1) {
write(client->socket, "9: missing command\n", 19);
/* ignore failure */
} else if (strcmp(lines[0], "acl") == 0) {
info("acl command received");
if (control_acl(client, linesc - 1, lines + 1) < 0) {
debug("acl command failed");
}
else if (strcmp(lines[0], "acl") == 0) {
info("acl command received" );
if (control_acl(client, linesc-1, lines+1) < 0) {
debug("acl command failed");
}
} else if (strcmp(lines[0], "mirror") == 0) {
info("mirror command received");
if (control_mirror(client, linesc - 1, lines + 1) < 0) {
debug("mirror command failed");
}
else if (strcmp(lines[0], "mirror") == 0) {
info("mirror command received" );
if (control_mirror(client, linesc-1, lines+1) < 0) {
debug("mirror command failed");
}
} else if (strcmp(lines[0], "break") == 0) {
info("break command received");
if (control_break(client, linesc - 1, lines + 1) < 0) {
debug("break command failed");
}
else if (strcmp(lines[0], "break") == 0) {
info( "break command received" );
if ( control_break( client, linesc-1, lines+1) < 0) {
debug( "break command failed" );
}
} else if (strcmp(lines[0], "status") == 0) {
info("status command received");
if (control_status(client, linesc - 1, lines + 1) < 0) {
debug("status command failed");
}
else if (strcmp(lines[0], "status") == 0) {
info("status command received" );
if (control_status(client, linesc-1, lines+1) < 0) {
debug("status command failed");
}
} else if ( strcmp( lines[0], "mirror_max_bps" ) == 0 ) {
info( "mirror_max_bps command received" );
if( control_mirror_max_bps( client, linesc-1, lines+1 ) < 0 ) {
debug( "mirror_max_bps command failed" );
}
}
else {
write(client->socket, "10: unknown command\n", 23);
} else if (strcmp(lines[0], "mirror_max_bps") == 0) {
info("mirror_max_bps command received");
if (control_mirror_max_bps(client, linesc - 1, lines + 1) < 0) {
debug("mirror_max_bps command failed");
}
} else {
write(client->socket, "10: unknown command\n", 23);
}
for (i=0; i<linesc; i++) {
free(lines[i]);
}
free(lines);
for (i = 0; i < linesc; i++) {
free(lines[i]);
}
free(lines);
control_client_cleanup(client, 0);
debug("control command handled" );
control_client_cleanup(client, 0);
debug("control command handled");
}

View File

@@ -13,47 +13,46 @@ struct server;
#include "mbox.h"
struct control {
struct flexnbd * flexnbd;
int control_fd;
const char * socket_name;
struct flexnbd *flexnbd;
int control_fd;
const char *socket_name;
pthread_t thread;
pthread_t thread;
struct self_pipe * open_signal;
struct self_pipe * close_signal;
struct self_pipe *open_signal;
struct self_pipe *close_signal;
/* This is owned by the control object, and used by a
* mirror_super to communicate the state of a mirror attempt as
* early as feasible. It can't be owned by the mirror_super
* object because the mirror_super object can be freed at any
* time (including while the control_client is waiting on it),
* whereas the control object lasts for the lifetime of the
* process (and we can only have a mirror thread if the control
* thread has started it).
*/
struct mbox * mirror_state_mbox;
/* This is owned by the control object, and used by a
* mirror_super to communicate the state of a mirror attempt as
* early as feasible. It can't be owned by the mirror_super
* object because the mirror_super object can be freed at any
* time (including while the control_client is waiting on it),
* whereas the control object lasts for the lifetime of the
* process (and we can only have a mirror thread if the control
* thread has started it).
*/
struct mbox *mirror_state_mbox;
};
struct control_client{
int socket;
struct flexnbd * flexnbd;
struct control_client {
int socket;
struct flexnbd *flexnbd;
/* Passed in on creation. We know it's all right to do this
* because we know there's only ever one control_client.
*/
struct mbox * mirror_state_mbox;
/* Passed in on creation. We know it's all right to do this
* because we know there's only ever one control_client.
*/
struct mbox *mirror_state_mbox;
};
struct control * control_create(
struct flexnbd *,
const char * control_socket_name );
void control_signal_close( struct control * );
void control_destroy( struct control * );
struct control *control_create(struct flexnbd *,
const char *control_socket_name);
void control_signal_close(struct control *);
void control_destroy(struct control *);
void * control_runner( void * );
void *control_runner(void *);
void accept_control_connection(struct server* params, int client_fd, union mysockaddr* client_address);
void serve_open_control_socket(struct server* params);
void accept_control_connection(struct server *params, int client_fd,
union mysockaddr *client_address);
void serve_open_control_socket(struct server *params);
#endif

View File

@@ -43,223 +43,206 @@
int flexnbd_build_signal_fd(void)
{
sigset_t mask;
int sfd;
sigset_t mask;
int sfd;
sigemptyset( &mask );
sigaddset( &mask, SIGTERM );
sigaddset( &mask, SIGQUIT );
sigaddset( &mask, SIGINT );
sigemptyset(&mask);
sigaddset(&mask, SIGTERM);
sigaddset(&mask, SIGQUIT);
sigaddset(&mask, SIGINT);
FATAL_UNLESS( 0 == pthread_sigmask( SIG_BLOCK, &mask, NULL ),
"Signal blocking failed" );
FATAL_UNLESS(0 == pthread_sigmask(SIG_BLOCK, &mask, NULL),
"Signal blocking failed");
sfd = signalfd( -1, &mask, 0 );
FATAL_IF( -1 == sfd, "Failed to get a signal fd" );
sfd = signalfd(-1, &mask, 0);
FATAL_IF(-1 == sfd, "Failed to get a signal fd");
return sfd;
return sfd;
}
void flexnbd_create_shared(
struct flexnbd * flexnbd,
const char * s_ctrl_sock)
void flexnbd_create_shared(struct flexnbd *flexnbd,
const char *s_ctrl_sock)
{
NULLCHECK( flexnbd );
if ( s_ctrl_sock ){
flexnbd->control =
control_create( flexnbd, s_ctrl_sock );
}
else {
flexnbd->control = NULL;
}
NULLCHECK(flexnbd);
if (s_ctrl_sock) {
flexnbd->control = control_create(flexnbd, s_ctrl_sock);
} else {
flexnbd->control = NULL;
}
flexnbd->signal_fd = flexnbd_build_signal_fd();
flexnbd->signal_fd = flexnbd_build_signal_fd();
}
struct flexnbd * flexnbd_create_serving(
char* s_ip_address,
char* s_port,
char* s_file,
char* s_ctrl_sock,
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients,
int use_killswitch)
struct flexnbd *flexnbd_create_serving(char *s_ip_address,
char *s_port,
char *s_file,
char *s_ctrl_sock,
int default_deny,
int acl_entries,
char **s_acl_entries,
int max_nbd_clients,
int use_killswitch)
{
struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) );
flexnbd->serve = server_create(
flexnbd,
s_ip_address,
s_port,
s_file,
default_deny,
acl_entries,
s_acl_entries,
max_nbd_clients,
use_killswitch,
1);
flexnbd_create_shared( flexnbd, s_ctrl_sock );
struct flexnbd *flexnbd = xmalloc(sizeof(struct flexnbd));
flexnbd->serve = server_create(flexnbd,
s_ip_address,
s_port,
s_file,
default_deny,
acl_entries,
s_acl_entries,
max_nbd_clients, use_killswitch, 1);
flexnbd_create_shared(flexnbd, s_ctrl_sock);
// Beats installing one handler per client instance
if ( use_killswitch ) {
struct sigaction act = {
.sa_sigaction = client_killswitch_hit,
.sa_flags = SA_RESTART | SA_SIGINFO
};
// Beats installing one handler per client instance
if (use_killswitch) {
struct sigaction act = {
.sa_sigaction = client_killswitch_hit,
.sa_flags = SA_RESTART | SA_SIGINFO
};
FATAL_UNLESS(
0 == sigaction( CLIENT_KILLSWITCH_SIGNAL, &act, NULL ),
"Installing client killswitch signal failed"
);
}
FATAL_UNLESS(0 == sigaction(CLIENT_KILLSWITCH_SIGNAL, &act, NULL),
"Installing client killswitch signal failed");
}
return flexnbd;
return flexnbd;
}
struct flexnbd * flexnbd_create_listening(
char* s_ip_address,
char* s_port,
char* s_file,
char* s_ctrl_sock,
int default_deny,
int acl_entries,
char** s_acl_entries )
struct flexnbd *flexnbd_create_listening(char *s_ip_address,
char *s_port,
char *s_file,
char *s_ctrl_sock,
int default_deny,
int acl_entries,
char **s_acl_entries)
{
struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) );
flexnbd->serve = server_create(
flexnbd,
s_ip_address,
s_port,
s_file,
default_deny,
acl_entries,
s_acl_entries,
1, 0, 0);
flexnbd_create_shared( flexnbd, s_ctrl_sock );
struct flexnbd *flexnbd = xmalloc(sizeof(struct flexnbd));
flexnbd->serve = server_create(flexnbd,
s_ip_address,
s_port,
s_file,
default_deny,
acl_entries, s_acl_entries, 1, 0, 0);
flexnbd_create_shared(flexnbd, s_ctrl_sock);
// listen can't use killswitch, as mirror may pause on sending things
// for a very long time.
// listen can't use killswitch, as mirror may pause on sending things
// for a very long time.
return flexnbd;
return flexnbd;
}
void flexnbd_spawn_control(struct flexnbd * flexnbd )
void flexnbd_spawn_control(struct flexnbd *flexnbd)
{
NULLCHECK( flexnbd );
NULLCHECK( flexnbd->control );
NULLCHECK(flexnbd);
NULLCHECK(flexnbd->control);
pthread_t * control_thread = &flexnbd->control->thread;
pthread_t *control_thread = &flexnbd->control->thread;
FATAL_UNLESS( 0 == pthread_create(
control_thread,
NULL,
control_runner,
flexnbd->control ),
"Couldn't create the control thread" );
FATAL_UNLESS(0 == pthread_create(control_thread,
NULL,
control_runner,
flexnbd->control),
"Couldn't create the control thread");
}
void flexnbd_stop_control( struct flexnbd * flexnbd )
void flexnbd_stop_control(struct flexnbd *flexnbd)
{
NULLCHECK( flexnbd );
NULLCHECK( flexnbd->control );
NULLCHECK(flexnbd);
NULLCHECK(flexnbd->control);
control_signal_close( flexnbd->control );
pthread_t tid = flexnbd->control->thread;
FATAL_UNLESS( 0 == pthread_join( tid, NULL ),
"Failed joining the control thread" );
debug( "Control thread %p pthread_join returned", tid );
control_signal_close(flexnbd->control);
pthread_t tid = flexnbd->control->thread;
FATAL_UNLESS(0 == pthread_join(tid, NULL),
"Failed joining the control thread");
debug("Control thread %p pthread_join returned", tid);
}
int flexnbd_signal_fd( struct flexnbd * flexnbd )
int flexnbd_signal_fd(struct flexnbd *flexnbd)
{
NULLCHECK( flexnbd );
return flexnbd->signal_fd;
NULLCHECK(flexnbd);
return flexnbd->signal_fd;
}
void flexnbd_destroy( struct flexnbd * flexnbd )
void flexnbd_destroy(struct flexnbd *flexnbd)
{
NULLCHECK( flexnbd );
if ( flexnbd->control ) {
control_destroy( flexnbd->control );
}
NULLCHECK(flexnbd);
if (flexnbd->control) {
control_destroy(flexnbd->control);
}
close( flexnbd->signal_fd );
free( flexnbd );
close(flexnbd->signal_fd);
free(flexnbd);
}
struct server * flexnbd_server( struct flexnbd * flexnbd )
struct server *flexnbd_server(struct flexnbd *flexnbd)
{
NULLCHECK( flexnbd );
return flexnbd->serve;
NULLCHECK(flexnbd);
return flexnbd->serve;
}
void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl )
void flexnbd_replace_acl(struct flexnbd *flexnbd, struct acl *acl)
{
NULLCHECK( flexnbd );
server_replace_acl( flexnbd_server(flexnbd), acl );
NULLCHECK(flexnbd);
server_replace_acl(flexnbd_server(flexnbd), acl);
}
struct status * flexnbd_status_create( struct flexnbd * flexnbd )
struct status *flexnbd_status_create(struct flexnbd *flexnbd)
{
NULLCHECK( flexnbd );
struct status * status;
NULLCHECK(flexnbd);
struct status *status;
status = status_create( flexnbd_server( flexnbd ) );
return status;
status = status_create(flexnbd_server(flexnbd));
return status;
}
void flexnbd_set_server( struct flexnbd * flexnbd, struct server * serve )
void flexnbd_set_server(struct flexnbd *flexnbd, struct server *serve)
{
NULLCHECK( flexnbd );
flexnbd->serve = serve;
NULLCHECK(flexnbd);
flexnbd->serve = serve;
}
/* Get the default_deny of the current server object. */
int flexnbd_default_deny( struct flexnbd * flexnbd )
int flexnbd_default_deny(struct flexnbd *flexnbd)
{
NULLCHECK( flexnbd );
return server_default_deny( flexnbd->serve );
NULLCHECK(flexnbd);
return server_default_deny(flexnbd->serve);
}
void make_writable( const char * filename )
void make_writable(const char *filename)
{
NULLCHECK( filename );
FATAL_IF_NEGATIVE( chmod( filename, S_IWUSR ),
"Couldn't chmod %s: %s",
filename,
strerror( errno ) );
NULLCHECK(filename);
FATAL_IF_NEGATIVE(chmod(filename, S_IWUSR),
"Couldn't chmod %s: %s", filename, strerror(errno));
}
int flexnbd_serve( struct flexnbd * flexnbd )
int flexnbd_serve(struct flexnbd *flexnbd)
{
NULLCHECK( flexnbd );
int success;
struct self_pipe * open_signal = NULL;
NULLCHECK(flexnbd);
int success;
struct self_pipe *open_signal = NULL;
if ( flexnbd->control ){
debug( "Spawning control thread" );
flexnbd_spawn_control( flexnbd );
open_signal = flexnbd->control->open_signal;
}
if (flexnbd->control) {
debug("Spawning control thread");
flexnbd_spawn_control(flexnbd);
open_signal = flexnbd->control->open_signal;
}
success = do_serve( flexnbd->serve, open_signal );
debug("do_serve success is %d", success );
success = do_serve(flexnbd->serve, open_signal);
debug("do_serve success is %d", success);
if ( flexnbd->control ) {
debug( "Stopping control thread" );
flexnbd_stop_control( flexnbd );
debug("Control thread stopped");
}
if (flexnbd->control) {
debug("Stopping control thread");
flexnbd_stop_control(flexnbd);
debug("Control thread stopped");
}
return success;
return success;
}

View File

@@ -13,54 +13,51 @@
/* Carries the "globals". */
struct flexnbd {
/* Our serve pointer should never be dereferenced outside a
* flexnbd_switch_lock/unlock pair.
*/
struct server * serve;
/* Our serve pointer should never be dereferenced outside a
* flexnbd_switch_lock/unlock pair.
*/
struct server *serve;
/* We only have a control object if a control socket name was
* passed on the command line.
*/
struct control * control;
/* We only have a control object if a control socket name was
* passed on the command line.
*/
struct control *control;
/* File descriptor for a signalfd(2) signal stream. */
int signal_fd;
/* File descriptor for a signalfd(2) signal stream. */
int signal_fd;
};
struct flexnbd * flexnbd_create(void);
struct flexnbd * flexnbd_create_serving(
char* s_ip_address,
char* s_port,
char* s_file,
char* s_ctrl_sock,
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients,
int use_killswitch);
struct flexnbd *flexnbd_create(void);
struct flexnbd *flexnbd_create_serving(char *s_ip_address,
char *s_port,
char *s_file,
char *s_ctrl_sock,
int default_deny,
int acl_entries,
char **s_acl_entries,
int max_nbd_clients,
int use_killswitch);
struct flexnbd * flexnbd_create_listening(
char* s_ip_address,
char* s_port,
char* s_file,
char* s_ctrl_sock,
int default_deny,
int acl_entries,
char** s_acl_entries );
struct flexnbd *flexnbd_create_listening(char *s_ip_address,
char *s_port,
char *s_file,
char *s_ctrl_sock,
int default_deny,
int acl_entries,
char **s_acl_entries);
void flexnbd_destroy( struct flexnbd * );
void flexnbd_destroy(struct flexnbd *);
enum mirror_state;
enum mirror_state flexnbd_get_mirror_state( struct flexnbd * );
int flexnbd_default_deny( struct flexnbd * );
void flexnbd_set_server( struct flexnbd * flexnbd, struct server * serve );
int flexnbd_signal_fd( struct flexnbd * flexnbd );
enum mirror_state flexnbd_get_mirror_state(struct flexnbd *);
int flexnbd_default_deny(struct flexnbd *);
void flexnbd_set_server(struct flexnbd *flexnbd, struct server *serve);
int flexnbd_signal_fd(struct flexnbd *flexnbd);
int flexnbd_serve( struct flexnbd * flexnbd );
int flexnbd_proxy( struct flexnbd * flexnbd );
struct server * flexnbd_server( struct flexnbd * flexnbd );
void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl );
struct status * flexnbd_status_create( struct flexnbd * flexnbd );
int flexnbd_serve(struct flexnbd *flexnbd);
int flexnbd_proxy(struct flexnbd *flexnbd);
struct server *flexnbd_server(struct flexnbd *flexnbd);
void flexnbd_replace_acl(struct flexnbd *flexnbd, struct acl *acl);
struct status *flexnbd_status_create(struct flexnbd *flexnbd);
#endif

View File

@@ -4,72 +4,70 @@
#include <pthread.h>
struct flexthread_mutex * flexthread_mutex_create(void)
struct flexthread_mutex *flexthread_mutex_create(void)
{
struct flexthread_mutex * ftm =
xmalloc( sizeof( struct flexthread_mutex ) );
struct flexthread_mutex *ftm =
xmalloc(sizeof(struct flexthread_mutex));
FATAL_UNLESS( 0 == pthread_mutex_init( &ftm->mutex, NULL ),
"Mutex initialisation failed" );
return ftm;
FATAL_UNLESS(0 == pthread_mutex_init(&ftm->mutex, NULL),
"Mutex initialisation failed");
return ftm;
}
void flexthread_mutex_destroy( struct flexthread_mutex * ftm )
void flexthread_mutex_destroy(struct flexthread_mutex *ftm)
{
NULLCHECK( ftm );
NULLCHECK(ftm);
if( flexthread_mutex_held( ftm ) ) {
flexthread_mutex_unlock( ftm );
}
else if ( (pthread_t)NULL != ftm->holder ) {
/* This "should never happen": if we can try to destroy
* a mutex currently held by another thread, there's a
* logic bug somewhere. I know the test here is racy,
* but there's not a lot we can do about it at this
* point.
*/
fatal( "Attempted to destroy a flexthread_mutex"\
" held by another thread!" );
}
if (flexthread_mutex_held(ftm)) {
flexthread_mutex_unlock(ftm);
} else if ((pthread_t) NULL != ftm->holder) {
/* This "should never happen": if we can try to destroy
* a mutex currently held by another thread, there's a
* logic bug somewhere. I know the test here is racy,
* but there's not a lot we can do about it at this
* point.
*/
fatal("Attempted to destroy a flexthread_mutex"
" held by another thread!");
}
FATAL_UNLESS( 0 == pthread_mutex_destroy( &ftm->mutex ),
"Mutex destroy failed" );
free( ftm );
FATAL_UNLESS(0 == pthread_mutex_destroy(&ftm->mutex),
"Mutex destroy failed");
free(ftm);
}
int flexthread_mutex_lock( struct flexthread_mutex * ftm )
int flexthread_mutex_lock(struct flexthread_mutex *ftm)
{
NULLCHECK( ftm );
NULLCHECK(ftm);
int failure = pthread_mutex_lock( &ftm->mutex );
if ( 0 == failure ) {
ftm->holder = pthread_self();
}
int failure = pthread_mutex_lock(&ftm->mutex);
if (0 == failure) {
ftm->holder = pthread_self();
}
return failure;
return failure;
}
int flexthread_mutex_unlock( struct flexthread_mutex * ftm )
int flexthread_mutex_unlock(struct flexthread_mutex *ftm)
{
NULLCHECK( ftm );
NULLCHECK(ftm);
pthread_t orig = ftm->holder;
ftm->holder = (pthread_t)NULL;
int failure = pthread_mutex_unlock( &ftm->mutex );
if ( 0 != failure ) {
ftm->holder = orig;
}
return failure;
pthread_t orig = ftm->holder;
ftm->holder = (pthread_t) NULL;
int failure = pthread_mutex_unlock(&ftm->mutex);
if (0 != failure) {
ftm->holder = orig;
}
return failure;
}
int flexthread_mutex_held( struct flexthread_mutex * ftm )
int flexthread_mutex_held(struct flexthread_mutex *ftm)
{
NULLCHECK( ftm );
return pthread_self() == ftm->holder;
NULLCHECK(ftm);
return pthread_self() == ftm->holder;
}

View File

@@ -15,15 +15,15 @@
*/
struct flexthread_mutex {
pthread_mutex_t mutex;
pthread_t holder;
pthread_mutex_t mutex;
pthread_t holder;
};
struct flexthread_mutex * flexthread_mutex_create(void);
void flexthread_mutex_destroy( struct flexthread_mutex * );
struct flexthread_mutex *flexthread_mutex_create(void);
void flexthread_mutex_destroy(struct flexthread_mutex *);
int flexthread_mutex_lock( struct flexthread_mutex * );
int flexthread_mutex_unlock( struct flexthread_mutex * );
int flexthread_mutex_held( struct flexthread_mutex * );
int flexthread_mutex_lock(struct flexthread_mutex *);
int flexthread_mutex_unlock(struct flexthread_mutex *);
int flexthread_mutex_held(struct flexthread_mutex *);
#endif

View File

@@ -3,75 +3,75 @@
#include <pthread.h>
struct mbox * mbox_create( void )
struct mbox *mbox_create(void)
{
struct mbox * mbox = xmalloc( sizeof( struct mbox ) );
FATAL_UNLESS( 0 == pthread_cond_init( &mbox->filled_cond, NULL ),
"Failed to initialise a condition variable" );
FATAL_UNLESS( 0 == pthread_cond_init( &mbox->emptied_cond, NULL ),
"Failed to initialise a condition variable" );
FATAL_UNLESS( 0 == pthread_mutex_init( &mbox->mutex, NULL ),
"Failed to initialise a mutex" );
return mbox;
struct mbox *mbox = xmalloc(sizeof(struct mbox));
FATAL_UNLESS(0 == pthread_cond_init(&mbox->filled_cond, NULL),
"Failed to initialise a condition variable");
FATAL_UNLESS(0 == pthread_cond_init(&mbox->emptied_cond, NULL),
"Failed to initialise a condition variable");
FATAL_UNLESS(0 == pthread_mutex_init(&mbox->mutex, NULL),
"Failed to initialise a mutex");
return mbox;
}
void mbox_post( struct mbox * mbox, void * contents )
void mbox_post(struct mbox *mbox, void *contents)
{
pthread_mutex_lock( &mbox->mutex );
{
if (mbox->full){
pthread_cond_wait( &mbox->emptied_cond, &mbox->mutex );
}
mbox->contents = contents;
mbox->full = 1;
while( 0 != pthread_cond_signal( &mbox->filled_cond ) );
pthread_mutex_lock(&mbox->mutex);
{
if (mbox->full) {
pthread_cond_wait(&mbox->emptied_cond, &mbox->mutex);
}
pthread_mutex_unlock( &mbox->mutex );
mbox->contents = contents;
mbox->full = 1;
while (0 != pthread_cond_signal(&mbox->filled_cond));
}
pthread_mutex_unlock(&mbox->mutex);
}
void * mbox_contents( struct mbox * mbox )
void *mbox_contents(struct mbox *mbox)
{
return mbox->contents;
return mbox->contents;
}
int mbox_is_full( struct mbox * mbox )
int mbox_is_full(struct mbox *mbox)
{
return mbox->full;
return mbox->full;
}
void * mbox_receive( struct mbox * mbox )
void *mbox_receive(struct mbox *mbox)
{
NULLCHECK( mbox );
void * result;
NULLCHECK(mbox);
void *result;
pthread_mutex_lock( &mbox->mutex );
{
if ( !mbox->full ) {
pthread_cond_wait( &mbox->filled_cond, &mbox->mutex );
}
mbox->full = 0;
result = mbox->contents;
mbox->contents = NULL;
while( 0 != pthread_cond_signal( &mbox->emptied_cond));
pthread_mutex_lock(&mbox->mutex);
{
if (!mbox->full) {
pthread_cond_wait(&mbox->filled_cond, &mbox->mutex);
}
pthread_mutex_unlock( &mbox->mutex );
mbox->full = 0;
result = mbox->contents;
mbox->contents = NULL;
return result;
while (0 != pthread_cond_signal(&mbox->emptied_cond));
}
pthread_mutex_unlock(&mbox->mutex);
return result;
}
void mbox_destroy( struct mbox * mbox )
void mbox_destroy(struct mbox *mbox)
{
NULLCHECK( mbox );
NULLCHECK(mbox);
while( 0 != pthread_cond_destroy( &mbox->emptied_cond ) );
while( 0 != pthread_cond_destroy( &mbox->filled_cond ) );
while (0 != pthread_cond_destroy(&mbox->emptied_cond));
while (0 != pthread_cond_destroy(&mbox->filled_cond));
while( 0 != pthread_mutex_destroy( &mbox->mutex ) );
while (0 != pthread_mutex_destroy(&mbox->mutex));
free( mbox );
free(mbox);
}

View File

@@ -14,42 +14,42 @@
struct mbox {
void * contents;
void *contents;
/** Marker to tell us if there's content in the box.
* Keeping this separate allows us to use NULL for the contents.
*/
int full;
int full;
/** This gets signaled by mbox_post, and waited on by
* mbox_receive */
pthread_cond_t filled_cond;
pthread_cond_t filled_cond;
/** This is signaled by mbox_receive, and waited on by mbox_post */
pthread_cond_t emptied_cond;
pthread_mutex_t mutex;
pthread_cond_t emptied_cond;
pthread_mutex_t mutex;
};
/* Create an mbox. */
struct mbox * mbox_create(void);
struct mbox *mbox_create(void);
/* Put something in the mbox, blocking if it's already full.
* That something can be NULL if you want.
*/
void mbox_post( struct mbox *, void *);
void mbox_post(struct mbox *, void *);
/* See what's in the mbox. This isn't thread-safe. */
void * mbox_contents( struct mbox *);
void *mbox_contents(struct mbox *);
/* See if anything has been put into the mbox. This isn't thread-safe.
* */
int mbox_is_full( struct mbox *);
int mbox_is_full(struct mbox *);
/* Get the contents from the mbox, blocking if there's nothing there. */
void * mbox_receive( struct mbox *);
void *mbox_receive(struct mbox *);
/* Free the mbox and destroy the associated pthread bits. */
void mbox_destroy( struct mbox *);
void mbox_destroy(struct mbox *);
#endif

File diff suppressed because it is too large Load Diff

View File

@@ -58,65 +58,65 @@ enum mirror_state;
#define MS_REQUEST_LIMIT_SECS_F 60.0
enum mirror_finish_action {
ACTION_EXIT,
ACTION_UNLINK,
ACTION_NOTHING
ACTION_EXIT,
ACTION_UNLINK,
ACTION_NOTHING
};
enum mirror_state {
MS_UNKNOWN,
MS_INIT,
MS_GO,
MS_ABANDONED,
MS_DONE,
MS_FAIL_CONNECT,
MS_FAIL_REJECTED,
MS_FAIL_NO_HELLO,
MS_FAIL_SIZE_MISMATCH
MS_UNKNOWN,
MS_INIT,
MS_GO,
MS_ABANDONED,
MS_DONE,
MS_FAIL_CONNECT,
MS_FAIL_REJECTED,
MS_FAIL_NO_HELLO,
MS_FAIL_SIZE_MISMATCH
};
struct mirror {
pthread_t thread;
pthread_t thread;
/* Signal to this then join the thread if you want to abandon mirroring */
struct self_pipe * abandon_signal;
/* Signal to this then join the thread if you want to abandon mirroring */
struct self_pipe *abandon_signal;
union mysockaddr * connect_to;
union mysockaddr * connect_from;
int client;
const char * filename;
union mysockaddr *connect_to;
union mysockaddr *connect_from;
int client;
const char *filename;
/* Limiter, used to restrict migration speed Only dirty bytes (those going
* over the network) are considered */
uint64_t max_bytes_per_second;
/* Limiter, used to restrict migration speed Only dirty bytes (those going
* over the network) are considered */
uint64_t max_bytes_per_second;
enum mirror_finish_action action_at_finish;
enum mirror_finish_action action_at_finish;
char *mapped;
char *mapped;
/* We need to send every byte at least once; we do so by */
uint64_t offset;
/* We need to send every byte at least once; we do so by */
uint64_t offset;
enum mirror_state commit_state;
enum mirror_state commit_state;
/* commit_signal is sent immediately after attempting to connect
* and checking the remote size, whether successful or not.
*/
struct mbox * commit_signal;
/* commit_signal is sent immediately after attempting to connect
* and checking the remote size, whether successful or not.
*/
struct mbox *commit_signal;
/* The time (from monotonic_time_ms()) the migration was started. Can be
* used to calculate bps, etc. */
uint64_t migration_started;
/* The time (from monotonic_time_ms()) the migration was started. Can be
* used to calculate bps, etc. */
uint64_t migration_started;
/* Running count of all bytes we've transferred */
uint64_t all_dirty;
/* Running count of all bytes we've transferred */
uint64_t all_dirty;
};
struct mirror_super {
struct mirror * mirror;
pthread_t thread;
struct mbox * state_mbox;
struct mirror *mirror;
pthread_t thread;
struct mbox *state_mbox;
};
@@ -127,15 +127,13 @@ struct mirror_super {
struct server;
struct flexnbd;
struct mirror_super * mirror_super_create(
const char * filename,
union mysockaddr * connect_to,
union mysockaddr * connect_from,
uint64_t max_Bps,
enum mirror_finish_action action_at_finish,
struct mbox * state_mbox
);
void * mirror_super_runner( void * serve_uncast );
struct mirror_super *mirror_super_create(const char *filename,
union mysockaddr *connect_to,
union mysockaddr *connect_from,
uint64_t max_Bps,
enum mirror_finish_action
action_at_finish,
struct mbox *state_mbox);
void *mirror_super_runner(void *serve_uncast);
#endif

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -3,20 +3,20 @@
#include <sys/types.h>
#include <unistd.h>
#include <signal.h> /* for sig_atomic_t */
#include <signal.h> /* for sig_atomic_t */
#include "flexnbd.h"
#include "parse.h"
#include "acl.h"
static const int block_allocation_resolution = 4096;//128<<10;
static const int block_allocation_resolution = 4096; //128<<10;
struct client_tbl_entry {
pthread_t thread;
union mysockaddr address;
struct client * client;
pthread_t thread;
union mysockaddr address;
struct client *client;
};
@@ -25,146 +25,143 @@ struct client_tbl_entry {
#define CLIENT_KEEPALIVE_INTVL 10
#define CLIENT_KEEPALIVE_PROBES 3
struct server {
/* The flexnbd wrapper this server is attached to */
struct flexnbd * flexnbd;
/* The flexnbd wrapper this server is attached to */
struct flexnbd *flexnbd;
/** address/port to bind to */
union mysockaddr bind_to;
union mysockaddr bind_to;
/** (static) file name to serve */
char* filename;
char *filename;
/** TCP backlog for listen() */
int tcp_backlog;
int tcp_backlog;
/** (static) file name of UNIX control socket (or NULL if none) */
char* control_socket_name;
char *control_socket_name;
/** size of file */
uint64_t size;
uint64_t size;
/** to interrupt accept loop and clients, write() to close_signal[1] */
struct self_pipe * close_signal;
struct self_pipe *close_signal;
/** access control list */
struct acl * acl;
struct acl *acl;
/** acl_updated_signal will be signalled after the acl struct
* has been replaced
*/
struct self_pipe * acl_updated_signal;
struct self_pipe *acl_updated_signal;
/* Claimed around any updates to the ACL. */
struct flexthread_mutex * l_acl;
/* Claimed around any updates to the ACL. */
struct flexthread_mutex *l_acl;
/* Claimed around starting a mirror so that it doesn't race with
* shutting down on a SIGTERM. */
struct flexthread_mutex * l_start_mirror;
/* Claimed around starting a mirror so that it doesn't race with
* shutting down on a SIGTERM. */
struct flexthread_mutex *l_start_mirror;
struct mirror* mirror;
struct mirror_super * mirror_super;
/* This is used to stop the mirror from starting after we
* receive a SIGTERM */
int mirror_can_start;
struct mirror *mirror;
struct mirror_super *mirror_super;
/* This is used to stop the mirror from starting after we
* receive a SIGTERM */
int mirror_can_start;
int server_fd;
int control_fd;
int server_fd;
int control_fd;
/* the allocation_map keeps track of which blocks in the backing file
* have been allocated, or part-allocated on disc, with unallocated
* blocks presumed to contain zeroes (i.e. represented as sparse files
* by the filesystem). We can use this information when receiving
* incoming writes, and avoid writing zeroes to unallocated sections
* of the file which would needlessly increase disc usage. This
* bitmap will start at all-zeroes for an empty file, and tend towards
* all-ones as the file is written to (i.e. we assume that allocated
* blocks can never become unallocated again, as is the case with ext3
* at least).
*/
struct bitset * allocation_map;
/* when starting up, this thread builds the allocation_map */
pthread_t allocation_map_builder_thread;
/* the allocation_map keeps track of which blocks in the backing file
* have been allocated, or part-allocated on disc, with unallocated
* blocks presumed to contain zeroes (i.e. represented as sparse files
* by the filesystem). We can use this information when receiving
* incoming writes, and avoid writing zeroes to unallocated sections
* of the file which would needlessly increase disc usage. This
* bitmap will start at all-zeroes for an empty file, and tend towards
* all-ones as the file is written to (i.e. we assume that allocated
* blocks can never become unallocated again, as is the case with ext3
* at least).
*/
struct bitset *allocation_map;
/* when starting up, this thread builds the allocation_map */
pthread_t allocation_map_builder_thread;
/* when the thread has finished, it sets this to 1 */
volatile sig_atomic_t allocation_map_built;
volatile sig_atomic_t allocation_map_not_built;
/* when the thread has finished, it sets this to 1 */
volatile sig_atomic_t allocation_map_built;
volatile sig_atomic_t allocation_map_not_built;
int max_nbd_clients;
struct client_tbl_entry *nbd_client;
int max_nbd_clients;
struct client_tbl_entry *nbd_client;
/** Should clients use the killswitch? */
int use_killswitch;
int use_killswitch;
/** If this isn't set, newly accepted clients will be closed immediately */
int allow_new_clients;
int allow_new_clients;
/* Marker for whether this server has control over the data in
* the file, or if we're waiting to receive it from an inbound
* migration which hasn't yet finished.
*
* It's the value which controls the exit status of a serve or
* listen process.
*/
int success;
/* Marker for whether this server has control over the data in
* the file, or if we're waiting to receive it from an inbound
* migration which hasn't yet finished.
*
* It's the value which controls the exit status of a serve or
* listen process.
*/
int success;
};
struct server * server_create(
struct flexnbd * flexnbd,
char* s_ip_address,
char* s_port,
char* s_file,
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients,
int use_killswitch,
int success );
void server_destroy( struct server * );
int server_is_closed(struct server* serve);
void serve_signal_close( struct server *serve );
void serve_wait_for_close( struct server * serve );
void server_replace_acl( struct server *serve, struct acl * acl);
void server_control_arrived( struct server *serve );
int server_is_in_control( struct server *serve );
int server_default_deny( struct server * serve );
int server_acl_locked( struct server * serve );
void server_lock_acl( struct server *serve );
void server_unlock_acl( struct server *serve );
void server_lock_start_mirror( struct server *serve );
void server_unlock_start_mirror( struct server *serve );
int server_is_mirroring( struct server * serve );
struct server *server_create(struct flexnbd *flexnbd,
char *s_ip_address,
char *s_port,
char *s_file,
int default_deny,
int acl_entries,
char **s_acl_entries,
int max_nbd_clients,
int use_killswitch, int success);
void server_destroy(struct server *);
int server_is_closed(struct server *serve);
void serve_signal_close(struct server *serve);
void serve_wait_for_close(struct server *serve);
void server_replace_acl(struct server *serve, struct acl *acl);
void server_control_arrived(struct server *serve);
int server_is_in_control(struct server *serve);
int server_default_deny(struct server *serve);
int server_acl_locked(struct server *serve);
void server_lock_acl(struct server *serve);
void server_unlock_acl(struct server *serve);
void server_lock_start_mirror(struct server *serve);
void server_unlock_start_mirror(struct server *serve);
int server_is_mirroring(struct server *serve);
uint64_t server_mirror_bytes_remaining( struct server * serve );
uint64_t server_mirror_eta( struct server * serve );
uint64_t server_mirror_bps( struct server * serve );
uint64_t server_mirror_bytes_remaining(struct server *serve);
uint64_t server_mirror_eta(struct server *serve);
uint64_t server_mirror_bps(struct server *serve);
void server_abandon_mirror( struct server * serve );
void server_prevent_mirror_start( struct server *serve );
void server_allow_mirror_start( struct server *serve );
int server_mirror_can_start( struct server *serve );
void server_abandon_mirror(struct server *serve);
void server_prevent_mirror_start(struct server *serve);
void server_allow_mirror_start(struct server *serve);
int server_mirror_can_start(struct server *serve);
/* These three functions are used by mirror around the final pass, to close
* existing clients and prevent new ones from being around
*/
void server_forbid_new_clients( struct server *serve );
void server_close_clients( struct server *serve );
void server_join_clients( struct server *serve );
void server_allow_new_clients( struct server *serve );
void server_forbid_new_clients(struct server *serve);
void server_close_clients(struct server *serve);
void server_join_clients(struct server *serve);
void server_allow_new_clients(struct server *serve);
/* Returns a count (ish) of the number of currently-running client threads */
int server_count_clients( struct server *params );
int server_count_clients(struct server *params);
void server_unlink( struct server * serve );
void server_unlink(struct server *serve);
int do_serve( struct server *, struct self_pipe * );
int do_serve(struct server *, struct self_pipe *);
struct mode_readwrite_params {
union mysockaddr connect_to;
union mysockaddr connect_from;
union mysockaddr connect_to;
union mysockaddr connect_from;
uint64_t from;
uint32_t len;
uint64_t from;
uint32_t len;
int data_fd;
int client;
int data_fd;
int client;
};
#endif

View File

@@ -2,41 +2,44 @@
#include "serve.h"
#include "util.h"
struct status * status_create( struct server * serve )
struct status *status_create(struct server *serve)
{
NULLCHECK( serve );
struct status * status;
NULLCHECK(serve);
struct status *status;
status = xmalloc( sizeof( struct status ) );
status->pid = getpid();
status->size = serve->size;
status->has_control = serve->success;
status = xmalloc(sizeof(struct status));
status->pid = getpid();
status->size = serve->size;
status->has_control = serve->success;
status->clients_allowed = serve->allow_new_clients;
status->num_clients = server_count_clients( serve );
status->clients_allowed = serve->allow_new_clients;
status->num_clients = server_count_clients(serve);
server_lock_start_mirror( serve );
server_lock_start_mirror(serve);
status->is_mirroring = NULL != serve->mirror;
if ( status->is_mirroring ) {
status->migration_duration = monotonic_time_ms();
status->is_mirroring = NULL != serve->mirror;
if (status->is_mirroring) {
status->migration_duration = monotonic_time_ms();
if ( ( serve->mirror->migration_started ) < status->migration_duration ) {
status->migration_duration -= serve->mirror->migration_started;
} else {
status->migration_duration = 0;
}
status->migration_duration /= 1000;
status->migration_speed = server_mirror_bps( serve );
status->migration_speed_limit = serve->mirror->max_bytes_per_second;
status->migration_seconds_left = server_mirror_eta( serve );
status->migration_bytes_left = server_mirror_bytes_remaining( serve );
if ((serve->mirror->migration_started) <
status->migration_duration) {
status->migration_duration -= serve->mirror->migration_started;
} else {
status->migration_duration = 0;
}
status->migration_duration /= 1000;
status->migration_speed = server_mirror_bps(serve);
status->migration_speed_limit =
serve->mirror->max_bytes_per_second;
server_unlock_start_mirror( serve );
status->migration_seconds_left = server_mirror_eta(serve);
status->migration_bytes_left =
server_mirror_bytes_remaining(serve);
}
return status;
server_unlock_start_mirror(serve);
return status;
}
@@ -48,33 +51,32 @@ struct status * status_create( struct server * serve )
#define PRINT_UINT64( var ) \
do{dprintf( fd, #var "=%"PRIu64" ", status->var );}while(0)
int status_write( struct status * status, int fd )
int status_write(struct status *status, int fd)
{
PRINT_INT( pid );
PRINT_UINT64( size );
PRINT_BOOL( is_mirroring );
PRINT_BOOL( clients_allowed );
PRINT_INT( num_clients );
PRINT_BOOL( has_control );
PRINT_INT(pid);
PRINT_UINT64(size);
PRINT_BOOL(is_mirroring);
PRINT_BOOL(clients_allowed);
PRINT_INT(num_clients);
PRINT_BOOL(has_control);
if ( status->is_mirroring ) {
PRINT_UINT64( migration_speed );
PRINT_UINT64( migration_duration );
PRINT_UINT64( migration_seconds_left );
PRINT_UINT64( migration_bytes_left );
if ( status->migration_speed_limit < UINT64_MAX ) {
PRINT_UINT64( migration_speed_limit );
};
}
if (status->is_mirroring) {
PRINT_UINT64(migration_speed);
PRINT_UINT64(migration_duration);
PRINT_UINT64(migration_seconds_left);
PRINT_UINT64(migration_bytes_left);
if (status->migration_speed_limit < UINT64_MAX) {
PRINT_UINT64(migration_speed_limit);
};
}
dprintf(fd, "\n");
return 1;
dprintf(fd, "\n");
return 1;
}
void status_destroy( struct status * status )
void status_destroy(struct status *status)
{
NULLCHECK( status );
free( status );
NULLCHECK(status);
free(status);
}

View File

@@ -75,30 +75,29 @@
#include <unistd.h>
struct status {
pid_t pid;
uint64_t size;
int has_control;
int clients_allowed;
int num_clients;
int is_mirroring;
pid_t pid;
uint64_t size;
int has_control;
int clients_allowed;
int num_clients;
int is_mirroring;
uint64_t migration_duration;
uint64_t migration_speed;
uint64_t migration_speed_limit;
uint64_t migration_seconds_left;
uint64_t migration_bytes_left;
uint64_t migration_duration;
uint64_t migration_speed;
uint64_t migration_speed_limit;
uint64_t migration_seconds_left;
uint64_t migration_bytes_left;
};
/** Create a status object for the given server. */
struct status * status_create( struct server * );
struct status *status_create(struct server *);
/** Output the given status object to the given file descriptot */
int status_write( struct status *, int fd );
int status_write(struct status *, int fd);
/** Free the status object */
void status_destroy( struct status * );
void status_destroy(struct status *);
#endif