Moved the server-specific files into src/server

This commit is contained in:
Alex Young
2014-03-11 11:05:43 +00:00
parent 5960e4d10b
commit c41eeff2fc
19 changed files with 0 additions and 0 deletions

108
src/server/acl.c Normal file
View File

@@ -0,0 +1,108 @@
#include <stdlib.h>
#include "util.h"
#include "parse.h"
#include "acl.h"
/* Build an access control list from `len` textual entries in `lines`.
 * The entries are parsed by parse_acl(), whose return value is stored as
 * the list length. `default_deny` is stored verbatim for later lookups.
 * The caller releases the result with acl_destroy().
 */
struct acl * acl_create( int len, char ** lines, int default_deny )
{
    struct acl * result = (struct acl *)xmalloc( sizeof( struct acl ) );

    result->len = parse_acl( &result->entries, len, lines );
    result->default_deny = default_deny;

    return result;
}
/* testmasks[n] is a byte with its top n bits set (n = 0..8). It selects the
 * leading n bits of an address byte when a prefix ends inside that byte.
 */
static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 };
/** Test whether an AF_INET or AF_INET6 sockaddr is included in the given
 * access control list.
 *
 * list_length  number of entries in *list
 * list         pointer to the array of address/prefix-length entries
 * test         address to look up; must not be NULL
 *
 * Returns 1 as soon as one entry matches, 0 if none does. Entries whose
 * address family differs from `test` are skipped. Any family other than
 * AF_INET / AF_INET6 that gets past that check goes to fatal().
 */
static int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], union mysockaddr* test)
{
NULLCHECK( test );
int i;
for (i=0; i < list_length; i++) {
struct ip_and_mask *entry = &(*list)[i];
int testbits;
unsigned char *raw_address1 = NULL, *raw_address2 = NULL;
debug("checking acl entry %d (%d/%d)", i, test->generic.sa_family, entry->ip.family);
/* An IPv4 entry can never match an IPv6 address and vice versa. */
if (test->generic.sa_family != entry->ip.family) {
continue;
}
/* Point both cursors at the first raw byte of the respective address. */
if (test->generic.sa_family == AF_INET) {
debug("it's an AF_INET");
raw_address1 = (unsigned char*) &test->v4.sin_addr;
raw_address2 = (unsigned char*) &entry->ip.v4.sin_addr;
}
else if (test->generic.sa_family == AF_INET6) {
debug("it's an AF_INET6");
raw_address1 = (unsigned char*) &test->v6.sin6_addr;
raw_address2 = (unsigned char*) &entry->ip.v6.sin6_addr;
}
else {
fatal( "Can't check an ACL for this address type." );
}
debug("testbits=%d", entry->mask);
/* Walk the prefix one byte (8 bits) at a time. entry->mask is the
 * number of leading bits to compare; a mask of 0 never enters the loop,
 * so such an entry matches every address of its family.
 */
for (testbits = entry->mask; testbits > 0; testbits -= 8) {
debug("testbits=%d, c1=%02x, c2=%02x", testbits, raw_address1[0], raw_address2[0]);
if (testbits >= 8) {
/* Whole byte is inside the prefix: must be identical. */
if (raw_address1[0] != raw_address2[0]) { goto no_match; }
}
else {
/* Final partial byte: compare only the top `testbits` bits.
 * testbits is 1..7 here, so testbits%8 == testbits.
 */
if ((raw_address1[0] & testmasks[testbits%8]) !=
(raw_address2[0] & testmasks[testbits%8]) ) {
goto no_match;
}
}
raw_address1++;
raw_address2++;
}
/* Every compared bit agreed. */
return 1;
/* Jump target for a failed comparison; continues with the next entry. */
no_match: ;
debug("no match");
}
return 0;
}
/* Decide whether `addr` is admitted by `acl`.
 * A non-empty list is searched for a matching entry. An empty list admits
 * everybody unless default_deny is set.
 */
int acl_includes( struct acl * acl, union mysockaddr * addr )
{
    NULLCHECK( acl );

    if ( acl->len != 0 ) {
        return is_included_in_acl( acl->len, acl->entries, addr );
    }

    return !( acl->default_deny );
}
/* Accessor: report the default_deny flag the list was created with. */
int acl_default_deny( struct acl * acl )
{
    NULLCHECK( acl );

    int deny = acl->default_deny;
    return deny;
}
void acl_destroy( struct acl * acl )
{
free( acl->entries );
acl->len = 0;
acl->entries = NULL;
free( acl );
}

37
src/server/acl.h Normal file
View File

@@ -0,0 +1,37 @@
#ifndef ACL_H
#define ACL_H
#include "parse.h"
/* An access control list: a table of address/prefix entries plus the policy
 * to apply when the table is empty.
 */
struct acl {
/* Number of entries in *entries, as returned by parse_acl(). */
int len;
/* Non-zero: an empty list rejects every address. Zero: it accepts all. */
int default_deny;
/* Pointer to the array of parsed entries; freed by acl_destroy(). */
struct ip_and_mask (*entries)[];
};
/** Allocate a new acl structure, parsing the given lines to sockaddr
* structures in the process. After allocation, acl->len might not
* equal len. In that case, there was an error in parsing and acl->len
* will be the index of the failed entry in lines.
*
* default_deny controls the behaviour of an empty list: if true, all
* requests will be denied. If false, all requests will be accepted.
*/
struct acl * acl_create( int len, char **lines, int default_deny );
/** Check to see whether an address is allowed by an acl.
* See acl_create for how the default_deny setting affects this.
*/
int acl_includes( struct acl *, union mysockaddr *);
/** Get the default_deny status */
int acl_default_deny( struct acl * );
/** Free the acl structure and the internal acl entries table.
*/
void acl_destroy( struct acl * );
#endif

707
src/server/client.c Normal file
View File

@@ -0,0 +1,707 @@
#include "client.h"
#include "serve.h"
#include "ioutil.h"
#include "sockutil.h"
#include "util.h"
#include "bitset.h"
#include "nbdtypes.h"
#include "self_pipe.h"
#include <sys/mman.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
/* Signal handler for the killswitch timer.
 * The expiring timer carries the client's socket fd in si_value. Calling
 * shutdown() on it makes the client thread's blocking I/O fail so that the
 * thread winds itself up. If shutdown() itself fails this is fatal.
 */
void client_killswitch_hit(int signal __attribute__ ((unused)), siginfo_t *info, void *ptr __attribute__ ((unused)))
{
    const int fd = info->si_value.sival_int;

    warn( "Killswitch for fd %i activated, calling shutdown on socket", fd );

    FATAL_IF(
        -1 == shutdown( fd, SHUT_RDWR ),
        SHOW_ERRNO( "Failed to shutdown() the socket, killing the server" )
    );
}
/* Allocate and initialise the per-connection state for `socket`.
 *
 * serve   owning server; must not be NULL
 * socket  connected client socket
 *
 * Creates the stop-signal self-pipe and the (unarmed) killswitch timer.
 * Every field is initialised explicitly: client_cleanup() and
 * client_serve() read fileno, mapped and disconnect, so they must not
 * depend on whatever xmalloc() happens to leave in the allocation.
 * Release with client_destroy().
 */
struct client *client_create( struct server *serve, int socket )
{
    NULLCHECK( serve );

    struct client *c;
    struct sigevent evp = {
        .sigev_notify = SIGEV_SIGNAL,
        .sigev_signo = CLIENT_KILLSWITCH_SIGNAL
    };

    /*
     * Our killswitch closes this socket, forcing read() and write() calls
     * blocked on it to return with an error. The thread then close()s the
     * socket itself, avoiding races.
     */
    evp.sigev_value.sival_int = socket;

    c = xmalloc( sizeof( struct client ) );
    c->stopped = 0;
    c->disconnect = 0;
    c->socket = socket;
    c->fileno = 0;      /* 0 == "no file opened yet", as client_cleanup expects */
    c->mapped = NULL;
    c->serve = serve;
    c->stop_signal = self_pipe_create();

    FATAL_IF_NEGATIVE(
        timer_create( CLOCK_MONOTONIC, &evp, &(c->killswitch) ),
        SHOW_ERRNO( "Failed to create killswitch timer" )
    );

    debug( "Alloced client %p with socket %d", c, socket );
    return c;
}
void client_signal_stop( struct client *c)
{
NULLCHECK( c);
debug("client %p: signal stop (%d, %d)", c,c->stop_signal->read_fd, c->stop_signal->write_fd );
self_pipe_signal( c->stop_signal );
}
/* Tear down what client_create() built: the killswitch timer, the stop
 * self-pipe and finally the structure itself. Failing to delete the timer
 * is fatal.
 */
void client_destroy( struct client *client )
{
    NULLCHECK( client );

    FATAL_IF_NEGATIVE(
        timer_delete( client->killswitch ),
        SHOW_ERRNO( "Couldn't delete killswitch" )
    );

    debug( "Destroying stop signal for client %p", client );
    self_pipe_destroy( client->stop_signal );

    debug( "Freeing client %p", client );
    free( client );
}
/**
 * Read `len` bytes waiting on client->socket and store them at offset
 * `from` of client->mapped, while keeping the backing file as sparse as
 * possible.
 *
 * client->serve->allocation_map is a bitmap where one bit represents
 * block_allocation_resolution bytes. Where a bit isn't set, there are no
 * disc blocks allocated for that portion of the file, and we'd like to keep
 * it that way: data for such a region is read into a scratch buffer first
 * and only copied into the mapping if it contains a non-zero byte.
 *
 * Where the bitmap shows that a run is already allocated, the data is read
 * straight into the mapping.
 *
 * A failed read is reported through ERROR_IF_NEGATIVE.
 *
 * All uint64_t quantities are logged with PRIu64; the previous %ld / %d
 * conversions did not match the argument types.
 */
void write_not_zeroes(struct client* client, uint64_t from, uint64_t len)
{
    NULLCHECK( client );
    NULLCHECK( client->serve );
    NULLCHECK( client->serve->allocation_map );

    struct bitset * map = client->serve->allocation_map;

    while (len > 0) {
        /* so we have to calculate how much of our input to consider
         * next based on the bitmap of allocated blocks.  This will be
         * at a coarser resolution than the actual write, which may
         * not fall on a block boundary at either end.  So we look up
         * how many blocks our write covers, then cut off the start
         * and end to get the exact number of bytes.
         */
        uint64_t run = bitset_run_count(map, from, len);

        debug("write_not_zeroes: from=%"PRIu64", len=%"PRIu64", run=%"PRIu64, from, len, run);

        if (run > len) {
            run = len;
            debug("(run adjusted to %"PRIu64")", run);
        }

        /*
        // Useful but expensive
        if (0)
        {
            uint64_t i;
            fprintf(stderr, "full map resolution=%d: ", map->resolution);
            for (i=0; i<client->serve->size; i+=map->resolution) {
                int here = (from >= i && from < i+map->resolution);
                if (here) { fprintf(stderr, ">"); }
                fprintf(stderr, bitset_is_set_at(map, i) ? "1" : "0");
                if (here) { fprintf(stderr, "<"); }
            }
            fprintf(stderr, "\n");
        }
        */

        /* Read (len) bytes from the client socket into (dst). */
        #define DO_READ(dst, len) ERROR_IF_NEGATIVE( \
            readloop( \
                client->socket, \
                (dst), \
                (len) \
            ), \
            "read failed %"PRIu64"+%"PRIu64, from, (uint64_t)(len) \
        )

        if (bitset_is_set_at(map, from)) {
            debug("writing the lot: from=%"PRIu64", run=%"PRIu64, from, run);
            /* already allocated, just write it all */
            DO_READ(client->mapped + from, run);
            /* We know from our earlier call to bitset_run_count that the
             * bitset is all-1s at this point, but we need to dirty it for the
             * sake of the event stream - the actual bytes have changed, and we
             * are interested in that fact.
             */
            bitset_set_range( map, from, run );
            len  -= run;
            from += run;
        }
        else {
            char zerobuffer[block_allocation_resolution];
            /* not allocated, read in block_allocation_resolution */
            while (run > 0) {
                /* Never cross an allocation-block boundary in one step. */
                uint64_t blockrun = block_allocation_resolution -
                  (from % block_allocation_resolution);
                if (blockrun > run)
                    blockrun = run;

                DO_READ(zerobuffer, blockrun);

                /* This reads the buffer twice in the worst case
                 * but we're leaning on memcmp failing early
                 * and memcpy being fast, rather than try to
                 * hand-optimize something specific.
                 *
                 * First byte zero and every byte equal to its
                 * successor means the whole buffer is zero.
                 */
                int all_zeros = (zerobuffer[0] == 0) &&
                    (0 == memcmp( zerobuffer, zerobuffer+1, blockrun-1 ));

                if ( !all_zeros ) {
                    memcpy(client->mapped+from, zerobuffer, blockrun);
                    bitset_set_range(map, from, blockrun);
                    /* at this point we could choose to
                     * short-cut the rest of the write for
                     * faster I/O but by continuing to do it
                     * the slow way we preserve as much
                     * sparseness as possible.
                     */
                }
                /* When the block is all_zeroes, no bytes have changed, so we
                 * don't need to put an event into the bitset stream. This may
                 * be surprising in the future.
                 */

                len  -= blockrun;
                run  -= blockrun;
                from += blockrun;
            }
        }
    }
}
/* Read exactly one wire-format NBD request from `fd` into *out_request.
 * Returns whatever readloop() returns.
 */
int fd_read_request( int fd, struct nbd_request_raw *out_request)
{
    const size_t wire_size = sizeof(struct nbd_request_raw);

    return readloop(fd, out_request, wire_size);
}
/* Read the next request from the client's socket and convert it to host
 * representation in *out_request.
 *
 * Returns 1 if *out_request was filled with a request we should try to
 * honour, 0 otherwise. When the read fails, *disconnected is set to 1:
 * errno 0 (EOF) and ECONNRESET return 0 quietly, any other errno goes to
 * fatal().
 */
int client_read_request( struct client * client , struct nbd_request *out_request, int * disconnected )
{
    NULLCHECK( client );
    NULLCHECK( out_request );

    struct nbd_request_raw request_raw;

    if (fd_read_request(client->socket, &request_raw) == -1) {
        *disconnected = 1;

        if ( 0 == errno ) {
            debug( "EOF while reading request" );
            return 0;
        }
        else if ( ECONNRESET == errno ) {
            debug( "Connection reset while"
                    " reading request" );
            return 0;
        }
        else {
            /* FIXME: I've seen this happen, but I
             * couldn't reproduce it so I'm leaving
             * it here with a better debug output in
             * the hope it'll spontaneously happen
             * again.  It should *probably* be an
             * error() call, but I want to be sure.
             * */
            fatal("Error reading request: %d, %s",
                    errno,
                    strerror( errno ));
        }
    }

    nbd_r2h_request( &request_raw, out_request );
    return 1;
}
int fd_write_reply( int fd, char *handle, int error )
{
struct nbd_reply reply;
struct nbd_reply_raw reply_raw;
reply.magic = REPLY_MAGIC;
reply.error = error;
memcpy( reply.handle, handle, 8 );
nbd_h2r_reply( &reply, &reply_raw );
debug( "Replying with handle=0x%08X, error=%"PRIu32, handle, error );
if( -1 == writeloop( fd, &reply_raw, sizeof( reply_raw ) ) ) {
switch( errno ) {
case ECONNRESET:
error( "Connection reset while writing reply" );
break;
case EBADF:
fatal( "Tried to write to an invalid file descriptor" );
break;
case EPIPE:
error( "Remote end closed" );
break;
default:
fatal( "Unhandled error while writing: %d", errno );
}
}
return 1;
}
/* Writes a reply to request *request, with error, to the client's
* socket.
* Returns 1; we don't check for errors on the write.
* TODO: Check for errors on the write.
*/
int client_write_reply( struct client * client, struct nbd_request *request, int error )
{
return fd_write_reply( client->socket, request->handle, error);
}
/* Send the NBD greeting ("hello") that announces the export of `size`
 * bytes. The host-order init block is filled in, converted to wire format
 * and written to the client's socket; a failed write is raised through
 * ERROR_IF_NEGATIVE.
 */
void client_write_init( struct client * client, uint64_t size )
{
    struct nbd_init_raw init_raw = {{0}};
    struct nbd_init init = {{0}};

    init.magic = INIT_MAGIC;
    init.size = size;
    memcpy( init.passwd, INIT_PASSWD, sizeof( init.passwd ) );
    memset( init.reserved, 0, 128 );

    nbd_h2r_init( &init, &init_raw );

    ERROR_IF_NEGATIVE(
        writeloop(client->socket, &init_raw, sizeof(init_raw)),
        "Couldn't send hello"
    );
}
/* Remove len bytes from the client socket and throw them away. This is
 * needed when the client sends a write we can't honour - we need to get rid
 * of the bytes they've already written before we can look for another
 * request.
 *
 * The data is moved socket -> pipe -> /dev/null with splice(), so it never
 * has to be copied into this process.
 *
 * Fixes over the previous version:
 *  - the pipe() result is checked instead of using uninitialised fds;
 *  - both pipe ends are closed on return (two fds leaked per call before);
 *  - the drain loop only asks for the bytes still sitting in the pipe;
 *  - a 0-byte splice from the socket (peer closed) stops the flush
 *    instead of spinning forever;
 *  - the size_t byte count is logged with %zu.
 */
void client_flush( struct client * client, size_t len )
{
    const unsigned int flags = SPLICE_F_MORE | SPLICE_F_MOVE;
    int pipes[2];
    size_t spliced = 0;

    int devnull = open("/dev/null", O_WRONLY);
    FATAL_IF_NEGATIVE( devnull,
            "Couldn't open /dev/null: %s", strerror(errno));

    FATAL_IF_NEGATIVE( pipe( pipes ),
            "Couldn't create pipe: %s", strerror(errno));

    while ( spliced < len ) {
        ssize_t received = splice(
                client->socket, NULL,
                pipes[1], NULL,
                len-spliced, flags );
        FATAL_IF_NEGATIVE( received,
                "splice error: %s",
                strerror(errno));

        if ( 0 == received ) {
            /* End of stream: nothing more will ever arrive. */
            warn( "Client closed the connection while flushing" );
            break;
        }

        /* Drain exactly what we just put into the pipe. */
        ssize_t junked = 0;
        while( junked < received ) {
            ssize_t junk;
            junk = splice(
                    pipes[0], NULL,
                    devnull, NULL,
                    received - junked, flags );
            FATAL_IF_NEGATIVE( junk,
                    "splice error: %s",
                    strerror(errno));
            junked += junk;
        }
        spliced += received;
    }
    debug("Flushed %zu bytes", spliced);

    close( pipes[0] );
    close( pipes[1] );
    close( devnull );
}
/* Check to see if the client's request needs a reply constructing.
 * Returns 1 if we do, 0 otherwise.
 *
 * When 0 is returned, client->disconnect says what to do next:
 *   1 - bad magic or REQUEST_DISCONNECT: drop the connection;
 *   0 - out-of-range request: an error reply was sent, keep serving.
 * An unknown request type goes to fatal().
 *
 * The range check is written so that from+len cannot wrap around
 * uint64_t: the previous `from + len > size` accepted a huge `from`
 * whose sum overflowed to a small number.
 */
int client_request_needs_reply( struct client * client,
        struct nbd_request request )
{
    /* The client is stupid, but don't take down the whole server as a result.
     * We send a reply before disconnecting so that at least some indication of
     * the problem is visible, and so proxies don't retry the same (bad) request
     * forever.
     */
    if (request.magic != REQUEST_MAGIC) {
        warn("Bad magic 0x%08X from client", request.magic);
        client_write_reply( client, &request, EBADMSG );
        client->disconnect = 1; // no need to flush
        return 0;
    }

    debug(
        "request type=%"PRIu32", from=%"PRIu64", len=%"PRIu32", handle=0x%08X",
        request.type, request.from, request.len, request.handle
    );

    /* check it's not out of range (overflow-safe form of from+len > size) */
    const uint64_t size = client->serve->size;
    if ( request.from > size || request.len > size - request.from ) {
        warn("write request %"PRIu64"+%"PRIu32" out of range",
            request.from, request.len
        );
        if ( request.type == REQUEST_WRITE ) {
            /* Discard the payload so the stream stays in sync. */
            client_flush( client, request.len );
        }
        client_write_reply( client, &request, EPERM ); /* TODO: Change to ERANGE ? */
        client->disconnect = 0;
        return 0;
    }

