From fa8023cf69ba50f7033cded1dd0943876724781e Mon Sep 17 00:00:00 2001 From: Alex Young Date: Thu, 27 Feb 2014 14:21:36 +0000 Subject: [PATCH] Proxy prefetch cache becomes a command-line argument. --- README.proxy.txt | 30 ++- src/common/mode.h | 7 +- src/common/serve.h | 3 + src/proxy-main.c | 17 +- src/proxy/prefetch.c | 77 ++++++++ src/proxy/prefetch.h | 25 ++- src/proxy/proxy.c | 95 +++++----- src/proxy/proxy.h | 10 +- tests/acceptance/environment.rb | 5 + tests/acceptance/flexnbd.rb | 4 + tests/acceptance/proxy_tests.rb | 190 +++++++++++++++++++ tests/acceptance/test_prefetch_proxy_mode.rb | 22 +++ tests/acceptance/test_proxy_mode.rb | 190 +------------------ 13 files changed, 435 insertions(+), 240 deletions(-) create mode 100644 src/proxy/prefetch.c create mode 100644 tests/acceptance/proxy_tests.rb create mode 100644 tests/acceptance/test_prefetch_proxy_mode.rb diff --git a/README.proxy.txt b/README.proxy.txt index a7ac11a..cc20c9c 100644 --- a/README.proxy.txt +++ b/README.proxy.txt @@ -28,7 +28,8 @@ USAGE ----- $ flexnbd-proxy --addr [ --port ] - --conn-addr --conn-port [--bind ] [option]* + --conn-addr --conn-port + [--bind ] [--cache[=]] [option]* Proxy requests from an NBD client to an NBD server, resiliently. Only one client can be connected at a time, and ACLs cannot be applied to the client, as they @@ -73,6 +74,10 @@ Options *--conn-port, -P PORT*: The port of the NBD server to connect to. Required. +*--cache, -c=CACHE_BYTES*: + If given, the size in bytes of read cache to use. CACHE_BYTES + defaults to 4096. + *--help, -h* : Show command or global help. @@ -154,6 +159,29 @@ The proxy notices and reconnects, fulfiling any request it has in its buffer. The data in myfile has been moved between physical servers without the nbd client process having to be disturbed at all. +READ CACHE +---------- + +If the --cache option is given at the command line, either without an +argument or with an argument greater than 0, flexnbd-proxy will use a +read-ahead cache. The cache as currently implemented doubles each read +request size, up to a maximum of 2xCACHE_BYTES, and retains the latter +half in a buffer. If the next read request from the client exactly +matches the region held in the buffer, flexnbd-proxy responds from the +cache without making a request to the server. + +This pattern is designed to match sequential reads, such as those +performed by a booting virtual machine. + +Note: If specifying a cache size, you *must* use this form: + + nbd-client$ flexnbd-proxy --cache=XXXX + +That is, the '=' is required. This is a limitation of getopt-long. + +If no cache size is given, a size of 4096 bytes is assumed. Caching can +be explicitly disabled by setting a size of 0. + BUGS ---- diff --git a/src/common/mode.h b/src/common/mode.h index c895483..f117e95 100644 --- a/src/common/mode.h +++ b/src/common/mode.h @@ -7,8 +7,9 @@ void mode(char* mode, int argc, char **argv); #include -#define GETOPT_ARG(x,s) {(x), 1, 0, (s)} -#define GETOPT_FLAG(x,v) {(x), 0, 0, (v)} +#define GETOPT_ARG(x,s) {(x), required_argument, 0, (s)} +#define GETOPT_FLAG(x,v) {(x), no_argument, 0, (v)} +#define GETOPT_OPTARG(x,s) {(x), optional_argument, 0, (s)} #define OPT_HELP "help" #define OPT_ADDR "addr" @@ -19,6 +20,7 @@ void mode(char* mode, int argc, char **argv); #define OPT_FROM "from" #define OPT_SIZE "size" #define OPT_DENY "default-deny" +#define OPT_CACHE "cache" #define OPT_UNLINK "unlink" #define OPT_CONNECT_ADDR "conn-addr" #define OPT_CONNECT_PORT "conn-port" @@ -52,6 +54,7 @@ void mode(char* mode, int argc, char **argv); #define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' ) #define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' ) #define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' ) +#define GETOPT_CACHE GETOPT_OPTARG( OPT_CACHE, 'c' ) #define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' ) #define GETOPT_CONNECT_ADDR GETOPT_ARG( OPT_CONNECT_ADDR, 'C' ) #define GETOPT_CONNECT_PORT GETOPT_ARG( OPT_CONNECT_PORT, 'P' ) diff --git a/src/common/serve.h b/src/common/serve.h index 0102733..9b90cbc 100644 --- a/src/common/serve.h +++ b/src/common/serve.h @@ -154,8 +154,11 @@ int do_serve( struct server *, struct self_pipe * ); struct mode_readwrite_params { union mysockaddr connect_to; union mysockaddr connect_from; + + /* FIXME: these should be uint64_t and uint32_t respectively */ off64_t from; off64_t len; + int data_fd; int client; }; diff --git a/src/proxy-main.c b/src/proxy-main.c index fad873a..fae4d07 100644 --- a/src/proxy-main.c +++ b/src/proxy-main.c @@ -14,6 +14,7 @@ static struct option proxy_options[] = { GETOPT_CONNECT_ADDR, GETOPT_CONNECT_PORT, GETOPT_BIND, + GETOPT_CACHE, GETOPT_QUIET, GETOPT_VERBOSE, {0} @@ -29,16 +30,20 @@ static char proxy_help_text[] = "\t--" OPT_CONNECT_ADDR ",-C \tAddress of the proxied server.\n" "\t--" OPT_CONNECT_PORT ",-P \tPort of the proxied server.\n" "\t--" OPT_BIND ",-b \tThe address we connect from, as a proxy.\n" + "\t--" OPT_CACHE ",-c[=]\tUse a RAM read cache of the given size.\n" QUIET_LINE VERBOSE_LINE; +static char proxy_default_cache_size[] = "4096"; + void read_proxy_param( int c, char **downstream_addr, char **downstream_port, char **upstream_addr, char **upstream_port, - char **bind_addr ) + char **bind_addr, + char **cache_bytes) { switch( c ) { case 'h' : @@ -59,6 +64,9 @@ void read_proxy_param( case 'b': *bind_addr = optarg; break; + case 'c': + *cache_bytes = optarg ? optarg : proxy_default_cache_size; + break; case 'q': log_level = QUIET_LOG_LEVEL; break; @@ -90,6 +98,7 @@ int main( int argc, char *argv[] ) char *upstream_addr = NULL; char *upstream_port = NULL; char *bind_addr = NULL; + char *cache_bytes = NULL; int success; sigset_t mask; @@ -114,7 +123,8 @@ int main( int argc, char *argv[] ) &downstream_port, &upstream_addr, &upstream_port, - &bind_addr + &bind_addr, + &cache_bytes ); } @@ -131,7 +141,8 @@ int main( int argc, char *argv[] ) downstream_port, upstream_addr, upstream_port, - bind_addr + bind_addr, + cache_bytes ); /* Set these *after* proxy has been assigned to */ diff --git a/src/proxy/prefetch.c b/src/proxy/prefetch.c new file mode 100644 index 0000000..de8306d --- /dev/null +++ b/src/proxy/prefetch.c @@ -0,0 +1,77 @@ +#include "prefetch.h" +#include "util.h" + + +struct prefetch* prefetch_create( size_t size_bytes ){ + + struct prefetch* out = xmalloc( sizeof( struct prefetch ) ); + NULLCHECK( out ); + + out->buffer = xmalloc( size_bytes ); + NULLCHECK( out->buffer ); + + out->size = size_bytes; + out->is_full = 0; + out->from = 0; + out->len = 0; + + return out; + +} + +void prefetch_destroy( struct prefetch *prefetch ) { + if( prefetch ) { + free( prefetch->buffer ); + free( prefetch ); + } +} + +size_t prefetch_size( struct prefetch *prefetch){ + if ( prefetch ) { + return prefetch->size; + } else { + return 0; + } +} + +void prefetch_set_is_empty( struct prefetch *prefetch ){ + prefetch_set_full( prefetch, 0 ); +} + +void prefetch_set_is_full( struct prefetch *prefetch ){ + prefetch_set_full( prefetch, 1 ); +} + +void prefetch_set_full( struct prefetch *prefetch, int val ){ + if( prefetch ) { + prefetch->is_full = val; + } +} + +int prefetch_is_full( struct prefetch *prefetch ){ + if( prefetch ) { + return prefetch->is_full; + } else { + return 0; + } +} + +int prefetch_contains( struct prefetch* prefetch, uint64_t from, uint32_t len ){ + NULLCHECK( prefetch ); + return from == prefetch->from && len == prefetch->len; +} +char *prefetch_offset( struct prefetch* prefetch, uint64_t from ){ + return prefetch->buffer; +} +/* +int prefetch_contains( struct prefetch *prefetch, uint64_t from, uint32_t len ){ + NULLCHECK( prefetch ); + return from >= prefetch->from && + from + len <= prefetch->from + prefetch->len; +} + +char *prefetch_offset( struct prefetch *prefetch, uint64_t from ){ + NULLCHECK( prefetch ); + return prefetch->buffer + (from - prefetch->from); +} +*/ diff --git a/src/proxy/prefetch.h b/src/proxy/prefetch.h index 0513f8f..7c71885 100644 --- a/src/proxy/prefetch.h +++ b/src/proxy/prefetch.h @@ -1,14 +1,33 @@ #ifndef PREFETCH_H #define PREFETCH_H +#include +#include + #define PREFETCH_BUFSIZE 4096 struct prefetch { + /* True if there is data in the buffer. */ int is_full; - __be64 from; - __be32 len; + /* The start point of the current content of buffer */ + uint64_t from; + /* The length of the current content of buffer */ + uint32_t len; - char buffer[PREFETCH_BUFSIZE]; + /* The total size of the buffer, in bytes. */ + size_t size; + + char *buffer; }; +struct prefetch* prefetch_create( size_t size_bytes ); +void prefetch_destroy( struct prefetch *prefetch ); +size_t prefetch_size( struct prefetch *); +void prefetch_set_is_empty( struct prefetch *prefetch ); +void prefetch_set_is_full( struct prefetch *prefetch ); +void prefetch_set_full( struct prefetch *prefetch, int val ); +int prefetch_is_full( struct prefetch *prefetch ); +int prefetch_contains( struct prefetch *prefetch, uint64_t from, uint32_t len ); +char *prefetch_offset( struct prefetch *prefetch, uint64_t from ); + #endif diff --git a/src/proxy/proxy.c b/src/proxy/proxy.c index 830cb48..16d853a 100644 --- a/src/proxy/proxy.c +++ b/src/proxy/proxy.c @@ -1,9 +1,7 @@ #include "proxy.h" #include "readwrite.h" -#ifdef PREFETCH #include "prefetch.h" -#endif #include "ioutil.h" @@ -20,7 +18,8 @@ struct proxier* proxy_create( char* s_downstream_port, char* s_upstream_address, char* s_upstream_port, - char* s_upstream_bind ) + char* s_upstream_bind, + char* s_cache_bytes ) { struct proxier* out; out = xmalloc( sizeof( struct proxier ) ); @@ -65,9 +64,16 @@ struct proxier* proxy_create( out->downstream_fd = -1; out->upstream_fd = -1; -#ifdef PREFETCH - out->prefetch = xmalloc( sizeof( struct prefetch ) ); -#endif + out->prefetch = NULL; + if ( s_cache_bytes ){ + int cache_bytes = atoi( s_cache_bytes ); + /* leaving this off or setting a cache size of zero or + * less results in no cache. + */ + if ( cache_bytes >= 0 ) { + out->prefetch = prefetch_create( cache_bytes ); + } + } out->init.buf = xmalloc( sizeof( struct nbd_init_raw ) ); out->req.buf = xmalloc( NBD_MAX_SIZE ); @@ -76,14 +82,22 @@ struct proxier* proxy_create( return out; } +int proxy_prefetches( struct proxier* proxy ) { + NULLCHECK( proxy ); + return proxy->prefetch != NULL; +} + +int proxy_prefetch_bufsize( struct proxier* proxy ){ + NULLCHECK( proxy ); + return prefetch_size( proxy->prefetch ); +} + void proxy_destroy( struct proxier* proxy ) { free( proxy->init.buf ); free( proxy->req.buf ); free( proxy->rsp.buf ); -#ifdef PREFETCH - free( proxy->prefetch ); -#endif + prefetch_destroy( proxy->prefetch ); free( proxy ); } @@ -279,10 +293,9 @@ static inline int proxy_state_upstream( int state ) state == WRITE_TO_UPSTREAM || state == READ_FROM_UPSTREAM; } -#ifdef PREFETCH - int proxy_prefetch_for_request( struct proxier* proxy, int state ) { + NULLCHECK( proxy ); struct nbd_request* req = &proxy->req_hdr; struct nbd_reply* rsp = &proxy->rsp_hdr; @@ -291,23 +304,11 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state ) int is_read = ( req->type & REQUEST_MASK ) == REQUEST_READ; - int prefetch_start = req->from; - int prefetch_end = req->from + ( req->len * 2 ); - - /* We only want to consider prefetching if we know we're not - * getting too much data back, if it's a read request, and if - * the prefetch won't try to read past the end of the file. - */ - - int prefetching = req->len <= PREFETCH_BUFSIZE && is_read && - prefetch_start < prefetch_end && prefetch_end <= proxy->upstream_size; - if ( is_read ) { /* See if we can respond with what's in our prefetch * cache */ - if ( proxy->prefetch->is_full && - req->from == proxy->prefetch->from && - req->len == proxy->prefetch->len ) { + if ( prefetch_is_full( proxy->prefetch ) && + prefetch_contains( proxy->prefetch, req->from, req->len ) ) { /* HUZZAH! A match! */ debug( "Prefetch hit!" ); @@ -322,10 +323,11 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state ) /* and the data */ memcpy( proxy->rsp.buf + NBD_REPLY_SIZE, - proxy->prefetch->buffer, proxy->prefetch->len + prefetch_offset( proxy->prefetch, req->from ), + req->len ); - proxy->rsp.size = NBD_REPLY_SIZE + proxy->prefetch->len; + proxy->rsp.size = NBD_REPLY_SIZE + req->len; proxy->rsp.needle = 0; /* return early, our work here is done */ @@ -339,11 +341,24 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state ) * whether we can keep it or not. */ debug( "Blowing away prefetch cache on type %d request.", req->type ); - proxy->prefetch->is_full = 0; + prefetch_set_is_empty( proxy->prefetch ); } debug( "Prefetch cache MISS!"); + uint64_t prefetch_start = req->from; + /* We prefetch what we expect to be the next request. */ + uint64_t prefetch_end = req->from + ( req->len * 2 ); + + /* We only want to consider prefetching if we know we're not + * getting too much data back, if it's a read request, and if + * the prefetch won't try to read past the end of the file. + */ + int prefetching = + req->len <= prefetch_size( proxy->prefetch ) && + is_read && + prefetch_start < prefetch_end && + prefetch_end <= (uint64_t)proxy->upstream_size; /* FIXME: we shouldn't need a cast - upstream_size should be uint64_t */ /* We pull the request out of the proxy struct, rewrite the * request size, and write it back. @@ -354,7 +369,8 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state ) req->len *= 2; - debug( "Prefetching %"PRIu32" bytes", req->len - proxy->prefetch_req_orig_len ); + debug( "Prefetching additional %"PRIu32" bytes", + req->len - proxy->prefetch_req_orig_len ); nbd_h2r_request( req, req_raw ); } @@ -371,10 +387,10 @@ int proxy_prefetch_for_reply( struct proxier* proxy, int state ) prefetched_bytes = proxy->req_hdr.len - proxy->prefetch_req_orig_len; - debug( "Prefetched %d bytes", prefetched_bytes ); + debug( "Prefetched additional %d bytes", prefetched_bytes ); memcpy( - proxy->rsp.buf + proxy->prefetch_req_orig_len, - &(proxy->prefetch->buffer), + proxy->prefetch->buffer, + proxy->rsp.buf + proxy->prefetch_req_orig_len + NBD_REPLY_SIZE, prefetched_bytes ); @@ -389,13 +405,12 @@ int proxy_prefetch_for_reply( struct proxier* proxy, int state ) proxy->rsp.size -= prefetched_bytes; /* And we need to reset these */ - proxy->prefetch->is_full = 1; + prefetch_set_is_full( proxy->prefetch ); proxy->is_prefetch_req = 0; return state; } -#endif int proxy_read_from_downstream( struct proxier *proxy, int state ) @@ -476,10 +491,8 @@ int proxy_continue_connecting_to_upstream( struct proxier* proxy, int state ) return state; } -#ifdef PREFETCH /* Data may have changed while we were disconnected */ - proxy->prefetch->is_full = 0; -#endif + prefetch_set_is_empty( proxy->prefetch ); info( "Connected to upstream on fd %i", proxy->upstream_fd ); return READ_INIT_FROM_UPSTREAM; @@ -762,14 +775,12 @@ void proxy_session( struct proxier* proxy ) case READ_FROM_DOWNSTREAM: if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) { state = proxy_read_from_downstream( proxy, state ); -#ifdef PREFETCH /* Check if we can fulfil the request from prefetch, or * rewrite the request to fill the prefetch buffer if needed */ - if ( state == WRITE_TO_UPSTREAM ) { + if ( proxy_prefetches( proxy ) && state == WRITE_TO_UPSTREAM ) { state = proxy_prefetch_for_request( proxy, state ); } -#endif } break; case CONNECT_TO_UPSTREAM: @@ -800,12 +811,10 @@ void proxy_session( struct proxier* proxy ) if ( FD_ISSET( proxy->upstream_fd, &rfds ) ) { state = proxy_read_from_upstream( proxy, state ); } -# ifdef PREFETCH /* Fill the prefetch buffer and rewrite the reply, if needed */ - if ( state == WRITE_TO_DOWNSTREAM ) { + if ( proxy_prefetches( proxy ) && state == WRITE_TO_DOWNSTREAM ) { state = proxy_prefetch_for_reply( proxy, state ); } -#endif break; case WRITE_TO_DOWNSTREAM: if ( FD_ISSET( proxy->downstream_fd, &wfds ) ) { diff --git a/src/proxy/proxy.h b/src/proxy/proxy.h index 38645d9..d99e0bd 100644 --- a/src/proxy/proxy.h +++ b/src/proxy/proxy.h @@ -48,6 +48,7 @@ struct proxier { int upstream_fd; /* This is the size we advertise to the downstream server */ + /* FIXME: should be uint64_t */ off64_t upstream_size; /* We transform the raw request header into here */ @@ -73,7 +74,8 @@ struct proxier { uint64_t req_count; int hello_sent; -#ifdef PREFETCH + /** These are only used if we pass --cache on the command line */ + /* While the in-flight request has been munged by prefetch, these two are * set to true, and the original length of the request, respectively */ int is_prefetch_req; @@ -81,7 +83,8 @@ struct proxier { /* And here, we actually store the prefetched data once it's returned */ struct prefetch *prefetch; -#endif + + /** */ }; struct proxier* proxy_create( @@ -89,7 +92,8 @@ struct proxier* proxy_create( char* s_downstream_port, char* s_upstream_address, char* s_upstream_port, - char* s_upstream_bind ); + char* s_upstream_bind, + char* s_cache_bytes); int do_proxy( struct proxier* proxy ); void proxy_cleanup( struct proxier* proxy ); void proxy_destroy( struct proxier* proxy ); diff --git a/tests/acceptance/environment.rb b/tests/acceptance/environment.rb index 0275705..514f814 100644 --- a/tests/acceptance/environment.rb +++ b/tests/acceptance/environment.rb @@ -21,6 +21,11 @@ class Environment @fake_pid = nil end + def prefetch_proxy! + @nbd1.prefetch_proxy = true + @nbd2.prefetch_proxy = true + end + def proxy1(port=@port2) @nbd1.proxy(@ip, port) end diff --git a/tests/acceptance/flexnbd.rb b/tests/acceptance/flexnbd.rb index e6b2bbd..417ed97 100644 --- a/tests/acceptance/flexnbd.rb +++ b/tests/acceptance/flexnbd.rb @@ -198,6 +198,8 @@ module FlexNBD end end + attr_accessor :prefetch_proxy + def initialize( bin, ip, port ) @bin = bin @do_debug = ENV['DEBUG'] @@ -208,6 +210,7 @@ module FlexNBD @ip = ip @port = port @kill = [] + @prefetch_proxy = false end @@ -247,6 +250,7 @@ module FlexNBD "--port #{port} "\ "--conn-addr #{connect_ip} "\ "--conn-port #{connect_port} "\ + "#{prefetch_proxy ? "--cache " : ""}"\ "#{@debug}" end diff --git a/tests/acceptance/proxy_tests.rb b/tests/acceptance/proxy_tests.rb new file mode 100644 index 0000000..5f3abff --- /dev/null +++ b/tests/acceptance/proxy_tests.rb @@ -0,0 +1,190 @@ +# encoding: utf-8 +require 'flexnbd/fake_source' +require 'flexnbd/fake_dest' + +module ProxyTests + def with_proxied_client( override_size = nil ) + @env.serve1 unless @server_up + @env.proxy2 unless @proxy_up + @env.nbd2.can_die(0) + client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy") + begin + + result = client.read_hello + assert_equal "NBDMAGIC", result[:magic] + assert_equal override_size || @env.file1.size, result[:size] + + yield client + ensure + client.close rescue nil + end + end + + def test_exits_with_error_when_cannot_connect_to_upstream_on_start + assert_raises(RuntimeError) { @env.proxy1 } + end + + def test_read_requests_successfully_proxied + with_proxied_client do |client| + (0..3).each do |n| + offset = n * 4096 + client.write_read_request(offset, 4096, "myhandle") + rsp = client.read_response + + assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal "myhandle", rsp[:handle] + assert_equal 0, rsp[:error] + + orig_data = @env.file1.read(offset, 4096) + data = client.read_raw(4096) + + assert_equal 4096, orig_data.size + assert_equal 4096, data.size + + assert_equal( orig_data, data, + "Returned data does not match on request #{n+1}" ) + end + end + end + + def test_write_requests_successfully_proxied + with_proxied_client do |client| + (0..3).each do |n| + offset = n * 4096 + client.write(offset, "\xFF" * 4096) + rsp = client.read_response + + assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal "myhandle", rsp[:handle] + assert_equal 0, rsp[:error] + + data = @env.file1.read(offset, 4096) + + assert_equal( ( "\xFF" * 4096 ), data, "Data not written correctly (offset is #{n})" ) + end + end + end + + def make_fake_server + server = FlexNBD::FakeDest.new(@env.ip, @env.port1) + @server_up = true + + # We return a thread here because accept() and connect() both block for us + Thread.new do + sc = server.accept # just tell the supervisor we're up + sc.write_hello + + [ server, sc ] + end + end + + def test_read_request_retried_when_upstream_dies_partway + maker = make_fake_server + + with_proxied_client(4096) do |client| + server, sc1 = maker.value + + # Send the read request to the proxy + client.write_read_request( 0, 4096 ) + + # ensure we're given the read request + req1 = sc1.read_request + assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] + assert_equal ::FlexNBD::REQUEST_READ, req1[:type] + assert_equal 0, req1[:from] + assert_not_equal 0, req1[:len] + + # Kill the server again, now we're sure the read request has been sent once + sc1.close + + # We expect the proxy to reconnect without our client doing anything. + sc2 = server.accept + sc2.write_hello + + # And once reconnected, it should resend an identical request. + req2 = sc2.read_request + assert_equal req1, req2 + + # The reply should be proxied back to the client. + sc2.write_reply( req2[:handle] ) + sc2.write_data( "\xFF" * 4096 ) + + # Check it to make sure it's correct + rsp = timeout(15) { client.read_response } + assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal 0, rsp[:error] + assert_equal req1[:handle], rsp[:handle] + + data = client.read_raw( 4096 ) + assert_equal( ("\xFF" * 4096), data, "Wrong data returned" ) + + sc2.close + server.close + end + + end + + def test_write_request_retried_when_upstream_dies_partway + maker = make_fake_server + + with_proxied_client(4096) do |client| + server, sc1 = maker.value + + # Send the read request to the proxy + client.write( 0, ( "\xFF" * 4096 ) ) + + # ensure we're given the read request + req1 = sc1.read_request + assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] + assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type] + assert_equal 0, req1[:from] + assert_equal 4096, req1[:len] + data1 = sc1.read_data( 4096 ) + assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" ) + + # Kill the server again, now we're sure the read request has been sent once + sc1.close + + # We expect the proxy to reconnect without our client doing anything. + sc2 = server.accept + sc2.write_hello + + # And once reconnected, it should resend an identical request. + req2 = sc2.read_request + assert_equal req1, req2 + data2 = sc2.read_data( 4096 ) + assert_equal data1, data2 + + # The reply should be proxied back to the client. + sc2.write_reply( req2[:handle] ) + + # Check it to make sure it's correct + rsp = timeout(15) { client.read_response } + assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] + assert_equal 0, rsp[:error] + assert_equal req1[:handle], rsp[:handle] + + sc2.close + server.close + end + end + + def test_only_one_client_can_connect_to_proxy_at_a_time + with_proxied_client do |client| + + c2 = nil + assert_raises(Timeout::Error) do + timeout(1) do + c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)") + c2.read_hello + end + end + c2.close rescue nil if c2 + end + + + end + +end + + diff --git a/tests/acceptance/test_prefetch_proxy_mode.rb b/tests/acceptance/test_prefetch_proxy_mode.rb new file mode 100644 index 0000000..eb07454 --- /dev/null +++ b/tests/acceptance/test_prefetch_proxy_mode.rb @@ -0,0 +1,22 @@ +require 'test/unit' +require 'environment' +require 'proxy_tests' + + +class TestPrefetchProxyMode < Test::Unit::TestCase + include ProxyTests + + def setup + super + @env = Environment.new + @env.prefetch_proxy! + @env.writefile1( "f" * 16 ) + end + + def teardown + @env.cleanup + super + end +end + + diff --git a/tests/acceptance/test_proxy_mode.rb b/tests/acceptance/test_proxy_mode.rb index dd9f82c..c38116d 100644 --- a/tests/acceptance/test_proxy_mode.rb +++ b/tests/acceptance/test_proxy_mode.rb @@ -1,200 +1,20 @@ require 'test/unit' require 'environment' -require 'flexnbd/fake_source' -require 'flexnbd/fake_dest' +require 'proxy_tests' + class TestProxyMode < Test::Unit::TestCase + include ProxyTests + def setup super @env = Environment.new - @env.writefile1( "0" * 16 ) + @env.writefile1( "f" * 16 ) end def teardown @env.cleanup super end - - def with_proxied_client( override_size = nil ) - @env.serve1 unless @server_up - @env.proxy2 unless @proxy_up - @env.nbd2.can_die(0) - client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy") - begin - - result = client.read_hello - assert_equal "NBDMAGIC", result[:magic] - assert_equal override_size || @env.file1.size, result[:size] - - yield client - ensure - client.close rescue nil - end - end - - def test_exits_with_error_when_cannot_connect_to_upstream_on_start - assert_raises(RuntimeError) { @env.proxy1 } - end - - def test_read_requests_successfully_proxied - with_proxied_client do |client| - (0..3).each do |n| - offset = n * 4096 - client.write_read_request(offset, 4096, "myhandle") - rsp = client.read_response - - assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] - assert_equal "myhandle", rsp[:handle] - assert_equal 0, rsp[:error] - - orig_data = @env.file1.read(offset, 4096) - data = client.read_raw(4096) - - assert_equal 4096, orig_data.size - assert_equal 4096, data.size - - assert_equal( orig_data, data, "Returned data does not match" ) - end - end - end - - def test_write_requests_successfully_proxied - with_proxied_client do |client| - (0..3).each do |n| - offset = n * 4096 - client.write(offset, "\xFF" * 4096) - rsp = client.read_response - - assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic] - assert_equal "myhandle", rsp[:handle] - assert_equal 0, rsp[:error] - - data = @env.file1.read(offset, 4096) - - assert_equal( ( "\xFF" * 4096 ), data, "Data not written correctly (offset is #{n})" ) - end - end - end - - def make_fake_server - server = FlexNBD::FakeDest.new(@env.ip, @env.port1) - @server_up = true - - # We return a thread here because accept() and connect() both block for us - Thread.new do - sc = server.accept # just tell the supervisor we're up - sc.write_hello - - [ server, sc ] - end - end - - def test_read_request_retried_when_upstream_dies_partway - maker = make_fake_server - - with_proxied_client(4096) do |client| - server, sc1 = maker.value - - # Send the read request to the proxy - client.write_read_request( 0, 4096 ) - - # ensure we're given the read request - req1 = sc1.read_request - assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] - assert_equal ::FlexNBD::REQUEST_READ, req1[:type] - assert_equal 0, req1[:from] - assert_not_equal 0, req1[:len] - - # Kill the server again, now we're sure the read request has been sent once - sc1.close - - # We expect the proxy to reconnect without our client doing anything. - sc2 = server.accept - sc2.write_hello - - # And once reconnected, it should resend an identical request. - req2 = sc2.read_request - assert_equal req1, req2 - - # The reply should be proxied back to the client. - sc2.write_reply( req2[:handle] ) - sc2.write_data( "\xFF" * 4096 ) - - # Check it to make sure it's correct - rsp = timeout(15) { client.read_response } - assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] - assert_equal 0, rsp[:error] - assert_equal req1[:handle], rsp[:handle] - - data = client.read_raw( 4096 ) - assert_equal( ("\xFF" * 4096), data, "Wrong data returned" ) - - sc2.close - server.close - end - - end - - def test_write_request_retried_when_upstream_dies_partway - maker = make_fake_server - - with_proxied_client(4096) do |client| - server, sc1 = maker.value - - # Send the read request to the proxy - client.write( 0, ( "\xFF" * 4096 ) ) - - # ensure we're given the read request - req1 = sc1.read_request - assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic] - assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type] - assert_equal 0, req1[:from] - assert_equal 4096, req1[:len] - data1 = sc1.read_data( 4096 ) - assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" ) - - # Kill the server again, now we're sure the read request has been sent once - sc1.close - - # We expect the proxy to reconnect without our client doing anything. - sc2 = server.accept - sc2.write_hello - - # And once reconnected, it should resend an identical request. - req2 = sc2.read_request - assert_equal req1, req2 - data2 = sc2.read_data( 4096 ) - assert_equal data1, data2 - - # The reply should be proxied back to the client. - sc2.write_reply( req2[:handle] ) - - # Check it to make sure it's correct - rsp = timeout(15) { client.read_response } - assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic] - assert_equal 0, rsp[:error] - assert_equal req1[:handle], rsp[:handle] - - sc2.close - server.close - end - end - - def test_only_one_client_can_connect_to_proxy_at_a_time - with_proxied_client do |client| - - c2 = nil - assert_raises(Timeout::Error) do - timeout(1) do - c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)") - c2.read_hello - end - end - c2.close rescue nil if c2 - end - - - end - end