/*  FlexNBD server (C) Bytemark Hosting 2012

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

/** The control server responds on a UNIX socket and services our "remote"
 * commands which are used for changing the access control list, initiating
 * a mirror process, or asking for status.  The protocol is pretty simple -
 * after connecting the client sends a series of LF-terminated lines, followed
 * by a blank line (i.e. double LF).  The first line is taken to be the command
 * name to invoke, and the lines before the double LF are its arguments.
* * These commands can be invoked remotely from the command line, with the * client code to be found in remote.c */ #include "control.h" #include "mirror.h" #include "serve.h" #include "util.h" #include "ioutil.h" #include "parse.h" #include "readwrite.h" #include "bitset.h" #include "self_pipe.h" #include "acl.h" #include "status.h" #include "mbox.h" #include #include #include #include struct control * control_create(struct flexnbd * flexnbd, const char * csn) { struct control * control = xmalloc( sizeof( struct control ) ); NULLCHECK( csn ); control->flexnbd = flexnbd; control->socket_name = csn; control->close_signal = self_pipe_create(); return control; } void control_signal_close( struct control * control) { NULLCHECK( control ); self_pipe_signal( control->close_signal ); } void control_destroy( struct control * control ) { NULLCHECK( control ); self_pipe_destroy( control->close_signal ); free( control ); } struct control_client * control_client_create( struct flexnbd * flexnbd, int client_fd ) { NULLCHECK( flexnbd ); struct control_client * control_client = xmalloc( sizeof( struct control_client ) ); control_client->socket = client_fd; control_client->flexnbd = flexnbd; return control_client; } void control_client_destroy( struct control_client * client ) { NULLCHECK( client ); free( client ); } void control_respond(struct control_client * client); void control_handle_client( struct control * control, int client_fd ) { NULLCHECK( control ); NULLCHECK( control->flexnbd ); struct control_client * control_client = control_client_create( control->flexnbd, client_fd ); /* We intentionally don't spawn a thread for the client here. * This is to avoid having more than one thread potentially * waiting on the migration commit status. 
*/ control_respond( control_client ); } void control_accept_client( struct control * control ) { int client_fd; union mysockaddr client_address; socklen_t addrlen = sizeof( union mysockaddr ); client_fd = accept( control->control_fd, &client_address.generic, &addrlen ); FATAL_IF( -1 == client_fd, "control accept failed" ); control_handle_client( control, client_fd ); } int control_accept( struct control * control ) { NULLCHECK( control ); fd_set fds; FD_ZERO( &fds ); FD_SET( control->control_fd, &fds ); self_pipe_fd_set( control->close_signal, &fds ); debug("Control thread selecting"); FATAL_UNLESS( 0 < select( FD_SETSIZE, &fds, NULL, NULL, NULL ), "Control select failed." ); if ( self_pipe_fd_isset( control->close_signal, &fds ) ){ return 0; } if ( FD_ISSET( control->control_fd, &fds ) ) { control_accept_client( control ); } return 1; } void control_accept_loop( struct control * control ) { while( control_accept( control ) ); } int open_control_socket( const char * socket_name ) { struct sockaddr_un bind_address; int control_fd; if (!socket_name) { fatal( "Tried to open a control socket without a socket name" ); } control_fd = socket(AF_UNIX, SOCK_STREAM, 0); FATAL_IF_NEGATIVE(control_fd , "Couldn't create control socket"); memset(&bind_address, 0, sizeof(struct sockaddr_un)); bind_address.sun_family = AF_UNIX; strncpy(bind_address.sun_path, socket_name, sizeof(bind_address.sun_path)-1); unlink(socket_name); /* ignore failure */ FATAL_IF_NEGATIVE( bind(control_fd , &bind_address, sizeof(bind_address)), "Couldn't bind control socket to %s", socket_name ); FATAL_IF_NEGATIVE( listen(control_fd , 5), "Couldn't listen on control socket" ); return control_fd; } void control_listen(struct control* control) { NULLCHECK( control ); control->control_fd = open_control_socket( control->socket_name ); } void control_serve( struct control * control ) { NULLCHECK( control ); control_listen( control ); while( control_accept( control ) ); } void * control_runner( void * 
control_uncast ) { debug("Control thread"); NULLCHECK( control_uncast ); struct control * control = (struct control *)control_uncast; control_serve( control ); return NULL; } #define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1) void control_write_mirror_response( enum mirror_state mirror_state, int client_fd ) { switch (mirror_state) { case MS_INIT: case MS_UNKNOWN: write_socket( "1: Mirror failed to initialise" ); fatal( "Impossible mirror state: %d", mirror_state ); case MS_FAIL_CONNECT: write_socket( "1: Mirror failed to connect"); break; case MS_FAIL_REJECTED: write_socket( "1: Mirror was rejected" ); break; case MS_FAIL_NO_HELLO: write_socket( "1: Remote server failed to respond"); break; case MS_FAIL_SIZE_MISMATCH: write_socket( "1: Remote size does not match local size" ); break; case MS_GO: case MS_FINALISE: case MS_DONE: /* Yes, I know we know better, but it's simpler this way */ write_socket( "0: Mirror started" ); break; default: fatal( "Unhandled mirror state: %d", mirror_state ); } } #undef write_socket #define write_socket(msg) write(client->socket, (msg "\n"), strlen((msg))+1) /** Command parser to start mirror process from socket input */ int control_mirror(struct control_client* client, int linesc, char** lines) { NULLCHECK( client ); struct flexnbd * flexnbd = client->flexnbd; union mysockaddr *connect_to = xmalloc( sizeof( union mysockaddr ) ); union mysockaddr *connect_from = NULL; uint64_t max_Bps = 0; int action_at_finish; int raw_port; if (linesc < 2) { write_socket("1: mirror takes at least two parameters"); return -1; } if (parse_ip_to_sockaddr(&connect_to->generic, lines[0]) == 0) { write_socket("1: bad IP address"); return -1; } raw_port = atoi(lines[1]); if (raw_port < 0 || raw_port > 65535) { write_socket("1: bad IP port number"); return -1; } connect_to->v4.sin_port = htobe16(raw_port); if (linesc > 2) { connect_from = xmalloc( sizeof( union mysockaddr ) ); if (parse_ip_to_sockaddr(&connect_from->generic, lines[2]) 
== 0) { write_socket("1: bad bind address"); return -1; } } if (linesc > 3) { max_Bps = atoi(lines[2]); } action_at_finish = ACTION_EXIT; if (linesc > 4) { if (strcmp("exit", lines[3]) == 0) { action_at_finish = ACTION_EXIT; } else if (strcmp("nothing", lines[3]) == 0) { action_at_finish = ACTION_NOTHING; } else { write_socket("1: action must be 'exit' or 'nothing'"); return -1; } } if (linesc > 5) { write_socket("1: unrecognised parameters to mirror"); return -1; } /* In theory, we should never have to worry about the switch * lock here, since we should never be able to start more than * one mirror at a time. This is enforced by only accepting a * single client at a time on the control socket. */ flexnbd_lock_switch( flexnbd ); { struct server * serve = flexnbd_server(flexnbd); serve->mirror_super = mirror_super_create( serve->filename, connect_to, connect_from, max_Bps , action_at_finish); serve->mirror = serve->mirror_super->mirror; FATAL_IF( 0 != pthread_create( &serve->mirror_super->thread, NULL, mirror_super_runner, serve ), "Failed to create mirror thread" ); debug("Control thread mirror super waiting"); enum mirror_state state = mirror_super_wait( serve->mirror_super ); debug("Control thread writing response"); control_write_mirror_response( state, client->socket ); } debug( "Control thread unlocking switch" ); flexnbd_unlock_switch( flexnbd ); debug( "Control thread going away." 
); return 0; } #undef write_socket /** Command parser to alter access control list from socket input */ int control_acl(struct control_client* client, int linesc, char** lines) { NULLCHECK( client ); NULLCHECK( client->flexnbd ); struct flexnbd * flexnbd = client->flexnbd; int default_deny = flexnbd_default_deny( flexnbd ); struct acl * new_acl = acl_create( linesc, lines, default_deny ); if (new_acl->len != linesc) { write(client->socket, "1: bad spec: ", 13); write(client->socket, lines[new_acl->len], strlen(lines[new_acl->len])); write(client->socket, "\n", 1); acl_destroy( new_acl ); } else { flexnbd_replace_acl( flexnbd, new_acl ); write( client->socket, "0: updated", 10); } return 0; } /** FIXME: add some useful statistics */ int control_status( struct control_client* client, int linesc __attribute__ ((unused)), char** lines __attribute__((unused)) ) { NULLCHECK( client ); NULLCHECK( client->flexnbd ); struct status * status = flexnbd_status_create( client->flexnbd ); write( client->socket, "0: ", 3 ); status_write( status, client->socket ); status_destroy( status ); return 0; } void control_cleanup(struct control_client* client, int fatal __attribute__ ((unused)) ) { if (client->socket) { close(client->socket); } /* This is wrongness */ if ( server_io_locked( client->flexnbd->serve ) ) { server_unlock_io( client->flexnbd->serve ); } if ( server_acl_locked( client->flexnbd->serve ) ) { server_unlock_acl( client->flexnbd->serve ); } if ( flexnbd_switch_locked( client->flexnbd ) ) { flexnbd_unlock_switch( client->flexnbd ); } control_client_destroy( client ); } /** Master command parser for control socket connections, delegates quickly */ void control_respond(struct control_client * client) { char **lines = NULL; error_set_handler((cleanup_handler*) control_cleanup, client); int i, linesc; linesc = read_lines_until_blankline(client->socket, 256, &lines); if (linesc < 1) { write(client->socket, "9: missing command\n", 19); /* ignore failure */ } else if 
(strcmp(lines[0], "acl") == 0) { info("acl command received" ); if (control_acl(client, linesc-1, lines+1) < 0) { debug("acl command failed"); } } else if (strcmp(lines[0], "mirror") == 0) { info("mirror command received" ); if (control_mirror(client, linesc-1, lines+1) < 0) { debug("mirror command failed"); } } else if (strcmp(lines[0], "status") == 0) { info("status command received" ); if (control_status(client, linesc-1, lines+1) < 0) { debug("status command failed"); } } else { write(client->socket, "10: unknown command\n", 23); } for (i=0; i