From 2156d06368bb42556bc7bc2bca86a5393ab2ddc4 Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 18 Jun 2013 16:58:39 +0100 Subject: [PATCH] proxy: DRY up some code --- src/ioutil.c | 64 ++++++++++++++++ src/ioutil.h | 14 +++- src/proxy.c | 202 +++++++++++++-------------------------------------- src/proxy.h | 33 +++------ 4 files changed, 139 insertions(+), 174 deletions(-) diff --git a/src/ioutil.c b/src/ioutil.c index 51cd7a1..440a5d6 100644 --- a/src/ioutil.c +++ b/src/ioutil.c @@ -10,6 +10,7 @@ #include "util.h" #include "bitset.h" +#include "ioutil.h" int build_allocation_map(struct bitset_mapping* allocation_map, int fd) @@ -268,3 +269,66 @@ int fd_is_closed( int fd_in ) return result; } + +static inline int io_errno_permanent() +{ + return ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ); +} + + +/* Returns -1 if the operation failed, or the number of bytes read if all is + * well. Note that 0 bytes may be returned. Unlike read(), this is not an EOF! */ +ssize_t iobuf_read(int fd, struct iobuf *iobuf, size_t default_size ) +{ + size_t left; + ssize_t count; + + if ( iobuf->needle == 0 ) { + iobuf->size = default_size; + } + + left = iobuf->size - iobuf->needle; + debug( "Reading %"PRIu32" of %"PRIu32" bytes from fd %i", left, iobuf->size, fd ); + + count = read( fd, iobuf->buf + iobuf->needle, left ); + + if ( count > 0 ) { + iobuf->needle += count; + debug( "read() returned %"PRIu32" bytes", count ); + } else if ( count == 0 ) { + warn( "read() returned EOF on fd %i", fd ); + errno = 0; + return -1; + } else if ( count == -1 ) { + if ( io_errno_permanent() ) { + warn( SHOW_ERRNO( "read() failed on fd %i", fd ) ); + } else { + count = 0; + } + } + + return count; +} + +ssize_t iobuf_write( int fd, struct iobuf *iobuf ) +{ + size_t left = iobuf->size - iobuf->needle; + ssize_t count; + + debug( "Writing %"PRIu32" of %"PRIu32" bytes to fd %i", left, iobuf->size, fd ); + count = write( fd, iobuf->buf + iobuf->needle, left ); + + if ( count >= 0 ) { + iobuf->needle += count; + debug( "write() returned %"PRIu32" bytes", count ); + } else { + if ( io_errno_permanent() ) { + warn( SHOW_ERRNO( "write() failed on fd %i", fd ) ); + } else { + count = 0; + } + } + + return count; +} + diff --git a/src/ioutil.h b/src/ioutil.h index 861ee84..6a7ed97 100644 --- a/src/ioutil.h +++ b/src/ioutil.h @@ -1,13 +1,23 @@ #ifndef __IOUTIL_H #define __IOUTIL_H +#include +struct iobuf { + unsigned char *buf; + size_t size; + size_t needle; +}; + +ssize_t iobuf_read( int fd, struct iobuf* iobuf, size_t default_size ); +ssize_t iobuf_write( int fd, struct iobuf* iobuf ); + #include "serve.h" struct bitset_mapping; /* don't need whole of bitset.h here */ /** Scan the file opened in ''fd'', set bits in ''allocation_map'' that * correspond to which blocks are physically allocated on disc (or part- - * allocated). If the OS represents allocated blocks at a finer resolution - * than you've asked for, any block or part block will count as "allocated" + * allocated). If the OS represents allocated blocks at a finer resolution + * than you've asked for, any block or part block will count as "allocated" * with the corresponding bit set. Returns 1 if successful, 0 otherwise. */ int build_allocation_map(struct bitset_mapping* allocation_map, int fd); diff --git a/src/proxy.c b/src/proxy.c index 8a3451b..2040342 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -69,16 +69,18 @@ struct proxier* proxy_create( out->prefetch = xmalloc( sizeof( struct prefetch ) ); #endif - out->req_buf = xmalloc( NBD_MAX_SIZE ); - out->rsp_buf = xmalloc( NBD_MAX_SIZE ); + out->init.buf = xmalloc( sizeof( struct nbd_init_raw ) ); + out->req.buf = xmalloc( NBD_MAX_SIZE ); + out->rsp.buf = xmalloc( NBD_MAX_SIZE ); return out; } void proxy_destroy( struct proxier* proxy ) { - free( proxy->req_buf ); - free( proxy->rsp_buf ); + free( proxy->init.buf ); + free( proxy->req.buf ); + free( proxy->rsp.buf ); #ifdef PREFETCH free( proxy->prefetch ); #endif @@ -279,7 +281,7 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state ) debug( "Prefetching %d bytes", rewrite_request->len ); nbd_h2r_request( rewrite_request, - (struct nbd_request_raw *)proxy->req_buf ); + (struct nbd_request_raw *)proxy->req.buf ); free( rewrite_request ); } //#### @@ -306,7 +308,7 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state ) proxy->prefetch->buffer, proxy->prefetch->len ); - proxy->rsp_buf_size = + proxy->rsp.size = NBD_REPLY_SIZE + proxy->prefetch->len; /* return early, our work here is done */ @@ -372,52 +374,24 @@ static char* proxy_session_state_names[] = { "READ_FROM_UPSTREAM" }; -static inline int io_errno_permanent() -{ - return ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ); -} - - int proxy_read_from_downstream( struct proxier *proxy, int state ) { ssize_t count; - struct nbd_request_raw* request_raw = (struct nbd_request_raw*) proxy->req_buf; + struct nbd_request_raw* request_raw = (struct nbd_request_raw*) proxy->req.buf; struct nbd_request* request = &(proxy->req_hdr); // assert( state == READ_FROM_DOWNSTREAM ); - /* We're beginning a new read from downstream */ - if ( proxy->req_buf_size == 0 ) { - proxy->req_buf_size = NBD_REQUEST_SIZE; - } - read: - debug( - "Reading %"PRIu32" of %"PRIu32" bytes from downstream for NBD request", - proxy->req_buf_size - proxy->req_buf_offset, - proxy->req_buf_size - ); + count = iobuf_read( proxy->downstream_fd, &proxy->req, NBD_REQUEST_SIZE ); - count = read( - proxy->downstream_fd, - proxy->req_buf + proxy->req_buf_offset, - proxy->req_buf_size - proxy->req_buf_offset - ); - - if ( count == 0 ) { - info( "EOF on downstream fd %i received", proxy->downstream_fd ); + if ( count == -1 ) { + warn( SHOW_ERRNO( "Couldn't read request from downstream" ) ); return EXIT; } - if ( count != -1 ) { - proxy->req_buf_offset += count; - } else if ( io_errno_permanent() ) { - warn( SHOW_ERRNO( "Couldn't read request from downstream: " ) ); - return EXIT; - } - - if ( proxy->req_buf_offset == NBD_REQUEST_SIZE ) { + if ( proxy->req.needle == NBD_REQUEST_SIZE ) { nbd_r2h_request( request_raw, request ); if ( request->type == REQUEST_DISCONNECT ) { @@ -438,7 +412,7 @@ read: return EXIT; } - proxy->req_buf_size += request->len; + proxy->req.size += request->len; /* Minor optimisation: Read again immediately if there's write * request data to be had. No point select()ing again @@ -447,7 +421,7 @@ read: } } - if ( proxy->req_buf_offset == proxy->req_buf_size ) { + if ( proxy->req.needle == proxy->req.size ) { debug( "Received NBD request from downstream. type=%"PRIu32" from=%"PRIu64" len=%"PRIu32, request->type, request->from, request->len @@ -455,7 +429,7 @@ read: /* Finished reading, so advance state. Leave size untouched so the next * state knows how many bytes to write */ - proxy->req_buf_offset = 0; + proxy->req.needle = 0; return WRITE_TO_UPSTREAM; } @@ -496,37 +470,19 @@ int proxy_continue_connecting_to_upstream( struct proxier* proxy, int state ) int proxy_read_init_from_upstream( struct proxier* proxy, int state ) { ssize_t count; - unsigned char* buf = (unsigned char *) &(proxy->init_buf); // assert( state == READ_INIT_FROM_UPSTREAM ); - debug( - "Reading %"PRIu32" bytes of %"PRIu32" in init message", - sizeof( proxy->init_buf ) - proxy->init_buf_offset, - sizeof( proxy->init_buf ) - ); + count = iobuf_read( proxy->upstream_fd, &proxy->init, sizeof( struct nbd_init_raw ) ); - count = read( - proxy->upstream_fd, - buf + proxy->init_buf_offset, - sizeof( proxy->init_buf ) - proxy->init_buf_offset - ); - - if ( count == 0 ) { - info( "EOF signalled on upstream fd %i", proxy->upstream_fd ); - goto disconnect; - } - - if ( count != -1 ) { - proxy->init_buf_offset += count; - } else if ( io_errno_permanent() ) { + if ( count == -1 ) { warn( SHOW_ERRNO( "Failed to read init from upstream" ) ); goto disconnect; } - if ( proxy->init_buf_offset == sizeof( proxy->init_buf ) ) { + if ( proxy->init.needle == proxy->init.size ) { off64_t upstream_size; - if ( !nbd_check_hello( &proxy->init_buf, &upstream_size ) ) { + if ( !nbd_check_hello( (struct nbd_init_raw*) proxy->init.buf, &upstream_size ) ) { warn( "Upstream sent invalid init" ); goto disconnect; } @@ -534,14 +490,15 @@ int proxy_read_init_from_upstream( struct proxier* proxy, int state ) /* Currently, we only get disconnected from upstream (so needing to come * here) when we have an outstanding request. If that becomes false, * we'll need to choose the right state to return to here */ - proxy->init_buf_offset = 0; + proxy->init.needle = 0; return WRITE_TO_UPSTREAM; } return state; disconnect: - proxy->init_buf_offset = 0; + proxy->init.needle = 0; + proxy->init.size = 0; return CONNECT_TO_UPSTREAM; } @@ -550,32 +507,19 @@ int proxy_write_to_upstream( struct proxier* proxy, int state ) ssize_t count; // assert( state == WRITE_TO_UPSTREAM ); + count = iobuf_write( proxy->upstream_fd, &proxy->req ); - debug( - "Writing %"PRIu32" of %"PRIu32" bytes of request to upstream", - proxy->req_buf_size - proxy->req_buf_offset, - proxy->req_buf_size - ); - - count = write( - proxy->upstream_fd, - proxy->req_buf + proxy->req_buf_offset, - proxy->req_buf_size - proxy->req_buf_offset - ); - - if ( count != -1 ) { - proxy->req_buf_offset += count; - } else if ( io_errno_permanent() ) { + if ( count == -1 ) { warn( SHOW_ERRNO( "Failed to send request to upstream" ) ); - proxy->req_buf_offset = 0; + proxy->req.needle = 0; return CONNECT_TO_UPSTREAM; } - if ( proxy->req_buf_offset == proxy->req_buf_size ) { + if ( proxy->req.needle == proxy->req.size ) { /* Request sent. Advance to reading the response from upstream. We might - * still need req_buf_size if reading the reply fails - we disconnect + * still need req.size if reading the reply fails - we disconnect * and resend the reply in that case - so keep it around for now. */ - proxy->req_buf_offset = 0; + proxy->req.needle = 0; return READ_FROM_UPSTREAM; } @@ -587,45 +531,18 @@ int proxy_read_from_upstream( struct proxier* proxy, int state ) ssize_t count; struct nbd_reply* reply = &(proxy->rsp_hdr); - struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) proxy->rsp_buf; - - /* We can't assume the NBD_REPLY_SIZE + req->len is what we'll get back */ - if ( proxy->rsp_buf_offset == 0 ) { - proxy->rsp_buf_size = NBD_REPLY_SIZE; - } + struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) proxy->rsp.buf; read: - debug( - "Reading %"PRIu32" of %"PRIu32" bytes in NBD reply", - proxy->rsp_buf_size - proxy->rsp_buf_offset, - proxy->rsp_buf_size - ); + /* We can't assume the NBD_REPLY_SIZE + req->len is what we'll get back */ + count = iobuf_read( proxy->upstream_fd, &proxy->rsp, NBD_REPLY_SIZE ); - count = read( - proxy->upstream_fd, - proxy->rsp_buf + proxy->rsp_buf_offset, - proxy->rsp_buf_size - proxy->rsp_buf_offset - ); - - if ( count == 0 ) { - info( "EOF signalled on upstream fd %i", proxy->upstream_fd ); - goto disconnect; - } - - if ( count != -1 ) { - proxy->rsp_buf_offset += count; - } else if ( io_errno_permanent() ) { + if ( count == -1 ) { warn( SHOW_ERRNO( "Failed to get reply from upstream" ) ); goto disconnect; } - debug( - "Read %"PRIu32" of %"PRIu32" bytes of reply from upstream", - proxy->rsp_buf_offset, - proxy->rsp_buf_size - ); - - if ( proxy->rsp_buf_offset == NBD_REPLY_SIZE ) { + if ( proxy->rsp.needle == NBD_REPLY_SIZE ) { nbd_r2h_reply( reply_raw, reply ); if ( reply->magic != REPLY_MAGIC ) { @@ -641,22 +558,22 @@ read: if ( ( proxy->req_hdr.type || REQUEST_READ ) == REQUEST_READ ) { /* Read the read reply data too. Same optimisation as * read_from_downstream */ - proxy->rsp_buf_size += proxy->req_hdr.len; + proxy->rsp.size += proxy->req_hdr.len; goto read; } } - if ( proxy->rsp_buf_size == proxy->rsp_buf_offset ) { + if ( proxy->rsp.size == proxy->rsp.needle ) { debug( "NBD reply received from upstream." ); - proxy->rsp_buf_offset = 0; + proxy->rsp.needle = 0; return WRITE_TO_DOWNSTREAM; } return state; disconnect: - proxy->rsp_buf_offset = 0; - proxy->rsp_buf_size = 0; + proxy->rsp.needle = 0; + proxy->rsp.size = 0; return CONNECT_TO_UPSTREAM; } @@ -667,33 +584,18 @@ int proxy_write_to_downstream( struct proxier* proxy, int state ) // assert( state == WRITE_TO_DOWNSTREAM ); - if ( proxy->hello_sent ) { - debug( - "Writing request of %"PRIu32" bytes from offset %"PRIu32, - proxy->rsp_buf_size, proxy->rsp_buf_offset - ); - } else { - debug( "Writing init to downstream" ); + if ( !proxy->hello_sent ) { + info( "Writing init to downstream" ); } - count = write( - proxy->downstream_fd, - proxy->rsp_buf + proxy->rsp_buf_offset, - proxy->rsp_buf_size - proxy->rsp_buf_offset - ); + count = iobuf_write( proxy->downstream_fd, &proxy->rsp ); - if ( count != -1 ) { - proxy->rsp_buf_offset += count; - } else if ( io_errno_permanent() ) { - if ( proxy->hello_sent ) { - warn( SHOW_ERRNO( "Failed to send reply downstream" ) ); - } else { - warn( SHOW_ERRNO( "Failed to send init downstream" ) ); - } + if ( count == -1 ) { + warn( SHOW_ERRNO( "Failed to write to downstream" ) ); return EXIT; } - if ( proxy->rsp_buf_offset == proxy->rsp_buf_size ) { + if ( proxy->rsp.needle == proxy->rsp.size ) { if ( !proxy->hello_sent ) { info( "Hello message sent to client" ); proxy->hello_sent = 1; @@ -703,10 +605,10 @@ int proxy_write_to_downstream( struct proxier* proxy, int state ) } /* We're done with the request & response buffers now */ - proxy->req_buf_size = 0; - proxy->req_buf_offset = 0; - proxy->rsp_buf_size = 0; - proxy->rsp_buf_offset = 0; + proxy->req.size = 0; + proxy->req.needle = 0; + proxy->rsp.size = 0; + proxy->rsp.needle = 0; return READ_FROM_DOWNSTREAM; } @@ -725,9 +627,9 @@ int proxy_write_to_downstream( struct proxier* proxy, int state ) void proxy_session( struct proxier* proxy ) { /* First action: Write hello to downstream */ - nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp_buf, proxy->upstream_size ); - proxy->rsp_buf_size = sizeof( struct nbd_init_raw ); - proxy->rsp_buf_offset = 0; + nbd_hello_to_buf( (struct nbd_init_raw *) proxy->rsp.buf, proxy->upstream_size ); + proxy->rsp.size = sizeof( struct nbd_init_raw ); + proxy->rsp.needle = 0; int state = WRITE_TO_DOWNSTREAM; info( "Beginning proxy session on fd %i", proxy->downstream_fd ); diff --git a/src/proxy.h b/src/proxy.h index 3ac957f..1991e10 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -4,6 +4,7 @@ #include #include +#include "ioutil.h" #include "flexnbd.h" #include "parse.h" #include "nbdtypes.h" @@ -43,30 +44,22 @@ struct proxier { /* This is the size we advertise to the downstream server */ off64_t upstream_size; - /* Scratch space for the current NBD request from downstream */ - unsigned char* req_buf; - - /* Number of bytes currently sat in req_buf */ - ssize_t req_buf_size; - - /* Number of bytes of request we've gotten through */ - off_t req_buf_offset; - /* We transform the raw request header into here */ struct nbd_request req_hdr; - /* Scratch space for the current NBD reply from upstream */ - unsigned char* rsp_buf; - - /* Number of bytes currently sat in rsp_buf */ - ssize_t rsp_buf_size; - - /* Number of bytes of response we've gotten through */ - off_t rsp_buf_offset; - /* We transform the raw reply header into here */ struct nbd_reply rsp_hdr; + /* Used for our non-blocking negotiation with upstream. TODO: maybe use + * for downstream as well ( we currently overload rsp ) */ + struct iobuf init; + + /* The current NBD request from downstream */ + struct iobuf req; + + /* The current NBD reply from upstream */ + struct iobuf rsp; + /* It's starting to feel like we need an object for a single proxy session. * These two track how many requests we've sent so far, and whether the * NBD_INIT code has been sent to the client yet. @@ -74,10 +67,6 @@ struct proxier { uint64_t req_count; int hello_sent; - /* And now we're doing non-blocking connect to upstream, we need this too */ - struct nbd_init_raw init_buf; - off_t init_buf_offset; - #ifdef PREFETCH struct prefetch *prefetch; #endif