Split control-socket functions into separate file.

This commit is contained in:
Matthew Bloch
2012-05-23 00:42:14 +01:00
parent 811e4ab2cd
commit 9c26f7f36f
7 changed files with 257 additions and 238 deletions

View File

@@ -1,6 +1,6 @@
DEBUG = true DEBUG = true
SOURCES = %w( flexnbd ioutil readwrite serve util parse ) SOURCES = %w( flexnbd ioutil readwrite serve util parse control )
OBJECTS = SOURCES.map { |s| "#{s}.o" } OBJECTS = SOURCES.map { |s| "#{s}.o" }
LIBS = %w( pthread ) LIBS = %w( pthread )
CCFLAGS = %w( -Wall ) CCFLAGS = %w( -Wall )

241
control.c Normal file
View File

@@ -0,0 +1,241 @@
#include "params.h"
#include "util.h"
#include "ioutil.h"
#include "parse.h"
#include "readwrite.h"
#include "bitset.h"
#include <stdlib.h>
#include <string.h>
#include <sys/un.h>
static const int longest_run = 8<<20;
void* mirror_runner(void* serve_params_uncast)
{
struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast;
int pass;
struct bitset_mapping *map = serve->mirror->dirty_map;
for (pass=0; pass < 7 /* biblical */; pass++) {
uint64_t current = 0;
debug("mirror start pass=%d", pass);
while (current < serve->size) {
int run = bitset_run_count(map, current, longest_run);
debug("mirror current=%ld, run=%d", current, run);
if (bitset_is_set_at(map, current)) {
debug("^^^ writing");
/* dirty area */
socket_nbd_write(
serve->mirror->client,
current,
run,
0,
serve->mirror->mapped + current
);
bitset_clear_range(map, current, run);
}
current += run;
}
}
return NULL;
}
void control_mirror(struct control_params* client)
{
off64_t size, remote_size;
int fd, map_fd;
struct mirror_status *mirror;
union mysockaddr connect_to;
char s_ip_address[64], s_port[8];
CLIENT_ERROR_ON_FAILURE(
read_until_newline(client->socket, s_ip_address, 64),
"Failed to read destination IP"
);
CLIENT_ERROR_ON_FAILURE(
read_until_newline(client->socket, s_port, 8),
"Failed to read destination port"
);
if (parse_ip_to_sockaddr(&connect_to.generic, s_ip_address) == 0)
CLIENT_ERROR("Couldn't parse connection address '%s'",
s_ip_address);
connect_to.v4.sin_port = atoi(s_port);
if (connect_to.v4.sin_port < 0 || connect_to.v4.sin_port > 65535)
CLIENT_ERROR("Port number must be >= 0 and <= 65535");
connect_to.v4.sin_port = htobe16(connect_to.v4.sin_port);
fd = socket_connect(&connect_to.generic); /* FIXME uses wrong error handler */
remote_size = socket_nbd_read_hello(fd);
remote_size = remote_size; // shush compiler
mirror = xmalloc(sizeof(struct mirror_status));
mirror->client = fd;
mirror->max_bytes_per_second = 0;
CLIENT_ERROR_ON_FAILURE(
open_and_mmap(
client->serve->filename,
&map_fd,
&size,
(void**) &mirror->mapped
),
"Failed to open and mmap %s",
client->serve->filename
);
mirror->dirty_map = bitset_alloc(size, 4096);
bitset_set_range(mirror->dirty_map, 0, size);
client->serve->mirror = mirror;
CLIENT_ERROR_ON_FAILURE( /* FIXME should free mirror on error */
pthread_create(
&mirror->thread,
NULL,
mirror_runner,
client->serve
),
"Failed to create mirror thread"
);
}
void control_acl(struct control_params* client)
{
int acl_entries = 0, parsed;
char** s_acl_entry = NULL;
struct ip_and_mask (*acl)[], (*old_acl)[];
while (1) {
char entry[64];
int result = read_until_newline(client->socket, entry, 64);
if (result == -1)
goto done;
if (result == 1) /* blank line terminates */
break;
s_acl_entry = xrealloc(
s_acl_entry,
++acl_entries * sizeof(struct s_acl_entry*)
);
s_acl_entry[acl_entries-1] = strdup(entry);
debug("acl_entry = '%s'", s_acl_entry[acl_entries-1]);
}
parsed = parse_acl(&acl, acl_entries, s_acl_entry);
if (parsed != acl_entries) {
write(client->socket, "error: ", 7);
write(client->socket, s_acl_entry[parsed],
strlen(s_acl_entry[parsed]));
write(client->socket, "\n", 1);
free(acl);
}
else {
old_acl = client->serve->acl;
client->serve->acl = acl;
client->serve->acl_entries = acl_entries;
free(old_acl);
write(client->socket, "ok\n", 3);
}
done: if (acl_entries > 0) {
int i;
for (i=0; i<acl_entries; i++)
free(s_acl_entry[i]);
free(s_acl_entry);
}
return;
}
void control_status(struct control_params* client)
{
}
void* control_serve(void* client_uncast)
{
const int max = 256;
char command[max];
struct control_params* client = (struct control_params*) client_uncast;
while (1) {
CLIENT_ERROR_ON_FAILURE(
read_until_newline(client->socket, command, max),
"Error reading command"
);
if (strcmp(command, "acl") == 0)
control_acl(client);
else if (strcmp(command, "mirror") == 0)
control_mirror(client);
else if (strcmp(command, "status") == 0)
control_status(client);
else {
write(client->socket, "error: unknown command\n", 23);
break;
}
}
close(client->socket);
free(client);
return NULL;
}
void accept_control_connection(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address)
{
pthread_t control_thread;
struct control_params* control_params;
control_params = xmalloc(sizeof(struct control_params));
control_params->socket = client_fd;
control_params->serve = params;
SERVER_ERROR_ON_FAILURE(
pthread_create(
&control_thread,
NULL,
control_serve,
control_params
),
"Failed to create client thread"
);
}
void serve_open_control_socket(struct mode_serve_params* params)
{
struct sockaddr_un bind_address;
if (!params->control_socket_name)
return;
params->control = socket(AF_UNIX, SOCK_STREAM, 0);
SERVER_ERROR_ON_FAILURE(params->control,
"Couldn't create control socket");
memset(&bind_address, 0, sizeof(bind_address));
bind_address.sun_family = AF_UNIX;
strcpy(bind_address.sun_path, params->control_socket_name);
unlink(params->control_socket_name); /* ignore failure */
SERVER_ERROR_ON_FAILURE(
bind(params->control, &bind_address, sizeof(bind_address)),
"Couldn't bind control socket to %s",
params->control_socket_name
);
SERVER_ERROR_ON_FAILURE(
listen(params->control, 5),
"Couldn't listen on control socket"
);
}

