Proxy prefetch cache becomes a command-line argument.

This commit is contained in:
Alex Young
2014-02-27 14:21:36 +00:00
parent aba802d415
commit fa8023cf69
13 changed files with 435 additions and 240 deletions

View File

@@ -28,7 +28,8 @@ USAGE
-----
$ flexnbd-proxy --addr <ADDR> [ --port <PORT> ]
--conn-addr <ADDR> --conn-port <PORT> [--bind <ADDR>] [option]*
--conn-addr <ADDR> --conn-port <PORT>
[--bind <ADDR>] [--cache[=<CACHE_BYTES>]] [option]*
Proxy requests from an NBD client to an NBD server, resiliently. Only one
client can be connected at a time, and ACLs cannot be applied to the client, as they
@@ -73,6 +74,10 @@ Options
*--conn-port, -P PORT*:
The port of the NBD server to connect to. Required.
*--cache, -c=CACHE_BYTES*:
If given, the size in bytes of read cache to use. CACHE_BYTES
defaults to 4096.
*--help, -h* :
Show command or global help.
@@ -154,6 +159,29 @@ The proxy notices and reconnects, fulfiling any request it has in its buffer.
The data in myfile has been moved between physical servers without the nbd
client process having to be disturbed at all.
READ CACHE
----------
If the --cache option is given at the command line, either without an
argument or with an argument greater than 0, flexnbd-proxy will use a
read-ahead cache. The cache as currently implemented doubles each read
request size, up to a maximum of 2xCACHE_BYTES, and retains the latter
half in a buffer. If the next read request from the client exactly
matches the region held in the buffer, flexnbd-proxy responds from the
cache without making a request to the server.
This pattern is designed to match sequential reads, such as those
performed by a booting virtual machine.
Note: If specifying a cache size, you *must* use this form:
nbd-client$ flexnbd-proxy --cache=XXXX
That is, the '=' is required. This is a limitation of getopt-long.
If no cache size is given, a size of 4096 bytes is assumed. Caching can
be explicitly disabled by setting a size of 0.
BUGS
----

View File

@@ -7,8 +7,9 @@ void mode(char* mode, int argc, char **argv);
#include <getopt.h>
#define GETOPT_ARG(x,s) {(x), 1, 0, (s)}
#define GETOPT_FLAG(x,v) {(x), 0, 0, (v)}
#define GETOPT_ARG(x,s) {(x), required_argument, 0, (s)}
#define GETOPT_FLAG(x,v) {(x), no_argument, 0, (v)}
#define GETOPT_OPTARG(x,s) {(x), optional_argument, 0, (s)}
#define OPT_HELP "help"
#define OPT_ADDR "addr"
@@ -19,6 +20,7 @@ void mode(char* mode, int argc, char **argv);
#define OPT_FROM "from"
#define OPT_SIZE "size"
#define OPT_DENY "default-deny"
#define OPT_CACHE "cache"
#define OPT_UNLINK "unlink"
#define OPT_CONNECT_ADDR "conn-addr"
#define OPT_CONNECT_PORT "conn-port"
@@ -52,6 +54,7 @@ void mode(char* mode, int argc, char **argv);
#define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' )
#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' )
#define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' )
#define GETOPT_CACHE GETOPT_OPTARG( OPT_CACHE, 'c' )
#define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' )
#define GETOPT_CONNECT_ADDR GETOPT_ARG( OPT_CONNECT_ADDR, 'C' )
#define GETOPT_CONNECT_PORT GETOPT_ARG( OPT_CONNECT_PORT, 'P' )

View File

@@ -154,8 +154,11 @@ int do_serve( struct server *, struct self_pipe * );
struct mode_readwrite_params {
union mysockaddr connect_to;
union mysockaddr connect_from;
/* FIXME: these should be uint64_t and uint32_t respectively */
off64_t from;
off64_t len;
int data_fd;
int client;
};

View File