    switch (request.type)
    {
    case REQUEST_READ:
        break;
    case REQUEST_WRITE:
        break;
    case REQUEST_DISCONNECT:
        debug("request disconnect");
        client->disconnect = 1;
        return 0;

    default:
        fatal("Unknown request 0x%08X", request.type);
    }
    return 1;
}
/* Answer a read request: send the reply header, then stream request.len
 * bytes starting at request.from from the backing file with
 * sendfileloop(). The socket is corked for the duration so the header and
 * the data are not sent as separate small packets.
 *
 * Log and error formats use PRIu64 / PRIu32 / PRId64 to match the
 * argument types (they used %ld / %d before).
 */
void client_reply_to_read( struct client* client, struct nbd_request request )
{
    off64_t offset;

    debug("request read %"PRIu64"+%"PRIu32, request.from, request.len);
    sock_set_tcp_cork( client->socket, 1 );
    client_write_reply( client, &request, 0 );

    offset = request.from;

    /* If we get cut off partway through this sendfile, we don't
     * want to kill the server.  This should be an error.
     */
    ERROR_IF_NEGATIVE(
            sendfileloop(
                client->socket,
                client->fileno,
                &offset,
                request.len),
            "sendfile failed from=%"PRId64", len=%"PRIu32,
            (int64_t)offset,
            request.len);

    sock_set_tcp_cork( client->socket, 0 );
}
/* Answer a write request: pull request.len bytes off the socket into the
 * mapping at request.from, msync the touched pages, then send a success
 * reply.
 *
 * If the allocation map has been built the data goes through
 * write_not_zeroes() to preserve sparseness; otherwise it is read directly
 * into the mapping and the map is updated by hand.
 *
 * Bug fix: the page rounding used `!0xfff` (logical NOT, i.e. 0), so
 * from_rounded was always 0 and every write msync'ed the file from its
 * very start. It now uses the bitwise complement of the 4K page mask.
 * Format specifiers were also corrected to match their arguments.
 */
void client_reply_to_write( struct client* client, struct nbd_request request )
{
    /* NOTE(review): request.handle is handed to %X here exactly as before;
     * it looks like a byte buffer rather than an integer - confirm. */
    debug("request write from=%"PRIu64", len=%"PRIu32", handle=0x%08X", request.from, request.len, request.handle);

    if (client->serve->allocation_map_built) {
        write_not_zeroes( client, request.from, request.len );
    }
    else {
        debug("No allocation map, writing directly.");
        /* If we get cut off partway through reading this data:
         * */
        ERROR_IF_NEGATIVE(
            readloop( client->socket,
                client->mapped + request.from,
                request.len),
            "reading write data failed from=%"PRIu64", len=%"PRIu32,
            request.from,
            request.len
        );

        /* the allocation_map is shared between client threads, and may be
         * being built. We need to reflect the write in it, as it may be in
         * a position the builder has already gone over.
         */
        bitset_set_range(client->serve->allocation_map, request.from, request.len);
    }

    /* not sure whether this is necessary... */
    {
        /* msync wants a page-aligned start: round down to a multiple of
         * the 4K page size and grow the length by the same amount. */
        const uint64_t page_mask = ~UINT64_C(0xfff);
        uint64_t from_rounded = request.from & page_mask;
        uint64_t len_rounded = request.len + (request.from - from_rounded);

        FATAL_IF_NEGATIVE(
            msync( client->mapped + from_rounded,
                len_rounded,
                MS_SYNC | MS_INVALIDATE),
            "msync failed %"PRIu64" %"PRIu32, request.from, request.len
        );
    }
    client_write_reply( client, &request, 0);
}
/* Dispatch a validated request to its handler. Types other than
 * REQUEST_READ and REQUEST_WRITE are ignored here.
 */
void client_reply( struct client* client, struct nbd_request request )
{
    if ( REQUEST_READ == request.type ) {
        client_reply_to_read( client, request );
    }
    else if ( REQUEST_WRITE == request.type ) {
        client_reply_to_write( client, request );
    }
}
/* Start the one-shot killswitch timer for this client. Unless
 * client_disarm_killswitch() is called within CLIENT_HANDLER_TIMEOUT
 * seconds, the timer expires and CLIENT_KILLSWITCH_SIGNAL is raised.
 * Does nothing when the server has use_killswitch switched off.
 */
void client_arm_killswitch( struct client* client )
{
    if ( !client->serve->use_killswitch ) {
        return;
    }

    struct itimerspec its = {
        .it_value = { .tv_nsec = 0, .tv_sec = CLIENT_HANDLER_TIMEOUT },
        .it_interval = { .tv_nsec = 0, .tv_sec = 0 }
    };

    debug( "Arming killswitch" );

    FATAL_IF_NEGATIVE(
        timer_settime( client->killswitch, 0, &its, NULL ),
        SHOW_ERRNO( "Failed to arm killswitch" )
    );
}
/* Stop the killswitch timer by setting an all-zero expiry. Does nothing
 * when the server has use_killswitch switched off.
 */
void client_disarm_killswitch( struct client* client )
{
    if ( !client->serve->use_killswitch ) {
        return;
    }

    struct itimerspec its = {
        .it_value = { .tv_nsec = 0, .tv_sec = 0 },
        .it_interval = { .tv_nsec = 0, .tv_sec = 0 }
    };

    debug( "Disarming killswitch" );

    FATAL_IF_NEGATIVE(
        timer_settime( client->killswitch, 0, &its, NULL ),
        SHOW_ERRNO( "Failed to disarm killswitch" )
    );
}
/* Wait for, read and answer a single request from the client.
 *
 * Returns 0 if we should continue trying to serve requests, non-zero if
 * the serving loop should end (stop signal, socket exception, read
 * failure, disconnect request, or the server being closed).
 */
int client_serve_request(struct client* client)
{
struct nbd_request request = {0};
/* Pessimistic default: only a fully handled request clears it. */
int stop = 1;
int disconnected = 0;
fd_set rfds, efds;
int fd_count;
/* wait until there are some bytes on the fd before committing to reads
* FIXME: this whole scheme is broken because we're using blocking reads.
* read() can block directly after a select anyway, and it's possible that,
* without the killswitch, we'd hang forever. With the killswitch, we just
* hang for "a while". The Right Thing to do is to rewrite client.c to be
* non-blocking.
*/
/* Readable set: the client socket plus the stop self-pipe. */
FD_ZERO( &rfds );
FD_SET( client->socket, &rfds );
self_pipe_fd_set( client->stop_signal, &rfds );
/* Exception set: the client socket only. */
FD_ZERO( &efds );
FD_SET( client->socket, &efds );
/* No timeout is passed, so a return of 0 is not expected. */
fd_count = sock_try_select( FD_SETSIZE, &rfds, NULL, &efds, NULL );
if ( fd_count == 0 ) {
/* This "can't ever happen" */
fatal( "No FDs selected, and no timeout!" );
}
else if ( fd_count < 0 ) { fatal( "Select failed" ); }
/* The stop signal is checked before the socket itself. */
if ( self_pipe_fd_isset( client->stop_signal, &rfds ) ){
debug("Client received stop signal.");
return 1; // Don't try to serve more requests
}
if ( FD_ISSET( client->socket, &efds ) ) {
debug( "Client connection closed" );
return 1;
}
/* We arm / disarm around the whole request cycle. The reason for this is
* that the remote peer could uncleanly die at any point; if we're stuck on
* a blocking read(), then that will hang for (almost) forever. This is bad
* in general, makes the server respond only to kill -9, and breaks
* outward mirroring in a most unpleasant way.
*
* Don't forget to disarm before exiting, no matter what!
*
* The replication is simple: open a connection to the flexnbd server, write
* a single byte, and then wait.
*
*/
client_arm_killswitch( client );
/* Every return below this point is preceded by a disarm. */
if ( !client_read_request( client, &request, &disconnected ) ) {
client_disarm_killswitch( client );
return stop;
}
if ( disconnected ) {
client_disarm_killswitch( client );
return stop;
}
/* No reply needed: client->disconnect decides whether to keep serving
 * (0) or to stop (1). */
if ( !client_request_needs_reply( client, request ) ) {
client_disarm_killswitch( client );
return client->disconnect;
}
{
/* Requests are only honoured while the server is open; otherwise
 * `stop` keeps its initial value of 1. */
if ( !server_is_closed( client->serve ) ) {
client_reply( client, request );
stop = 0;
}
}
client_disarm_killswitch( client );
return stop;
}
/* Greet a freshly connected client with the size of the served file. */
void client_send_hello(struct client* client)
{
    uint64_t export_size = client->serve->size;

    client_write_init( client, export_size );
}
/* Release everything a client thread holds: disarm the killswitch, close
 * the socket, unmap and close the backing file, and drop the ACL lock if
 * it is held. Used both on the normal exit path and as the thread's error
 * cleanup handler.
 *
 * The descriptors are tested with `> 0` rather than for truthiness: this
 * function stores -1 after closing, and -1 is "true", so a second call
 * used to attempt close(-1) and die in FATAL_IF_NEGATIVE. 0 is still
 * treated as "never opened", as before. The mapping pointer is cleared
 * after munmap() for the same reason.
 */
void client_cleanup(struct client* client,
        int fatal __attribute__ ((unused)) )
{
    info("client cleanup for client %p", client);

    /* If the thread hits an error, we need to ensure this is off */
    client_disarm_killswitch( client );

    if (client->socket > 0) {
        FATAL_IF_NEGATIVE( close(client->socket),
            "Error closing client socket %d",
            client->socket );
        debug("Closed client socket fd %d", client->socket);
        client->socket = -1;
    }
    if (client->mapped) {
        munmap(client->mapped, client->serve->size);
        client->mapped = NULL;
    }
    if (client->fileno > 0) {
        FATAL_IF_NEGATIVE( close(client->fileno),
            "Error closing file %d",
            client->fileno );
        debug("Closed client file fd %d", client->fileno );
        client->fileno = -1;
    }

    if ( server_acl_locked( client->serve ) ) { server_unlock_acl( client->serve ); }
}
/* Thread entry point for one client connection.
 *
 * Registers client_cleanup() as the error handler, opens and maps the
 * served file, sends the hello, then serves requests until
 * client_serve_request() returns non-zero. If the client asked to
 * disconnect, the server is told via server_control_arrived(). Finally
 * the client is cleaned up. Always returns NULL.
 */
void* client_serve(void* client_uncast)
{
    struct client* client = (struct client*) client_uncast;

    error_set_handler((cleanup_handler*) client_cleanup, client);

    info("client: mmaping file");
    FATAL_IF_NEGATIVE(
        open_and_mmap(
            client->serve->filename,
            &client->fileno,
            NULL,
            (void**) &client->mapped
        ),
        "Couldn't open/mmap file %s: %s", client->serve->filename, strerror( errno )
    );

    FATAL_IF_NEGATIVE(
        madvise( client->mapped, client->serve->size, MADV_RANDOM ),
        SHOW_ERRNO( "Failed to madvise() %s", client->serve->filename )
    );

    debug( "Opened client file fd %d", client->fileno);
    debug("client: sending hello");
    client_send_hello(client);

    debug("client: serving requests");
    for (;;) {
        if ( client_serve_request(client) != 0 ) {
            break;
        }
    }
    debug("client: stopped serving requests");
    client->stopped = 1;

    if ( client->disconnect ){
        debug("client: control arrived" );
        server_control_arrived( client->serve );
    }

    debug("Cleaning client %p up normally in thread %p", client, pthread_self());
    client_cleanup(client, 0);
    debug("Client thread done" );

    return NULL;
}

56
src/server/client.h Normal file
View File

@@ -0,0 +1,56 @@
#ifndef CLIENT_H
#define CLIENT_H
#include <signal.h>
#include <time.h>
/** CLIENT_HANDLER_TIMEOUT
* This is the length of time (in seconds) any request can be outstanding for.
* If we spend longer than this in a request, the whole server is killed.
*/
#define CLIENT_HANDLER_TIMEOUT 120
/** CLIENT_KILLSWITCH_SIGNAL
* The signal number we use to kill the server when *any* killswitch timer
* fires. The handler gets the fd of the client socket to work with.
*/
#define CLIENT_KILLSWITCH_SIGNAL ( SIGRTMIN + 1 )
/* Per-connection state owned by one client thread. Created by
 * client_create(), released by client_destroy().
 */
struct client {
/* When we call pthread_join, if the thread is already dead
* we can get an ESRCH. Since we have no other way to tell
* if that ESRCH is from a dead thread or a thread that never
* existed, we use a `stopped` flag to indicate a thread which
* did exist, but went away. Only check this after a
* pthread_join call.
*/
int stopped;
/* The connected client socket; set to -1 by client_cleanup() once closed. */
int socket;
/* Descriptor of the served file, filled in by open_and_mmap() in
 * client_serve(); set to -1 by client_cleanup() once closed. */
int fileno;
/* Start of the memory mapping of the served file (see client_serve()). */
char* mapped;
/* Self-pipe signalled by client_signal_stop() to end the serving loop. */
struct self_pipe * stop_signal;
struct server* serve; /* FIXME: remove above duplication */
/* Have we seen a REQUEST_DISCONNECT message? (Also set on bad magic.) */
int disconnect;
/* Timer that raises CLIENT_KILLSWITCH_SIGNAL if a request has been
* outstanding too long; only armed when use_killswitch is set in serve.
*/
timer_t killswitch;
};
void client_killswitch_hit(int signal, siginfo_t *info, void *ptr);
void* client_serve(void* client_uncast);
struct client * client_create( struct server * serve, int socket );
void client_destroy( struct client * client );
void client_signal_stop( struct client * client );
#endif

633
src/server/control.c Normal file
View File

@@ -0,0 +1,633 @@
/* FlexNBD server (C) Bytemark Hosting 2012
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/** The control server responds on a UNIX socket and services our "remote"
* commands which are used for changing the access control list, initiating
* a mirror process, or asking for status. The protocol is pretty simple -
* after connecting the client sends a series of LF-terminated lines, followed
* by a blank line (i.e. double LF). The first line is taken to be the command
* name to invoke, and the lines before the double LF are its arguments.
*
* These commands can be invoked remotely from the command line, with the
* client code to be found in remote.c
*/
#include "control.h"
#include "mirror.h"
#include "serve.h"
#include "util.h"
#include "ioutil.h"
#include "parse.h"
#include "readwrite.h"
#include "bitset.h"
#include "self_pipe.h"
#include "acl.h"
#include "status.h"
#include "mbox.h"
#include <stdlib.h>
#include <string.h>
#include <sys/un.h>
#include <unistd.h>
/* Allocate the control-server state for `flexnbd`, to be served on the
 * UNIX socket path `csn` (must not be NULL; the pointer is stored, not
 * copied). Creates the open/close self-pipes and the mirror-state mailbox.
 * Release with control_destroy().
 */
struct control * control_create(
        struct flexnbd * flexnbd,
        const char * csn)
{
    struct control * control = xmalloc( sizeof( struct control ) );

    NULLCHECK( csn );

    control->socket_name = csn;
    control->flexnbd = flexnbd;

    control->open_signal = self_pipe_create();
    control->close_signal = self_pipe_create();
    control->mirror_state_mbox = mbox_create();

    return control;
}
/* Ask the control thread to stop accepting by signalling its close pipe. */
void control_signal_close( struct control * control)
{
    NULLCHECK( control );

    struct self_pipe * closer = control->close_signal;
    self_pipe_signal( closer );
}
/* Release what control_create() allocated: the mailbox, both self-pipes
 * and the structure itself. The socket name is not freed here.
 */
void control_destroy( struct control * control )
{
    NULLCHECK( control );

    mbox_destroy( control->mirror_state_mbox );

    /* Pipes go in reverse order of creation. */
    self_pipe_destroy( control->close_signal );
    self_pipe_destroy( control->open_signal );

    free( control );
}
/* Bundle everything needed to serve one control connection: the owning
 * flexnbd (must not be NULL), the accepted socket and the mailbox through
 * which mirror state is reported.
 */
struct control_client * control_client_create(
        struct flexnbd * flexnbd,
        int client_fd ,
        struct mbox * state_mbox )
{
    NULLCHECK( flexnbd );

    struct control_client * created = xmalloc( sizeof( struct control_client ) );

    created->flexnbd = flexnbd;
    created->socket = client_fd;
    created->mirror_state_mbox = state_mbox;

    return created;
}
/* Free a control_client. Only the structure is released; the socket and
 * the mailbox it refers to are left alone.
 */
void control_client_destroy( struct control_client * client )
{
    NULLCHECK( client );

    free( client );
}
void control_respond(struct control_client * client);
/* Wrap an accepted control connection in a control_client and answer it
 * synchronously on the calling thread.
 */
void control_handle_client( struct control * control, int client_fd )
{
    NULLCHECK( control );
    NULLCHECK( control->flexnbd );

    struct control_client * control_client;

    control_client = control_client_create(
            control->flexnbd,
            client_fd ,
            control->mirror_state_mbox);

    /* We intentionally don't spawn a thread for the client here.
     * This is to avoid having more than one thread potentially
     * waiting on the migration commit status.
     */
    control_respond( control_client );
}
/* Accept one pending connection on the control socket and handle it.
 * A failing accept() is fatal.
 */
void control_accept_client( struct control * control )
{
    union mysockaddr client_address;
    socklen_t addrlen = sizeof( union mysockaddr );

    int client_fd = accept( control->control_fd, &client_address.generic, &addrlen );

    FATAL_IF( -1 == client_fd, "control accept failed" );

    control_handle_client( control, client_fd );
}
/* Block until either a control connection arrives or the close signal
 * fires. Returns 0 when the close signal was seen (stop looping), 1
 * otherwise, after serving a pending connection if there was one.
 */
int control_accept( struct control * control )
{
    NULLCHECK( control );

    fd_set fds;

    FD_ZERO( &fds );
    FD_SET( control->control_fd, &fds );
    self_pipe_fd_set( control->close_signal, &fds );

    debug("Control thread selecting");
    FATAL_UNLESS( 0 < select( FD_SETSIZE, &fds, NULL, NULL, NULL ),
            "Control select failed." );

    /* The close signal wins over a pending connection. */
    if ( !self_pipe_fd_isset( control->close_signal, &fds ) ) {
        if ( FD_ISSET( control->control_fd, &fds ) ) {
            control_accept_client( control );
        }
        return 1;
    }

    return 0;
}
/* Keep accepting control connections until control_accept() reports 0. */
void control_accept_loop( struct control * control )
{
    for (;;) {
        if ( !control_accept( control ) ) {
            return;
        }
    }
}
/* Create a listening AF_UNIX stream socket bound to `socket_name`.
 *
 * Returns the listening descriptor. Every failure (NULL or over-long
 * name, socket(), bind(), listen()) is fatal.
 *
 * Changes over the previous version:
 *  - a name that does not fit in sun_path is rejected; it used to be
 *    silently truncated, binding the socket to a different path than the
 *    one the caller asked for;
 *  - the address is cast to struct sockaddr * for bind(), which takes
 *    that type rather than struct sockaddr_un *.
 */
int open_control_socket( const char * socket_name )
{
    struct sockaddr_un bind_address;
    int control_fd;

    if (!socket_name) {
        fatal( "Tried to open a control socket without a socket name" );
    }

    if ( strlen( socket_name ) >= sizeof( bind_address.sun_path ) ) {
        fatal( "Control socket name %s is too long", socket_name );
    }

    control_fd = socket(AF_UNIX, SOCK_STREAM, 0);
    FATAL_IF_NEGATIVE(control_fd ,
        "Couldn't create control socket");

    /* Zero-fill first so sun_path is NUL-terminated after the copy. */
    memset(&bind_address, 0, sizeof(struct sockaddr_un));
    bind_address.sun_family = AF_UNIX;
    strncpy(bind_address.sun_path, socket_name, sizeof(bind_address.sun_path)-1);

    //unlink(socket_name); /* ignore failure */

    FATAL_IF_NEGATIVE(
        bind(control_fd , (struct sockaddr *) &bind_address, sizeof(bind_address)),
        "Couldn't bind control socket to %s: %s",
        socket_name, strerror( errno )
    );

    FATAL_IF_NEGATIVE(
        listen(control_fd , 5),
        "Couldn't listen on control socket"
    );

    return control_fd;
}
/* Open the control socket named in `control` and remember its fd. */
void control_listen(struct control* control)
{
    NULLCHECK( control );

    int listening_fd = open_control_socket( control->socket_name );
    control->control_fd = listening_fd;
}
/* Block until the open signal self-pipe becomes readable, then clear it.
 * A failing select() is fatal.
 */
