proxy: Initial move to event-loop proxy model.
Building with -DPREFETCH is currently broken, I'm sure, but otherwise this version seems to be feature-complete compared to the previous one, albeit wordier. Upcoming: cleanups
This commit is contained in:
743
src/proxy.c
743
src/proxy.c
@@ -69,7 +69,6 @@ struct proxier* proxy_create(
|
||||
out->prefetch = xmalloc( sizeof( struct prefetch ) );
|
||||
#endif
|
||||
|
||||
|
||||
out->req_buf = xmalloc( NBD_MAX_SIZE );
|
||||
out->rsp_buf = xmalloc( NBD_MAX_SIZE );
|
||||
|
||||
@@ -87,9 +86,11 @@ void proxy_destroy( struct proxier* proxy )
|
||||
free( proxy );
|
||||
}
|
||||
|
||||
/* Shared between our two different connect_to_upstream paths */
|
||||
void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size );
|
||||
|
||||
/* Try to establish a connection to our upstream server. Return 1 on success,
|
||||
* 0 on failure
|
||||
* 0 on failure. this is a blocking call that returns a non-blocking socket.
|
||||
*/
|
||||
int proxy_connect_to_upstream( struct proxier* proxy )
|
||||
{
|
||||
@@ -113,22 +114,85 @@ int proxy_connect_to_upstream( struct proxier* proxy )
|
||||
return 0;
|
||||
}
|
||||
|
||||
proxy_finish_connect_to_upstream( proxy, size );
|
||||
proxy->upstream_fd = fd;
|
||||
sock_set_nonblock( fd, 1 );
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* First half of non-blocking connection to upstream. Gets as far as calling
|
||||
* connect() on a non-blocking socket.
|
||||
*/
|
||||
void proxy_start_connect_to_upstream( struct proxier* proxy )
|
||||
{
|
||||
int fd, result;
|
||||
struct sockaddr* from = NULL;
|
||||
struct sockaddr* to = &proxy->connect_to.generic;
|
||||
|
||||
if ( proxy->bind ) {
|
||||
from = &proxy->connect_from.generic;
|
||||
}
|
||||
|
||||
fd = socket( to->sa_family , SOCK_STREAM, 0 );
|
||||
|
||||
if( fd < 0 ) {
|
||||
warn( SHOW_ERRNO( "Couldn't create socket to reconnect to upstream" ) );
|
||||
return;
|
||||
}
|
||||
|
||||
info( "Beginning non-blocking connection to upstream on fd %i", fd );
|
||||
|
||||
if ( NULL != from ) {
|
||||
if ( 0 > bind( fd, from, sockaddr_size( from ) ) ) {
|
||||
warn( SHOW_ERRNO( "bind() to source address failed" ) );
|
||||
}
|
||||
}
|
||||
|
||||
result = sock_set_nonblock( fd, 1 );
|
||||
if ( result == -1 ) {
|
||||
warn( SHOW_ERRNO( "Failed to set upstream fd %i non-blocking", fd ) );
|
||||
goto error;
|
||||
}
|
||||
|
||||
result = connect( fd, to, sockaddr_size( to ) );
|
||||
if ( result == -1 && errno != EINPROGRESS ) {
|
||||
warn( SHOW_ERRNO( "Failed to start connect()ing to upstream!" ) );
|
||||
goto error;
|
||||
}
|
||||
|
||||
proxy->upstream_fd = fd;
|
||||
return;
|
||||
|
||||
error:
|
||||
if ( sock_try_close( fd ) == -1 ) {
|
||||
/* Non-fatal leak, although still nasty */
|
||||
warn( SHOW_ERRNO( "Failed to close fd for upstream %i", fd ) );
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* Second half of connecting to upstream: record the image size that upstream
 * reported. Logs the size the first time it is seen (upstream_size == 0) and
 * warns if it differs from the size recorded earlier.
 *
 * This function does not set proxy->upstream_fd; it only logs the value that
 * is currently stored there.
 */
void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size ) {

	if ( proxy->upstream_size == 0 ) {
		/* off64_t is signed; cast so the argument matches PRIu64 */
		info( "Size of upstream image is %"PRIu64" bytes", (uint64_t) size );
	} else if ( proxy->upstream_size != size ) {
		warn(
			"Size changed from %"PRIu64" to %"PRIu64" bytes",
			(uint64_t) proxy->upstream_size, (uint64_t) size
		);
	}

	proxy->upstream_size = size;
	info( "Connected to upstream on fd %i", proxy->upstream_fd );

	return;
}
|
||||
|
||||
void proxy_disconnect_from_upstream( struct proxier* proxy )
|
||||
{
|
||||
if ( -1 != proxy->upstream_fd ) {
|
||||
info(" Closing upstream connection" );
|
||||
info("Closing upstream connection on fd %i", proxy->upstream_fd );
|
||||
|
||||
/* TODO: An NBD disconnect would be pleasant here */
|
||||
WARN_IF_NEGATIVE(
|
||||
@@ -176,77 +240,18 @@ void proxy_open_listen_socket(struct proxier* params)
|
||||
);
|
||||
|
||||
info( "Now listening for incoming connections" );
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/* Try to get a request from downstream. If reading from downstream fails, then
|
||||
* the session will be over. Returns 1 on success, 0 on failure.
|
||||
*/
|
||||
int proxy_get_request_from_downstream( struct proxier* proxy )
|
||||
{
|
||||
unsigned char* req_hdr_raw = proxy->req_buf;
|
||||
unsigned char* req_data = proxy->req_buf + NBD_REQUEST_SIZE;
|
||||
size_t req_buf_size;
|
||||
|
||||
struct nbd_request_raw* request_raw = (struct nbd_request_raw*) req_hdr_raw;
|
||||
struct nbd_request* request = &(proxy->req_hdr);
|
||||
|
||||
if ( readloop( proxy->downstream_fd, req_hdr_raw, NBD_REQUEST_SIZE ) == -1 ) {
|
||||
info( SHOW_ERRNO( "Failed to get request header from downstream" ) );
|
||||
return 0;
|
||||
}
|
||||
|
||||
nbd_r2h_request( request_raw, request );
|
||||
req_buf_size = NBD_REQUEST_SIZE;
|
||||
|
||||
if ( request->type == REQUEST_DISCONNECT ) {
|
||||
info( "Received disconnect request from client" );
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ( request->type == REQUEST_READ ) {
|
||||
if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) {
|
||||
warn( "NBD read request size %"PRIu32" too large", request->len );
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if ( request->type == REQUEST_WRITE ) {
|
||||
if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) {
|
||||
warn( "NBD write request size %"PRIu32" too large", request->len );
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ( readloop( proxy->downstream_fd, req_data, request->len ) == -1 ) {
|
||||
warn( "Failed to get NBD write request data: %"PRIu32"b", request->len );
|
||||
return 0;
|
||||
}
|
||||
|
||||
req_buf_size += request->len;
|
||||
}
|
||||
|
||||
debug(
|
||||
"Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32,
|
||||
request->type, request->from, request->len
|
||||
);
|
||||
|
||||
proxy->req_buf_size = req_buf_size;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Tries to send the request upstream and receive a response. If upstream breaks
|
||||
* then we reconnect to it, and keep it up until we have a complete response
|
||||
* back. Returns 1 on success, 0 on failure
|
||||
*/
|
||||
int proxy_run_request_upstream( struct proxier* proxy )
|
||||
{
|
||||
unsigned char* rsp_hdr_raw = proxy->rsp_buf;
|
||||
unsigned char* rsp_data = proxy->rsp_buf + NBD_REPLY_SIZE;
|
||||
|
||||
struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) rsp_hdr_raw;
|
||||
struct nbd_request* request = &(proxy->req_hdr);
|
||||
|
||||
#ifdef PREFETCH
|
||||
|
||||
int proxy_prefetch_for_request( struct proxier* proxy, int state )
|
||||
{
|
||||
/* TODO: restore prefetch capability */
|
||||
return state;
|
||||
|
||||
/* We only want to consider prefetching if we know we're not
|
||||
* getting too much data back, if it's a read request, and if
|
||||
* the prefetch won't try to read past the end of the file.
|
||||
@@ -277,13 +282,7 @@ int proxy_run_request_upstream( struct proxier* proxy )
|
||||
(struct nbd_request_raw *)proxy->req_buf );
|
||||
free( rewrite_request );
|
||||
}
|
||||
#endif
|
||||
|
||||
struct nbd_reply* reply = &(proxy->rsp_hdr);
|
||||
|
||||
size_t rsp_buf_size;
|
||||
|
||||
#ifdef PREFETCH
|
||||
//####
|
||||
if ( request->type == REQUEST_READ ){
|
||||
/* See if we can respond with what's in our prefetch
|
||||
* cache
|
||||
@@ -324,40 +323,12 @@ int proxy_run_request_upstream( struct proxier* proxy )
|
||||
request->type);
|
||||
proxy->prefetch->is_full = 0;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
if ( -1 == writeloop( proxy->upstream_fd,
|
||||
proxy->req_buf,
|
||||
proxy->req_buf_size ) ) {
|
||||
warn( SHOW_ERRNO( "Failed to send request to upstream" ) );
|
||||
goto disconnect;
|
||||
}
|
||||
int proxy_prefetch_for_reply( struct proxier* proxy, int state )
|
||||
{
|
||||
return state;
|
||||
|
||||
if ( -1 == readloop( proxy->upstream_fd,
|
||||
rsp_hdr_raw,
|
||||
NBD_REPLY_SIZE ) ) {
|
||||
warn( SHOW_ERRNO( "Failed to get reply header from upstream" ) );
|
||||
goto disconnect;
|
||||
}
|
||||
|
||||
nbd_r2h_reply( reply_raw, reply );
|
||||
rsp_buf_size = NBD_REPLY_SIZE;
|
||||
|
||||
if ( reply->magic != REPLY_MAGIC ) {
|
||||
debug( "Reply magic is incorrect" );
|
||||
goto disconnect;
|
||||
}
|
||||
|
||||
debug( "NBD reply received from upstream. Response code: %"PRIu32,
|
||||
reply->error );
|
||||
|
||||
if ( reply->error != 0 ) {
|
||||
warn( "NBD error returned from upstream: %"PRIu32,
|
||||
reply->error );
|
||||
}
|
||||
|
||||
if ( reply->error == 0 && request->type == REQUEST_READ ) {
|
||||
#ifdef PREFETCH
|
||||
if ( -1 == readloop( proxy->upstream_fd,
|
||||
rsp_data,
|
||||
request->len ) ) {
|
||||
@@ -376,111 +347,509 @@ int proxy_run_request_upstream( struct proxier* proxy )
|
||||
proxy->prefetch->len = request->len;
|
||||
proxy->prefetch->is_full = 1;
|
||||
}
|
||||
#else
|
||||
|
||||
if ( -1 == readloop( proxy->upstream_fd,
|
||||
rsp_data,
|
||||
request->len ) ) {
|
||||
warn( SHOW_ERRNO( "Failed to get read reply data from upstream" ) );
|
||||
goto disconnect;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
rsp_buf_size += request->len;
|
||||
|
||||
/* States of the proxy session state machine. Each state names the single
 * I/O operation the session is currently waiting to perform. EXIT ends the
 * session loop.
 */
typedef enum {
	EXIT                    = 0,
	WRITE_TO_DOWNSTREAM     = 1,
	READ_FROM_DOWNSTREAM    = 2,
	CONNECT_TO_UPSTREAM     = 3,
	READ_INIT_FROM_UPSTREAM = 4,
	WRITE_TO_UPSTREAM       = 5,
	READ_FROM_UPSTREAM      = 6
} proxy_session_states;

/* Printable name for each state, indexed by the state's value */
static char* proxy_session_state_names[] = {
	[EXIT]                    = "EXIT",
	[WRITE_TO_DOWNSTREAM]     = "WRITE_TO_DOWNSTREAM",
	[READ_FROM_DOWNSTREAM]    = "READ_FROM_DOWNSTREAM",
	[CONNECT_TO_UPSTREAM]     = "CONNECT_TO_UPSTREAM",
	[READ_INIT_FROM_UPSTREAM] = "READ_INIT_FROM_UPSTREAM",
	[WRITE_TO_UPSTREAM]       = "WRITE_TO_UPSTREAM",
	[READ_FROM_UPSTREAM]      = "READ_FROM_UPSTREAM"
};
|
||||
|
||||
/* Classifies the current errno after a failed read()/write().
 * Returns 0 when errno is EAGAIN, EWOULDBLOCK or EINTR (the call can simply
 * be retried later), and 1 for every other value.
 */
static inline int io_errno_permanent( void )
{
	return ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR );
}
|
||||
|
||||
|
||||
int proxy_read_from_downstream( struct proxier *proxy, int state )
|
||||
{
|
||||
ssize_t count;
|
||||
|
||||
struct nbd_request_raw* request_raw = (struct nbd_request_raw*) proxy->req_buf;
|
||||
struct nbd_request* request = &(proxy->req_hdr);
|
||||
|
||||
// assert( state == READ_FROM_DOWNSTREAM );
|
||||
|
||||
/* We're beginning a new read from downstream */
|
||||
if ( proxy->req_buf_size == 0 ) {
|
||||
proxy->req_buf_size = NBD_REQUEST_SIZE;
|
||||
}
|
||||
|
||||
proxy->rsp_buf_size = rsp_buf_size;
|
||||
return 1;
|
||||
|
||||
disconnect:
|
||||
warn(
|
||||
"Request was: type=%"PRIu32" from=%"PRIu64" len=%"PRIu32,
|
||||
request->type, request->from, request->len
|
||||
read:
|
||||
debug(
|
||||
"Reading %"PRIu32" of %"PRIu32" bytes from downstream for NBD request",
|
||||
proxy->req_buf_size - proxy->req_buf_offset,
|
||||
proxy->req_buf_size
|
||||
);
|
||||
proxy_disconnect_from_upstream( proxy );
|
||||
|
||||
count = read(
|
||||
proxy->downstream_fd,
|
||||
proxy->req_buf + proxy->req_buf_offset,
|
||||
proxy->req_buf_size - proxy->req_buf_offset
|
||||
);
|
||||
|
||||
if ( count == 0 ) {
|
||||
info( "EOF on downstream fd %i received", proxy->downstream_fd );
|
||||
return EXIT;
|
||||
}
|
||||
|
||||
if ( count != -1 ) {
|
||||
proxy->req_buf_offset += count;
|
||||
} else if ( io_errno_permanent() ) {
|
||||
warn( SHOW_ERRNO( "Couldn't read request from downstream: " ) );
|
||||
return EXIT;
|
||||
}
|
||||
|
||||
if ( proxy->req_buf_offset == NBD_REQUEST_SIZE ) {
|
||||
nbd_r2h_request( request_raw, request );
|
||||
|
||||
if ( request->type == REQUEST_DISCONNECT ) {
|
||||
info( "Received disconnect request from client" );
|
||||
return EXIT;
|
||||
}
|
||||
|
||||
/* Simple validations */
|
||||
if ( request->type == REQUEST_READ ) {
|
||||
if (request->len > ( NBD_MAX_SIZE - NBD_REPLY_SIZE ) ) {
|
||||
warn( "NBD read request size %"PRIu32" too large", request->len );
|
||||
return EXIT;
|
||||
}
|
||||
}
|
||||
if ( request->type == REQUEST_WRITE ) {
|
||||
if (request->len > ( NBD_MAX_SIZE - NBD_REQUEST_SIZE ) ) {
|
||||
warn( "NBD write request size %"PRIu32" too large", request->len );
|
||||
return EXIT;
|
||||
}
|
||||
|
||||
proxy->req_buf_size += request->len;
|
||||
|
||||
/* Minor optimisation: Read again immediately if there's write
|
||||
* request data to be had. No point select()ing again
|
||||
*/
|
||||
goto read;
|
||||
}
|
||||
}
|
||||
|
||||
if ( proxy->req_buf_offset == proxy->req_buf_size ) {
|
||||
debug(
|
||||
"Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32,
|
||||
request->type, request->from, request->len
|
||||
);
|
||||
|
||||
/* Finished reading, so advance state. Leave size untouched so the next
|
||||
* state knows how many bytes to write */
|
||||
proxy->req_buf_offset = 0;
|
||||
return WRITE_TO_UPSTREAM;
|
||||
}
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
int proxy_continue_connecting_to_upstream( struct proxier* proxy, int state )
|
||||
{
|
||||
int error, result;
|
||||
socklen_t len = sizeof( error );
|
||||
|
||||
// assert( state == CONNECT_TO_UPSTREAM );
|
||||
|
||||
result = getsockopt(
|
||||
proxy->upstream_fd, SOL_SOCKET, SO_ERROR, &error, &len
|
||||
);
|
||||
|
||||
if ( result == -1 ) {
|
||||
warn( SHOW_ERRNO( "Failed to tell if connected to upstream" ) );
|
||||
return state;
|
||||
}
|
||||
|
||||
if ( error != 0 ) {
|
||||
errno = error;
|
||||
warn( SHOW_ERRNO( "Failed to connect to upstream" ) );
|
||||
return state;
|
||||
}
|
||||
|
||||
#ifdef PREFETCH
|
||||
/* Data may have changed while we were disconnected */
|
||||
proxy->prefetch->is_full = 0;
|
||||
#endif
|
||||
return 0;
|
||||
|
||||
info( "Connected to upstream on fd %i", proxy->upstream_fd );
|
||||
return READ_INIT_FROM_UPSTREAM;
|
||||
}
|
||||
|
||||
/* Write an NBD reply back downstream. Return 0 on failure, 1 on success. */
|
||||
int proxy_send_reply_downstream( struct proxier* proxy )
|
||||
int proxy_read_init_from_upstream( struct proxier* proxy, int state )
|
||||
{
|
||||
int result;
|
||||
unsigned char* rsp_buf = proxy->rsp_buf;
|
||||
ssize_t count;
|
||||
unsigned char* buf = (unsigned char *) &(proxy->init_buf);
|
||||
|
||||
// assert( state == READ_INIT_FROM_UPSTREAM );
|
||||
|
||||
debug(
|
||||
"Writing header (%"PRIu32") + data (%"PRIu32") bytes downstream",
|
||||
NBD_REPLY_SIZE, proxy->rsp_buf_size - NBD_REPLY_SIZE
|
||||
"Reading %"PRIu32" bytes of %"PRIu32" in init message",
|
||||
sizeof( proxy->init_buf ) - proxy->init_buf_offset,
|
||||
sizeof( proxy->init_buf )
|
||||
);
|
||||
|
||||
result = writeloop( proxy->downstream_fd, rsp_buf, proxy->rsp_buf_size );
|
||||
if ( result == -1 ) {
|
||||
warn( SHOW_ERRNO( "Failed to send reply downstream" ) );
|
||||
return 0;
|
||||
count = read(
|
||||
proxy->upstream_fd,
|
||||
buf + proxy->init_buf_offset,
|
||||
sizeof( proxy->init_buf ) - proxy->init_buf_offset
|
||||
);
|
||||
|
||||
if ( count == 0 ) {
|
||||
info( "EOF signalled on upstream fd %i", proxy->upstream_fd );
|
||||
goto disconnect;
|
||||
}
|
||||
|
||||
debug( "Reply sent" );
|
||||
return 1;
|
||||
if ( count != -1 ) {
|
||||
proxy->init_buf_offset += count;
|
||||
} else if ( io_errno_permanent() ) {
|
||||
warn( SHOW_ERRNO( "Failed to read init from upstream" ) );
|
||||
goto disconnect;
|
||||
}
|
||||
|
||||
if ( proxy->init_buf_offset == sizeof( proxy->init_buf ) ) {
|
||||
off64_t upstream_size;
|
||||
if ( !nbd_check_hello( &proxy->init_buf, &upstream_size ) ) {
|
||||
warn( "Upstream sent invalid init" );
|
||||
goto disconnect;
|
||||
}
|
||||
|
||||
/* Currently, we only get disconnected from upstream (so needing to come
|
||||
* here) when we have an outstanding request. If that becomes false,
|
||||
* we'll need to choose the right state to return to here */
|
||||
proxy->init_buf_offset = 0;
|
||||
return WRITE_TO_UPSTREAM;
|
||||
}
|
||||
|
||||
return state;
|
||||
|
||||
disconnect:
|
||||
proxy->init_buf_offset = 0;
|
||||
return CONNECT_TO_UPSTREAM;
|
||||
}
|
||||
|
||||
int proxy_write_to_upstream( struct proxier* proxy, int state )
|
||||
{
|
||||
ssize_t count;
|
||||
|
||||
// assert( state == WRITE_TO_UPSTREAM );
|
||||
|
||||
debug(
|
||||
"Writing %"PRIu32" of %"PRIu32" bytes of request to upstream",
|
||||
proxy->req_buf_size - proxy->req_buf_offset,
|
||||
proxy->req_buf_size
|
||||
);
|
||||
|
||||
count = write(
|
||||
proxy->upstream_fd,
|
||||
proxy->req_buf + proxy->req_buf_offset,
|
||||
proxy->req_buf_size - proxy->req_buf_offset
|
||||
);
|
||||
|
||||
if ( count != -1 ) {
|
||||
proxy->req_buf_offset += count;
|
||||
} else if ( io_errno_permanent() ) {
|
||||
warn( SHOW_ERRNO( "Failed to send request to upstream" ) );
|
||||
proxy->req_buf_offset = 0;
|
||||
return CONNECT_TO_UPSTREAM;
|
||||
}
|
||||
|
||||
if ( proxy->req_buf_offset == proxy->req_buf_size ) {
|
||||
/* Request sent. Advance to reading the response from upstream. We might
|
||||
* still need req_buf_size if reading the reply fails - we disconnect
|
||||
* and resend the reply in that case - so keep it around for now. */
|
||||
proxy->req_buf_offset = 0;
|
||||
return READ_FROM_UPSTREAM;
|
||||
}
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
int proxy_read_from_upstream( struct proxier* proxy, int state )
|
||||
{
|
||||
ssize_t count;
|
||||
|
||||
struct nbd_reply* reply = &(proxy->rsp_hdr);
|
||||
struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) proxy->rsp_buf;
|
||||
|
||||
/* We can't assume the NBD_REPLY_SIZE + req->len is what we'll get back */
|
||||
if ( proxy->rsp_buf_offset == 0 ) {
|
||||
proxy->rsp_buf_size = NBD_REPLY_SIZE;
|
||||
}
|
||||
|
||||
read:
|
||||
debug(
|
||||
"Reading %"PRIu32" of %"PRIu32" bytes in NBD reply",
|
||||
proxy->rsp_buf_size - proxy->rsp_buf_offset,
|
||||
proxy->rsp_buf_size
|
||||
);
|
||||
|
||||
count = read(
|
||||
proxy->upstream_fd,
|
||||
proxy->rsp_buf + proxy->rsp_buf_offset,
|
||||
proxy->rsp_buf_size - proxy->rsp_buf_offset
|
||||
);
|
||||
|
||||
if ( count == 0 ) {
|
||||
info( "EOF signalled on upstream fd %i", proxy->upstream_fd );
|
||||
goto disconnect;
|
||||
}
|
||||
|
||||
if ( count != -1 ) {
|
||||
proxy->rsp_buf_offset += count;
|
||||
} else if ( io_errno_permanent() ) {
|
||||
warn( SHOW_ERRNO( "Failed to get reply from upstream" ) );
|
||||
goto disconnect;
|
||||
}
|
||||
|
||||
debug(
|
||||
"Read %"PRIu32" of %"PRIu32" bytes of reply from upstream",
|
||||
proxy->rsp_buf_offset,
|
||||
proxy->rsp_buf_size
|
||||
);
|
||||
|
||||
if ( proxy->rsp_buf_offset == NBD_REPLY_SIZE ) {
|
||||
nbd_r2h_reply( reply_raw, reply );
|
||||
|
||||
if ( reply->magic != REPLY_MAGIC ) {
|
||||
warn( "Reply magic is incorrect" );
|
||||
goto disconnect;
|
||||
}
|
||||
|
||||
if ( reply->error != 0 ) {
|
||||
warn( "NBD error returned from upstream: %"PRIu32, reply->error );
|
||||
goto disconnect;
|
||||
}
|
||||
|
||||
if ( ( proxy->req_hdr.type || REQUEST_READ ) == REQUEST_READ ) {
|
||||
/* Read the read reply data too. Same optimisation as
|
||||
* read_from_downstream */
|
||||
proxy->rsp_buf_size += proxy->req_hdr.len;
|
||||
goto read;
|
||||
}
|
||||
}
|
||||
|
||||
if ( proxy->rsp_buf_size == proxy->rsp_buf_offset ) {
|
||||
debug( "NBD reply received from upstream." );
|
||||
proxy->rsp_buf_offset = 0;
|
||||
return WRITE_TO_DOWNSTREAM;
|
||||
}
|
||||
|
||||
return state;
|
||||
|
||||
disconnect:
|
||||
proxy->rsp_buf_offset = 0;
|
||||
proxy->rsp_buf_size = 0;
|
||||
return CONNECT_TO_UPSTREAM;
|
||||
}
|
||||
|
||||
|
||||
/* Here, we negotiate an NBD session with downstream, based on the information
|
||||
* we got on first connection to upstream. Then we wait for a request to come
|
||||
* in from downstream, read it into memory, then send it to upstream. If
|
||||
* upstream dies before responding, we reconnect to upstream and resend it.
|
||||
* Once we've got a response, we write it directly to downstream, and wait for a
|
||||
* new request. When downstream disconnects, or we receive an exit signal (which
|
||||
* can be blocked, unfortunately), we are finished.
|
||||
*
|
||||
* This is the simplest possible nbd proxy I can think of. It may not be at all
|
||||
* performant - let's see.
|
||||
*/
|
||||
|
||||
void proxy_session( struct proxier* proxy )
|
||||
int proxy_write_to_downstream( struct proxier* proxy, int state )
|
||||
{
|
||||
int downstream_fd = proxy->downstream_fd;
|
||||
uint64_t req_count = 0;
|
||||
int result;
|
||||
ssize_t count;
|
||||
|
||||
info( "Beginning proxy session on fd %i", downstream_fd );
|
||||
// assert( state == WRITE_TO_DOWNSTREAM );
|
||||
|
||||
if ( !socket_nbd_write_hello( downstream_fd, proxy->upstream_size ) ) {
|
||||
warn( "Sending hello failed on fd %i, ending session", downstream_fd );
|
||||
return;
|
||||
if ( proxy->hello_sent ) {
|
||||
debug(
|
||||
"Writing request of %"PRIu32" bytes from offset %"PRIu32,
|
||||
proxy->rsp_buf_size, proxy->rsp_buf_offset
|
||||
);
|
||||
} else {
|
||||
debug( "Writing init to downstream" );
|
||||
}
|
||||
|
||||
while( proxy_get_request_from_downstream( proxy ) ) {
|
||||
do {
|
||||
if ( proxy->upstream_fd == -1 ) {
|
||||
info( "Connecting to upstream" );
|
||||
if ( !proxy_connect_to_upstream( proxy ) ) {
|
||||
warn( "Failed to connect to upstream" );
|
||||
result = 0;
|
||||
sleep( 5 );
|
||||
count = write(
|
||||
proxy->downstream_fd,
|
||||
proxy->rsp_buf + proxy->rsp_buf_offset,
|
||||
proxy->rsp_buf_size - proxy->rsp_buf_offset
|
||||
);
|
||||
|
||||
if ( count != -1 ) {
|
||||
proxy->rsp_buf_offset += count;
|
||||
} else if ( io_errno_permanent() ) {
|
||||
if ( proxy->hello_sent ) {
|
||||
warn( SHOW_ERRNO( "Failed to send reply downstream" ) );
|
||||
} else {
|
||||
warn( SHOW_ERRNO( "Failed to send init downstream" ) );
|
||||
}
|
||||
return EXIT;
|
||||
}
|
||||
|
||||
if ( proxy->rsp_buf_offset == proxy->rsp_buf_size ) {
|
||||
if ( !proxy->hello_sent ) {
|
||||
info( "Hello message sent to client" );
|
||||
proxy->hello_sent = 1;
|
||||
} else {
|
||||
debug( "Reply sent" );
|
||||
proxy->req_count++;
|
||||
}
|
||||
|
||||
/* We're done with the request & response buffers now */
|
||||
proxy->req_buf_size = 0;
|
||||
proxy->req_buf_offset = 0;
|
||||
proxy->rsp_buf_size = 0;
|
||||
proxy->rsp_buf_offset = 0;
|
||||
return READ_FROM_DOWNSTREAM;
|
||||
}
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
/* Non-blocking proxy session. Simple(ish) state machine. We read from d/s until
|
||||
* we have a full request, then try to write that request u/s. If writing fails,
|
||||
* we reconnect to upstream and retry. Once we've successfully written, we
|
||||
* attempt to read the reply. If that fails or times out (we give it 15 seconds)
|
||||
* then we disconnect from u/s and go back to trying to reconnect and resend.
|
||||
*
|
||||
* This is the second-simplest NBD proxy I can think of. The first version was
|
||||
* blocking I/O, but it was getting impossible to manage exceptional stuff
|
||||
*/
|
||||
void proxy_session( struct proxier* proxy )
|
||||
{
|
||||
/* First action: Write hello to downstream */
|
||||
nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp_buf, proxy->upstream_size );
|
||||
proxy->rsp_buf_size = sizeof( struct nbd_init_raw );
|
||||
proxy->rsp_buf_offset = 0;
|
||||
int state = WRITE_TO_DOWNSTREAM;
|
||||
|
||||
info( "Beginning proxy session on fd %i", proxy->downstream_fd );
|
||||
|
||||
while( state != EXIT ) {
|
||||
|
||||
struct timeval select_timeout = {
|
||||
.tv_sec = 0,
|
||||
.tv_usec = 0
|
||||
};
|
||||
|
||||
struct timeval *select_timeout_ptr = NULL;
|
||||
|
||||
int result; /* used by select() */
|
||||
|
||||
fd_set rfds;
|
||||
fd_set wfds;
|
||||
|
||||
FD_ZERO( &rfds );
|
||||
FD_ZERO( &wfds );
|
||||
|
||||
debug( "Proxy is in state %s ( %i )", proxy_session_state_names[state], state );
|
||||
|
||||
switch( state ) {
|
||||
case READ_FROM_DOWNSTREAM:
|
||||
FD_SET( proxy->downstream_fd, &rfds );
|
||||
break;
|
||||
case WRITE_TO_DOWNSTREAM:
|
||||
FD_SET( proxy->downstream_fd, &wfds );
|
||||
break;
|
||||
case WRITE_TO_UPSTREAM:
|
||||
select_timeout.tv_sec = 15;
|
||||
FD_SET(proxy->upstream_fd, &wfds );
|
||||
break;
|
||||
case CONNECT_TO_UPSTREAM:
|
||||
proxy_disconnect_from_upstream( proxy );
|
||||
/* Changes proxy->upstream_fd */
|
||||
proxy_start_connect_to_upstream( proxy );
|
||||
|
||||
if ( proxy->upstream_fd == -1 ) {
|
||||
warn( SHOW_ERRNO( "Error acquiring socket to upstream" ) );
|
||||
continue;
|
||||
}
|
||||
info( "Connected to upstream");
|
||||
}
|
||||
result = proxy_run_request_upstream( proxy );
|
||||
} while ( result == 0 );
|
||||
|
||||
if ( !proxy_send_reply_downstream( proxy ) ) {
|
||||
warn( "Replying on fd %i failed, ending session", downstream_fd );
|
||||
/* non-blocking connect() */
|
||||
select_timeout.tv_sec = 15;
|
||||
FD_SET( proxy->upstream_fd, &wfds );
|
||||
break;
|
||||
case READ_INIT_FROM_UPSTREAM:
|
||||
case READ_FROM_UPSTREAM:
|
||||
select_timeout.tv_sec = 15;
|
||||
FD_SET( proxy->upstream_fd, &rfds );
|
||||
break;
|
||||
};
|
||||
|
||||
if ( select_timeout.tv_sec > 0 ) {
|
||||
select_timeout_ptr = &select_timeout;
|
||||
}
|
||||
|
||||
result = sock_try_select( FD_SETSIZE, &rfds, &wfds, NULL, select_timeout_ptr );
|
||||
|
||||
if ( result == -1 ) {
|
||||
warn( SHOW_ERRNO( "select() failed: " ) );
|
||||
break;
|
||||
}
|
||||
|
||||
proxy->req_buf_size = 0;
|
||||
proxy->rsp_buf_size = 0;
|
||||
switch( state ) {
|
||||
case READ_FROM_DOWNSTREAM:
|
||||
if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) {
|
||||
state = proxy_read_from_downstream( proxy, state );
|
||||
#ifdef PREFETCH
|
||||
/* Check if we can fulfil the request from prefetch, or
|
||||
* rewrite the request to fill the prefetch buffer if needed
|
||||
*/
|
||||
if ( state == WRITE_TO_UPSTREAM ) {
|
||||
state = proxy_prefetch_for_request( proxy, state );
|
||||
}
|
||||
#endif
|
||||
}
|
||||
break;
|
||||
case CONNECT_TO_UPSTREAM:
|
||||
if ( FD_ISSET( proxy->upstream_fd, &wfds ) ) {
|
||||
state = proxy_continue_connecting_to_upstream( proxy, state );
|
||||
}
|
||||
/* Leaving state untouched will retry connecting to upstream */
|
||||
break;
|
||||
case READ_INIT_FROM_UPSTREAM:
|
||||
state = proxy_read_init_from_upstream( proxy, state );
|
||||
case WRITE_TO_UPSTREAM:
|
||||
if ( FD_ISSET( proxy->upstream_fd, &wfds ) ) {
|
||||
state = proxy_write_to_upstream( proxy, state );
|
||||
}
|
||||
break;
|
||||
case READ_FROM_UPSTREAM:
|
||||
if ( FD_ISSET( proxy->upstream_fd, &rfds ) ) {
|
||||
state = proxy_read_from_upstream( proxy, state );
|
||||
}
|
||||
# ifdef PREFETCH
|
||||
/* Fill the prefetch buffer and rewrite the reply, if needed */
|
||||
if ( state == WRITE_TO_DOWNSTREAM ) {
|
||||
state = proxy_prefetch_for_reply( proxy, state );
|
||||
}
|
||||
#endif
|
||||
break;
|
||||
case WRITE_TO_DOWNSTREAM:
|
||||
if ( FD_ISSET( proxy->downstream_fd, &wfds ) ) {
|
||||
state = proxy_write_to_downstream( proxy, state );
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
req_count++;
|
||||
};
|
||||
}
|
||||
|
||||
info(
|
||||
"Finished proxy session on fd %i after %"PRIu64" successful request(s)",
|
||||
downstream_fd, req_count
|
||||
proxy->downstream_fd, proxy->req_count
|
||||
);
|
||||
|
||||
/* Reset these two for the next session */
|
||||
proxy->req_count = 0;
|
||||
proxy->hello_sent = 0;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -515,6 +884,7 @@ int proxy_accept( struct proxier* params )
|
||||
}
|
||||
|
||||
info( "Accepted nbd client socket fd %d", client_fd );
|
||||
sock_set_nonblock( client_fd, 1 );
|
||||
params->downstream_fd = client_fd;
|
||||
proxy_session( params );
|
||||
|
||||
@@ -544,6 +914,13 @@ void proxy_cleanup( struct proxier* proxy )
|
||||
info( "Cleaning up" );
|
||||
|
||||
if ( -1 != proxy->listen_fd ) {
|
||||
|
||||
if ( AF_UNIX == proxy->listen_on.family ) {
|
||||
if ( -1 == unlink( proxy->listen_on.un.sun_path ) ) {
|
||||
warn( SHOW_ERRNO( "Failed to unlink %s", proxy->listen_on.un.sun_path ) );
|
||||
}
|
||||
}
|
||||
|
||||
WARN_IF_NEGATIVE(
|
||||
sock_try_close( proxy->listen_fd ),
|
||||
SHOW_ERRNO( "Failed to close() listen fd %i", proxy->listen_fd )
|
||||
@@ -571,12 +948,6 @@ void proxy_cleanup( struct proxier* proxy )
|
||||
proxy->upstream_fd = -1;
|
||||
}
|
||||
|
||||
if ( AF_UNIX == proxy->listen_on.family ) {
|
||||
if ( -1 == unlink( proxy->listen_on.un.sun_path ) ) {
|
||||
warn( SHOW_ERRNO( "Failed to unlink %s", proxy->listen_on.un.sun_path ) );
|
||||
}
|
||||
}
|
||||
|
||||
info( "Cleanup done" );
|
||||
}
|
||||
|
||||
|
23
src/proxy.h
23
src/proxy.h
@@ -47,7 +47,10 @@ struct proxier {
|
||||
unsigned char* req_buf;
|
||||
|
||||
/* Number of bytes currently sat in req_buf */
|
||||
size_t req_buf_size;
|
||||
ssize_t req_buf_size;
|
||||
|
||||
/* Number of bytes of request we've gotten through */
|
||||
off_t req_buf_offset;
|
||||
|
||||
/* We transform the raw request header into here */
|
||||
struct nbd_request req_hdr;
|
||||
@@ -56,13 +59,27 @@ struct proxier {
|
||||
unsigned char* rsp_buf;
|
||||
|
||||
/* Number of bytes currently sat in rsp_buf */
|
||||
size_t rsp_buf_size;
|
||||
ssize_t rsp_buf_size;
|
||||
|
||||
/* Number of bytes of response we've gotten through */
|
||||
off_t rsp_buf_offset;
|
||||
|
||||
/* We transform the raw reply header into here */
|
||||
struct nbd_reply rsp_hdr;
|
||||
|
||||
/* It's starting to feel like we need an object for a single proxy session.
|
||||
* These two track how many requests we've sent so far, and whether the
|
||||
* NBD_INIT code has been sent to the client yet.
|
||||
*/
|
||||
uint64_t req_count;
|
||||
int hello_sent;
|
||||
|
||||
/* And now we're doing non-blocking connect to upstream, we need this too */
|
||||
struct nbd_init_raw init_buf;
|
||||
off_t init_buf_offset;
|
||||
|
||||
#ifdef PREFETCH
|
||||
struct prefetch *prefetch;
|
||||
struct prefetch *prefetch;
|
||||
#endif
|
||||
};
|
||||
|
||||
|
Reference in New Issue
Block a user