@@ -14,6 +14,7 @@ static struct option proxy_options[] = {
GETOPT_CONNECT_ADDR,
GETOPT_CONNECT_PORT,
GETOPT_BIND,
GETOPT_CACHE,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
@@ -29,16 +30,20 @@ static char proxy_help_text[] =
"\t--" OPT_CONNECT_ADDR ",-C <ADDR>\tAddress of the proxied server.\n"
"\t--" OPT_CONNECT_PORT ",-P <PORT>\tPort of the proxied server.\n"
"\t--" OPT_BIND ",-b <ADDR>\tThe address we connect from, as a proxy.\n"
"\t--" OPT_CACHE ",-c[=<CACHE-BYTES>]\tUse a RAM read cache of the given size.\n"
QUIET_LINE
VERBOSE_LINE;
static char proxy_default_cache_size[] = "4096";
void read_proxy_param(
int c,
char **downstream_addr,
char **downstream_port,
char **upstream_addr,
char **upstream_port,
char **bind_addr )
char **bind_addr,
char **cache_bytes)
{
switch( c ) {
case 'h' :
@@ -59,6 +64,9 @@ void read_proxy_param(
case 'b':
*bind_addr = optarg;
break;
case 'c':
*cache_bytes = optarg ? optarg : proxy_default_cache_size;
break;
case 'q':
log_level = QUIET_LOG_LEVEL;
break;
@@ -90,6 +98,7 @@ int main( int argc, char *argv[] )
char *upstream_addr = NULL;
char *upstream_port = NULL;
char *bind_addr = NULL;
char *cache_bytes = NULL;
int success;
sigset_t mask;
@@ -114,7 +123,8 @@ int main( int argc, char *argv[] )
&downstream_port,
&upstream_addr,
&upstream_port,
&bind_addr
&bind_addr,
&cache_bytes
);
}
@@ -131,7 +141,8 @@ int main( int argc, char *argv[] )
downstream_port,
upstream_addr,
upstream_port,
bind_addr
bind_addr,
cache_bytes
);
/* Set these *after* proxy has been assigned to */

77
src/proxy/prefetch.c Normal file
View File

@@ -0,0 +1,77 @@
#include "prefetch.h"
#include "util.h"
struct prefetch* prefetch_create( size_t size_bytes ){
struct prefetch* out = xmalloc( sizeof( struct prefetch ) );
NULLCHECK( out );
out->buffer = xmalloc( size_bytes );
NULLCHECK( out->buffer );
out->size = size_bytes;
out->is_full = 0;
out->from = 0;
out->len = 0;
return out;
}
void prefetch_destroy( struct prefetch *prefetch ) {
if( prefetch ) {
free( prefetch->buffer );
free( prefetch );
}
}
size_t prefetch_size( struct prefetch *prefetch){
if ( prefetch ) {
return prefetch->size;
} else {
return 0;
}
}
void prefetch_set_is_empty( struct prefetch *prefetch ){
prefetch_set_full( prefetch, 0 );
}
void prefetch_set_is_full( struct prefetch *prefetch ){
prefetch_set_full( prefetch, 1 );
}
void prefetch_set_full( struct prefetch *prefetch, int val ){
if( prefetch ) {
prefetch->is_full = val;
}
}
int prefetch_is_full( struct prefetch *prefetch ){
if( prefetch ) {
return prefetch->is_full;
} else {
return 0;
}
}
int prefetch_contains( struct prefetch* prefetch, uint64_t from, uint32_t len ){
NULLCHECK( prefetch );
return from == prefetch->from && len == prefetch->len;
}
char *prefetch_offset( struct prefetch* prefetch, uint64_t from ){
return prefetch->buffer;
}
/*
int prefetch_contains( struct prefetch *prefetch, uint64_t from, uint32_t len ){
NULLCHECK( prefetch );
return from >= prefetch->from &&
from + len <= prefetch->from + prefetch->len;
}
char *prefetch_offset( struct prefetch *prefetch, uint64_t from ){
NULLCHECK( prefetch );
return prefetch->buffer + (from - prefetch->from);
}
*/

View File

