proxy: DRY up some code

This commit is contained in:
nick
2013-06-18 16:58:39 +01:00
parent b14bba36ec
commit 2156d06368
4 changed files with 139 additions and 174 deletions

View File

@@ -10,6 +10,7 @@
#include "util.h"
#include "bitset.h"
#include "ioutil.h"
int build_allocation_map(struct bitset_mapping* allocation_map, int fd)
@@ -268,3 +269,66 @@ int fd_is_closed( int fd_in )
return result;
}
static inline int io_errno_permanent()
{
return ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR );
}
/* Returns -1 if the operation failed, or the number of bytes read if all is
* well. Note that 0 bytes may be returned. Unlike read(), this is not an EOF! */
ssize_t iobuf_read(int fd, struct iobuf *iobuf, size_t default_size )
{
size_t left;
ssize_t count;
if ( iobuf->needle == 0 ) {
iobuf->size = default_size;
}
left = iobuf->size - iobuf->needle;
debug( "Reading %"PRIu32" of %"PRIu32" bytes from fd %i", left, iobuf->size, fd );
count = read( fd, iobuf->buf + iobuf->needle, left );
if ( count > 0 ) {
iobuf->needle += count;
debug( "read() returned %"PRIu32" bytes", count );
} else if ( count == 0 ) {
warn( "read() returned EOF on fd %i", fd );
errno = 0;
return -1;
} else if ( count == -1 ) {
if ( io_errno_permanent() ) {
warn( SHOW_ERRNO( "read() failed on fd %i", fd ) );
} else {
count = 0;
}
}
return count;
}
ssize_t iobuf_write( int fd, struct iobuf *iobuf )
{
size_t left = iobuf->size - iobuf->needle;
ssize_t count;
debug( "Writing %"PRIu32" of %"PRIu32" bytes to fd %i", left, iobuf->size, fd );
count = write( fd, iobuf->buf + iobuf->needle, left );
if ( count >= 0 ) {
iobuf->needle += count;
debug( "write() returned %"PRIu32" bytes", count );
} else {
if ( io_errno_permanent() ) {
warn( SHOW_ERRNO( "write() failed on fd %i", fd ) );
} else {
count = 0;
}
}
return count;
}

View File

@@ -1,6 +1,16 @@
#ifndef __IOUTIL_H
#define __IOUTIL_H
#include <sys/types.h>
struct iobuf {
unsigned char *buf;
size_t size;
size_t needle;
};
ssize_t iobuf_read( int fd, struct iobuf* iobuf, size_t default_size );
ssize_t iobuf_write( int fd, struct iobuf* iobuf );
#include "serve.h"
struct bitset_mapping; /* don't need whole of bitset.h here */

View File