void control_wait_for_open_signal( struct control * control )
{
    fd_set fds;

    FD_ZERO( &fds );
    self_pipe_fd_set( control->open_signal, &fds );

    FATAL_IF_NEGATIVE( select( FD_SETSIZE, &fds, NULL, NULL, NULL ),
            "select() failed" );

    self_pipe_signal_clear( control->open_signal );
}
/* Main body of the control thread: wait to be told to open, start
 * listening, then accept connections until control_accept() returns 0.
 */
void control_serve( struct control * control )
{
    NULLCHECK( control );

    int keep_going;

    control_wait_for_open_signal( control );
    control_listen( control );

    do {
        keep_going = control_accept( control );
    } while ( keep_going );
}
/* Remove the control socket from the filesystem and close its descriptor.
 * Used both at normal exit and as the control thread's error handler; the
 * `fatal` argument is ignored.
 */
void control_cleanup(
        struct control * control,
        int fatal __attribute__((unused)) )
{
    NULLCHECK( control );

    const char * path = control->socket_name;
    unlink( path );

    close( control->control_fd );
}
/* pthread entry point for the control thread. Installs control_cleanup()
 * as the error handler, serves until told to close, cleans up and exits
 * the thread.
 */
void * control_runner( void * control_uncast )
{
    debug("Control thread");
    NULLCHECK( control_uncast );

    struct control * control = (struct control *)control_uncast;

    error_set_handler( (cleanup_handler*)control_cleanup, control );

    control_serve( control );
    control_cleanup( control, 0 );

    pthread_exit( NULL );
}
/* Write the string literal `msg` followed by a newline to client_fd.
 * The literal is concatenated with "\n" at compile time; strlen(msg)+1
 * covers the message plus that newline. The result of write() is ignored.
 */
#define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1)
/* Translate the mirror start-up state into the one-line status sent to
 * the control client on client_fd. Lines start with "1:" for failure and
 * "0:" for success.
 */
void control_write_mirror_response( enum mirror_state mirror_state, int client_fd )
{
switch (mirror_state) {
/* These two states should never be reported: tell the client, then
 * call fatal(). There is no break after fatal(), so control would fall
 * through to the next case if fatal() ever returned. */
case MS_INIT:
case MS_UNKNOWN:
write_socket( "1: Mirror failed to initialise" );
fatal( "Impossible mirror state: %d", mirror_state );
case MS_FAIL_CONNECT:
write_socket( "1: Mirror failed to connect");
break;
case MS_FAIL_REJECTED:
write_socket( "1: Mirror was rejected" );
break;
case MS_FAIL_NO_HELLO:
write_socket( "1: Remote server failed to respond");
break;
case MS_FAIL_SIZE_MISMATCH:
write_socket( "1: Remote size does not match local size" );
break;
case MS_ABANDONED:
write_socket( "1: Mirroring abandoned" );
break;
/* Both "running" and "already finished" are reported as started. */
case MS_GO:
case MS_DONE: /* Yes, I know we know better, but it's simpler this way */
write_socket( "0: Mirror started" );
break;
/* Any other value: nothing is written to the client. */
default:
fatal( "Unhandled mirror state: %d", mirror_state );
}
}
#undef write_socket
/* Call this in the thread where you want to receive the mirror state */
enum mirror_state control_client_mirror_wait(
struct control_client* client)
{
NULLCHECK( client );
NULLCHECK( client->mirror_state_mbox );
struct mbox * mbox = client->mirror_state_mbox;
enum mirror_state mirror_state;
enum mirror_state * contents;
contents = (enum mirror_state*)mbox_receive( mbox );
NULLCHECK( contents );
mirror_state = *contents;
free( contents );
return mirror_state;
}
#define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1)
/** Command parser to start mirror process from socket input.
 *
 * Arguments (one per line):
 *   lines[0]  destination IP address               (required)
 *   lines[1]  destination port, 0..65535           (required)
 *   lines[2]  action at finish: exit|unlink|nothing (default exit)
 *   lines[3]  local bind address                   (optional)
 *   lines[4]  maximum bytes per second             (optional)
 *
 * A status line ("0: ..." or "1: ...") is written to the client socket.
 * Returns -1 when the arguments are rejected, 0 otherwise.
 *
 * Fixes over the previous version:
 *  - connect_to / connect_from are freed on every path that does not hand
 *    them to mirror_super_create() (they leaked on each rejected command);
 *  - a mirror thread is only spawned when THIS call created the mirror.
 *    Previously an "already running" rejection still found
 *    serve->mirror_super set and started a second thread on it;
 *  - the port must start with a number (atoi() turned garbage into 0);
 *  - the action error message lists 'unlink' as well.
 */
int control_mirror(struct control_client* client, int linesc, char** lines)
{
    NULLCHECK( client );

    struct flexnbd * flexnbd = client->flexnbd;
    struct server * serve = NULL;
    union mysockaddr *connect_to = xmalloc( sizeof( union mysockaddr ) );
    union mysockaddr *connect_from = NULL;
    uint64_t max_Bps = UINT64_MAX;
    int action_at_finish = ACTION_EXIT;
    int started = 0;
    long raw_port;
    char *port_end = NULL;

    if (linesc < 2) {
        write_socket("1: mirror takes at least two parameters");
        goto fail;
    }

    if (parse_ip_to_sockaddr(&connect_to->generic, lines[0]) == 0) {
        write_socket("1: bad IP address");
        goto fail;
    }

    errno = 0;
    raw_port = strtol( lines[1], &port_end, 10 );
    if ( errno != 0 || port_end == lines[1] ||
            raw_port < 0 || raw_port > 65535 ) {
        write_socket("1: bad IP port number");
        goto fail;
    }
    connect_to->v4.sin_port = htobe16(raw_port);

    if (linesc > 2) {
        if (strcmp("exit", lines[2]) == 0) {
            action_at_finish = ACTION_EXIT;
        }
        else if (strcmp( "unlink", lines[2]) == 0 ) {
            action_at_finish = ACTION_UNLINK;
        }
        else if (strcmp("nothing", lines[2]) == 0) {
            action_at_finish = ACTION_NOTHING;
        }
        else {
            write_socket("1: action must be 'exit', 'unlink' or 'nothing'");
            goto fail;
        }
    }

    if (linesc > 3) {
        connect_from = xmalloc( sizeof( union mysockaddr ) );
        if (parse_ip_to_sockaddr(&connect_from->generic, lines[3]) == 0) {
            write_socket("1: bad bind address");
            goto fail;
        }
    }

    if (linesc > 4) {
        errno = 0;
        max_Bps = strtoull( lines[4], NULL, 10 );
        if ( errno == ERANGE ) {
            write_socket( "1: max_bps out of range" );
            goto fail;
        } else if ( errno != 0 ) {
            write_socket( "1: max_bps couldn't be parsed" );
            goto fail;
        }
    }

    if (linesc > 5) {
        write_socket("1: unrecognised parameters to mirror");
        goto fail;
    }

    serve = flexnbd_server(flexnbd);

    server_lock_start_mirror( serve );
    {
        if ( server_mirror_can_start( serve ) ) {
            serve->mirror_super = mirror_super_create(
                    serve->filename,
                    connect_to,
                    connect_from,
                    max_Bps ,
                    action_at_finish,
                    client->mirror_state_mbox );
            serve->mirror = serve->mirror_super->mirror;
            server_prevent_mirror_start( serve );
            started = 1;
        } else {
            if ( serve->mirror_super ) {
                warn( "Tried to start a second mirror run" );
                write_socket( "1: mirror already running" );
            } else {
                warn( "Cannot start mirroring, shutting down" );
                write_socket( "1: shutting down" );
            }
        }
    }
    server_unlock_start_mirror( serve );

    if ( !started ) {
        /* The addresses were never handed to a mirror, so they are still
         * ours to release. The status line has already been written. */
        free( connect_from );
        free( connect_to );
        debug( "Control thread going away." );
        return 0;
    }

    /* Do this outside the lock to minimise the length of time the
     * sighandler can block the serve thread
     */
    FATAL_IF( 0 != pthread_create(
            &serve->mirror_super->thread,
            NULL,
            mirror_super_runner,
            serve
        ),
        "Failed to create mirror thread"
    );

    debug("Control thread mirror super waiting");
    enum mirror_state state =
        control_client_mirror_wait( client );
    debug("Control thread writing response");
    control_write_mirror_response( state, client->socket );

    debug( "Control thread going away." );

    return 0;

fail:
    /* Argument rejected: the error line is already on the socket. */
    free( connect_from );
    free( connect_to );
    return -1;
}
/** Command parser to change the bandwidth cap of the current mirror.
 *
 * Takes exactly one parameter, lines[0]: the new limit in bytes per
 * second as a decimal number.  Writes "0: updated" and returns 0 on
 * success; writes a "1: ..." message and returns -1 otherwise.
 */
int control_mirror_max_bps( struct control_client* client, int linesc, char** lines )
{
	NULLCHECK( client );
	NULLCHECK( client->flexnbd );

	struct server* serve = flexnbd_server( client->flexnbd );
	uint64_t max_Bps;
	char *end = NULL;

	if ( !serve->mirror_super ) {
		write_socket( "1: Not currently mirroring" );
		return -1;
	}

	if ( linesc != 1 ) {
		write_socket( "1: Bad format" );
		return -1;
	}

	errno = 0;
	max_Bps = strtoull( lines[0], &end, 10 );
	if ( errno == ERANGE ) {
		write_socket( "1: max_bps out of range" );
		return -1;
	}
	/* Non-numeric input leaves errno untouched on most libcs, so the end
	 * pointer is what actually detects "couldn't be parsed".
	 */
	else if ( errno != 0 || end == lines[0] || *end != '\0' ) {
		write_socket( "1: max_bps couldn't be parsed" );
		return -1;
	}

	serve->mirror->max_bytes_per_second = max_Bps;
	write_socket( "0: updated" );

	return 0;
}
#undef write_socket
/** Command parser to alter access control list from socket input.
 *
 * Builds a new ACL from `lines` using the server's current default_deny
 * setting.  If every line was accepted the server's ACL is replaced and
 * "0: updated" is written; otherwise the first line that was not accepted
 * is reported back with "1: bad spec: " and the new ACL is discarded.
 * Returns 0 in both cases.
 */
int control_acl(struct control_client* client, int linesc, char** lines)
{
	NULLCHECK( client );
	NULLCHECK( client->flexnbd );

	struct flexnbd * flexnbd = client->flexnbd;
	struct acl * candidate =
		acl_create( linesc, lines, flexnbd_default_deny( flexnbd ) );

	if ( candidate->len == linesc ) {
		flexnbd_replace_acl( flexnbd, candidate );
		info("ACL set");
		write( client->socket, "0: updated\n", 11);
		return 0;
	}

	/* Fewer entries than lines: lines[len] is treated as the offender. */
	char * bad_spec = lines[candidate->len];

	warn("Bad ACL spec: %s", bad_spec );
	write(client->socket, "1: bad spec: ", 13);
	write(client->socket, bad_spec, strlen(bad_spec));
	write(client->socket, "\n", 1);
	acl_destroy( candidate );

	return 0;
}
/** Command handler for "break": abandon the running mirror, if any.
 *
 * Runs under the start-mirror lock.  Writes one status line to the client
 * socket and returns 1 if a mirror was stopped, 0 otherwise (not
 * mirroring, or the server was found closed after the abandon request).
 */
int control_break(
		struct control_client* client,
		int linesc __attribute__ ((unused)),
		char** lines __attribute__((unused))
		)
{
	NULLCHECK( client );
	NULLCHECK( client->flexnbd );

	int stopped = 0;
	struct server * serve = flexnbd_server( client->flexnbd );

	server_lock_start_mirror( serve );
	if ( !server_is_mirroring( serve ) ) {
		warn( "Not mirroring." );
		write( client->socket, "1: not mirroring\n", 17 );
	}
	else {
		info( "Signaling to abandon mirror" );
		server_abandon_mirror( serve );
		debug( "Abandon signaled" );

		if ( !server_is_closed( serve ) ) {
			info( "Mirror successfully stopped." );
			write( client->socket,
					"0: mirror stopped\n", 18 );
			stopped = 1;
		}
		else {
			info( "Mirror completed while canceling" );
			write( client->socket,
					"1: mirror completed\n", 20 );
		}
	}
	server_unlock_start_mirror( serve );

	return stopped;
}
/** Command handler for "status": write "0: " followed by the server
 * status (as rendered by status_write) to the client socket.  Always
 * returns 0.
 *
 * FIXME: add some useful statistics
 */
int control_status(
		struct control_client* client,
		int linesc __attribute__ ((unused)),
		char** lines __attribute__((unused))
		)
{
	NULLCHECK( client );
	NULLCHECK( client->flexnbd );

	struct status * snapshot = flexnbd_status_create( client->flexnbd );
	int out_fd = client->socket;

	write( out_fd, "0: ", 3 );
	status_write( snapshot, out_fd );
	status_destroy( snapshot );

	return 0;
}
/** Tear down a control client: close its socket if it is non-zero, release
 * the server ACL lock if server_acl_locked() reports it held, then destroy
 * the client object.  control_respond() installs this as its error
 * handler as well as calling it directly; `fatal` is unused.
 */
void control_client_cleanup(struct control_client* client,
		int fatal __attribute__ ((unused)) )
{
	if ( client->socket ) {
		close( client->socket );
	}

	/* This is wrongness */
	struct server * serve = client->flexnbd->serve;
	if ( server_acl_locked( serve ) ) {
		server_unlock_acl( serve );
	}

	control_client_destroy( client );
}
/** Master command parser for control socket connections, delegates quickly.
 *
 * Reads lines from the client socket up to a blank line, dispatches on
 * the first line ("acl", "mirror", "break", "status", "mirror_max_bps")
 * and passes the remaining lines to the handler.  A missing command gets
 * "9: missing command", an unrecognised one "10: unknown command".
 * Afterwards the lines are freed and the client is cleaned up (socket
 * closed, client destroyed), so `client` must not be used by the caller
 * once this returns.
 */
void control_respond(struct control_client * client)
{
	/* Kept as arrays so the write lengths are derived from the text
	 * rather than hand-counted.
	 */
	static const char missing_command[] = "9: missing command\n";
	static const char unknown_command[] = "10: unknown command\n";

	char **lines = NULL;

	error_set_handler((cleanup_handler*) control_client_cleanup, client);

	int i, linesc;
	linesc = read_lines_until_blankline(client->socket, 256, &lines);

	if (linesc < 1)
	{
		write(client->socket, missing_command, sizeof( missing_command ) - 1);
		/* ignore failure */
	}
	else if (strcmp(lines[0], "acl") == 0) {
		info("acl command received" );
		if (control_acl(client, linesc-1, lines+1) < 0) {
			debug("acl command failed");
		}
	}
	else if (strcmp(lines[0], "mirror") == 0) {
		info("mirror command received" );
		if (control_mirror(client, linesc-1, lines+1) < 0) {
			debug("mirror command failed");
		}
	}
	else if (strcmp(lines[0], "break") == 0) {
		info( "break command received" );
		if ( control_break( client, linesc-1, lines+1) < 0) {
			debug( "break command failed" );
		}
	}
	else if (strcmp(lines[0], "status") == 0) {
		info("status command received" );
		if (control_status(client, linesc-1, lines+1) < 0) {
			debug("status command failed");
		}
	}
	else if ( strcmp( lines[0], "mirror_max_bps" ) == 0 ) {
		info( "mirror_max_bps command received" );
		if( control_mirror_max_bps( client, linesc-1, lines+1 ) < 0 ) {
			debug( "mirror_max_bps command failed" );
		}
	}
	else {
		/* Previously written with a hard-coded length of 23, three bytes
		 * more than the 20-character message.
		 */
		write(client->socket, unknown_command, sizeof( unknown_command ) - 1);
	}

	for (i=0; i<linesc; i++) {
		free(lines[i]);
	}
	free(lines);

	control_client_cleanup(client, 0);
	debug("control command handled" );
}

59
src/server/control.h Normal file
View File

@@ -0,0 +1,59 @@
#ifndef CONTROL_H
#define CONTROL_H
/* We need this to avoid a complaint about struct server * in
* void accept_control_connection
*/
struct server;
#include "parse.h"
#include "mirror.h"
#include "serve.h"
#include "flexnbd.h"
#include "mbox.h"
/* State of the control-socket side of a flexnbd process.  Created by
 * control_create() and run on its own thread by control_runner().
 */
struct control {
/* The flexnbd object handed to control_create(). */
struct flexnbd * flexnbd;
/* NOTE(review): presumably the listening socket for socket_name - confirm
 * in control.c. */
int control_fd;
const char * socket_name;
/* Thread running control_runner(); joined when the control is stopped. */
pthread_t thread;
/* open_signal is the pipe flexnbd_serve() hands to do_serve();
 * close_signal is presumably what control_signal_close() signals -
 * confirm in control.c. */
struct self_pipe * open_signal;
struct self_pipe * close_signal;
/* This is owned by the control object, and used by a
* mirror_super to communicate the state of a mirror attempt as
* early as feasible. It can't be owned by the mirror_super
* object because the mirror_super object can be freed at any
* time (including while the control_client is waiting on it),
* whereas the control object lasts for the lifetime of the
* process (and we can only have a mirror thread if the control
* thread has started it).
*/
struct mbox * mirror_state_mbox;
};
/* One accepted connection on the control socket.  Handled by
 * control_respond(), which closes the socket and destroys the client when
 * the command has been processed.
 */
struct control_client{
/* Connected socket: commands are read from it and replies written to it. */
int socket;
struct flexnbd * flexnbd;
/* Passed in on creation. We know it's all right to do this
* because we know there's only ever one control_client.
*/
struct mbox * mirror_state_mbox;
};
/* Create a control object for the given flexnbd, to listen on the named
 * control socket. */
struct control * control_create(
struct flexnbd *,
const char * control_socket_name );
/* Ask the control thread to stop; flexnbd_stop_control() calls this and
 * then joins the thread. */
void control_signal_close( struct control * );
void control_destroy( struct control * );
/* Thread entry point; the argument is the struct control *. */
void * control_runner( void * );
/* NOTE(review): the two declarations below take a struct server *; their
 * definitions are not visible alongside this header - confirm they are
 * still implemented. */
void accept_control_connection(struct server* params, int client_fd, union mysockaddr* client_address);
void serve_open_control_socket(struct server* params);
#endif

265
src/server/flexnbd.c Normal file
View File

@@ -0,0 +1,265 @@
/* FlexNBD server (C) Bytemark Hosting 2012
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/** main() function for parsing and dispatching commands. Each mode has
* a corresponding structure which is filled in and passed to a do_ function
* elsewhere in the program.
*/
#include "flexnbd.h"
#include "serve.h"
#include "util.h"
#include "control.h"
#include "status.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/signalfd.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <getopt.h>
#include "acl.h"
/* Block SIGTERM, SIGQUIT and SIGINT in the calling thread and return a
 * signalfd(2) descriptor for that same set of signals.  Fatal if either
 * step fails.
 */
int flexnbd_build_signal_fd(void)
{
	sigset_t handled;

	sigemptyset( &handled );
	sigaddset( &handled, SIGTERM );
	sigaddset( &handled, SIGQUIT );
	sigaddset( &handled, SIGINT );

	FATAL_UNLESS( 0 == pthread_sigmask( SIG_BLOCK, &handled, NULL ),
			"Signal blocking failed" );

	int signal_fd = signalfd( -1, &handled, 0 );
	FATAL_IF( -1 == signal_fd, "Failed to get a signal fd" );

	return signal_fd;
}
/* Initialisation common to the serving and listening modes: create the
 * control object when a control socket name was given (NULL otherwise)
 * and set up the signal fd.
 */
void flexnbd_create_shared(
		struct flexnbd * flexnbd,
		const char * s_ctrl_sock)
{
	NULLCHECK( flexnbd );

	flexnbd->control = s_ctrl_sock
		? control_create( flexnbd, s_ctrl_sock )
		: NULL;

	flexnbd->signal_fd = flexnbd_build_signal_fd();
}
/* Build a flexnbd object for "serve" mode: create the server, the shared
 * control/signal state and, when use_killswitch is set, install the
 * process-wide client killswitch signal handler.
 */