@@ -1,14 +1,33 @@
#ifndef PREFETCH_H
#define PREFETCH_H
#include <stdint.h>
#include <stddef.h>
#define PREFETCH_BUFSIZE 4096
struct prefetch {
/* True if there is data in the buffer. */
int is_full;
__be64 from;
__be32 len;
/* The start point of the current content of buffer */
uint64_t from;
/* The length of the current content of buffer */
uint32_t len;
char buffer[PREFETCH_BUFSIZE];
/* The total size of the buffer, in bytes. */
size_t size;
char *buffer;
};
struct prefetch* prefetch_create( size_t size_bytes );
void prefetch_destroy( struct prefetch *prefetch );
size_t prefetch_size( struct prefetch *);
void prefetch_set_is_empty( struct prefetch *prefetch );
void prefetch_set_is_full( struct prefetch *prefetch );
void prefetch_set_full( struct prefetch *prefetch, int val );
int prefetch_is_full( struct prefetch *prefetch );
int prefetch_contains( struct prefetch *prefetch, uint64_t from, uint32_t len );
char *prefetch_offset( struct prefetch *prefetch, uint64_t from );
#endif

View File

@@ -1,9 +1,7 @@
#include "proxy.h"
#include "readwrite.h"
#ifdef PREFETCH
#include "prefetch.h"
#endif
#include "ioutil.h"
@@ -20,7 +18,8 @@ struct proxier* proxy_create(
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind )
char* s_upstream_bind,
char* s_cache_bytes )
{
struct proxier* out;
out = xmalloc( sizeof( struct proxier ) );
@@ -65,9 +64,16 @@ struct proxier* proxy_create(
out->downstream_fd = -1;
out->upstream_fd = -1;
#ifdef PREFETCH
out->prefetch = xmalloc( sizeof( struct prefetch ) );
#endif
out->prefetch = NULL;
if ( s_cache_bytes ){
int cache_bytes = atoi( s_cache_bytes );
/* leaving this off or setting a cache size of zero or
* less results in no cache.
*/
if ( cache_bytes >= 0 ) {
out->prefetch = prefetch_create( cache_bytes );
}
}
out->init.buf = xmalloc( sizeof( struct nbd_init_raw ) );
out->req.buf = xmalloc( NBD_MAX_SIZE );
@@ -76,14 +82,22 @@ struct proxier* proxy_create(
return out;
}
int proxy_prefetches( struct proxier* proxy ) {
NULLCHECK( proxy );
return proxy->prefetch != NULL;
}
int proxy_prefetch_bufsize( struct proxier* proxy ){
NULLCHECK( proxy );
return prefetch_size( proxy->prefetch );
}
void proxy_destroy( struct proxier* proxy )
{
free( proxy->init.buf );
free( proxy->req.buf );
free( proxy->rsp.buf );
#ifdef PREFETCH
free( proxy->prefetch );
#endif
prefetch_destroy( proxy->prefetch );
free( proxy );
}
@@ -279,10 +293,9 @@ static inline int proxy_state_upstream( int state )
state == WRITE_TO_UPSTREAM || state == READ_FROM_UPSTREAM;
}
#ifdef PREFETCH
int proxy_prefetch_for_request( struct proxier* proxy, int state )
{
NULLCHECK( proxy );
struct nbd_request* req = &proxy->req_hdr;
struct nbd_reply* rsp = &proxy->rsp_hdr;
@@ -291,23 +304,11 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
int is_read = ( req->type & REQUEST_MASK ) == REQUEST_READ;
int prefetch_start = req->from;
int prefetch_end = req->from + ( req->len * 2 );
/* We only want to consider prefetching if we know we're not
* getting too much data back, if it's a read request, and if
* the prefetch won't try to read past the end of the file.
*/
int prefetching = req->len <= PREFETCH_BUFSIZE && is_read &&
prefetch_start < prefetch_end && prefetch_end <= proxy->upstream_size;
if ( is_read ) {
/* See if we can respond with what's in our prefetch
* cache */
if ( proxy->prefetch->is_full &&
req->from == proxy->prefetch->from &&
req->len == proxy->prefetch->len ) {
if ( prefetch_is_full( proxy->prefetch ) &&
prefetch_contains( proxy->prefetch, req->from, req->len ) ) {
/* HUZZAH! A match! */
debug( "Prefetch hit!" );
@@ -322,10 +323,11 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
/* and the data */
memcpy(
proxy->rsp.buf + NBD_REPLY_SIZE,
proxy->prefetch->buffer, proxy->prefetch->len
prefetch_offset( proxy->prefetch, req->from ),
req->len
);
proxy->rsp.size = NBD_REPLY_SIZE + proxy->prefetch->len;
proxy->rsp.size = NBD_REPLY_SIZE + req->len;
proxy->rsp.needle = 0;
/* return early, our work here is done */
@@ -339,11 +341,24 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
* whether we can keep it or not.
*/
debug( "Blowing away prefetch cache on type %d request.", req->type );
proxy->prefetch->is_full = 0;
prefetch_set_is_empty( proxy->prefetch );
}
debug( "Prefetch cache MISS!");
uint64_t prefetch_start = req->from;
/* We prefetch what we expect to be the next request. */
uint64_t prefetch_end = req->from + ( req->len * 2 );
/* We only want to consider prefetching if we know we're not
* getting too much data back, if it's a read request, and if
* the prefetch won't try to read past the end of the file.
*/
int prefetching =
req->len <= prefetch_size( proxy->prefetch ) &&
is_read &&
prefetch_start < prefetch_end &&
prefetch_end <= (uint64_t)proxy->upstream_size; /* FIXME: we shouldn't need a cast - upstream_size should be uint64_t */
/* We pull the request out of the proxy struct, rewrite the
* request size, and write it back.
@@ -354,7 +369,8 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
req->len *= 2;
debug( "Prefetching %"PRIu32" bytes", req->len - proxy->prefetch_req_orig_len );
debug( "Prefetching additional %"PRIu32" bytes",
req->len - proxy->prefetch_req_orig_len );
nbd_h2r_request( req, req_raw );
}
@@ -371,10 +387,10 @@ int proxy_prefetch_for_reply( struct proxier* proxy, int state )
prefetched_bytes = proxy->req_hdr.len - proxy->prefetch_req_orig_len;
debug( "Prefetched %d bytes", prefetched_bytes );
debug( "Prefetched additional %d bytes", prefetched_bytes );
memcpy(
proxy->rsp.buf + proxy->prefetch_req_orig_len,
&(proxy->prefetch->buffer),
proxy->prefetch->buffer,
proxy->rsp.buf + proxy->prefetch_req_orig_len + NBD_REPLY_SIZE,
prefetched_bytes
);
@@ -389,13 +405,12 @@ int proxy_prefetch_for_reply( struct proxier* proxy, int state )
proxy->rsp.size -= prefetched_bytes;
/* And we need to reset these */
proxy->prefetch->is_full = 1;
prefetch_set_is_full( proxy->prefetch );
proxy->is_prefetch_req = 0;
return state;
}
#endif
int proxy_read_from_downstream( struct proxier *proxy, int state )
@@ -476,10 +491,8 @@ int proxy_continue_connecting_to_upstream( struct proxier* proxy, int state )
return state;
}
#ifdef PREFETCH
/* Data may have changed while we were disconnected */
proxy->prefetch->is_full = 0;
#endif
prefetch_set_is_empty( proxy->prefetch );
info( "Connected to upstream on fd %i", proxy->upstream_fd );
return READ_INIT_FROM_UPSTREAM;
@@ -762,14 +775,12 @@ void proxy_session( struct proxier* proxy )
case READ_FROM_DOWNSTREAM:
if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) {
state = proxy_read_from_downstream( proxy, state );
#ifdef PREFETCH
/* Check if we can fulfil the request from prefetch, or
* rewrite the request to fill the prefetch buffer if needed
*/
if ( state == WRITE_TO_UPSTREAM ) {
if ( proxy_prefetches( proxy ) && state == WRITE_TO_UPSTREAM ) {
state = proxy_prefetch_for_request( proxy, state );
}
#endif
}
break;
case CONNECT_TO_UPSTREAM:
@@ -800,12 +811,10 @@ void proxy_session( struct proxier* proxy )
if ( FD_ISSET( proxy->upstream_fd, &rfds ) ) {
state = proxy_read_from_upstream( proxy, state );
}
# ifdef PREFETCH
/* Fill the prefetch buffer and rewrite the reply, if needed */
if ( state == WRITE_TO_DOWNSTREAM ) {
if ( proxy_prefetches( proxy ) && state == WRITE_TO_DOWNSTREAM ) {
state = proxy_prefetch_for_reply( proxy, state );
}
#endif
break;
case WRITE_TO_DOWNSTREAM:
if ( FD_ISSET( proxy->downstream_fd, &wfds ) ) {

View File

@@ -48,6 +48,7 @@ struct proxier {
int upstream_fd;
/* This is the size we advertise to the downstream server */
/* FIXME: should be uint64_t */
off64_t upstream_size;
/* We transform the raw request header into here */
@@ -73,7 +74,8 @@ struct proxier {
uint64_t req_count;
int hello_sent;
#ifdef PREFETCH
/** These are only used if we pass --cache on the command line */
/* While the in-flight request has been munged by prefetch, these two are
* set to true, and the original length of the request, respectively */
int is_prefetch_req;
@@ -81,7 +83,8 @@ struct proxier {
/* And here, we actually store the prefetched data once it's returned */
struct prefetch *prefetch;
#endif
/** */
};
struct proxier* proxy_create(
@@ -89,7 +92,8 @@ struct proxier* proxy_create(
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind );
char* s_upstream_bind,
char* s_cache_bytes);
int do_proxy( struct proxier* proxy );
void proxy_cleanup( struct proxier* proxy );
void proxy_destroy( struct proxier* proxy );

View File

@@ -21,6 +21,11 @@ class Environment
@fake_pid = nil
end
def prefetch_proxy!
@nbd1.prefetch_proxy = true
@nbd2.prefetch_proxy = true
end
def proxy1(port=@port2)
@nbd1.proxy(@ip, port)
end

View File

@@ -198,6 +198,8 @@ module FlexNBD
end
end
attr_accessor :prefetch_proxy
def initialize( bin, ip, port )
@bin = bin
@do_debug = ENV['DEBUG']
@@ -208,6 +210,7 @@ module FlexNBD
@ip = ip
@port = port
@kill = []
@prefetch_proxy = false
end
@@ -247,6 +250,7 @@ module FlexNBD
"--port #{port} "\
"--conn-addr #{connect_ip} "\
"--conn-port #{connect_port} "\
"#{prefetch_proxy ? "--cache " : ""}"\
"#{@debug}"
end

View File

@@ -0,0 +1,190 @@
# encoding: utf-8
require 'flexnbd/fake_source'
require 'flexnbd/fake_dest'
module ProxyTests
def with_proxied_client( override_size = nil )
@env.serve1 unless @server_up
@env.proxy2 unless @proxy_up
@env.nbd2.can_die(0)
client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy")
begin
result = client.read_hello
assert_equal "NBDMAGIC", result[:magic]
assert_equal override_size || @env.file1.size, result[:size]
yield client
ensure
client.close rescue nil
end
end
def test_exits_with_error_when_cannot_connect_to_upstream_on_start
assert_raises(RuntimeError) { @env.proxy1 }
end
def test_read_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write_read_request(offset, 4096, "myhandle")
rsp = client.read_response
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
orig_data = @env.file1.read(offset, 4096)
data = client.read_raw(4096)
assert_equal 4096, orig_data.size
assert_equal 4096, data.size
assert_equal( orig_data, data,
"Returned data does not match on request #{n+1}" )
end
end
end
def test_write_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write(offset, "\xFF" * 4096)
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
data = @env.file1.read(offset, 4096)
assert_equal( ( "\xFF" * 4096 ), data, "Data not written correctly (offset is #{n})" )
end
end
end
def make_fake_server
server = FlexNBD::FakeDest.new(@env.ip, @env.port1)
@server_up = true
# We return a thread here because accept() and connect() both block for us
Thread.new do
sc = server.accept # just tell the supervisor we're up
sc.write_hello
[ server, sc ]
end
end
def test_read_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write_read_request( 0, 4096 )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_READ, req1[:type]
assert_equal 0, req1[:from]
assert_not_equal 0, req1[:len]
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
sc2.write_data( "\xFF" * 4096 )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
data = client.read_raw( 4096 )
assert_equal( ("\xFF" * 4096), data, "Wrong data returned" )
sc2.close
server.close
end
end
def test_write_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write( 0, ( "\xFF" * 4096 ) )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal 4096, req1[:len]
data1 = sc1.read_data( 4096 )
assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" )
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
data2 = sc2.read_data( 4096 )
assert_equal data1, data2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
sc2.close
server.close
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |client|
c2 = nil
assert_raises(Timeout::Error) do
timeout(1) do
c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)")
c2.read_hello
end
end
c2.close rescue nil if c2
end
end
end

View File

@@ -0,0 +1,22 @@
require 'test/unit'
require 'environment'
require 'proxy_tests'
class TestPrefetchProxyMode < Test::Unit::TestCase
include ProxyTests
def setup
super
@env = Environment.new
@env.prefetch_proxy!
@env.writefile1( "f" * 16 )
end
def teardown
@env.cleanup
super
end
end

View File

@@ -1,200 +1,20 @@
require 'test/unit'
require 'environment'
require 'flexnbd/fake_source'
require 'flexnbd/fake_dest'
require 'proxy_tests'
class TestProxyMode < Test::Unit::TestCase
include ProxyTests
def setup
super
@env = Environment.new
@env.writefile1( "0" * 16 )
@env.writefile1( "f" * 16 )
end
def teardown
@env.cleanup
super
end
def with_proxied_client( override_size = nil )
@env.serve1 unless @server_up
@env.proxy2 unless @proxy_up
@env.nbd2.can_die(0)
client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy")
begin
result = client.read_hello
assert_equal "NBDMAGIC", result[:magic]
assert_equal override_size || @env.file1.size, result[:size]
yield client
ensure
client.close rescue nil
end
end
def test_exits_with_error_when_cannot_connect_to_upstream_on_start
assert_raises(RuntimeError) { @env.proxy1 }
end
def test_read_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write_read_request(offset, 4096, "myhandle")
rsp = client.read_response
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
orig_data = @env.file1.read(offset, 4096)
data = client.read_raw(4096)
assert_equal 4096, orig_data.size
assert_equal 4096, data.size
assert_equal( orig_data, data, "Returned data does not match" )
end
end
end
def test_write_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write(offset, "\xFF" * 4096)
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
data = @env.file1.read(offset, 4096)
assert_equal( ( "\xFF" * 4096 ), data, "Data not written correctly (offset is #{n})" )
end
end
end
def make_fake_server
server = FlexNBD::FakeDest.new(@env.ip, @env.port1)
@server_up = true
# We return a thread here because accept() and connect() both block for us
Thread.new do
sc = server.accept # just tell the supervisor we're up
sc.write_hello
[ server, sc ]
end
end
def test_read_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write_read_request( 0, 4096 )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_READ, req1[:type]
assert_equal 0, req1[:from]
assert_not_equal 0, req1[:len]
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
sc2.write_data( "\xFF" * 4096 )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
data = client.read_raw( 4096 )
assert_equal( ("\xFF" * 4096), data, "Wrong data returned" )
sc2.close
server.close
end
end
def test_write_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write( 0, ( "\xFF" * 4096 ) )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal 4096, req1[:len]
data1 = sc1.read_data( 4096 )
assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" )
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
data2 = sc2.read_data( 4096 )
assert_equal data1, data2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
sc2.close
server.close
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |client|
c2 = nil
assert_raises(Timeout::Error) do
timeout(1) do
c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)")
c2.read_hello
end
end
c2.close rescue nil if c2
end
end
end