8
control.h Normal file
View File

@@ -0,0 +1,8 @@
#ifndef __CONTROL_H
#define __CONTROL_H
void accept_control_connection(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address);
void serve_open_control_socket(struct mode_serve_params* params);
#endif

View File

@@ -130,6 +130,11 @@ void do_serve(struct mode_serve_params* params);
void do_read(struct mode_readwrite_params* params); void do_read(struct mode_readwrite_params* params);
void do_write(struct mode_readwrite_params* params); void do_write(struct mode_readwrite_params* params);
union mode_params {
struct mode_serve_params serve;
struct mode_readwrite_params readwrite;
};
void mode(char* mode, int argc, char **argv) void mode(char* mode, int argc, char **argv)
{ {
union mode_params params; union mode_params params;

View File

@@ -154,7 +154,7 @@ int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
/*if (run > 65535) /*if (run > 65535)
run = 65535;*/ run = 65535;*/
r1 = splice(fd_in, NULL, pipefd[1], NULL, run, SPLICE_F_MORE|SPLICE_F_MOVE|SPLICE_F_NONBLOCK); r1 = splice(fd_in, NULL, pipefd[1], NULL, run, SPLICE_F_MORE|SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
debug("%d", r1); debug("%ld", r1);
if (r1 <= 0) if (r1 <= 0)
break; break;
r2 = splice(pipefd[0], NULL, fd_out, NULL, r1, SPLICE_F_MORE|SPLICE_F_MOVE); r2 = splice(pipefd[0], NULL, fd_out, NULL, r1, SPLICE_F_MORE|SPLICE_F_MOVE);

View File

@@ -59,10 +59,5 @@ struct client_params {
struct mode_serve_params* serve; /* FIXME: remove above duplication */ struct mode_serve_params* serve; /* FIXME: remove above duplication */
}; };
union mode_params {
struct mode_serve_params serve;
struct mode_readwrite_params readwrite;
};
#endif #endif

232
serve.c
View File

@@ -3,8 +3,7 @@
#include "ioutil.h" #include "ioutil.h"
#include "util.h" #include "util.h"
#include "bitset.h" #include "bitset.h"
#include "parse.h" #include "control.h"
#include "readwrite.h"
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
@@ -269,186 +268,6 @@ void* client_serve(void* client_uncast)
return NULL; return NULL;
} }
void control_acl(struct control_params* client)
{
int acl_entries = 0, parsed;
char** s_acl_entry = NULL;
struct ip_and_mask (*acl)[], (*old_acl)[];
while (1) {
char entry[64];
int result = read_until_newline(client->socket, entry, 64);
if (result == -1)
goto done;
if (result == 1) /* blank line terminates */
break;
s_acl_entry = xrealloc(
s_acl_entry,
++acl_entries * sizeof(struct s_acl_entry*)
);
s_acl_entry[acl_entries-1] = strdup(entry);
debug("acl_entry = '%s'", s_acl_entry[acl_entries-1]);
}
parsed = parse_acl(&acl, acl_entries, s_acl_entry);
if (parsed != acl_entries) {
write(client->socket, "error: ", 7);
write(client->socket, s_acl_entry[parsed],
strlen(s_acl_entry[parsed]));
write(client->socket, "\n", 1);
free(acl);
}
else {
old_acl = client->serve->acl;
client->serve->acl = acl;
client->serve->acl_entries = acl_entries;
free(old_acl);
write(client->socket, "ok\n", 3);
}
done: if (acl_entries > 0) {
int i;
for (i=0; i<acl_entries; i++)
free(s_acl_entry[i]);
free(s_acl_entry);
}
return;
}
static const int longest_run = 8<<20;
void* mirror_runner(void* serve_params_uncast)
{
struct mode_serve_params *serve = (struct mode_serve_params*) serve_params_uncast;
int pass;
struct bitset_mapping *map = serve->mirror->dirty_map;
for (pass=0; pass < 7 /* biblical */; pass++) {
uint64_t current = 0;
debug("mirror start pass=%d", pass);
while (current < serve->size) {
int run = bitset_run_count(map, current, longest_run);
debug("mirror current=%d, run=%d", current, run);
if (bitset_is_set_at(map, current)) {
debug("^^^ writing", current, run);
/* dirty area */
socket_nbd_write(
serve->mirror->client,
current,
run,
0,
serve->mirror->mapped + current
);
bitset_clear_range(map, current, run);
}
current += run;
}
}
return NULL;
}
void control_mirror(struct control_params* client)
{
off64_t size, remote_size;
int fd, map_fd;
struct mirror_status *mirror;
union mysockaddr connect_to;
char s_ip_address[64], s_port[8];
CLIENT_ERROR_ON_FAILURE(
read_until_newline(client->socket, s_ip_address, 64),
"Failed to read destination IP"
);
CLIENT_ERROR_ON_FAILURE(
read_until_newline(client->socket, s_port, 8),
"Failed to read destination port"
);
if (parse_ip_to_sockaddr(&connect_to.generic, s_ip_address) == 0)
CLIENT_ERROR("Couldn't parse connection address '%s'",
s_ip_address);
connect_to.v4.sin_port = atoi(s_port);
if (connect_to.v4.sin_port < 0 || connect_to.v4.sin_port > 65535)
CLIENT_ERROR("Port number must be >= 0 and <= 65535");
connect_to.v4.sin_port = htobe16(connect_to.v4.sin_port);
fd = socket_connect(&connect_to.generic); /* FIXME uses wrong error handler */
remote_size = socket_nbd_read_hello(fd);
mirror = xmalloc(sizeof(struct mirror_status));
mirror->client = fd;
mirror->max_bytes_per_second = 0;
CLIENT_ERROR_ON_FAILURE(
open_and_mmap(
client->serve->filename,
&map_fd,
&size,
(void**) &mirror->mapped
),
"Failed to open and mmap %s",
client->serve->filename
);
mirror->dirty_map = bitset_alloc(size, block_allocation_resolution);
bitset_set_range(mirror->dirty_map, 0, size);
client->serve->mirror = mirror;
CLIENT_ERROR_ON_FAILURE( /* FIXME should free mirror on error */
pthread_create(
&mirror->thread,
NULL,
mirror_runner,
client->serve
),
"Failed to create mirror thread"
);
}
void control_status(struct control_params* client)
{
}
void* control_serve(void* client_uncast)
{
const int max = 256;
char command[max];
struct control_params* client = (struct control_params*) client_uncast;
while (1) {
CLIENT_ERROR_ON_FAILURE(
read_until_newline(client->socket, command, max),
"Error reading command"
);
if (strcmp(command, "acl") == 0)
control_acl(client);
else if (strcmp(command, "mirror") == 0)
control_mirror(client);
else if (strcmp(command, "status") == 0)
control_status(client);
else {
write(client->socket, "error: unknown command\n", 23);
break;
}
}
close(client->socket);
free(client);
return NULL;
}
static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 }; static int testmasks[9] = { 0,128,192,224,240,248,252,254,255 };
int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct sockaddr* test) int is_included_in_acl(int list_length, struct ip_and_mask (*list)[], struct sockaddr* test)
@@ -524,35 +343,6 @@ void serve_open_server_socket(struct mode_serve_params* params)
); );
} }
void serve_open_control_socket(struct mode_serve_params* params)
{
struct sockaddr_un bind_address;
if (!params->control_socket_name)
return;
params->control = socket(AF_UNIX, SOCK_STREAM, 0);
SERVER_ERROR_ON_FAILURE(params->control,
"Couldn't create control socket");
memset(&bind_address, 0, sizeof(bind_address));
bind_address.sun_family = AF_UNIX;
strcpy(bind_address.sun_path, params->control_socket_name);
unlink(params->control_socket_name); /* ignore failure */
SERVER_ERROR_ON_FAILURE(
bind(params->control, &bind_address, sizeof(bind_address)),
"Couldn't bind control socket to %s",
params->control_socket_name
);
SERVER_ERROR_ON_FAILURE(
listen(params->control, 5),
"Couldn't listen on control socket"
);
}
void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address) void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address)
{ {
pthread_t client_thread; pthread_t client_thread;
@@ -585,26 +375,6 @@ void accept_nbd_client(struct mode_serve_params* params, int client_fd, struct s
/* FIXME: maybe shouldn't be fatal? */ /* FIXME: maybe shouldn't be fatal? */
} }
void accept_control_connection(struct mode_serve_params* params, int client_fd, struct sockaddr* client_address)
{
pthread_t control_thread;
struct control_params* control_params;
control_params = xmalloc(sizeof(struct control_params));
control_params->socket = client_fd;
control_params->serve = params;
SERVER_ERROR_ON_FAILURE(
pthread_create(
&control_thread,
NULL,
control_serve,
control_params
),
"Failed to create client thread"
);
}
void serve_accept_loop(struct mode_serve_params* params) void serve_accept_loop(struct mode_serve_params* params)
{ {
while (1) { while (1) {