struct flexnbd * flexnbd_create_serving(
		char* s_ip_address,
		char* s_port,
		char* s_file,
		char* s_ctrl_sock,
		int default_deny,
		int acl_entries,
		char** s_acl_entries,
		int max_nbd_clients,
		int use_killswitch)
{
	struct flexnbd * flexnbd = xmalloc( sizeof( struct flexnbd ) );

	flexnbd->serve = server_create(
			flexnbd,
			s_ip_address,
			s_port,
			s_file,
			default_deny,
			acl_entries,
			s_acl_entries,
			max_nbd_clients,
			use_killswitch,
			1);
	flexnbd_create_shared( flexnbd, s_ctrl_sock );

	if ( !use_killswitch ) {
		return flexnbd;
	}

	// Beats installing one handler per client instance
	struct sigaction act = {
		.sa_sigaction = client_killswitch_hit,
		.sa_flags = SA_RESTART | SA_SIGINFO
	};

	FATAL_UNLESS(
			0 == sigaction( CLIENT_KILLSWITCH_SIGNAL, &act, NULL ),
			"Installing client killswitch signal failed"
			);

	return flexnbd;
}
/* Build a flexnbd object for "listen" mode.  Same as the serving variant
 * except that the last three server_create() arguments are fixed at
 * 1, 0, 0 and no killswitch handler is installed.
 */
struct flexnbd * flexnbd_create_listening(
		char* s_ip_address,
		char* s_port,
		char* s_file,
		char* s_ctrl_sock,
		int default_deny,
		int acl_entries,
		char** s_acl_entries )
{
	struct flexnbd * listener = xmalloc( sizeof( struct flexnbd ) );

	listener->serve = server_create(
			listener,
			s_ip_address,
			s_port,
			s_file,
			default_deny,
			acl_entries,
			s_acl_entries,
			1, 0, 0);
	flexnbd_create_shared( listener, s_ctrl_sock );

	// listen can't use killswitch, as mirror may pause on sending things
	// for a very long time.

	return listener;
}
void flexnbd_spawn_control(struct flexnbd * flexnbd )
{
NULLCHECK( flexnbd );
NULLCHECK( flexnbd->control );
pthread_t * control_thread = &flexnbd->control->thread;
FATAL_UNLESS( 0 == pthread_create(
control_thread,
NULL,
control_runner,
flexnbd->control ),
"Couldn't create the control thread" );
}
/* Signal the control thread to close and wait for it to exit.  Fatal if
 * the join fails or if there is no control object.
 */
void flexnbd_stop_control( struct flexnbd * flexnbd )
{
	NULLCHECK( flexnbd );
	NULLCHECK( flexnbd->control );

	control_signal_close( flexnbd->control );
	pthread_t tid = flexnbd->control->thread;
	FATAL_UNLESS( 0 == pthread_join( tid, NULL ),
			"Failed joining the control thread" );

	/* %p requires a void *; pthread_t is not one, so cast explicitly. */
	debug( "Control thread %p pthread_join returned", (void *) tid );
}
/* Accessor: the signalfd descriptor created by flexnbd_create_shared(). */
int flexnbd_signal_fd( struct flexnbd * flexnbd )
{
	NULLCHECK( flexnbd );

	int fd = flexnbd->signal_fd;
	return fd;
}
/* Release a flexnbd object: destroy its control object if it has one,
 * close the signal fd and free the struct.  The server pointer is not
 * touched here.
 */
void flexnbd_destroy( struct flexnbd * flexnbd )
{
	NULLCHECK( flexnbd );

	struct control * control = flexnbd->control;
	if ( NULL != control ) {
		control_destroy( control );
	}

	close( flexnbd->signal_fd );
	free( flexnbd );
}
/* Accessor: the server currently attached to this flexnbd. */
struct server * flexnbd_server( struct flexnbd * flexnbd )
{
	NULLCHECK( flexnbd );

	struct server * current = flexnbd->serve;
	return current;
}
/* Install `acl` on the current server via server_replace_acl(). */
void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl )
{
	NULLCHECK( flexnbd );

	struct server * target = flexnbd_server( flexnbd );
	server_replace_acl( target, acl );
}
/* Build a status object describing the current server.  control_status()
 * releases the result with status_destroy().
 */
struct status * flexnbd_status_create( struct flexnbd * flexnbd )
{
	NULLCHECK( flexnbd );

	return status_create( flexnbd_server( flexnbd ) );
}
/* Point this flexnbd at a different server object.  The previous server
 * pointer is simply overwritten.
 */
void flexnbd_set_server( struct flexnbd * flexnbd, struct server * serve )
{
	NULLCHECK( flexnbd );

	struct server ** slot = &flexnbd->serve;
	*slot = serve;
}
/* Get the default_deny of the current server object, as reported by
 * server_default_deny().
 */
int flexnbd_default_deny( struct flexnbd * flexnbd )
{
	NULLCHECK( flexnbd );

	int deny = server_default_deny( flexnbd->serve );
	return deny;
}
/* chmod() the named file to S_IWUSR; fatal (with the strerror text) if the
 * call returns a negative value.
 *
 * NOTE(review): chmod() replaces the whole mode, so this leaves the file
 * with owner-write permission only (0200), dropping any read bits -
 * confirm that is intended rather than adding S_IWUSR to the existing
 * mode.
 */
void make_writable( const char * filename )
{
NULLCHECK( filename );
FATAL_IF_NEGATIVE( chmod( filename, S_IWUSR ),
"Couldn't chmod %s: %s",
filename,
strerror( errno ) );
}
/* Run the server until do_serve() returns, and return its result.  When a
 * control object exists, its thread is started first (and its open_signal
 * pipe handed to do_serve) and stopped again afterwards.
 */
int flexnbd_serve( struct flexnbd * flexnbd )
{
	NULLCHECK( flexnbd );

	struct self_pipe * open_signal = NULL;

	if ( flexnbd->control ) {
		debug( "Spawning control thread" );
		flexnbd_spawn_control( flexnbd );
		open_signal = flexnbd->control->open_signal;
	}

	int outcome = do_serve( flexnbd->serve, open_signal );
	debug("do_serve success is %d", outcome );

	if ( flexnbd->control ) {
		debug( "Stopping control thread" );
		flexnbd_stop_control( flexnbd );
		debug("Control thread stopped");
	}

	return outcome;
}

66
src/server/flexnbd.h Normal file
View File

@@ -0,0 +1,66 @@
#ifndef FLEXNBD_H
#define FLEXNBD_H
#include "acl.h"
#include "mirror.h"
#include "serve.h"
#include "proxy.h"
#include "client.h"
#include "self_pipe.h"
#include "mbox.h"
#include "control.h"
#include "flexthread.h"
/* Carries the "globals". */
struct flexnbd {
/* Our serve pointer should never be dereferenced outside a
* flexnbd_switch_lock/unlock pair.
*/
struct server * serve;
/* We only have a control object if a control socket name was
* passed on the command line.
*/
struct control * control;
/* File descriptor for a signalfd(2) signal stream. */
int signal_fd;
};
struct flexnbd * flexnbd_create(void);
struct flexnbd * flexnbd_create_serving(
char* s_ip_address,
char* s_port,
char* s_file,
char* s_ctrl_sock,
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients,
int use_killswitch);
struct flexnbd * flexnbd_create_listening(
char* s_ip_address,
char* s_port,
char* s_file,
char* s_ctrl_sock,
int default_deny,
int acl_entries,
char** s_acl_entries );
void flexnbd_destroy( struct flexnbd * );
enum mirror_state;
enum mirror_state flexnbd_get_mirror_state( struct flexnbd * );
int flexnbd_default_deny( struct flexnbd * );
void flexnbd_set_server( struct flexnbd * flexnbd, struct server * serve );
int flexnbd_signal_fd( struct flexnbd * flexnbd );
int flexnbd_serve( struct flexnbd * flexnbd );
int flexnbd_proxy( struct flexnbd * flexnbd );
struct server * flexnbd_server( struct flexnbd * flexnbd );
void flexnbd_replace_acl( struct flexnbd * flexnbd, struct acl * acl );
struct status * flexnbd_status_create( struct flexnbd * flexnbd );
#endif

75
src/server/flexthread.c Normal file
View File

@@ -0,0 +1,75 @@
#include "flexthread.h"
#include "util.h"
#include <pthread.h>
/* Allocate and initialise an unlocked flexthread_mutex.  Fatal if the
 * underlying pthread mutex cannot be initialised.  Release with
 * flexthread_mutex_destroy().
 */
struct flexthread_mutex * flexthread_mutex_create(void)
{
	struct flexthread_mutex * ftm =
		xmalloc( sizeof( struct flexthread_mutex ) );

	/* Mark "no holder" explicitly, using the same sentinel that
	 * flexthread_mutex_unlock() writes, so that _held() and _destroy()
	 * do not depend on whether xmalloc() zeroes its memory.
	 */
	ftm->holder = (pthread_t)NULL;

	FATAL_UNLESS( 0 == pthread_mutex_init( &ftm->mutex, NULL ),
			"Mutex initialisation failed" );

	return ftm;
}
/* Destroy and free a flexthread_mutex.  If the calling thread holds it,
 * it is unlocked first; if the holder field names some other thread the
 * process is aborted with fatal().
 */
void flexthread_mutex_destroy( struct flexthread_mutex * ftm )
{
	NULLCHECK( ftm );

	if ( flexthread_mutex_held( ftm ) ) {
		flexthread_mutex_unlock( ftm );
	}
	else {
		int held_elsewhere = ( (pthread_t)NULL != ftm->holder );

		if ( held_elsewhere ) {
			/* This "should never happen": if we can try to destroy
			 * a mutex currently held by another thread, there's a
			 * logic bug somewhere.  I know the test here is racy,
			 * but there's not a lot we can do about it at this
			 * point.
			 */
			fatal( "Attempted to destroy a flexthread_mutex"
					" held by another thread!" );
		}
	}

	int rc = pthread_mutex_destroy( &ftm->mutex );
	FATAL_UNLESS( 0 == rc, "Mutex destroy failed" );

	free( ftm );
}
/* Lock the mutex and, on success, record the calling thread as holder.
 * Returns the pthread_mutex_lock() result (0 on success).
 */
int flexthread_mutex_lock( struct flexthread_mutex * ftm )
{
	NULLCHECK( ftm );

	int rc = pthread_mutex_lock( &ftm->mutex );
	if ( 0 != rc ) {
		return rc;
	}

	ftm->holder = pthread_self();
	return rc;
}
/* Unlock the mutex.  Returns the pthread_mutex_unlock() result (0 on
 * success); on failure the previous holder value is restored.
 */
int flexthread_mutex_unlock( struct flexthread_mutex * ftm )
{
NULLCHECK( ftm );
pthread_t orig = ftm->holder;
/* Clear the holder *before* releasing the mutex: once it is released
 * another thread may lock it and write its own id into holder, and we
 * must not overwrite that. */
ftm->holder = (pthread_t)NULL;
int failure = pthread_mutex_unlock( &ftm->mutex );
if ( 0 != failure ) {
/* Still locked, so put the holder back. */
ftm->holder = orig;
}
return failure;
}
/* Non-zero if the recorded holder of the mutex is the calling thread. */
int flexthread_mutex_held( struct flexthread_mutex * ftm )
{
	NULLCHECK( ftm );

	pthread_t me = pthread_self();
	return me == ftm->holder;
}

29
src/server/flexthread.h Normal file
View File

@@ -0,0 +1,29 @@
#ifndef FLEXTHREAD_H
#define FLEXTHREAD_H
#include <pthread.h>
/* Define a mutex wrapper object. This wrapper allows us to easily
* track whether or not we currently hold the wrapped mutex. If we hold
* the mutex when we destroy it, then we first release it.
*
* These are specifically for the case where an ERROR_* handler gets
* called when we might (or might not) have a mutex held. The
* flexthread_mutex_held() function will tell you if your thread
* currently holds the given mutex. It's not safe to make any other
* comparisons.
*/
struct flexthread_mutex {
pthread_mutex_t mutex;
/* Thread that last locked `mutex`; reset to (pthread_t)NULL on unlock. */
pthread_t holder;
};
/* Allocate and initialise a mutex wrapper; fatal on failure. */
struct flexthread_mutex * flexthread_mutex_create(void);
/* Destroy and free; unlocks first if the caller holds it, fatal if the
 * holder field names another thread. */
void flexthread_mutex_destroy( struct flexthread_mutex * );
/* Both return the underlying pthread result: 0 on success. */
int flexthread_mutex_lock( struct flexthread_mutex * );
int flexthread_mutex_unlock( struct flexthread_mutex * );
/* Non-zero if the calling thread is the recorded holder. */
int flexthread_mutex_held( struct flexthread_mutex * );
#endif

77
src/server/mbox.c Normal file
View File

@@ -0,0 +1,77 @@
#include "mbox.h"
#include "util.h"
#include <pthread.h>
/* Allocate an empty mbox and initialise its condition variables and
 * mutex.  Fatal if any pthread initialisation fails.  Release with
 * mbox_destroy().
 */
struct mbox * mbox_create( void )
{
	struct mbox * mbox = xmalloc( sizeof( struct mbox ) );

	/* Start out explicitly empty so that mbox_post()/mbox_receive() do
	 * not depend on whether xmalloc() zeroes its memory.
	 */
	mbox->contents = NULL;
	mbox->full = 0;

	FATAL_UNLESS( 0 == pthread_cond_init( &mbox->filled_cond, NULL ),
			"Failed to initialise a condition variable" );
	FATAL_UNLESS( 0 == pthread_cond_init( &mbox->emptied_cond, NULL ),
			"Failed to initialise a condition variable" );
	FATAL_UNLESS( 0 == pthread_mutex_init( &mbox->mutex, NULL ),
			"Failed to initialise a mutex" );

	return mbox;
}
/* Put `contents` (which may be NULL) into the mbox, blocking while it is
 * already full, then wake a thread waiting in mbox_receive().  The mbox
 * does not take ownership of the pointer.
 */
void mbox_post( struct mbox * mbox, void * contents )
{
	NULLCHECK( mbox );

	pthread_mutex_lock( &mbox->mutex );
	{
		/* Loop, not `if`: pthread_cond_wait() may return spuriously,
		 * and another poster may have refilled the box before we get
		 * the mutex back.
		 */
		while ( mbox->full ) {
			pthread_cond_wait( &mbox->emptied_cond, &mbox->mutex );
		}
		mbox->contents = contents;
		mbox->full = 1;
		while( 0 != pthread_cond_signal( &mbox->filled_cond ) );
	}
	pthread_mutex_unlock( &mbox->mutex );
}
/* Peek at the stored pointer without taking the mutex or emptying the
 * box.
 */
void * mbox_contents( struct mbox * mbox )
{
	void * peeked = mbox->contents;
	return peeked;
}
/* Read the `full` flag without taking the mutex. */
int mbox_is_full( struct mbox * mbox )
{
	int filled = mbox->full;
	return filled;
}
/* Take the contents out of the mbox, blocking while it is empty, then
 * wake a thread waiting in mbox_post().  Returns the stored pointer
 * (which may be NULL if NULL was posted); the caller is responsible for
 * freeing it if it was allocated.
 */
void * mbox_receive( struct mbox * mbox )
{
	NULLCHECK( mbox );

	void * result;

	pthread_mutex_lock( &mbox->mutex );
	{
		/* Loop, not `if`: pthread_cond_wait() may return spuriously,
		 * in which case the box is still empty and must not be read.
		 */
		while ( !mbox->full ) {
			pthread_cond_wait( &mbox->filled_cond, &mbox->mutex );
		}
		mbox->full = 0;
		result = mbox->contents;
		mbox->contents = NULL;

		while( 0 != pthread_cond_signal( &mbox->emptied_cond));
	}
	pthread_mutex_unlock( &mbox->mutex );

	return result;
}
/* Destroy the mbox's condition variables and mutex (retrying each call
 * until it reports success) and free the mbox.  Any pointer still stored
 * in it is not freed.
 */
void mbox_destroy( struct mbox * mbox )
{
	NULLCHECK( mbox );

	while ( 0 != pthread_cond_destroy( &mbox->emptied_cond ) ) {
		/* retry */
	}
	while ( 0 != pthread_cond_destroy( &mbox->filled_cond ) ) {
		/* retry */
	}
	while ( 0 != pthread_mutex_destroy( &mbox->mutex ) ) {
		/* retry */
	}

	free( mbox );
}

55
src/server/mbox.h Normal file
View File

@@ -0,0 +1,55 @@
#ifndef MBOX_H
#define MBOX_H
/** mbox
* A thread sync object. Put a void * into the mbox in one thread, and
* get it out in another. The receiving thread will block if there's
* nothing in the mbox, and the sending thread will block if there is.
* The mbox doesn't assume any responsibility for the pointer it's
* passed - you must free it yourself if it's malloced.
*/
#include <pthread.h>
/* A one-slot mailbox.  `contents` and `full` are read and written under
 * `mutex` by mbox_post() and mbox_receive().
 */
struct mbox {
/* The posted pointer; reset to NULL by mbox_receive(). */
void * contents;
/** Marker to tell us if there's content in the box.
* Keeping this separate allows us to use NULL for the contents.
*/
int full;
/** This gets signaled by mbox_post, and waited on by
* mbox_receive */
pthread_cond_t filled_cond;
/** This is signaled by mbox_receive, and waited on by mbox_post */
pthread_cond_t emptied_cond;
pthread_mutex_t mutex;
};
/* Create an mbox.  Fatal if the pthread objects can't be initialised. */
struct mbox * mbox_create(void);
/* Put something in the mbox, blocking if it's already full.
* That something can be NULL if you want.  The mbox never frees it.
*/
void mbox_post( struct mbox *, void *);
/* See what's in the mbox. This isn't thread-safe: it reads the pointer
 * without taking the mutex. */
void * mbox_contents( struct mbox *);
/* See if anything has been put into the mbox. This isn't thread-safe.
* */
int mbox_is_full( struct mbox *);
/* Get the contents from the mbox, blocking if there's nothing there.
 * The box is empty again afterwards. */
void * mbox_receive( struct mbox *);
/* Free the mbox and destroy the associated pthread bits. */
void mbox_destroy( struct mbox *);
#endif

1071
src/server/mirror.c Normal file

File diff suppressed because it is too large Load Diff

141
src/server/mirror.h Normal file
View File

@@ -0,0 +1,141 @@
#ifndef MIRROR_H
#define MIRROR_H
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "bitset.h"
#include "self_pipe.h"
enum mirror_state;
#include "serve.h"
#include "mbox.h"
/* MS_CONNECT_TIME_SECS
* The length of time after which the sender will assume a connect() to
* the destination has failed.
*/
#define MS_CONNECT_TIME_SECS 60
/* MS_CONVERGE_TIME_SECS
* The length of time a migration must be estimated to have remaining for us to
* disconnect clients for convergence
*
* TODO: Make this configurable so refusing-to-converge clients can be manually
* fixed.
* TODO: Make this adaptive - 5 seconds is fine, as long as we can guarantee
* that all migrations will be able to converge in time. We'd add a new
* state between open and closed, where gradually-increasing latency is
* added to client requests to allow the mirror to be faster.
*/
#define MS_CONVERGE_TIME_SECS 5
/* MS_HELLO_TIME_SECS
* The length of time the sender will wait for the NBD hello message
* after connect() before aborting the connection attempt.
*/
#define MS_HELLO_TIME_SECS 5
/* MS_RETRY_DELAY_SECS
* The delay after a failed migration attempt before launching another
* thread to try again.
*/
#define MS_RETRY_DELAY_SECS 1
/* MS_REQUEST_LIMIT_SECS
* We must receive a reply to a request within this time. For a read
* request, this is the time between the end of the NBD request and the
* start of the NBD reply. For a write request, this is the time
* between the end of the written data and the start of the NBD reply.
* Can be overridden by the environment variable:
* FLEXNBD_MS_REQUEST_LIMIT_SECS
*
* MS_REQUEST_LIMIT_SECS_F is the same value as a floating-point constant.
*/
#define MS_REQUEST_LIMIT_SECS 60
#define MS_REQUEST_LIMIT_SECS_F 60.0
/* What to do when a mirror run finishes.  The control socket's "mirror"
 * command selects these with the words "exit", "unlink" and "nothing"
 * respectively; ACTION_EXIT is its default.
 */
