This commit is contained in:
mbloch
2012-05-30 20:14:14 +01:00
29 changed files with 1005 additions and 260 deletions

176
src/bitset.h Normal file
View File

@@ -0,0 +1,176 @@
#ifndef __BITSET_H
#define __BITSET_H
#include <inttypes.h>
#include <string.h>
#include <pthread.h>
#include "util.h"
static inline char char_with_bit_set(int num) { return 1<<(num%8); }
/** Return 1 if the bit at ''idx'' in array ''b'' is set */
static inline int bit_is_set(char* b, int idx) {
return (b[idx/8] & char_with_bit_set(idx)) != 0;
}
/** Return 1 if the bit at ''idx'' in array ''b'' is clear */
static inline int bit_is_clear(char* b, int idx) {
return !bit_is_set(b, idx);
}
/** Tests whether the bit at ''idx'' in array ''b'' has value ''value'' */
static inline int bit_has_value(char* b, int idx, int value) {
if (value)
return bit_is_set(b, idx);
else
return bit_is_clear(b, idx);
}
/** Sets the bit ''idx'' in array ''b'' */
static inline void bit_set(char* b, int idx) {
b[idx/8] |= char_with_bit_set(idx);
//__sync_fetch_and_or(b+(idx/8), char_with_bit_set(idx));
}
/** Clears the bit ''idx'' in array ''b'' */
static inline void bit_clear(char* b, int idx) {
b[idx/8] &= ~char_with_bit_set(idx);
//__sync_fetch_and_nand(b+(idx/8), char_with_bit_set(idx));
}
/** Sets ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_set_range(char* b, int from, int len) {
for (; from%8 != 0 && len > 0; len--)
bit_set(b, from++);
if (len >= 8)
memset(b+(from/8), 255, len/8);
for (; len > 0; len--)
bit_set(b, from++);
}
/** Clears ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_clear_range(char* b, int from, int len) {
for (; from%8 != 0 && len > 0; len--)
bit_clear(b, from++);
if (len >= 8)
memset(b+(from/8), 0, len/8);
for (; len > 0; len--)
bit_clear(b, from++);
}
/** Counts the number of contiguous bits in array ''b'', starting at ''from''
* up to a maximum number of bits ''len''. Returns the number of contiguous
* bits that are the same as the first one specified.
*/
static inline int bit_run_count(char* b, int from, int len) {
int count;
int first_value = bit_is_set(b, from);
for (count=0; len > 0 && bit_has_value(b, from+count, first_value); count++, len--)
;
/* FIXME: debug this later */
/*for (; (from+count) % 64 != 0 && len > 0; len--)
if (bit_has_value(b, from+count, first_value))
count++;
else
return count;
for (; len >= 64; len-=64) {
if (*((uint64_t*)(b + ((from+count)/8))) == UINT64_MAX)
count += 64;
else
break;
}
for (; len > 0; len--)
if (bit_is_set(b, from+count))
count++;*/
return count;
}
/** An application of a bitset - a bitset mapping represents a file of ''size''
* broken down into ''resolution''-sized chunks. The bit set is assumed to
* represent one bit per chunk.
*/
struct bitset_mapping {
uint64_t size;
int resolution;
char bits[];
};
/** Allocate a bitset_mapping for a file of the given size, and chunks of the
* given resolution.
*/
static inline struct bitset_mapping* bitset_alloc(
uint64_t size,
int resolution
)
{
struct bitset_mapping *bitset = xmalloc(
sizeof(struct bitset_mapping)+
(size+resolution-1)/resolution
);
bitset->size = size;
bitset->resolution = resolution;
return bitset;
}
#define INT_FIRST_AND_LAST \
int first = from/set->resolution, \
last = (from+len-1)/set->resolution, \
bitlen = last-first+1
/** Set the bits in a bitset which correspond to the given bytes in the larger
* file.
*/
static inline void bitset_set_range(
struct bitset_mapping* set,
uint64_t from,
uint64_t len)
{
INT_FIRST_AND_LAST;
bit_set_range(set->bits, first, bitlen);
}
/** Clear the bits in a bitset which correspond to the given bytes in the
* larger file.
*/
static inline void bitset_clear_range(
struct bitset_mapping* set,
uint64_t from,
uint64_t len)
{
INT_FIRST_AND_LAST;
bit_clear_range(set->bits, first, bitlen);
}
/** Counts the number of contiguous bytes that are represented as a run in
* the bit field.
*/
static inline int bitset_run_count(
struct bitset_mapping* set,
uint64_t from,
uint64_t len)
{
INT_FIRST_AND_LAST;
return bit_run_count(set->bits, first, bitlen) * set->resolution;
}
/** Tests whether the bit field is clear for the given file offset.
*/
static inline int bitset_is_clear_at(
struct bitset_mapping* set,
uint64_t at
)
{
return bit_is_clear(set->bits, at/set->resolution);
}
/** Tests whether the bit field is set for the given file offset.
*/
static inline int bitset_is_set_at(
struct bitset_mapping* set,
uint64_t at
)
{
return bit_is_set(set->bits, at/set->resolution);
}
#endif

384
src/control.c Normal file
View File

