proxy: Switch to blocking I/O with signal handlers to exit.
It's safe to terminate the proxy at any point in its lifecycle, so there's no point using signalfd() (and the associated select() + non-blocking I/O gubbins) in it. We might want to use non-blocking I/O in the future for other reasons, of course, at which point it might become sensible to use signalfd() again. For now, this makes us reliably responsive to TERM,INT and QUIT in a way that we weren't previously.
This commit is contained in:
@@ -1,9 +1,7 @@
|
||||
#include <signal.h>
|
||||
#include <sys/signalfd.h>
|
||||
|
||||
#include "mode.h"
|
||||
#include "util.h"
|
||||
#include "sockutil.h"
|
||||
#include "proxy.h"
|
||||
|
||||
|
||||
@@ -71,62 +69,38 @@ void read_proxy_param(
|
||||
}
|
||||
}
|
||||
|
||||
/* Stolen from flexnbd.c, wil change in the near future so no point DRYing */
|
||||
int build_signal_fd(void)
|
||||
struct proxier * proxy = NULL;
|
||||
|
||||
void my_exit(int signum)
|
||||
{
|
||||
info( "Exit signalled (%i)", signum );
|
||||
if ( NULL != proxy ) {
|
||||
proxy_cleanup( proxy );
|
||||
};
|
||||
exit( 0 );
|
||||
}
|
||||
|
||||
int main( int argc, char *argv[] )
|
||||
{
|
||||
int c;
|
||||
char *downstream_addr = NULL;
|
||||
char *downstream_port = NULL;
|
||||
char *upstream_addr = NULL;
|
||||
char *upstream_port = NULL;
|
||||
char *bind_addr = NULL;
|
||||
int success;
|
||||
|
||||
sigset_t mask;
|
||||
int sfd;
|
||||
struct sigaction exit_action;
|
||||
|
||||
sigemptyset( &mask );
|
||||
sigaddset( &mask, SIGTERM );
|
||||
sigaddset( &mask, SIGQUIT );
|
||||
sigaddset( &mask, SIGINT );
|
||||
|
||||
FATAL_UNLESS( 0 == pthread_sigmask( SIG_BLOCK, &mask, NULL ),
|
||||
"Signal blocking failed" );
|
||||
|
||||
sfd = signalfd( -1, &mask, 0 );
|
||||
FATAL_IF( -1 == sfd, "Failed to get a signal fd" );
|
||||
|
||||
return sfd;
|
||||
}
|
||||
|
||||
struct proxier* flexnbd_create_proxying(
|
||||
int signal_fd,
|
||||
char* s_downstream_address,
|
||||
char* s_downstream_port,
|
||||
char* s_upstream_address,
|
||||
char* s_upstream_port,
|
||||
char* s_upstream_bind
|
||||
)
|
||||
{
|
||||
struct proxier* proxy = proxy_create(
|
||||
signal_fd,
|
||||
s_downstream_address,
|
||||
s_downstream_port,
|
||||
s_upstream_address,
|
||||
s_upstream_port,
|
||||
s_upstream_bind
|
||||
);
|
||||
|
||||
return proxy;
|
||||
}
|
||||
|
||||
|
||||
int main( int argc, char *argv[] )
|
||||
{
|
||||
int c;
|
||||
struct proxier * proxy;
|
||||
char *downstream_addr = NULL;
|
||||
char *downstream_port = NULL;
|
||||
char *upstream_addr = NULL;
|
||||
char *upstream_port = NULL;
|
||||
char *bind_addr = NULL;
|
||||
int signal_fd;
|
||||
int success;
|
||||
|
||||
signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */
|
||||
error_init();
|
||||
exit_action.sa_handler = my_exit;
|
||||
exit_action.sa_mask = mask;
|
||||
exit_action.sa_flags = 0;
|
||||
|
||||
while (1) {
|
||||
c = getopt_long( argc, argv, proxy_short_options, proxy_options, NULL );
|
||||
@@ -148,10 +122,7 @@ int main( int argc, char *argv[] )
|
||||
exit_err( proxy_help_text );
|
||||
}
|
||||
|
||||
signal_fd = build_signal_fd();
|
||||
|
||||
proxy = flexnbd_create_proxying(
|
||||
signal_fd,
|
||||
proxy = proxy_create(
|
||||
downstream_addr,
|
||||
downstream_port,
|
||||
upstream_addr,
|
||||
@@ -159,13 +130,19 @@ int main( int argc, char *argv[] )
|
||||
bind_addr
|
||||
);
|
||||
|
||||
/* Set these *after* proxy has been assigned to */
|
||||
sigaction(SIGTERM, &exit_action, NULL);
|
||||
sigaction(SIGQUIT, &exit_action, NULL);
|
||||
sigaction(SIGINT, &exit_action, NULL);
|
||||
signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */
|
||||
|
||||
info(
|
||||
"Proxying between %s %s (downstream) and %s %s (upstream)",
|
||||
downstream_addr, downstream_port, upstream_addr, upstream_port
|
||||
);
|
||||
|
||||
success = do_proxy( proxy );
|
||||
sock_try_close( signal_fd );
|
||||
success = do_proxy( proxy );
|
||||
proxy_destroy( proxy );
|
||||
|
||||
return success ? 0 : 1;
|
||||
}
|
||||
|
122
src/proxy.c
122
src/proxy.c
@@ -11,7 +11,6 @@
|
||||
#include <netinet/tcp.h>
|
||||
|
||||
struct proxier* proxy_create(
|
||||
int signal_fd,
|
||||
char* s_downstream_address,
|
||||
char* s_downstream_port,
|
||||
char* s_upstream_address,
|
||||
@@ -20,7 +19,6 @@ struct proxier* proxy_create(
|
||||
{
|
||||
struct proxier* out;
|
||||
out = xmalloc( sizeof( struct proxier ) );
|
||||
out->signal_fd = signal_fd;
|
||||
|
||||
FATAL_IF_NULL(s_downstream_address, "Listen address not specified");
|
||||
NULLCHECK( s_downstream_address );
|
||||
@@ -158,38 +156,6 @@ void proxy_open_listen_socket(struct proxier* params)
|
||||
info( "Now listening for incoming connections" );
|
||||
}
|
||||
|
||||
|
||||
/* Return 0 if we should keep running, 1 if an exit has been signaled. Pass it
|
||||
* an fd_set to check, or set check_fds to NULL to have it perform its own.
|
||||
* If we do the latter, then wait specifies how many seconds we'll wait for an
|
||||
* exit signal to show up.
|
||||
*/
|
||||
int proxy_should_exit( struct proxier* params, fd_set *check_fds, int wait )
|
||||
{
|
||||
struct timeval tv = { wait, 0 };
|
||||
fd_set internal_fds;
|
||||
fd_set* fds = check_fds;
|
||||
|
||||
if ( NULL == check_fds ) {
|
||||
fds = &internal_fds;
|
||||
|
||||
FD_ZERO( fds );
|
||||
FD_SET( params->signal_fd, fds );
|
||||
|
||||
FATAL_IF_NEGATIVE(
|
||||
sock_try_select(FD_SETSIZE, fds, NULL, NULL, &tv),
|
||||
SHOW_ERRNO( "select() failed." )
|
||||
);
|
||||
}
|
||||
|
||||
if ( FD_ISSET( params->signal_fd, fds ) ) {
|
||||
info( "Stop signal received" );
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Try to get a request from downstream. If reading from downstream fails, then
|
||||
* the session will be over. Returns 1 on success, 0 on failure.
|
||||
*/
|
||||
@@ -248,7 +214,7 @@ int proxy_get_request_from_downstream( struct proxier* proxy )
|
||||
|
||||
/* Tries to send the request upstream and receive a response. If upstream breaks
|
||||
* then we reconnect to it, and keep it up until we have a complete response
|
||||
* back. Returns 1 on success, 0 on failure, -1 if exit is signalled.
|
||||
* back. Returns 1 on success, 0 on failure
|
||||
*/
|
||||
int proxy_run_request_upstream( struct proxier* proxy )
|
||||
{
|
||||
@@ -262,20 +228,6 @@ int proxy_run_request_upstream( struct proxier* proxy )
|
||||
|
||||
size_t rsp_buf_size;
|
||||
|
||||
if ( proxy->upstream_fd == -1 ) {
|
||||
debug( "Connecting to upstream" );
|
||||
if ( !proxy_connect_to_upstream( proxy ) ) {
|
||||
debug( "Failed to connect to upstream" );
|
||||
|
||||
if ( proxy_should_exit( proxy, NULL, 5 ) ) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
debug( "Connected to upstream" );
|
||||
}
|
||||
|
||||
if ( writeloop( proxy->upstream_fd, proxy->req_buf, proxy->req_buf_size ) == -1 ) {
|
||||
warn( "Failed to send request to upstream" );
|
||||
proxy_disconnect_from_upstream( proxy );
|
||||
@@ -364,25 +316,22 @@ void proxy_session( struct proxier* proxy )
|
||||
}
|
||||
|
||||
while( proxy_get_request_from_downstream( proxy ) ) {
|
||||
|
||||
/* Don't start running the request if exit has been signalled */
|
||||
if ( proxy_should_exit( proxy, NULL, 0 ) ) {
|
||||
break;
|
||||
}
|
||||
|
||||
do {
|
||||
if ( proxy->upstream_fd == -1 ) {
|
||||
debug( "Connecting to upstream" );
|
||||
if ( !proxy_connect_to_upstream( proxy ) ) {
|
||||
debug( "Failed to connect to upstream" );
|
||||
result = 0;
|
||||
sleep( 5 );
|
||||
continue;
|
||||
}
|
||||
debug( "Connected to upstream" );
|
||||
}
|
||||
result = proxy_run_request_upstream( proxy );
|
||||
} while ( result == 0 );
|
||||
|
||||
/* We have to exit, but don't know if the request was successfully
|
||||
* proxied or not. We could add that knowledge, and attempt to send a
|
||||
* reply downstream if it was, but I don't think it's worth it.
|
||||
*/
|
||||
if ( result == -1 ) {
|
||||
break;
|
||||
}
|
||||
|
||||
if ( !proxy_send_reply_downstream( proxy ) ) {
|
||||
debug( "Replying on fd %i failed, ending session", downstream_fd );
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -407,7 +356,6 @@ int proxy_accept( struct proxier* params )
|
||||
|
||||
int client_fd;
|
||||
fd_set fds;
|
||||
int should_continue = 1;
|
||||
|
||||
union mysockaddr client_address;
|
||||
socklen_t socklen = sizeof( client_address );
|
||||
@@ -416,18 +364,13 @@ int proxy_accept( struct proxier* params )
|
||||
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(params->listen_fd, &fds);
|
||||
FD_SET(params->signal_fd, &fds);
|
||||
|
||||
FATAL_IF_NEGATIVE(
|
||||
sock_try_select(FD_SETSIZE, &fds, NULL, NULL, NULL),
|
||||
SHOW_ERRNO( "select() failed" )
|
||||
);
|
||||
|
||||
if ( proxy_should_exit( params, &fds, 0) ) {
|
||||
should_continue = 0;
|
||||
}
|
||||
|
||||
if ( should_continue && FD_ISSET( params->listen_fd, &fds ) ) {
|
||||
if ( FD_ISSET( params->listen_fd, &fds ) ) {
|
||||
client_fd = accept( params->listen_fd, &client_address.generic, &socklen );
|
||||
|
||||
if ( sock_set_tcp_nodelay(client_fd, 1) == -1 ) {
|
||||
@@ -446,7 +389,7 @@ int proxy_accept( struct proxier* params )
|
||||
params->downstream_fd = -1;
|
||||
}
|
||||
|
||||
return should_continue;
|
||||
return 1; /* We actually expect to be interrupted by signal handlers */
|
||||
}
|
||||
|
||||
|
||||
@@ -456,18 +399,39 @@ void proxy_accept_loop( struct proxier* params )
|
||||
while( proxy_accept( params ) );
|
||||
}
|
||||
|
||||
/** Closes sockets, frees memory and waits for all requests to clear */
|
||||
void proxy_cleanup( struct proxier* params )
|
||||
/** Closes sockets */
|
||||
void proxy_cleanup( struct proxier* proxy )
|
||||
{
|
||||
NULLCHECK( params );
|
||||
NULLCHECK( proxy );
|
||||
|
||||
info( "cleaning up" );
|
||||
info( "Cleaning up" );
|
||||
|
||||
if ( -1 != params->listen_fd ) {
|
||||
if ( -1 != proxy->listen_fd ) {
|
||||
WARN_IF_NEGATIVE(
|
||||
sock_try_close( params->listen_fd ),
|
||||
"Failed to close() listen_fd %i", params->listen_fd
|
||||
sock_try_close( proxy->listen_fd ),
|
||||
SHOW_ERRNO( "Failed to close() listen fd %i", proxy->listen_fd )
|
||||
);
|
||||
proxy->listen_fd = -1;
|
||||
}
|
||||
|
||||
if ( -1 != proxy->downstream_fd ) {
|
||||
WARN_IF_NEGATIVE(
|
||||
sock_try_close( proxy->downstream_fd ),
|
||||
SHOW_ERRNO(
|
||||
"Failed to close() downstream fd %i", proxy->downstream_fd
|
||||
)
|
||||
);
|
||||
proxy->downstream_fd = -1;
|
||||
}
|
||||
|
||||
if ( -1 != proxy->upstream_fd ) {
|
||||
WARN_IF_NEGATIVE(
|
||||
sock_try_close( proxy->upstream_fd ),
|
||||
SHOW_ERRNO(
|
||||
"Failed to close() upstream fd %i", proxy->upstream_fd
|
||||
)
|
||||
);
|
||||
proxy->upstream_fd = -1;
|
||||
}
|
||||
|
||||
debug( "Cleanup done" );
|
||||
@@ -478,8 +442,6 @@ int do_proxy( struct proxier* params )
|
||||
{
|
||||
NULLCHECK( params );
|
||||
|
||||
error_set_handler( (cleanup_handler*) proxy_cleanup, params );
|
||||
|
||||
debug( "Ensuring upstream server is open" );
|
||||
|
||||
if ( !proxy_connect_to_upstream( params ) ) {
|
||||
|
@@ -55,19 +55,16 @@ struct proxier {
|
||||
|
||||
/* We transform the raw reply header into here */
|
||||
struct nbd_reply rsp_hdr;
|
||||
|
||||
/* File descriptor that signal handlers write to */
|
||||
int signal_fd;
|
||||
};
|
||||
|
||||
struct proxier* proxy_create(
|
||||
int signal_fd,
|
||||
char* s_downstream_address,
|
||||
char* s_downstream_port,
|
||||
char* s_upstream_address,
|
||||
char* s_upstream_port,
|
||||
char* s_upstream_bind );
|
||||
int do_proxy( struct proxier* proxy );
|
||||
void proxy_cleanup( struct proxier* proxy );
|
||||
void proxy_destroy( struct proxier* proxy );
|
||||
|
||||
#endif
|
||||
|
Reference in New Issue
Block a user