enum mirror_finish_action {
ACTION_EXIT,
ACTION_UNLINK,
ACTION_NOTHING
};
/* State of a mirror attempt, as reported back to the control client
 * through the mirror state mbox.
 * NOTE(review): the meaning of each value is inferred from its name only
 * (e.g. MS_FAIL_NO_HELLO presumably relates to MS_HELLO_TIME_SECS
 * expiring) - confirm against mirror.c before relying on it.
 */
enum mirror_state {
MS_UNKNOWN,
MS_INIT,
MS_GO,
MS_ABANDONED,
MS_DONE,
MS_FAIL_CONNECT,
MS_FAIL_REJECTED,
MS_FAIL_NO_HELLO,
MS_FAIL_SIZE_MISMATCH
};
/* State of one mirror (migration) run. */
struct mirror {
pthread_t thread;
/* Signal to this then join the thread if you want to abandon mirroring */
struct self_pipe * abandon_signal;
/* Destination address and optional local bind address, as passed to
 * mirror_super_create(). */
union mysockaddr * connect_to;
union mysockaddr * connect_from;
int client;
const char * filename;
/* Limiter, used to restrict migration speed. Only dirty bytes (those going
* over the network) are considered.  Can be changed while running via the
* control socket's mirror_max_bps command. */
uint64_t max_bytes_per_second;
enum mirror_finish_action action_at_finish;
char *mapped;
/* We need to send every byte at least once.
 * NOTE(review): the original comment was cut off here ("we do so by");
 * presumably this is the position reached in that pass - confirm in
 * mirror.c. */
uint64_t offset;
enum mirror_state commit_state;
/* commit_signal is sent immediately after attempting to connect
* and checking the remote size, whether successful or not.
*/
struct mbox * commit_signal;
/* The time (from monotonic_time_ms()) the migration was started. Can be
* used to calculate bps, etc. */
uint64_t migration_started;
/* Running count of all bytes we've transferred */
uint64_t all_dirty;
};
/* Supervisor wrapped around a struct mirror.  Created by
 * mirror_super_create(); its thread runs mirror_super_runner().
 */
struct mirror_super {
struct mirror * mirror;
/* Filled in by pthread_create() when the control code starts the run. */
pthread_t thread;
/* The mbox passed to mirror_super_create(); the control client waits on
 * it for the resulting mirror_state. */
struct mbox * state_mbox;
};
/* We need these declarations to get around circular dependencies in the
* .h's
*/
struct server;
struct flexnbd;
/* Create a mirror_super (and its mirror) for the given file and
 * destination.  connect_from may be NULL when no bind address was given.
 * NOTE(review): presumably takes ownership of connect_to/connect_from -
 * confirm in mirror.c.
 */
struct mirror_super * mirror_super_create(
const char * filename,
union mysockaddr * connect_to,
union mysockaddr * connect_from,
uint64_t max_Bps,
enum mirror_finish_action action_at_finish,
struct mbox * state_mbox
);
/* Thread entry point; the argument is the struct server * being mirrored. */
void * mirror_super_runner( void * serve_uncast );
#endif

884
src/server/mode.c Normal file
View File

@@ -0,0 +1,884 @@
#include "mode.h"
#include "flexnbd.h"
#include <getopt.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
/* Long options accepted by the serve command; the all-zero entry
 * terminates the list for getopt_long(). */
static struct option serve_options[] = {
GETOPT_HELP,
GETOPT_ADDR,
GETOPT_PORT,
GETOPT_FILE,
GETOPT_SOCK,
GETOPT_DENY,
GETOPT_QUIET,
GETOPT_KILLSWITCH,
GETOPT_VERBOSE,
{0}
};
/* Matching short options; handled one at a time by read_serve_param(). */
static char serve_short_options[] = "hl:p:f:s:dk" SOPT_QUIET SOPT_VERBOSE;
/* Usage text: printed for -h and passed to exit_err() on bad options. */
static char serve_help_text[] =
"Usage: flexnbd " CMD_SERVE " <options> [<acl address>*]\n\n"
"Serve FILE from ADDR:PORT, with an optional control socket at SOCK.\n\n"
HELP_LINE
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to serve on.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to serve on.\n"
"\t--" OPT_FILE ",-f <FILE>\tThe file to serve.\n"
"\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n"
"\t--" OPT_KILLSWITCH",-k \tKill the server if a request takes 120 seconds.\n"
SOCK_LINE
VERBOSE_LINE
QUIET_LINE;
/* Long options accepted by the listen command; the all-zero entry
 * terminates the list for getopt_long(). */
static struct option listen_options[] = {
	GETOPT_HELP,
	GETOPT_ADDR,
	GETOPT_PORT,
	GETOPT_FILE,
	GETOPT_SOCK,
	GETOPT_DENY,
	GETOPT_QUIET,
	GETOPT_VERBOSE,
	{0}
};
/* Matching short options; handled one at a time by read_listen_param(). */
static char listen_short_options[] = "hl:p:f:s:d" SOPT_QUIET SOPT_VERBOSE;
/* Usage text: printed for -h and passed to exit_err() on bad options.
 * The description line ends in "\n\n" like every other command's help
 * text, so the option list starts on its own line.
 */
static char listen_help_text[] =
	"Usage: flexnbd " CMD_LISTEN " <options> [<acl_address>*]\n\n"
	"Listen for an incoming migration on ADDR:PORT.\n\n"
	HELP_LINE
	"\t--" OPT_ADDR ",-l <ADDR>\tThe address to listen on.\n"
	"\t--" OPT_PORT ",-p <PORT>\tThe port to listen on.\n"
	"\t--" OPT_FILE ",-f <FILE>\tThe file to serve.\n"
	"\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n"
	SOCK_LINE
	VERBOSE_LINE
	QUIET_LINE;
/* Long options accepted by the read command (also reused for write). */
static struct option read_options[] = {
GETOPT_HELP,
GETOPT_ADDR,
GETOPT_PORT,
GETOPT_FROM,
GETOPT_SIZE,
GETOPT_BIND,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
};
/* Matching short options; handled by read_readwrite_param(). */
static char read_short_options[] = "hl:p:F:S:b:" SOPT_QUIET SOPT_VERBOSE;
/* Usage text for the read command. */
static char read_help_text[] =
"Usage: flexnbd " CMD_READ " <options>\n\n"
"Read SIZE bytes from a server at ADDR:PORT to stdout, starting at OFFSET.\n\n"
HELP_LINE
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to read from.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to read from.\n"
"\t--" OPT_FROM ",-F <OFFSET>\tByte offset to read from.\n"
"\t--" OPT_SIZE ",-S <SIZE>\tBytes to read.\n"
BIND_LINE
VERBOSE_LINE
QUIET_LINE;
/* The write command takes exactly the same options as read, so it simply
 * points at the read tables; only the help text differs. */
static struct option *write_options = read_options;
static char *write_short_options = read_short_options;
/* Usage text for the write command. */
static char write_help_text[] =
"Usage: flexnbd " CMD_WRITE" <options>\n\n"
"Write SIZE bytes from stdin to a server at ADDR:PORT, starting at OFFSET.\n\n"
HELP_LINE
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to write to.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to write to.\n"
"\t--" OPT_FROM ",-F <OFFSET>\tByte offset to write from.\n"
"\t--" OPT_SIZE ",-S <SIZE>\tBytes to write.\n"
BIND_LINE
VERBOSE_LINE
QUIET_LINE;
/* Long options accepted by the acl command. */
static struct option acl_options[] = {
GETOPT_HELP,
GETOPT_SOCK,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
};
/* Matching short options; handled by read_acl_param(). */
static char acl_short_options[] = "hs:" SOPT_QUIET SOPT_VERBOSE;
/* Usage text for the acl command. */
static char acl_help_text[] =
"Usage: flexnbd " CMD_ACL " <options> [<acl address>+]\n\n"
"Set the access control list for a server with control socket SOCK.\n\n"
HELP_LINE
SOCK_LINE
VERBOSE_LINE
QUIET_LINE;
/* Long options accepted by the mirror-speed command. */
static struct option mirror_speed_options[] = {
	GETOPT_HELP,
	GETOPT_SOCK,
	GETOPT_MAX_SPEED,
	GETOPT_QUIET,
	GETOPT_VERBOSE,
	{0}
};
/* Matching short options; handled by read_mirror_speed_param(). */
static char mirror_speed_short_options[] = "hs:m:" SOPT_QUIET SOPT_VERBOSE;
/* Usage text for the mirror-speed command ("mirring" typo corrected). */
static char mirror_speed_help_text[] =
	"Usage: flexnbd " CMD_MIRROR_SPEED " <options>\n\n"
	"Set the maximum speed of a migration from a mirroring server listening on SOCK.\n\n"
	HELP_LINE
	SOCK_LINE
	MAX_SPEED_LINE
	VERBOSE_LINE
	QUIET_LINE;
/* Long options accepted by the mirror command. */
static struct option mirror_options[] = {
GETOPT_HELP,
GETOPT_SOCK,
GETOPT_ADDR,
GETOPT_PORT,
GETOPT_UNLINK,
GETOPT_BIND,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
};
/* Matching short options; handled by read_mirror_param(). */
static char mirror_short_options[] = "hs:l:p:ub:" SOPT_QUIET SOPT_VERBOSE;
/* Usage text for the mirror command. */
static char mirror_help_text[] =
"Usage: flexnbd " CMD_MIRROR " <options>\n\n"
"Start mirroring from the server with control socket SOCK to one at ADDR:PORT.\n\n"
HELP_LINE
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to mirror to.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to mirror to.\n"
SOCK_LINE
"\t--" OPT_UNLINK ",-u\tUnlink the local file when done.\n"
BIND_LINE
VERBOSE_LINE
QUIET_LINE;
/* Long options accepted by the break command. */
static struct option break_options[] = {
GETOPT_HELP,
GETOPT_SOCK,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
};
/* Matching short options; handled by read_break_param(). */
static char break_short_options[] = "hs:" SOPT_QUIET SOPT_VERBOSE;
/* Usage text for the break command. */
static char break_help_text[] =
"Usage: flexnbd " CMD_BREAK " <options>\n\n"
"Stop mirroring from the server with control socket SOCK.\n\n"
HELP_LINE
SOCK_LINE
VERBOSE_LINE
QUIET_LINE;
/* Long options accepted by the status command. */
static struct option status_options[] = {
GETOPT_HELP,
GETOPT_SOCK,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
};
/* Matching short options; handled by read_status_param(). */
static char status_short_options[] = "hs:" SOPT_QUIET SOPT_VERBOSE;
/* Usage text for the status command. */
static char status_help_text[] =
"Usage: flexnbd " CMD_STATUS " <options>\n\n"
"Get the status for a server with control socket SOCK.\n\n"
HELP_LINE
SOCK_LINE
VERBOSE_LINE
QUIET_LINE;
/* Top-level usage text listing every sub-command.  Unlike the per-command
 * help texts this one is not static. */
char help_help_text_arr[] =
"Usage: flexnbd <cmd> [cmd options]\n\n"
"Commands:\n"
"\tflexnbd serve\n"
"\tflexnbd listen\n"
"\tflexnbd read\n"
"\tflexnbd write\n"
"\tflexnbd acl\n"
"\tflexnbd mirror\n"
"\tflexnbd mirror-speed\n"
"\tflexnbd break\n"
"\tflexnbd status\n"
"\tflexnbd help\n\n"
"See flexnbd help <cmd> for further info\n";
/* Slightly odd array/pointer pair to stop the compiler from complaining
* about symbol sizes
*/
char * help_help_text = help_help_text_arr;
/* Forward declarations for the command back-ends used by the mode_*
 * entry points in this file. */
void do_read(struct mode_readwrite_params* params);
void do_write(struct mode_readwrite_params* params);
void do_remote_command(char* command, char* mode, int argc, char** argv);
/* Apply one getopt result `c` from the serve command line.  Option
 * arguments are stored (as pointers to optarg) through the matching out
 * parameter, flags set their int to 1, -q/-v adjust log_level.  -h prints
 * the help and exits 0; anything unrecognised goes to exit_err().
 */
void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char **sock, int *default_deny, int *use_killswitch )
{
	switch ( c ) {
		/* options carrying a value */
		case 'l': *ip_addr = optarg; break;
		case 'p': *ip_port = optarg; break;
		case 'f': *file = optarg;    break;
		case 's': *sock = optarg;    break;

		/* boolean flags */
		case 'd': *default_deny = 1;   break;
		case 'k': *use_killswitch = 1; break;

		/* logging */
		case 'q': log_level = QUIET_LOG_LEVEL;   break;
		case 'v': log_level = VERBOSE_LOG_LEVEL; break;

		case 'h':
			fprintf(stdout, "%s\n", serve_help_text );
			exit( 0 );

		default:
			exit_err( serve_help_text );
			break;
	}
}
/* Apply one getopt result `c` from the listen command line.  Same
 * conventions as read_serve_param(), minus the killswitch flag.
 */
void read_listen_param( int c,
		char **ip_addr,
		char **ip_port,
		char **file,
		char **sock,
		int *default_deny )
{
	switch ( c ) {
		/* options carrying a value */
		case 'l': *ip_addr = optarg; break;
		case 'p': *ip_port = optarg; break;
		case 'f': *file = optarg;    break;
		case 's': *sock = optarg;    break;

		/* boolean flag */
		case 'd': *default_deny = 1; break;

		/* logging */
		case 'q': log_level = QUIET_LOG_LEVEL;   break;
		case 'v': log_level = VERBOSE_LOG_LEVEL; break;

		case 'h':
			fprintf(stdout, "%s\n", listen_help_text );
			exit(0);

		default:
			exit_err( listen_help_text );
			break;
	}
}
/* Apply one getopt result `c` from the read or write command line.
 * `err_text` is the help text of whichever command is running: it is
 * printed for -h and passed to exit_err() for unrecognised options.
 */
void read_readwrite_param( int c, char **ip_addr, char **ip_port, char **bind_addr, char **from, char **size, char *err_text )
{
	switch ( c ) {
		/* options carrying a value */
		case 'l': *ip_addr = optarg;   break;
		case 'p': *ip_port = optarg;   break;
		case 'b': *bind_addr = optarg; break;
		case 'F': *from = optarg;      break;
		case 'S': *size = optarg;      break;

		/* logging */
		case 'q': log_level = QUIET_LOG_LEVEL;   break;
		case 'v': log_level = VERBOSE_LOG_LEVEL; break;

		case 'h':
			fprintf(stdout, "%s\n", err_text );
			exit( 0 );

		default:
			exit_err( err_text );
			break;
	}
}
/* Shared option handler for commands whose only real option is the
 * control socket (-s).  `help_text` is printed for -h and passed to
 * exit_err() for unrecognised options.
 */
void read_sock_param( int c, char **sock, char *help_text )
{
	switch ( c ) {
		case 's': *sock = optarg; break;

		/* logging */
		case 'q': log_level = QUIET_LOG_LEVEL;   break;
		case 'v': log_level = VERBOSE_LOG_LEVEL; break;

		case 'h':
			fprintf( stdout, "%s\n", help_text );
			exit( 0 );

		default:
			exit_err( help_text );
			break;
	}
}
/* Option handler for the acl command: socket-only options, acl help. */
void read_acl_param( int c, char **sock )
{
	char * help = acl_help_text;
	read_sock_param( c, sock, help );
}
/* Apply one getopt result `c` from the mirror-speed command line: -s
 * stores the control socket, -m the requested maximum speed (both as
 * pointers to optarg).
 */
void read_mirror_speed_param(
		int c,
		char **sock,
		char **max_speed
		)
{
	switch ( c ) {
		/* options carrying a value */
		case 's': *sock = optarg;      break;
		case 'm': *max_speed = optarg; break;

		/* logging */
		case 'q': log_level = QUIET_LOG_LEVEL;   break;
		case 'v': log_level = VERBOSE_LOG_LEVEL; break;

		case 'h':
			fprintf( stdout, "%s\n", mirror_speed_help_text );
			exit( 0 );

		default:
			exit_err( mirror_speed_help_text );
			break;
	}
}
/* Apply one getopt result `c` from the mirror command line.  Value
 * options are stored as pointers to optarg; -u sets *unlink to 1.
 */
void read_mirror_param(
		int c,
		char **sock,
		char **ip_addr,
		char **ip_port,
		int *unlink,
		char **bind_addr )
{
	switch ( c ) {
		/* options carrying a value */
		case 's': *sock = optarg;      break;
		case 'l': *ip_addr = optarg;   break;
		case 'p': *ip_port = optarg;   break;
		case 'b': *bind_addr = optarg; break;

		/* boolean flag */
		case 'u': *unlink = 1; break;

		/* logging */
		case 'q': log_level = QUIET_LOG_LEVEL;   break;
		case 'v': log_level = VERBOSE_LOG_LEVEL; break;

		case 'h':
			fprintf( stdout, "%s\n", mirror_help_text );
			exit( 0 );

		default:
			exit_err( mirror_help_text );
			break;
	}
}
/* Option handler for the break command.  The break command has exactly
 * the socket-only option set (-h, -s, -q, -v), so this defers to the
 * shared handler with the break help text, like read_acl_param() and
 * read_status_param() do.
 */
void read_break_param( int c, char **sock )
{
	read_sock_param( c, sock, break_help_text );
}
/* Option handler for the status mode: read_sock_param() with
 * status_help_text as the help/usage text.
 */
void read_status_param( int c, char **sock )
{
read_sock_param( c, sock, status_help_text );
}
/* Entry point for the serve mode.
 *
 * Parses the command line with getopt_long(), insists on an address, a port
 * and a file, then creates a serving flexnbd instance and runs it. Any
 * arguments left after option parsing (argv + optind) are passed on to
 * flexnbd_create_serving().
 *
 * Returns 0 if flexnbd_serve() reported success, 1 otherwise.
 */
int mode_serve( int argc, char *argv[] )
{
    char *ip_addr = NULL;
    char *ip_port = NULL;
    char *file = NULL;
    char *sock = NULL;
    int default_deny = 0; /* not on by default */
    int use_killswitch = 0;
    int bad_args = 0;
    int opt;

    while ( -1 != ( opt = getopt_long(argc, argv, serve_short_options, serve_options, NULL) ) ) {
        read_serve_param( opt, &ip_addr, &ip_port, &file, &sock, &default_deny, &use_killswitch );
    }

    if ( NULL == ip_addr || NULL == ip_port ) {
        fprintf( stderr, "both --addr and --port are required.\n" );
        bad_args = 1;
    }
    if ( NULL == file ) {
        fprintf( stderr, "--file is required\n" );
        bad_args = 1;
    }
    if ( bad_args ) { exit_err( serve_help_text ); }

    struct flexnbd * flexnbd = flexnbd_create_serving(
        ip_addr, ip_port, file, sock, default_deny,
        argc - optind, argv + optind,
        MAX_NBD_CLIENTS, use_killswitch );
    info( "Serving file %s", file );

    int served_ok = flexnbd_serve( flexnbd );
    flexnbd_destroy( flexnbd );

    return served_ok ? 0 : 1;
}
/* Entry point for the listen mode.
 *
 * Parses the command line with getopt_long(), insists on an address, a port
 * and a file, then creates a listening flexnbd instance and runs it. Any
 * arguments left after option parsing (argv + optind) are passed on to
 * flexnbd_create_listening().
 *
 * Returns 0 if flexnbd_serve() reported success, 1 otherwise.
 */
int mode_listen( int argc, char *argv[] )
{
    char *ip_addr = NULL;
    char *ip_port = NULL;
    char *file = NULL;
    char *sock = NULL;
    int default_deny = 0; /* not on by default */
    int bad_args = 0;
    int opt;

    while ( -1 != ( opt = getopt_long(argc, argv, listen_short_options, listen_options, NULL) ) ) {
        read_listen_param( opt, &ip_addr, &ip_port, &file, &sock, &default_deny );
    }

    if ( NULL == ip_addr || NULL == ip_port ) {
        fprintf( stderr, "both --addr and --port are required.\n" );
        bad_args = 1;
    }
    if ( NULL == file ) {
        fprintf( stderr, "--file is required\n" );
        bad_args = 1;
    }
    if ( bad_args ) { exit_err( listen_help_text ); }

    struct flexnbd * flexnbd = flexnbd_create_listening(
        ip_addr, ip_port, file, sock, default_deny,
        argc - optind, argv + optind );

    int served_ok = flexnbd_serve( flexnbd );
    flexnbd_destroy( flexnbd );

    return served_ok ? 0 : 1;
}
/* TODO: Separate this function.
 * It should be:
 * params_read( struct mode_readwrite_params* out,
 *              char *s_ip_address,
 *              char *s_port,
 *              char *s_from,
 *              char *s_length )
 * params_write( struct mode_readwrite_params* out,
 *               char *s_ip_address,
 *               char *s_port,
 *               char *s_from,
 *               char *s_length,
 *               char *s_filename )
 */