@@ -0,0 +1,384 @@
/* FlexNBD server (C) Bytemark Hosting 2012
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/** The control server responds on a UNIX socket and services our "remote"
* commands which are used for changing the access control list, initiating
* a mirror process, or asking for status. The protocol is pretty simple -
* after connecting the client sends a series of LF-terminated lines, followed
* by a blank line (i.e. double LF). The first line is taken to be the command
* name to invoke, and the lines before the double LF are its arguments.
*
* These commands can be invoked remotely from the command line, with the
* client code to be found in remote.c
*/
#include "params.h"
#include "util.h"
#include "ioutil.h"
#include "parse.h"
#include "readwrite.h"
#include "bitset.h"
#include <stdlib.h>
#include <string.h>
#include <sys/un.h>
#include <unistd.h>
/** The mirror code will split NBD writes, making them this long as a maximum */
static const int mirror_longest_write = 8<<20;
/** If, during a mirror pass, we have sent this number of bytes or fewer, we
* go to freeze the I/O and finish it off. This is just a guess.
*/
static const int mirror_last_pass_after_bytes_written = 100<<20;
/** The largest number of full passes we'll do - the last one will always
* cause the I/O to freeze, however many bytes are left to copy.
*/
static const int mirror_maximum_passes = 7;
/** Thread launched to drive mirror process */
void* mirror_runner(void* serve_params_uncast)
{
const int last_pass = mirror_maximum_passes-1;
int pass;
struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast;
struct bitset_mapping *map = serve->mirror->dirty_map;
for (pass=0; pass < mirror_maximum_passes; pass++) {
uint64_t current = 0;
uint64_t written = 0;
debug("mirror start pass=%d", pass);
if (pass == last_pass) {
/* last pass, stop everything else */
SERVER_ERROR_ON_FAILURE(
pthread_mutex_lock(&serve->l_accept),
"Problem with accept lock"
);
SERVER_ERROR_ON_FAILURE(
pthread_mutex_lock(&serve->l_io),
"Problem with I/O lock"
);
}
while (current < serve->size) {
int run;
run = bitset_run_count(map, current, mirror_longest_write);
debug("mirror current=%ld, run=%d", current, run);
/* FIXME: we could avoid sending sparse areas of the
* disc here, and probably save a lot of bandwidth and
* time (if we know the destination starts off zeroed).
*/
if (bitset_is_set_at(map, current)) {
/* We've found a dirty area, send it */
debug("^^^ writing");
/* We need to stop the main thread from working
* because it might corrupt the dirty map. This
* is likely to slow things down but will be
* safe.
*/
if (pass < last_pass)
SERVER_ERROR_ON_FAILURE(
pthread_mutex_lock(&serve->l_io),
"Problem with I/O lock"
);
/** FIXME: do something useful with bytes/second */
/** FIXME: error handling code here won't unlock */
socket_nbd_write(
serve->mirror->client,
current,
run,
0,
serve->mirror->mapped + current
);
/* now mark it clean */
bitset_clear_range(map, current, run);
if (pass < last_pass)
SERVER_ERROR_ON_FAILURE(
pthread_mutex_unlock(&serve->l_io),
"Problem with I/O unlock"
);
written += run;
}
current += run;
}
/* if we've not written anything */
if (written < mirror_last_pass_after_bytes_written)
pass = last_pass;
}
switch (serve->mirror->action_at_finish)
{
case ACTION_PROXY:
debug("proxy!");
serve->proxy_fd = serve->mirror->client;
/* don't close our file descriptor, we still need it! */
break;
case ACTION_EXIT:
debug("exit!");
write(serve->close_signal[1], serve, 1); /* any byte will do */
/* fall through */
case ACTION_NOTHING:
debug("nothing!");
close(serve->mirror->client);
}
free(serve->mirror->dirty_map);
free(serve->mirror);
serve->mirror = NULL; /* and we're gone */
SERVER_ERROR_ON_FAILURE(
pthread_mutex_unlock(&serve->l_accept),
"Problem with accept unlock"
);
SERVER_ERROR_ON_FAILURE(
pthread_mutex_unlock(&serve->l_io),
"Problem with I/O unlock"
);
return NULL;
}
#define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1)
/** Command parser to start mirror process from socket input */
int control_mirror(struct control_params* client, int linesc, char** lines)
{
off64_t size, remote_size;
int fd, map_fd;
struct mirror_status *mirror;
union mysockaddr connect_to;
uint64_t max_bytes_per_second;
int action_at_finish;
if (linesc < 2) {
write_socket("1: mirror takes at least two parameters");
return -1;
}
if (parse_ip_to_sockaddr(&connect_to.generic, lines[0]) == 0) {
write_socket("1: bad IP address");
return -1;
}
connect_to.v4.sin_port = atoi(lines[1]);
if (connect_to.v4.sin_port < 0 || connect_to.v4.sin_port > 65535) {
write_socket("1: bad IP port number");
return -1;
}
connect_to.v4.sin_port = htobe16(connect_to.v4.sin_port);
max_bytes_per_second = 0;
if (linesc > 2) {
max_bytes_per_second = atoi(lines[2]);
}
action_at_finish = ACTION_PROXY;
if (linesc > 3) {
if (strcmp("proxy", lines[3]) == 0)
action_at_finish = ACTION_PROXY;
else if (strcmp("exit", lines[3]) == 0)
action_at_finish = ACTION_EXIT;
else if (strcmp("nothing", lines[3]) == 0)
action_at_finish = ACTION_NOTHING;
else {
write_socket("1: action must be one of 'proxy', 'exit' or 'nothing'");
return -1;
}
}
if (linesc > 4) {
write_socket("1: unrecognised parameters to mirror");
return -1;
}
fd = socket_connect(&connect_to.generic);
remote_size = socket_nbd_read_hello(fd);
remote_size = remote_size; // shush compiler
mirror = xmalloc(sizeof(struct mirror_status));
mirror->client = fd;
mirror->max_bytes_per_second = max_bytes_per_second;
mirror->action_at_finish = action_at_finish;
CLIENT_ERROR_ON_FAILURE(
open_and_mmap(
client->serve->filename,
&map_fd,
&size,
(void**) &mirror->mapped
),
"Failed to open and mmap %s",
client->serve->filename
);
mirror->dirty_map = bitset_alloc(size, 4096);
bitset_set_range(mirror->dirty_map, 0, size);
client->serve->mirror = mirror;
CLIENT_ERROR_ON_FAILURE( /* FIXME should free mirror on error */
pthread_create(
&mirror->thread,
NULL,
mirror_runner,
client->serve
),
"Failed to create mirror thread"
);
write_socket("0: mirror started");
return 0;
}
/** Command parser to alter access control list from socket input */
int control_acl(struct control_params* client, int linesc, char** lines)
{
int parsed;
struct ip_and_mask (*acl)[], (*old_acl)[];
parsed = parse_acl(&acl, linesc, lines);
if (parsed != linesc) {
write(client->socket, "1: bad spec: ", 13);
write(client->socket, lines[parsed],
strlen(lines[parsed]));
write(client->socket, "\n", 1);
free(acl);
}
else {
old_acl = client->serve->acl;
client->serve->acl = acl;
client->serve->acl_entries = linesc;
free(old_acl);
write_socket("0: updated");
}
return 0;
}
/** FIXME: add some useful statistics */
int control_status(struct control_params* client, int linesc, char** lines)
{
return 0;
}
/** Master command parser for control socket connections, delegates quickly */
void* control_serve(void* client_uncast)
{
struct control_params* client = (struct control_params*) client_uncast;
char **lines = NULL;
int finished=0;
while (!finished) {
int i, linesc;
linesc = read_lines_until_blankline(client->socket, 256, &lines);
if (linesc < 1)
{
write(client->socket, "9: missing command\n", 19);
finished = 1;
/* ignore failure */
}
else if (strcmp(lines[0], "acl") == 0) {
if (control_acl(client, linesc-1, lines+1) < 0)
finished = 1;
}
else if (strcmp(lines[0], "mirror") == 0) {
if (control_mirror(client, linesc-1, lines+1) < 0)
finished = 1;
}
else if (strcmp(lines[0], "status") == 0) {
if (control_status(client, linesc-1, lines+1) < 0)
finished = 1;
}
else {
write(client->socket, "10: unknown command\n", 23);
finished = 1;
}
for (i=0; i<linesc; i++)
free(lines[i]);
free(lines);
}
close(client->socket);
free(client);
return NULL;
}
void accept_control_connection(struct mode_serve_params* params, int client_fd, union mysockaddr* client_address)
{
pthread_t control_thread;
struct control_params* control_params;
control_params = xmalloc(sizeof(struct control_params));
control_params->socket = client_fd;
control_params->serve = params;
SERVER_ERROR_ON_FAILURE(
pthread_create(
&control_thread,
NULL,
control_serve,
control_params
),
"Failed to create client thread"
);
}
void serve_open_control_socket(struct mode_serve_params* params)
{
struct sockaddr_un bind_address;
if (!params->control_socket_name)
return;
params->control = socket(AF_UNIX, SOCK_STREAM, 0);
SERVER_ERROR_ON_FAILURE(params->control,
"Couldn't create control socket");
memset(&bind_address, 0, sizeof(bind_address));
bind_address.sun_family = AF_UNIX;
strncpy(bind_address.sun_path, params->control_socket_name, sizeof(bind_address.sun_path)-1);
unlink(params->control_socket_name); /* ignore failure */
SERVER_ERROR_ON_FAILURE(
bind(params->control, &bind_address, sizeof(bind_address)),
"Couldn't bind control socket to %s",
params->control_socket_name
);
SERVER_ERROR_ON_FAILURE(
listen(params->control, 5),
"Couldn't listen on control socket"
);
}

8
src/control.h Normal file
View File

@@ -0,0 +1,8 @@
#ifndef __CONTROL_H
#define __CONTROL_H
void accept_control_connection(struct mode_serve_params* params, int client_fd, union mysockaddr* client_address);
void serve_open_control_socket(struct mode_serve_params* params);
#endif

511
src/flexnbd.c Normal file
View File