@@ -69,16 +69,18 @@ struct proxier* proxy_create(
out->prefetch = xmalloc( sizeof( struct prefetch ) );
#endif
out->req_buf = xmalloc( NBD_MAX_SIZE );
out->rsp_buf = xmalloc( NBD_MAX_SIZE );
out->init.buf = xmalloc( sizeof( struct nbd_init_raw ) );
out->req.buf = xmalloc( NBD_MAX_SIZE );
out->rsp.buf = xmalloc( NBD_MAX_SIZE );
return out;
}
void proxy_destroy( struct proxier* proxy )
{
free( proxy->req_buf );
free( proxy->rsp_buf );
free( proxy->init.buf );
free( proxy->req.buf );
free( proxy->rsp.buf );
#ifdef PREFETCH
free( proxy->prefetch );
#endif
@@ -279,7 +281,7 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
debug( "Prefetching %d bytes", rewrite_request->len );
nbd_h2r_request( rewrite_request,
(struct nbd_request_raw *)proxy->req_buf );
(struct nbd_request_raw *)proxy->req.buf );
free( rewrite_request );
}
//####
@@ -306,7 +308,7 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
proxy->prefetch->buffer,
proxy->prefetch->len );
proxy->rsp_buf_size =
proxy->rsp.size =
NBD_REPLY_SIZE + proxy->prefetch->len;
/* return early, our work here is done */
@@ -372,52 +374,24 @@ static char* proxy_session_state_names[] = {
"READ_FROM_UPSTREAM"
};
static inline int io_errno_permanent()
{
return ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR );
}
int proxy_read_from_downstream( struct proxier *proxy, int state )
{
ssize_t count;
struct nbd_request_raw* request_raw = (struct nbd_request_raw*) proxy->req_buf;
struct nbd_request_raw* request_raw = (struct nbd_request_raw*) proxy->req.buf;
struct nbd_request* request = &(proxy->req_hdr);
// assert( state == READ_FROM_DOWNSTREAM );
/* We're beginning a new read from downstream */
if ( proxy->req_buf_size == 0 ) {
proxy->req_buf_size = NBD_REQUEST_SIZE;
}
read:
debug(
"Reading %"PRIu32" of %"PRIu32" bytes from downstream for NBD request",
proxy->req_buf_size - proxy->req_buf_offset,
proxy->req_buf_size
);
count = iobuf_read( proxy->downstream_fd, &proxy->req, NBD_REQUEST_SIZE );
count = read(
proxy->downstream_fd,
proxy->req_buf + proxy->req_buf_offset,
proxy->req_buf_size - proxy->req_buf_offset
);
if ( count == 0 ) {
info( "EOF on downstream fd %i received", proxy->downstream_fd );
if ( count == -1 ) {
warn( SHOW_ERRNO( "Couldn't read request from downstream" ) );
return EXIT;
}
if ( count != -1 ) {
proxy->req_buf_offset += count;
} else if ( io_errno_permanent() ) {
warn( SHOW_ERRNO( "Couldn't read request from downstream: " ) );
return EXIT;
}
if ( proxy->req_buf_offset == NBD_REQUEST_SIZE ) {
if ( proxy->req.needle == NBD_REQUEST_SIZE ) {
nbd_r2h_request( request_raw, request );
if ( request->type == REQUEST_DISCONNECT ) {
@@ -438,7 +412,7 @@ read:
return EXIT;
}
proxy->req_buf_size += request->len;
proxy->req.size += request->len;
/* Minor optimisation: Read again immediately if there's write
* request data to be had. No point select()ing again
@@ -447,7 +421,7 @@ read:
}
}
if ( proxy->req_buf_offset == proxy->req_buf_size ) {
if ( proxy->req.needle == proxy->req.size ) {
debug(
"Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32,
request->type, request->from, request->len
@@ -455,7 +429,7 @@ read:
/* Finished reading, so advance state. Leave size untouched so the next
* state knows how many bytes to write */
proxy->req_buf_offset = 0;
proxy->req.needle = 0;
return WRITE_TO_UPSTREAM;
}
@@ -496,37 +470,19 @@ int proxy_continue_connecting_to_upstream( struct proxier* proxy, int state )
int proxy_read_init_from_upstream( struct proxier* proxy, int state )
{
ssize_t count;
unsigned char* buf = (unsigned char *) &(proxy->init_buf);
// assert( state == READ_INIT_FROM_UPSTREAM );
debug(
"Reading %"PRIu32" bytes of %"PRIu32" in init message",
sizeof( proxy->init_buf ) - proxy->init_buf_offset,
sizeof( proxy->init_buf )
);
count = iobuf_read( proxy->upstream_fd, &proxy->init, sizeof( struct nbd_init_raw ) );
count = read(
proxy->upstream_fd,
buf + proxy->init_buf_offset,
sizeof( proxy->init_buf ) - proxy->init_buf_offset
);
if ( count == 0 ) {
info( "EOF signalled on upstream fd %i", proxy->upstream_fd );
goto disconnect;
}
if ( count != -1 ) {
proxy->init_buf_offset += count;
} else if ( io_errno_permanent() ) {
if ( count == -1 ) {
warn( SHOW_ERRNO( "Failed to read init from upstream" ) );
goto disconnect;
}
if ( proxy->init_buf_offset == sizeof( proxy->init_buf ) ) {
if ( proxy->init.needle == proxy->init.size ) {
off64_t upstream_size;
if ( !nbd_check_hello( &proxy->init_buf, &upstream_size ) ) {
if ( !nbd_check_hello( (struct nbd_init_raw*) proxy->init.buf, &upstream_size ) ) {
warn( "Upstream sent invalid init" );
goto disconnect;
}
@@ -534,14 +490,15 @@ int proxy_read_init_from_upstream( struct proxier* proxy, int state )
/* Currently, we only get disconnected from upstream (so needing to come
* here) when we have an outstanding request. If that becomes false,
* we'll need to choose the right state to return to here */
proxy->init_buf_offset = 0;
proxy->init.needle = 0;
return WRITE_TO_UPSTREAM;
}
return state;
disconnect:
proxy->init_buf_offset = 0;
proxy->init.needle = 0;
proxy->init.size = 0;
return CONNECT_TO_UPSTREAM;
}
@@ -550,32 +507,19 @@ int proxy_write_to_upstream( struct proxier* proxy, int state )
ssize_t count;
// assert( state == WRITE_TO_UPSTREAM );
count = iobuf_write( proxy->upstream_fd, &proxy->req );
debug(
"Writing %"PRIu32" of %"PRIu32" bytes of request to upstream",
proxy->req_buf_size - proxy->req_buf_offset,
proxy->req_buf_size
);
count = write(
proxy->upstream_fd,
proxy->req_buf + proxy->req_buf_offset,
proxy->req_buf_size - proxy->req_buf_offset
);
if ( count != -1 ) {
proxy->req_buf_offset += count;
} else if ( io_errno_permanent() ) {
if ( count == -1 ) {
warn( SHOW_ERRNO( "Failed to send request to upstream" ) );
proxy->req_buf_offset = 0;
proxy->req.needle = 0;
return CONNECT_TO_UPSTREAM;
}
if ( proxy->req_buf_offset == proxy->req_buf_size ) {
if ( proxy->req.needle == proxy->req.size ) {
/* Request sent. Advance to reading the response from upstream. We might
* still need req_buf_size if reading the reply fails - we disconnect
* still need req.size if reading the reply fails - we disconnect
* and resend the reply in that case - so keep it around for now. */
proxy->req_buf_offset = 0;
proxy->req.needle = 0;
return READ_FROM_UPSTREAM;
}
@@ -587,45 +531,18 @@ int proxy_read_from_upstream( struct proxier* proxy, int state )
ssize_t count;
struct nbd_reply* reply = &(proxy->rsp_hdr);
struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) proxy->rsp_buf;
/* We can't assume the NBD_REPLY_SIZE + req->len is what we'll get back */
if ( proxy->rsp_buf_offset == 0 ) {
proxy->rsp_buf_size = NBD_REPLY_SIZE;
}
struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) proxy->rsp.buf;
read:
debug(
"Reading %"PRIu32" of %"PRIu32" bytes in NBD reply",
proxy->rsp_buf_size - proxy->rsp_buf_offset,
proxy->rsp_buf_size
);
/* We can't assume the NBD_REPLY_SIZE + req->len is what we'll get back */
count = iobuf_read( proxy->upstream_fd, &proxy->rsp, NBD_REPLY_SIZE );
count = read(
proxy->upstream_fd,
proxy->rsp_buf + proxy->rsp_buf_offset,
proxy->rsp_buf_size - proxy->rsp_buf_offset
);
if ( count == 0 ) {
info( "EOF signalled on upstream fd %i", proxy->upstream_fd );
goto disconnect;
}
if ( count != -1 ) {
proxy->rsp_buf_offset += count;
} else if ( io_errno_permanent() ) {
if ( count == -1 ) {
warn( SHOW_ERRNO( "Failed to get reply from upstream" ) );
goto disconnect;
}
debug(
"Read %"PRIu32" of %"PRIu32" bytes of reply from upstream",
proxy->rsp_buf_offset,
proxy->rsp_buf_size
);
if ( proxy->rsp_buf_offset == NBD_REPLY_SIZE ) {
if ( proxy->rsp.needle == NBD_REPLY_SIZE ) {
nbd_r2h_reply( reply_raw, reply );
if ( reply->magic != REPLY_MAGIC ) {
@@ -641,22 +558,22 @@ read:
if ( ( proxy->req_hdr.type || REQUEST_READ ) == REQUEST_READ ) {
/* Read the read reply data too. Same optimisation as
* read_from_downstream */
proxy->rsp_buf_size += proxy->req_hdr.len;
proxy->rsp.size += proxy->req_hdr.len;
goto read;
}
}
if ( proxy->rsp_buf_size == proxy->rsp_buf_offset ) {
if ( proxy->rsp.size == proxy->rsp.needle ) {
debug( "NBD reply received from upstream." );
proxy->rsp_buf_offset = 0;
proxy->rsp.needle = 0;
return WRITE_TO_DOWNSTREAM;
}
return state;
disconnect:
proxy->rsp_buf_offset = 0;
proxy->rsp_buf_size = 0;
proxy->rsp.needle = 0;
proxy->rsp.size = 0;
return CONNECT_TO_UPSTREAM;
}
@@ -667,33 +584,18 @@ int proxy_write_to_downstream( struct proxier* proxy, int state )
// assert( state == WRITE_TO_DOWNSTREAM );
if ( proxy->hello_sent ) {
debug(
"Writing request of %"PRIu32" bytes from offset %"PRIu32,
proxy->rsp_buf_size, proxy->rsp_buf_offset
);
} else {
debug( "Writing init to downstream" );
if ( !proxy->hello_sent ) {
info( "Writing init to downstream" );
}
count = write(
proxy->downstream_fd,
proxy->rsp_buf + proxy->rsp_buf_offset,
proxy->rsp_buf_size - proxy->rsp_buf_offset
);
count = iobuf_write( proxy->downstream_fd, &proxy->rsp );
if ( count != -1 ) {
proxy->rsp_buf_offset += count;
} else if ( io_errno_permanent() ) {
if ( proxy->hello_sent ) {
warn( SHOW_ERRNO( "Failed to send reply downstream" ) );
} else {
warn( SHOW_ERRNO( "Failed to send init downstream" ) );
}
if ( count == -1 ) {
warn( SHOW_ERRNO( "Failed to write to downstream" ) );
return EXIT;
}
if ( proxy->rsp_buf_offset == proxy->rsp_buf_size ) {
if ( proxy->rsp.needle == proxy->rsp.size ) {
if ( !proxy->hello_sent ) {
info( "Hello message sent to client" );
proxy->hello_sent = 1;
@@ -703,10 +605,10 @@ int proxy_write_to_downstream( struct proxier* proxy, int state )
}
/* We're done with the request & response buffers now */
proxy->req_buf_size = 0;
proxy->req_buf_offset = 0;
proxy->rsp_buf_size = 0;
proxy->rsp_buf_offset = 0;
proxy->req.size = 0;
proxy->req.needle = 0;
proxy->rsp.size = 0;
proxy->rsp.needle = 0;
return READ_FROM_DOWNSTREAM;
}
@@ -725,9 +627,9 @@ int proxy_write_to_downstream( struct proxier* proxy, int state )
void proxy_session( struct proxier* proxy )
{
/* First action: Write hello to downstream */
nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp_buf, proxy->upstream_size );
proxy->rsp_buf_size = sizeof( struct nbd_init_raw );
proxy->rsp_buf_offset = 0;
nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp.buf, proxy->upstream_size );
proxy->rsp.size = sizeof( struct nbd_init_raw );
proxy->rsp.needle = 0;
int state = WRITE_TO_DOWNSTREAM;
info( "Beginning proxy session on fd %i", proxy->downstream_fd );

View File

@@ -4,6 +4,7 @@
#include <sys/types.h>
#include <unistd.h>
#include "ioutil.h"
#include "flexnbd.h"
#include "parse.h"
#include "nbdtypes.h"
@@ -43,30 +44,22 @@ struct proxier {
/* This is the size we advertise to the downstream server */
off64_t upstream_size;
/* Scratch space for the current NBD request from downstream */
unsigned char* req_buf;
/* Number of bytes currently sat in req_buf */
ssize_t req_buf_size;
/* Number of bytes of request we've gotten through */
off_t req_buf_offset;
/* We transform the raw request header into here */
struct nbd_request req_hdr;
/* Scratch space for the current NBD reply from upstream */
unsigned char* rsp_buf;
/* Number of bytes currently sat in rsp_buf */
ssize_t rsp_buf_size;
/* Number of bytes of response we've gotten through */
off_t rsp_buf_offset;
/* We transform the raw reply header into here */
struct nbd_reply rsp_hdr;
/* Used for our non-blocking negotiation with upstream. TODO: maybe use
* for downstream as well ( we currently overload rsp ) */
struct iobuf init;
/* The current NBD request from downstream */
struct iobuf req;
/* The current NBD reply from upstream */
struct iobuf rsp;
/* It's starting to feel like we need an object for a single proxy session.
* These two track how many requests we've sent so far, and whether the
* NBD_INIT code has been sent to the client yet.
@@ -74,10 +67,6 @@ struct proxier {
uint64_t req_count;
int hello_sent;
/* And now we're doing non-blocking connect to upstream, we need this too */
struct nbd_init_raw init_buf;
off_t init_buf_offset;
#ifdef PREFETCH
struct prefetch *prefetch;
#endif