/* Fill *out from the string arguments given to the read and write modes.
 *
 *   write_not_read        non-zero for write mode, zero for read mode
 *   s_ip_address, s_port  remote endpoint; both required
 *   s_bind_address        optional local address to connect from (may be NULL)
 *   s_from                offset; required and must not be negative
 *   s_length_or_filename  read mode: the length. Write mode: a length if it
 *                         starts with a decimal digit (data_fd is then 0),
 *                         otherwise the name of a file that is opened
 *                         read-only and whose size becomes the length.
 *
 * In read mode data_fd is set to 1. Any missing or unparseable argument, or
 * a failure to open or seek the file, is fatal.
 */
void params_readwrite(
    int write_not_read,
    struct mode_readwrite_params* out,
    char* s_ip_address,
    char* s_port,
    char* s_bind_address,
    char* s_from,
    char* s_length_or_filename
)
{
    FATAL_IF_NULL(s_ip_address, "No IP address supplied");
    FATAL_IF_NULL(s_port, "No port number supplied");
    FATAL_IF_NULL(s_from, "No from supplied");
    FATAL_IF_NULL(s_length_or_filename, "No length supplied");

    FATAL_IF_ZERO(
        parse_ip_to_sockaddr(&out->connect_to.generic, s_ip_address),
        "Couldn't parse connection address '%s'",
        s_ip_address
    );

    if (s_bind_address != NULL &&
            parse_ip_to_sockaddr(&out->connect_from.generic, s_bind_address) == 0) {
        fatal("Couldn't parse bind address '%s'", s_bind_address);
    }

    parse_port( s_port, &out->connect_to.v4 );

    long signed_from = atol(s_from);
    /* signed_from is a long, so it needs %ld rather than %d. */
    FATAL_IF_NEGATIVE( signed_from,
        "Can't read from a negative offset %ld.", signed_from);
    out->from = signed_from;

    if (write_not_read) {
        /* Only a leading decimal digit means "this is a length". The old
         * test, s[0]-48 < 10, was also true for every character below '0',
         * so names such as "/tmp/data" or "./data" were parsed as a length
         * of 0 instead of being opened. */
        char first = s_length_or_filename[0];
        if (first >= '0' && first <= '9') {
            out->len = atol(s_length_or_filename);
            out->data_fd = 0;
        }
        else {
            out->data_fd = open(
                s_length_or_filename, O_RDONLY);
            FATAL_IF_NEGATIVE(out->data_fd,
                "Couldn't open %s", s_length_or_filename);
            off64_t signed_len = lseek64(out->data_fd, 0, SEEK_END);
            FATAL_IF_NEGATIVE(signed_len,
                "Couldn't find length of %s", s_length_or_filename);
            out->len = signed_len;
            FATAL_IF_NEGATIVE(
                lseek64(out->data_fd, 0, SEEK_SET),
                "Couldn't rewind %s", s_length_or_filename
            );
        }
    }
    else {
        out->len = atol(s_length_or_filename);
        out->data_fd = 1;
    }
}
/* Entry point for the read mode.
 *
 * Parses the command line, insists on --addr, --port, --from and --size,
 * converts them into a zero-initialised struct mode_readwrite_params with
 * params_readwrite() (write_not_read = 0), and hands that to do_read().
 * Always returns 0.
 */
int mode_read( int argc, char *argv[] )
{
    char *ip_addr = NULL;
    char *ip_port = NULL;
    char *bind_addr = NULL;
    char *from = NULL;
    char *size = NULL;
    struct mode_readwrite_params readwrite;
    int bad_args = 0;
    int opt;

    while ( -1 != ( opt = getopt_long(argc, argv, read_short_options, read_options, NULL) ) ) {
        read_readwrite_param( opt, &ip_addr, &ip_port, &bind_addr, &from, &size, read_help_text );
    }

    if ( NULL == ip_addr || NULL == ip_port ) {
        fprintf( stderr, "both --addr and --port are required.\n" );
        bad_args = 1;
    }
    if ( NULL == from || NULL == size ) {
        fprintf( stderr, "both --from and --size are required.\n" );
        bad_args = 1;
    }
    if ( bad_args ) { exit_err( read_help_text ); }

    memset( &readwrite, 0, sizeof( readwrite ) );
    params_readwrite( 0, &readwrite, ip_addr, ip_port, bind_addr, from, size );
    do_read( &readwrite );

    return 0;
}
/* Entry point for the write mode.
 *
 * Parses the command line, insists on --addr, --port, --from and --size,
 * converts them into a zero-initialised struct mode_readwrite_params with
 * params_readwrite() (write_not_read = 1), and hands that to do_write().
 * Always returns 0.
 */
int mode_write( int argc, char *argv[] )
{
    char *ip_addr = NULL;
    char *ip_port = NULL;
    char *bind_addr = NULL;
    char *from = NULL;
    char *size = NULL;
    struct mode_readwrite_params readwrite;
    int bad_args = 0;
    int opt;

    while ( -1 != ( opt = getopt_long(argc, argv, write_short_options, write_options, NULL) ) ) {
        read_readwrite_param( opt, &ip_addr, &ip_port, &bind_addr, &from, &size, write_help_text );
    }

    if ( NULL == ip_addr || NULL == ip_port ) {
        fprintf( stderr, "both --addr and --port are required.\n" );
        bad_args = 1;
    }
    if ( NULL == from || NULL == size ) {
        fprintf( stderr, "both --from and --size are required.\n" );
        bad_args = 1;
    }
    if ( bad_args ) { exit_err( write_help_text ); }

    memset( &readwrite, 0, sizeof( readwrite ) );
    params_readwrite( 1, &readwrite, ip_addr, ip_port, bind_addr, from, size );
    do_write( &readwrite );

    return 0;
}
/* Entry point for the acl mode.
 *
 * Requires --sock, then sends the remote command "acl" over that socket
 * with the remaining non-option arguments as its parameters.
 * Always returns 0.
 */
int mode_acl( int argc, char *argv[] )
{
    char *sock = NULL;
    int opt;

    while ( -1 != ( opt = getopt_long( argc, argv, acl_short_options, acl_options, NULL ) ) ) {
        read_acl_param( opt, &sock );
    }

    if ( NULL == sock ) {
        fprintf( stderr, "--sock is required.\n" );
        exit_err( acl_help_text );
    }

    /* "acl" is the name of the remote command, which is not the same thing
     * as the CLI option, so the CMD_ACL macro must not be used here.
     */
    do_remote_command( "acl", sock, argc - optind, argv + optind );

    return 0;
}
/* Entry point for the mirror-speed mode.
 *
 * Requires both --sock and --max-speed, then sends the remote command
 * "mirror_max_bps" with the speed string as its single argument.
 * Always returns 0.
 */
int mode_mirror_speed( int argc, char *argv[] )
{
    char *sock = NULL;
    char *speed = NULL;
    int opt;

    while ( -1 != ( opt = getopt_long( argc, argv, mirror_speed_short_options, mirror_speed_options, NULL ) ) ) {
        read_mirror_speed_param( opt, &sock, &speed );
    }

    if ( NULL == sock ) {
        fprintf( stderr, "--sock is required.\n" );
        exit_err( mirror_speed_help_text );
    }
    if ( NULL == speed ) {
        fprintf( stderr, "--max-speed is required.\n");
        exit_err( mirror_speed_help_text );
    }

    do_remote_command( "mirror_max_bps", sock, 1, &speed );

    return 0;
}
/* Entry point for the mirror mode.
 *
 * Builds the argument vector for the remote "mirror" command:
 *   [0] address ('l'), [1] port ('p'), [2] "exit" or "unlink" ('u'),
 *   [3] optional bind address ('b').
 * --sock, --addr and --port are required. The bind address is only sent
 * (argument count 4 instead of 3) when one was given.
 * Always returns 0.
 */
int mode_mirror( int argc, char *argv[] )
{
    char *remote_argv[4] = {0};
    char *sock = NULL;
    int want_unlink = 0;
    int bad_args = 0;
    int opt;

    remote_argv[2] = "exit";

    while ( -1 != ( opt = getopt_long( argc, argv, mirror_short_options, mirror_options, NULL) ) ) {
        read_mirror_param( opt,
            &sock,
            &remote_argv[0],
            &remote_argv[1],
            &want_unlink,
            &remote_argv[3] );
    }

    if ( NULL == sock ) {
        fprintf( stderr, "--sock is required.\n" );
        bad_args = 1;
    }
    if ( NULL == remote_argv[0] || NULL == remote_argv[1] ) {
        fprintf( stderr, "both --addr and --port are required.\n");
        bad_args = 1;
    }
    if ( bad_args ) { exit_err( mirror_help_text ); }

    if ( want_unlink ) { remote_argv[2] = "unlink"; }

    int remote_argc = ( NULL == remote_argv[3] ) ? 3 : 4;
    do_remote_command( "mirror", sock, remote_argc, remote_argv );

    return 0;
}
/* Entry point for the break mode.
 *
 * Requires --sock, then sends the remote command "break" with the
 * remaining non-option arguments. Always returns 0.
 */
int mode_break( int argc, char *argv[] )
{
    char *sock = NULL;
    int opt;

    while ( -1 != ( opt = getopt_long( argc, argv, break_short_options, break_options, NULL ) ) ) {
        read_break_param( opt, &sock );
    }

    if ( NULL == sock ) {
        fprintf( stderr, "--sock is required.\n" );
        exit_err( break_help_text );
    }

    do_remote_command( "break", sock, argc - optind, argv + optind );

    return 0;
}
/* Entry point for the status mode.
 *
 * Requires --sock, then sends the remote command "status" with the
 * remaining non-option arguments. Always returns 0.
 */
int mode_status( int argc, char *argv[] )
{
    char *sock = NULL;
    int opt;

    while ( -1 != ( opt = getopt_long( argc, argv, status_short_options, status_options, NULL ) ) ) {
        read_status_param( opt, &sock );
    }

    if ( NULL == sock ) {
        fprintf( stderr, "--sock is required.\n" );
        exit_err( status_help_text );
    }

    do_remote_command( "status", sock, argc - optind, argv + optind );

    return 0;
}
int mode_help( int argc, char *argv[] )
{
char *cmd;
char *help_text = NULL;
if ( argc < 1 ){
help_text = help_help_text;
} else {
cmd = argv[0];
if (IS_CMD( CMD_SERVE, cmd ) ) {
help_text = serve_help_text;
} else if ( IS_CMD( CMD_LISTEN, cmd ) ) {
help_text = listen_help_text;
} else if ( IS_CMD( CMD_READ, cmd ) ) {
help_text = read_help_text;
} else if ( IS_CMD( CMD_WRITE, cmd ) ) {
help_text = write_help_text;
} else if ( IS_CMD( CMD_ACL, cmd ) ) {
help_text = acl_help_text;
} else if ( IS_CMD( CMD_MIRROR, cmd ) ) {
help_text = mirror_help_text;
} else if ( IS_CMD( CMD_STATUS, cmd ) ) {
help_text = status_help_text;
} else { exit_err( help_help_text ); }
}
fprintf( stdout, "%s\n", help_text );
return 0;
}
/* Dispatch to the handler for the named mode and exit the process.
 *
 * The serve and listen modes exit with the status their handler returns.
 * Every other recognised mode exits 0. An unrecognised mode prints help
 * (via mode_help with the first argument dropped) and exits 1.
 * This function does not return.
 */
void mode(char* mode, int argc, char **argv)
{
    int status = 0;

    if ( IS_CMD( CMD_SERVE, mode ) ) {
        status = mode_serve( argc, argv );
    }
    else if ( IS_CMD( CMD_LISTEN, mode ) ) {
        status = mode_listen( argc, argv );
    }
    else if ( IS_CMD( CMD_READ, mode ) ) {
        mode_read( argc, argv );
    }
    else if ( IS_CMD( CMD_WRITE, mode ) ) {
        mode_write( argc, argv );
    }
    else if ( IS_CMD( CMD_ACL, mode ) ) {
        mode_acl( argc, argv );
    }
    else if ( IS_CMD ( CMD_MIRROR_SPEED, mode ) ) {
        mode_mirror_speed( argc, argv );
    }
    else if ( IS_CMD( CMD_MIRROR, mode ) ) {
        mode_mirror( argc, argv );
    }
    else if ( IS_CMD( CMD_BREAK, mode ) ) {
        mode_break( argc, argv );
    }
    else if ( IS_CMD( CMD_STATUS, mode ) ) {
        mode_status( argc, argv );
    }
    else if ( IS_CMD( CMD_HELP, mode ) ) {
        mode_help( argc-1, argv+1 );
    }
    else {
        /* Unknown mode: show help, but report failure. */
        mode_help( argc-1, argv+1 );
        status = 1;
    }

    exit( status );
}

954
src/server/serve.c Normal file
View File

@@ -0,0 +1,954 @@
#include "serve.h"
#include "client.h"
#include "nbdtypes.h"
#include "ioutil.h"
#include "sockutil.h"
#include "util.h"
#include "bitset.h"
#include "control.h"
#include "self_pipe.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/un.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
/* Allocate and initialise a struct server.
 *
 * The bind address and port are parsed from s_ip_address / s_port, the ACL
 * is built from s_acl_entries, and the mutexes and self-pipes the server
 * needs are created. s_file is stored by pointer, not copied. A missing
 * address, port or filename, an unparseable address, or an ACL entry that
 * fails to parse is fatal.
 *
 * Returns the new server; new clients are allowed and mirror_can_start is 1.
 */
struct server * server_create (
    struct flexnbd * flexnbd,
    char* s_ip_address,
    char* s_port,
    char* s_file,
    int default_deny,
    int acl_entries,
    char** s_acl_entries,
    int max_nbd_clients,
    int use_killswitch,
    int success)
{
    NULLCHECK( flexnbd );

    struct server * serve = xmalloc( sizeof( struct server ) );

    serve->flexnbd = flexnbd;
    serve->success = success;
    serve->max_nbd_clients = max_nbd_clients;
    serve->use_killswitch = use_killswitch;
    server_allow_new_clients( serve );

    serve->nbd_client = xmalloc( max_nbd_clients * sizeof( struct client_tbl_entry ) );
    serve->tcp_backlog = 10; /* does this need to be settable? */

    FATAL_IF_NULL(s_ip_address, "No IP address supplied");
    FATAL_IF_NULL(s_port, "No port number supplied");
    FATAL_IF_NULL(s_file, "No filename supplied");
    NULLCHECK( s_ip_address );

    FATAL_IF_ZERO(
        parse_ip_to_sockaddr(&serve->bind_to.generic, s_ip_address),
        "Couldn't parse server address '%s' (use 0 if "
        "you want to bind to all IPs)",
        s_ip_address
    );

    /* acl->len is the count reported by parse_acl(); if it is short of
     * acl_entries, the entry at that index is reported as the bad one. */
    serve->acl = acl_create( acl_entries, s_acl_entries, default_deny );
    if ( NULL != serve->acl && acl_entries != serve->acl->len ) {
        fatal("Bad ACL entry '%s'", s_acl_entries[serve->acl->len]);
    }

    parse_port( s_port, &serve->bind_to.v4 );
    serve->filename = s_file;

    serve->l_acl = flexthread_mutex_create();
    serve->l_start_mirror = flexthread_mutex_create();
    serve->mirror_can_start = 1;

    serve->close_signal = self_pipe_create();
    serve->acl_updated_signal = self_pipe_create();
    NULLCHECK( serve->close_signal );
    NULLCHECK( serve->acl_updated_signal );

    return serve;
}
/* Release what server_create() set up: both self-pipes, both mutexes, the
 * ACL (if there is one), the client table and finally the server itself.
 * serve must not be used after this call.
 */
void server_destroy( struct server * serve )
{
    self_pipe_destroy( serve->acl_updated_signal );
    serve->acl_updated_signal = NULL;

    self_pipe_destroy( serve->close_signal );
    serve->close_signal = NULL;

    flexthread_mutex_destroy( serve->l_start_mirror );
    flexthread_mutex_destroy( serve->l_acl );

    if ( NULL != serve->acl ) {
        acl_destroy( serve->acl );
        serve->acl = NULL;
    }

    free( serve->nbd_client );
    free( serve );
}
/* Remove the served file from the filesystem with unlink(). Failure is
 * fatal and reports the strerror() text for errno.
 */
void server_unlink( struct server * serve )
{
    NULLCHECK( serve );
    NULLCHECK( serve->filename );

    int rc = unlink( serve->filename );
    FATAL_IF_NEGATIVE( rc,
        "Failed to unlink %s: %s",
        serve->filename,
        strerror( errno ) );
}
/* Lock / unlock the flexthread mutex held in field `f` of server `s`.
 * `s` is NULL-checked first; a non-zero result from the mutex call is fatal
 * with `msg`. `s` is parenthesised at its point of use so that an
 * expression argument still binds correctly to `->`.
 */
#define SERVER_LOCK( s, f, msg ) \
    do { NULLCHECK( s ); \
        FATAL_IF( 0 != flexthread_mutex_lock( (s)->f ), msg ); } while (0)
#define SERVER_UNLOCK( s, f, msg ) \
    do { NULLCHECK( s ); \
        FATAL_IF( 0 != flexthread_mutex_unlock( (s)->f ), msg ); } while (0)
/* Acquire the server's ACL mutex (l_acl). A NULL serve or a failed lock is
 * fatal (see SERVER_LOCK).
 */
void server_lock_acl( struct server *serve )
{
debug("ACL locking");
SERVER_LOCK( serve, l_acl, "Problem with ACL lock" );
}
/* Release the server's ACL mutex (l_acl). A NULL serve or a failed unlock
 * is fatal (see SERVER_UNLOCK).
 */
void server_unlock_acl( struct server *serve )
{
debug( "ACL unlocking" );
SERVER_UNLOCK( serve, l_acl, "Problem with ACL unlock" );
}
/* Return the result of flexthread_mutex_held() for the ACL mutex.
 * NOTE(review): presumably "held by the calling thread" - confirm against
 * flexthread_mutex_held().
 */
int server_acl_locked( struct server * serve )
{
NULLCHECK( serve );
return flexthread_mutex_held( serve->l_acl );
}
/* Acquire the mirror-start mutex (l_start_mirror). A NULL serve or a failed
 * lock is fatal (see SERVER_LOCK).
 */
void server_lock_start_mirror( struct server *serve )
{
debug("Mirror start locking");
SERVER_LOCK( serve, l_start_mirror, "Problem with start mirror lock" );
}
/* Release the mirror-start mutex (l_start_mirror). A NULL serve or a failed
 * unlock is fatal (see SERVER_UNLOCK).
 */
void server_unlock_start_mirror( struct server *serve )
{
debug("Mirror start unlocking");
SERVER_UNLOCK( serve, l_start_mirror, "Problem with start mirror unlock" );
}
/* Return the result of flexthread_mutex_held() for the mirror-start mutex.
 * NOTE(review): presumably "held by the calling thread" - confirm against
 * flexthread_mutex_held().
 */
int server_start_mirror_locked( struct server * serve )
{
NULLCHECK( serve );
return flexthread_mutex_held( serve->l_start_mirror );
}
/** Return the port the server socket is actually bound to, in host byte
 * order. This is needed because "0" may be passed as the port on the
 * command line.
 *
 * The address is fetched into the whole union, through its generic
 * `struct sockaddr` member, so the pointer type matches what getsockname()
 * expects and an IPv6 address is not truncated to the size of a
 * sockaddr_in. Failure of getsockname() is fatal.
 */
int server_port( struct server * server )
{
    NULLCHECK( server );

    union mysockaddr addr;
    socklen_t len = sizeof( addr );

    if ( getsockname( server->server_fd, &addr.generic, &len ) < 0 ) {
        fatal( "Failed to get the port number." );
    }

    if ( AF_INET6 == addr.generic.sa_family ) {
        return be16toh( addr.v6.sin6_port );
    }
    return be16toh( addr.v4.sin_port );
}
/** Create the NBD server's listening socket and store it in
 * params->server_fd: socket(), SO_REUSEADDR, TCP_NODELAY, bind to
 * params->bind_to, then listen() with params->tcp_backlog. Any failing
 * step is fatal.
 */