@@ -0,0 +1,511 @@
/* FlexNBD server (C) Bytemark Hosting 2012
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/** main() function for parsing and dispatching commands. Each mode has
* a corresponding structure which is filled in and passed to a do_ function
* elsewhere in the program.
*/
#include "params.h"
#include "util.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <getopt.h>
#include "options.h"
void exit_err( char *msg )
{
fprintf( stderr, msg );
exit( 1 );
}
void params_serve(
struct mode_serve_params* out,
char* s_ip_address,
char* s_port,
char* s_file,
char *s_ctrl_sock,
int acl_entries,
char** s_acl_entries /* first may actually be path to control socket */
)
{
int parsed;
out->tcp_backlog = 10; /* does this need to be settable? */
if (s_ip_address == NULL)
SERVER_ERROR("No IP address supplied");
if (s_port == NULL)
SERVER_ERROR("No port number supplied");
if (s_file == NULL)
SERVER_ERROR("No filename supplied");
if (parse_ip_to_sockaddr(&out->bind_to.generic, s_ip_address) == 0)
SERVER_ERROR("Couldn't parse server address '%s' (use 0 if "
"you want to bind to all IPs)", s_ip_address);
/* control_socket_name is optional. It just won't get created if
* we pass NULL. */
out->control_socket_name = s_ctrl_sock;
out->acl_entries = acl_entries;
parsed = parse_acl(&out->acl, acl_entries, s_acl_entries);
if (parsed != acl_entries)
SERVER_ERROR("Bad ACL entry '%s'", s_acl_entries[parsed]);
out->bind_to.v4.sin_port = atoi(s_port);
if (out->bind_to.v4.sin_port < 0 || out->bind_to.v4.sin_port > 65535)
SERVER_ERROR("Port number must be >= 0 and <= 65535");
out->bind_to.v4.sin_port = htobe16(out->bind_to.v4.sin_port);
out->filename = s_file;
out->filename_incomplete = xmalloc(strlen(s_file)+11);
strcpy(out->filename_incomplete, s_file);
strcpy(out->filename_incomplete + strlen(s_file), ".INCOMPLETE");
}
/* TODO: Separate this function.
* It should be:
* params_read( struct mode_readwrite_params* out,
* char *s_ip_address,
* char *s_port,
* char *s_from,
* char *s_length )
* params_write( struct mode_readwrite_params* out,
* char *s_ip_address,
* char *s_port,
* char *s_from,
* char *s_length,
* char *s_filename )
*/
void params_readwrite(
int write_not_read,
struct mode_readwrite_params* out,
char* s_ip_address,
char* s_port,
char* s_from,
char* s_length_or_filename
)
{
if (s_ip_address == NULL)
SERVER_ERROR("No IP address supplied");
if (s_port == NULL)
SERVER_ERROR("No port number supplied");
if (s_from == NULL)
SERVER_ERROR("No from supplied");
if (s_length_or_filename == NULL)
SERVER_ERROR("No length supplied");
if (parse_ip_to_sockaddr(&out->connect_to.generic, s_ip_address) == 0)
SERVER_ERROR("Couldn't parse connection address '%s'",
s_ip_address);
/* FIXME: duplicated from above */
out->connect_to.v4.sin_port = atoi(s_port);
if (out->connect_to.v4.sin_port < 0 || out->connect_to.v4.sin_port > 65535)
SERVER_ERROR("Port number must be >= 0 and <= 65535");
out->connect_to.v4.sin_port = htobe16(out->connect_to.v4.sin_port);
out->from = atol(s_from);
if (write_not_read) {
if (s_length_or_filename[0]-48 < 10) {
out->len = atol(s_length_or_filename);
out->data_fd = 0;
}
else {
out->data_fd = open(
s_length_or_filename, O_RDONLY);
SERVER_ERROR_ON_FAILURE(out->data_fd,
"Couldn't open %s", s_length_or_filename);
out->len = lseek64(out->data_fd, 0, SEEK_END);
SERVER_ERROR_ON_FAILURE(out->len,
"Couldn't find length of %s", s_length_or_filename);
SERVER_ERROR_ON_FAILURE(
lseek64(out->data_fd, 0, SEEK_SET),
"Couldn't rewind %s", s_length_or_filename
);
}
}
else {
out->len = atol(s_length_or_filename);
out->data_fd = 1;
}
}
void do_serve(struct mode_serve_params* params);
void do_read(struct mode_readwrite_params* params);
void do_write(struct mode_readwrite_params* params);
void do_remote_command(char* command, char* mode, int argc, char** argv);
void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char **sock )
{
switch(c){
case 'h':
fprintf(stdout, serve_help_text );
exit( 0 );
break;
case 'l':
*ip_addr = optarg;
break;
case 'p':
*ip_port = optarg;
break;
case 'f':
*file = optarg;
break;
case 's':
*sock = optarg;
break;
default:
exit_err( serve_help_text );
break;
}
}
void read_readwrite_param( int c, char **ip_addr, char **ip_port, char **from, char **size)
{
switch(c){
case 'h':
fprintf(stdout, read_help_text );
exit( 0 );
break;
case 'l':
*ip_addr = optarg;
break;
case 'p':
*ip_port = optarg;
break;
case 'F':
*from = optarg;
break;
case 'S':
*size = optarg;
break;
default:
exit_err( read_help_text );
break;
}
}
void read_sock_param( int c, char **sock, char *help_text )
{
switch(c){
case 'h':
fprintf( stdout, help_text );
exit( 0 );
break;
case 's':
*sock = optarg;
break;
default:
exit_err( help_text );
break;
}
}
void read_acl_param( int c, char **sock )
{
read_sock_param( c, sock, acl_help_text );
}
void read_mirror_param( int c, char **sock, char **ip_addr, char **ip_port )
{
switch( c ){
case 'h':
fprintf( stdout, mirror_help_text );
exit( 0 );
break;
case 's':
*sock = optarg;
break;
case 'l':
*ip_addr = optarg;
break;
case 'p':
*ip_port = optarg;
break;
default:
exit_err( mirror_help_text );
break;
}
}
void read_status_param( int c, char **sock )
{
read_sock_param( c, sock, status_help_text );
}
int mode_serve( int argc, char *argv[] )
{
int c;
char *ip_addr = NULL;
char *ip_port = NULL;
char *file = NULL;
char *sock = NULL;
int err = 0;
struct mode_serve_params serve;
while (1) {
c = getopt_long(argc, argv, serve_short_options, serve_options, NULL);
if ( c == -1 ) break;
read_serve_param( c, &ip_addr, &ip_port, &file, &sock );
}
if ( NULL == ip_addr || NULL == ip_port ) {
err = 1;
fprintf( stderr, "both --addr and --port are required.\n" );
}
if ( NULL == file ) {
err = 1;
fprintf( stderr, "--file is required\n" );
}
if ( err ) { exit_err( serve_help_text ); }
memset( &serve, 0, sizeof( serve ) );
params_serve( &serve, ip_addr, ip_port, file, sock, argc - optind, argv + optind );
do_serve( &serve );
return 0;
}
int mode_read( int argc, char *argv[] )
{
int c;
char *ip_addr = NULL;
char *ip_port = NULL;
char *from = NULL;
char *size = NULL;
int err = 0;
struct mode_readwrite_params readwrite;
while (1){
c = getopt_long(argc, argv, read_short_options, read_options, NULL);
if ( c == -1 ) break;
read_readwrite_param( c, &ip_addr, &ip_port, &from, &size );
}
if ( NULL == ip_addr || NULL == ip_port ) {
err = 1;
fprintf( stderr, "both --addr and --port are required.\n" );
}
if ( NULL == from || NULL == size ) {
err = 1;
fprintf( stderr, "both --from and --size are required.\n" );
}
if ( err ) { exit_err( read_help_text ); }
memset( &readwrite, 0, sizeof( readwrite ) );
params_readwrite( 0, &readwrite, ip_addr, ip_port, from, size );
do_read( &readwrite );
return 0;
}
int mode_write( int argc, char *argv[] )
{
int c;
char *ip_addr = NULL;
char *ip_port = NULL;
char *from = NULL;
char *size = NULL;
int err = 0;
struct mode_readwrite_params readwrite;
while (1){
c = getopt_long(argc, argv, write_short_options, write_options, NULL);
if ( c == -1 ) break;
read_readwrite_param( c, &ip_addr, &ip_port, &from, &size );
}
if ( NULL == ip_addr || NULL == ip_port ) {
err = 1;
fprintf( stderr, "both --addr and --port are required.\n" );
}
if ( NULL == from || NULL == size ) {
err = 1;
fprintf( stderr, "both --from and --size are required.\n" );
}
if ( err ) { exit_err( write_help_text ); }
memset( &readwrite, 0, sizeof( readwrite ) );
params_readwrite( 1, &readwrite, ip_addr, ip_port, from, size );
do_write( &readwrite );
return 0;
}
int mode_acl( int argc, char *argv[] )
{
int c;
char *sock = NULL;
while (1) {
c = getopt_long( argc, argv, acl_short_options, acl_options, NULL );
if ( c == -1 ) break;
read_acl_param( c, &sock );
}
if ( NULL == sock ){
fprintf( stderr, "--sock is required.\n" );
exit_err( acl_help_text );
}
/* Don't use the CMD_ACL macro here, "acl" is the remote command
* name, not the cli option
*/
do_remote_command( "acl", sock, argc - optind, argv + optind );
return 0;
}
int mode_mirror( int argc, char *argv[] )
{
int c;
char *sock = NULL;
char *remote_argv[3] = {0};
int err = 0;
while (1) {
c = getopt_long( argc, argv, mirror_short_options, mirror_options, NULL);
if ( -1 == c ) break;
read_mirror_param( c, &sock, &remote_argv[0], &remote_argv[1] );
}
if ( NULL == sock ){
fprintf( stderr, "--sock is required.\n" );
err = 1;
}
if ( NULL == remote_argv[0] || NULL == remote_argv[1] ) {
fprintf( stderr, "both --addr and --port are required.\n");
err = 1;
}
if ( err ) { exit_err( mirror_help_text ); }
do_remote_command( "mirror", sock, 2, remote_argv );
return 0;
}
int mode_status( int argc, char *argv[] )
{
int c;
char *sock = NULL;
while (1) {
c = getopt_long( argc, argv, status_short_options, status_options, NULL );
if ( -1 == c ) break;
read_status_param( c, &sock );
}
if ( NULL == sock ){
fprintf( stderr, "--sock is required.\n" );
exit_err( acl_help_text );
}
do_remote_command( "status", sock, argc - optind, argv + optind );
return 0;
}
int mode_help( int argc, char *argv[] )
{
char *cmd;
char *help_text;
if ( argc < 1 ){
help_text = help_help_text;
} else {
cmd = argv[0];
if (IS_CMD( CMD_SERVE, cmd ) ) {
help_text = serve_help_text;
} else if ( IS_CMD( CMD_READ, cmd ) ) {
help_text = read_help_text;
} else if ( IS_CMD( CMD_WRITE, cmd ) ) {
help_text = write_help_text;
} else if ( IS_CMD( CMD_ACL, cmd ) ) {
help_text = acl_help_text;
} else if ( IS_CMD( CMD_MIRROR, cmd ) ) {
help_text = mirror_help_text;
} else if ( IS_CMD( CMD_STATUS, cmd ) ) {
help_text = status_help_text;
} else { exit_err( help_help_text ); }
}
fprintf( stdout, help_text );
return 0;
}
void mode(char* mode, int argc, char **argv)
{
if ( IS_CMD( CMD_SERVE, mode ) ) {
mode_serve( argc, argv );
}
else if ( IS_CMD( CMD_READ, mode ) ) {
mode_read( argc, argv );
}
else if ( IS_CMD( CMD_WRITE, mode ) ) {
mode_write( argc, argv );
}
else if ( IS_CMD( CMD_ACL, mode ) ) {
mode_acl( argc, argv );
}
else if ( IS_CMD( CMD_MIRROR, mode ) ) {
mode_mirror( argc, argv );
}
else if ( IS_CMD( CMD_STATUS, mode ) ) {
mode_status( argc, argv );
}
else if ( IS_CMD( CMD_HELP, mode ) ) {
mode_help( argc-1, argv+1 );
}
else {
mode_help( argc-1, argv+1 );
exit( 1 );
}
exit(0);
}
int main(int argc, char** argv)
{
signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */
error_init();
if (argc < 2) {
exit_err( help_help_text );
}
mode(argv[1], argc-1, argv+1); /* never returns */
return 0;
}

