Add listen mode

This commit is contained in:
Alex Young
2012-06-21 18:01:50 +01:00
parent 79ba1cf728
commit f37a217cb9
12 changed files with 450 additions and 22 deletions

View File

@@ -25,6 +25,8 @@ struct client *client_create( struct server *serve, int socket )
c->stop_signal = self_pipe_create();
c->entrusted = 0;
return c;
}
@@ -241,12 +243,18 @@ void client_write_init( struct client * client, uint64_t size )
);
}
/* Check to see if the client's request needs a reply constructing.
* Returns 1 if we do, 0 otherwise.
* request_err is set to 0 if the client sent a bad request, in which
* case we send an error reply.
* case we drop the connection.
* FIXME: after an ENTRUST, there's no way to distinguish between a
* DISCONNECT and any bad request.
*/
int client_request_needs_reply( struct client * client, struct nbd_request request, int *request_err )
int client_request_needs_reply( struct client * client,
struct nbd_request request,
int *should_disconnect )
{
debug("request type %d", request.type);
@@ -257,8 +265,12 @@ int client_request_needs_reply( struct client * client, struct nbd_request reque
switch (request.type)
{
case REQUEST_READ:
FATAL_IF( client->entrusted,
"Received a read request after an entrust message.");
break;
case REQUEST_WRITE:
FATAL_IF( client->entrusted,
"Received a write request after an entrust message.");
/* check it's not out of range */
if ( request.from+request.len > client->serve->size) {
debug("request read %ld+%d out of range",
@@ -266,14 +278,19 @@ int client_request_needs_reply( struct client * client, struct nbd_request reque
request.len
);
client_write_reply( client, &request, 1 );
*request_err = 0;
*should_disconnect = 0;
return 0;
}
break;
case REQUEST_ENTRUST:
/* Yes, we need to reply to an entrust, but we take no
* further action */
debug("request entrust");
break;
case REQUEST_DISCONNECT:
debug("request disconnect");
*request_err = 1;
*should_disconnect = 1;
return 0;
default:
@@ -283,6 +300,19 @@ int client_request_needs_reply( struct client * client, struct nbd_request reque
}
void client_reply_to_entrust( struct client * client, struct nbd_request request )
{
/* An entrust needs a response, but has no data. */
debug( "request entrust" );
client_write_reply( client, &request, 0 );
/* We set this after trying to send the reply, so we know the
* reply got away safely.
*/
client->entrusted = 1;
}
void client_reply_to_read( struct client* client, struct nbd_request request )
{
off64_t offset;
@@ -345,15 +375,17 @@ void client_reply( struct client* client, struct nbd_request request )
case REQUEST_READ:
client_reply_to_read( client, request );
break;
case REQUEST_WRITE:
client_reply_to_write( client, request );
break;
case REQUEST_ENTRUST:
client_reply_to_entrust( client, request );
break;
}
}
/* Returns 0 if a request was successfully served. */
/* Returns 0 if we should continue trying to serve requests */
int client_serve_request(struct client* client)
{
struct nbd_request request;
@@ -416,6 +448,10 @@ void* client_serve(void* client_uncast)
;
client->stopped = 1;
if ( client->entrusted ){
server_control_arrived( client->serve );
}
FATAL_IF_NEGATIVE(
close(client->socket),
"Couldn't close socket %d",

View File

@@ -19,6 +19,9 @@ struct client {
struct self_pipe * stop_signal;
struct server* serve; /* FIXME: remove above duplication */
/* Have we seen a REQUEST_ENTRUST message? */
int entrusted;
};

View File

@@ -21,6 +21,7 @@
#include "serve.h"
#include "listen.h"
#include "util.h"
#include <stdio.h>
@@ -115,7 +116,7 @@ void params_readwrite(
}
}
void do_serve(struct server* params);
int do_serve(struct server* params);
void do_read(struct mode_readwrite_params* params);
void do_write(struct mode_readwrite_params* params);
void do_remote_command(char* command, char* mode, int argc, char** argv);
@@ -153,6 +154,50 @@ void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char
}
void read_listen_param( int c,
char **ip_addr,
char **rebind_ip_addr,
char **ip_port,
char **rebind_ip_port,
char **file,
char **sock,
int *default_deny )
{
switch(c){
case 'h':
fprintf(stdout, listen_help_text );
exit(0);
break;
case 'l':
*ip_addr = optarg;
break;
case 'L':
*rebind_ip_addr = optarg;
break;
case 'p':
*ip_port = optarg;
break;
case 'P':
*rebind_ip_port = optarg;
break;
case 'f':
*file = optarg;
break;
case 's':
*sock = optarg;
break;
case 'd':
*default_deny = 1;
break;
case 'v':
log_level = 0;
break;
default:
exit_err( listen_help_text );
break;
}
}
void read_readwrite_param( int c, char **ip_addr, char **ip_port, char **bind_addr, char **from, char **size)
{
switch(c){
@@ -269,13 +314,60 @@ int mode_serve( int argc, char *argv[] )
}
if ( err ) { exit_err( serve_help_text ); }
serve = server_create( ip_addr, ip_port, file, sock, default_deny, argc - optind, argv + optind, MAX_NBD_CLIENTS );
do_serve( serve );
serve = server_create( ip_addr, ip_port, file, sock, default_deny, argc - optind, argv + optind, MAX_NBD_CLIENTS, 1 );
if ( 0 == do_serve( serve ) ) {
info( "Control transfered.");
}
server_destroy( serve );
return 0;
}
int mode_listen( int argc, char *argv[] )
{
int c;
char *ip_addr = NULL;
char *rebind_ip_addr = NULL;
char *ip_port = NULL;
char *rebind_ip_port = NULL;
char *file = NULL;
char *sock = NULL;
int default_deny = 0; // not on by default
int err = 0;
struct listen * listen;
while (1) {
c = getopt_long(argc, argv, listen_short_options, listen_options, NULL);
if ( c == -1 ) { break; }
read_listen_param( c, &ip_addr, &rebind_ip_addr, &ip_port, &rebind_ip_port,
&file, &sock, &default_deny );
}
if ( NULL == ip_addr || NULL == ip_port ) {
err = 1;
fprintf( stderr, "both --addr and --port are required.\n" );
}
if ( NULL == file ) {
err = 1;
fprintf( stderr, "--file is required\n" );
}
if ( err ) { exit_err( listen_help_text ); }
listen = listen_create( ip_addr, rebind_ip_addr,
ip_port, rebind_ip_port,
file, sock, default_deny,
argc - optind, argv + optind, MAX_NBD_CLIENTS );
do_listen( listen );
listen_destroy( listen );
return 0;
}
int mode_read( int argc, char *argv[] )
{
int c;
@@ -439,6 +531,8 @@ int mode_help( int argc, char *argv[] )
cmd = argv[0];
if (IS_CMD( CMD_SERVE, cmd ) ) {
help_text = serve_help_text;
} else if ( IS_CMD( CMD_LISTEN, cmd ) ) {
help_text = listen_help_text;
} else if ( IS_CMD( CMD_READ, cmd ) ) {
help_text = read_help_text;
} else if ( IS_CMD( CMD_WRITE, cmd ) ) {
@@ -462,6 +556,9 @@ void mode(char* mode, int argc, char **argv)
if ( IS_CMD( CMD_SERVE, mode ) ) {
mode_serve( argc, argv );
}
else if ( IS_CMD( CMD_LISTEN, mode ) ) {
mode_listen( argc, argv );
}
else if ( IS_CMD( CMD_READ, mode ) ) {
mode_read( argc, argv );
}

87
src/listen.c Normal file
View File

@@ -0,0 +1,87 @@
#include "listen.h"
#include "serve.h"
#include "util.h"
#include <stdlib.h>
struct listen * listen_create(
char* s_ip_address,
char* s_rebind_ip_address,
char* s_port,
char* s_rebind_port,
char* s_file,
char *s_ctrl_sock,
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients )
{
struct listen * listen;
listen = (struct listen *)xmalloc( sizeof( listen ) );
listen->init_serve = server_create( s_ip_address,
s_port,
s_file,
s_ctrl_sock,
default_deny,
acl_entries,
s_acl_entries,
1, 0);
listen->main_serve = server_create(
s_rebind_ip_address ? s_rebind_ip_address : s_ip_address,
s_rebind_port ? s_rebind_port : s_port,
s_file,
s_ctrl_sock,
default_deny,
acl_entries,
s_acl_entries,
max_nbd_clients, 1);
return listen;
}
void listen_destroy( struct listen * listen )
{
NULLCHECK( listen );
free( listen );
}
void listen_switch( struct listen * listen )
{
NULLCHECK( listen );
/* TODO: Copy acl from init_serve to main_serve */
/* TODO: rename underlying file from foo.INCOMPLETE to foo */
}
void listen_cleanup( void * unused __attribute__((unused)) )
{
}
void do_listen( struct listen * listen )
{
NULLCHECK( listen );
int have_control = 0;
have_control = do_serve( listen->init_serve );
if( have_control ) {
info( "Taking control.");
listen_switch( listen );
server_destroy( listen->init_serve );
info( "Switched to the main server, serving." );
do_serve( listen->main_serve );
}
else {
warn("Failed to take control, giving up.");
server_destroy( listen->init_serve );
}
server_destroy( listen->main_serve );
info("Listen done.");
listen_cleanup( listen );
}

26
src/listen.h Normal file
View File

@@ -0,0 +1,26 @@
#ifndef LISTEN_H
#define LISTEN_H
#include "serve.h"
struct listen {
struct server * init_serve;
struct server * main_serve;
};
struct listen * listen_create(
char* s_ip_address,
char* s_rebind_ip_address,
char* s_port,
char* s_rebind_port,
char* s_file,
char *s_ctrl_sock,
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients );
void listen_destroy( struct listen* );
void do_listen( struct listen * );
#endif

View File

@@ -5,8 +5,10 @@
#define OPT_HELP "help"
#define OPT_ADDR "addr"
#define OPT_REBIND_ADDR "rebind-addr"
#define OPT_BIND "bind"
#define OPT_PORT "port"
#define OPT_REBIND_PORT "rebind-port"
#define OPT_FILE "file"
#define OPT_SOCK "sock"
#define OPT_FROM "from"
@@ -14,6 +16,7 @@
#define OPT_DENY "default-deny"
#define CMD_SERVE "serve"
#define CMD_LISTEN "listen"
#define CMD_READ "read"
#define CMD_WRITE "write"
#define CMD_ACL "acl"
@@ -32,7 +35,9 @@
#define GETOPT_DENY GETOPT_FLAG( OPT_DENY, 'd' )
#define GETOPT_ADDR GETOPT_ARG( OPT_ADDR, 'l' )
#define GETOPT_REBIND_ADDR GETOPT_ARG( OPT_REBIND_ADDR, 'L')
#define GETOPT_PORT GETOPT_ARG( OPT_PORT, 'p' )
#define GETOPT_REBIND_PORT GETOPT_ARG( OPT_REBIND_PORT, 'P')
#define GETOPT_FILE GETOPT_ARG( OPT_FILE, 'f' )
#define GETOPT_SOCK GETOPT_ARG( OPT_SOCK, 's' )
#define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' )
@@ -81,6 +86,36 @@ static char serve_help_text[] =
SOCK_LINE
VERBOSE_LINE;
static struct option listen_options[] = {
GETOPT_HELP,
GETOPT_ADDR,
GETOPT_REBIND_ADDR,
GETOPT_PORT,
GETOPT_REBIND_ADDR,
GETOPT_FILE,
GETOPT_SOCK,
GETOPT_DENY,
GETOPT_VERBOSE,
{0}
};
static char listen_short_options[] = "hl:L:p:P:f:s:d" SOPT_VERBOSE;
static char listen_help_text[] =
"Usage: flexnbd " CMD_LISTEN " <options> [<acl_address>*]\n\n"
"Listen for an incoming migration on ADDR:PORT, "
"then switch to REBIND_ADDR:REBIND_PORT on completion "
"to serve FILE.\n\n"
HELP_LINE
"\t--" OPT_ADDR ",-l <ADDR>\tThe address to listen on.\n"
"\t--" OPT_REBIND_ADDR ",-L <REBIND_ADDR>\tThe address to switch to, if given.\n"
"\t--" OPT_PORT ",-p <PORT>\tThe port to listen on.\n"
"\t--" OPT_REBIND_PORT ",-P <REBIND_PORT>\tThe port to switch to, if given..\n"
"\t--" OPT_FILE ",-f <FILE>\tThe file to serve.\n"
"\t--" OPT_DENY ",-d\tDeny connections by default unless in ACL.\n"
SOCK_LINE
VERBOSE_LINE;
static struct option read_options[] = {
GETOPT_HELP,
GETOPT_ADDR,

View File

@@ -11,6 +11,9 @@ int atoi(const char *nptr);
/* FIXME: should change this to return negative on error like everything else */
int parse_ip_to_sockaddr(struct sockaddr* out, char* src)
{
NULLCHECK( out );
NULLCHECK( src );
char temp[64];
struct sockaddr_in *v4 = (struct sockaddr_in *) out;
struct sockaddr_in6 *v6 = (struct sockaddr_in6 *) out;

View File

@@ -44,10 +44,12 @@ struct server * server_create (
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients)
int max_nbd_clients,
int has_control)
{
struct server * out;
out = xmalloc( sizeof( struct server ) );
out->has_control = has_control;
out->max_nbd_clients = max_nbd_clients;
out->nbd_client = xmalloc( max_nbd_clients * sizeof( struct client_tbl_entry ) );
@@ -56,6 +58,7 @@ struct server * server_create (
FATAL_IF_NULL(s_ip_address, "No IP address supplied");
FATAL_IF_NULL(s_port, "No port number supplied");
FATAL_IF_NULL(s_file, "No filename supplied");
NULLCHECK( s_ip_address );
FATAL_IF_ZERO(
parse_ip_to_sockaddr(&out->bind_to.generic, s_ip_address),
"Couldn't parse server address '%s' (use 0 if "
@@ -271,7 +274,7 @@ int cleanup_client_thread( struct client_tbl_entry * entry )
void cleanup_client_threads( struct client_tbl_entry * entries, size_t entries_len )
{
int i;
size_t i;
for( i = 0; i < entries_len; i++ ) {
cleanup_client_thread( &entries[i] );
}
@@ -575,19 +578,13 @@ int server_accept( struct server * params )
}
if ( FD_ISSET( params->server_fd, &fds ) ){
client_fd = accept( params->server_fd, &client_address.generic, &socklen );
debug("Accepted nbd client socket");
accept_nbd_client(params, client_fd, &client_address);
} else if( FD_ISSET( params->control_fd, &fds ) ) {
}
else if( FD_ISSET( params->control_fd, &fds ) ) {
client_fd = accept( params->control_fd, &client_address.generic, &socklen );
debug("Accepted control client socket");
accept_control_connection(params, client_fd, &client_address);
}
@@ -639,6 +636,17 @@ void serve_wait_for_close( struct server * serve )
}
}
/* We've just had an ENTRUST/DISCONNECT pair, so we need to shut down
* and signal our listener that we can safely take over.
*/
void server_control_arrived( struct server *serve )
{
NULLCHECK( serve );
serve->has_control = 1;
serve_signal_close( serve );
}
/** Closes sockets, frees memory and waits for all client threads to finish */
void serve_cleanup(struct server* params,
@@ -676,15 +684,20 @@ void serve_cleanup(struct server* params,
}
/** Full lifecycle of the server */
void do_serve(struct server* params)
int do_serve(struct server* params)
{
NULLCHECK( params );
int has_control;
error_set_handler((cleanup_handler*) serve_cleanup, params);
serve_open_server_socket(params);
serve_open_control_socket(params);
serve_init_allocation_map(params);
serve_accept_loop(params);
has_control = params->has_control;
serve_cleanup(params, 0);
return has_control;
}

View File

@@ -82,6 +82,13 @@ struct server {
int max_nbd_clients;
struct client_tbl_entry *nbd_client;
/* Marker for whether or not this server has control over the
* data in the file, or if we're waiting to receive it from an
* inbound migration which hasn't yet finished.
*/
int has_control;
};
struct server * server_create(
@@ -92,7 +99,8 @@ struct server * server_create(
int default_deny,
int acl_entries,
char** s_acl_entries,
int max_nbd_clients );
int max_nbd_clients,
int has_control );
void server_destroy( struct server * );
int server_is_closed(struct server* serve);
void server_dirty(struct server *serve, off64_t from, int len);
@@ -101,7 +109,9 @@ void server_unlock_io( struct server* serve );
void serve_signal_close( struct server *serve );
void serve_wait_for_close( struct server * serve );
void server_replace_acl( struct server *serve, struct acl * acl);
void server_control_arrived( struct server *serve );
int do_serve( struct server * );
struct mode_readwrite_params {
union mysockaddr connect_to;