Add a trivial read buffer to flexnbd-proxy.

Since the vast majority (something like 94% on boot) are sequential small
reads, and since network latency is a major factor in determining how fast the
exposed device appears to the client, it makes sense for us to try to minimise
the number of network requests where we safely can.

This patch implements the simplest possible read cache in flexnbd-proxy.  When
it receives a read request, if it's a small request then flexnbd-proxy will
double the length of data requested.  On receiving the data from the upstream
server, flexnbd-proxy will return the first half to the downstream as normal,
and stash the second half in a buffer.  If the very next request is a read, and
the offset and length match those of what we have stored, that second request
will be satisfied from the buffer without going out over the network.

The cache is invalidated by any non-read request, or by a disconnection.
This commit is contained in:
Alex Young
2013-04-29 14:50:42 +01:00
parent 33ee19dc5a
commit 574d44f17f
4 changed files with 141 additions and 16 deletions

14
src/prefetch.h Normal file
View File

@@ -0,0 +1,14 @@
#ifndef PREFETCH_H
#define PREFETCH_H
#define PREFETCH_BUFSIZE 4096
struct prefetch {
int is_full;
__be64 from;
__be32 len;
char buffer[PREFETCH_BUFSIZE];
};
#endif

View File

@@ -1,6 +1,11 @@
#include "proxy.h"
#include "readwrite.h"
#ifdef PREFETCH
#include "prefetch.h"
#endif
#include "ioutil.h"
#include "sockutil.h"
#include "util.h"
@@ -60,6 +65,11 @@ struct proxier* proxy_create(
out->downstream_fd = -1;
out->upstream_fd = -1;
#ifdef PREFETCH
out->prefetch = xmalloc( sizeof( struct prefetch ) );
#endif
out->req_buf = xmalloc( NBD_MAX_SIZE );
out->rsp_buf = xmalloc( NBD_MAX_SIZE );
@@ -70,6 +80,10 @@ void proxy_destroy( struct proxier* proxy )
{
free( proxy->req_buf );
free( proxy->rsp_buf );
#ifdef PREFETCH
free( proxy->prefetch );
#endif
free( proxy );
}
@@ -232,20 +246,85 @@ int proxy_run_request_upstream( struct proxier* proxy )
struct nbd_reply_raw* reply_raw = (struct nbd_reply_raw*) rsp_hdr_raw;
struct nbd_request* request = &(proxy->req_hdr);
#ifdef PREFETCH
/* We only want to consider prefetching if we know we're not getting too much
* data back, if it's a read request, and if the prefetch won't try to read
* past the end of the file.
*/
int prefetching =
request->len <= PREFETCH_BUFSIZE &&
request->type == REQUEST_READ &&
(request->from + request->len * 2) <= proxy->upstream_size;
/* We pull the request out of the proxy struct, rewrite the
* request size, and write it back.
*/
if ( prefetching ) {
/* We need a malloc here because nbd_h2r_request craps
* out if passed an address on the stack
*/
struct nbd_request* rewrite_request = xmalloc( sizeof( struct nbd_request ) );
NULLCHECK( rewrite_request );
memcpy( rewrite_request, request, sizeof( struct nbd_request ) );
rewrite_request->len *= 2;
debug( "Prefetching %d bytes", rewrite_request->len );
nbd_h2r_request( rewrite_request, (struct nbd_request_raw *)proxy->req_buf );
free( rewrite_request );
}
#endif
struct nbd_reply* reply = &(proxy->rsp_hdr);
size_t rsp_buf_size;
#ifdef PREFETCH
if ( request->type == REQUEST_READ ){
/* See if we can respond with what's in our prefetch
* cache
*/
if( proxy->prefetch->is_full &&
request->from == proxy->prefetch->from &&
request->len == proxy->prefetch->len ) {
/* HUZZAH! A match! */
debug( "Prefetch hit!" );
/* First build a reply header */
struct nbd_reply prefetch_reply = {REPLY_MAGIC, 0, "01234567"};
memcpy( &(prefetch_reply.handle), request->handle, 8 );
/* now copy it into the response */
nbd_h2r_reply( &prefetch_reply, reply_raw );
/* and the data */
memcpy( rsp_data, proxy->prefetch->buffer, proxy->prefetch->len );
proxy->rsp_buf_size = NBD_REPLY_SIZE + proxy->prefetch->len;
/* return early, our work here is done */
return 1;
}
}
else {
/* Safety catch. If we're sending a write request, we
* blow away the cache. This is very pessimistic, but
* it's simpler (and therefore safer) than working out
* whether we can keep it or not.
*/
debug( "Blowing away prefetch cache on type %d request.", request->type );
proxy->prefetch->is_full = 0;
}
#endif
if ( writeloop( proxy->upstream_fd, proxy->req_buf, proxy->req_buf_size ) == -1 ) {
warn( "Failed to send request to upstream" );
proxy_disconnect_from_upstream( proxy );
return 0;
goto disconnect;
}
if ( readloop( proxy->upstream_fd, rsp_hdr_raw, NBD_REPLY_SIZE ) == -1 ) {
debug( "Failed to get reply header from upstream" );
proxy_disconnect_from_upstream( proxy );
return 0;
goto disconnect;
}
nbd_r2h_reply( reply_raw, reply );
@@ -253,8 +332,7 @@ int proxy_run_request_upstream( struct proxier* proxy )
if ( reply->magic != REPLY_MAGIC ) {
debug( "Reply magic is incorrect" );
proxy_disconnect_from_upstream( proxy );
return 0;
goto disconnect;
}
debug( "NBD reply received from upstream. Response code: %"PRIu32, reply->error );
@@ -263,17 +341,42 @@ int proxy_run_request_upstream( struct proxier* proxy )
warn( "NBD error returned from upstream: %"PRIu32, reply->error );
}
if ( reply->error == 0 && request->type == REQUEST_READ ) {
if (readloop( proxy->upstream_fd, rsp_data, request->len ) == -1 ) {
debug( "Failed to get reply data from upstream" );
proxy_disconnect_from_upstream( proxy );
return 0;
}
rsp_buf_size += request->len;
}
if ( reply->error == 0 && request->type == REQUEST_READ ) {
#ifdef PREFETCH
if (readloop( proxy->upstream_fd, rsp_data, request->len ) == -1 ) {
debug( "Failed to get reply data from upstream" );
goto disconnect;
}
if ( prefetching ) {
if ( readloop( proxy->upstream_fd, &(proxy->prefetch->buffer), request->len ) == -1 ) {
debug( "Failed to get prefetch reply data from upstream" );
goto disconnect;
}
proxy->prefetch->from = request->from + request->len;
proxy->prefetch->len = request->len;
proxy->prefetch->is_full = 1;
}
#else
if (readloop( proxy->upstream_fd, rsp_data, request->len ) == -1 ) {
debug( "Failed to get reply data from upstream" );
goto disconnect;
}
#endif
rsp_buf_size += request->len;
}
proxy->rsp_buf_size = rsp_buf_size;
return 1;
disconnect:
proxy_disconnect_from_upstream( proxy );
#ifdef PREFETCH
proxy->prefetch->is_full = 0;
#endif
return 0;
}
/* Write an NBD reply back downstream. Return 0 on failure, 1 on success. */
@@ -333,7 +436,7 @@ void proxy_session( struct proxier* proxy )
sleep( 5 );
continue;
}
debug( "Connected to upstream" );
debug( "Connected to upstream");
}
result = proxy_run_request_upstream( proxy );
} while ( result == 0 );

View File

@@ -9,6 +9,10 @@
#include "nbdtypes.h"
#include "self_pipe.h"
#ifdef PREFETCH
#include "prefetch.h"
#endif
struct proxier {
/* The flexnbd wrapper this proxier is attached to */
struct flexnbd* flexnbd;
@@ -56,6 +60,10 @@ struct proxier {
/* We transform the raw reply header into here */
struct nbd_reply rsp_hdr;
#ifdef PREFETCH
struct prefetch *prefetch;
#endif
};
struct proxier* proxy_create(

View File

@@ -102,7 +102,7 @@ class TestProxyMode < Test::Unit::TestCase
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_READ, req1[:type]
assert_equal 0, req1[:from]
assert_equal 4096, req1[:len]
assert_not_equal 0, req1[:len]
# Kill the server again, now we're sure the read request has been sent once
sc1.close