237
src/ioutil.c Normal file
View File

@@ -0,0 +1,237 @@
#define _LARGEFILE64_SOURCE
#define _GNU_SOURCE
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <linux/fs.h>
#include <linux/fiemap.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include "util.h"
#include "bitset.h"
char* build_allocation_map(int fd, off64_t size, int resolution)
{
int i;
char *allocation_map = xmalloc((size+resolution)/resolution);
struct fiemap *fiemap_count, *fiemap;
fiemap_count = (struct fiemap*) xmalloc(sizeof(struct fiemap));
fiemap_count->fm_start = 0;
fiemap_count->fm_length = size;
fiemap_count->fm_flags = 0;
fiemap_count->fm_extent_count = 0;
fiemap_count->fm_mapped_extents = 0;
/* Find out how many extents there are */
if (ioctl(fd, FS_IOC_FIEMAP, fiemap_count) < 0)
return NULL;
/* Resize fiemap to allow us to read in the extents */
fiemap = (struct fiemap*)xmalloc(
sizeof(struct fiemap) + (
sizeof(struct fiemap_extent) *
fiemap_count->fm_mapped_extents
)
);
/* realloc makes valgrind complain a lot */
memcpy(fiemap, fiemap_count, sizeof(struct fiemap));
fiemap->fm_extent_count = fiemap->fm_mapped_extents;
fiemap->fm_mapped_extents = 0;
if (ioctl(fd, FS_IOC_FIEMAP, fiemap) < 0)
return NULL;
for (i=0;i<fiemap->fm_mapped_extents;i++) {
int first_bit = fiemap->fm_extents[i].fe_logical / resolution;
int last_bit = (fiemap->fm_extents[i].fe_logical +
fiemap->fm_extents[i].fe_length + resolution - 1) /
resolution;
int run = last_bit - first_bit;
bit_set_range(allocation_map, first_bit, run);
}
for (i=0; i<16; i++) {
debug("map[%d] = %d%d%d%d%d%d%d%d",
i,
(allocation_map[i] & 1) == 1,
(allocation_map[i] & 2) == 2,
(allocation_map[i] & 4) == 4,
(allocation_map[i] & 8) == 8,
(allocation_map[i] & 16) == 16,
(allocation_map[i] & 32) == 32,
(allocation_map[i] & 64) == 64,
(allocation_map[i] & 128) == 128
);
}
free(fiemap);
return allocation_map;
}
int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map)
{
off64_t size;
*out_fd = open(filename, O_RDWR|O_DIRECT|O_SYNC);
if (*out_fd < 1)
return *out_fd;
size = lseek64(*out_fd, 0, SEEK_END);
if (size < 0)
return size;
if (out_size)
*out_size = size;
if (out_map) {
*out_map = mmap64(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED,
*out_fd, 0);
if (((long) *out_map) == -1)
return -1;
}
debug("opened %s size %ld on fd %d @ %p", filename, size, *out_fd, *out_map);
return 0;
}
int writeloop(int filedes, const void *buffer, size_t size)
{
size_t written=0;
while (written < size) {
size_t result = write(filedes, buffer+written, size-written);
if (result == -1)
return -1;
written += result;
}
return 0;
}
int readloop(int filedes, void *buffer, size_t size)
{
size_t readden=0;
while (readden < size) {
size_t result = read(filedes, buffer+readden, size-readden);
if (result == 0 /* EOF */ || result == -1 /* error */)
return -1;
readden += result;
}
return 0;
}
int sendfileloop(int out_fd, int in_fd, off64_t *offset, size_t count)
{
size_t sent=0;
while (sent < count) {
size_t result = sendfile64(out_fd, in_fd, offset, count-sent);
debug("sendfile64(out_fd=%d, in_fd=%d, offset=%p, count-sent=%ld) = %ld", out_fd, in_fd, offset, count-sent, result);
if (result == -1)
return -1;
sent += result;
debug("sent=%ld, count=%ld", sent, count);
}
debug("exiting sendfileloop");
return 0;
}
#include <errno.h>
ssize_t spliceloop(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags2)
{
const unsigned int flags = SPLICE_F_MORE|SPLICE_F_MOVE|flags2;
size_t spliced=0;
//debug("spliceloop(%d, %ld, %d, %ld, %ld)", fd_in, off_in ? *off_in : 0, fd_out, off_out ? *off_out : 0, len);
while (spliced < len) {
ssize_t result = splice(fd_in, off_in, fd_out, off_out, len, flags);
if (result < 0) {
//debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len);
if (errno == EAGAIN && (flags & SPLICE_F_NONBLOCK) ) {
return spliced;
}
else {
return -1;
}
} else {
spliced += result;
//debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len);
}
}
return spliced;
}
int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
{
int pipefd[2]; /* read end, write end */
size_t spliced=0;
if (pipe(pipefd) == -1)
return -1;
while (spliced < len) {
ssize_t run = len-spliced;
ssize_t s2, s1 = spliceloop(fd_in, NULL, pipefd[1], NULL, run, SPLICE_F_NONBLOCK);
/*if (run > 65535)
run = 65535;*/
if (s1 < 0)
break;
s2 = spliceloop(pipefd[0], NULL, fd_out, NULL, s1, 0);
if (s2 < 0)
break;
spliced += s2;
}
close(pipefd[0]);
close(pipefd[1]);
return spliced < len ? -1 : 0;
}
int read_until_newline(int fd, char* buf, int bufsize)
{
int cur;
bufsize -=1;
for (cur=0; cur < bufsize; cur++) {
int result = read(fd, buf+cur, 1);
if (result < 0)
return -1;
if (buf[cur] == 10)
break;
}
buf[cur++] = 0;
return cur;
}
int read_lines_until_blankline(int fd, int max_line_length, char ***lines)
{
int lines_count = 0;
char line[max_line_length+1];
*lines = NULL;
memset(line, 0, max_line_length+1);
while (1) {
if (read_until_newline(fd, line, max_line_length) < 0)
return lines_count;
*lines = xrealloc(*lines, (lines_count+1) * sizeof(char*));
(*lines)[lines_count] = strdup(line);
if ((*lines)[lines_count][0] == 0)
return lines_count;
lines_count++;
}
}

58
src/ioutil.h Normal file
View File

@@ -0,0 +1,58 @@
#ifndef __IOUTIL_H
#define __IOUTIL_H
#include "params.h"
/** Returns a bit field representing which blocks are allocated in file
* descriptor ''fd''. You must supply the size, and the resolution at which
* you want the bits to represent allocated blocks. If the OS represents
* allocated blocks at a finer resolution than you've asked for, any block
* or part block will count as "allocated" with the corresponding bit set.
*/
char* build_allocation_map(int fd, off64_t size, int resolution);
/** Repeat a write() operation that succeeds partially until ''size'' bytes
* are written, or an error is returned, when it returns -1 as usual.
*/
int writeloop(int filedes, const void *buffer, size_t size);
/** Repeat a read() operation that succeeds partially until ''size'' bytes
* are written, or an error is returned, when it returns -1 as usual.
*/
int readloop(int filedes, void *buffer, size_t size);
/** Repeat a sendfile() operation that succeeds partially until ''size'' bytes
* are written, or an error is returned, when it returns -1 as usual.
*/
int sendfileloop(int out_fd, int in_fd, off64_t *offset, size_t count);
/** Copy ''len'' bytes from ''fd_in'' to ''fd_out'' by creating a temporary
* pipe and using the Linux splice call repeatedly until it has transferred
* all the data. Returns -1 on error.
*/
int splice_via_pipe_loop(int fd_in, int fd_out, size_t len);
/** Fill up to ''bufsize'' characters starting at ''buf'' with data from ''fd''
* until an LF character is received, which is written to the buffer at a zero
* byte. Returns -1 on error, or the number of bytes written to the buffer.
*/
int read_until_newline(int fd, char* buf, int bufsize);
/** Read a number of lines using read_until_newline, until an empty line is
* received (i.e. the sequence LF LF). The data is read from ''fd'' and
* lines must be a maximum of ''max_line_length''. The set of lines is
* returned as an array of zero-terminated strings; you must pass an address
* ''lines'' in which you want the address of this array returned.
*/
int read_lines_until_blankline(int fd, int max_line_length, char ***lines);
/** Open the given ''filename'', determine its size, and mmap it in its
* entirety. The file descriptor is stored in ''out_fd'', the size in
* ''out_size'' and the address of the mmap in ''out_map''. If anything goes
* wrong, returns -1 setting errno, otherwise 0.
*/
int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map);
#endif

36
src/nbdtypes.h Normal file
View File

