Compare commits
36 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
ba14943b60 | ||
![]() |
4a709e73f8 | ||
![]() |
91a8946ddc | ||
![]() |
20f99b4554 | ||
![]() |
c363991cfd | ||
![]() |
c41eeff2fc | ||
![]() |
5960e4d10b | ||
![]() |
f0911b5c6c | ||
![]() |
b063f41ba8 | ||
![]() |
28c7e43e45 | ||
![]() |
9326b6b882 | ||
![]() |
f93476ebd3 | ||
![]() |
666b60ae1c | ||
![]() |
f48bf2b296 | ||
![]() |
705164ae3b | ||
![]() |
dbe7053bf3 | ||
![]() |
fa8023cf69 | ||
![]() |
aba802d415 | ||
![]() |
d146102c2c | ||
![]() |
5551373073 | ||
![]() |
77f333423b | ||
![]() |
ffa45879d7 | ||
![]() |
2fa1ce8e6b | ||
![]() |
6f540ce238 | ||
![]() |
f9a3447bc9 | ||
![]() |
7806ec11ee | ||
![]() |
1817c13acb | ||
![]() |
97c8d7a358 | ||
![]() |
8cf92af900 | ||
![]() |
5185be39c9 | ||
![]() |
374b4c616e | ||
![]() |
50ec8fb7cc | ||
![]() |
5fc9ad6fd8 | ||
![]() |
85c463c4bd | ||
![]() |
278a3151a8 | ||
![]() |
0ea66b1e04 |
9
.gitignore
vendored
Normal file
9
.gitignore
vendored
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
**/*.o
|
||||||
|
**/*~
|
||||||
|
flexnbd
|
||||||
|
build/
|
||||||
|
pkg/
|
||||||
|
**/*.orig
|
||||||
|
**/.*.swp
|
||||||
|
cscope.out
|
||||||
|
valgrind.out
|
@@ -1,9 +0,0 @@
|
|||||||
.o$
|
|
||||||
~$
|
|
||||||
^flexnbd$
|
|
||||||
^build/
|
|
||||||
^pkg/
|
|
||||||
\.orig$
|
|
||||||
.*\.swp$
|
|
||||||
cscope.out$
|
|
||||||
valgrind.out$
|
|
36
Makefile
36
Makefile
@@ -11,9 +11,40 @@ ifdef DEBUG
|
|||||||
else
|
else
|
||||||
CFLAGS_EXTRA=-O2
|
CFLAGS_EXTRA=-O2
|
||||||
endif
|
endif
|
||||||
|
CFLAGS_EXTRA += -fPIC --std=gnu99
|
||||||
|
LDFLAGS_EXTRA += -Wl,--relax,--gc-sections
|
||||||
|
|
||||||
|
TOOLCHAIN := $(shell $(CC) --version|awk '/Debian/ {print "debian";exit;}')
|
||||||
|
#
|
||||||
|
# This bit adds extra flags depending on the distro, and the
|
||||||
|
# architecture. To make sure debian packages have the right
|
||||||
|
# set of 'native' flags on them
|
||||||
|
#
|
||||||
|
ifeq ($(TOOLCHAIN),debian)
|
||||||
|
DEBARCH := $(shell dpkg-architecture -qDEB_BUILD_ARCH)
|
||||||
|
ifeq ($(DEBARCH),$(filter $(DEBARCH),amd64 i386))
|
||||||
|
CFLAGS_EXTRA += -march=native
|
||||||
|
endif
|
||||||
|
ifeq ($(DEBARCH),armhf)
|
||||||
|
CFLAGS_EXTRA += -march=armv7-a -mtune=cortex-a8 -mfpu=neon
|
||||||
|
endif
|
||||||
|
LDFLAGS_EXTRA += -L$(LIB) -Wl,-rpath,${shell readlink -f ${LIB}}
|
||||||
|
else
|
||||||
|
LDFLAGS_EXTRA += -L$(LIB) -Wl,-rpath-link,$(LIB)
|
||||||
|
endif
|
||||||
|
|
||||||
|
|
||||||
|
# The -Wunreachable-code warning is only implemented in clang, but it
|
||||||
|
# doesn't break anything for gcc to see it.
|
||||||
|
WARNINGS=-Wall \
|
||||||
|
-Wextra \
|
||||||
|
-Werror-implicit-function-declaration \
|
||||||
|
-Wstrict-prototypes \
|
||||||
|
-Wno-missing-field-initializers \
|
||||||
|
-Wunreachable-code
|
||||||
|
CCFLAGS=-D_GNU_SOURCE=1 $(WARNINGS) $(CFLAGS_EXTRA) $(CFLAGS)
|
||||||
|
LLDFLAGS=-lm -lrt -lev $(LDFLAGS_EXTRA) $(LDFLAGS)
|
||||||
|
|
||||||
CCFLAGS=-D_GNU_SOURCE=1 -Wall -Wextra -Werror-implicit-function-declaration -Wstrict-prototypes -Wno-missing-field-initializers $(CFLAGS_EXTRA) $(CFLAGS)
|
|
||||||
LLDFLAGS=-lrt -lev $(LDFLAGS_EXTRA) $(LDFLAGS)
|
|
||||||
|
|
||||||
CC?=gcc
|
CC?=gcc
|
||||||
|
|
||||||
@@ -23,6 +54,7 @@ COMPILE=$(CC) $(INC) -c $(CCFLAGS)
|
|||||||
SAVEDEP=$(CC) $(INC) -MM $(CCFLAGS)
|
SAVEDEP=$(CC) $(INC) -MM $(CCFLAGS)
|
||||||
LINK=$(CC) $(LLDFLAGS) -Isrc $(LIBS)
|
LINK=$(CC) $(LLDFLAGS) -Isrc $(LIBS)
|
||||||
|
|
||||||
|
LIB=build/
|
||||||
|
|
||||||
EXISTING_OBJS := $(wildcard build/*.o)
|
EXISTING_OBJS := $(wildcard build/*.o)
|
||||||
-include $(EXISTING_OBJS:.o=.d)
|
-include $(EXISTING_OBJS:.o=.d)
|
||||||
|
@@ -28,7 +28,8 @@ USAGE
|
|||||||
-----
|
-----
|
||||||
|
|
||||||
$ flexnbd-proxy --addr <ADDR> [ --port <PORT> ]
|
$ flexnbd-proxy --addr <ADDR> [ --port <PORT> ]
|
||||||
--conn-addr <ADDR> --conn-port <PORT> [--bind <ADDR>] [option]*
|
--conn-addr <ADDR> --conn-port <PORT>
|
||||||
|
[--bind <ADDR>] [--cache[=<CACHE_BYTES>]] [option]*
|
||||||
|
|
||||||
Proxy requests from an NBD client to an NBD server, resiliently. Only one
|
Proxy requests from an NBD client to an NBD server, resiliently. Only one
|
||||||
client can be connected at a time, and ACLs cannot be applied to the client, as they
|
client can be connected at a time, and ACLs cannot be applied to the client, as they
|
||||||
@@ -73,6 +74,10 @@ Options
|
|||||||
*--conn-port, -P PORT*:
|
*--conn-port, -P PORT*:
|
||||||
The port of the NBD server to connect to. Required.
|
The port of the NBD server to connect to. Required.
|
||||||
|
|
||||||
|
*--cache, -c=CACHE_BYTES*:
|
||||||
|
If given, the size in bytes of read cache to use. CACHE_BYTES
|
||||||
|
defaults to 4096.
|
||||||
|
|
||||||
*--help, -h* :
|
*--help, -h* :
|
||||||
Show command or global help.
|
Show command or global help.
|
||||||
|
|
||||||
@@ -154,6 +159,29 @@ The proxy notices and reconnects, fulfilling any request it has in its buffer.
|
|||||||
The data in myfile has been moved between physical servers without the nbd
|
The data in myfile has been moved between physical servers without the nbd
|
||||||
client process having to be disturbed at all.
|
client process having to be disturbed at all.
|
||||||
|
|
||||||
|
READ CACHE
|
||||||
|
----------
|
||||||
|
|
||||||
|
If the --cache option is given at the command line, either without an
|
||||||
|
argument or with an argument greater than 0, flexnbd-proxy will use a
|
||||||
|
read-ahead cache. The cache as currently implemented doubles each read
|
||||||
|
request size, up to a maximum of 2xCACHE_BYTES, and retains the latter
|
||||||
|
half in a buffer. If the next read request from the client exactly
|
||||||
|
matches the region held in the buffer, flexnbd-proxy responds from the
|
||||||
|
cache without making a request to the server.
|
||||||
|
|
||||||
|
This pattern is designed to match sequential reads, such as those
|
||||||
|
performed by a booting virtual machine.
|
||||||
|
|
||||||
|
Note: If specifying a cache size, you *must* use this form:
|
||||||
|
|
||||||
|
nbd-client$ flexnbd-proxy --cache=XXXX
|
||||||
|
|
||||||
|
That is, the '=' is required. This is a limitation of getopt_long(3), which only accepts an optional argument when it is attached to the option with '='.
|
||||||
|
|
||||||
|
If no cache size is given, a size of 4096 bytes is assumed. Caching can
|
||||||
|
be explicitly disabled by setting a size of 0.
|
||||||
|
|
||||||
BUGS
|
BUGS
|
||||||
----
|
----
|
||||||
|
|
||||||
|
8
Rakefile
8
Rakefile
@@ -50,3 +50,11 @@ end
|
|||||||
|
|
||||||
desc "Remove all build targets, binaries and temporary files"
|
desc "Remove all build targets, binaries and temporary files"
|
||||||
maketask :clean
|
maketask :clean
|
||||||
|
|
||||||
|
file "debian/changelog" do
|
||||||
|
FileUtils.mkdir_p "debian"
|
||||||
|
sh "hg log --style=changelog.template > debian/changelog"
|
||||||
|
end
|
||||||
|
|
||||||
|
desc "Generate the changelog"
|
||||||
|
task :changelog => "debian/changelog"
|
||||||
|
198
debian/changelog
vendored
198
debian/changelog
vendored
@@ -1,198 +0,0 @@
|
|||||||
flexnbd (0.0.1-33) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added tag 0.0.1 for changeset 27409c2c1313 [r33]
|
|
||||||
|
|
||||||
-- Alex Young <alex@bytemark.co.uk> Wed, 30 May 2012 17:11:10 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-31) unstable; urgency=low
|
|
||||||
|
|
||||||
* Fixed bug where ACL was accidentally deleted when being set from control [r31]
|
|
||||||
|
|
||||||
-- mbloch <mbloch> Wed, 30 May 2012 13:03:02 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-30) unstable; urgency=low
|
|
||||||
|
|
||||||
* Fix the usage message [r30]
|
|
||||||
|
|
||||||
-- nick <nick@bytemark.co.uk> Wed, 30 May 2012 11:28:32 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-29) unstable; urgency=low
|
|
||||||
|
|
||||||
* Fixed race in tests. [r29]
|
|
||||||
|
|
||||||
-- mbloch <mbloch> Tue, 29 May 2012 17:01:54 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-28) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added getopt_long command-line handling. [r28]
|
|
||||||
|
|
||||||
-- Alex Young <alex@bytemark.co.uk> Wed, 30 May 2012 15:19:40 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-27) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added .h files to the Rakefile [r27]
|
|
||||||
|
|
||||||
-- Alex Young <alex@bytemark.co.uk> Wed, 30 May 2012 15:06:06 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-26) unstable; urgency=low
|
|
||||||
|
|
||||||
* Rearranged the project to have src/ and build/ directories [r26]
|
|
||||||
|
|
||||||
-- Alex Young <alex@bytemark.co.uk> Wed, 30 May 2012 09:51:20 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-25) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added .INCOMPLETE hack to aid with marking finished transfers. [r25]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 29 May 2012 11:24:24 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-24) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added mirror write barrier / final pass stuff & clean exit afterwards. [r24]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 29 May 2012 04:03:28 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-23) unstable; urgency=low
|
|
||||||
|
|
||||||
* Lots of errors spotted by Alex fixed, added mutexes to accept & I/O, added [r23]
|
|
||||||
|
|
||||||
-- mbloch <mbloch> Tue, 29 May 2012 00:59:12 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-22) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added another write/read test, fixed bugs in splice() usage and IPv6 [r22]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Sun, 27 May 2012 14:40:16 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-21) unstable; urgency=low
|
|
||||||
|
|
||||||
* First few external tests with test/unit, some minor tidying of internal data [r21]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Thu, 24 May 2012 01:39:35 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-20) unstable; urgency=low
|
|
||||||
|
|
||||||
* Pulled some duplicated code out of control.c into [r20]
|
|
||||||
|
|
||||||
-- mbloch <mbloch> Wed, 23 May 2012 14:03:30 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-19) unstable; urgency=low
|
|
||||||
|
|
||||||
* Split control-socket functions into separate file. [r19]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Wed, 23 May 2012 00:42:14 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-18) unstable; urgency=low
|
|
||||||
|
|
||||||
* Fixed mirroring to work (error reporting suspect though). [r18]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 22 May 2012 00:22:06 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-17) unstable; urgency=low
|
|
||||||
|
|
||||||
* Initial, untested mirror implementation and resolved some type confusion [r17]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Mon, 21 May 2012 04:03:17 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-16) unstable; urgency=low
|
|
||||||
|
|
||||||
* More valgrind-found bugs, extracted open_and_mmap from main code. [r16]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Mon, 21 May 2012 04:00:45 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-15) unstable; urgency=low
|
|
||||||
|
|
||||||
* Fixed some uninitialised variables courtesy of valgrind. [r15]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Mon, 21 May 2012 03:59:43 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-14) unstable; urgency=low
|
|
||||||
|
|
||||||
* Mostly finished bitset tests, fixed test build to include utilities, remove [r14]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Mon, 21 May 2012 03:17:32 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-13) unstable; urgency=low
|
|
||||||
|
|
||||||
* Tweaks to bitset.h, established a C test framework. [r13]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Sun, 20 May 2012 14:38:46 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-12) unstable; urgency=low
|
|
||||||
|
|
||||||
* Fixed segfaulting access control, allowed change to acl via control socket. [r12]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Sat, 19 May 2012 12:48:03 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-11) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added dummy control socket answering / changed serve_accept_loop to use [r11]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Fri, 18 May 2012 23:39:16 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-10) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added control socket, doesn't do anything yet. [r10]
|
|
||||||
|
|
||||||
-- mbloch <mbloch> Fri, 18 May 2012 18:44:34 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-9) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added .hgignore file [r9]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Fri, 18 May 2012 13:25:54 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-8) unstable; urgency=low
|
|
||||||
|
|
||||||
* Stopped NBD writes from committing all-zero blocks to disc (tentative, needs [r8]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Fri, 18 May 2012 13:24:35 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-7) unstable; urgency=low
|
|
||||||
|
|
||||||
* Split code out into separate compilation units (first pass, anyway). [r7]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Thu, 17 May 2012 20:14:22 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-6) unstable; urgency=low
|
|
||||||
|
|
||||||
* Non-functioning commit, half-way through adding sparse bitmap feature. [r6]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Thu, 17 May 2012 11:54:25 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-5) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added write mode. [r5]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Wed, 16 May 2012 11:58:41 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-4) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added working read via splice syscall. [r4]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Wed, 16 May 2012 03:20:09 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-3) unstable; urgency=low
|
|
||||||
|
|
||||||
* Added Rakefile [r3]
|
|
||||||
|
|
||||||
-- mbloch <mbloch> Wed, 16 May 2012 01:27:14 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-2) unstable; urgency=low
|
|
||||||
|
|
||||||
* Silly bug fixes, added ACL support, added parser for read/write requests. [r2]
|
|
||||||
|
|
||||||
-- mbloch <mbloch> Tue, 15 May 2012 18:40:58 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-1) unstable; urgency=low
|
|
||||||
|
|
||||||
* Some debugging, got it to serve. [r1]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 15 May 2012 03:16:19 +0100
|
|
||||||
|
|
||||||
flexnbd (0.0.1-0) unstable; urgency=low
|
|
||||||
|
|
||||||
* It compiles :) [r0]
|
|
||||||
|
|
||||||
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 15 May 2012 02:42:03 +0100
|
|
||||||
|
|
1
debian/compat
vendored
1
debian/compat
vendored
@@ -1 +0,0 @@
|
|||||||
7
|
|
25
debian/control
vendored
25
debian/control
vendored
@@ -1,25 +0,0 @@
|
|||||||
Source: flexnbd
|
|
||||||
Section: unknown
|
|
||||||
Priority: extra
|
|
||||||
Maintainer: Alex Young <alex@bytemark.co.uk>
|
|
||||||
Build-Depends: cdbs, debhelper (>= 7.0.50), ruby, rake, gcc, libev-dev
|
|
||||||
Standards-Version: 3.8.1
|
|
||||||
Homepage: http://bigv.io/
|
|
||||||
|
|
||||||
Package: flexnbd
|
|
||||||
Architecture: any
|
|
||||||
Depends: ${shlibs:Depends}, ${misc:Depends}, libev3
|
|
||||||
Description: FlexNBD server
|
|
||||||
An NBD server offering push-mirroring and intelligent sparse file handling
|
|
||||||
|
|
||||||
Package: flexnbd-dbg
|
|
||||||
Architecture: any
|
|
||||||
Section: debug
|
|
||||||
Priority: extra
|
|
||||||
Depends:
|
|
||||||
flexnbd (= ${binary:Version}),
|
|
||||||
${misc:Depends}
|
|
||||||
Description: debugging symbols for flexnbd
|
|
||||||
An NBD server offering push-mirroring and intelligent sparse file handling
|
|
||||||
.
|
|
||||||
This package contains the debugging symbols for flexnbd.
|
|
53
debian/copyright
vendored
53
debian/copyright
vendored
@@ -1,53 +0,0 @@
|
|||||||
This work was packaged for Debian by:
|
|
||||||
|
|
||||||
Alex Young <alex@bytemark.co.uk> on Wed, 30 May 2012 16:46:58 +0100
|
|
||||||
|
|
||||||
It was downloaded from:
|
|
||||||
|
|
||||||
<url://example.com>
|
|
||||||
|
|
||||||
Upstream Author(s):
|
|
||||||
|
|
||||||
<put author's name and email here>
|
|
||||||
<likewise for another author>
|
|
||||||
|
|
||||||
Copyright:
|
|
||||||
|
|
||||||
<Copyright (C) YYYY Firstname Lastname>
|
|
||||||
<likewise for another author>
|
|
||||||
|
|
||||||
License:
|
|
||||||
|
|
||||||
### SELECT: ###
|
|
||||||
This package is free software; you can redistribute it and/or modify
|
|
||||||
it under the terms of the GNU General Public License as published by
|
|
||||||
the Free Software Foundation; either version 2 of the License, or
|
|
||||||
(at your option) any later version.
|
|
||||||
### OR ###
|
|
||||||
This package is free software; you can redistribute it and/or modify
|
|
||||||
it under the terms of the GNU General Public License version 2 as
|
|
||||||
published by the Free Software Foundation.
|
|
||||||
##########
|
|
||||||
|
|
||||||
This package is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
|
||||||
|
|
||||||
On Debian systems, the complete text of the GNU General
|
|
||||||
Public License version 2 can be found in "/usr/share/common-licenses/GPL-2".
|
|
||||||
|
|
||||||
The Debian packaging is:
|
|
||||||
|
|
||||||
Copyright (C) 2012 Alex Young <alex@bytemark.co.uk>
|
|
||||||
|
|
||||||
you can redistribute it and/or modify
|
|
||||||
it under the terms of the GNU General Public License as published by
|
|
||||||
the Free Software Foundation; either version 2 of the License, or
|
|
||||||
(at your option) any later version.
|
|
||||||
|
|
||||||
# Please also look if there are files or directories which have a
|
|
||||||
# different copyright/license attached and list them here.
|
|
5
debian/flexnbd.install
vendored
5
debian/flexnbd.install
vendored
@@ -1,5 +0,0 @@
|
|||||||
build/flexnbd usr/bin
|
|
||||||
build/flexnbd-proxy usr/bin
|
|
||||||
build/flexnbd.1.gz usr/share/man/man1
|
|
||||||
build/flexnbd-proxy.1.gz usr/share/man/man1
|
|
||||||
|
|
12
debian/rules
vendored
12
debian/rules
vendored
@@ -1,12 +0,0 @@
|
|||||||
#!/usr/bin/make -f
|
|
||||||
# -*- makefile -*-
|
|
||||||
|
|
||||||
# Uncomment this to turn on verbose mode.
|
|
||||||
#export DH_VERBOSE=1
|
|
||||||
|
|
||||||
%:
|
|
||||||
dh $@
|
|
||||||
|
|
||||||
.PHONY: override_dh_strip
|
|
||||||
override_dh_strip:
|
|
||||||
dh_strip --dbg-package=flexnbd-dbg
|
|
1
debian/source/format
vendored
1
debian/source/format
vendored
@@ -1 +0,0 @@
|
|||||||
3.0 (native)
|
|
@@ -31,8 +31,6 @@ int build_allocation_map(struct bitset * allocation_map, int fd)
|
|||||||
|
|
||||||
for (offset = 0; offset < allocation_map->size; ) {
|
for (offset = 0; offset < allocation_map->size; ) {
|
||||||
|
|
||||||
unsigned int i;
|
|
||||||
|
|
||||||
fiemap->fm_start = offset;
|
fiemap->fm_start = offset;
|
||||||
|
|
||||||
fiemap->fm_length = max_length;
|
fiemap->fm_length = max_length;
|
||||||
@@ -49,7 +47,7 @@ int build_allocation_map(struct bitset * allocation_map, int fd)
|
|||||||
return 0; /* it's up to the caller to free the map */
|
return 0; /* it's up to the caller to free the map */
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
for ( i = 0; i < fiemap->fm_mapped_extents; i++ ) {
|
for ( unsigned int i = 0; i < fiemap->fm_mapped_extents; i++ ) {
|
||||||
bitset_set_range( allocation_map,
|
bitset_set_range( allocation_map,
|
||||||
fiemap->fm_extents[i].fe_logical,
|
fiemap->fm_extents[i].fe_logical,
|
||||||
fiemap->fm_extents[i].fe_length );
|
fiemap->fm_extents[i].fe_length );
|
||||||
@@ -76,17 +74,18 @@ int build_allocation_map(struct bitset * allocation_map, int fd)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **out_map)
|
int open_and_mmap(const char* filename, int* out_fd, uint64_t *out_size, void **out_map)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* size and out_size are intentionally of different types.
|
||||||
|
* lseek64() uses off64_t to signal errors in the sign bit.
|
||||||
|
* Since we check for these errors before trying to assign to
|
||||||
|
* *out_size, we know *out_size can never go negative.
|
||||||
|
*/
|
||||||
off64_t size;
|
off64_t size;
|
||||||
|
|
||||||
/* O_DIRECT seems to be intermittently supported. Leaving it as
|
/* O_DIRECT should not be used with mmap() */
|
||||||
* a compile-time option for now. */
|
|
||||||
#ifdef DIRECT_IO
|
|
||||||
*out_fd = open(filename, O_RDWR | O_DIRECT | O_SYNC );
|
|
||||||
#else
|
|
||||||
*out_fd = open(filename, O_RDWR | O_SYNC );
|
*out_fd = open(filename, O_RDWR | O_SYNC );
|
||||||
#endif
|
|
||||||
|
|
||||||
if (*out_fd < 1) {
|
if (*out_fd < 1) {
|
||||||
warn("open(%s) failed: does it exist?", filename);
|
warn("open(%s) failed: does it exist?", filename);
|
||||||
@@ -109,8 +108,11 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o
|
|||||||
warn("mmap64() failed");
|
warn("mmap64() failed");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
debug("opened %s size %ld on fd %d @ %p", filename, size, *out_fd, *out_map);
|
debug("opened %s size %ld on fd %d @ %p", filename, size, *out_fd, *out_map);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
debug("opened %s size %ld on fd %d", filename, size, *out_fd);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@@ -65,7 +65,7 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines);
|
|||||||
* ''out_size'' and the address of the mmap in ''out_map''. If anything goes
|
* ''out_size'' and the address of the mmap in ''out_map''. If anything goes
|
||||||
* wrong, returns -1 setting errno, otherwise 0.
|
* wrong, returns -1 setting errno, otherwise 0.
|
||||||
*/
|
*/
|
||||||
int open_and_mmap( const char* filename, int* out_fd, off64_t *out_size, void **out_map);
|
int open_and_mmap( const char* filename, int* out_fd, uint64_t* out_size, void **out_map);
|
||||||
|
|
||||||
|
|
||||||
/** Check to see whether the given file descriptor is closed.
|
/** Check to see whether the given file descriptor is closed.
|
||||||
|
@@ -7,8 +7,9 @@ void mode(char* mode, int argc, char **argv);
|
|||||||
|
|
||||||
#include <getopt.h>
|
#include <getopt.h>
|
||||||
|
|
||||||
#define GETOPT_ARG(x,s) {(x), 1, 0, (s)}
|
#define GETOPT_ARG(x,s) {(x), required_argument, 0, (s)}
|
||||||
#define GETOPT_FLAG(x,v) {(x), 0, 0, (v)}
|
#define GETOPT_FLAG(x,v) {(x), no_argument, 0, (v)}
|
||||||
|
#define GETOPT_OPTARG(x,s) {(x), optional_argument, 0, (s)}
|
||||||
|
|
||||||
#define OPT_HELP "help"
|
#define OPT_HELP "help"
|
||||||
#define OPT_ADDR "addr"
|
#define OPT_ADDR "addr"
|
||||||
@@ -19,6 +20,7 @@ void mode(char* mode, int argc, char **argv);
|
|||||||
#define OPT_FROM "from"
|
#define OPT_FROM "from"
|
||||||
#define OPT_SIZE "size"
|
#define OPT_SIZE "size"
|
||||||
#define OPT_DENY "default-deny"
|
#define OPT_DENY "default-deny"
|
||||||
|
#define OPT_CACHE "cache"
|
||||||
#define OPT_UNLINK "unlink"
|
#define OPT_UNLINK "unlink"
|
||||||
#define OPT_CONNECT_ADDR "conn-addr"
|
#define OPT_CONNECT_ADDR "conn-addr"
|
||||||
#define OPT_CONNECT_PORT "conn-port"
|
#define OPT_CONNECT_PORT "conn-port"
|
||||||
@@ -52,6 +54,7 @@ void mode(char* mode, int argc, char **argv);
|
|||||||
#define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' )
|
#define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' )
|
||||||
#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' )
|
#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' )
|
||||||
#define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' )
|
#define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' )
|
||||||
|
#define GETOPT_CACHE GETOPT_OPTARG( OPT_CACHE, 'c' )
|
||||||
#define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' )
|
#define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' )
|
||||||
#define GETOPT_CONNECT_ADDR GETOPT_ARG( OPT_CONNECT_ADDR, 'C' )
|
#define GETOPT_CONNECT_ADDR GETOPT_ARG( OPT_CONNECT_ADDR, 'C' )
|
||||||
#define GETOPT_CONNECT_PORT GETOPT_ARG( OPT_CONNECT_PORT, 'P' )
|
#define GETOPT_CONNECT_PORT GETOPT_ARG( OPT_CONNECT_PORT, 'P' )
|
||||||
|
@@ -41,7 +41,7 @@ int socket_connect(struct sockaddr* to, struct sockaddr* from)
|
|||||||
return fd;
|
return fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
int nbd_check_hello( struct nbd_init_raw* init_raw, off64_t* out_size )
|
int nbd_check_hello( struct nbd_init_raw* init_raw, uint64_t* out_size )
|
||||||
{
|
{
|
||||||
if ( strncmp( init_raw->passwd, INIT_PASSWD, 8 ) != 0 ) {
|
if ( strncmp( init_raw->passwd, INIT_PASSWD, 8 ) != 0 ) {
|
||||||
warn( "wrong passwd" );
|
warn( "wrong passwd" );
|
||||||
@@ -62,7 +62,7 @@ fail:
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int socket_nbd_read_hello( int fd, off64_t* out_size )
|
int socket_nbd_read_hello( int fd, uint64_t* out_size )
|
||||||
{
|
{
|
||||||
struct nbd_init_raw init_raw;
|
struct nbd_init_raw init_raw;
|
||||||
|
|
||||||
@@ -101,12 +101,13 @@ int socket_nbd_write_hello(int fd, off64_t out_size)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void fill_request(struct nbd_request *request, int type, off64_t from, int len)
|
void fill_request(struct nbd_request *request, int type, uint64_t from, uint32_t len)
|
||||||
{
|
{
|
||||||
request->magic = htobe32(REQUEST_MAGIC);
|
request->magic = htobe32(REQUEST_MAGIC);
|
||||||
request->type = htobe32(type);
|
request->type = htobe32(type);
|
||||||
((int*) request->handle)[0] = rand();
|
uint32_t * randa = (uint32_t*)request->handle;
|
||||||
((int*) request->handle)[1] = rand();
|
randa[0] = rand();
|
||||||
|
randa[1] = rand();
|
||||||
request->from = htobe64(from);
|
request->from = htobe64(from);
|
||||||
request->len = htobe32(len);
|
request->len = htobe32(len);
|
||||||
}
|
}
|
||||||
@@ -149,7 +150,7 @@ void wait_for_data( int fd, int timeout_secs )
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, int timeout_secs)
|
void socket_nbd_read(int fd, uint64_t from, uint32_t len, int out_fd, void* out_buf, int timeout_secs)
|
||||||
{
|
{
|
||||||
struct nbd_request request;
|
struct nbd_request request;
|
||||||
struct nbd_reply reply;
|
struct nbd_reply reply;
|
||||||
@@ -173,7 +174,7 @@ void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, i
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf, int timeout_secs)
|
void socket_nbd_write(int fd, uint64_t from, uint32_t len, int in_fd, void* in_buf, int timeout_secs)
|
||||||
{
|
{
|
||||||
struct nbd_request request;
|
struct nbd_request request;
|
||||||
struct nbd_reply reply;
|
struct nbd_reply reply;
|
||||||
@@ -213,10 +214,12 @@ int socket_nbd_disconnect( int fd )
|
|||||||
}
|
}
|
||||||
|
|
||||||
#define CHECK_RANGE(error_type) { \
|
#define CHECK_RANGE(error_type) { \
|
||||||
off64_t size;\
|
uint64_t size;\
|
||||||
int success = socket_nbd_read_hello(params->client, &size); \
|
int success = socket_nbd_read_hello(params->client, &size); \
|
||||||
if ( success ) {\
|
if ( success ) {\
|
||||||
if (params->from < 0 || (params->from + params->len) > size) {\
|
uint64_t endpoint = params->from + params->len; \
|
||||||
|
if (endpoint > size || \
|
||||||
|
endpoint < params->from ) { /* this happens on overflow */ \
|
||||||
fatal(error_type \
|
fatal(error_type \
|
||||||
" request %d+%d is out of range given size %d", \
|
" request %d+%d is out of range given size %d", \
|
||||||
params->from, params->len, size\
|
params->from, params->len, size\
|
||||||
|
@@ -7,17 +7,17 @@
|
|||||||
#include "nbdtypes.h"
|
#include "nbdtypes.h"
|
||||||
|
|
||||||
int socket_connect(struct sockaddr* to, struct sockaddr* from);
|
int socket_connect(struct sockaddr* to, struct sockaddr* from);
|
||||||
int socket_nbd_read_hello(int fd, off64_t * size);
|
int socket_nbd_read_hello(int fd, uint64_t* size);
|
||||||
int socket_nbd_write_hello(int fd, off64_t size);
|
int socket_nbd_write_hello(int fd, uint64_t size);
|
||||||
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, int timeout_secs);
|
void socket_nbd_read(int fd, uint64_t from, uint32_t len, int out_fd, void* out_buf, int timeout_secs);
|
||||||
void socket_nbd_write(int fd, off64_t from, int len, int out_fd, void* out_buf, int timeout_secs);
|
void socket_nbd_write(int fd, uint64_t from, uint32_t len, int out_fd, void* out_buf, int timeout_secs);
|
||||||
int socket_nbd_disconnect( int fd );
|
int socket_nbd_disconnect( int fd );
|
||||||
|
|
||||||
/* as you can see, we're slowly accumulating code that should really be in an
|
/* as you can see, we're slowly accumulating code that should really be in an
|
||||||
* NBD library */
|
* NBD library */
|
||||||
|
|
||||||
void nbd_hello_to_buf( struct nbd_init_raw* buf, off64_t out_size );
|
void nbd_hello_to_buf( struct nbd_init_raw* buf, uint64_t out_size );
|
||||||
int nbd_check_hello( struct nbd_init_raw* init_raw, off64_t* out_size );
|
int nbd_check_hello( struct nbd_init_raw* init_raw, uint64_t* out_size );
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@@ -63,7 +63,5 @@ void do_remote_command(char* command, char* socket_name, int argc, char** argv)
|
|||||||
print_response( response );
|
print_response( response );
|
||||||
|
|
||||||
exit(atoi(response));
|
exit(atoi(response));
|
||||||
|
|
||||||
close(remote);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -51,7 +51,6 @@ struct self_pipe * self_pipe_create(void)
|
|||||||
{
|
{
|
||||||
struct self_pipe *sig = xmalloc( sizeof( struct self_pipe ) );
|
struct self_pipe *sig = xmalloc( sizeof( struct self_pipe ) );
|
||||||
int fds[2];
|
int fds[2];
|
||||||
int fcntl_err;
|
|
||||||
|
|
||||||
if ( NULL == sig ) { return NULL; }
|
if ( NULL == sig ) { return NULL; }
|
||||||
|
|
||||||
@@ -62,7 +61,7 @@ struct self_pipe * self_pipe_create(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ( fcntl( fds[0], F_SETFL, O_NONBLOCK ) || fcntl( fds[1], F_SETFL, O_NONBLOCK ) ) {
|
if ( fcntl( fds[0], F_SETFL, O_NONBLOCK ) || fcntl( fds[1], F_SETFL, O_NONBLOCK ) ) {
|
||||||
fcntl_err = errno;
|
int fcntl_err = errno;
|
||||||
while( close( fds[0] ) == -1 && errno == EINTR );
|
while( close( fds[0] ) == -1 && errno == EINTR );
|
||||||
while( close( fds[1] ) == -1 && errno == EINTR );
|
while( close( fds[1] ) == -1 && errno == EINTR );
|
||||||
free( sig );
|
free( sig );
|
||||||
|
@@ -39,7 +39,6 @@ const char* sockaddr_address_string( const struct sockaddr* sa, char* dest, size
|
|||||||
struct sockaddr_un* un = ( struct sockaddr_un* ) sa;
|
struct sockaddr_un* un = ( struct sockaddr_un* ) sa;
|
||||||
|
|
||||||
unsigned short real_port = ntohs( in->sin_port ); // common to in and in6
|
unsigned short real_port = ntohs( in->sin_port ); // common to in and in6
|
||||||
size_t size;
|
|
||||||
const char* ret = NULL;
|
const char* ret = NULL;
|
||||||
|
|
||||||
memset( dest, 0, len );
|
memset( dest, 0, len );
|
||||||
@@ -57,7 +56,7 @@ const char* sockaddr_address_string( const struct sockaddr* sa, char* dest, size
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ( NULL != ret && real_port > 0 && sa->sa_family != AF_UNIX ) {
|
if ( NULL != ret && real_port > 0 && sa->sa_family != AF_UNIX ) {
|
||||||
size = strlen( dest );
|
size_t size = strlen( dest );
|
||||||
snprintf( dest + size, len - size, " port %d", real_port );
|
snprintf( dest + size, len - size, " port %d", real_port );
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,6 +74,11 @@ int sock_set_tcp_nodelay( int fd, int optval )
|
|||||||
return setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval) );
|
return setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval) );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Set the TCP_CORK socket option on fd to optval (non-zero corks the
 * socket, zero uncorks it).
 *
 * Returns the result of setsockopt(): 0 on success, -1 with errno set on
 * failure.
 */
int sock_set_tcp_cork( int fd, int optval )
{
	const int rc = setsockopt( fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof optval );

	return rc;
}
|
||||||
|
|
||||||
int sock_set_nonblock( int fd, int optval )
|
int sock_set_nonblock( int fd, int optval )
|
||||||
{
|
{
|
||||||
int flags = fcntl( fd, F_GETFL );
|
int flags = fcntl( fd, F_GETFL );
|
||||||
|
@@ -20,8 +20,8 @@ int sock_set_reuseaddr(int fd, int optval);
|
|||||||
/* Set the tcp_nodelay option */
|
/* Set the tcp_nodelay option */
|
||||||
int sock_set_tcp_nodelay(int fd, int optval);
|
int sock_set_tcp_nodelay(int fd, int optval);
|
||||||
|
|
||||||
/* TODO: Set the tcp_cork option */
|
/* Set the tcp_cork option */
|
||||||
// int sock_set_cork(int fd, int optval);
|
int sock_set_tcp_cork(int fd, int optval);
|
||||||
|
|
||||||
int sock_set_nonblock(int fd, int optval);
|
int sock_set_nonblock(int fd, int optval);
|
||||||
|
|
||||||
|
@@ -116,6 +116,7 @@ uint64_t monotonic_time_ms(void);
|
|||||||
#define fatal(msg, ...) do { \
|
#define fatal(msg, ...) do { \
|
||||||
myloglev(4, msg, ##__VA_ARGS__); \
|
myloglev(4, msg, ##__VA_ARGS__); \
|
||||||
error_handler(1); \
|
error_handler(1); \
|
||||||
|
exit(1); /* never-reached, this is to make static code analizer happy */ \
|
||||||
} while(0)
|
} while(0)
|
||||||
|
|
||||||
|
|
||||||
|
@@ -2,12 +2,16 @@
|
|||||||
#include "mode.h"
|
#include "mode.h"
|
||||||
|
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <time.h>
|
||||||
|
|
||||||
int main(int argc, char** argv)
|
int main(int argc, char** argv)
|
||||||
{
|
{
|
||||||
signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */
|
signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */
|
||||||
error_init();
|
error_init();
|
||||||
|
|
||||||
|
srand(time(NULL));
|
||||||
|
|
||||||
if (argc < 2) {
|
if (argc < 2) {
|
||||||
exit_err( help_help_text );
|
exit_err( help_help_text );
|
||||||
}
|
}
|
||||||
|
@@ -1,4 +1,6 @@
|
|||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <time.h>
|
||||||
|
|
||||||
#include "mode.h"
|
#include "mode.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
@@ -12,6 +14,7 @@ static struct option proxy_options[] = {
|
|||||||
GETOPT_CONNECT_ADDR,
|
GETOPT_CONNECT_ADDR,
|
||||||
GETOPT_CONNECT_PORT,
|
GETOPT_CONNECT_PORT,
|
||||||
GETOPT_BIND,
|
GETOPT_BIND,
|
||||||
|
GETOPT_CACHE,
|
||||||
GETOPT_QUIET,
|
GETOPT_QUIET,
|
||||||
GETOPT_VERBOSE,
|
GETOPT_VERBOSE,
|
||||||
{0}
|
{0}
|
||||||
@@ -27,22 +30,25 @@ static char proxy_help_text[] =
|
|||||||
"\t--" OPT_CONNECT_ADDR ",-C <ADDR>\tAddress of the proxied server.\n"
|
"\t--" OPT_CONNECT_ADDR ",-C <ADDR>\tAddress of the proxied server.\n"
|
||||||
"\t--" OPT_CONNECT_PORT ",-P <PORT>\tPort of the proxied server.\n"
|
"\t--" OPT_CONNECT_PORT ",-P <PORT>\tPort of the proxied server.\n"
|
||||||
"\t--" OPT_BIND ",-b <ADDR>\tThe address we connect from, as a proxy.\n"
|
"\t--" OPT_BIND ",-b <ADDR>\tThe address we connect from, as a proxy.\n"
|
||||||
|
"\t--" OPT_CACHE ",-c[=<CACHE-BYTES>]\tUse a RAM read cache of the given size.\n"
|
||||||
QUIET_LINE
|
QUIET_LINE
|
||||||
VERBOSE_LINE;
|
VERBOSE_LINE;
|
||||||
|
|
||||||
|
static char proxy_default_cache_size[] = "4096";
|
||||||
|
|
||||||
void read_proxy_param(
|
void read_proxy_param(
|
||||||
int c,
|
int c,
|
||||||
char **downstream_addr,
|
char **downstream_addr,
|
||||||
char **downstream_port,
|
char **downstream_port,
|
||||||
char **upstream_addr,
|
char **upstream_addr,
|
||||||
char **upstream_port,
|
char **upstream_port,
|
||||||
char **bind_addr )
|
char **bind_addr,
|
||||||
|
char **cache_bytes)
|
||||||
{
|
{
|
||||||
switch( c ) {
|
switch( c ) {
|
||||||
case 'h' :
|
case 'h' :
|
||||||
fprintf( stdout, "%s\n", proxy_help_text );
|
fprintf( stdout, "%s\n", proxy_help_text );
|
||||||
exit( 0 );
|
exit( 0 );
|
||||||
break;
|
|
||||||
case 'l':
|
case 'l':
|
||||||
*downstream_addr = optarg;
|
*downstream_addr = optarg;
|
||||||
break;
|
break;
|
||||||
@@ -58,6 +64,9 @@ void read_proxy_param(
|
|||||||
case 'b':
|
case 'b':
|
||||||
*bind_addr = optarg;
|
*bind_addr = optarg;
|
||||||
break;
|
break;
|
||||||
|
case 'c':
|
||||||
|
*cache_bytes = optarg ? optarg : proxy_default_cache_size;
|
||||||
|
break;
|
||||||
case 'q':
|
case 'q':
|
||||||
log_level = QUIET_LOG_LEVEL;
|
log_level = QUIET_LOG_LEVEL;
|
||||||
break;
|
break;
|
||||||
@@ -89,6 +98,7 @@ int main( int argc, char *argv[] )
|
|||||||
char *upstream_addr = NULL;
|
char *upstream_addr = NULL;
|
||||||
char *upstream_port = NULL;
|
char *upstream_port = NULL;
|
||||||
char *bind_addr = NULL;
|
char *bind_addr = NULL;
|
||||||
|
char *cache_bytes = NULL;
|
||||||
int success;
|
int success;
|
||||||
|
|
||||||
sigset_t mask;
|
sigset_t mask;
|
||||||
@@ -103,6 +113,8 @@ int main( int argc, char *argv[] )
|
|||||||
exit_action.sa_mask = mask;
|
exit_action.sa_mask = mask;
|
||||||
exit_action.sa_flags = 0;
|
exit_action.sa_flags = 0;
|
||||||
|
|
||||||
|
srand(time(NULL));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
c = getopt_long( argc, argv, proxy_short_options, proxy_options, NULL );
|
c = getopt_long( argc, argv, proxy_short_options, proxy_options, NULL );
|
||||||
if ( -1 == c ) { break; }
|
if ( -1 == c ) { break; }
|
||||||
@@ -111,7 +123,8 @@ int main( int argc, char *argv[] )
|
|||||||
&downstream_port,
|
&downstream_port,
|
||||||
&upstream_addr,
|
&upstream_addr,
|
||||||
&upstream_port,
|
&upstream_port,
|
||||||
&bind_addr
|
&bind_addr,
|
||||||
|
&cache_bytes
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,7 +141,8 @@ int main( int argc, char *argv[] )
|
|||||||
downstream_port,
|
downstream_port,
|
||||||
upstream_addr,
|
upstream_addr,
|
||||||
upstream_port,
|
upstream_port,
|
||||||
bind_addr
|
bind_addr,
|
||||||
|
cache_bytes
|
||||||
);
|
);
|
||||||
|
|
||||||
/* Set these *after* proxy has been assigned to */
|
/* Set these *after* proxy has been assigned to */
|
||||||
|
68
src/proxy/prefetch.c
Normal file
68
src/proxy/prefetch.c
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
#include "prefetch.h"
|
||||||
|
#include "util.h"
|
||||||
|
|
||||||
|
|
||||||
|
struct prefetch* prefetch_create( size_t size_bytes ){
|
||||||
|
|
||||||
|
struct prefetch* out = xmalloc( sizeof( struct prefetch ) );
|
||||||
|
NULLCHECK( out );
|
||||||
|
|
||||||
|
out->buffer = xmalloc( size_bytes );
|
||||||
|
NULLCHECK( out->buffer );
|
||||||
|
|
||||||
|
out->size = size_bytes;
|
||||||
|
out->is_full = 0;
|
||||||
|
out->from = 0;
|
||||||
|
out->len = 0;
|
||||||
|
|
||||||
|
return out;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void prefetch_destroy( struct prefetch *prefetch ) {
|
||||||
|
if( prefetch ) {
|
||||||
|
free( prefetch->buffer );
|
||||||
|
free( prefetch );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t prefetch_size( struct prefetch *prefetch){
|
||||||
|
if ( prefetch ) {
|
||||||
|
return prefetch->size;
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Mark the cache as holding no data (clears the is_full flag).
 * Delegates to prefetch_set_full(), which ignores a NULL prefetch.
 */
void prefetch_set_is_empty( struct prefetch *prefetch )
{
	prefetch_set_full( prefetch, 0 );
}
|
||||||
|
|
||||||
|
/**
 * Mark the cache as holding data (sets the is_full flag to 1).
 * Delegates to prefetch_set_full(), which ignores a NULL prefetch.
 */
void prefetch_set_is_full( struct prefetch *prefetch )
{
	prefetch_set_full( prefetch, 1 );
}
|
||||||
|
|
||||||
|
void prefetch_set_full( struct prefetch *prefetch, int val ){
|
||||||
|
if( prefetch ) {
|
||||||
|
prefetch->is_full = val;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int prefetch_is_full( struct prefetch *prefetch ){
|
||||||
|
if( prefetch ) {
|
||||||
|
return prefetch->is_full;
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int prefetch_contains( struct prefetch *prefetch, uint64_t from, uint32_t len ){
|
||||||
|
NULLCHECK( prefetch );
|
||||||
|
return from >= prefetch->from &&
|
||||||
|
from + len <= prefetch->from + prefetch->len;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *prefetch_offset( struct prefetch *prefetch, uint64_t from ){
|
||||||
|
NULLCHECK( prefetch );
|
||||||
|
return prefetch->buffer + (from - prefetch->from);
|
||||||
|
}
|
@@ -1,14 +1,33 @@
|
|||||||
#ifndef PREFETCH_H
|
#ifndef PREFETCH_H
|
||||||
#define PREFETCH_H
|
#define PREFETCH_H
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
#define PREFETCH_BUFSIZE 4096
|
#define PREFETCH_BUFSIZE 4096
|
||||||
|
|
||||||
struct prefetch {
|
struct prefetch {
|
||||||
|
/* True if there is data in the buffer. */
|
||||||
int is_full;
|
int is_full;
|
||||||
__be64 from;
|
/* The start point of the current content of buffer */
|
||||||
__be32 len;
|
uint64_t from;
|
||||||
|
/* The length of the current content of buffer */
|
||||||
|
uint32_t len;
|
||||||
|
|
||||||
char buffer[PREFETCH_BUFSIZE];
|
/* The total size of the buffer, in bytes. */
|
||||||
|
size_t size;
|
||||||
|
|
||||||
|
char *buffer;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* Allocate an empty cache with a buffer of size_bytes bytes. */
struct prefetch* prefetch_create( size_t size_bytes );

/* Free a cache and its buffer; NULL is accepted. */
void prefetch_destroy( struct prefetch *prefetch );

/* Total buffer size in bytes; 0 for a NULL cache. */
size_t prefetch_size( struct prefetch *);

/* Clear the is_full flag. */
void prefetch_set_is_empty( struct prefetch *prefetch );

/* Set the is_full flag. */
void prefetch_set_is_full( struct prefetch *prefetch );

/* Set the is_full flag to val; NULL is accepted. */
void prefetch_set_full( struct prefetch *prefetch, int val );

/* Current value of the is_full flag; 0 for a NULL cache. */
int prefetch_is_full( struct prefetch *prefetch );

/* Non-zero when [from, from + len) lies inside the cached range. */
int prefetch_contains( struct prefetch *prefetch, uint64_t from, uint32_t len );

/* Pointer into the buffer corresponding to absolute offset from. */
char *prefetch_offset( struct prefetch *prefetch, uint64_t from );
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@@ -1,9 +1,7 @@
|
|||||||
#include "proxy.h"
|
#include "proxy.h"
|
||||||
#include "readwrite.h"
|
#include "readwrite.h"
|
||||||
|
|
||||||
#ifdef PREFETCH
|
|
||||||
#include "prefetch.h"
|
#include "prefetch.h"
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
#include "ioutil.h"
|
#include "ioutil.h"
|
||||||
@@ -20,7 +18,8 @@ struct proxier* proxy_create(
|
|||||||
char* s_downstream_port,
|
char* s_downstream_port,
|
||||||
char* s_upstream_address,
|
char* s_upstream_address,
|
||||||
char* s_upstream_port,
|
char* s_upstream_port,
|
||||||
char* s_upstream_bind )
|
char* s_upstream_bind,
|
||||||
|
char* s_cache_bytes )
|
||||||
{
|
{
|
||||||
struct proxier* out;
|
struct proxier* out;
|
||||||
out = xmalloc( sizeof( struct proxier ) );
|
out = xmalloc( sizeof( struct proxier ) );
|
||||||
@@ -65,9 +64,16 @@ struct proxier* proxy_create(
|
|||||||
out->downstream_fd = -1;
|
out->downstream_fd = -1;
|
||||||
out->upstream_fd = -1;
|
out->upstream_fd = -1;
|
||||||
|
|
||||||
#ifdef PREFETCH
|
out->prefetch = NULL;
|
||||||
out->prefetch = xmalloc( sizeof( struct prefetch ) );
|
if ( s_cache_bytes ){
|
||||||
#endif
|
int cache_bytes = atoi( s_cache_bytes );
|
||||||
|
/* leaving this off or setting a cache size of zero or
|
||||||
|
* less results in no cache.
|
||||||
|
*/
|
||||||
|
if ( cache_bytes >= 0 ) {
|
||||||
|
out->prefetch = prefetch_create( cache_bytes );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
out->init.buf = xmalloc( sizeof( struct nbd_init_raw ) );
|
out->init.buf = xmalloc( sizeof( struct nbd_init_raw ) );
|
||||||
out->req.buf = xmalloc( NBD_MAX_SIZE );
|
out->req.buf = xmalloc( NBD_MAX_SIZE );
|
||||||
@@ -76,20 +82,28 @@ struct proxier* proxy_create(
|
|||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int proxy_prefetches( struct proxier* proxy ) {
|
||||||
|
NULLCHECK( proxy );
|
||||||
|
return proxy->prefetch != NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int proxy_prefetch_bufsize( struct proxier* proxy ){
|
||||||
|
NULLCHECK( proxy );
|
||||||
|
return prefetch_size( proxy->prefetch );
|
||||||
|
}
|
||||||
|
|
||||||
void proxy_destroy( struct proxier* proxy )
|
void proxy_destroy( struct proxier* proxy )
|
||||||
{
|
{
|
||||||
free( proxy->init.buf );
|
free( proxy->init.buf );
|
||||||
free( proxy->req.buf );
|
free( proxy->req.buf );
|
||||||
free( proxy->rsp.buf );
|
free( proxy->rsp.buf );
|
||||||
#ifdef PREFETCH
|
prefetch_destroy( proxy->prefetch );
|
||||||
free( proxy->prefetch );
|
|
||||||
#endif
|
|
||||||
|
|
||||||
free( proxy );
|
free( proxy );
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Shared between our two different connect_to_upstream paths */
|
/* Shared between our two different connect_to_upstream paths */
|
||||||
void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size );
|
void proxy_finish_connect_to_upstream( struct proxier *proxy, uint64_t size );
|
||||||
|
|
||||||
/* Try to establish a connection to our upstream server. Return 1 on success,
|
/* Try to establish a connection to our upstream server. Return 1 on success,
|
||||||
* 0 on failure. this is a blocking call that returns a non-blocking socket.
|
* 0 on failure. this is a blocking call that returns a non-blocking socket.
|
||||||
@@ -102,7 +116,7 @@ int proxy_connect_to_upstream( struct proxier* proxy )
|
|||||||
}
|
}
|
||||||
|
|
||||||
int fd = socket_connect( &proxy->connect_to.generic, connect_from );
|
int fd = socket_connect( &proxy->connect_to.generic, connect_from );
|
||||||
off64_t size = 0;
|
uint64_t size = 0;
|
||||||
|
|
||||||
if ( -1 == fd ) {
|
if ( -1 == fd ) {
|
||||||
return 0;
|
return 0;
|
||||||
@@ -174,7 +188,7 @@ error:
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size ) {
|
void proxy_finish_connect_to_upstream( struct proxier *proxy, uint64_t size ) {
|
||||||
|
|
||||||
if ( proxy->upstream_size == 0 ) {
|
if ( proxy->upstream_size == 0 ) {
|
||||||
info( "Size of upstream image is %"PRIu64" bytes", size );
|
info( "Size of upstream image is %"PRIu64" bytes", size );
|
||||||
@@ -186,6 +200,13 @@ void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size ) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
proxy->upstream_size = size;
|
proxy->upstream_size = size;
|
||||||
|
|
||||||
|
if ( AF_UNIX != proxy->connect_to.family ) {
|
||||||
|
if ( sock_set_tcp_nodelay( proxy->upstream_fd, 1 ) == -1 ) {
|
||||||
|
warn( SHOW_ERRNO( "Failed to set TCP_NODELAY" ) );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
info( "Connected to upstream on fd %i", proxy->upstream_fd );
|
info( "Connected to upstream on fd %i", proxy->upstream_fd );
|
||||||
|
|
||||||
return;
|
return;
|
||||||
@@ -272,10 +293,9 @@ static inline int proxy_state_upstream( int state )
|
|||||||
state == WRITE_TO_UPSTREAM || state == READ_FROM_UPSTREAM;
|
state == WRITE_TO_UPSTREAM || state == READ_FROM_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef PREFETCH
|
|
||||||
|
|
||||||
int proxy_prefetch_for_request( struct proxier* proxy, int state )
|
int proxy_prefetch_for_request( struct proxier* proxy, int state )
|
||||||
{
|
{
|
||||||
|
NULLCHECK( proxy );
|
||||||
struct nbd_request* req = &proxy->req_hdr;
|
struct nbd_request* req = &proxy->req_hdr;
|
||||||
struct nbd_reply* rsp = &proxy->rsp_hdr;
|
struct nbd_reply* rsp = &proxy->rsp_hdr;
|
||||||
|
|
||||||
@@ -284,23 +304,11 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
|
|||||||
|
|
||||||
int is_read = ( req->type & REQUEST_MASK ) == REQUEST_READ;
|
int is_read = ( req->type & REQUEST_MASK ) == REQUEST_READ;
|
||||||
|
|
||||||
int prefetch_start = req->from;
|
|
||||||
int prefetch_end = req->from + ( req->len * 2 );
|
|
||||||
|
|
||||||
/* We only want to consider prefetching if we know we're not
|
|
||||||
* getting too much data back, if it's a read request, and if
|
|
||||||
* the prefetch won't try to read past the end of the file.
|
|
||||||
*/
|
|
||||||
|
|
||||||
int prefetching = req->len <= PREFETCH_BUFSIZE && is_read &&
|
|
||||||
prefetch_start < prefetch_end && prefetch_end <= proxy->upstream_size;
|
|
||||||
|
|
||||||
if ( is_read ) {
|
if ( is_read ) {
|
||||||
/* See if we can respond with what's in our prefetch
|
/* See if we can respond with what's in our prefetch
|
||||||
* cache */
|
* cache */
|
||||||
if ( proxy->prefetch->is_full &&
|
if ( prefetch_is_full( proxy->prefetch ) &&
|
||||||
req->from == proxy->prefetch->from &&
|
prefetch_contains( proxy->prefetch, req->from, req->len ) ) {
|
||||||
req->len == proxy->prefetch->len ) {
|
|
||||||
/* HUZZAH! A match! */
|
/* HUZZAH! A match! */
|
||||||
debug( "Prefetch hit!" );
|
debug( "Prefetch hit!" );
|
||||||
|
|
||||||
@@ -315,10 +323,11 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
|
|||||||
/* and the data */
|
/* and the data */
|
||||||
memcpy(
|
memcpy(
|
||||||
proxy->rsp.buf + NBD_REPLY_SIZE,
|
proxy->rsp.buf + NBD_REPLY_SIZE,
|
||||||
proxy->prefetch->buffer, proxy->prefetch->len
|
prefetch_offset( proxy->prefetch, req->from ),
|
||||||
|
req->len
|
||||||
);
|
);
|
||||||
|
|
||||||
proxy->rsp.size = NBD_REPLY_SIZE + proxy->prefetch->len;
|
proxy->rsp.size = NBD_REPLY_SIZE + req->len;
|
||||||
proxy->rsp.needle = 0;
|
proxy->rsp.needle = 0;
|
||||||
|
|
||||||
/* return early, our work here is done */
|
/* return early, our work here is done */
|
||||||
@@ -332,11 +341,24 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
|
|||||||
* whether we can keep it or not.
|
* whether we can keep it or not.
|
||||||
*/
|
*/
|
||||||
debug( "Blowing away prefetch cache on type %d request.", req->type );
|
debug( "Blowing away prefetch cache on type %d request.", req->type );
|
||||||
proxy->prefetch->is_full = 0;
|
prefetch_set_is_empty( proxy->prefetch );
|
||||||
}
|
}
|
||||||
|
|
||||||
debug( "Prefetch cache MISS!");
|
debug( "Prefetch cache MISS!");
|
||||||
|
|
||||||
|
uint64_t prefetch_start = req->from;
|
||||||
|
/* We prefetch what we expect to be the next request. */
|
||||||
|
uint64_t prefetch_end = req->from + ( req->len * 2 );
|
||||||
|
|
||||||
|
/* We only want to consider prefetching if we know we're not
|
||||||
|
* getting too much data back, if it's a read request, and if
|
||||||
|
* the prefetch won't try to read past the end of the file.
|
||||||
|
*/
|
||||||
|
int prefetching =
|
||||||
|
req->len <= prefetch_size( proxy->prefetch ) &&
|
||||||
|
is_read &&
|
||||||
|
prefetch_start < prefetch_end &&
|
||||||
|
prefetch_end <= proxy->upstream_size;
|
||||||
|
|
||||||
/* We pull the request out of the proxy struct, rewrite the
|
/* We pull the request out of the proxy struct, rewrite the
|
||||||
* request size, and write it back.
|
* request size, and write it back.
|
||||||
@@ -347,7 +369,8 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
|
|||||||
|
|
||||||
req->len *= 2;
|
req->len *= 2;
|
||||||
|
|
||||||
debug( "Prefetching %"PRIu32" bytes", req->len - proxy->prefetch_req_orig_len );
|
debug( "Prefetching additional %"PRIu32" bytes",
|
||||||
|
req->len - proxy->prefetch_req_orig_len );
|
||||||
nbd_h2r_request( req, req_raw );
|
nbd_h2r_request( req, req_raw );
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -364,10 +387,10 @@ int proxy_prefetch_for_reply( struct proxier* proxy, int state )
|
|||||||
|
|
||||||
prefetched_bytes = proxy->req_hdr.len - proxy->prefetch_req_orig_len;
|
prefetched_bytes = proxy->req_hdr.len - proxy->prefetch_req_orig_len;
|
||||||
|
|
||||||
debug( "Prefetched %d bytes", prefetched_bytes );
|
debug( "Prefetched additional %d bytes", prefetched_bytes );
|
||||||
memcpy(
|
memcpy(
|
||||||
proxy->rsp.buf + proxy->prefetch_req_orig_len,
|
proxy->prefetch->buffer,
|
||||||
&(proxy->prefetch->buffer),
|
proxy->rsp.buf + proxy->prefetch_req_orig_len + NBD_REPLY_SIZE,
|
||||||
prefetched_bytes
|
prefetched_bytes
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -382,13 +405,12 @@ int proxy_prefetch_for_reply( struct proxier* proxy, int state )
|
|||||||
proxy->rsp.size -= prefetched_bytes;
|
proxy->rsp.size -= prefetched_bytes;
|
||||||
|
|
||||||
/* And we need to reset these */
|
/* And we need to reset these */
|
||||||
proxy->prefetch->is_full = 1;
|
prefetch_set_is_full( proxy->prefetch );
|
||||||
proxy->is_prefetch_req = 0;
|
proxy->is_prefetch_req = 0;
|
||||||
|
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
int proxy_read_from_downstream( struct proxier *proxy, int state )
|
int proxy_read_from_downstream( struct proxier *proxy, int state )
|
||||||
@@ -469,10 +491,8 @@ int proxy_continue_connecting_to_upstream( struct proxier* proxy, int state )
|
|||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef PREFETCH
|
|
||||||
/* Data may have changed while we were disconnected */
|
/* Data may have changed while we were disconnected */
|
||||||
proxy->prefetch->is_full = 0;
|
prefetch_set_is_empty( proxy->prefetch );
|
||||||
#endif
|
|
||||||
|
|
||||||
info( "Connected to upstream on fd %i", proxy->upstream_fd );
|
info( "Connected to upstream on fd %i", proxy->upstream_fd );
|
||||||
return READ_INIT_FROM_UPSTREAM;
|
return READ_INIT_FROM_UPSTREAM;
|
||||||
@@ -492,7 +512,7 @@ int proxy_read_init_from_upstream( struct proxier* proxy, int state )
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ( proxy->init.needle == proxy->init.size ) {
|
if ( proxy->init.needle == proxy->init.size ) {
|
||||||
off64_t upstream_size;
|
uint64_t upstream_size;
|
||||||
if ( !nbd_check_hello( (struct nbd_init_raw*) proxy->init.buf, &upstream_size ) ) {
|
if ( !nbd_check_hello( (struct nbd_init_raw*) proxy->init.buf, &upstream_size ) ) {
|
||||||
warn( "Upstream sent invalid init" );
|
warn( "Upstream sent invalid init" );
|
||||||
goto disconnect;
|
goto disconnect;
|
||||||
@@ -518,11 +538,22 @@ int proxy_write_to_upstream( struct proxier* proxy, int state )
|
|||||||
ssize_t count;
|
ssize_t count;
|
||||||
|
|
||||||
// assert( state == WRITE_TO_UPSTREAM );
|
// assert( state == WRITE_TO_UPSTREAM );
|
||||||
|
|
||||||
|
/* FIXME: We may set cork=1 multiple times as a result of this idiom.
|
||||||
|
* Not a serious problem, but we could do better
|
||||||
|
*/
|
||||||
|
if ( proxy->req.needle == 0 && AF_UNIX != proxy->connect_to.family ) {
|
||||||
|
if ( sock_set_tcp_cork( proxy->upstream_fd, 1 ) == -1 ) {
|
||||||
|
warn( SHOW_ERRNO( "Failed to set TCP_CORK" ) );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
count = iobuf_write( proxy->upstream_fd, &proxy->req );
|
count = iobuf_write( proxy->upstream_fd, &proxy->req );
|
||||||
|
|
||||||
if ( count == -1 ) {
|
if ( count == -1 ) {
|
||||||
warn( SHOW_ERRNO( "Failed to send request to upstream" ) );
|
warn( SHOW_ERRNO( "Failed to send request to upstream" ) );
|
||||||
proxy->req.needle = 0;
|
proxy->req.needle = 0;
|
||||||
|
// We're throwing the socket away so no need to uncork
|
||||||
return CONNECT_TO_UPSTREAM;
|
return CONNECT_TO_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -531,6 +562,14 @@ int proxy_write_to_upstream( struct proxier* proxy, int state )
|
|||||||
* still need req.size if reading the reply fails - we disconnect
|
* still need req.size if reading the reply fails - we disconnect
|
||||||
* and resend the reply in that case - so keep it around for now. */
|
* and resend the reply in that case - so keep it around for now. */
|
||||||
proxy->req.needle = 0;
|
proxy->req.needle = 0;
|
||||||
|
|
||||||
|
if ( AF_UNIX != proxy->connect_to.family ) {
|
||||||
|
if ( sock_set_tcp_cork( proxy->upstream_fd, 0 ) == -1 ) {
|
||||||
|
warn( SHOW_ERRNO( "Failed to unset TCP_CORK" ) );
|
||||||
|
// TODO: should we return to CONNECT_TO_UPSTREAM in this instance?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return READ_FROM_UPSTREAM;
|
return READ_FROM_UPSTREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -736,14 +775,12 @@ void proxy_session( struct proxier* proxy )
|
|||||||
case READ_FROM_DOWNSTREAM:
|
case READ_FROM_DOWNSTREAM:
|
||||||
if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) {
|
if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) {
|
||||||
state = proxy_read_from_downstream( proxy, state );
|
state = proxy_read_from_downstream( proxy, state );
|
||||||
#ifdef PREFETCH
|
|
||||||
/* Check if we can fulfil the request from prefetch, or
|
/* Check if we can fulfil the request from prefetch, or
|
||||||
* rewrite the request to fill the prefetch buffer if needed
|
* rewrite the request to fill the prefetch buffer if needed
|
||||||
*/
|
*/
|
||||||
if ( state == WRITE_TO_UPSTREAM ) {
|
if ( proxy_prefetches( proxy ) && state == WRITE_TO_UPSTREAM ) {
|
||||||
state = proxy_prefetch_for_request( proxy, state );
|
state = proxy_prefetch_for_request( proxy, state );
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CONNECT_TO_UPSTREAM:
|
case CONNECT_TO_UPSTREAM:
|
||||||
@@ -774,12 +811,10 @@ void proxy_session( struct proxier* proxy )
|
|||||||
if ( FD_ISSET( proxy->upstream_fd, &rfds ) ) {
|
if ( FD_ISSET( proxy->upstream_fd, &rfds ) ) {
|
||||||
state = proxy_read_from_upstream( proxy, state );
|
state = proxy_read_from_upstream( proxy, state );
|
||||||
}
|
}
|
||||||
# ifdef PREFETCH
|
|
||||||
/* Fill the prefetch buffer and rewrite the reply, if needed */
|
/* Fill the prefetch buffer and rewrite the reply, if needed */
|
||||||
if ( state == WRITE_TO_DOWNSTREAM ) {
|
if ( proxy_prefetches( proxy ) && state == WRITE_TO_DOWNSTREAM ) {
|
||||||
state = proxy_prefetch_for_reply( proxy, state );
|
state = proxy_prefetch_for_reply( proxy, state );
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
break;
|
break;
|
||||||
case WRITE_TO_DOWNSTREAM:
|
case WRITE_TO_DOWNSTREAM:
|
||||||
if ( FD_ISSET( proxy->downstream_fd, &wfds ) ) {
|
if ( FD_ISSET( proxy->downstream_fd, &wfds ) ) {
|
||||||
|
@@ -5,7 +5,6 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
#include "ioutil.h"
|
#include "ioutil.h"
|
||||||
#include "flexnbd.h"
|
|
||||||
#include "parse.h"
|
#include "parse.h"
|
||||||
#include "nbdtypes.h"
|
#include "nbdtypes.h"
|
||||||
#include "self_pipe.h"
|
#include "self_pipe.h"
|
||||||
@@ -21,9 +20,6 @@
|
|||||||
#define UPSTREAM_TIMEOUT 30 * 1000
|
#define UPSTREAM_TIMEOUT 30 * 1000
|
||||||
|
|
||||||
struct proxier {
|
struct proxier {
|
||||||
/* The flexnbd wrapper this proxier is attached to */
|
|
||||||
struct flexnbd* flexnbd;
|
|
||||||
|
|
||||||
/** address/port to bind to */
|
/** address/port to bind to */
|
||||||
union mysockaddr listen_on;
|
union mysockaddr listen_on;
|
||||||
|
|
||||||
@@ -48,7 +44,7 @@ struct proxier {
|
|||||||
int upstream_fd;
|
int upstream_fd;
|
||||||
|
|
||||||
/* This is the size we advertise to the downstream server */
|
/* This is the size we advertise to the downstream server */
|
||||||
off64_t upstream_size;
|
uint64_t upstream_size;
|
||||||
|
|
||||||
/* We transform the raw request header into here */
|
/* We transform the raw request header into here */
|
||||||
struct nbd_request req_hdr;
|
struct nbd_request req_hdr;
|
||||||
@@ -73,7 +69,8 @@ struct proxier {
|
|||||||
uint64_t req_count;
|
uint64_t req_count;
|
||||||
int hello_sent;
|
int hello_sent;
|
||||||
|
|
||||||
#ifdef PREFETCH
|
/** These are only used if we pass --cache on the command line */
|
||||||
|
|
||||||
/* While the in-flight request has been munged by prefetch, these two are
|
/* While the in-flight request has been munged by prefetch, these two are
|
||||||
* set to true, and the original length of the request, respectively */
|
* set to true, and the original length of the request, respectively */
|
||||||
int is_prefetch_req;
|
int is_prefetch_req;
|
||||||
@@ -81,7 +78,8 @@ struct proxier {
|
|||||||
|
|
||||||
/* And here, we actually store the prefetched data once it's returned */
|
/* And here, we actually store the prefetched data once it's returned */
|
||||||
struct prefetch *prefetch;
|
struct prefetch *prefetch;
|
||||||
#endif
|
|
||||||
|
/** */
|
||||||
};
|
};
|
||||||
|
|
||||||
struct proxier* proxy_create(
|
struct proxier* proxy_create(
|
||||||
@@ -89,7 +87,8 @@ struct proxier* proxy_create(
|
|||||||
char* s_downstream_port,
|
char* s_downstream_port,
|
||||||
char* s_upstream_address,
|
char* s_upstream_address,
|
||||||
char* s_upstream_port,
|
char* s_upstream_port,
|
||||||
char* s_upstream_bind );
|
char* s_upstream_bind,
|
||||||
|
char* s_cache_bytes);
|
||||||
int do_proxy( struct proxier* proxy );
|
int do_proxy( struct proxier* proxy );
|
||||||
void proxy_cleanup( struct proxier* proxy );
|
void proxy_cleanup( struct proxier* proxy );
|
||||||
void proxy_destroy( struct proxier* proxy );
|
void proxy_destroy( struct proxier* proxy );
|
||||||
|
@@ -7,43 +7,64 @@
|
|||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make the bitfield words 'opaque' to prevent code
|
||||||
|
* poking at the bits directly without using these
|
||||||
|
* accessors/macros
|
||||||
|
*/
|
||||||
|
typedef uint64_t bitfield_word_t;
|
||||||
|
typedef bitfield_word_t * bitfield_p;
|
||||||
|
|
||||||
static inline char char_with_bit_set(uint64_t num) { return 1<<(num%8); }
|
#define BITFIELD_WORD_SIZE sizeof(bitfield_word_t)
|
||||||
|
#define BITS_PER_WORD (BITFIELD_WORD_SIZE * 8)
|
||||||
|
|
||||||
|
/* Mask selecting bit _idx within its bitfield word. The constant is cast
 * to the (unsigned) word type so that selecting the top bit of a word is
 * well defined; shifting a signed 1LL into the sign bit is not.
 */
#define BIT_MASK(_idx) \
	((bitfield_word_t)1 << ((_idx) & (BITS_PER_WORD - 1)))

/* The bitfield word (an lvalue) that holds bit _idx of bitfield _b. */
#define BIT_WORD(_b, _idx) \
	((bitfield_word_t*)(_b))[(_idx) / BITS_PER_WORD]
|
||||||
|
|
||||||
|
/* Calculates the number of bitfield words needed to store _bytes bytes,
 * rounding up to a whole word. This exists to accommodate code that
 * wants to work in byte sizes.
 * The argument is parenthesized so that expression arguments (e.g.
 * "1 << 4") are evaluated as a unit before the rounding is applied.
 */
#define BIT_WORDS_FOR_SIZE(_bytes) \
	(((_bytes) + (BITFIELD_WORD_SIZE-1)) / BITFIELD_WORD_SIZE)
|
||||||
|
|
||||||
|
/** Return the bit value ''idx'' in array ''b'' */
|
||||||
|
static inline int bit_get(bitfield_p b, uint64_t idx) {
|
||||||
|
return (BIT_WORD(b, idx) >> (idx & (BITS_PER_WORD-1))) & 1;
|
||||||
|
}
|
||||||
|
|
||||||
/** Return 1 if the bit at ''idx'' in array ''b'' is set */
|
/** Return 1 if the bit at ''idx'' in array ''b'' is set */
|
||||||
static inline int bit_is_set(char* b, uint64_t idx) {
|
static inline int bit_is_set(bitfield_p b, uint64_t idx) {
|
||||||
return (b[idx/8] & char_with_bit_set(idx)) != 0;
|
return bit_get(b, idx);
|
||||||
}
|
}
|
||||||
/** Return 1 if the bit at ''idx'' in array ''b'' is clear */
|
/** Return 1 if the bit at ''idx'' in array ''b'' is clear */
|
||||||
static inline int bit_is_clear(char* b, uint64_t idx) {
|
static inline int bit_is_clear(bitfield_p b, uint64_t idx) {
|
||||||
return !bit_is_set(b, idx);
|
return !bit_get(b, idx);
|
||||||
}
|
}
|
||||||
/** Tests whether the bit at ''idx'' in array ''b'' has value ''value'' */
|
/** Tests whether the bit at ''idx'' in array ''b'' has value ''value'' */
|
||||||
static inline int bit_has_value(char* b, uint64_t idx, int value) {
|
static inline int bit_has_value(bitfield_p b, uint64_t idx, int value) {
|
||||||
if (value) { return bit_is_set(b, idx); }
|
return bit_get(b, idx) == !!value;
|
||||||
else { return bit_is_clear(b, idx); }
|
|
||||||
}
|
}
|
||||||
/** Sets the bit ''idx'' in array ''b'' */
|
/** Sets the bit ''idx'' in array ''b'' */
|
||||||
static inline void bit_set(char* b, uint64_t idx) {
|
static inline void bit_set(bitfield_p b, uint64_t idx) {
|
||||||
b[idx/8] |= char_with_bit_set(idx);
|
BIT_WORD(b, idx) |= BIT_MASK(idx);
|
||||||
//__sync_fetch_and_or(b+(idx/8), char_with_bit_set(idx));
|
|
||||||
}
|
}
|
||||||
/** Clears the bit ''idx'' in array ''b'' */
|
/** Clears the bit ''idx'' in array ''b'' */
|
||||||
static inline void bit_clear(char* b, uint64_t idx) {
|
static inline void bit_clear(bitfield_p b, uint64_t idx) {
|
||||||
b[idx/8] &= ~char_with_bit_set(idx);
|
BIT_WORD(b, idx) &= ~BIT_MASK(idx);
|
||||||
//__sync_fetch_and_nand(b+(idx/8), char_with_bit_set(idx));
|
|
||||||
}
|
}
|
||||||
/** Sets ''len'' bits in array ''b'' starting at offset ''from'' */
|
/** Sets ''len'' bits in array ''b'' starting at offset ''from'' */
|
||||||
static inline void bit_set_range(char* b, uint64_t from, uint64_t len)
|
static inline void bit_set_range(bitfield_p b, uint64_t from, uint64_t len)
|
||||||
{
|
{
|
||||||
for ( ; from%8 != 0 && len > 0 ; len-- ) {
|
for ( ; (from % BITS_PER_WORD) != 0 && len > 0 ; len-- ) {
|
||||||
bit_set( b, from++ );
|
bit_set( b, from++ );
|
||||||
}
|
}
|
||||||
|
|
||||||
if (len >= 8) {
|
if (len >= BITS_PER_WORD) {
|
||||||
memset(b+(from/8), 255, len/8 );
|
memset(&BIT_WORD(b, from), 0xff, len / 8 );
|
||||||
from += len;
|
from += len;
|
||||||
len = (len%8);
|
len = len % BITS_PER_WORD;
|
||||||
from -= len;
|
from -= len;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -52,16 +73,16 @@ static inline void bit_set_range(char* b, uint64_t from, uint64_t len)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
/** Clears ''len'' bits in array ''b'' starting at offset ''from'' */
|
/** Clears ''len'' bits in array ''b'' starting at offset ''from'' */
|
||||||
static inline void bit_clear_range(char* b, uint64_t from, uint64_t len)
|
static inline void bit_clear_range(bitfield_p b, uint64_t from, uint64_t len)
|
||||||
{
|
{
|
||||||
for ( ; from%8 != 0 && len > 0 ; len-- ) {
|
for ( ; (from % BITS_PER_WORD) != 0 && len > 0 ; len-- ) {
|
||||||
bit_clear( b, from++ );
|
bit_clear( b, from++ );
|
||||||
}
|
}
|
||||||
|
|
||||||
if (len >= 8) {
|
if (len >= BITS_PER_WORD) {
|
||||||
memset(b+(from/8), 0, len/8 );
|
memset(&BIT_WORD(b, from), 0, len / 8 );
|
||||||
from += len;
|
from += len;
|
||||||
len = (len%8);
|
len = len % BITS_PER_WORD;
|
||||||
from -= len;
|
from -= len;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,16 +96,16 @@ static inline void bit_clear_range(char* b, uint64_t from, uint64_t len)
|
|||||||
* bits that are the same as the first one specified. If ''run_is_set'' is
|
* bits that are the same as the first one specified. If ''run_is_set'' is
|
||||||
* non-NULL, the value of that bit is placed into it.
|
* non-NULL, the value of that bit is placed into it.
|
||||||
*/
|
*/
|
||||||
static inline uint64_t bit_run_count(char* b, uint64_t from, uint64_t len, int *run_is_set) {
|
static inline uint64_t bit_run_count(bitfield_p b, uint64_t from, uint64_t len, int *run_is_set) {
|
||||||
uint64_t* current_block;
|
|
||||||
uint64_t count = 0;
|
uint64_t count = 0;
|
||||||
int first_value = bit_is_set(b, from);
|
int first_value = bit_get(b, from);
|
||||||
|
bitfield_word_t word_match = first_value ? -1 : 0;
|
||||||
|
|
||||||
if ( run_is_set != NULL ) {
|
if ( run_is_set != NULL ) {
|
||||||
*run_is_set = first_value;
|
*run_is_set = first_value;
|
||||||
}
|
}
|
||||||
|
|
||||||
for ( ; (from+count) % 64 != 0 && len > 0; len--) {
|
for ( ; ((from + count) % BITS_PER_WORD) != 0 && len > 0; len--) {
|
||||||
if (bit_has_value(b, from + count, first_value)) {
|
if (bit_has_value(b, from + count, first_value)) {
|
||||||
count++;
|
count++;
|
||||||
} else {
|
} else {
|
||||||
@@ -92,10 +113,9 @@ static inline uint64_t bit_run_count(char* b, uint64_t from, uint64_t len, int *
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for ( ; len >= 64 ; len -= 64 ) {
|
for ( ; len >= BITS_PER_WORD ; len -= BITS_PER_WORD ) {
|
||||||
current_block = (uint64_t*) (b + ((from+count)/8));
|
if (BIT_WORD(b, from + count) == word_match) {
|
||||||
if (*current_block == ( first_value ? UINT64_MAX : 0 ) ) {
|
count += BITS_PER_WORD;
|
||||||
count += 64;
|
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -154,7 +174,7 @@ struct bitset {
|
|||||||
int resolution;
|
int resolution;
|
||||||
struct bitset_stream *stream;
|
struct bitset_stream *stream;
|
||||||
int stream_enabled;
|
int stream_enabled;
|
||||||
char bits[];
|
bitfield_word_t bits[];
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Allocate a bitset for a file of the given size, and chunks of the
|
/** Allocate a bitset for a file of the given size, and chunks of the
|
||||||
@@ -162,9 +182,12 @@ struct bitset {
|
|||||||
*/
|
*/
|
||||||
static inline struct bitset *bitset_alloc( uint64_t size, int resolution )
|
static inline struct bitset *bitset_alloc( uint64_t size, int resolution )
|
||||||
{
|
{
|
||||||
struct bitset *bitset = xmalloc(
|
// calculate a size to allocate that is a multiple of the size of the
|
||||||
sizeof( struct bitset ) + ( size + resolution - 1 ) / resolution
|
// bitfield word
|
||||||
);
|
size_t bitfield_size =
|
||||||
|
BIT_WORDS_FOR_SIZE((( size + resolution - 1 ) / resolution)) * sizeof( bitfield_word_t );
|
||||||
|
struct bitset *bitset = xmalloc(sizeof( struct bitset ) + ( bitfield_size / 8 ) );
|
||||||
|
|
||||||
bitset->size = size;
|
bitset->size = size;
|
||||||
bitset->resolution = resolution;
|
bitset->resolution = resolution;
|
||||||
/* don't actually need to call pthread_mutex_destroy '*/
|
/* don't actually need to call pthread_mutex_destroy '*/
|
||||||
|
@@ -126,7 +126,9 @@ void write_not_zeroes(struct client* client, uint64_t from, uint64_t len)
|
|||||||
debug("(run adjusted to %d)", run);
|
debug("(run adjusted to %d)", run);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0) /* useful but expensive */
|
/*
|
||||||
|
// Useful but expensive
|
||||||
|
if (0)
|
||||||
{
|
{
|
||||||
uint64_t i;
|
uint64_t i;
|
||||||
fprintf(stderr, "full map resolution=%d: ", map->resolution);
|
fprintf(stderr, "full map resolution=%d: ", map->resolution);
|
||||||
@@ -139,6 +141,7 @@ void write_not_zeroes(struct client* client, uint64_t from, uint64_t len)
|
|||||||
}
|
}
|
||||||
fprintf(stderr, "\n");
|
fprintf(stderr, "\n");
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
#define DO_READ(dst, len) ERROR_IF_NEGATIVE( \
|
#define DO_READ(dst, len) ERROR_IF_NEGATIVE( \
|
||||||
readloop( \
|
readloop( \
|
||||||
@@ -297,7 +300,7 @@ void client_write_init( struct client * client, uint64_t size )
|
|||||||
struct nbd_init init = {{0}};
|
struct nbd_init init = {{0}};
|
||||||
struct nbd_init_raw init_raw = {{0}};
|
struct nbd_init_raw init_raw = {{0}};
|
||||||
|
|
||||||
memcpy( init.passwd, INIT_PASSWD, sizeof( INIT_PASSWD ) );
|
memcpy( init.passwd, INIT_PASSWD, sizeof( init.passwd ) );
|
||||||
init.magic = INIT_MAGIC;
|
init.magic = INIT_MAGIC;
|
||||||
init.size = size;
|
init.size = size;
|
||||||
memset( init.reserved, 0, 128 );
|
memset( init.reserved, 0, 128 );
|
||||||
@@ -416,8 +419,8 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
|
|||||||
{
|
{
|
||||||
off64_t offset;
|
off64_t offset;
|
||||||
|
|
||||||
// TODO: cork
|
|
||||||
debug("request read %ld+%d", request.from, request.len);
|
debug("request read %ld+%d", request.from, request.len);
|
||||||
|
sock_set_tcp_cork( client->socket, 1 );
|
||||||
client_write_reply( client, &request, 0 );
|
client_write_reply( client, &request, 0 );
|
||||||
|
|
||||||
offset = request.from;
|
offset = request.from;
|
||||||
@@ -435,7 +438,7 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
|
|||||||
offset,
|
offset,
|
||||||
request.len);
|
request.len);
|
||||||
|
|
||||||
// TODO: uncork
|
sock_set_tcp_cork( client->socket, 0 );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@@ -139,7 +139,7 @@ enum mirror_state mirror_get_state( struct mirror * mirror )
|
|||||||
void mirror_init( struct mirror * mirror, const char * filename )
|
void mirror_init( struct mirror * mirror, const char * filename )
|
||||||
{
|
{
|
||||||
int map_fd;
|
int map_fd;
|
||||||
off64_t size;
|
uint64_t size;
|
||||||
|
|
||||||
NULLCHECK( mirror );
|
NULLCHECK( mirror );
|
||||||
NULLCHECK( filename );
|
NULLCHECK( filename );
|
||||||
@@ -270,7 +270,7 @@ void mirror_cleanup( struct server * serve,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int mirror_connect( struct mirror * mirror, off64_t local_size )
|
int mirror_connect( struct mirror * mirror, uint64_t local_size )
|
||||||
{
|
{
|
||||||
struct sockaddr * connect_from = NULL;
|
struct sockaddr * connect_from = NULL;
|
||||||
int connected = 0;
|
int connected = 0;
|
||||||
@@ -292,7 +292,7 @@ int mirror_connect( struct mirror * mirror, off64_t local_size )
|
|||||||
"Select failed." );
|
"Select failed." );
|
||||||
|
|
||||||
if( FD_ISSET( mirror->client, &fds ) ){
|
if( FD_ISSET( mirror->client, &fds ) ){
|
||||||
off64_t remote_size;
|
uint64_t remote_size;
|
||||||
if ( socket_nbd_read_hello( mirror->client, &remote_size ) ) {
|
if ( socket_nbd_read_hello( mirror->client, &remote_size ) ) {
|
||||||
if( remote_size == local_size ){
|
if( remote_size == local_size ){
|
||||||
connected = 1;
|
connected = 1;
|
||||||
@@ -462,6 +462,12 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
|
|||||||
|
|
||||||
debug( "Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client );
|
debug( "Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client );
|
||||||
|
|
||||||
|
/* FIXME: We can end up corking multiple times in unusual circumstances; this
|
||||||
|
* is annoying, but harmless */
|
||||||
|
if ( xfer->written == 0 ) {
|
||||||
|
sock_set_tcp_cork( ctrl->mirror->client, 1 );
|
||||||
|
}
|
||||||
|
|
||||||
if ( xfer->written < hdr_size ) {
|
if ( xfer->written < hdr_size ) {
|
||||||
data_loc = ( (char*) &xfer->hdr.req_raw ) + ctrl->xfer.written;
|
data_loc = ( (char*) &xfer->hdr.req_raw ) + ctrl->xfer.written;
|
||||||
to_write = hdr_size - xfer->written;
|
to_write = hdr_size - xfer->written;
|
||||||
@@ -489,6 +495,7 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
|
|||||||
|
|
||||||
// All bytes written, so now we need to read the NBD reply back.
|
// All bytes written, so now we need to read the NBD reply back.
|
||||||
if ( ctrl->xfer.written == ctrl->xfer.len + hdr_size ) {
|
if ( ctrl->xfer.written == ctrl->xfer.len + hdr_size ) {
|
||||||
|
sock_set_tcp_cork( ctrl->mirror->client, 0 ) ;
|
||||||
ev_io_start( loop, &ctrl->read_watcher );
|
ev_io_start( loop, &ctrl->read_watcher );
|
||||||
ev_io_stop( loop, &ctrl->write_watcher );
|
ev_io_stop( loop, &ctrl->write_watcher );
|
||||||
}
|
}
|
||||||
@@ -636,7 +643,7 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)), int revents )
|
static void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)), int revents )
|
||||||
{
|
{
|
||||||
if ( !(revents & EV_TIMER ) ) {
|
if ( !(revents & EV_TIMER ) ) {
|
||||||
warn( "Mirror timeout called but no timer event signalled" );
|
warn( "Mirror timeout called but no timer event signalled" );
|
||||||
@@ -648,7 +655,7 @@ void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
|
static void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
|
||||||
{
|
{
|
||||||
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
||||||
NULLCHECK( ctrl );
|
NULLCHECK( ctrl );
|
||||||
@@ -666,7 +673,7 @@ void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
|
static void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
|
||||||
{
|
{
|
||||||
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
||||||
NULLCHECK( ctrl );
|
NULLCHECK( ctrl );
|
||||||
@@ -694,7 +701,7 @@ void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
|
|||||||
* if it has, start migrating. If it's not finished, then enabling the bitset
|
* if it has, start migrating. If it's not finished, then enabling the bitset
|
||||||
* stream does not go well for us.
|
* stream does not go well for us.
|
||||||
*/
|
*/
|
||||||
void mirror_begin_cb( struct ev_loop *loop, ev_timer *w, int revents )
|
static void mirror_begin_cb( struct ev_loop *loop, ev_timer *w, int revents )
|
||||||
{
|
{
|
||||||
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
|
||||||
NULLCHECK( ctrl );
|
NULLCHECK( ctrl );
|
||||||
@@ -750,7 +757,8 @@ void mirror_run( struct server *serve )
|
|||||||
|
|
||||||
ctrl.ev_loop = EV_DEFAULT;
|
ctrl.ev_loop = EV_DEFAULT;
|
||||||
|
|
||||||
/* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */
|
/* gcc warns with -Wstrict-aliasing on -O2. clang doesn't
|
||||||
|
* implement this warning. Seems to be the fault of ev.h */
|
||||||
ev_init( &ctrl.begin_watcher, mirror_begin_cb );
|
ev_init( &ctrl.begin_watcher, mirror_begin_cb );
|
||||||
ctrl.begin_watcher.repeat = 1.0; // We check bps every second. seems sane.
|
ctrl.begin_watcher.repeat = 1.0; // We check bps every second. seems sane.
|
||||||
ctrl.begin_watcher.data = (void*) &ctrl;
|
ctrl.begin_watcher.data = (void*) &ctrl;
|
@@ -220,7 +220,6 @@ void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char
|
|||||||
case 'h':
|
case 'h':
|
||||||
fprintf(stdout, "%s\n", serve_help_text );
|
fprintf(stdout, "%s\n", serve_help_text );
|
||||||
exit( 0 );
|
exit( 0 );
|
||||||
break;
|
|
||||||
case 'l':
|
case 'l':
|
||||||
*ip_addr = optarg;
|
*ip_addr = optarg;
|
||||||
break;
|
break;
|
||||||
@@ -263,7 +262,6 @@ void read_listen_param( int c,
|
|||||||
case 'h':
|
case 'h':
|
||||||
fprintf(stdout, "%s\n", listen_help_text );
|
fprintf(stdout, "%s\n", listen_help_text );
|
||||||
exit(0);
|
exit(0);
|
||||||
break;
|
|
||||||
case 'l':
|
case 'l':
|
||||||
*ip_addr = optarg;
|
*ip_addr = optarg;
|
||||||
break;
|
break;
|
||||||
@@ -297,7 +295,6 @@ void read_readwrite_param( int c, char **ip_addr, char **ip_port, char **bind_ad
|
|||||||
case 'h':
|
case 'h':
|
||||||
fprintf(stdout, "%s\n", err_text );
|
fprintf(stdout, "%s\n", err_text );
|
||||||
exit( 0 );
|
exit( 0 );
|
||||||
break;
|
|
||||||
case 'l':
|
case 'l':
|
||||||
*ip_addr = optarg;
|
*ip_addr = optarg;
|
||||||
break;
|
break;
|
||||||
@@ -331,7 +328,6 @@ void read_sock_param( int c, char **sock, char *help_text )
|
|||||||
case 'h':
|
case 'h':
|
||||||
fprintf( stdout, "%s\n", help_text );
|
fprintf( stdout, "%s\n", help_text );
|
||||||
exit( 0 );
|
exit( 0 );
|
||||||
break;
|
|
||||||
case 's':
|
case 's':
|
||||||
*sock = optarg;
|
*sock = optarg;
|
||||||
break;
|
break;
|
||||||
@@ -362,7 +358,6 @@ void read_mirror_speed_param(
|
|||||||
case 'h':
|
case 'h':
|
||||||
fprintf( stdout, "%s\n", mirror_speed_help_text );
|
fprintf( stdout, "%s\n", mirror_speed_help_text );
|
||||||
exit( 0 );
|
exit( 0 );
|
||||||
break;
|
|
||||||
case 's':
|
case 's':
|
||||||
*sock = optarg;
|
*sock = optarg;
|
||||||
break;
|
break;
|
||||||
@@ -394,7 +389,6 @@ void read_mirror_param(
|
|||||||
case 'h':
|
case 'h':
|
||||||
fprintf( stdout, "%s\n", mirror_help_text );
|
fprintf( stdout, "%s\n", mirror_help_text );
|
||||||
exit( 0 );
|
exit( 0 );
|
||||||
break;
|
|
||||||
case 's':
|
case 's':
|
||||||
*sock = optarg;
|
*sock = optarg;
|
||||||
break;
|
break;
|
||||||
@@ -428,7 +422,6 @@ void read_break_param( int c, char **sock )
|
|||||||
case 'h':
|
case 'h':
|
||||||
fprintf( stdout, "%s\n", break_help_text );
|
fprintf( stdout, "%s\n", break_help_text );
|
||||||
exit( 0 );
|
exit( 0 );
|
||||||
break;
|
|
||||||
case 's':
|
case 's':
|
||||||
*sock = optarg;
|
*sock = optarg;
|
||||||
break;
|
break;
|
||||||
@@ -580,7 +573,10 @@ void params_readwrite(
|
|||||||
|
|
||||||
parse_port( s_port, &out->connect_to.v4 );
|
parse_port( s_port, &out->connect_to.v4 );
|
||||||
|
|
||||||
out->from = atol(s_from);
|
long signed_from = atol(s_from);
|
||||||
|
FATAL_IF_NEGATIVE( signed_from,
|
||||||
|
"Can't read from a negative offset %d.", signed_from);
|
||||||
|
out->from = signed_from;
|
||||||
|
|
||||||
if (write_not_read) {
|
if (write_not_read) {
|
||||||
if (s_length_or_filename[0]-48 < 10) {
|
if (s_length_or_filename[0]-48 < 10) {
|
||||||
@@ -592,9 +588,10 @@ void params_readwrite(
|
|||||||
s_length_or_filename, O_RDONLY);
|
s_length_or_filename, O_RDONLY);
|
||||||
FATAL_IF_NEGATIVE(out->data_fd,
|
FATAL_IF_NEGATIVE(out->data_fd,
|
||||||
"Couldn't open %s", s_length_or_filename);
|
"Couldn't open %s", s_length_or_filename);
|
||||||
out->len = lseek64(out->data_fd, 0, SEEK_END);
|
off64_t signed_len = lseek64(out->data_fd, 0, SEEK_END);
|
||||||
FATAL_IF_NEGATIVE(out->len,
|
FATAL_IF_NEGATIVE(signed_len,
|
||||||
"Couldn't find length of %s", s_length_or_filename);
|
"Couldn't find length of %s", s_length_or_filename);
|
||||||
|
out->len = signed_len;
|
||||||
FATAL_IF_NEGATIVE(
|
FATAL_IF_NEGATIVE(
|
||||||
lseek64(out->data_fd, 0, SEEK_SET),
|
lseek64(out->data_fd, 0, SEEK_SET),
|
||||||
"Couldn't rewind %s", s_length_or_filename
|
"Couldn't rewind %s", s_length_or_filename
|
@@ -233,7 +233,6 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
|
|||||||
|
|
||||||
int was_closed = 0;
|
int was_closed = 0;
|
||||||
void * status=NULL;
|
void * status=NULL;
|
||||||
int join_errno;
|
|
||||||
|
|
||||||
if (entry->thread != 0) {
|
if (entry->thread != 0) {
|
||||||
char s_client_address[128];
|
char s_client_address[128];
|
||||||
@@ -241,7 +240,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
|
|||||||
sockaddr_address_string( &entry->address.generic, &s_client_address[0], 128 );
|
sockaddr_address_string( &entry->address.generic, &s_client_address[0], 128 );
|
||||||
|
|
||||||
debug( "%s(%p,...)", joinfunc == pthread_join ? "joining" : "tryjoining", entry->thread );
|
debug( "%s(%p,...)", joinfunc == pthread_join ? "joining" : "tryjoining", entry->thread );
|
||||||
join_errno = joinfunc(entry->thread, &status);
|
int join_errno = joinfunc(entry->thread, &status);
|
||||||
|
|
||||||
/* join_errno can legitimately be ESRCH if the thread is
|
/* join_errno can legitimately be ESRCH if the thread is
|
||||||
* already dead, but the client still needs tidying up. */
|
* already dead, but the client still needs tidying up. */
|
||||||
@@ -598,7 +597,6 @@ int server_accept( struct server * params )
|
|||||||
{
|
{
|
||||||
NULLCHECK( params );
|
NULLCHECK( params );
|
||||||
debug("accept loop starting");
|
debug("accept loop starting");
|
||||||
int client_fd;
|
|
||||||
union mysockaddr client_address;
|
union mysockaddr client_address;
|
||||||
fd_set fds;
|
fd_set fds;
|
||||||
socklen_t socklen=sizeof(client_address);
|
socklen_t socklen=sizeof(client_address);
|
||||||
@@ -638,7 +636,7 @@ int server_accept( struct server * params )
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ( FD_ISSET( params->server_fd, &fds ) ){
|
if ( FD_ISSET( params->server_fd, &fds ) ){
|
||||||
client_fd = accept( params->server_fd, &client_address.generic, &socklen );
|
int client_fd = accept( params->server_fd, &client_address.generic, &socklen );
|
||||||
|
|
||||||
if ( params->allow_new_clients ) {
|
if ( params->allow_new_clients ) {
|
||||||
debug("Accepted nbd client socket fd %d", client_fd);
|
debug("Accepted nbd client socket fd %d", client_fd);
|
||||||
@@ -741,11 +739,11 @@ void server_join_clients( struct server * serve ) {
|
|||||||
|
|
||||||
for (i=0; i < serve->max_nbd_clients; i++) {
|
for (i=0; i < serve->max_nbd_clients; i++) {
|
||||||
pthread_t thread_id = serve->nbd_client[i].thread;
|
pthread_t thread_id = serve->nbd_client[i].thread;
|
||||||
int err = 0;
|
|
||||||
|
|
||||||
if (thread_id != 0) {
|
if (thread_id != 0) {
|
||||||
debug( "joining thread %p", thread_id );
|
debug( "joining thread %p", thread_id );
|
||||||
if ( 0 == (err = pthread_join( thread_id, &status ) ) ) {
|
int err = pthread_join( thread_id, &status );
|
||||||
|
if ( 0 == err ) {
|
||||||
serve->nbd_client[i].thread = 0;
|
serve->nbd_client[i].thread = 0;
|
||||||
} else {
|
} else {
|
||||||
warn( "Error %s (%i) joining thread %p", strerror( err ), err, thread_id );
|
warn( "Error %s (%i) joining thread %p", strerror( err ), err, thread_id );
|
@@ -154,8 +154,10 @@ int do_serve( struct server *, struct self_pipe * );
|
|||||||
struct mode_readwrite_params {
|
struct mode_readwrite_params {
|
||||||
union mysockaddr connect_to;
|
union mysockaddr connect_to;
|
||||||
union mysockaddr connect_from;
|
union mysockaddr connect_from;
|
||||||
off64_t from;
|
|
||||||
off64_t len;
|
uint64_t from;
|
||||||
|
uint32_t len;
|
||||||
|
|
||||||
int data_fd;
|
int data_fd;
|
||||||
int client;
|
int client;
|
||||||
};
|
};
|
@@ -21,6 +21,11 @@ class Environment
|
|||||||
@fake_pid = nil
|
@fake_pid = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def prefetch_proxy!
|
||||||
|
@nbd1.prefetch_proxy = true
|
||||||
|
@nbd2.prefetch_proxy = true
|
||||||
|
end
|
||||||
|
|
||||||
def proxy1(port=@port2)
|
def proxy1(port=@port2)
|
||||||
@nbd1.proxy(@ip, port)
|
@nbd1.proxy(@ip, port)
|
||||||
end
|
end
|
||||||
|
@@ -198,6 +198,8 @@ module FlexNBD
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
attr_accessor :prefetch_proxy
|
||||||
|
|
||||||
def initialize( bin, ip, port )
|
def initialize( bin, ip, port )
|
||||||
@bin = bin
|
@bin = bin
|
||||||
@do_debug = ENV['DEBUG']
|
@do_debug = ENV['DEBUG']
|
||||||
@@ -208,6 +210,7 @@ module FlexNBD
|
|||||||
@ip = ip
|
@ip = ip
|
||||||
@port = port
|
@port = port
|
||||||
@kill = []
|
@kill = []
|
||||||
|
@prefetch_proxy = false
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
@@ -247,6 +250,7 @@ module FlexNBD
|
|||||||
"--port #{port} "\
|
"--port #{port} "\
|
||||||
"--conn-addr #{connect_ip} "\
|
"--conn-addr #{connect_ip} "\
|
||||||
"--conn-port #{connect_port} "\
|
"--conn-port #{connect_port} "\
|
||||||
|
"#{prefetch_proxy ? "--cache " : ""}"\
|
||||||
"#{@debug}"
|
"#{@debug}"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@@ -138,7 +138,7 @@ module FlexNBD
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
def accept( err_msg = "Timed out waiting for a connection", timeout = 2)
|
def accept( err_msg = "Timed out waiting for a connection", timeout = 5)
|
||||||
client_sock = nil
|
client_sock = nil
|
||||||
|
|
||||||
begin
|
begin
|
||||||
|
190
tests/acceptance/proxy_tests.rb
Normal file
190
tests/acceptance/proxy_tests.rb
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
# encoding: utf-8
|
||||||
|
require 'flexnbd/fake_source'
|
||||||
|
require 'flexnbd/fake_dest'
|
||||||
|
|
||||||
|
module ProxyTests
|
||||||
|
def with_proxied_client( override_size = nil )
|
||||||
|
@env.serve1 unless @server_up
|
||||||
|
@env.proxy2 unless @proxy_up
|
||||||
|
@env.nbd2.can_die(0)
|
||||||
|
client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy")
|
||||||
|
begin
|
||||||
|
|
||||||
|
result = client.read_hello
|
||||||
|
assert_equal "NBDMAGIC", result[:magic]
|
||||||
|
assert_equal override_size || @env.file1.size, result[:size]
|
||||||
|
|
||||||
|
yield client
|
||||||
|
ensure
|
||||||
|
client.close rescue nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_exits_with_error_when_cannot_connect_to_upstream_on_start
|
||||||
|
assert_raises(RuntimeError) { @env.proxy1 }
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_read_requests_successfully_proxied
|
||||||
|
with_proxied_client do |client|
|
||||||
|
(0..3).each do |n|
|
||||||
|
offset = n * 4096
|
||||||
|
client.write_read_request(offset, 4096, "myhandle")
|
||||||
|
rsp = client.read_response
|
||||||
|
|
||||||
|
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
|
||||||
|
assert_equal "myhandle", rsp[:handle]
|
||||||
|
assert_equal 0, rsp[:error]
|
||||||
|
|
||||||
|
orig_data = @env.file1.read(offset, 4096)
|
||||||
|
data = client.read_raw(4096)
|
||||||
|
|
||||||
|
assert_equal 4096, orig_data.size
|
||||||
|
assert_equal 4096, data.size
|
||||||
|
|
||||||
|
assert_equal( orig_data, data,
|
||||||
|
"Returned data does not match on request #{n+1}" )
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_write_requests_successfully_proxied
|
||||||
|
with_proxied_client do |client|
|
||||||
|
(0..3).each do |n|
|
||||||
|
offset = n * 4096
|
||||||
|
client.write(offset, "\xFF" * 4096)
|
||||||
|
rsp = client.read_response
|
||||||
|
|
||||||
|
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
|
||||||
|
assert_equal "myhandle", rsp[:handle]
|
||||||
|
assert_equal 0, rsp[:error]
|
||||||
|
|
||||||
|
data = @env.file1.read(offset, 4096)
|
||||||
|
|
||||||
|
assert_equal( ( "\xFF" * 4096 ), data, "Data not written correctly (offset is #{n})" )
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def make_fake_server
|
||||||
|
server = FlexNBD::FakeDest.new(@env.ip, @env.port1)
|
||||||
|
@server_up = true
|
||||||
|
|
||||||
|
# We return a thread here because accept() and connect() both block for us
|
||||||
|
Thread.new do
|
||||||
|
sc = server.accept # just tell the supervisor we're up
|
||||||
|
sc.write_hello
|
||||||
|
|
||||||
|
[ server, sc ]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_read_request_retried_when_upstream_dies_partway
|
||||||
|
maker = make_fake_server
|
||||||
|
|
||||||
|
with_proxied_client(4096) do |client|
|
||||||
|
server, sc1 = maker.value
|
||||||
|
|
||||||
|
# Send the read request to the proxy
|
||||||
|
client.write_read_request( 0, 4096 )
|
||||||
|
|
||||||
|
# ensure we're given the read request
|
||||||
|
req1 = sc1.read_request
|
||||||
|
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
||||||
|
assert_equal ::FlexNBD::REQUEST_READ, req1[:type]
|
||||||
|
assert_equal 0, req1[:from]
|
||||||
|
assert_not_equal 0, req1[:len]
|
||||||
|
|
||||||
|
# Kill the server again, now we're sure the read request has been sent once
|
||||||
|
sc1.close
|
||||||
|
|
||||||
|
# We expect the proxy to reconnect without our client doing anything.
|
||||||
|
sc2 = server.accept
|
||||||
|
sc2.write_hello
|
||||||
|
|
||||||
|
# And once reconnected, it should resend an identical request.
|
||||||
|
req2 = sc2.read_request
|
||||||
|
assert_equal req1, req2
|
||||||
|
|
||||||
|
# The reply should be proxied back to the client.
|
||||||
|
sc2.write_reply( req2[:handle] )
|
||||||
|
sc2.write_data( "\xFF" * 4096 )
|
||||||
|
|
||||||
|
# Check it to make sure it's correct
|
||||||
|
rsp = timeout(15) { client.read_response }
|
||||||
|
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
|
||||||
|
assert_equal 0, rsp[:error]
|
||||||
|
assert_equal req1[:handle], rsp[:handle]
|
||||||
|
|
||||||
|
data = client.read_raw( 4096 )
|
||||||
|
assert_equal( ("\xFF" * 4096), data, "Wrong data returned" )
|
||||||
|
|
||||||
|
sc2.close
|
||||||
|
server.close
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_write_request_retried_when_upstream_dies_partway
|
||||||
|
maker = make_fake_server
|
||||||
|
|
||||||
|
with_proxied_client(4096) do |client|
|
||||||
|
server, sc1 = maker.value
|
||||||
|
|
||||||
|
# Send the read request to the proxy
|
||||||
|
client.write( 0, ( "\xFF" * 4096 ) )
|
||||||
|
|
||||||
|
# ensure we're given the read request
|
||||||
|
req1 = sc1.read_request
|
||||||
|
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
||||||
|
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
|
||||||
|
assert_equal 0, req1[:from]
|
||||||
|
assert_equal 4096, req1[:len]
|
||||||
|
data1 = sc1.read_data( 4096 )
|
||||||
|
assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" )
|
||||||
|
|
||||||
|
# Kill the server again, now we're sure the read request has been sent once
|
||||||
|
sc1.close
|
||||||
|
|
||||||
|
# We expect the proxy to reconnect without our client doing anything.
|
||||||
|
sc2 = server.accept
|
||||||
|
sc2.write_hello
|
||||||
|
|
||||||
|
# And once reconnected, it should resend an identical request.
|
||||||
|
req2 = sc2.read_request
|
||||||
|
assert_equal req1, req2
|
||||||
|
data2 = sc2.read_data( 4096 )
|
||||||
|
assert_equal data1, data2
|
||||||
|
|
||||||
|
# The reply should be proxied back to the client.
|
||||||
|
sc2.write_reply( req2[:handle] )
|
||||||
|
|
||||||
|
# Check it to make sure it's correct
|
||||||
|
rsp = timeout(15) { client.read_response }
|
||||||
|
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
|
||||||
|
assert_equal 0, rsp[:error]
|
||||||
|
assert_equal req1[:handle], rsp[:handle]
|
||||||
|
|
||||||
|
sc2.close
|
||||||
|
server.close
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_only_one_client_can_connect_to_proxy_at_a_time
|
||||||
|
with_proxied_client do |client|
|
||||||
|
|
||||||
|
c2 = nil
|
||||||
|
assert_raises(Timeout::Error) do
|
||||||
|
timeout(1) do
|
||||||
|
c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)")
|
||||||
|
c2.read_hello
|
||||||
|
end
|
||||||
|
end
|
||||||
|
c2.close rescue nil if c2
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
|
22
tests/acceptance/test_prefetch_proxy_mode.rb
Normal file
22
tests/acceptance/test_prefetch_proxy_mode.rb
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
require 'test/unit'
|
||||||
|
require 'environment'
|
||||||
|
require 'proxy_tests'
|
||||||
|
|
||||||
|
|
||||||
|
class TestPrefetchProxyMode < Test::Unit::TestCase
|
||||||
|
include ProxyTests
|
||||||
|
|
||||||
|
def setup
|
||||||
|
super
|
||||||
|
@env = Environment.new
|
||||||
|
@env.prefetch_proxy!
|
||||||
|
@env.writefile1( "f" * 16 )
|
||||||
|
end
|
||||||
|
|
||||||
|
def teardown
|
||||||
|
@env.cleanup
|
||||||
|
super
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
|
@@ -1,200 +1,20 @@
|
|||||||
require 'test/unit'
|
require 'test/unit'
|
||||||
require 'environment'
|
require 'environment'
|
||||||
require 'flexnbd/fake_source'
|
require 'proxy_tests'
|
||||||
require 'flexnbd/fake_dest'
|
|
||||||
|
|
||||||
class TestProxyMode < Test::Unit::TestCase
|
class TestProxyMode < Test::Unit::TestCase
|
||||||
|
include ProxyTests
|
||||||
|
|
||||||
def setup
|
def setup
|
||||||
super
|
super
|
||||||
@env = Environment.new
|
@env = Environment.new
|
||||||
@env.writefile1( "0" * 16 )
|
@env.writefile1( "f" * 16 )
|
||||||
end
|
end
|
||||||
|
|
||||||
def teardown
|
def teardown
|
||||||
@env.cleanup
|
@env.cleanup
|
||||||
super
|
super
|
||||||
end
|
end
|
||||||
|
|
||||||
def with_proxied_client( override_size = nil )
|
|
||||||
@env.serve1 unless @server_up
|
|
||||||
@env.proxy2 unless @proxy_up
|
|
||||||
@env.nbd2.can_die(0)
|
|
||||||
client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy")
|
|
||||||
begin
|
|
||||||
|
|
||||||
result = client.read_hello
|
|
||||||
assert_equal "NBDMAGIC", result[:magic]
|
|
||||||
assert_equal override_size || @env.file1.size, result[:size]
|
|
||||||
|
|
||||||
yield client
|
|
||||||
ensure
|
|
||||||
client.close rescue nil
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_exits_with_error_when_cannot_connect_to_upstream_on_start
|
|
||||||
assert_raises(RuntimeError) { @env.proxy1 }
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_read_requests_successfully_proxied
|
|
||||||
with_proxied_client do |client|
|
|
||||||
(0..3).each do |n|
|
|
||||||
offset = n * 4096
|
|
||||||
client.write_read_request(offset, 4096, "myhandle")
|
|
||||||
rsp = client.read_response
|
|
||||||
|
|
||||||
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
|
|
||||||
assert_equal "myhandle", rsp[:handle]
|
|
||||||
assert_equal 0, rsp[:error]
|
|
||||||
|
|
||||||
orig_data = @env.file1.read(offset, 4096)
|
|
||||||
data = client.read_raw(4096)
|
|
||||||
|
|
||||||
assert_equal 4096, orig_data.size
|
|
||||||
assert_equal 4096, data.size
|
|
||||||
|
|
||||||
assert_equal( orig_data, data, "Returned data does not match" )
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_write_requests_successfully_proxied
|
|
||||||
with_proxied_client do |client|
|
|
||||||
(0..3).each do |n|
|
|
||||||
offset = n * 4096
|
|
||||||
client.write(offset, "\xFF" * 4096)
|
|
||||||
rsp = client.read_response
|
|
||||||
|
|
||||||
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
|
|
||||||
assert_equal "myhandle", rsp[:handle]
|
|
||||||
assert_equal 0, rsp[:error]
|
|
||||||
|
|
||||||
data = @env.file1.read(offset, 4096)
|
|
||||||
|
|
||||||
assert_equal( ( "\xFF" * 4096 ), data, "Data not written correctly (offset is #{n})" )
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def make_fake_server
|
|
||||||
server = FlexNBD::FakeDest.new(@env.ip, @env.port1)
|
|
||||||
@server_up = true
|
|
||||||
|
|
||||||
# We return a thread here because accept() and connect() both block for us
|
|
||||||
Thread.new do
|
|
||||||
sc = server.accept # just tell the supervisor we're up
|
|
||||||
sc.write_hello
|
|
||||||
|
|
||||||
[ server, sc ]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_read_request_retried_when_upstream_dies_partway
|
|
||||||
maker = make_fake_server
|
|
||||||
|
|
||||||
with_proxied_client(4096) do |client|
|
|
||||||
server, sc1 = maker.value
|
|
||||||
|
|
||||||
# Send the read request to the proxy
|
|
||||||
client.write_read_request( 0, 4096 )
|
|
||||||
|
|
||||||
# ensure we're given the read request
|
|
||||||
req1 = sc1.read_request
|
|
||||||
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
|
||||||
assert_equal ::FlexNBD::REQUEST_READ, req1[:type]
|
|
||||||
assert_equal 0, req1[:from]
|
|
||||||
assert_not_equal 0, req1[:len]
|
|
||||||
|
|
||||||
# Kill the server again, now we're sure the read request has been sent once
|
|
||||||
sc1.close
|
|
||||||
|
|
||||||
# We expect the proxy to reconnect without our client doing anything.
|
|
||||||
sc2 = server.accept
|
|
||||||
sc2.write_hello
|
|
||||||
|
|
||||||
# And once reconnected, it should resend an identical request.
|
|
||||||
req2 = sc2.read_request
|
|
||||||
assert_equal req1, req2
|
|
||||||
|
|
||||||
# The reply should be proxied back to the client.
|
|
||||||
sc2.write_reply( req2[:handle] )
|
|
||||||
sc2.write_data( "\xFF" * 4096 )
|
|
||||||
|
|
||||||
# Check it to make sure it's correct
|
|
||||||
rsp = timeout(15) { client.read_response }
|
|
||||||
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
|
|
||||||
assert_equal 0, rsp[:error]
|
|
||||||
assert_equal req1[:handle], rsp[:handle]
|
|
||||||
|
|
||||||
data = client.read_raw( 4096 )
|
|
||||||
assert_equal( ("\xFF" * 4096), data, "Wrong data returned" )
|
|
||||||
|
|
||||||
sc2.close
|
|
||||||
server.close
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_write_request_retried_when_upstream_dies_partway
|
|
||||||
maker = make_fake_server
|
|
||||||
|
|
||||||
with_proxied_client(4096) do |client|
|
|
||||||
server, sc1 = maker.value
|
|
||||||
|
|
||||||
# Send the write request to the proxy
|
|
||||||
client.write( 0, ( "\xFF" * 4096 ) )
|
|
||||||
|
|
||||||
# ensure we're given the write request
|
|
||||||
req1 = sc1.read_request
|
|
||||||
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
|
|
||||||
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
|
|
||||||
assert_equal 0, req1[:from]
|
|
||||||
assert_equal 4096, req1[:len]
|
|
||||||
data1 = sc1.read_data( 4096 )
|
|
||||||
assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" )
|
|
||||||
|
|
||||||
# Kill the server again, now we're sure the write request has been sent once
|
|
||||||
sc1.close
|
|
||||||
|
|
||||||
# We expect the proxy to reconnect without our client doing anything.
|
|
||||||
sc2 = server.accept
|
|
||||||
sc2.write_hello
|
|
||||||
|
|
||||||
# And once reconnected, it should resend an identical request.
|
|
||||||
req2 = sc2.read_request
|
|
||||||
assert_equal req1, req2
|
|
||||||
data2 = sc2.read_data( 4096 )
|
|
||||||
assert_equal data1, data2
|
|
||||||
|
|
||||||
# The reply should be proxied back to the client.
|
|
||||||
sc2.write_reply( req2[:handle] )
|
|
||||||
|
|
||||||
# Check it to make sure it's correct
|
|
||||||
rsp = timeout(15) { client.read_response }
|
|
||||||
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
|
|
||||||
assert_equal 0, rsp[:error]
|
|
||||||
assert_equal req1[:handle], rsp[:handle]
|
|
||||||
|
|
||||||
sc2.close
|
|
||||||
server.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_only_one_client_can_connect_to_proxy_at_a_time
|
|
||||||
with_proxied_client do |client|
|
|
||||||
|
|
||||||
c2 = nil
|
|
||||||
assert_raises(Timeout::Error) do
|
|
||||||
timeout(1) do
|
|
||||||
c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)")
|
|
||||||
c2.read_hello
|
|
||||||
end
|
|
||||||
end
|
|
||||||
c2.close rescue nil if c2
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@@ -10,7 +10,7 @@
|
|||||||
START_TEST(test_bit_set)
|
START_TEST(test_bit_set)
|
||||||
{
|
{
|
||||||
uint64_t num = 0;
|
uint64_t num = 0;
|
||||||
char *bits = (char*) #
|
bitfield_p bits = (bitfield_p) #
|
||||||
|
|
||||||
#define TEST_BIT_SET(bit, newvalue) \
|
#define TEST_BIT_SET(bit, newvalue) \
|
||||||
bit_set(bits, (bit)); \
|
bit_set(bits, (bit)); \
|
||||||
@@ -27,7 +27,7 @@ END_TEST
|
|||||||
START_TEST(test_bit_clear)
|
START_TEST(test_bit_clear)
|
||||||
{
|
{
|
||||||
uint64_t num = 0xffffffffffffffff;
|
uint64_t num = 0xffffffffffffffff;
|
||||||
char *bits = (char*) #
|
bitfield_p bits = (bitfield_p) #
|
||||||
|
|
||||||
#define TEST_BIT_CLEAR(bit, newvalue) \
|
#define TEST_BIT_CLEAR(bit, newvalue) \
|
||||||
bit_clear(bits, (bit)); \
|
bit_clear(bits, (bit)); \
|
||||||
@@ -44,7 +44,7 @@ END_TEST
|
|||||||
START_TEST(test_bit_tests)
|
START_TEST(test_bit_tests)
|
||||||
{
|
{
|
||||||
uint64_t num = 0x5555555555555555;
|
uint64_t num = 0x5555555555555555;
|
||||||
char *bits = (char*) #
|
bitfield_p bits = (bitfield_p) #
|
||||||
|
|
||||||
fail_unless(bit_has_value(bits, 0, 1), "bit_has_value malfunction");
|
fail_unless(bit_has_value(bits, 0, 1), "bit_has_value malfunction");
|
||||||
fail_unless(bit_has_value(bits, 1, 0), "bit_has_value malfunction");
|
fail_unless(bit_has_value(bits, 1, 0), "bit_has_value malfunction");
|
||||||
@@ -58,7 +58,7 @@ END_TEST
|
|||||||
|
|
||||||
START_TEST(test_bit_ranges)
|
START_TEST(test_bit_ranges)
|
||||||
{
|
{
|
||||||
char buffer[4160];
|
bitfield_word_t buffer[BIT_WORDS_FOR_SIZE(4160)];
|
||||||
uint64_t *longs = (unsigned long*) buffer;
|
uint64_t *longs = (unsigned long*) buffer;
|
||||||
uint64_t i;
|
uint64_t i;
|
||||||
|
|
||||||
@@ -84,7 +84,7 @@ END_TEST
|
|||||||
|
|
||||||
START_TEST(test_bit_runs)
|
START_TEST(test_bit_runs)
|
||||||
{
|
{
|
||||||
char buffer[256];
|
bitfield_word_t buffer[BIT_WORDS_FOR_SIZE(256)];
|
||||||
int i, ptr=0, runs[] = {
|
int i, ptr=0, runs[] = {
|
||||||
56,97,22,12,83,1,45,80,85,51,64,40,63,67,75,64,94,81,79,62
|
56,97,22,12,83,1,45,80,85,51,64,40,63,67,75,64,94,81,79,62
|
||||||
};
|
};
|
||||||
|
@@ -93,7 +93,7 @@ END_TEST
|
|||||||
|
|
||||||
int connect_client( char *addr, int actual_port, char *source_addr )
|
int connect_client( char *addr, int actual_port, char *source_addr )
|
||||||
{
|
{
|
||||||
int client_fd;
|
int client_fd = -1;
|
||||||
|
|
||||||
struct addrinfo hint;
|
struct addrinfo hint;
|
||||||
struct addrinfo *ailist, *aip;
|
struct addrinfo *ailist, *aip;
|
||||||
|
Reference in New Issue
Block a user