void serve_open_server_socket(struct server* params)
{
    NULLCHECK( params );

    /* Anything that is not AF_INET is treated as IPv6. */
    int domain = PF_INET6;
    if ( AF_INET == params->bind_to.generic.sa_family ) {
        domain = PF_INET;
    }

    params->server_fd = socket( domain, SOCK_STREAM, 0 );
    FATAL_IF_NEGATIVE( params->server_fd, "Couldn't create server socket" );

    /* SO_REUSEADDR lets us keep the same address when switching from
     * listening to serving. Failing to set it isn't necessarily harmful in
     * principle, but we can't tell from here whether it will matter, and
     * it suggests something odd is going on, so give up.
     */
    FATAL_IF_NEGATIVE(
        sock_set_reuseaddr( params->server_fd, 1 ), "Couldn't set SO_REUSEADDR"
    );

    /* TCP_NODELAY stops everything being slow. As above, not being able
     * to set it means something is happening that we don't understand.
     */
    FATAL_IF_NEGATIVE(
        sock_set_tcp_nodelay( params->server_fd, 1 ), "Couldn't set TCP_NODELAY"
    );

    /* A bind failure presumably means somebody else is sitting on our
     * ip/port pair, or the ip isn't configured yet. Ideally this would be
     * retried. */
    FATAL_UNLESS_ZERO(
        sock_try_bind( params->server_fd, &params->bind_to.generic ),
        SHOW_ERRNO( "Failed to bind() socket" )
    );

    FATAL_IF_NEGATIVE(
        listen(params->server_fd, params->tcp_backlog),
        "Couldn't listen on server socket"
    );
}
/* Join the thread recorded in a client table entry using joinfunc and, if
 * the join succeeds, destroy the client and clear the slot.
 *
 * entry    - table slot to examine; nothing happens when entry->thread is 0.
 * joinfunc - pthread_join, or a function with the same signature (in this
 *            file cleanup_client_thread passes pthread_tryjoin_np).
 *
 * Returns 1 if the thread was joined and the slot cleared, 0 otherwise.
 * A join error other than EBUSY is fatal unless the client is flagged as
 * stopped.
 */
int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthread_t, void **) )
{
NULLCHECK( entry );
NULLCHECK( joinfunc );
int was_closed = 0;
void * status=NULL;
if (entry->thread != 0) {
char s_client_address[128];
sockaddr_address_string( &entry->address.generic, &s_client_address[0], 128 );
/* NOTE(review): %p here and %016x below are handed a pthread_t, and %ld a
 * uint64_t - confirm these specifiers match on the target platform. */
debug( "%s(%p,...)", joinfunc == pthread_join ? "joining" : "tryjoining", entry->thread );
int join_errno = joinfunc(entry->thread, &status);
/* join_errno can legitimately be ESRCH if the thread is
 * already dead, but the client still needs tidying up. */
/* NOTE(review): when the join fails and the client IS flagged stopped,
 * neither branch below runs, so the slot stays populated. The comment
 * above suggests a tidy-up was intended in that case - confirm. */
if (join_errno != 0 && !entry->client->stopped ) {
debug( "join_errno was %s, stopped was %d", strerror( join_errno ), entry->client->stopped );
FATAL_UNLESS( join_errno == EBUSY,
"Problem with joining thread %p: %s",
entry->thread,
strerror(join_errno) );
}
else if ( join_errno == 0 ) {
debug("nbd thread %016x exited (%s) with status %ld",
entry->thread,
s_client_address,
(uint64_t)status);
/* Thread is gone: free the client and mark the slot as unused. */
client_destroy( entry->client );
entry->client = NULL;
entry->thread = 0;
was_closed = 1;
}
}
return was_closed;
}
/**
 * Check to see if a client thread has finished, and if so, tidy up
 * after it. The check is made with pthread_tryjoin_np via
 * tryjoin_client_thread().
 * Returns 1 if the thread was cleaned up and the slot freed, 0
 * otherwise.
 *
 * It's important that client_destroy gets called in the same thread
 * which signals the client threads to stop. This avoids the
 * possibility of sending a stop signal via a signal which has already
 * been destroyed. However, it means that stopped client threads,
 * including their signal pipes, won't be cleaned up until the next new
 * client connection attempt.
 */
int cleanup_client_thread( struct client_tbl_entry * entry )
{
return tryjoin_client_thread( entry, pthread_tryjoin_np );
}
void cleanup_client_threads( struct client_tbl_entry * entries, size_t entries_len )
{
size_t i;
for( i = 0; i < entries_len; i++ ) {
cleanup_client_thread( &entries[i] );
}
}
/**
 * Join a client thread after having sent a stop signal to it.
 * This function will not return until pthread_join has returned, so
 * ensures that the client thread is dead.
 * Returns whatever tryjoin_client_thread() returns: 1 if the thread was
 * joined and its slot cleared, 0 otherwise.
 */
int join_client_thread( struct client_tbl_entry *entry )
{
return tryjoin_client_thread( entry, pthread_join );
}
/** The client table has room for params->max_nbd_clients connections.
 * Tidy up after any client threads that have finished, then return the
 * index of the first unused slot (thread == 0), or -1 if every slot is
 * taken.
 */
int cleanup_and_find_client_slot(struct server* params)
{
    NULLCHECK( params );

    cleanup_client_threads( params->nbd_client, params->max_nbd_clients );

    int idx;
    for ( idx = 0; idx < params->max_nbd_clients; idx++ ) {
        if ( 0 == params->nbd_client[idx].thread ) {
            return idx;
        }
    }

    return -1;
}
/* Count the client table slots currently in use, i.e. those whose thread
 * field is non-zero.
 */
int server_count_clients( struct server *params )
{
    NULLCHECK( params );

    int in_use = 0;
    int idx;

    for ( idx = 0; idx < params->max_nbd_clients; idx++ ) {
        if ( 0 == params->nbd_client[idx].thread ) { continue; }
        in_use++;
    }

    return in_use;
}
/** Check whether the address client_address is allowed or not according
* to the current acl. If params->acl is NULL, the result will be 1,
* otherwise it will be the result of acl_includes().
*/
int server_acl_accepts( struct server *params, union mysockaddr * client_address )
{
NULLCHECK( params );
NULLCHECK( client_address );
struct acl * acl;
int accepted;
server_lock_acl( params );
{
acl = params->acl;
accepted = acl ? acl_includes( acl, client_address ) : 1;
}
server_unlock_acl( params );
return accepted;
}
/* Decide whether a newly connected client may be served.
 *
 * The client's address is rendered into s_client_address (at most
 * s_client_address_len bytes) for logging. The client is rejected if the
 * address cannot be rendered, or if the ACL does not accept it.
 *
 * Returns 1 to accept, 0 to reject.
 */
int server_should_accept_client(
    struct server * params,
    union mysockaddr * client_address,
    char *s_client_address,
    size_t s_client_address_len )
{
    NULLCHECK( params );
    NULLCHECK( client_address );
    NULLCHECK( s_client_address );

    const char* result = sockaddr_address_string(
        &client_address->generic, s_client_address, s_client_address_len
    );

    if ( NULL == result ) {
        warn( "Rejecting client %s: Bad client_address", s_client_address );
        return 0;
    }

    if ( !server_acl_accepts( params, client_address ) ) {
        warn( "Rejecting client %s: Access control error", s_client_address );
        /* Guard the default_deny read: the first argument already allows
         * for params->acl being NULL, so the second must not dereference
         * it unconditionally.
         * NOTE(review): params->acl is read here without the acl lock. */
        debug( "We %s have an acl, and default_deny is %s",
            (params->acl ? "do" : "do not"),
            ((params->acl && params->acl->default_deny) ? "true" : "false") );
        return 0;
    }

    return 1;
}
/* Start a thread running client_serve( client_params ) with default
 * attributes, storing its id in *out_thread. Returns pthread_create()'s
 * result: 0 on success, an error number otherwise.
 */
int spawn_client_thread(
    struct client * client_params,
    pthread_t *out_thread)
{
    return pthread_create( out_thread, NULL, client_serve, client_params );
}
/** Dispatch function for accepting an NBD connection and starting a thread
* to handle it. Rejects the connection if there is an ACL, and the far end's
* address doesn't match, or if there are too many clients already connected.
*/
void accept_nbd_client(
struct server* params,
int client_fd,
union mysockaddr* client_address)
{
NULLCHECK(params);
NULLCHECK(client_address);
struct client* client_params;
int slot;
char s_client_address[64] = {0};
if ( !server_should_accept_client( params, client_address, s_client_address, 64 ) ) {
FATAL_IF_NEGATIVE( close( client_fd ),
"Error closing client socket fd %d", client_fd );
debug("Closed client socket fd %d", client_fd);
return;
}
slot = cleanup_and_find_client_slot(params);
if (slot < 0) {
warn("too many clients to accept connection");
FATAL_IF_NEGATIVE( close( client_fd ),
"Error closing client socket fd %d", client_fd );
debug("Closed client socket fd %d", client_fd);
return;
}
info( "Client %s accepted on fd %d.", s_client_address, client_fd );
client_params = client_create( params, client_fd );
params->nbd_client[slot].client = client_params;
memcpy(&params->nbd_client[slot].address, client_address,
sizeof(union mysockaddr));
pthread_t * thread = &params->nbd_client[slot].thread;
if ( 0 != spawn_client_thread( client_params, thread ) ) {
debug( "Thread creation problem." );
client_destroy( client_params );
FATAL_IF_NEGATIVE( close(client_fd),
"Error closing client socket fd %d", client_fd );
debug("Closed client socket fd %d", client_fd);
return;
}
debug("nbd thread %p started (%s)", params->nbd_client[slot].thread, s_client_address);
}
/* Re-check every connected client against the current ACL and send a stop
 * signal to each one the ACL no longer accepts.
 */
void server_audit_clients( struct server * serve)
{
    NULLCHECK( serve );

    int idx;

    /* This looks racy: if the acl changes while we walk the table, the
     * slots already visited were checked against the older acl. It is not
     * a problem in practice, because the acl can only change through
     * server_replace_acl, which raises acl_updated_signal again; the
     * server_accept loop will notice that as soon as it reaches select and
     * run another audit.
     */
    for ( idx = 0; idx < serve->max_nbd_clients; idx++ ) {
        struct client_tbl_entry * entry = &serve->nbd_client[idx];

        if ( 0 != entry->thread && !server_acl_accepts( serve, &entry->address ) ) {
            client_signal_stop( entry->client );
        }
    }
}
/* Return fd_is_closed() for the server's listening socket (server_fd). */
int server_is_closed(struct server* serve)
{
NULLCHECK( serve );
return fd_is_closed( serve->server_fd );
}
/* Send a stop signal to every client whose table slot is in use. The
 * client threads are not joined here.
 */
void server_close_clients( struct server *params )
{
    NULLCHECK(params);

    info("closing all clients");

    int idx;
    for ( idx = 0; idx < params->max_nbd_clients; idx++ ) {
        struct client_tbl_entry *entry = &params->nbd_client[idx];

        if ( 0 == entry->thread ) { continue; }

        debug( "Stop signaling client %p", entry->client );
        client_signal_stop( entry->client );
    }

    /* Joining is deliberately left out. On entering the final mirror
     * pass we take the IO lock and then wait for server_fd to close
     * before sending the data, so that no new client can be accepted
     * and believe it has written to the disc. An existing client thread
     * may already be waiting for that IO lock, though, and joining it
     * here would deadlock.
     *
     * The client threads are joined in serve_cleanup instead.
     */
}
/** Replace the current acl with a new one. The old one will be thrown
* away.
*/
void server_replace_acl( struct server *serve, struct acl * new_acl )
{
NULLCHECK(serve);
NULLCHECK(new_acl);
/* We need to lock around updates to the acl in case we try to
* destroy the old acl while checking against it.
*/
server_lock_acl( serve );
{
struct acl * old_acl = serve->acl;
serve->acl = new_acl;
/* We should always have an old_acl, but just in case... */
if ( old_acl ) { acl_destroy( old_acl ); }
}
server_unlock_acl( serve );
self_pipe_signal( serve->acl_updated_signal );
}
/* Clear the mirror_can_start flag. This function takes no lock itself. */
void server_prevent_mirror_start( struct server *serve )
{
NULLCHECK( serve );
serve->mirror_can_start = 0;
}
/* Set the mirror_can_start flag. This function takes no lock itself. */
void server_allow_mirror_start( struct server *serve )
{
NULLCHECK( serve );
serve->mirror_can_start = 1;
}
/* Return the mirror_can_start flag (non-zero if a mirror may be started).
 * Only call this with the mirror start lock held.
 */
int server_mirror_can_start( struct server *serve )
{
NULLCHECK( serve );
return serve->mirror_can_start;
}
/* Report whether a shutdown at this moment counts as graceful, i.e.
 * whether no mirror is in progress. If one is, that has to be reflected
 * in the process exit status, because otherwise the supervisor would
 * assume the migration completed. In that case a warning is logged and
 * further mirror starts are prevented.
 *
 * Returns 1 if not mirroring, 0 if mirroring. The check is made with the
 * mirror-start lock held.
 */
int serve_shutdown_is_graceful( struct server *params )
{
    int graceful = 1;

    server_lock_start_mirror( params );
    if ( server_is_mirroring( params ) ) {
        graceful = 0;
        warn( "Stop signal received while mirroring." );
        server_prevent_mirror_start( params );
    }
    server_unlock_start_mirror( params );

    return graceful;
}
/** Run one iteration of the accept loop: wait in select() for a new NBD
 * connection, a close request, an OS signal or an ACL update, and handle
 * whichever of those fired.
 *
 * A failed accept() is logged and skipped. Previously its result was used
 * unchecked, so a failure passed fd -1 and an uninitialised address on to
 * the client-handling code.
 *
 * Returns 1 if the loop should keep running, 0 once a close request or a
 * stop signal has been seen.
 */
int server_accept( struct server * params )
{
    NULLCHECK( params );
    debug("accept loop starting");

    union mysockaddr client_address;
    fd_set fds;
    socklen_t socklen=sizeof(client_address);
    /* We select on this fd to receive OS signals (only a few of
     * which we're interested in, see flexnbd.c */
    int signal_fd = flexnbd_signal_fd( params->flexnbd );
    int should_continue = 1;

    FD_ZERO(&fds);
    FD_SET(params->server_fd, &fds);
    if( 0 < signal_fd ) { FD_SET(signal_fd, &fds); }
    self_pipe_fd_set( params->close_signal, &fds );
    self_pipe_fd_set( params->acl_updated_signal, &fds );

    FATAL_IF_NEGATIVE(
        sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL),
        SHOW_ERRNO( "select() failed" )
    );

    if ( self_pipe_fd_isset( params->close_signal, &fds ) ){
        server_close_clients( params );
        should_continue = 0;
    }

    if ( 0 < signal_fd && FD_ISSET( signal_fd, &fds ) ){
        debug( "Stop signal received." );
        server_close_clients( params );
        /* A stop during a mirror must turn a successful run into failure. */
        params->success = params->success && serve_shutdown_is_graceful( params );
        should_continue = 0;
    }

    if ( self_pipe_fd_isset( params->acl_updated_signal, &fds ) ) {
        self_pipe_signal_clear( params->acl_updated_signal );
        server_audit_clients( params );
    }

    if ( FD_ISSET( params->server_fd, &fds ) ){
        int client_fd = accept( params->server_fd, &client_address.generic, &socklen );

        if ( client_fd < 0 ) {
            /* Nothing was accepted, so there is no client to hand on. */
            warn( "accept() failed: %s", strerror( errno ) );
        }
        else if ( params->allow_new_clients ) {
            debug("Accepted nbd client socket fd %d", client_fd);
            accept_nbd_client(params, client_fd, &client_address);
        } else {
            debug( "New NBD client socket %d not allowed", client_fd );
            sock_try_close( client_fd );
        }
    }

    return should_continue;
}
/* Call server_accept() repeatedly until it returns 0. */
void serve_accept_loop(struct server* params)
{
NULLCHECK( params );
while( server_accept( params ) );
}
/* Thread body that fills in serve->allocation_map for the served file.
 *
 * serve_uncast is the struct server. The file is opened read-only on a
 * descriptor of its own, handed to build_allocation_map(), and closed
 * again. On success allocation_map_built is set to 1; on failure
 * allocation_map_not_built is set to 1 and a warning is logged.
 * Always returns NULL.
 */
void* build_allocation_map_thread(void* serve_uncast)
{
    NULLCHECK( serve_uncast );

    struct server* serve = (struct server*) serve_uncast;

    NULLCHECK( serve->filename );
    NULLCHECK( serve->allocation_map );

    int map_fd = open( serve->filename, O_RDONLY );
    FATAL_IF_NEGATIVE( map_fd, "Couldn't open %s", serve->filename );

    if ( !build_allocation_map( serve->allocation_map, map_fd ) ) {
        /* We can carry on without the map, but we can't free it without a
         * race. Left alone it just accumulates an *incomplete* record of
         * writes that nobody uses, since allocation_map_built stays 0 for
         * the life of the process.
         *
         * The stream functionality can still be relied on. Mirroring does
         * not have to wait for the map, because every byte is already
         * copied at least once. Should that ever change, the migration
         * will have to wait for the map to finish or fail before it can
         * complete.
         */
        serve->allocation_map_not_built = 1;
        warn( "Didn't build allocation map for %s", serve->filename );
    }
    else {
        serve->allocation_map_built = 1;
    }

    close( map_fd );
    return NULL;
}
/** Initialisation function for the allocation map, which records which
 * blocks of the file are allocated.
 *
 * Measures the served file and stores the result in params->size,
 * allocates a bitset of that size at block_allocation_resolution, and
 * starts build_allocation_map_thread to populate it in the background.
 *
 * The descriptor opened here is only used to find the size and is closed
 * before returning; the builder thread opens the file for itself.
 * Failure to open or measure the file, or to start the thread, is fatal.
 */
void serve_init_allocation_map(struct server* params)
{
    NULLCHECK( params );
    NULLCHECK( params->filename );

    int fd = open( params->filename, O_RDONLY );
    FATAL_IF_NEGATIVE(fd, "Couldn't open %s", params->filename );

    off64_t size = lseek64( fd, 0, SEEK_END );
    FATAL_IF_NEGATIVE( size, "Couldn't find size of %s",
        params->filename );
    params->size = size;

    /* Finished with this descriptor; it was previously leaked. */
    close( fd );

    params->allocation_map =
        bitset_alloc( params->size, block_allocation_resolution );

    /* pthread_create() returns 0 on success and a positive error number
     * on failure, never a negative value, so test for non-zero. */
    int ok = pthread_create( &params->allocation_map_builder_thread,
        NULL,
        build_allocation_map_thread,
        params );
    FATAL_UNLESS_ZERO( ok, "Couldn't create thread" );
}
/* Clear allow_new_clients. While it is 0, server_accept() closes newly
 * accepted connections instead of serving them.
 */
void server_forbid_new_clients( struct server * serve )
{
serve->allow_new_clients = 0;
return;
}
/* Set allow_new_clients, so that server_accept() hands newly accepted
 * connections to accept_nbd_client().
 */
void server_allow_new_clients( struct server * serve )
{
serve->allow_new_clients = 1;
return;
}
void server_join_clients( struct server * serve ) {
int i;
void* status;
for (i=0; i < serve->max_nbd_clients; i++) {
pthread_t thread_id = serve->nbd_client[i].thread;
if (thread_id != 0) {
debug( "joining thread %p", thread_id );
int err = pthread_join( thread_id, &status );
if ( 0 == err ) {
serve->nbd_client[i].thread = 0;
} else {
warn( "Error %s (%i) joining thread %p", strerror( err ), err, thread_id );
}
}
}
return;
}
/* Tell the server to close all the things, by signalling its close_signal
 * self-pipe. server_accept() reacts to that signal by stopping the clients
 * and ending the accept loop.
 */
void serve_signal_close( struct server * serve )
{
NULLCHECK( serve );
info("signalling close");
self_pipe_signal( serve->close_signal );
}
/* Block until the server closes the server_fd.
 * Polls fd_is_closed() and sleeps for 10000 microseconds between checks.
 */
void serve_wait_for_close( struct server * serve )
{
while( !fd_is_closed( serve->server_fd ) ){
usleep(10000);
}
}
/* We've just had a DISCONNECT pair, so we need to shut down and signal
 * our listener that it can safely take over.
 *
 * The first call marks the server as successful and signals close; if
 * success is already set, the call does nothing.
 */