@@ -0,0 +1,36 @@
#ifndef __NBDTYPES_H
#define __NBDTYPES_H
/* http://linux.derkeiler.com/Mailing-Lists/Kernel/2003-09/2332.html */
#define INIT_PASSWD "NBDMAGIC"
#define INIT_MAGIC 0x0000420281861253
#define REQUEST_MAGIC 0x25609513
#define REPLY_MAGIC 0x67446698
#define REQUEST_READ 0
#define REQUEST_WRITE 1
#define REQUEST_DISCONNECT 2
#include <linux/types.h>
struct nbd_init {
char passwd[8];
__be64 magic;
__be64 size;
char reserved[128];
};
struct nbd_request {
__be32 magic;
__be32 type; /* == READ || == WRITE */
char handle[8];
__be64 from;
__be32 len;
} __attribute__((packed));
struct nbd_reply {
__be32 magic;
__be32 error; /* 0 = ok, else error */
char handle[8]; /* handle you got from request */
};
#endif

134
src/options.h Normal file
View File

@@ -0,0 +1,134 @@
#define GETOPT_ARG(x,s) {(x), 1, 0, (s)}
#define GETOPT_FLAG(x,v) {(x), 0, 0, (v)}
#define OPT_HELP "help"
#define OPT_ADDR "addr"
#define OPT_PORT "port"
#define OPT_FILE "file"
#define OPT_SOCK "sock"
#define OPT_FROM "from"
#define OPT_SIZE "size"
#define CMD_SERVE "serve"
#define CMD_READ "read"
#define CMD_WRITE "write"
#define CMD_ACL "acl"
#define CMD_MIRROR "mirror"
#define CMD_STATUS "status"
#define CMD_HELP "help"
#define LEN_CMD_MAX 6
#define PATH_LEN_MAX 1024
#define ADDR_LEN_MAX 64
#define IS_CMD(x,c) (strncmp((x),(c),(LEN_CMD_MAX)) == 0)
#define GETOPT_HELP GETOPT_FLAG( OPT_HELP, 'h' )
#define GETOPT_ADDR GETOPT_ARG( OPT_ADDR, 'l' )
#define GETOPT_PORT GETOPT_ARG( OPT_PORT, 'p' )
#define GETOPT_FILE GETOPT_ARG( OPT_FILE, 'f' )
#define GETOPT_SOCK GETOPT_ARG( OPT_SOCK, 's' )
#define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' )
#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' )
static struct option serve_options[] = {
GETOPT_HELP,
GETOPT_ADDR,
GETOPT_PORT,
GETOPT_FILE,
GETOPT_SOCK,
{0}
};
static char serve_short_options[] = "hl:p:f:s:";
static char serve_help_text[] =
"Usage: flexnbd " CMD_SERVE " <options> [<acl address>*]\n\n"
"Serve FILE from ADDR:PORT, with an optional control socket at SOCK.\n\n"
"\t--" OPT_HELP ",-h\tThis text.\n"
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to serve on.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to serve on.\n"
"\t--" OPT_FILE ",-f <FILE>\tThe file to serve.\n"
"\t--" OPT_SOCK ",-s <SOCK>\tPath to the control socket to open.\n";
static struct option read_options[] = {
GETOPT_HELP,
GETOPT_ADDR,
GETOPT_PORT,
GETOPT_FROM,
GETOPT_SIZE,
{0}
};
static char read_short_options[] = "hl:p:F:S:";
static char read_help_text[] =
"Usage: flexnbd " CMD_READ " <options>\n\n"
"Read SIZE bytes from a server at ADDR:PORT to stdout, starting at OFFSET.\n\n"
"\t--" OPT_HELP ",-h\tThis text.\n"
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to read from.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to read from.\n"
"\t--" OPT_FROM ",-F <OFFSET>\tByte offset to read from.\n"
"\t--" OPT_SIZE ",-S <SIZE>\tBytes to read.\n";
static struct option *write_options = read_options;
static char *write_short_options = read_short_options;
static char write_help_text[] =
"Usage: flexnbd " CMD_WRITE" <options>\n\n"
"Write SIZE bytes from stdin to a server at ADDR:PORT, starting at OFFSET.\n\n"
"\t--" OPT_HELP ",-h\tThis text.\n"
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to write to.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to write to.\n"
"\t--" OPT_FROM ",-F <OFFSET>\tByte offset to write from.\n"
"\t--" OPT_SIZE ",-S <SIZE>\tBytes to write.\n";
struct option acl_options[] = {
GETOPT_HELP,
GETOPT_SOCK,
{0}
};
static char acl_short_options[] = "hs:";
static char acl_help_text[] =
"Usage: flexnbd " CMD_ACL " <options> [<acl address>+]\n\n"
"Set the access control list for a server with control socket SOCK.\n\n"
"\t--" OPT_HELP ",-h\tThis text.\n"
"\t--" OPT_SOCK ",-s <SOCK>\tPath to the control socket.\n";
struct option mirror_options[] = {
GETOPT_HELP,
GETOPT_SOCK,
GETOPT_ADDR,
GETOPT_PORT,
{0}
};
static char mirror_short_options[] = "hs:l:p:";
static char mirror_help_text[] =
"Usage: flexnbd " CMD_MIRROR " <options>\n\n"
"Start mirroring from the server with control socket SOCK to one at ADDR:PORT.\n\n"
"\t--" OPT_HELP ",-h\tThis text.\n"
"\t--" OPT_SOCK ",-s <SOCK>\tPath to the control socket.\n"
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to mirror to.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to mirror to.\n";
struct option status_options[] = {
GETOPT_HELP,
GETOPT_SOCK,
{0}
};
static char status_short_options[] = "hs:";
static char status_help_text[] =
"Usage: flexnbd " CMD_STATUS " <options>\n\n"
"Get the status for a server with control socket SOCK.\n\n"
"\t--" OPT_HELP ",-h\tThis text.\n"
"\t--" OPT_SOCK ",-s <SOCK>\tPath to the control socket.\n";
static char help_help_text[] =
"Usage: flexnbd <cmd> [cmd options]\n\n"
"Commands:\n"
"\tflexnbd serve\n"
"\tflexnbd read\n"
"\tflexnbd write\n"
"\tflexnbd acl\n"
"\tflexnbd mirror\n"
"\tflexnbd status\n"
"\tflexnbd help\n\n"
"See flexnbd help <cmd> for further info\n";

109
src/params.h Normal file
View File

@@ -0,0 +1,109 @@
#ifndef __PARAMS_H
#define __PARAMS_H
#define _GNU_SOURCE
#define _LARGEFILE64_SOURCE
#include "parse.h"
#include <sys/types.h>
enum mirror_finish_action {
ACTION_PROXY,
ACTION_EXIT,
ACTION_NOTHING
};
struct mirror_status {
pthread_t thread;
int client;
char *filename;
off64_t max_bytes_per_second;
enum mirror_finish_action action_at_finish;
char *mapped;
struct bitset_mapping *dirty_map;
};
struct control_params {
int socket;
struct mode_serve_params* serve;
};
#define MAX_NBD_CLIENTS 16
struct mode_serve_params {
/** address/port to bind to */
union mysockaddr bind_to;
/** number of entries in current access control list*/
int acl_entries;
/** pointer to access control list entries*/
struct ip_and_mask (*acl)[0];
/** (static) file name to serve */
char* filename;
/** file name of INCOMPLETE flag */
char* filename_incomplete;
/** TCP backlog for listen() */
int tcp_backlog;
/** (static) file name of UNIX control socket (or NULL if none) */
char* control_socket_name;
/** size of file */
off64_t size;
/* NB dining philosophers if we ever mave more than one thread
* that might need to pause the whole server. At the moment we only
* have the one.
*/
/** Claimed around any accept/thread starting loop */
pthread_mutex_t l_accept;
/** Claims around any I/O to this file */
pthread_mutex_t l_io;
/** set to non-zero to cause r/w requests to go via this fd */
int proxy_fd;
/** to interrupt accept loop and clients, write() to close_signal[1] */
int close_signal[2];
struct mirror_status* mirror;
int server;
int control;
char* block_allocation_map;
struct { pthread_t thread; union mysockaddr address; }
nbd_client[MAX_NBD_CLIENTS];
};
struct mode_readwrite_params {
union mysockaddr connect_to;
off64_t from;
off64_t len;
int data_fd;
int client;
};
struct client_params {
int socket;
int fileno;
char* mapped;
struct mode_serve_params* serve; /* FIXME: remove above duplication */
};
/* FIXME: wrong place */
static inline void* sockaddr_address_data(struct sockaddr* sockaddr)
{
struct sockaddr_in* in = (struct sockaddr_in*) sockaddr;
struct sockaddr_in6* in6 = (struct sockaddr_in6*) sockaddr;
if (sockaddr->sa_family == AF_INET)
return &in->sin_addr;
if (sockaddr->sa_family == AF_INET6)
return &in6->sin6_addr;
return NULL;
}
#endif

91
src/parse.c Normal file
View File

