Rearranged the project to have src/ and build/ directories
This simplifies keeping everything clean.
This commit is contained in:
176
src/bitset.h
Normal file
176
src/bitset.h
Normal file
@@ -0,0 +1,176 @@
|
||||
#ifndef __BITSET_H
|
||||
#define __BITSET_H
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include "util.h"
|
||||
|
||||
static inline char char_with_bit_set(int num) { return 1<<(num%8); }
|
||||
|
||||
/** Return 1 if the bit at ''idx'' in array ''b'' is set */
|
||||
static inline int bit_is_set(char* b, int idx) {
|
||||
return (b[idx/8] & char_with_bit_set(idx)) != 0;
|
||||
}
|
||||
/** Return 1 if the bit at ''idx'' in array ''b'' is clear */
|
||||
static inline int bit_is_clear(char* b, int idx) {
|
||||
return !bit_is_set(b, idx);
|
||||
}
|
||||
/** Tests whether the bit at ''idx'' in array ''b'' has value ''value'' */
|
||||
static inline int bit_has_value(char* b, int idx, int value) {
|
||||
if (value)
|
||||
return bit_is_set(b, idx);
|
||||
else
|
||||
return bit_is_clear(b, idx);
|
||||
}
|
||||
/** Sets the bit ''idx'' in array ''b'' */
|
||||
static inline void bit_set(char* b, int idx) {
|
||||
b[idx/8] |= char_with_bit_set(idx);
|
||||
//__sync_fetch_and_or(b+(idx/8), char_with_bit_set(idx));
|
||||
}
|
||||
/** Clears the bit ''idx'' in array ''b'' */
|
||||
static inline void bit_clear(char* b, int idx) {
|
||||
b[idx/8] &= ~char_with_bit_set(idx);
|
||||
//__sync_fetch_and_nand(b+(idx/8), char_with_bit_set(idx));
|
||||
}
|
||||
/** Sets ''len'' bits in array ''b'' starting at offset ''from'' */
|
||||
static inline void bit_set_range(char* b, int from, int len) {
|
||||
for (; from%8 != 0 && len > 0; len--)
|
||||
bit_set(b, from++);
|
||||
if (len >= 8)
|
||||
memset(b+(from/8), 255, len/8);
|
||||
for (; len > 0; len--)
|
||||
bit_set(b, from++);
|
||||
}
|
||||
/** Clears ''len'' bits in array ''b'' starting at offset ''from'' */
|
||||
static inline void bit_clear_range(char* b, int from, int len) {
|
||||
for (; from%8 != 0 && len > 0; len--)
|
||||
bit_clear(b, from++);
|
||||
if (len >= 8)
|
||||
memset(b+(from/8), 0, len/8);
|
||||
for (; len > 0; len--)
|
||||
bit_clear(b, from++);
|
||||
}
|
||||
|
||||
/** Counts the number of contiguous bits in array ''b'', starting at ''from''
|
||||
* up to a maximum number of bits ''len''. Returns the number of contiguous
|
||||
* bits that are the same as the first one specified.
|
||||
*/
|
||||
static inline int bit_run_count(char* b, int from, int len) {
|
||||
int count;
|
||||
int first_value = bit_is_set(b, from);
|
||||
|
||||
for (count=0; len > 0 && bit_has_value(b, from+count, first_value); count++, len--)
|
||||
;
|
||||
|
||||
/* FIXME: debug this later */
|
||||
/*for (; (from+count) % 64 != 0 && len > 0; len--)
|
||||
if (bit_has_value(b, from+count, first_value))
|
||||
count++;
|
||||
else
|
||||
return count;
|
||||
for (; len >= 64; len-=64) {
|
||||
if (*((uint64_t*)(b + ((from+count)/8))) == UINT64_MAX)
|
||||
count += 64;
|
||||
else
|
||||
break;
|
||||
}
|
||||
for (; len > 0; len--)
|
||||
if (bit_is_set(b, from+count))
|
||||
count++;*/
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
/** An application of a bitset - a bitset mapping represents a file of ''size''
|
||||
* broken down into ''resolution''-sized chunks. The bit set is assumed to
|
||||
* represent one bit per chunk.
|
||||
*/
|
||||
struct bitset_mapping {
|
||||
uint64_t size;
|
||||
int resolution;
|
||||
char bits[];
|
||||
};
|
||||
|
||||
/** Allocate a bitset_mapping for a file of the given size, and chunks of the
|
||||
* given resolution.
|
||||
*/
|
||||
static inline struct bitset_mapping* bitset_alloc(
|
||||
uint64_t size,
|
||||
int resolution
|
||||
)
|
||||
{
|
||||
struct bitset_mapping *bitset = xmalloc(
|
||||
sizeof(struct bitset_mapping)+
|
||||
(size+resolution-1)/resolution
|
||||
);
|
||||
bitset->size = size;
|
||||
bitset->resolution = resolution;
|
||||
return bitset;
|
||||
}
|
||||
|
||||
#define INT_FIRST_AND_LAST \
|
||||
int first = from/set->resolution, \
|
||||
last = (from+len-1)/set->resolution, \
|
||||
bitlen = last-first+1
|
||||
|
||||
/** Set the bits in a bitset which correspond to the given bytes in the larger
|
||||
* file.
|
||||
*/
|
||||
static inline void bitset_set_range(
|
||||
struct bitset_mapping* set,
|
||||
uint64_t from,
|
||||
uint64_t len)
|
||||
{
|
||||
INT_FIRST_AND_LAST;
|
||||
bit_set_range(set->bits, first, bitlen);
|
||||
}
|
||||
|
||||
/** Clear the bits in a bitset which correspond to the given bytes in the
|
||||
* larger file.
|
||||
*/
|
||||
static inline void bitset_clear_range(
|
||||
struct bitset_mapping* set,
|
||||
uint64_t from,
|
||||
uint64_t len)
|
||||
{
|
||||
INT_FIRST_AND_LAST;
|
||||
bit_clear_range(set->bits, first, bitlen);
|
||||
}
|
||||
|
||||
/** Counts the number of contiguous bytes that are represented as a run in
|
||||
* the bit field.
|
||||
*/
|
||||
static inline int bitset_run_count(
|
||||
struct bitset_mapping* set,
|
||||
uint64_t from,
|
||||
uint64_t len)
|
||||
{
|
||||
INT_FIRST_AND_LAST;
|
||||
return bit_run_count(set->bits, first, bitlen) * set->resolution;
|
||||
}
|
||||
|
||||
/** Tests whether the bit field is clear for the given file offset.
|
||||
*/
|
||||
static inline int bitset_is_clear_at(
|
||||
struct bitset_mapping* set,
|
||||
uint64_t at
|
||||
)
|
||||
{
|
||||
return bit_is_clear(set->bits, at/set->resolution);
|
||||
}
|
||||
|
||||
/** Tests whether the bit field is set for the given file offset.
|
||||
*/
|
||||
static inline int bitset_is_set_at(
|
||||
struct bitset_mapping* set,
|
||||
uint64_t at
|
||||
)
|
||||
{
|
||||
return bit_is_set(set->bits, at/set->resolution);
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
384
src/control.c
Normal file
384
src/control.c
Normal file
@@ -0,0 +1,384 @@
|
||||
/* FlexNBD server (C) Bytemark Hosting 2012
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
/** The control server responds on a UNIX socket and services our "remote"
|
||||
* commands which are used for changing the access control list, initiating
|
||||
* a mirror process, or asking for status. The protocol is pretty simple -
|
||||
* after connecting the client sends a series of LF-terminated lines, followed
|
||||
* by a blank line (i.e. double LF). The first line is taken to be the command
|
||||
* name to invoke, and the lines before the double LF are its arguments.
|
||||
*
|
||||
* These commands can be invoked remotely from the command line, with the
|
||||
* client code to be found in remote.c
|
||||
*/
|
||||
|
||||
#include "params.h"
|
||||
#include "util.h"
|
||||
#include "ioutil.h"
|
||||
#include "parse.h"
|
||||
#include "readwrite.h"
|
||||
#include "bitset.h"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/un.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/** The mirror code will split NBD writes, making them this long as a maximum */
|
||||
static const int mirror_longest_write = 8<<20;
|
||||
|
||||
/** If, during a mirror pass, we have sent this number of bytes or fewer, we
|
||||
* go to freeze the I/O and finish it off. This is just a guess.
|
||||
*/
|
||||
static const int mirror_last_pass_after_bytes_written = 100<<20;
|
||||
|
||||
/** The largest number of full passes we'll do - the last one will always
|
||||
* cause the I/O to freeze, however many bytes are left to copy.
|
||||
*/
|
||||
static const int mirror_maximum_passes = 7;
|
||||
|
||||
/** Thread launched to drive mirror process */
|
||||
void* mirror_runner(void* serve_params_uncast)
|
||||
{
|
||||
const int last_pass = mirror_maximum_passes-1;
|
||||
int pass;
|
||||
struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast;
|
||||
struct bitset_mapping *map = serve->mirror->dirty_map;
|
||||
|
||||
for (pass=0; pass < mirror_maximum_passes; pass++) {
|
||||
uint64_t current = 0;
|
||||
uint64_t written = 0;
|
||||
|
||||
debug("mirror start pass=%d", pass);
|
||||
|
||||
if (pass == last_pass) {
|
||||
/* last pass, stop everything else */
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
pthread_mutex_lock(&serve->l_accept),
|
||||
"Problem with accept lock"
|
||||
);
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
pthread_mutex_lock(&serve->l_io),
|
||||
"Problem with I/O lock"
|
||||
);
|
||||
}
|
||||
|
||||
while (current < serve->size) {
|
||||
int run;
|
||||
|
||||
run = bitset_run_count(map, current, mirror_longest_write);
|
||||
|
||||
debug("mirror current=%ld, run=%d", current, run);
|
||||
|
||||
/* FIXME: we could avoid sending sparse areas of the
|
||||
* disc here, and probably save a lot of bandwidth and
|
||||
* time (if we know the destination starts off zeroed).
|
||||
*/
|
||||
if (bitset_is_set_at(map, current)) {
|
||||
/* We've found a dirty area, send it */
|
||||
debug("^^^ writing");
|
||||
|
||||
/* We need to stop the main thread from working
|
||||
* because it might corrupt the dirty map. This
|
||||
* is likely to slow things down but will be
|
||||
* safe.
|
||||
*/
|
||||
if (pass < last_pass)
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
pthread_mutex_lock(&serve->l_io),
|
||||
"Problem with I/O lock"
|
||||
);
|
||||
|
||||
/** FIXME: do something useful with bytes/second */
|
||||
|
||||
/** FIXME: error handling code here won't unlock */
|
||||
socket_nbd_write(
|
||||
serve->mirror->client,
|
||||
current,
|
||||
run,
|
||||
0,
|
||||
serve->mirror->mapped + current
|
||||
);
|
||||
|
||||
/* now mark it clean */
|
||||
bitset_clear_range(map, current, run);
|
||||
|
||||
if (pass < last_pass)
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
pthread_mutex_unlock(&serve->l_io),
|
||||
"Problem with I/O unlock"
|
||||
);
|
||||
|
||||
written += run;
|
||||
}
|
||||
current += run;
|
||||
}
|
||||
|
||||
/* if we've not written anything */
|
||||
if (written < mirror_last_pass_after_bytes_written)
|
||||
pass = last_pass;
|
||||
}
|
||||
|
||||
switch (serve->mirror->action_at_finish)
|
||||
{
|
||||
case ACTION_PROXY:
|
||||
debug("proxy!");
|
||||
serve->proxy_fd = serve->mirror->client;
|
||||
/* don't close our file descriptor, we still need it! */
|
||||
break;
|
||||
case ACTION_EXIT:
|
||||
debug("exit!");
|
||||
write(serve->close_signal[1], serve, 1); /* any byte will do */
|
||||
/* fall through */
|
||||
case ACTION_NOTHING:
|
||||
debug("nothing!");
|
||||
close(serve->mirror->client);
|
||||
}
|
||||
|
||||
free(serve->mirror->dirty_map);
|
||||
free(serve->mirror);
|
||||
serve->mirror = NULL; /* and we're gone */
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
pthread_mutex_unlock(&serve->l_accept),
|
||||
"Problem with accept unlock"
|
||||
);
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
pthread_mutex_unlock(&serve->l_io),
|
||||
"Problem with I/O unlock"
|
||||
);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1)
|
||||
|
||||
/** Command parser to start mirror process from socket input */
|
||||
int control_mirror(struct control_params* client, int linesc, char** lines)
|
||||
{
|
||||
off64_t size, remote_size;
|
||||
int fd, map_fd;
|
||||
struct mirror_status *mirror;
|
||||
union mysockaddr connect_to;
|
||||
uint64_t max_bytes_per_second;
|
||||
int action_at_finish;
|
||||
|
||||
if (linesc < 2) {
|
||||
write_socket("1: mirror takes at least two parameters");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (parse_ip_to_sockaddr(&connect_to.generic, lines[0]) == 0) {
|
||||
write_socket("1: bad IP address");
|
||||
return -1;
|
||||
}
|
||||
|
||||
connect_to.v4.sin_port = atoi(lines[1]);
|
||||
if (connect_to.v4.sin_port < 0 || connect_to.v4.sin_port > 65535) {
|
||||
write_socket("1: bad IP port number");
|
||||
return -1;
|
||||
}
|
||||
connect_to.v4.sin_port = htobe16(connect_to.v4.sin_port);
|
||||
|
||||
max_bytes_per_second = 0;
|
||||
if (linesc > 2) {
|
||||
max_bytes_per_second = atoi(lines[2]);
|
||||
}
|
||||
|
||||
action_at_finish = ACTION_PROXY;
|
||||
if (linesc > 3) {
|
||||
if (strcmp("proxy", lines[3]) == 0)
|
||||
action_at_finish = ACTION_PROXY;
|
||||
else if (strcmp("exit", lines[3]) == 0)
|
||||
action_at_finish = ACTION_EXIT;
|
||||
else if (strcmp("nothing", lines[3]) == 0)
|
||||
action_at_finish = ACTION_NOTHING;
|
||||
else {
|
||||
write_socket("1: action must be one of 'proxy', 'exit' or 'nothing'");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (linesc > 4) {
|
||||
write_socket("1: unrecognised parameters to mirror");
|
||||
return -1;
|
||||
}
|
||||
|
||||
fd = socket_connect(&connect_to.generic);
|
||||
|
||||
remote_size = socket_nbd_read_hello(fd);
|
||||
remote_size = remote_size; // shush compiler
|
||||
|
||||
mirror = xmalloc(sizeof(struct mirror_status));
|
||||
mirror->client = fd;
|
||||
mirror->max_bytes_per_second = max_bytes_per_second;
|
||||
mirror->action_at_finish = action_at_finish;
|
||||
|
||||
CLIENT_ERROR_ON_FAILURE(
|
||||
open_and_mmap(
|
||||
client->serve->filename,
|
||||
&map_fd,
|
||||
&size,
|
||||
(void**) &mirror->mapped
|
||||
),
|
||||
"Failed to open and mmap %s",
|
||||
client->serve->filename
|
||||
);
|
||||
|
||||
mirror->dirty_map = bitset_alloc(size, 4096);
|
||||
bitset_set_range(mirror->dirty_map, 0, size);
|
||||
|
||||
client->serve->mirror = mirror;
|
||||
|
||||
CLIENT_ERROR_ON_FAILURE( /* FIXME should free mirror on error */
|
||||
pthread_create(
|
||||
&mirror->thread,
|
||||
NULL,
|
||||
mirror_runner,
|
||||
client->serve
|
||||
),
|
||||
"Failed to create mirror thread"
|
||||
);
|
||||
|
||||
write_socket("0: mirror started");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Command parser to alter access control list from socket input */
|
||||
int control_acl(struct control_params* client, int linesc, char** lines)
|
||||
{
|
||||
int acl_entries = 0, parsed;
|
||||
struct ip_and_mask (*acl)[], (*old_acl)[];
|
||||
|
||||
parsed = parse_acl(&acl, linesc, lines);
|
||||
if (parsed != linesc) {
|
||||
write(client->socket, "1: bad spec: ", 13);
|
||||
write(client->socket, lines[parsed],
|
||||
strlen(lines[parsed]));
|
||||
write(client->socket, "\n", 1);
|
||||
free(acl);
|
||||
}
|
||||
else {
|
||||
old_acl = client->serve->acl;
|
||||
client->serve->acl = acl;
|
||||
client->serve->acl_entries = acl_entries;
|
||||
free(old_acl);
|
||||
write_socket("0: updated");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** FIXME: add some useful statistics */
|
||||
int control_status(struct control_params* client, int linesc, char** lines)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Master command parser for control socket connections, delegates quickly */
|
||||
void* control_serve(void* client_uncast)
|
||||
{
|
||||
struct control_params* client = (struct control_params*) client_uncast;
|
||||
char **lines = NULL;
|
||||
int finished=0;
|
||||
|
||||
while (!finished) {
|
||||
int i, linesc;
|
||||
linesc = read_lines_until_blankline(client->socket, 256, &lines);
|
||||
|
||||
if (linesc < 1)
|
||||
{
|
||||
write(client->socket, "9: missing command\n", 19);
|
||||
finished = 1;
|
||||
/* ignore failure */
|
||||
}
|
||||
else if (strcmp(lines[0], "acl") == 0) {
|
||||
if (control_acl(client, linesc-1, lines+1) < 0)
|
||||
finished = 1;
|
||||
}
|
||||
else if (strcmp(lines[0], "mirror") == 0) {
|
||||
if (control_mirror(client, linesc-1, lines+1) < 0)
|
||||
finished = 1;
|
||||
}
|
||||
else if (strcmp(lines[0], "status") == 0) {
|
||||
if (control_status(client, linesc-1, lines+1) < 0)
|
||||
finished = 1;
|
||||
}
|
||||
else {
|
||||
write(client->socket, "10: unknown command\n", 23);
|
||||
finished = 1;
|
||||
}
|
||||
|
||||
for (i=0; i<linesc; i++)
|
||||
free(lines[i]);
|
||||
free(lines);
|
||||
}
|
||||
|
||||
close(client->socket);
|
||||
free(client);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void accept_control_connection(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address)
|
||||
{
|
||||
pthread_t control_thread;
|
||||
struct control_params* control_params;
|
||||
|
||||
control_params = xmalloc(sizeof(struct control_params));
|
||||
control_params->socket = client_fd;
|
||||
control_params->serve = params;
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
pthread_create(
|
||||
&control_thread,
|
||||
NULL,
|
||||
control_serve,
|
||||
control_params
|
||||
),
|
||||
"Failed to create client thread"
|
||||
);
|
||||
}
|
||||
|
||||
void serve_open_control_socket(struct mode_serve_params* params)
|
||||
{
|
||||
struct sockaddr_un bind_address;
|
||||
|
||||
if (!params->control_socket_name)
|
||||
return;
|
||||
|
||||
params->control = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
SERVER_ERROR_ON_FAILURE(params->control,
|
||||
"Couldn't create control socket");
|
||||
|
||||
memset(&bind_address, 0, sizeof(bind_address));
|
||||
bind_address.sun_family = AF_UNIX;
|
||||
strncpy(bind_address.sun_path, params->control_socket_name, sizeof(bind_address.sun_path)-1);
|
||||
|
||||
unlink(params->control_socket_name); /* ignore failure */
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
bind(params->control, &bind_address, sizeof(bind_address)),
|
||||
"Couldn't bind control socket to %s",
|
||||
params->control_socket_name
|
||||
);
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
listen(params->control, 5),
|
||||
"Couldn't listen on control socket"
|
||||
);
|
||||
}
|
||||
|
8
src/control.h
Normal file
8
src/control.h
Normal file
@@ -0,0 +1,8 @@
|
||||
#ifndef __CONTROL_H
|
||||
#define __CONTROL_H
|
||||
|
||||
void accept_control_connection(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address);
|
||||
void serve_open_control_socket(struct mode_serve_params* params);
|
||||
|
||||
#endif
|
||||
|
222
src/flexnbd.c
Normal file
222
src/flexnbd.c
Normal file
@@ -0,0 +1,222 @@
|
||||
/* FlexNBD server (C) Bytemark Hosting 2012
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
/** main() function for parsing and dispatching commands. Each mode has
|
||||
* a corresponding structure which is filled in and passed to a do_ function
|
||||
* elsewhere in the program.
|
||||
*/
|
||||
|
||||
#include "params.h"
|
||||
#include "util.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
|
||||
void syntax()
|
||||
{
|
||||
fprintf(stderr,
|
||||
"Syntax: flexnbd serve <listen IP address> <port> <file> \\\n"
|
||||
" [full path to control socket] \\\n"
|
||||
" [allowed connection addresses ...]\n"
|
||||
" flexnbd read <IP address> <port> <offset> <length> > data\n"
|
||||
" flexnbd write <IP address> <port> <offset> <length> < data\n"
|
||||
" flexnbd write <IP address> <port> <offset> <file to write>\n"
|
||||
" flexnbd acl <control socket> [allowed connection addresses ...]\n"
|
||||
" flexnbd mirror <control socket> <dst IP address> <dst port>\n"
|
||||
" [bytes per second] [proxy|nothing|exit]"
|
||||
" flexnbd status <control socket>\n"
|
||||
);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
void params_serve(
|
||||
struct mode_serve_params* out,
|
||||
char* s_ip_address,
|
||||
char* s_port,
|
||||
char* s_file,
|
||||
int acl_entries,
|
||||
char** s_acl_entries /* first may actually be path to control socket */
|
||||
)
|
||||
{
|
||||
int parsed;
|
||||
|
||||
out->tcp_backlog = 10; /* does this need to be settable? */
|
||||
|
||||
if (s_ip_address == NULL)
|
||||
SERVER_ERROR("No IP address supplied");
|
||||
if (s_port == NULL)
|
||||
SERVER_ERROR("No port number supplied");
|
||||
if (s_file == NULL)
|
||||
SERVER_ERROR("No filename supplied");
|
||||
|
||||
if (parse_ip_to_sockaddr(&out->bind_to.generic, s_ip_address) == 0)
|
||||
SERVER_ERROR("Couldn't parse server address '%s' (use 0 if "
|
||||
"you want to bind to all IPs)", s_ip_address);
|
||||
|
||||
out->control_socket_name = NULL;
|
||||
|
||||
if (acl_entries > 0 && s_acl_entries[0][0] == '/') {
|
||||
out->control_socket_name = s_acl_entries[0];
|
||||
s_acl_entries++;
|
||||
acl_entries--;
|
||||
}
|
||||
|
||||
out->acl_entries = acl_entries;
|
||||
parsed = parse_acl(&out->acl, acl_entries, s_acl_entries);
|
||||
if (parsed != acl_entries)
|
||||
SERVER_ERROR("Bad ACL entry '%s'", s_acl_entries[parsed]);
|
||||
|
||||
out->bind_to.v4.sin_port = atoi(s_port);
|
||||
if (out->bind_to.v4.sin_port < 0 || out->bind_to.v4.sin_port > 65535)
|
||||
SERVER_ERROR("Port number must be >= 0 and <= 65535");
|
||||
out->bind_to.v4.sin_port = htobe16(out->bind_to.v4.sin_port);
|
||||
|
||||
out->filename = s_file;
|
||||
out->filename_incomplete = xmalloc(strlen(s_file)+11);
|
||||
strcpy(out->filename_incomplete, s_file);
|
||||
strcpy(out->filename_incomplete + strlen(s_file), ".INCOMPLETE");
|
||||
}
|
||||
|
||||
void params_readwrite(
|
||||
int write_not_read,
|
||||
struct mode_readwrite_params* out,
|
||||
char* s_ip_address,
|
||||
char* s_port,
|
||||
char* s_from,
|
||||
char* s_length_or_filename
|
||||
)
|
||||
{
|
||||
if (s_ip_address == NULL)
|
||||
SERVER_ERROR("No IP address supplied");
|
||||
if (s_port == NULL)
|
||||
SERVER_ERROR("No port number supplied");
|
||||
if (s_from == NULL)
|
||||
SERVER_ERROR("No from supplied");
|
||||
if (s_length_or_filename == NULL)
|
||||
SERVER_ERROR("No length supplied");
|
||||
|
||||
if (parse_ip_to_sockaddr(&out->connect_to.generic, s_ip_address) == 0)
|
||||
SERVER_ERROR("Couldn't parse connection address '%s'",
|
||||
s_ip_address);
|
||||
|
||||
/* FIXME: duplicated from above */
|
||||
out->connect_to.v4.sin_port = atoi(s_port);
|
||||
if (out->connect_to.v4.sin_port < 0 || out->connect_to.v4.sin_port > 65535)
|
||||
SERVER_ERROR("Port number must be >= 0 and <= 65535");
|
||||
out->connect_to.v4.sin_port = htobe16(out->connect_to.v4.sin_port);
|
||||
|
||||
out->from = atol(s_from);
|
||||
|
||||
if (write_not_read) {
|
||||
if (s_length_or_filename[0]-48 < 10) {
|
||||
out->len = atol(s_length_or_filename);
|
||||
out->data_fd = 0;
|
||||
}
|
||||
else {
|
||||
out->data_fd = open(
|
||||
s_length_or_filename, O_RDONLY);
|
||||
SERVER_ERROR_ON_FAILURE(out->data_fd,
|
||||
"Couldn't open %s", s_length_or_filename);
|
||||
out->len = lseek64(out->data_fd, 0, SEEK_END);
|
||||
SERVER_ERROR_ON_FAILURE(out->len,
|
||||
"Couldn't find length of %s", s_length_or_filename);
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
lseek64(out->data_fd, 0, SEEK_SET),
|
||||
"Couldn't rewind %s", s_length_or_filename
|
||||
);
|
||||
}
|
||||
}
|
||||
else {
|
||||
out->len = atol(s_length_or_filename);
|
||||
out->data_fd = 1;
|
||||
}
|
||||
}
|
||||
|
||||
void do_serve(struct mode_serve_params* params);
|
||||
void do_read(struct mode_readwrite_params* params);
|
||||
void do_write(struct mode_readwrite_params* params);
|
||||
void do_remote_command(char* command, char* mode, int argc, char** argv);
|
||||
|
||||
union mode_params {
|
||||
struct mode_serve_params serve;
|
||||
struct mode_readwrite_params readwrite;
|
||||
};
|
||||
|
||||
void mode(char* mode, int argc, char **argv)
|
||||
{
|
||||
union mode_params params;
|
||||
memset(¶ms, 0, sizeof(params));
|
||||
|
||||
if (strcmp(mode, "serve") == 0) {
|
||||
if (argc >= 3) {
|
||||
params_serve(¶ms.serve, argv[0], argv[1], argv[2], argc-3, argv+3);
|
||||
do_serve(¶ms.serve);
|
||||
}
|
||||
else {
|
||||
syntax();
|
||||
}
|
||||
}
|
||||
else if (strcmp(mode, "read") == 0 ) {
|
||||
if (argc == 4) {
|
||||
params_readwrite(0, ¶ms.readwrite, argv[0], argv[1], argv[2], argv[3]);
|
||||
do_read(¶ms.readwrite);
|
||||
}
|
||||
else {
|
||||
syntax();
|
||||
}
|
||||
}
|
||||
else if (strcmp(mode, "write") == 0 ) {
|
||||
if (argc == 4) {
|
||||
params_readwrite(1, ¶ms.readwrite, argv[0], argv[1], argv[2], argv[3]);
|
||||
do_write(¶ms.readwrite);
|
||||
}
|
||||
else {
|
||||
syntax();
|
||||
}
|
||||
}
|
||||
else if (strcmp(mode, "acl") == 0 || strcmp(mode, "mirror") == 0 || strcmp(mode, "status") == 0) {
|
||||
if (argc >= 1) {
|
||||
do_remote_command(mode, argv[0], argc-1, argv+1);
|
||||
}
|
||||
else {
|
||||
syntax();
|
||||
}
|
||||
}
|
||||
else {
|
||||
syntax();
|
||||
}
|
||||
exit(0);
|
||||
}
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */
|
||||
error_init();
|
||||
|
||||
if (argc < 2)
|
||||
syntax();
|
||||
mode(argv[1], argc-2, argv+2); /* never returns */
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
237
src/ioutil.c
Normal file
237
src/ioutil.c
Normal file
@@ -0,0 +1,237 @@
|
||||
#define _LARGEFILE64_SOURCE
|
||||
#define _GNU_SOURCE
|
||||
|
||||
#include <sys/mman.h>
|
||||
#include <sys/sendfile.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/types.h>
|
||||
#include <linux/fs.h>
|
||||
#include <linux/fiemap.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include "util.h"
|
||||
#include "bitset.h"
|
||||
|
||||
char* build_allocation_map(int fd, off64_t size, int resolution)
|
||||
{
|
||||
int i;
|
||||
char *allocation_map = xmalloc((size+resolution)/resolution);
|
||||
struct fiemap *fiemap_count, *fiemap;
|
||||
|
||||
fiemap_count = (struct fiemap*) xmalloc(sizeof(struct fiemap));
|
||||
|
||||
fiemap_count->fm_start = 0;
|
||||
fiemap_count->fm_length = size;
|
||||
fiemap_count->fm_flags = 0;
|
||||
fiemap_count->fm_extent_count = 0;
|
||||
fiemap_count->fm_mapped_extents = 0;
|
||||
|
||||
/* Find out how many extents there are */
|
||||
if (ioctl(fd, FS_IOC_FIEMAP, fiemap_count) < 0)
|
||||
return NULL;
|
||||
|
||||
/* Resize fiemap to allow us to read in the extents */
|
||||
fiemap = (struct fiemap*)xmalloc(
|
||||
sizeof(struct fiemap) + (
|
||||
sizeof(struct fiemap_extent) *
|
||||
fiemap_count->fm_mapped_extents
|
||||
)
|
||||
);
|
||||
|
||||
/* realloc makes valgrind complain a lot */
|
||||
memcpy(fiemap, fiemap_count, sizeof(struct fiemap));
|
||||
|
||||
fiemap->fm_extent_count = fiemap->fm_mapped_extents;
|
||||
fiemap->fm_mapped_extents = 0;
|
||||
|
||||
if (ioctl(fd, FS_IOC_FIEMAP, fiemap) < 0)
|
||||
return NULL;
|
||||
|
||||
for (i=0;i<fiemap->fm_mapped_extents;i++) {
|
||||
int first_bit = fiemap->fm_extents[i].fe_logical / resolution;
|
||||
int last_bit = (fiemap->fm_extents[i].fe_logical +
|
||||
fiemap->fm_extents[i].fe_length + resolution - 1) /
|
||||
resolution;
|
||||
int run = last_bit - first_bit;
|
||||
|
||||
bit_set_range(allocation_map, first_bit, run);
|
||||
}
|
||||
|
||||
for (i=0; i<16; i++) {
|
||||
debug("map[%d] = %d%d%d%d%d%d%d%d",
|
||||
i,
|
||||
(allocation_map[i] & 1) == 1,
|
||||
(allocation_map[i] & 2) == 2,
|
||||
(allocation_map[i] & 4) == 4,
|
||||
(allocation_map[i] & 8) == 8,
|
||||
(allocation_map[i] & 16) == 16,
|
||||
(allocation_map[i] & 32) == 32,
|
||||
(allocation_map[i] & 64) == 64,
|
||||
(allocation_map[i] & 128) == 128
|
||||
);
|
||||
}
|
||||
|
||||
free(fiemap);
|
||||
|
||||
return allocation_map;
|
||||
}
|
||||
|
||||
int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map)
|
||||
{
|
||||
off64_t size;
|
||||
|
||||
*out_fd = open(filename, O_RDWR|O_DIRECT|O_SYNC);
|
||||
if (*out_fd < 1)
|
||||
return *out_fd;
|
||||
|
||||
size = lseek64(*out_fd, 0, SEEK_END);
|
||||
if (size < 0)
|
||||
return size;
|
||||
if (out_size)
|
||||
*out_size = size;
|
||||
|
||||
if (out_map) {
|
||||
*out_map = mmap64(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED,
|
||||
*out_fd, 0);
|
||||
if (((long) *out_map) == -1)
|
||||
return -1;
|
||||
}
|
||||
debug("opened %s size %ld on fd %d @ %p", filename, size, *out_fd, *out_map);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int writeloop(int filedes, const void *buffer, size_t size)
|
||||
{
|
||||
size_t written=0;
|
||||
while (written < size) {
|
||||
size_t result = write(filedes, buffer+written, size-written);
|
||||
if (result == -1)
|
||||
return -1;
|
||||
written += result;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int readloop(int filedes, void *buffer, size_t size)
|
||||
{
|
||||
size_t readden=0;
|
||||
while (readden < size) {
|
||||
size_t result = read(filedes, buffer+readden, size-readden);
|
||||
if (result == 0 /* EOF */ || result == -1 /* error */)
|
||||
return -1;
|
||||
readden += result;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int sendfileloop(int out_fd, int in_fd, off64_t *offset, size_t count)
|
||||
{
|
||||
size_t sent=0;
|
||||
while (sent < count) {
|
||||
size_t result = sendfile64(out_fd, in_fd, offset, count-sent);
|
||||
debug("sendfile64(out_fd=%d, in_fd=%d, offset=%p, count-sent=%ld) = %ld", out_fd, in_fd, offset, count-sent, result);
|
||||
|
||||
if (result == -1)
|
||||
return -1;
|
||||
sent += result;
|
||||
debug("sent=%ld, count=%ld", sent, count);
|
||||
}
|
||||
debug("exiting sendfileloop");
|
||||
return 0;
|
||||
}
|
||||
|
||||
#include <errno.h>
|
||||
ssize_t spliceloop(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags2)
|
||||
{
|
||||
const unsigned int flags = SPLICE_F_MORE|SPLICE_F_MOVE|flags2;
|
||||
size_t spliced=0;
|
||||
|
||||
//debug("spliceloop(%d, %ld, %d, %ld, %ld)", fd_in, off_in ? *off_in : 0, fd_out, off_out ? *off_out : 0, len);
|
||||
|
||||
while (spliced < len) {
|
||||
ssize_t result = splice(fd_in, off_in, fd_out, off_out, len, flags);
|
||||
if (result < 0) {
|
||||
//debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len);
|
||||
if (errno == EAGAIN && (flags & SPLICE_F_NONBLOCK) ) {
|
||||
return spliced;
|
||||
}
|
||||
else {
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
spliced += result;
|
||||
//debug("result=%ld (%s), spliced=%ld, len=%ld", result, strerror(errno), spliced, len);
|
||||
}
|
||||
}
|
||||
|
||||
return spliced;
|
||||
}
|
||||
|
||||
int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
|
||||
{
|
||||
|
||||
int pipefd[2]; /* read end, write end */
|
||||
size_t spliced=0;
|
||||
|
||||
if (pipe(pipefd) == -1)
|
||||
return -1;
|
||||
|
||||
while (spliced < len) {
|
||||
ssize_t run = len-spliced;
|
||||
ssize_t s2, s1 = spliceloop(fd_in, NULL, pipefd[1], NULL, run, SPLICE_F_NONBLOCK);
|
||||
/*if (run > 65535)
|
||||
run = 65535;*/
|
||||
if (s1 < 0)
|
||||
break;
|
||||
|
||||
s2 = spliceloop(pipefd[0], NULL, fd_out, NULL, s1, 0);
|
||||
if (s2 < 0)
|
||||
break;
|
||||
spliced += s2;
|
||||
}
|
||||
close(pipefd[0]);
|
||||
close(pipefd[1]);
|
||||
|
||||
return spliced < len ? -1 : 0;
|
||||
}
|
||||
|
||||
int read_until_newline(int fd, char* buf, int bufsize)
|
||||
{
|
||||
int cur;
|
||||
bufsize -=1;
|
||||
|
||||
for (cur=0; cur < bufsize; cur++) {
|
||||
int result = read(fd, buf+cur, 1);
|
||||
if (result < 0)
|
||||
return -1;
|
||||
if (buf[cur] == 10)
|
||||
break;
|
||||
}
|
||||
buf[cur++] = 0;
|
||||
|
||||
return cur;
|
||||
}
|
||||
|
||||
int read_lines_until_blankline(int fd, int max_line_length, char ***lines)
|
||||
{
|
||||
int lines_count = 0;
|
||||
char line[max_line_length+1];
|
||||
*lines = NULL;
|
||||
|
||||
memset(line, 0, max_line_length+1);
|
||||
|
||||
while (1) {
|
||||
if (read_until_newline(fd, line, max_line_length) < 0)
|
||||
return lines_count;
|
||||
*lines = xrealloc(*lines, (lines_count+1) * sizeof(char*));
|
||||
(*lines)[lines_count] = strdup(line);
|
||||
if ((*lines)[lines_count][0] == 0)
|
||||
return lines_count;
|
||||
lines_count++;
|
||||
}
|
||||
}
|
||||
|
58
src/ioutil.h
Normal file
58
src/ioutil.h
Normal file
@@ -0,0 +1,58 @@
|
||||
#ifndef __IOUTIL_H
|
||||
#define __IOUTIL_H
|
||||
|
||||
|
||||
#include "params.h"
|
||||
|
||||
/** Returns a bit field representing which blocks are allocated in file
|
||||
* descriptor ''fd''. You must supply the size, and the resolution at which
|
||||
* you want the bits to represent allocated blocks. If the OS represents
|
||||
* allocated blocks at a finer resolution than you've asked for, any block
|
||||
* or part block will count as "allocated" with the corresponding bit set.
|
||||
*/
|
||||
char* build_allocation_map(int fd, off64_t size, int resolution);
|
||||
|
||||
/** Repeat a write() operation that succeeds partially until ''size'' bytes
|
||||
* are written, or an error is returned, when it returns -1 as usual.
|
||||
*/
|
||||
int writeloop(int filedes, const void *buffer, size_t size);
|
||||
|
||||
/** Repeat a read() operation that succeeds partially until ''size'' bytes
|
||||
* are written, or an error is returned, when it returns -1 as usual.
|
||||
*/
|
||||
int readloop(int filedes, void *buffer, size_t size);
|
||||
|
||||
/** Repeat a sendfile() operation that succeeds partially until ''size'' bytes
|
||||
* are written, or an error is returned, when it returns -1 as usual.
|
||||
*/
|
||||
int sendfileloop(int out_fd, int in_fd, off64_t *offset, size_t count);
|
||||
|
||||
/** Copy ''len'' bytes from ''fd_in'' to ''fd_out'' by creating a temporary
|
||||
* pipe and using the Linux splice call repeatedly until it has transferred
|
||||
* all the data. Returns -1 on error.
|
||||
*/
|
||||
int splice_via_pipe_loop(int fd_in, int fd_out, size_t len);
|
||||
|
||||
/** Fill up to ''bufsize'' characters starting at ''buf'' with data from ''fd''
|
||||
* until an LF character is received, which is written to the buffer at a zero
|
||||
* byte. Returns -1 on error, or the number of bytes written to the buffer.
|
||||
*/
|
||||
int read_until_newline(int fd, char* buf, int bufsize);
|
||||
|
||||
/** Read a number of lines using read_until_newline, until an empty line is
|
||||
* received (i.e. the sequence LF LF). The data is read from ''fd'' and
|
||||
* lines must be a maximum of ''max_line_length''. The set of lines is
|
||||
* returned as an array of zero-terminated strings; you must pass an address
|
||||
* ''lines'' in which you want the address of this array returned.
|
||||
*/
|
||||
int read_lines_until_blankline(int fd, int max_line_length, char ***lines);
|
||||
|
||||
/** Open the given ''filename'', determine its size, and mmap it in its
|
||||
* entirety. The file descriptor is stored in ''out_fd'', the size in
|
||||
* ''out_size'' and the address of the mmap in ''out_map''. If anything goes
|
||||
* wrong, returns -1 setting errno, otherwise 0.
|
||||
*/
|
||||
int open_and_mmap(char* filename, int* out_fd, off64_t *out_size, void **out_map);
|
||||
|
||||
#endif
|
||||
|
36
src/nbdtypes.h
Normal file
36
src/nbdtypes.h
Normal file
@@ -0,0 +1,36 @@
|
||||
#ifndef __NBDTYPES_H
|
||||
#define __NBDTYPES_H
|
||||
|
||||
/* http://linux.derkeiler.com/Mailing-Lists/Kernel/2003-09/2332.html */
|
||||
#define INIT_PASSWD "NBDMAGIC"
|
||||
#define INIT_MAGIC 0x0000420281861253
|
||||
#define REQUEST_MAGIC 0x25609513
|
||||
#define REPLY_MAGIC 0x67446698
|
||||
#define REQUEST_READ 0
|
||||
#define REQUEST_WRITE 1
|
||||
#define REQUEST_DISCONNECT 2
|
||||
|
||||
#include <linux/types.h>
|
||||
struct nbd_init {
|
||||
char passwd[8];
|
||||
__be64 magic;
|
||||
__be64 size;
|
||||
char reserved[128];
|
||||
};
|
||||
|
||||
struct nbd_request {
|
||||
__be32 magic;
|
||||
__be32 type; /* == READ || == WRITE */
|
||||
char handle[8];
|
||||
__be64 from;
|
||||
__be32 len;
|
||||
} __attribute__((packed));
|
||||
|
||||
struct nbd_reply {
|
||||
__be32 magic;
|
||||
__be32 error; /* 0 = ok, else error */
|
||||
char handle[8]; /* handle you got from request */
|
||||
};
|
||||
|
||||
#endif
|
||||
|
109
src/params.h
Normal file
109
src/params.h
Normal file
@@ -0,0 +1,109 @@
|
||||
#ifndef __PARAMS_H
|
||||
#define __PARAMS_H
|
||||
|
||||
#define _GNU_SOURCE
|
||||
#define _LARGEFILE64_SOURCE
|
||||
|
||||
#include "parse.h"
|
||||
|
||||
#include <sys/types.h>
|
||||
|
||||
enum mirror_finish_action {
|
||||
ACTION_PROXY,
|
||||
ACTION_EXIT,
|
||||
ACTION_NOTHING
|
||||
};
|
||||
|
||||
struct mirror_status {
|
||||
pthread_t thread;
|
||||
int client;
|
||||
char *filename;
|
||||
off64_t max_bytes_per_second;
|
||||
enum mirror_finish_action action_at_finish;
|
||||
|
||||
char *mapped;
|
||||
struct bitset_mapping *dirty_map;
|
||||
};
|
||||
|
||||
struct control_params {
|
||||
int socket;
|
||||
struct mode_serve_params* serve;
|
||||
};
|
||||
|
||||
#define MAX_NBD_CLIENTS 16
|
||||
struct mode_serve_params {
|
||||
/** address/port to bind to */
|
||||
union mysockaddr bind_to;
|
||||
/** number of entries in current access control list*/
|
||||
int acl_entries;
|
||||
/** pointer to access control list entries*/
|
||||
struct ip_and_mask (*acl)[0];
|
||||
/** (static) file name to serve */
|
||||
char* filename;
|
||||
/** file name of INCOMPLETE flag */
|
||||
char* filename_incomplete;
|
||||
/** TCP backlog for listen() */
|
||||
int tcp_backlog;
|
||||
/** (static) file name of UNIX control socket (or NULL if none) */
|
||||
char* control_socket_name;
|
||||
/** size of file */
|
||||
off64_t size;
|
||||
|
||||
/* NB dining philosophers if we ever mave more than one thread
|
||||
* that might need to pause the whole server. At the moment we only
|
||||
* have the one.
|
||||
*/
|
||||
|
||||
/** Claimed around any accept/thread starting loop */
|
||||
pthread_mutex_t l_accept;
|
||||
/** Claims around any I/O to this file */
|
||||
pthread_mutex_t l_io;
|
||||
|
||||
/** set to non-zero to cause r/w requests to go via this fd */
|
||||
int proxy_fd;
|
||||
|
||||
/** to interrupt accept loop and clients, write() to close_signal[1] */
|
||||
int close_signal[2];
|
||||
|
||||
struct mirror_status* mirror;
|
||||
int server;
|
||||
int control;
|
||||
|
||||
char* block_allocation_map;
|
||||
|
||||
struct { pthread_t thread; struct sockaddr address; }
|
||||
nbd_client[MAX_NBD_CLIENTS];
|
||||
};
|
||||
|
||||
struct mode_readwrite_params {
|
||||
union mysockaddr connect_to;
|
||||
off64_t from;
|
||||
off64_t len;
|
||||
int data_fd;
|
||||
int client;
|
||||
};
|
||||
|
||||
struct client_params {
|
||||
int socket;
|
||||
|
||||
int fileno;
|
||||
char* mapped;
|
||||
|
||||
struct mode_serve_params* serve; /* FIXME: remove above duplication */
|
||||
};
|
||||
|
||||
/* FIXME: wrong place */
|
||||
static inline void* sockaddr_address_data(struct sockaddr* sockaddr)
|
||||
{
|
||||
struct sockaddr_in* in = (struct sockaddr_in*) sockaddr;
|
||||
struct sockaddr_in6* in6 = (struct sockaddr_in6*) sockaddr;
|
||||
|
||||
if (sockaddr->sa_family == AF_INET)
|
||||
return &in->sin_addr;
|
||||
if (sockaddr->sa_family == AF_INET6)
|
||||
return &in6->sin6_addr;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
91
src/parse.c
Normal file
91
src/parse.c
Normal file
@@ -0,0 +1,91 @@
|
||||
#include "parse.h"
|
||||
#include "util.h"
|
||||
|
||||
int atoi(const char *nptr);
|
||||
|
||||
#define IS_IP_VALID_CHAR(x) ( ((x) >= '0' && (x) <= '9' ) || \
|
||||
((x) >= 'a' && (x) <= 'f') || \
|
||||
((x) >= 'A' && (x) <= 'F' ) || \
|
||||
(x) == ':' || (x) == '.' \
|
||||
)
|
||||
/* FIXME: should change this to return negative on error like everything else */
|
||||
int parse_ip_to_sockaddr(struct sockaddr* out, char* src)
|
||||
{
|
||||
char temp[64];
|
||||
struct sockaddr_in *v4 = (struct sockaddr_in *) out;
|
||||
struct sockaddr_in6 *v6 = (struct sockaddr_in6 *) out;
|
||||
|
||||
/* allow user to start with [ and end with any other invalid char */
|
||||
{
|
||||
int i=0, j=0;
|
||||
if (src[i] == '[')
|
||||
i++;
|
||||
for (; i<64 && IS_IP_VALID_CHAR(src[i]); i++)
|
||||
temp[j++] = src[i];
|
||||
temp[j] = 0;
|
||||
}
|
||||
|
||||
if (temp[0] == '0' && temp[1] == '\0') {
|
||||
v4->sin_family = AF_INET;
|
||||
v4->sin_addr.s_addr = INADDR_ANY;
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (inet_pton(AF_INET, temp, &v4->sin_addr) == 1) {
|
||||
out->sa_family = AF_INET;
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (inet_pton(AF_INET6, temp, &v6->sin6_addr) == 1) {
|
||||
out->sa_family = AF_INET6;
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int parse_acl(struct ip_and_mask (**out)[], int max, char **entries)
|
||||
{
|
||||
struct ip_and_mask (*list)[0];
|
||||
int i;
|
||||
|
||||
if (max == 0) {
|
||||
*out = NULL;
|
||||
return 0;
|
||||
}
|
||||
else {
|
||||
*out = xmalloc(max * sizeof(struct ip_and_mask));
|
||||
debug("acl alloc: %p", *out);
|
||||
}
|
||||
|
||||
list = *out;
|
||||
|
||||
for (i = 0; i < max; i++) {
|
||||
# define MAX_MASK_BITS (outentry->ip.family == AF_INET ? 32 : 128)
|
||||
int j;
|
||||
struct ip_and_mask* outentry = list[i];
|
||||
|
||||
if (parse_ip_to_sockaddr(&outentry->ip.generic, entries[i]) == 0)
|
||||
return i;
|
||||
|
||||
for (j=0; entries[i][j] && entries[i][j] != '/'; j++)
|
||||
;
|
||||
if (entries[i][j] == '/') {
|
||||
outentry->mask = atoi(entries[i]+j+1);
|
||||
if (outentry->mask < 1 || outentry->mask > MAX_MASK_BITS)
|
||||
return i;
|
||||
}
|
||||
else
|
||||
outentry->mask = MAX_MASK_BITS;
|
||||
# undef MAX_MASK_BITS
|
||||
debug("acl ptr[%d]: %p %d",i, outentry, outentry->mask);
|
||||
}
|
||||
|
||||
for (i=0; i < max; i++) {
|
||||
struct ip_and_mask* entry = list[i];
|
||||
debug("acl entry %d @ %p has mask %d", i, entry, entry->mask);
|
||||
}
|
||||
|
||||
return max;
|
||||
}
|
||||
|
24
src/parse.h
Normal file
24
src/parse.h
Normal file
@@ -0,0 +1,24 @@
|
||||
#ifndef __PARSE_H
|
||||
#define __PARSE_H
|
||||
|
||||
#include <sys/socket.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <unistd.h>
|
||||
|
||||
union mysockaddr {
|
||||
unsigned short family;
|
||||
struct sockaddr generic;
|
||||
struct sockaddr_in v4;
|
||||
struct sockaddr_in6 v6;
|
||||
};
|
||||
|
||||
struct ip_and_mask {
|
||||
union mysockaddr ip;
|
||||
int mask;
|
||||
};
|
||||
|
||||
int parse_ip_to_sockaddr(struct sockaddr* out, char* src);
|
||||
int parse_acl(struct ip_and_mask (**out)[0], int max, char **entries);
|
||||
|
||||
#endif
|
||||
|
124
src/readwrite.c
Normal file
124
src/readwrite.c
Normal file
@@ -0,0 +1,124 @@
|
||||
#include "nbdtypes.h"
|
||||
#include "ioutil.h"
|
||||
#include "util.h"
|
||||
#include "params.h"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
int socket_connect(struct sockaddr* to)
|
||||
{
|
||||
int fd = socket(to->sa_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0);
|
||||
SERVER_ERROR_ON_FAILURE(fd, "Couldn't create client socket");
|
||||
SERVER_ERROR_ON_FAILURE(connect(fd, to, sizeof(struct sockaddr_in6)),
|
||||
"connect failed");
|
||||
return fd;
|
||||
}
|
||||
|
||||
off64_t socket_nbd_read_hello(int fd)
|
||||
{
|
||||
struct nbd_init init;
|
||||
SERVER_ERROR_ON_FAILURE(readloop(fd, &init, sizeof(init)),
|
||||
"Couldn't read init");
|
||||
if (strncmp(init.passwd, INIT_PASSWD, 8) != 0)
|
||||
SERVER_ERROR("wrong passwd");
|
||||
if (be64toh(init.magic) != INIT_MAGIC)
|
||||
SERVER_ERROR("wrong magic (%x)", be64toh(init.magic));
|
||||
return be64toh(init.size);
|
||||
}
|
||||
|
||||
void fill_request(struct nbd_request *request, int type, int from, int len)
|
||||
{
|
||||
request->magic = htobe32(REQUEST_MAGIC);
|
||||
request->type = htobe32(type);
|
||||
((int*) request->handle)[0] = rand();
|
||||
((int*) request->handle)[1] = rand();
|
||||
request->from = htobe64(from);
|
||||
request->len = htobe32(len);
|
||||
}
|
||||
|
||||
void read_reply(int fd, struct nbd_request *request, struct nbd_reply *reply)
|
||||
{
|
||||
SERVER_ERROR_ON_FAILURE(readloop(fd, reply, sizeof(*reply)),
|
||||
"Couldn't read reply");
|
||||
if (be32toh(reply->magic) != REPLY_MAGIC)
|
||||
SERVER_ERROR("Reply magic incorrect (%p)", be32toh(reply->magic));
|
||||
if (be32toh(reply->error) != 0)
|
||||
SERVER_ERROR("Server replied with error %d", be32toh(reply->error));
|
||||
if (strncmp(request->handle, reply->handle, 8) != 0)
|
||||
SERVER_ERROR("Did not reply with correct handle");
|
||||
}
|
||||
|
||||
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf)
|
||||
{
|
||||
struct nbd_request request;
|
||||
struct nbd_reply reply;
|
||||
|
||||
fill_request(&request, REQUEST_READ, from, len);
|
||||
SERVER_ERROR_ON_FAILURE(writeloop(fd, &request, sizeof(request)),
|
||||
"Couldn't write request");
|
||||
read_reply(fd, &request, &reply);
|
||||
|
||||
if (out_buf) {
|
||||
SERVER_ERROR_ON_FAILURE(readloop(fd, out_buf, len),
|
||||
"Read failed");
|
||||
}
|
||||
else {
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
splice_via_pipe_loop(fd, out_fd, len),
|
||||
"Splice failed"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf)
|
||||
{
|
||||
struct nbd_request request;
|
||||
struct nbd_reply reply;
|
||||
|
||||
fill_request(&request, REQUEST_WRITE, from, len);
|
||||
SERVER_ERROR_ON_FAILURE(writeloop(fd, &request, sizeof(request)),
|
||||
"Couldn't write request");
|
||||
|
||||
if (in_buf) {
|
||||
SERVER_ERROR_ON_FAILURE(writeloop(fd, in_buf, len),
|
||||
"Write failed");
|
||||
}
|
||||
else {
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
splice_via_pipe_loop(in_fd, fd, len),
|
||||
"Splice failed"
|
||||
);
|
||||
}
|
||||
|
||||
read_reply(fd, &request, &reply);
|
||||
}
|
||||
|
||||
#define CHECK_RANGE(error_type) { \
|
||||
off64_t size = socket_nbd_read_hello(params->client); \
|
||||
if (params->from < 0 || (params->from + params->len) > size) \
|
||||
SERVER_ERROR(error_type \
|
||||
" request %d+%d is out of range given size %d", \
|
||||
params->from, params->len, size\
|
||||
); \
|
||||
}
|
||||
|
||||
void do_read(struct mode_readwrite_params* params)
|
||||
{
|
||||
params->client = socket_connect(¶ms->connect_to.generic);
|
||||
CHECK_RANGE("read");
|
||||
socket_nbd_read(params->client, params->from, params->len,
|
||||
params->data_fd, NULL);
|
||||
close(params->client);
|
||||
}
|
||||
|
||||
void do_write(struct mode_readwrite_params* params)
|
||||
{
|
||||
params->client = socket_connect(¶ms->connect_to.generic);
|
||||
CHECK_RANGE("write");
|
||||
socket_nbd_write(params->client, params->from, params->len,
|
||||
params->data_fd, NULL);
|
||||
close(params->client);
|
||||
}
|
||||
|
11
src/readwrite.h
Normal file
11
src/readwrite.h
Normal file
@@ -0,0 +1,11 @@
|
||||
#ifndef __READWRITE_H
|
||||
|
||||
#define __READWRITE_H
|
||||
|
||||
int socket_connect(struct sockaddr* to);
|
||||
off64_t socket_nbd_read_hello(int fd);
|
||||
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf);
|
||||
void socket_nbd_write(int fd, off64_t from, int len, int out_fd, void* out_buf);
|
||||
|
||||
#endif
|
||||
|
51
src/remote.c
Normal file
51
src/remote.c
Normal file
@@ -0,0 +1,51 @@
|
||||
#include "ioutil.h"
|
||||
#include "util.h"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <sys/un.h>
|
||||
|
||||
static const int max_response=1024;
|
||||
|
||||
void do_remote_command(char* command, char* socket_name, int argc, char** argv)
|
||||
{
|
||||
char newline=10;
|
||||
int i;
|
||||
int exit_status;
|
||||
int remote = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
struct sockaddr_un address;
|
||||
char response[max_response];
|
||||
|
||||
memset(&address, 0, sizeof(address));
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(remote, "Couldn't create client socket");
|
||||
|
||||
address.sun_family = AF_UNIX;
|
||||
strncpy(address.sun_path, socket_name, sizeof(address.sun_path));
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
connect(remote, (struct sockaddr*) &address, sizeof(address)),
|
||||
"Couldn't connect to %s", socket_name
|
||||
);
|
||||
|
||||
write(remote, command, strlen(command));
|
||||
write(remote, &newline, 1);
|
||||
for (i=0; i<argc; i++) {
|
||||
write(remote, argv[i], strlen(argv[i]));
|
||||
write(remote, &newline, 1);
|
||||
}
|
||||
write(remote, &newline, 1);
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
read_until_newline(remote, response, max_response),
|
||||
"Couldn't read response from %s", socket_name
|
||||
);
|
||||
|
||||
exit_status = atoi(response);
|
||||
if (exit_status > 0)
|
||||
fprintf(stderr, "%s\n", strchr(response, ':')+2);
|
||||
|
||||
exit(atoi(response));
|
||||
|
||||
close(remote);
|
||||
}
|
||||
|
592
src/serve.c
Normal file
592
src/serve.c
Normal file
@@ -0,0 +1,592 @@
|
||||
#include "params.h"
|
||||
#include "nbdtypes.h"
|
||||
#include "ioutil.h"
|
||||
#include "util.h"
|
||||
#include "bitset.h"
|
||||
#include "control.h"
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/un.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include <errno.h>
|
||||
|
||||
static const int block_allocation_resolution = 4096;//128<<10;
|
||||
|
||||
static inline void dirty(struct mode_serve_params *serve, off64_t from, int len)
|
||||
{
|
||||
if (serve->mirror)
|
||||
bitset_set_range(serve->mirror->dirty_map, from, len);
|
||||
}
|
||||
|
||||
int server_detect_closed(struct mode_serve_params* serve)
|
||||
{
|
||||
int errno_old = errno;
|
||||
int result = fcntl(serve->server, F_GETFD, 0) < 0;
|
||||
errno = errno_old;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* So waiting on client->socket is len bytes of data, and we must write it all
|
||||
* to client->mapped. However while doing do we must consult the bitmap
|
||||
* client->block_allocation_map, which is a bitmap where one bit represents
|
||||
* block_allocation_resolution bytes. Where a bit isn't set, there are no
|
||||
* disc blocks allocated for that portion of the file, and we'd like to keep
|
||||
* it that way.
|
||||
*
|
||||
* If the bitmap shows that every block in our prospective write is already
|
||||
* allocated, we can proceed as normal and make one call to writeloop.
|
||||
*
|
||||
*/
|
||||
void write_not_zeroes(struct client_params* client, off64_t from, int len)
|
||||
{
|
||||
char *map = client->serve->block_allocation_map;
|
||||
|
||||
while (len > 0) {
|
||||
/* so we have to calculate how much of our input to consider
|
||||
* next based on the bitmap of allocated blocks. This will be
|
||||
* at a coarser resolution than the actual write, which may
|
||||
* not fall on a block boundary at either end. So we look up
|
||||
* how many blocks our write covers, then cut off the start
|
||||
* and end to get the exact number of bytes.
|
||||
*/
|
||||
int first_bit = from/block_allocation_resolution;
|
||||
int last_bit = (from+len+block_allocation_resolution-1) /
|
||||
block_allocation_resolution;
|
||||
int run = bit_run_count(map, first_bit, last_bit-first_bit) *
|
||||
block_allocation_resolution;
|
||||
|
||||
if (run > len)
|
||||
run = len;
|
||||
|
||||
debug("write_not_zeroes: %ld+%d, first_bit=%d, last_bit=%d, run=%d",
|
||||
from, len, first_bit, last_bit, run);
|
||||
|
||||
#define DO_READ(dst, len) CLIENT_ERROR_ON_FAILURE( \
|
||||
readloop( \
|
||||
client->socket, \
|
||||
(dst), \
|
||||
(len) \
|
||||
), \
|
||||
"read failed %ld+%d", from, (len) \
|
||||
)
|
||||
|
||||
if (bit_is_set(map, from/block_allocation_resolution)) {
|
||||
debug("writing the lot");
|
||||
/* already allocated, just write it all */
|
||||
DO_READ(client->mapped + from, run);
|
||||
dirty(client->serve, from, run);
|
||||
len -= run;
|
||||
from += run;
|
||||
}
|
||||
else {
|
||||
char zerobuffer[block_allocation_resolution];
|
||||
/* not allocated, read in block_allocation_resoution */
|
||||
while (run > 0) {
|
||||
char *dst = client->mapped+from;
|
||||
int bit = from/block_allocation_resolution;
|
||||
int blockrun = block_allocation_resolution -
|
||||
(from % block_allocation_resolution);
|
||||
if (blockrun > run)
|
||||
blockrun = run;
|
||||
|
||||
debug("writing partial: bit=%d, blockrun=%d (run=%d)",
|
||||
bit, blockrun, run);
|
||||
|
||||
DO_READ(zerobuffer, blockrun);
|
||||
|
||||
/* This reads the buffer twice in the worst case
|
||||
* but we're leaning on memcmp failing early
|
||||
* and memcpy being fast, rather than try to
|
||||
* hand-optimized something specific.
|
||||
*/
|
||||
if (zerobuffer[0] != 0 ||
|
||||
memcmp(zerobuffer, zerobuffer + 1, blockrun - 1)) {
|
||||
memcpy(dst, zerobuffer, blockrun);
|
||||
bit_set(map, bit);
|
||||
dirty(client->serve, from, blockrun);
|
||||
debug("non-zero, copied and set bit %d", bit);
|
||||
/* at this point we could choose to
|
||||
* short-cut the rest of the write for
|
||||
* faster I/O but by continuing to do it
|
||||
* the slow way we preserve as much
|
||||
* sparseness as possible.
|
||||
*/
|
||||
}
|
||||
else {
|
||||
debug("all zero, skip write");
|
||||
}
|
||||
len -= blockrun;
|
||||
run -= blockrun;
|
||||
from += blockrun;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int client_serve_request(struct client_params* client)
|
||||
{
|
||||
off64_t offset;
|
||||
struct nbd_request request;
|
||||
struct nbd_reply reply;
|
||||
fd_set fds;
|
||||
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(client->socket, &fds);
|
||||
FD_SET(client->serve->close_signal[0], &fds);
|
||||
CLIENT_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds, NULL, NULL, NULL),
|
||||
"select() failed");
|
||||
|
||||
if (FD_ISSET(client->serve->close_signal[0], &fds))
|
||||
return 1;
|
||||
|
||||
if (readloop(client->socket, &request, sizeof(request)) == -1) {
|
||||
if (errno == 0) {
|
||||
debug("EOF reading request");
|
||||
return 1; /* neat point to close the socket */
|
||||
}
|
||||
else {
|
||||
CLIENT_ERROR_ON_FAILURE(-1, "Error reading request");
|
||||
}
|
||||
}
|
||||
|
||||
reply.magic = htobe32(REPLY_MAGIC);
|
||||
reply.error = htobe32(0);
|
||||
memcpy(reply.handle, request.handle, 8);
|
||||
|
||||
debug("request type %d", be32toh(request.type));
|
||||
|
||||
if (be32toh(request.magic) != REQUEST_MAGIC)
|
||||
CLIENT_ERROR("Bad magic %08x", be32toh(request.magic));
|
||||
|
||||
switch (be32toh(request.type))
|
||||
{
|
||||
case REQUEST_READ:
|
||||
if (access(client->serve->filename_incomplete, F_OK) == 0 ) {
|
||||
debug("read request while data incomplete");
|
||||
reply.error = htobe32(10);
|
||||
write(client->socket, &reply, sizeof(reply));
|
||||
return 0;
|
||||
}
|
||||
case REQUEST_WRITE:
|
||||
/* check it's not out of range */
|
||||
if (be64toh(request.from) < 0 ||
|
||||
be64toh(request.from)+be32toh(request.len) > client->serve->size) {
|
||||
debug("request read %ld+%d out of range",
|
||||
be64toh(request.from),
|
||||
be32toh(request.len)
|
||||
);
|
||||
reply.error = htobe32(1);
|
||||
write(client->socket, &reply, sizeof(reply));
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
case REQUEST_DISCONNECT:
|
||||
debug("request disconnect");
|
||||
return 1;
|
||||
|
||||
default:
|
||||
CLIENT_ERROR("Unknown request %08x", be32toh(request.type));
|
||||
}
|
||||
|
||||
CLIENT_ERROR_ON_FAILURE(
|
||||
pthread_mutex_lock(&client->serve->l_io),
|
||||
"Problem with I/O lock"
|
||||
);
|
||||
|
||||
if (server_detect_closed(client->serve)) {
|
||||
CLIENT_ERROR_ON_FAILURE(
|
||||
pthread_mutex_unlock(&client->serve->l_io),
|
||||
"Problem with I/O unlock"
|
||||
);
|
||||
return 1;
|
||||
}
|
||||
|
||||
switch (be32toh(request.type))
|
||||
{
|
||||
case REQUEST_READ:
|
||||
debug("request read %ld+%d", be64toh(request.from), be32toh(request.len));
|
||||
write(client->socket, &reply, sizeof(reply));
|
||||
|
||||
offset = be64toh(request.from);
|
||||
CLIENT_ERROR_ON_FAILURE(
|
||||
sendfileloop(
|
||||
client->socket,
|
||||
client->fileno,
|
||||
&offset,
|
||||
be32toh(request.len)
|
||||
),
|
||||
"sendfile failed from=%ld, len=%d",
|
||||
offset,
|
||||
be32toh(request.len)
|
||||
);
|
||||
break;
|
||||
|
||||
case REQUEST_WRITE:
|
||||
debug("request write %ld+%d", be64toh(request.from), be32toh(request.len));
|
||||
if (client->serve->block_allocation_map) {
|
||||
write_not_zeroes(
|
||||
client,
|
||||
be64toh(request.from),
|
||||
be32toh(request.len)
|
||||
);
|
||||
}
|
||||
else {
|
||||
CLIENT_ERROR_ON_FAILURE(
|
||||
readloop(
|
||||
client->socket,
|
||||
client->mapped + be64toh(request.from),
|
||||
be32toh(request.len)
|
||||
),
|
||||
"read failed from=%ld, len=%d",
|
||||
be64toh(request.from),
|
||||
be32toh(request.len)
|
||||
);
|
||||
dirty(client->serve, be64toh(request.from), be32toh(request.len));
|
||||
}
|
||||
write(client->socket, &reply, sizeof(reply));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
CLIENT_ERROR_ON_FAILURE(
|
||||
pthread_mutex_unlock(&client->serve->l_io),
|
||||
"Problem with I/O unlock"
|
||||
);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void client_send_hello(struct client_params* client)
|
||||
{
|
||||
struct nbd_init init;
|
||||
|
||||
memcpy(init.passwd, INIT_PASSWD, sizeof(INIT_PASSWD));
|
||||
init.magic = htobe64(INIT_MAGIC);
|
||||
init.size = htobe64(client->serve->size);
|
||||
memset(init.reserved, 0, 128);
|
||||
CLIENT_ERROR_ON_FAILURE(
|
||||
writeloop(client->socket, &init, sizeof(init)),
|
||||
"Couldn't send hello"
|
||||
);
|
||||
}
|
||||
|
||||
void* client_serve(void* client_uncast)
|
||||
{
|
||||
struct client_params* client = (struct client_params*) client_uncast;
|
||||
|
||||
//client_open_file(client);
|
||||
CLIENT_ERROR_ON_FAILURE(
|
||||
open_and_mmap(
|
||||
client->serve->filename,
|
||||
&client->fileno,
|
||||
NULL,
|
||||
(void**) &client->mapped
|
||||
),
|
||||
"Couldn't open/mmap file %s", client->serve->filename
|
||||
);
|
||||
client_send_hello(client);
|
||||
|
||||
while (client_serve_request(client) == 0)
|
||||
;
|
||||
|
||||
CLIENT_ERROR_ON_FAILURE(
|
||||
close(client->socket),
|
||||
"Couldn't close socket %d",
|
||||
client->socket
|
||||
);
|
||||
|
||||
close(client->socket);
|
||||
close(client->fileno);
|
||||
munmap(client->mapped, client->serve->size);
|
||||
free(client);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 };
|
||||
|
||||
/** Test whether AF_INET or AF_INET6 sockaddr is included in the given access
|
||||
* control list, returning 1 if it is, and 0 if not.
|
||||
*/
|
||||
int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct sockaddr* test)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i=0; i < list_length; i++) {
|
||||
struct ip_and_mask *entry = &(*list)[i];
|
||||
int testbits;
|
||||
char *raw_address1, *raw_address2;
|
||||
|
||||
debug("checking acl entry %d (%d/%d)", i, test->sa_family, entry->ip.family);
|
||||
|
||||
if (test->sa_family != entry->ip.family)
|
||||
continue;
|
||||
|
||||
if (test->sa_family == AF_INET) {
|
||||
debug("it's an AF_INET");
|
||||
raw_address1 = (char*)
|
||||
&((struct sockaddr_in*) test)->sin_addr;
|
||||
raw_address2 = (char*) &entry->ip.v4.sin_addr;
|
||||
}
|
||||
else if (test->sa_family == AF_INET6) {
|
||||
debug("it's an AF_INET6");
|
||||
raw_address1 = (char*)
|
||||
&((struct sockaddr_in6*) test)->sin6_addr;
|
||||
raw_address2 = (char*) &entry->ip.v6.sin6_addr;
|
||||
}
|
||||
|
||||
debug("testbits=%d", entry->mask);
|
||||
|
||||
for (testbits = entry->mask; testbits > 0; testbits -= 8) {
|
||||
debug("testbits=%d, c1=%d, c2=%d", testbits, raw_address1[0], raw_address2[0]);
|
||||
if (testbits >= 8) {
|
||||
if (raw_address1[0] != raw_address2[0])
|
||||
goto no_match;
|
||||
}
|
||||
else {
|
||||
if ((raw_address1[0] & testmasks[testbits%8]) !=
|
||||
(raw_address2[0] & testmasks[testbits%8]) )
|
||||
goto no_match;
|
||||
}
|
||||
|
||||
raw_address1++;
|
||||
raw_address2++;
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
||||
no_match: ;
|
||||
debug("no match");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Prepares a listening socket for the NBD server, binding etc. */
|
||||
void serve_open_server_socket(struct mode_serve_params* params)
|
||||
{
|
||||
int optval=1;
|
||||
|
||||
params->server = socket(params->bind_to.generic.sa_family == AF_INET ?
|
||||
PF_INET : PF_INET6, SOCK_STREAM, 0);
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(params->server,
|
||||
"Couldn't create server socket");
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
setsockopt(params->server, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)),
|
||||
"Couldn't set SO_REUSEADDR"
|
||||
);
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
bind(params->server, ¶ms->bind_to.generic,
|
||||
sizeof(params->bind_to)),
|
||||
"Couldn't bind server to IP address"
|
||||
);
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
listen(params->server, params->tcp_backlog),
|
||||
"Couldn't listen on server socket"
|
||||
);
|
||||
}
|
||||
|
||||
/** We can only accommodate MAX_NBD_CLIENTS connections at once. This function
|
||||
* goes through the current list, waits for any threads that have finished
|
||||
* and returns the next slot free (or -1 if there are none).
|
||||
*/
|
||||
int cleanup_and_find_client_slot(struct mode_serve_params* params)
|
||||
{
|
||||
int slot=-1, i;
|
||||
|
||||
for (i=0; i < MAX_NBD_CLIENTS; i++) {
|
||||
void* status;
|
||||
|
||||
if (params->nbd_client[i].thread != 0) {
|
||||
char s_client_address[64];
|
||||
|
||||
memset(s_client_address, 0, 64);
|
||||
strcpy(s_client_address, "???");
|
||||
inet_ntop(
|
||||
params->nbd_client[i].address.sa_family,
|
||||
sockaddr_address_data(¶ms->nbd_client[i].address),
|
||||
s_client_address,
|
||||
64
|
||||
);
|
||||
|
||||
if (pthread_tryjoin_np(params->nbd_client[i].thread, &status) < 0) {
|
||||
if (errno != EBUSY)
|
||||
SERVER_ERROR_ON_FAILURE(-1, "Problem with joining thread");
|
||||
}
|
||||
else {
|
||||
uint64_t status1 = (uint64_t) status;
|
||||
params->nbd_client[i].thread = 0;
|
||||
debug("nbd thread %d exited (%s) with status %ld", (int) params->nbd_client[i].thread, s_client_address, status1);
|
||||
}
|
||||
}
|
||||
|
||||
if (params->nbd_client[i].thread == 0 && slot == -1)
|
||||
slot = i;
|
||||
}
|
||||
|
||||
return slot;
|
||||
}
|
||||
|
||||
/** Dispatch function for accepting an NBD connection and starting a thread
|
||||
* to handle it. Rejects the connection if there is an ACL, and the far end's
|
||||
* address doesn't match, or if there are too many clients already connected.
|
||||
*/
|
||||
void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address)
|
||||
{
|
||||
struct client_params* client_params;
|
||||
int slot = cleanup_and_find_client_slot(params);
|
||||
char s_client_address[64];
|
||||
|
||||
if (inet_ntop(client_address->sa_family, sockaddr_address_data(client_address), s_client_address, 64) == NULL) {
|
||||
write(client_fd, "Bad client_address", 18);
|
||||
close(client_fd);
|
||||
return;
|
||||
}
|
||||
|
||||
if (params->acl &&
|
||||
!is_included_in_acl(params->acl_entries, params->acl, client_address)) {
|
||||
write(client_fd, "Access control error", 20);
|
||||
close(client_fd);
|
||||
return;
|
||||
}
|
||||
|
||||
if (slot < 0) {
|
||||
write(client_fd, "Too many clients", 16);
|
||||
close(client_fd);
|
||||
return;
|
||||
}
|
||||
|
||||
client_params = xmalloc(sizeof(struct client_params));
|
||||
client_params->socket = client_fd;
|
||||
client_params->serve = params;
|
||||
|
||||
if (pthread_create(¶ms->nbd_client[slot].thread, NULL, client_serve, client_params) < 0) {
|
||||
write(client_fd, "Thread creation problem", 23);
|
||||
free(client_params);
|
||||
close(client_fd);
|
||||
return;
|
||||
}
|
||||
|
||||
memcpy(¶ms->nbd_client[slot].address, client_address,
|
||||
sizeof(struct sockaddr));
|
||||
|
||||
debug("nbd thread %d started (%s)", (int) params->nbd_client[slot].thread, s_client_address);
|
||||
}
|
||||
|
||||
/** Accept either an NBD or control socket connection, dispatch appropriately */
|
||||
void serve_accept_loop(struct mode_serve_params* params)
|
||||
{
|
||||
while (1) {
|
||||
int activity_fd, client_fd;
|
||||
struct sockaddr client_address;
|
||||
fd_set fds;
|
||||
socklen_t socklen=sizeof(client_address);
|
||||
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(params->server, &fds);
|
||||
FD_SET(params->close_signal[0], &fds);
|
||||
if (params->control_socket_name)
|
||||
FD_SET(params->control, &fds);
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(select(FD_SETSIZE, &fds,
|
||||
NULL, NULL, NULL), "select() failed");
|
||||
|
||||
if (FD_ISSET(params->close_signal[0], &fds))
|
||||
return;
|
||||
|
||||
activity_fd = FD_ISSET(params->server, &fds) ? params->server :
|
||||
params->control;
|
||||
client_fd = accept(activity_fd, &client_address, &socklen);
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
pthread_mutex_lock(¶ms->l_accept),
|
||||
"Problem with accept lock"
|
||||
);
|
||||
|
||||
if (activity_fd == params->server)
|
||||
accept_nbd_client(params, client_fd, &client_address);
|
||||
if (activity_fd == params->control)
|
||||
accept_control_connection(params, client_fd, &client_address);
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
pthread_mutex_unlock(¶ms->l_accept),
|
||||
"Problem with accept unlock"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/** Initialisation function that sets up the initial allocation map, i.e. so
|
||||
* we know which blocks of the file are allocated.
|
||||
*/
|
||||
void serve_init_allocation_map(struct mode_serve_params* params)
|
||||
{
|
||||
int fd = open(params->filename, O_RDONLY);
|
||||
off64_t size;
|
||||
SERVER_ERROR_ON_FAILURE(fd, "Couldn't open %s", params->filename);
|
||||
size = lseek64(fd, 0, SEEK_END);
|
||||
params->size = size;
|
||||
SERVER_ERROR_ON_FAILURE(size, "Couldn't find size of %s",
|
||||
params->filename);
|
||||
params->block_allocation_map =
|
||||
build_allocation_map(fd, size, block_allocation_resolution);
|
||||
close(fd);
|
||||
}
|
||||
|
||||
/** Closes sockets, frees memory and waits for all client threads to finish */
|
||||
void serve_cleanup(struct mode_serve_params* params)
|
||||
{
|
||||
int i;
|
||||
|
||||
close(params->server);
|
||||
close(params->control);
|
||||
if (params->acl)
|
||||
free(params->acl);
|
||||
//free(params->filename);
|
||||
if (params->control_socket_name)
|
||||
//free(params->control_socket_name);
|
||||
pthread_mutex_destroy(¶ms->l_accept);
|
||||
pthread_mutex_destroy(¶ms->l_io);
|
||||
if (params->proxy_fd);
|
||||
close(params->proxy_fd);
|
||||
close(params->close_signal[0]);
|
||||
close(params->close_signal[1]);
|
||||
free(params->block_allocation_map);
|
||||
|
||||
if (params->mirror)
|
||||
debug("mirror thread running! this should not happen!");
|
||||
|
||||
for (i=0; i < MAX_NBD_CLIENTS; i++) {
|
||||
void* status;
|
||||
|
||||
if (params->nbd_client[i].thread != 0) {
|
||||
debug("joining thread %d", i);
|
||||
pthread_join(params->nbd_client[i].thread, &status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Full lifecycle of the server */
|
||||
void do_serve(struct mode_serve_params* params)
|
||||
{
|
||||
pthread_mutex_init(¶ms->l_accept, NULL);
|
||||
pthread_mutex_init(¶ms->l_io, NULL);
|
||||
SERVER_ERROR_ON_FAILURE(pipe(params->close_signal) , "pipe failed");
|
||||
|
||||
serve_open_server_socket(params);
|
||||
serve_open_control_socket(params);
|
||||
serve_init_allocation_map(params);
|
||||
serve_accept_loop(params);
|
||||
serve_cleanup(params);
|
||||
}
|
||||
|
61
src/util.c
Normal file
61
src/util.c
Normal file
@@ -0,0 +1,61 @@
|
||||
#include <stdarg.h>
|
||||
#include <stdio.h>
|
||||
#include <pthread.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
#include <malloc.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "util.h"
|
||||
|
||||
static pthread_t main_thread;
|
||||
|
||||
void error_init()
|
||||
{
|
||||
main_thread = pthread_self();
|
||||
}
|
||||
|
||||
void error(int consult_errno, int close_socket, pthread_mutex_t* unlock, const char* format, ...)
|
||||
{
|
||||
va_list argptr;
|
||||
|
||||
fprintf(stderr, "*** ");
|
||||
|
||||
va_start(argptr, format);
|
||||
vfprintf(stderr, format, argptr);
|
||||
va_end(argptr);
|
||||
|
||||
if (consult_errno) {
|
||||
fprintf(stderr, " (errno=%d, %s)", errno, strerror(errno));
|
||||
}
|
||||
|
||||
if (close_socket)
|
||||
close(close_socket);
|
||||
|
||||
if (unlock)
|
||||
pthread_mutex_unlock(unlock);
|
||||
|
||||
fprintf(stderr, "\n");
|
||||
|
||||
if (pthread_equal(pthread_self(), main_thread))
|
||||
exit(1);
|
||||
else
|
||||
pthread_exit((void*) 1);
|
||||
}
|
||||
|
||||
void* xrealloc(void* ptr, size_t size)
|
||||
{
|
||||
void* p = realloc(ptr, size);
|
||||
if (p == NULL)
|
||||
SERVER_ERROR("couldn't xrealloc %d bytes", size);
|
||||
return p;
|
||||
}
|
||||
|
||||
void* xmalloc(size_t size)
|
||||
{
|
||||
void* p = xrealloc(NULL, size);
|
||||
memset(p, 0, size);
|
||||
return p;
|
||||
}
|
||||
|
34
src/util.h
Normal file
34
src/util.h
Normal file
@@ -0,0 +1,34 @@
|
||||
#ifndef __UTIL_H
|
||||
#define __UTIL_H
|
||||
|
||||
#include <stdio.h>
|
||||
#include <pthread.h>
|
||||
|
||||
void error_init();
|
||||
|
||||
void error(int consult_errno, int close_socket, pthread_mutex_t* unlock, const char* format, ...);
|
||||
|
||||
void* xrealloc(void* ptr, size_t size);
|
||||
|
||||
void* xmalloc(size_t size);
|
||||
|
||||
#ifndef DEBUG
|
||||
# define debug(msg, ...)
|
||||
#else
|
||||
# include <sys/times.h>
|
||||
# define debug(msg, ...) fprintf(stderr, "%08x %4d: " msg "\n" , \
|
||||
(int) pthread_self(), (int) clock(), ##__VA_ARGS__)
|
||||
#endif
|
||||
|
||||
#define CLIENT_ERROR(msg, ...) \
|
||||
error(0, client->socket, &client->serve->l_io, msg, ##__VA_ARGS__)
|
||||
#define CLIENT_ERROR_ON_FAILURE(test, msg, ...) \
|
||||
if (test < 0) { error(1, client->socket, &client->serve->l_io, msg, ##__VA_ARGS__); }
|
||||
|
||||
#define SERVER_ERROR(msg, ...) \
|
||||
error(0, 0, NULL, msg, ##__VA_ARGS__)
|
||||
#define SERVER_ERROR_ON_FAILURE(test, msg, ...) \
|
||||
if (test < 0) { error(1, 0, NULL, msg, ##__VA_ARGS__); }
|
||||
|
||||
#endif
|
||||
|
Reference in New Issue
Block a user