void server_control_arrived( struct server *serve )
{
    debug( "server_control_arrived" );
    NULLCHECK( serve );

    if ( serve->success ) { return; }

    serve->success = 1;
    serve_signal_close( serve );
}
/* Forward declaration; the definition is not in this file. Within this
 * file it is only referenced from commented-out code in serve_cleanup. */
void flexnbd_stop_control( struct flexnbd * flexnbd );
/** Closes sockets, frees memory and waits for all client threads to finish.
 *
 * Called at the end of do_serve(), and also registered there as the error
 * handler, so it can run on a fatal-error path. The `fatal` argument is
 * unused.
 *
 * Order of work: close the listening socket, cancel and join the
 * allocation-map builder thread, abandon any active mirror and prevent new
 * ones (under the mirror-start lock), join the client threads, free the
 * allocation map, and release the mirror-start and acl locks if they are
 * reported as held.
 */
void serve_cleanup(struct server* params,
int fatal __attribute__ ((unused)) )
{
NULLCHECK( params );
void* status;
info("cleaning up");
/* NOTE(review): this test skips fd 0 and would pass a negative fd to
 * close() - confirm server_fd can be neither here. */
if (params->server_fd){ close(params->server_fd); }
/* need to stop background build if we're killed very early on */
/* NOTE(review): do_serve() registers this handler before
 * serve_init_allocation_map() has created the builder thread, so this can
 * apparently run on a thread id that was never started - confirm. */
pthread_cancel(params->allocation_map_builder_thread);
pthread_join(params->allocation_map_builder_thread, &status);
/* Take the mirror-start lock only if flexthread_mutex_held() says it is
 * not already held. */
int need_mirror_lock;
need_mirror_lock = !server_start_mirror_locked( params );
if ( need_mirror_lock ) { server_lock_start_mirror( params ); }
{
if ( server_is_mirroring( params ) ) {
server_abandon_mirror( params );
}
server_prevent_mirror_start( params );
}
if ( need_mirror_lock ) { server_unlock_start_mirror( params ); }
server_join_clients( params );
if (params->allocation_map) {
bitset_free( params->allocation_map );
}
/* Release any lock still reported as held at this point. */
if ( server_start_mirror_locked( params ) ) {
server_unlock_start_mirror( params );
}
if ( server_acl_locked( params ) ) {
server_unlock_acl( params );
}
/* if( params->flexnbd ) { */
/* if ( params->flexnbd->control ) { */
/* flexnbd_stop_control( params->flexnbd ); */
/* } */
/* flexnbd_destroy( params->flexnbd ); */
/* } */
/* server_destroy( params ); */
debug( "Cleanup done");
}
/* Return the server's success flag. */
int server_is_in_control( struct server *serve )
{
NULLCHECK( serve );
return serve->success;
}
/* Return 1 if the server has a mirror supervisor (mirror_super is set),
 * 0 otherwise.
 */
int server_is_mirroring( struct server * serve )
{
    NULLCHECK( serve );
    return ( NULL != serve->mirror_super ) ? 1 : 0;
}
/* Number of bytes the current mirror still has to transfer, or 0 when no
 * mirror is running. This is the byte count bitset_stream_queued_bytes()
 * reports for BITSET_STREAM_SET on the allocation map, plus the distance
 * from the mirror's offset to the end of the file.
 */
uint64_t server_mirror_bytes_remaining( struct server * serve )
{
    if ( !server_is_mirroring( serve ) ) {
        return 0;
    }

    uint64_t bytes_to_xfer =
        bitset_stream_queued_bytes( serve->allocation_map, BITSET_STREAM_SET ) +
        ( serve->size - serve->mirror->offset );

    return bytes_to_xfer;
}
/* Estimate, from the measured transfer rate and the number of bytes still
 * to send, how many seconds remain before the migration completes,
 * assuming nothing new is written meanwhile. One is added to the rate so
 * the division can never be by zero. Returns 0 when no mirror is running.
 */
uint64_t server_mirror_eta( struct server * serve )
{
    if ( !server_is_mirroring( serve ) ) {
        return 0;
    }

    uint64_t remaining = server_mirror_bytes_remaining( serve );
    return remaining / ( server_mirror_bps( serve ) + 1 );
}
/* Average transfer rate of the current mirror: the mirror's all_dirty
 * counter divided by the whole seconds elapsed since migration_started,
 * plus one so the divisor is never zero. Returns 0 when no mirror is
 * running.
 */
uint64_t server_mirror_bps( struct server * serve )
{
    if ( !server_is_mirroring( serve ) ) {
        return 0;
    }

    uint64_t duration_ms =
        monotonic_time_ms() - serve->mirror->migration_started;

    return serve->mirror->all_dirty / ( ( duration_ms / 1000 ) + 1 );
}
/* Forward declaration; the definition is not in this file. Called by
 * server_abandon_mirror(). */
void mirror_super_destroy( struct mirror_super * super );
/* Stop the current mirror, if there is one: signal the mirror's
 * abandon_signal, wait for the supervisor thread to exit, allow mirrors to
 * be started again, then destroy the supervisor and clear both mirror
 * pointers. Does nothing when mirror_super is NULL.
 *
 * This must only be called with the start_mirror lock held.
 */
void server_abandon_mirror( struct server * serve )
{
NULLCHECK( serve );
if ( serve->mirror_super ) {
/* FIXME: AWOOGA! RACE!
 * We can set abandon_signal after mirror_super has checked it, but
 * before the reset. However, mirror_reset doesn't clear abandon_signal
 * so it'll just terminate early on the next pass. */
ERROR_UNLESS(
self_pipe_signal( serve->mirror->abandon_signal ),
"Failed to signal abandon to mirror"
);
/* Blocks until the supervisor thread has finished. */
pthread_t tid = serve->mirror_super->thread;
pthread_join( tid, NULL );
debug( "Mirror thread %p pthread_join returned", tid );
server_allow_mirror_start( serve );
mirror_super_destroy( serve->mirror_super );
/* NOTE(review): serve->mirror is only cleared here, not freed -
 * presumably mirror_super_destroy() owns it; confirm. */
serve->mirror = NULL;
serve->mirror_super = NULL;
debug( "Mirror supervisor done." );
}
}
/* Returns whatever acl_default_deny() reports for the server's current ACL. */
int server_default_deny( struct server * serve )
{
    NULLCHECK( serve );

    struct acl * current_acl = serve->acl;
    return acl_default_deny( current_acl );
}
/** Full lifecycle of the server: install the cleanup handler, open the
 * listening socket, optionally signal `open_signal`, set up the allocation
 * map, run the accept loop, then clean up.
 *
 * Returns the value of params->success as it was when the accept loop
 * returned.
 */
int do_serve( struct server* params, struct self_pipe * open_signal )
{
    NULLCHECK( params );

    error_set_handler( (cleanup_handler*) serve_cleanup, params );
    serve_open_server_socket( params );

    /* Only signal that we are open for business once the server
     * socket is open */
    if ( open_signal != NULL ) {
        self_pipe_signal( open_signal );
    }

    serve_init_allocation_map( params );
    serve_accept_loop( params );

    /* Capture the result before cleanup runs */
    int exit_status = params->success;
    serve_cleanup( params, 0 );

    return exit_status;
}

167
src/server/serve.h Normal file
View File

@@ -0,0 +1,167 @@
#ifndef SERVE_H
#define SERVE_H
#include <sys/types.h>
#include <unistd.h>
#include <signal.h> /* for sig_atomic_t */
#include "flexnbd.h"
#include "parse.h"
#include "acl.h"
/* NOTE(review): presumably the granularity, in bytes, at which the
 * allocation map tracks the backing file - confirm against the code that
 * builds the map. The trailing "128<<10" looks like an earlier value. */
static const int block_allocation_resolution = 4096;//128<<10;
/* One slot in a server's client table (the `nbd_client` member of
 * struct server). */
struct client_tbl_entry {
/* NOTE(review): presumably the thread servicing this client - confirm */
pthread_t thread;
/* NOTE(review): presumably the client's peer address - confirm */
union mysockaddr address;
/* the client occupying this slot */
struct client * client;
};
/* NOTE(review): presumably a default or upper bound for the number of
 * simultaneous NBD clients (cf. `max_nbd_clients` in struct server) -
 * confirm where this is used. */
#define MAX_NBD_CLIENTS 16
/* State for one server instance: what is being served and where, its
 * access control list, mirroring state, allocation map and client table. */
struct server {
/** The flexnbd wrapper this server is attached to */
struct flexnbd * flexnbd;
/** address/port to bind to */
union mysockaddr bind_to;
/** (static) file name to serve */
char* filename;
/** TCP backlog for listen() */
int tcp_backlog;
/** (static) file name of UNIX control socket (or NULL if none) */
char* control_socket_name;
/** size of file */
uint64_t size;
/** self-pipe signalled to interrupt the accept loop and clients
 * NOTE(review): presumably raised through serve_signal_close() - confirm
 */
struct self_pipe * close_signal;
/** access control list */
struct acl * acl;
/** acl_updated_signal will be signalled after the acl struct
 * has been replaced
 */
struct self_pipe * acl_updated_signal;
/** Claimed around any updates to the ACL. */
struct flexthread_mutex * l_acl;
/** Claimed around starting a mirror so that it doesn't race with
 * shutting down on a SIGTERM. */
struct flexthread_mutex * l_start_mirror;
/** State of the current mirror and its supervisor. A non-NULL
 * mirror_super is what server_is_mirroring() reports as "mirroring";
 * server_abandon_mirror() resets both pointers to NULL.
 */
struct mirror* mirror;
struct mirror_super * mirror_super;
/** This is used to stop the mirror from starting after we
 * receive a SIGTERM */
int mirror_can_start;
/** NOTE(review): presumably the listening socket for NBD clients and the
 * control socket descriptor respectively - confirm */
int server_fd;
int control_fd;
/** the allocation_map keeps track of which blocks in the backing file
 * have been allocated, or part-allocated on disc, with unallocated
 * blocks presumed to contain zeroes (i.e. represented as sparse files
 * by the filesystem). We can use this information when receiving
 * incoming writes, and avoid writing zeroes to unallocated sections
 * of the file which would needlessly increase disc usage. This
 * bitmap will start at all-zeroes for an empty file, and tend towards
 * all-ones as the file is written to (i.e. we assume that allocated
 * blocks can never become unallocated again, as is the case with ext3
 * at least).
 */
struct bitset * allocation_map;
/** when starting up, this thread builds the allocation_map */
pthread_t allocation_map_builder_thread;
/** when the thread has finished, it sets this to 1 */
volatile sig_atomic_t allocation_map_built;
/** NOTE(review): presumably set when the map could not be built -
 * confirm against the builder thread */
volatile sig_atomic_t allocation_map_not_built;
/** NOTE(review): presumably the number of entries in nbd_client -
 * confirm */
int max_nbd_clients;
/** table of client slots */
struct client_tbl_entry *nbd_client;
/** Should clients use the killswitch? */
int use_killswitch;
/** If this isn't set, newly accepted clients will be closed immediately */
int allow_new_clients;
/** Marker for whether this server has control over the data in
 * the file, or if we're waiting to receive it from an inbound
 * migration which hasn't yet finished.
 *
 * It's the value which controls the exit status of a serve or
 * listen process.
 */
int success;
};
/* Construct a server. Release it with server_destroy(). */
struct server * server_create(
struct flexnbd * flexnbd,
char* s_ip_address,
char* s_port,
char* s_file,
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients,
int use_killswitch,
int success );
void server_destroy( struct server * );
int server_is_closed(struct server* serve);
void serve_signal_close( struct server *serve );
void serve_wait_for_close( struct server * serve );
void server_replace_acl( struct server *serve, struct acl * acl);
void server_control_arrived( struct server *serve );
/* Returns the server's `success` flag. */
int server_is_in_control( struct server *serve );
/* Returns acl_default_deny() for the server's current ACL. */
int server_default_deny( struct server * serve );
int server_acl_locked( struct server * serve );
void server_lock_acl( struct server *serve );
void server_unlock_acl( struct server *serve );
void server_lock_start_mirror( struct server *serve );
void server_unlock_start_mirror( struct server *serve );
/* Non-zero while a mirror supervisor is attached to the server. */
int server_is_mirroring( struct server * serve );
/* The three mirror statistics below all return 0 when not mirroring. */
/* Bytes still to transfer. */
uint64_t server_mirror_bytes_remaining( struct server * serve );
/* Estimated seconds until the mirror completes. */
uint64_t server_mirror_eta( struct server * serve );
/* Average transfer rate so far, in bytes per second. */
uint64_t server_mirror_bps( struct server * serve );
/* Must only be called with the start_mirror lock held. */
void server_abandon_mirror( struct server * serve );
void server_prevent_mirror_start( struct server *serve );
void server_allow_mirror_start( struct server *serve );
int server_mirror_can_start( struct server *serve );
/* These functions are used by mirror around the final pass, to close
 * existing clients and prevent new ones from being around
 */
void server_forbid_new_clients( struct server *serve );
void server_close_clients( struct server *serve );
void server_join_clients( struct server *serve );
void server_allow_new_clients( struct server *serve );
/* Returns a count (ish) of the number of currently-running client threads */
int server_count_clients( struct server *params );
void server_unlink( struct server * serve );
/* Full lifecycle of the server. If the self_pipe argument is non-NULL it is
 * signalled once the server socket is open. Returns the server's `success`
 * value. */
int do_serve( struct server *, struct self_pipe * );
/* Parameter bundle for the read/write modes.
 * NOTE(review): the field meanings below are inferred from the names only -
 * confirm against the code that fills this struct in. */
struct mode_readwrite_params {
/* presumably the remote address to connect to */
union mysockaddr connect_to;
/* presumably the local address to connect from */
union mysockaddr connect_from;
/* presumably the starting offset of the transfer */
uint64_t from;
/* presumably the length of the transfer */
uint32_t len;
/* presumably the descriptor data is read from / written to */
int data_fd;
int client;
};
#endif

78
src/server/status.c Normal file
View File

@@ -0,0 +1,78 @@
#include "status.h"
#include "serve.h"
#include "util.h"
/* Build a snapshot of the server's current state.
 *
 * The mirror-related fields are read while holding the start_mirror lock.
 * When no mirror is running, the migration_* fields are set to 0 so that
 * every field of the returned struct has a defined value.
 *
 * Returns a newly allocated status; release it with status_destroy().
 */
struct status * status_create( struct server * serve )
{
    NULLCHECK( serve );

    struct status * status = xmalloc( sizeof( struct status ) );

    status->pid = getpid();
    status->size = serve->size;
    status->has_control = serve->success;
    status->clients_allowed = serve->allow_new_clients;
    status->num_clients = server_count_clients( serve );

    /* Defined defaults for the not-mirroring case. */
    status->migration_duration = 0;
    status->migration_speed = 0;
    status->migration_speed_limit = 0;
    status->migration_seconds_left = 0;

    server_lock_start_mirror( serve );

    status->is_mirroring = NULL != serve->mirror;
    if ( status->is_mirroring ) {
        uint64_t now_ms = monotonic_time_ms();
        uint64_t elapsed_ms = 0;

        /* Guard against a start time that is not in the past, which would
         * otherwise wrap the unsigned subtraction. */
        if ( ( serve->mirror->migration_started ) < now_ms ) {
            elapsed_ms = now_ms - serve->mirror->migration_started;
        }

        /* Reported in whole seconds. */
        status->migration_duration = elapsed_ms / 1000;
        status->migration_speed = server_mirror_bps( serve );
        status->migration_speed_limit = serve->mirror->max_bytes_per_second;
        status->migration_seconds_left = server_mirror_eta( serve );
    }

    server_unlock_start_mirror( serve );

    return status;
}
/* Render a truth value as the status-line strings "true" / "false".
 * The argument is parenthesized so that any expression can be passed. */
#define BOOL_S(var) ((var) ? "true" : "false")
/* The PRINT_* helpers write one "name=value " pair for the named member of
 * `status` to the descriptor `fd`; both identifiers must be in scope at the
 * point of use. */
#define PRINT_BOOL( var ) \
    do{dprintf( fd, #var "=%s ", BOOL_S( status->var ) );}while(0)
#define PRINT_INT( var ) \
    do{dprintf( fd, #var "=%d ", status->var );}while(0)
#define PRINT_UINT64( var ) \
    do{dprintf( fd, #var "=%"PRIu64" ", status->var );}while(0)
/* Write the status as a single line of space-separated name=value pairs,
 * terminated by a newline, to the descriptor `fd`.
 *
 * The migration_* fields are only written while mirroring, and
 * migration_speed_limit only when it is below UINT64_MAX.
 *
 * Always returns 1.
 */
int status_write( struct status * status, int fd )
{
    NULLCHECK( status );

    PRINT_INT( pid );
    PRINT_UINT64( size );
    PRINT_BOOL( is_mirroring );
    PRINT_BOOL( clients_allowed );
    PRINT_INT( num_clients );
    PRINT_BOOL( has_control );

    if ( status->is_mirroring ) {
        PRINT_UINT64( migration_speed );
        PRINT_UINT64( migration_duration );
        PRINT_UINT64( migration_seconds_left );
        if ( status->migration_speed_limit < UINT64_MAX ) {
            PRINT_UINT64( migration_speed_limit );
        }
    }

    dprintf(fd, "\n");
    return 1;
}
/* Release a status object obtained from status_create(). */
void status_destroy( struct status * status )
{
    NULLCHECK( status );

    free( status );
}

101
src/server/status.h Normal file
View File

@@ -0,0 +1,101 @@
#ifndef STATUS_H
#define STATUS_H
/* Status reports
*
* The status will be reported by writing to a file descriptor. The
* status report will be on a single line. The status format will be:
*
* A=B C=D
*
* That is, a space-separated list of label,value pairs, each pair
* separated by an '=' character. Neither ' ' nor '=' will appear in
* either labels or values.
*
* Boolean values will appear as the strings "true" and "false".
*
* The following status fields are defined:
*
* pid:
* The current process ID.
*
* size:
* The size of the backing file being served, in bytes.
*
* has_control:
* This will be false when the server is listening for an incoming
* migration. It will switch to true when the end-of-migration
* handshake is successfully completed.
* If the server is started in "serve" mode, this will never be
* false.
*
* clients_allowed:
* This will be false if the server is not currently allowing new
* connections, for instance, if we're in the migration endgame.
*
* num_clients:
* This tells us how many clients are currently running. If we're in the
* migration endgame, it should be 0
*
* is_mirroring:
* This will be false when the server is started in either "listen"
* or "serve" mode. It will become true when a server in "serve"
* mode starts a migration, and will become false again when the
* migration terminates, successfully or not.
* If the server is currently in "listen" mode, this will never be
* true.
*
*
* If is_mirroring is true, then a number of other attributes may appear,
* relating to the progress of the migration.
*
* migration_duration:
* How long the migration has been running for, in whole seconds.
*
* migration_speed:
* Network transfer speed, in bytes/second. This only takes dirty bytes
* into account.
*
* migration_speed_limit:
* If set, the speed we're going to try to limit the migration to.
*
* migration_seconds_left:
* Our current best estimate of how many seconds are left before the
* migration is finished.
*
*/
#include "serve.h"
#include <sys/types.h>
#include <unistd.h>
/* Snapshot of a server's state, as filled in by status_create() and
 * printed by status_write(). */
struct status {
/* process ID, from getpid() */
pid_t pid;
/* size of the served file, copied from the server */
uint64_t size;
/* copy of the server's `success` flag */
int has_control;
/* copy of the server's `allow_new_clients` flag */
int clients_allowed;
/* result of server_count_clients() */
int num_clients;
/* non-zero when the server has a mirror */
int is_mirroring;
/* The fields below are only filled in from the mirror while is_mirroring
 * is set. */
/* elapsed time since the migration started, in whole seconds */
uint64_t migration_duration;
/* result of server_mirror_bps() */
uint64_t migration_speed;
/* the mirror's max_bytes_per_second; only printed when below UINT64_MAX */
uint64_t migration_speed_limit;
/* result of server_mirror_eta() */
uint64_t migration_seconds_left;
};
/** Create a status object for the given server. */
struct status * status_create( struct server * );
/** Output the given status object to the given file descriptor */
int status_write( struct status *, int fd );
/** Free the status object */
void status_destroy( struct status * );
#endif