@@ -0,0 +1,91 @@
#include "parse.h"
#include "util.h"
int atoi(const char *nptr);
#define IS_IP_VALID_CHAR(x) ( ((x) >= '0' && (x) <= '9' ) || \
((x) >= 'a' && (x) <= 'f') || \
((x) >= 'A' && (x) <= 'F' ) || \
(x) == ':' || (x) == '.' \
)
/* FIXME: should change this to return negative on error like everything else */
int parse_ip_to_sockaddr(struct sockaddr* out, char* src)
{
char temp[64];
struct sockaddr_in *v4 = (struct sockaddr_in *) out;
struct sockaddr_in6 *v6 = (struct sockaddr_in6 *) out;
/* allow user to start with [ and end with any other invalid char */
{
int i=0, j=0;
if (src[i] == '[')
i++;
for (; i<64 && IS_IP_VALID_CHAR(src[i]); i++)
temp[j++] = src[i];
temp[j] = 0;
}
if (temp[0] == '0' && temp[1] == '\0') {
v4->sin_family = AF_INET;
v4->sin_addr.s_addr = INADDR_ANY;
return 1;
}
if (inet_pton(AF_INET, temp, &v4->sin_addr) == 1) {
out->sa_family = AF_INET;
return 1;
}
if (inet_pton(AF_INET6, temp, &v6->sin6_addr) == 1) {
out->sa_family = AF_INET6;
return 1;
}
return 0;
}
int parse_acl(struct ip_and_mask (**out)[], int max, char **entries)
{
struct ip_and_mask (*list)[0];
int i;
if (max == 0) {
*out = NULL;
return 0;
}
else {
*out = xmalloc(max * sizeof(struct ip_and_mask));
debug("acl alloc: %p", *out);
}
list = *out;
for (i = 0; i < max; i++) {
# define MAX_MASK_BITS (outentry->ip.family == AF_INET ? 32 : 128)
int j;
struct ip_and_mask* outentry = list[i];
if (parse_ip_to_sockaddr(&outentry->ip.generic, entries[i]) == 0)
return i;
for (j=0; entries[i][j] && entries[i][j] != '/'; j++)
;
if (entries[i][j] == '/') {
outentry->mask = atoi(entries[i]+j+1);
if (outentry->mask < 1 || outentry->mask > MAX_MASK_BITS)
return i;
}
else
outentry->mask = MAX_MASK_BITS;
# undef MAX_MASK_BITS
debug("acl ptr[%d]: %p %d",i, outentry, outentry->mask);
}
for (i=0; i < max; i++) {
struct ip_and_mask* entry = list[i];
debug("acl entry %d @ %p has mask %d", i, entry, entry->mask);
}
return max;
}

24
src/parse.h Normal file
View File

@@ -0,0 +1,24 @@
#ifndef __PARSE_H
#define __PARSE_H
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
union mysockaddr {
unsigned short family;
struct sockaddr generic;
struct sockaddr_in v4;
struct sockaddr_in6 v6;
};
struct ip_and_mask {
union mysockaddr ip;
int mask;
};
int parse_ip_to_sockaddr(struct sockaddr* out, char* src);
int parse_acl(struct ip_and_mask (**out)[0], int max, char **entries);
#endif

124
src/readwrite.c Normal file
View File

@@ -0,0 +1,124 @@
#include "nbdtypes.h"
#include "ioutil.h"
#include "util.h"
#include "params.h"
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
int socket_connect(struct sockaddr* to)
{
int fd = socket(to->sa_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0);
SERVER_ERROR_ON_FAILURE(fd, "Couldn't create client socket");
SERVER_ERROR_ON_FAILURE(connect(fd, to, sizeof(struct sockaddr_in6)),
"connect failed");
return fd;
}
off64_t socket_nbd_read_hello(int fd)
{
struct nbd_init init;
SERVER_ERROR_ON_FAILURE(readloop(fd, &init, sizeof(init)),
"Couldn't read init");
if (strncmp(init.passwd, INIT_PASSWD, 8) != 0)
SERVER_ERROR("wrong passwd");
if (be64toh(init.magic) != INIT_MAGIC)
SERVER_ERROR("wrong magic (%x)", be64toh(init.magic));
return be64toh(init.size);
}
void fill_request(struct nbd_request *request, int type, int from, int len)
{
request->magic = htobe32(REQUEST_MAGIC);
request->type = htobe32(type);
((int*) request->handle)[0] = rand();
((int*) request->handle)[1] = rand();
request->from = htobe64(from);
request->len = htobe32(len);
}
void read_reply(int fd, struct nbd_request *request, struct nbd_reply *reply)
{
SERVER_ERROR_ON_FAILURE(readloop(fd, reply, sizeof(*reply)),
"Couldn't read reply");
if (be32toh(reply->magic) != REPLY_MAGIC)
SERVER_ERROR("Reply magic incorrect (%p)", be32toh(reply->magic));
if (be32toh(reply->error) != 0)
SERVER_ERROR("Server replied with error %d", be32toh(reply->error));
if (strncmp(request->handle, reply->handle, 8) != 0)
SERVER_ERROR("Did not reply with correct handle");
}
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf)
{
struct nbd_request request;
struct nbd_reply reply;
fill_request(&request, REQUEST_READ, from, len);
SERVER_ERROR_ON_FAILURE(writeloop(fd, &request, sizeof(request)),
"Couldn't write request");
read_reply(fd, &request, &reply);
if (out_buf) {
SERVER_ERROR_ON_FAILURE(readloop(fd, out_buf, len),
"Read failed");
}
else {
SERVER_ERROR_ON_FAILURE(
splice_via_pipe_loop(fd, out_fd, len),
"Splice failed"
);
}
}
void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf)
{
struct nbd_request request;
struct nbd_reply reply;
fill_request(&request, REQUEST_WRITE, from, len);
SERVER_ERROR_ON_FAILURE(writeloop(fd, &request, sizeof(request)),
"Couldn't write request");
if (in_buf) {
SERVER_ERROR_ON_FAILURE(writeloop(fd, in_buf, len),
"Write failed");
}
else {
SERVER_ERROR_ON_FAILURE(
splice_via_pipe_loop(in_fd, fd, len),
"Splice failed"
);
}
read_reply(fd, &request, &reply);
}
#define CHECK_RANGE(error_type) { \
off64_t size = socket_nbd_read_hello(params->client); \
if (params->from < 0 || (params->from + params->len) > size) \
SERVER_ERROR(error_type \
" request %d+%d is out of range given size %d", \
params->from, params->len, size\
); \
}
void do_read(struct mode_readwrite_params* params)
{
params->client = socket_connect(&params->connect_to.generic);
CHECK_RANGE("read");
socket_nbd_read(params->client, params->from, params->len,
params->data_fd, NULL);
close(params->client);
}
void do_write(struct mode_readwrite_params* params)
{
params->client = socket_connect(&params->connect_to.generic);
CHECK_RANGE("write");
socket_nbd_write(params->client, params->from, params->len,
params->data_fd, NULL);
close(params->client);
}

11
src/readwrite.h Normal file
View File

@@ -0,0 +1,11 @@
#ifndef __READWRITE_H
#define __READWRITE_H
int socket_connect(struct sockaddr* to);
off64_t socket_nbd_read_hello(int fd);
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf);
void socket_nbd_write(int fd, off64_t from, int len, int out_fd, void* out_buf);
#endif

51
src/remote.c Normal file
View File

@@ -0,0 +1,51 @@
#include "ioutil.h"
#include "util.h"
#include <stdlib.h>
#include <sys/un.h>
static const int max_response=1024;
void do_remote_command(char* command, char* socket_name, int argc, char** argv)
{
char newline=10;
int i;
int exit_status;
int remote = socket(AF_UNIX, SOCK_STREAM, 0);
struct sockaddr_un address;
char response[max_response];
memset(&address, 0, sizeof(address));
SERVER_ERROR_ON_FAILURE(remote, "Couldn't create client socket");
address.sun_family = AF_UNIX;
strncpy(address.sun_path, socket_name, sizeof(address.sun_path));
SERVER_ERROR_ON_FAILURE(
connect(remote, (struct sockaddr*) &address, sizeof(address)),
"Couldn't connect to %s", socket_name
);
write(remote, command, strlen(command));
write(remote, &newline, 1);
for (i=0; i<argc; i++) {
write(remote, argv[i], strlen(argv[i]));
write(remote, &newline, 1);
}
write(remote, &newline, 1);
SERVER_ERROR_ON_FAILURE(
read_until_newline(remote, response, max_response),
"Couldn't read response from %s", socket_name
);
exit_status = atoi(response);
if (exit_status > 0)
fprintf(stderr, "%s\n", strchr(response, ':')+2);
exit(atoi(response));
close(remote);
}

589
src/serve.c Normal file
View File

@@ -0,0 +1,589 @@
#include "params.h"
#include "nbdtypes.h"
#include "ioutil.h"
#include "util.h"
#include "bitset.h"
#include "control.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/un.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
static const int block_allocation_resolution = 4096;//128<<10;
static inline void dirty(struct mode_serve_params *serve, off64_t from, int len)
{
if (serve->mirror)
bitset_set_range(serve->mirror->dirty_map, from, len);
}
int server_detect_closed(struct mode_serve_params* serve)
{
int errno_old = errno;
int result = fcntl(serve->server, F_GETFD, 0) < 0;
errno = errno_old;
return result;
}
/**
* So waiting on client->socket is len bytes of data, and we must write it all
* to client->mapped. However while doing do we must consult the bitmap
* client->block_allocation_map, which is a bitmap where one bit represents
* block_allocation_resolution bytes. Where a bit isn't set, there are no
* disc blocks allocated for that portion of the file, and we'd like to keep
* it that way.
*
* If the bitmap shows that every block in our prospective write is already
* allocated, we can proceed as normal and make one call to writeloop.
*
*/
void write_not_zeroes(struct client_params* client, off64_t from, int len)
{
char *map = client->serve->block_allocation_map;
while (len > 0) {
/* so we have to calculate how much of our input to consider
* next based on the bitmap of allocated blocks. This will be
* at a coarser resolution than the actual write, which may
* not fall on a block boundary at either end. So we look up
* how many blocks our write covers, then cut off the start
* and end to get the exact number of bytes.
*/
int first_bit = from/block_allocation_resolution;
int last_bit = (from+len+block_allocation_resolution-1) /
block_allocation_resolution;
int run = bit_run_count(map, first_bit, last_bit-first_bit) *
block_allocation_resolution;
if (run > len)
run = len;
debug("write_not_zeroes: %ld+%d, first_bit=%d, last_bit=%d, run=%d",
from, len, first_bit, last_bit, run);
#define DO_READ(dst, len) CLIENT_ERROR_ON_FAILURE( \
readloop( \
client->socket, \
(dst), \
(len) \
), \
"read failed %ld+%d", from, (len) \
)
if (bit_is_set(map, from/block_allocation_resolution)) {
debug("writing the lot");
/* already allocated, just write it all */
DO_READ(client->mapped + from, run);
dirty(client->serve, from, run);
len -= run;
from += run;
}
else {
char zerobuffer[block_allocation_resolution];
/* not allocated, read in block_allocation_resoution */
while (run > 0) {
char *dst = client->mapped+from;
int bit = from/block_allocation_resolution;
int blockrun = block_allocation_resolution -
(from % block_allocation_resolution);
if (blockrun > run)
blockrun = run;
debug("writing partial: bit=%d, blockrun=%d (run=%d)",
bit, blockrun, run);
DO_READ(zerobuffer, blockrun);
/* This reads the buffer twice in the worst case
* but we're leaning on memcmp failing early
* and memcpy being fast, rather than try to
* hand-optimized something specific.
*/
if (zerobuffer[0] != 0 ||
memcmp(zerobuffer, zerobuffer + 1, blockrun - 1)) {
memcpy(dst, zerobuffer, blockrun);
bit_set(map, bit);
dirty(client->serve, from, blockrun);
debug("non-zero, copied and set bit %d", bit);
/* at this point we could choose to
* short-cut the rest of the write for
* faster I/O but by continuing to do it
* the slow way we preserve as much
* sparseness as possible.
*/
}
else {
debug("all zero, skip write");
}
len -= blockrun;
run -= blockrun;
from += blockrun;
}
}
}
}
int client_serve_request(struct client_params* client)
{
off64_t offset;
struct nbd_request request;
struct nbd_reply reply;
fd_set fds;
FD_ZERO(&fds);
FD_SET(client->socket, &fds);
FD_SET(client->serve->close_signal[0], &fds);
CLIENT_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds, NULL, NULL, NULL),
"select() failed");
if (FD_ISSET(client->serve->close_signal[0], &fds))
return 1;
if (readloop(client->socket, &request, sizeof(request)) == -1) {
if (errno == 0) {
debug("EOF reading request");
return 1; /* neat point to close the socket */
}
else {
CLIENT_ERROR_ON_FAILURE(-1, "Error reading request");
}
}
reply.magic = htobe32(REPLY_MAGIC);
reply.error = htobe32(0);
memcpy(reply.handle, request.handle, 8);
debug("request type %d", be32toh(request.type));
if (be32toh(request.magic) != REQUEST_MAGIC)
CLIENT_ERROR("Bad magic %08x", be32toh(request.magic));
switch (be32toh(request.type))
{
case REQUEST_READ:
if (access(client->serve->filename_incomplete, F_OK) == 0 ) {
debug("read request while data incomplete");
reply.error = htobe32(10);
write(client->socket, &reply, sizeof(reply));
return 0;
}
case REQUEST_WRITE:
/* check it's not out of range */
if (be64toh(request.from) < 0 ||
be64toh(request.from)+be32toh(request.len) > client->serve->size) {
debug("request read %ld+%d out of range",
be64toh(request.from),
be32toh(request.len)
);
reply.error = htobe32(1);
write(client->socket, &reply, sizeof(reply));
return 0;
}
break;
case REQUEST_DISCONNECT:
debug("request disconnect");
return 1;
default:
CLIENT_ERROR("Unknown request %08x", be32toh(request.type));
}
CLIENT_ERROR_ON_FAILURE(
pthread_mutex_lock(&client->serve->l_io),
"Problem with I/O lock"
);
if (server_detect_closed(client->serve)) {
CLIENT_ERROR_ON_FAILURE(
pthread_mutex_unlock(&client->serve->l_io),
"Problem with I/O unlock"
);
return 1;
}
switch (be32toh(request.type))
{
case REQUEST_READ:
debug("request read %ld+%d", be64toh(request.from), be32toh(request.len));
write(client->socket, &reply, sizeof(reply));
offset = be64toh(request.from);
CLIENT_ERROR_ON_FAILURE(
sendfileloop(
client->socket,
client->fileno,
&offset,
be32toh(request.len)
),
"sendfile failed from=%ld, len=%d",
offset,
be32toh(request.len)
);
break;
case REQUEST_WRITE:
debug("request write %ld+%d", be64toh(request.from), be32toh(request.len));
if (client->serve->block_allocation_map) {
write_not_zeroes(
client,
be64toh(request.from),
be32toh(request.len)
);
}
else {
CLIENT_ERROR_ON_FAILURE(
readloop(
client->socket,
client->mapped + be64toh(request.from),
be32toh(request.len)
),
"read failed from=%ld, len=%d",
be64toh(request.from),
be32toh(request.len)
);
dirty(client->serve, be64toh(request.from), be32toh(request.len));
}
write(client->socket, &reply, sizeof(reply));
break;
}
CLIENT_ERROR_ON_FAILURE(
pthread_mutex_unlock(&client->serve->l_io),
"Problem with I/O unlock"
);
return 0;
}
void client_send_hello(struct client_params* client)
{
struct nbd_init init;
memcpy(init.passwd, INIT_PASSWD, sizeof(INIT_PASSWD));
init.magic = htobe64(INIT_MAGIC);
init.size = htobe64(client->serve->size);
memset(init.reserved, 0, 128);
CLIENT_ERROR_ON_FAILURE(
writeloop(client->socket, &init, sizeof(init)),
"Couldn't send hello"
);
}
void* client_serve(void* client_uncast)
{
struct client_params* client = (struct client_params*) client_uncast;
//client_open_file(client);
CLIENT_ERROR_ON_FAILURE(
open_and_mmap(
client->serve->filename,
&client->fileno,
NULL,
(void**) &client->mapped
),
"Couldn't open/mmap file %s", client->serve->filename
);
client_send_hello(client);
while (client_serve_request(client) == 0)
;
CLIENT_ERROR_ON_FAILURE(
close(client->socket),
"Couldn't close socket %d",
client->socket
);
close(client->socket);
close(client->fileno);
munmap(client->mapped, client->serve->size);
free(client);
return NULL;
}
static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 };
/** Test whether AF_INET or AF_INET6 sockaddr is included in the given access
* control list, returning 1 if it is, and 0 if not.
*/
int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], union mysockaddr* test)
{
int i;
for (i=0; i < list_length; i++) {
struct ip_and_mask *entry = &(*list)[i];
int testbits;
unsigned char *raw_address1, *raw_address2;
debug("checking acl entry %d (%d/%d)", i, test->generic.sa_family, entry->ip.family);
if (test->generic.sa_family != entry->ip.family)
continue;
if (test->generic.sa_family == AF_INET) {
debug("it's an AF_INET");
raw_address1 = (unsigned char*) &test->v4.sin_addr;
raw_address2 = (unsigned char*) &entry->ip.v4.sin_addr;
}
else if (test->generic.sa_family == AF_INET6) {
debug("it's an AF_INET6");
raw_address1 = (unsigned char*) &test->v6.sin6_addr;
raw_address2 = (unsigned char*) &entry->ip.v6.sin6_addr;
}
debug("testbits=%d", entry->mask);
for (testbits = entry->mask; testbits > 0; testbits -= 8) {
debug("testbits=%d, c1=%02x, c2=%02x", testbits, raw_address1[0], raw_address2[0]);
if (testbits >= 8) {
if (raw_address1[0] != raw_address2[0])
goto no_match;
}
else {
if ((raw_address1[0] & testmasks[testbits%8]) !=
(raw_address2[0] & testmasks[testbits%8]) )
goto no_match;
}
raw_address1++;
raw_address2++;
}
return 1;
no_match: ;
debug("no match");
}
return 0;
}
/** Prepares a listening socket for the NBD server, binding etc. */
void serve_open_server_socket(struct mode_serve_params* params)
{
int optval=1;
params->server = socket(params->bind_to.generic.sa_family == AF_INET ?
PF_INET : PF_INET6, SOCK_STREAM, 0);
SERVER_ERROR_ON_FAILURE(params->server,
"Couldn't create server socket");
SERVER_ERROR_ON_FAILURE(
setsockopt(params->server, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)),
"Couldn't set SO_REUSEADDR"
);
SERVER_ERROR_ON_FAILURE(
bind(params->server, &params->bind_to.generic,
sizeof(params->bind_to)),
"Couldn't bind server to IP address"
);
SERVER_ERROR_ON_FAILURE(
listen(params->server, params->tcp_backlog),
"Couldn't listen on server socket"
);
}
/** We can only accommodate MAX_NBD_CLIENTS connections at once. This function
* goes through the current list, waits for any threads that have finished
* and returns the next slot free (or -1 if there are none).
*/
int cleanup_and_find_client_slot(struct mode_serve_params* params)
{
int slot=-1, i;
for (i=0; i < MAX_NBD_CLIENTS; i++) {
void* status;
if (params->nbd_client[i].thread != 0) {
char s_client_address[64];
memset(s_client_address, 0, 64);
strcpy(s_client_address, "???");
inet_ntop(
params->nbd_client[i].address.generic.sa_family,
sockaddr_address_data(&params->nbd_client[i].address.generic), s_client_address,
64
);
if (pthread_tryjoin_np(params->nbd_client[i].thread, &status) < 0) {
if (errno != EBUSY)
SERVER_ERROR_ON_FAILURE(-1, "Problem with joining thread");
}
else {
uint64_t status1 = (uint64_t) status;
params->nbd_client[i].thread = 0;
debug("nbd thread %d exited (%s) with status %ld", (int) params->nbd_client[i].thread, s_client_address, status1);
}
}
if (params->nbd_client[i].thread == 0 && slot == -1)
slot = i;
}
return slot;
}
/** Dispatch function for accepting an NBD connection and starting a thread
* to handle it. Rejects the connection if there is an ACL, and the far end's
* address doesn't match, or if there are too many clients already connected.
*/
void accept_nbd_client(struct mode_serve_params* params, int client_fd, union mysockaddr* client_address)
{
struct client_params* client_params;
int slot = cleanup_and_find_client_slot(params);
char s_client_address[64];
if (inet_ntop(client_address->generic.sa_family, sockaddr_address_data(&client_address->generic), s_client_address, 64) == NULL) {
write(client_fd, "Bad client_address", 18);
close(client_fd);
return;
}
if (params->acl &&
!is_included_in_acl(params->acl_entries, params->acl, client_address)) {
write(client_fd, "Access control error", 20);
close(client_fd);
return;
}
if (slot < 0) {
write(client_fd, "Too many clients", 16);
close(client_fd);
return;
}
client_params = xmalloc(sizeof(struct client_params));
client_params->socket = client_fd;
client_params->serve = params;
if (pthread_create(&params->nbd_client[slot].thread, NULL, client_serve, client_params) < 0) {
write(client_fd, "Thread creation problem", 23);
free(client_params);
close(client_fd);
return;
}
memcpy(&params->nbd_client[slot].address, client_address,
sizeof(union mysockaddr));
debug("nbd thread %d started (%s)", (int) params->nbd_client[slot].thread, s_client_address);
}
/** Accept either an NBD or control socket connection, dispatch appropriately */
void serve_accept_loop(struct mode_serve_params* params)
{
while (1) {
int activity_fd, client_fd;
union mysockaddr client_address;
fd_set fds;
socklen_t socklen=sizeof(client_address);
FD_ZERO(&fds);
FD_SET(params->server, &fds);
FD_SET(params->close_signal[0], &fds);
if (params->control_socket_name)
FD_SET(params->control, &fds);
SERVER_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds,
NULL, NULL, NULL), "select() failed");
if (FD_ISSET(params->close_signal[0], &fds))
return;
activity_fd = FD_ISSET(params->server, &fds) ? params->server :
params->control;
client_fd = accept(activity_fd, &client_address.generic, &socklen);
SERVER_ERROR_ON_FAILURE(
pthread_mutex_lock(&params->l_accept),
"Problem with accept lock"
);
if (activity_fd == params->server)
accept_nbd_client(params, client_fd, &client_address);
if (activity_fd == params->control)
accept_control_connection(params, client_fd, &client_address);
SERVER_ERROR_ON_FAILURE(
pthread_mutex_unlock(&params->l_accept),
"Problem with accept unlock"
);
}
}
/** Initialisation function that sets up the initial allocation map, i.e. so
* we know which blocks of the file are allocated.
*/
void serve_init_allocation_map(struct mode_serve_params* params)
{
int fd = open(params->filename, O_RDONLY);
off64_t size;
SERVER_ERROR_ON_FAILURE(fd, "Couldn't open %s", params->filename);
size = lseek64(fd, 0, SEEK_END);
params->size = size;
SERVER_ERROR_ON_FAILURE(size, "Couldn't find size of %s",
params->filename);
params->block_allocation_map =
build_allocation_map(fd, size, block_allocation_resolution);
close(fd);
}
/** Closes sockets, frees memory and waits for all client threads to finish */
void serve_cleanup(struct mode_serve_params* params)
{
int i;
close(params->server);
close(params->control);
if (params->acl)
free(params->acl);
//free(params->filename);
if (params->control_socket_name)
//free(params->control_socket_name);
pthread_mutex_destroy(&params->l_accept);
pthread_mutex_destroy(&params->l_io);
if (params->proxy_fd);
close(params->proxy_fd);
close(params->close_signal[0]);
close(params->close_signal[1]);
free(params->block_allocation_map);
if (params->mirror)
debug("mirror thread running! this should not happen!");
for (i=0; i < MAX_NBD_CLIENTS; i++) {
void* status;
if (params->nbd_client[i].thread != 0) {
debug("joining thread %d", i);
pthread_join(params->nbd_client[i].thread, &status);
}
}
}
/** Full lifecycle of the server */
void do_serve(struct mode_serve_params* params)
{
pthread_mutex_init(&params->l_accept, NULL);
pthread_mutex_init(&params->l_io, NULL);
SERVER_ERROR_ON_FAILURE(pipe(params->close_signal) , "pipe failed");
serve_open_server_socket(params);
serve_open_control_socket(params);
serve_init_allocation_map(params);
serve_accept_loop(params);
serve_cleanup(params);
}

61
src/util.c Normal file
View File

@@ -0,0 +1,61 @@
#include <stdarg.h>
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <malloc.h>
#include <unistd.h>
#include "util.h"
static pthread_t main_thread;
void error_init()
{
main_thread = pthread_self();
}
void error(int consult_errno, int close_socket, pthread_mutex_t* unlock, const char* format, ...)
{
va_list argptr;
fprintf(stderr, "*** ");
va_start(argptr, format);
vfprintf(stderr, format, argptr);
va_end(argptr);
if (consult_errno) {
fprintf(stderr, " (errno=%d, %s)", errno, strerror(errno));
}
if (close_socket)
close(close_socket);
if (unlock)
pthread_mutex_unlock(unlock);
fprintf(stderr, "\n");
if (pthread_equal(pthread_self(), main_thread))
exit(1);
else
pthread_exit((void*) 1);
}
void* xrealloc(void* ptr, size_t size)
{
void* p = realloc(ptr, size);
if (p == NULL)
SERVER_ERROR("couldn't xrealloc %d bytes", size);
return p;
}
void* xmalloc(size_t size)
{
void* p = xrealloc(NULL, size);
memset(p, 0, size);
return p;
}

34
src/util.h Normal file
View File

@@ -0,0 +1,34 @@
#ifndef __UTIL_H
#define __UTIL_H
#include <stdio.h>
#include <pthread.h>
void error_init();
void error(int consult_errno, int close_socket, pthread_mutex_t* unlock, const char* format, ...);
void* xrealloc(void* ptr, size_t size);
void* xmalloc(size_t size);
#ifndef DEBUG
# define debug(msg, ...)
#else
# include <sys/times.h>
# define debug(msg, ...) fprintf(stderr, "%08x %4d: " msg "\n" , \
(int) pthread_self(), (int) clock(), ##__VA_ARGS__)
#endif
#define CLIENT_ERROR(msg, ...) \
error(0, client->socket, &client->serve->l_io, msg, ##__VA_ARGS__)
#define CLIENT_ERROR_ON_FAILURE(test, msg, ...) \
if (test < 0) { error(1, client->socket, &client->serve->l_io, msg, ##__VA_ARGS__); }
#define SERVER_ERROR(msg, ...) \
error(0, 0, NULL, msg, ##__VA_ARGS__)
#define SERVER_ERROR_ON_FAILURE(test, msg, ...) \
if (test < 0) { error(1, 0, NULL, msg, ##__VA_ARGS__); }
#endif