42 Commits
0.1.1 ... 0.1.3

Author SHA1 Message Date
James Carter
218c55fb63 Merge branch 'simplify-nbd-handles-part-deux' into 'master'
Simplified NBD handle comparisons

8 bytes, therefore a uint64_t to compare to, no need for memcmp()

Signed-off-by: Michel Pollet <buserror@gmail.com>

See merge request !5
2016-10-04 15:49:07 +01:00
Michel Pollet
956a602475 Simplified NBD handle comparisons
8 bytes, therefore a uint64_t to compare to, no need for memcmp()

Signed-off-by: Michel Pollet <buserror@gmail.com>
2016-10-04 15:41:48 +01:00
James Carter
26a0a82f9d Merge branch '12-fix-bind' into 'master'
Attempt at fixing bind() bug

This will prevent the bind() wrapper from looping forever in some cases. I
could not reproduce the issue, but this removes the only infinite loop I
could find.

Closes #12

See merge request !3
2016-10-04 15:41:37 +01:00
Michel Pollet
76e0476113 Attempt at fixing bind() bug
This will prevent the bind() wrapper from looping forever in some cases. I
could not reproduce the issue, but this removes the only infinite loop I
could find.

Signed-off-by: Michel Pollet <buserror@gmail.com>
2016-10-04 15:36:46 +01:00
James Carter
e3360a3a1b Merge branch 'cherry-pick-41f25408' into 'master'
Close socket fix, might relate to migration crashing

This was listed as a bug, and was immediately picked up by the static
analyzer anyway; this is very likely the cause of the
migration-cancel-crash bug.

closes #10 and possibly closes #11

See merge request !1
2016-09-14 11:29:12 +01:00
Michel Pollet
1fefe1a669 Close socket fix, might relate to migration crashing
This was listed as a bug, and was immediately picked up by the static
analyzer anyway; this is very likely the cause of the
migration-cancel-crash bug.

Signed-off-by: Michel Pollet <buserror@gmail.com>
2016-09-14 10:45:49 +01:00
Patrick J Cherry
ba14943b60 Removed old changelog.template 2016-08-30 21:49:54 +01:00
Patrick J Cherry
4a709e73f8 Moved .hgignore to .gitignore 2016-08-30 21:47:25 +01:00
Patrick J Cherry
91a8946ddc Removed debian directory 2016-08-30 21:46:59 +01:00
nick
20f99b4554 flexnbd: We only require 1/8th of the memory we allocate for bitsets (bits vs. bytes confusion) 2015-05-13 09:25:09 +01:00
nick
c363991cfd Makefile: Add -lm to LLDFLAGS 2015-04-01 12:39:07 +01:00
Alex Young
c41eeff2fc Moved the server-specific files into src/server 2014-03-11 11:05:43 +00:00
Alex Young
5960e4d10b Remove the proxy's dependency on flexnbd.h 2014-03-11 10:37:00 +00:00
Alex Young
f0911b5c6c Tighten up some variable scopes. 2014-03-11 10:24:29 +00:00
Alex Young
b063f41ba8 Avoid a potential null pointer dereference 2014-03-11 09:57:19 +00:00
Alex Young
28c7e43e45 Fix a harmless buffer overflow 2014-03-11 09:49:25 +00:00
Alex Young
9326b6b882 Merge 2014-02-27 16:18:17 +00:00
Alex Young
f93476ebd3 Replace off64_t with uint64_t where it makes sense to do so.
It looks like off64_t was propagated through the code from the return
type of lseek64(), which isn't appropriate in many of the places we're
using it.
2014-02-27 16:04:25 +00:00
Alex Young
666b60ae1c Allow subset reads in prefetch_contains and prefetch_offset 2014-02-27 14:54:18 +00:00
nick
f48bf2b296 Automated merge with ssh://dev/flexnbd-c 2014-02-27 14:33:01 +00:00
nick
705164ae3b Cork/uncork in mirror - socket_connect already sets nodelay 2014-02-27 14:32:54 +00:00
nick
dbe7053bf3 Avoid some false positives 2014-02-27 14:32:26 +00:00
Alex Young
fa8023cf69 Proxy prefetch cache becomes a command-line argument. 2014-02-27 14:21:36 +00:00
nick
aba802d415 bitset: Allocate the right amount of memory
We were calculating the wrong number of words per byte in the first
place, and then passing the number of *words* to malloc, which expects
the number of *bytes*.

Fix both errors
2014-02-27 12:57:09 +00:00
Alex Young
d146102c2c Cherry-pick extra toolchain Makefile options 2014-02-26 15:56:41 +00:00
Alex Young
5551373073 Merge 2014-02-26 15:37:44 +00:00
Alex Young
77f333423b Apply Michel's tidy-ups 2014-02-26 15:19:03 +00:00
Alex Young
ffa45879d7 Pull back the changelog generation to the simplest thing that can possibly work 2014-02-25 17:24:25 +00:00
Alex Young
2fa1ce8e6b Tweak changelog generation not to skip commits since last tag 2014-02-25 16:35:51 +00:00
nick
6f540ce238 proxy: Turn on TCP_CORK
Now that we're using NODELAY, we should definitely use cork around
writes to the upstream server. This prevents each partial write()
from being its own packet, which would be terrible if it actually
happened with any regularity (we'd mostly see it when the kernel
is stressed, and write() is progressing a few bytes at a time as
a result)
2014-02-25 16:00:48 +00:00
nick
f9a3447bc9 proxy: Turn on TCP_NODELAY for the proxy->upstream leg
Nagle doesn't actually affect us too badly here, as we don't write
the header and then the data in two separate calls under normal
circumstances, which is the pathological case, but we should have
NODELAY on, regardless
2014-02-25 15:59:05 +00:00
nick
7806ec11ee client: cork/uncork around NBD_REQUEST_READ responses
We don't cork/uncork around NBD_REQUEST_WRITE responses because
they're only 16 bytes, and we're using blocking writes.
2014-02-25 15:45:41 +00:00
nick
1817c13acb sockutil: Add a tcp_cork helper 2014-02-25 15:44:46 +00:00
nick
97c8d7a358 Remove a compile-time optional selection of O_DIRECT (was never used)
The mmap() manpage tells us to avoid using O_DIRECT with mmap() - so
do so.
2014-02-24 13:47:29 +00:00
Alex Young
8cf92af900 Call srand() to make sure request handles are properly randomised 2014-02-24 12:20:50 +00:00
Alex Young
5185be39c9 Merge 2014-02-24 11:25:46 +00:00
Alex Young
374b4c616e Remove unreachable code to make -Wunreachable-code on clang useful. 2014-02-24 11:23:09 +00:00
Alex Young
50ec8fb7cc Depend on either libev4 or libev3, whichever is available 2014-02-24 11:22:26 +00:00
Alex Young
5fc9ad6fd8 Add some build-depends which make doc needs 2014-02-21 21:40:55 +00:00
Alex Young
85c463c4bd Add asciidoc as a Build-Depends 2014-02-21 20:46:44 +00:00
Alex Young
278a3151a8 Update Rakefile to generate debian/changelog.
`rake changelog` and a commit should be run after each `hg tag`.
2014-02-21 19:58:02 +00:00
Alex Young
0ea66b1e04 Added tag 0.1.1 for changeset 303f6859295d 2014-02-21 19:54:25 +00:00
59 changed files with 687 additions and 687 deletions
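
The two bitset commits in the list above (aba802d415 and 20f99b4554) both concern how much memory a bitset needs. The following is a minimal sketch of that arithmetic, not the project's bitset code: the word type and the helper names are assumptions made for illustration. It shows the two conversions that are easy to confuse, from bits to words and from words to bytes.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical word type; the project's bitset may use a different one. */
typedef uint64_t bitset_word_t;

#define BITS_PER_WORD (sizeof(bitset_word_t) * CHAR_BIT)

/* Words needed to hold nbits bits, rounding up. */
static size_t bitset_words_for(size_t nbits)
{
    return (nbits + BITS_PER_WORD - 1) / BITS_PER_WORD;
}

/* Bytes to request from malloc(): words times the size of one word. */
static size_t bitset_bytes_for(size_t nbits)
{
    return bitset_words_for(nbits) * sizeof(bitset_word_t);
}

/* Allocate a zeroed bitset; returns NULL on failure. */
static bitset_word_t *bitset_alloc(size_t nbits)
{
    assert(nbits > 0);
    return calloc(bitset_words_for(nbits), sizeof(bitset_word_t));
}
```

With these helpers a 4096-bit set needs 64 words, that is 512 bytes. Allocating one byte per bit would use eight times as much, which is the over-allocation the 2015 commit describes.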

.gitignore

@@ -0,0 +1,9 @@
**/*.o
**/*~
flexnbd
build/
pkg/
**/*.orig
**/.*.swp
cscope.out
valgrind.out

@@ -1,9 +0,0 @@
.o$
~$
^flexnbd$
^build/
^pkg/
\.orig$
.*\.swp$
cscope.out$
valgrind.out$

@@ -11,9 +11,40 @@ ifdef DEBUG
else
CFLAGS_EXTRA=-O2
endif
CFLAGS_EXTRA += -fPIC --std=gnu99
LDFLAGS_EXTRA += -Wl,--relax,--gc-sections
TOOLCHAIN := $(shell $(CC) --version|awk '/Debian/ {print "debian";exit;}')
#
# This bit adds extra flags depending of the distro, and the
# architecture. To make sure debian packages have the right
# set of 'native' flags on them
#
ifeq ($(TOOLCHAIN),debian)
DEBARCH := $(shell dpkg-architecture -qDEB_BUILD_ARCH)
ifeq ($(DEBARCH),$(filter $(DEBARCH),amd64 i386))
CFLAGS_EXTRA += -march=native
endif
ifeq ($(DEBARCH),armhf)
CFLAGS_EXTRA += -march=armv7-a -mtune=cortex-a8 -mfpu=neon
endif
LDFLAGS_EXTRA += -L$(LIB) -Wl,-rpath,${shell readlink -f ${LIB}}
else
LDFLAGS_EXTRA += -L$(LIB) -Wl,-rpath-link,$(LIB)
endif
# The -Wunreachable-code warning is only implemented in clang, but it
# doesn't break anything for gcc to see it.
WARNINGS=-Wall \
-Wextra \
-Werror-implicit-function-declaration \
-Wstrict-prototypes \
-Wno-missing-field-initializers \
-Wunreachable-code
CCFLAGS=-D_GNU_SOURCE=1 $(WARNINGS) $(CFLAGS_EXTRA) $(CFLAGS)
LLDFLAGS=-lm -lrt -lev $(LDFLAGS_EXTRA) $(LDFLAGS)
CCFLAGS=-D_GNU_SOURCE=1 -Wall -Wextra -Werror-implicit-function-declaration -Wstrict-prototypes -Wno-missing-field-initializers $(CFLAGS_EXTRA) $(CFLAGS)
LLDFLAGS=-lrt -lev $(LDFLAGS_EXTRA) $(LDFLAGS)
CC?=gcc
@@ -23,6 +54,7 @@ COMPILE=$(CC) $(INC) -c $(CCFLAGS)
SAVEDEP=$(CC) $(INC) -MM $(CCFLAGS)
LINK=$(CC) $(LLDFLAGS) -Isrc $(LIBS)
LIB=build/
EXISTING_OBJS := $(wildcard build/*.o)
-include $(EXISTING_OBJS:.o=.d)

@@ -28,7 +28,8 @@ USAGE
-----
$ flexnbd-proxy --addr <ADDR> [ --port <PORT> ]
--conn-addr <ADDR> --conn-port <PORT> [--bind <ADDR>] [option]*
--conn-addr <ADDR> --conn-port <PORT>
[--bind <ADDR>] [--cache[=<CACHE_BYTES>]] [option]*
Proxy requests from an NBD client to an NBD server, resiliently. Only one
client can be connected at a time, and ACLs cannot be applied to the client, as they
@@ -73,6 +74,10 @@ Options
*--conn-port, -P PORT*:
The port of the NBD server to connect to. Required.
*--cache, -c=CACHE_BYTES*:
If given, the size in bytes of read cache to use. CACHE_BYTES
defaults to 4096.
*--help, -h* :
Show command or global help.
@@ -154,6 +159,29 @@ The proxy notices and reconnects, fulfiling any request it has in its buffer.
The data in myfile has been moved between physical servers without the nbd
client process having to be disturbed at all.
READ CACHE
----------
If the --cache option is given at the command line, either without an
argument or with an argument greater than 0, flexnbd-proxy will use a
read-ahead cache. The cache as currently implemented doubles each read
request size, up to a maximum of 2xCACHE_BYTES, and retains the latter
half in a buffer. If the next read request from the client exactly
matches the region held in the buffer, flexnbd-proxy responds from the
cache without making a request to the server.
This pattern is designed to match sequential reads, such as those
performed by a booting virtual machine.
Note: If specifying a cache size, you *must* use this form:
nbd-client$ flexnbd-proxy --cache=XXXX
That is, the '=' is required. This is a limitation of getopt-long.
If no cache size is given, a size of 4096 bytes is assumed. Caching can
be explicitly disabled by setting a size of 0.
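
The read-ahead behaviour described in the READ CACHE text can be sketched roughly as follows. This is an illustration only, not the code in src/proxy/prefetch.c: the struct, the function names, and the rule that requests larger than CACHE_BYTES are passed through without doubling are all assumptions drawn from the wording of the man page.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define CACHE_BYTES 4096u  /* the documented default size */

/* Hypothetical cache state; not the project's struct prefetch. */
struct read_cache {
    int      is_full;
    uint64_t from;               /* offset of the first cached byte */
    uint32_t len;                /* number of cached bytes */
    char     buffer[CACHE_BYTES];
};

/* Length to request upstream: double the client's length when the
 * extra half fits in the cache, otherwise just the client's length. */
static uint32_t upstream_len(uint32_t client_len)
{
    return client_len <= CACHE_BYTES ? client_len * 2 : client_len;
}

/* Keep the second half of a doubled upstream read. `data` holds the
 * upstream_len(client_len) bytes that were read from client_from. */
static void cache_store(struct read_cache *cache, uint64_t client_from,
                        uint32_t client_len, const char *data)
{
    assert(client_len <= CACHE_BYTES);
    memcpy(cache->buffer, data + client_len, client_len);
    cache->from = client_from + client_len;
    cache->len = client_len;
    cache->is_full = 1;
}

/* A hit requires the request to match the cached region exactly. */
static int cache_hit(const struct read_cache *cache, uint64_t from,
                     uint32_t len)
{
    return cache->is_full && cache->from == from && cache->len == len;
}
```

A client that reads 512 bytes at offset 0 and then 512 bytes at offset 512 would get the second read from the buffer. A read of any other offset or length misses, which is why the pattern suits strictly sequential readers.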
BUGS
----

@@ -50,3 +50,11 @@ end
desc "Remove all build targets, binaries and temporary files"
maketask :clean
file "debian/changelog" do
FileUtils.mkdir_p "debian"
sh "hg log --style=changelog.template > debian/changelog"
end
desc "Generate the changelog"
task :changelog => "debian/changelog"

debian/changelog

@@ -1,198 +0,0 @@
flexnbd (0.0.1-33) unstable; urgency=low
* Added tag 0.0.1 for changeset 27409c2c1313 [r33]
-- Alex Young <alex@bytemark.co.uk> Wed, 30 May 2012 17:11:10 +0100
flexnbd (0.0.1-31) unstable; urgency=low
* Fixed bug where ACL was accidentally deleted when being set from control [r31]
-- mbloch <mbloch> Wed, 30 May 2012 13:03:02 +0100
flexnbd (0.0.1-30) unstable; urgency=low
* Fix the usage message [r30]
-- nick <nick@bytemark.co.uk> Wed, 30 May 2012 11:28:32 +0100
flexnbd (0.0.1-29) unstable; urgency=low
* Fixed race in tests. [r29]
-- mbloch <mbloch> Tue, 29 May 2012 17:01:54 +0100
flexnbd (0.0.1-28) unstable; urgency=low
* Added getopt_long command-line handling. [r28]
-- Alex Young <alex@bytemark.co.uk> Wed, 30 May 2012 15:19:40 +0100
flexnbd (0.0.1-27) unstable; urgency=low
* Added .h files to the Rakefile [r27]
-- Alex Young <alex@bytemark.co.uk> Wed, 30 May 2012 15:06:06 +0100
flexnbd (0.0.1-26) unstable; urgency=low
* Rearranged the project to have src/ and build/ directories [r26]
-- Alex Young <alex@bytemark.co.uk> Wed, 30 May 2012 09:51:20 +0100
flexnbd (0.0.1-25) unstable; urgency=low
* Added .INCOMPLETE hack to aid with marking finished transfers. [r25]
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 29 May 2012 11:24:24 +0100
flexnbd (0.0.1-24) unstable; urgency=low
* Added mirror write barrier / final pass stuff & clean exit afterwards. [r24]
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 29 May 2012 04:03:28 +0100
flexnbd (0.0.1-23) unstable; urgency=low
* Lots of errors spotted by Alex fixed, added mutexes to accept & I/O, added [r23]
-- mbloch <mbloch> Tue, 29 May 2012 00:59:12 +0100
flexnbd (0.0.1-22) unstable; urgency=low
* Added another write/read test, fixed bugs in splice() usage and IPv6 [r22]
-- Matthew Bloch <matthew@bytemark.co.uk> Sun, 27 May 2012 14:40:16 +0100
flexnbd (0.0.1-21) unstable; urgency=low
* First few external tests with test/unit, some minor tidying of internal data [r21]
-- Matthew Bloch <matthew@bytemark.co.uk> Thu, 24 May 2012 01:39:35 +0100
flexnbd (0.0.1-20) unstable; urgency=low
* Pulled some duplicated code out of control.c into [r20]
-- mbloch <mbloch> Wed, 23 May 2012 14:03:30 +0100
flexnbd (0.0.1-19) unstable; urgency=low
* Split control-socket functions into separate file. [r19]
-- Matthew Bloch <matthew@bytemark.co.uk> Wed, 23 May 2012 00:42:14 +0100
flexnbd (0.0.1-18) unstable; urgency=low
* Fixed mirroring to work (error reporting suspect though). [r18]
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 22 May 2012 00:22:06 +0100
flexnbd (0.0.1-17) unstable; urgency=low
* Initial, untested mirror implementation and resolved some type confusion [r17]
-- Matthew Bloch <matthew@bytemark.co.uk> Mon, 21 May 2012 04:03:17 +0100
flexnbd (0.0.1-16) unstable; urgency=low
* More valgrind-found bugs, extracted open_and_mmap from main code. [r16]
-- Matthew Bloch <matthew@bytemark.co.uk> Mon, 21 May 2012 04:00:45 +0100
flexnbd (0.0.1-15) unstable; urgency=low
* Fixed some uninitialised variables courtesy of valgrind. [r15]
-- Matthew Bloch <matthew@bytemark.co.uk> Mon, 21 May 2012 03:59:43 +0100
flexnbd (0.0.1-14) unstable; urgency=low
* Mostly finished bitset tests, fixed test build to include utilities, remove [r14]
-- Matthew Bloch <matthew@bytemark.co.uk> Mon, 21 May 2012 03:17:32 +0100
flexnbd (0.0.1-13) unstable; urgency=low
* Tweaks to bitset.h, established a C test framework. [r13]
-- Matthew Bloch <matthew@bytemark.co.uk> Sun, 20 May 2012 14:38:46 +0100
flexnbd (0.0.1-12) unstable; urgency=low
* Fixed segfaulting access control, allowed change to acl via control socket. [r12]
-- Matthew Bloch <matthew@bytemark.co.uk> Sat, 19 May 2012 12:48:03 +0100
flexnbd (0.0.1-11) unstable; urgency=low
* Added dummy control socket answering / changed serve_accept_loop to use [r11]
-- Matthew Bloch <matthew@bytemark.co.uk> Fri, 18 May 2012 23:39:16 +0100
flexnbd (0.0.1-10) unstable; urgency=low
* Added control socket, doesn't do anything yet. [r10]
-- mbloch <mbloch> Fri, 18 May 2012 18:44:34 +0100
flexnbd (0.0.1-9) unstable; urgency=low
* Added .hgignore file [r9]
-- Matthew Bloch <matthew@bytemark.co.uk> Fri, 18 May 2012 13:25:54 +0100
flexnbd (0.0.1-8) unstable; urgency=low
* Stopped NBD writes from committing all-zero blocks to disc (tentative, needs [r8]
-- Matthew Bloch <matthew@bytemark.co.uk> Fri, 18 May 2012 13:24:35 +0100
flexnbd (0.0.1-7) unstable; urgency=low
* Split code out into separate compilation units (first pass, anyway). [r7]
-- Matthew Bloch <matthew@bytemark.co.uk> Thu, 17 May 2012 20:14:22 +0100
flexnbd (0.0.1-6) unstable; urgency=low
* Non-functioning commit, half-way through adding sparse bitmap feature. [r6]
-- Matthew Bloch <matthew@bytemark.co.uk> Thu, 17 May 2012 11:54:25 +0100
flexnbd (0.0.1-5) unstable; urgency=low
* Added write mode. [r5]
-- Matthew Bloch <matthew@bytemark.co.uk> Wed, 16 May 2012 11:58:41 +0100
flexnbd (0.0.1-4) unstable; urgency=low
* Added working read via splice syscall. [r4]
-- Matthew Bloch <matthew@bytemark.co.uk> Wed, 16 May 2012 03:20:09 +0100
flexnbd (0.0.1-3) unstable; urgency=low
* Added Rakefile [r3]
-- mbloch <mbloch> Wed, 16 May 2012 01:27:14 +0100
flexnbd (0.0.1-2) unstable; urgency=low
* Silly bug fixes, added ACL support, added parser for read/write requests. [r2]
-- mbloch <mbloch> Tue, 15 May 2012 18:40:58 +0100
flexnbd (0.0.1-1) unstable; urgency=low
* Some debugging, got it to serve. [r1]
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 15 May 2012 03:16:19 +0100
flexnbd (0.0.1-0) unstable; urgency=low
* It compiles :) [r0]
-- Matthew Bloch <matthew@bytemark.co.uk> Tue, 15 May 2012 02:42:03 +0100

debian/compat

@@ -1 +0,0 @@
7

debian/control

@@ -1,25 +0,0 @@
Source: flexnbd
Section: unknown
Priority: extra
Maintainer: Alex Young <alex@bytemark.co.uk>
Build-Depends: cdbs, debhelper (>= 7.0.50), ruby, rake, gcc, libev-dev
Standards-Version: 3.8.1
Homepage: http://bigv.io/
Package: flexnbd
Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}, libev3
Description: FlexNBD server
An NBD server offering push-mirroring and intelligent sparse file handling
Package: flexnbd-dbg
Architecture: any
Section: debug
Priority: extra
Depends:
flexnbd (= ${binary:Version}),
${misc:Depends}
Description: debugging symbols for flexnbd
An NBD server offering push-mirroring and intelligent sparse file handling
.
This package contains the debugging symbols for flexnbd.

debian/copyright

@@ -1,53 +0,0 @@
This work was packaged for Debian by:
Alex Young <alex@bytemark.co.uk> on Wed, 30 May 2012 16:46:58 +0100
It was downloaded from:
<url://example.com>
Upstream Author(s):
<put author's name and email here>
<likewise for another author>
Copyright:
<Copyright (C) YYYY Firstname Lastname>
<likewise for another author>
License:
### SELECT: ###
This package is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
### OR ###
This package is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2 as
published by the Free Software Foundation.
##########
This package is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
On Debian systems, the complete text of the GNU General
Public License version 2 can be found in "/usr/share/common-licenses/GPL-2".
The Debian packaging is:
Copyright (C) 2012 Alex Young <alex@bytemark.co.uk>
you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
# Please also look if there are files or directories which have a
# different copyright/license attached and list them here.

@@ -1,5 +0,0 @@
build/flexnbd usr/bin
build/flexnbd-proxy usr/bin
build/flexnbd.1.gz usr/share/man/man1
build/flexnbd-proxy.1.gz usr/share/man/man1

debian/rules

@@ -1,12 +0,0 @@
#!/usr/bin/make -f
# -*- makefile -*-
# Uncomment this to turn on verbose mode.
#export DH_VERBOSE=1
%:
dh $@
.PHONY: override_dh_strip
override_dh_strip:
dh_strip --dbg-package=flexnbd-dbg

@@ -1 +0,0 @@
3.0 (native)

@@ -31,8 +31,6 @@ int build_allocation_map(struct bitset * allocation_map, int fd)
for (offset = 0; offset < allocation_map->size; ) {
unsigned int i;
fiemap->fm_start = offset;
fiemap->fm_length = max_length;
@@ -49,7 +47,7 @@ int build_allocation_map(struct bitset * allocation_map, int fd)
return 0; /* it's up to the caller to free the map */
}
else {
for ( i = 0; i < fiemap->fm_mapped_extents; i++ ) {
for ( unsigned int i = 0; i < fiemap->fm_mapped_extents; i++ ) {
bitset_set_range( allocation_map,
fiemap->fm_extents[i].fe_logical,
fiemap->fm_extents[i].fe_length );
@@ -76,17 +74,18 @@ int build_allocation_map(struct bitset * allocation_map, int fd)
}
int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **out_map)
int open_and_mmap(const char* filename, int* out_fd, uint64_t *out_size, void **out_map)
{
/*
* size and out_size are intentionally of different types.
* lseek64() uses off64_t to signal errors in the sign bit.
* Since we check for these errors before trying to assign to
* *out_size, we know *out_size can never go negative.
*/
off64_t size;
/* O_DIRECT seems to be intermittently supported. Leaving it as
* a compile-time option for now. */
#ifdef DIRECT_IO
*out_fd = open(filename, O_RDWR | O_DIRECT | O_SYNC );
#else
/* O_DIRECT should not be used with mmap() */
*out_fd = open(filename, O_RDWR | O_SYNC );
#endif
if (*out_fd < 1) {
warn("open(%s) failed: does it exist?", filename);
@@ -109,8 +108,11 @@ int open_and_mmap(const char* filename, int* out_fd, off64_t *out_size, void **o
warn("mmap64() failed");
return -1;
}
}
debug("opened %s size %ld on fd %d @ %p", filename, size, *out_fd, *out_map);
}
else {
debug("opened %s size %ld on fd %d", filename, size, *out_fd);
}
return 0;
}
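
The comment added to open_and_mmap() explains why the local size stays signed while *out_size becomes unsigned. A minimal sketch of that pattern follows, assuming plain lseek() and off_t in place of the lseek64() and off64_t used in the diff; the helper name and the choice to open the file by name are hypothetical.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdint.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: stores the size of `filename` in *out_size.
 * Returns 0 on success, -1 on error. */
static int file_size(const char *filename, uint64_t *out_size)
{
    assert(filename != NULL && out_size != NULL);

    int fd = open(filename, O_RDONLY);
    if (fd < 0) {
        return -1;
    }

    off_t size = lseek(fd, 0, SEEK_END);  /* signed: -1 signals an error */
    close(fd);
    if (size < 0) {
        return -1;
    }

    *out_size = (uint64_t) size;  /* safe: size is known to be non-negative */
    return 0;
}
```

The signed value never leaves the function, so callers only ever see a size that has already passed the error check.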

@@ -65,7 +65,7 @@ int read_lines_until_blankline(int fd, int max_line_length, char ***lines);
* ''out_size'' and the address of the mmap in ''out_map''. If anything goes
* wrong, returns -1 setting errno, otherwise 0.
*/
int open_and_mmap( const char* filename, int* out_fd, off64_t *out_size, void **out_map);
int open_and_mmap( const char* filename, int* out_fd, uint64_t* out_size, void **out_map);
/** Check to see whether the given file descriptor is closed.

@@ -7,8 +7,9 @@ void mode(char* mode, int argc, char **argv);
#include <getopt.h>
#define GETOPT_ARG(x,s) {(x), 1, 0, (s)}
#define GETOPT_FLAG(x,v) {(x), 0, 0, (v)}
#define GETOPT_ARG(x,s) {(x), required_argument, 0, (s)}
#define GETOPT_FLAG(x,v) {(x), no_argument, 0, (v)}
#define GETOPT_OPTARG(x,s) {(x), optional_argument, 0, (s)}
#define OPT_HELP "help"
#define OPT_ADDR "addr"
@@ -19,6 +20,7 @@ void mode(char* mode, int argc, char **argv);
#define OPT_FROM "from"
#define OPT_SIZE "size"
#define OPT_DENY "default-deny"
#define OPT_CACHE "cache"
#define OPT_UNLINK "unlink"
#define OPT_CONNECT_ADDR "conn-addr"
#define OPT_CONNECT_PORT "conn-port"
@@ -52,6 +54,7 @@ void mode(char* mode, int argc, char **argv);
#define GETOPT_FROM GETOPT_ARG( OPT_FROM, 'F' )
#define GETOPT_SIZE GETOPT_ARG( OPT_SIZE, 'S' )
#define GETOPT_BIND GETOPT_ARG( OPT_BIND, 'b' )
#define GETOPT_CACHE GETOPT_OPTARG( OPT_CACHE, 'c' )
#define GETOPT_UNLINK GETOPT_ARG( OPT_UNLINK, 'u' )
#define GETOPT_CONNECT_ADDR GETOPT_ARG( OPT_CONNECT_ADDR, 'C' )
#define GETOPT_CONNECT_PORT GETOPT_ARG( OPT_CONNECT_PORT, 'P' )
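
The new GETOPT_OPTARG macro uses optional_argument, which is why the man page insists on the --cache=XXXX form. The sketch below shows how getopt_long() treats such an option. The helper name, the "c::" short-option string, and the convention of returning 0 when the option is absent are assumptions for illustration; the project's proxy_short_options is not shown in this diff.

```c
#include <assert.h>
#include <getopt.h>
#include <stdlib.h>

#define DEFAULT_CACHE_BYTES 4096UL

/* Hypothetical helper: returns the cache size selected on the command
 * line, or 0 (no cache) when --cache is not given at all. */
static unsigned long parse_cache_bytes(int argc, char **argv)
{
    static const struct option options[] = {
        { "cache", optional_argument, NULL, 'c' },
        { NULL, 0, NULL, 0 }
    };
    unsigned long cache_bytes = 0;
    int c;

    assert(argc >= 1 && argv != NULL);
    optind = 1;  /* restart scanning so the helper can be called again */
    while ((c = getopt_long(argc, argv, "c::", options, NULL)) != -1) {
        if (c == 'c') {
            /* optarg is NULL unless the value was attached with '=' */
            cache_bytes = optarg ? strtoul(optarg, NULL, 10)
                                 : DEFAULT_CACHE_BYTES;
        }
    }
    return cache_bytes;
}
```

Called with "--cache=8192" this yields 8192. Called with "--cache 8192" it yields the default of 4096, because the detached "8192" is treated as an ordinary non-option argument rather than as the value of --cache.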

@@ -27,7 +27,7 @@ void nbd_r2h_request( struct nbd_request_raw *from, struct nbd_request * to )
{
to->magic = htobe32( from->magic );
to->type = htobe32( from->type );
memcpy( to->handle, from->handle, 8 );
to->handle.w = from->handle.w;
to->from = htobe64( from->from );
to->len = htobe32( from->len );
}
@@ -36,7 +36,7 @@ void nbd_h2r_request( struct nbd_request * from, struct nbd_request_raw * to )
{
to->magic = be32toh( from->magic );
to->type = be32toh( from->type );
memcpy( to->handle, from->handle, 8 );
to->handle.w = from->handle.w;
to->from = be64toh( from->from );
to->len = be32toh( from->len );
}
@@ -46,13 +46,13 @@ void nbd_r2h_reply( struct nbd_reply_raw * from, struct nbd_reply * to )
{
to->magic = htobe32( from->magic );
to->error = htobe32( from->error );
memcpy( to->handle, from->handle, 8 );
to->handle.w = from->handle.w;
}
void nbd_h2r_reply( struct nbd_reply * from, struct nbd_reply_raw * to )
{
to->magic = be32toh( from->magic );
to->error = be32toh( from->error );
memcpy( to->handle, from->handle, 8 );
to->handle.w = from->handle.w;
}

@@ -24,6 +24,11 @@
#include <linux/types.h>
#include <inttypes.h>
typedef union nbd_handle_t {
uint8_t b[8];
uint64_t w;
} nbd_handle_t;
/* The _raw types are the types as they appear on the wire. Non-_raw
* types are in host-format.
* Conversion functions are _r2h_ for converting raw to host, and _h2r_
@@ -39,7 +44,7 @@ struct nbd_init_raw {
struct nbd_request_raw {
__be32 magic;
__be32 type; /* == READ || == WRITE */
char handle[8];
nbd_handle_t handle;
__be64 from;
__be32 len;
} __attribute__((packed));
@@ -47,7 +52,7 @@ struct nbd_request_raw {
struct nbd_reply_raw {
__be32 magic;
__be32 error; /* 0 = ok, else error */
char handle[8]; /* handle you got from request */
nbd_handle_t handle; /* handle you got from request */
};
@@ -62,7 +67,7 @@ struct nbd_init {
struct nbd_request {
uint32_t magic;
uint32_t type; /* == READ || == WRITE || == DISCONNECT */
char handle[8];
nbd_handle_t handle;
uint64_t from;
uint32_t len;
} __attribute__((packed));
@@ -70,7 +75,7 @@ struct nbd_request {
struct nbd_reply {
uint32_t magic;
uint32_t error; /* 0 = ok, else error */
char handle[8]; /* handle you got from request */
nbd_handle_t handle; /* handle you got from request */
};
void nbd_r2h_init( struct nbd_init_raw * from, struct nbd_init * to );
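
The nbd_handle_t union introduced in this file lets the 8-byte handle be copied and compared as a single 64-bit word. A small sketch of how that might be used is below; the union is copied from the diff, while the two helper functions are hypothetical and exist only to make the example self-contained.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

typedef union nbd_handle_t {
    uint8_t  b[8];
    uint64_t w;
} nbd_handle_t;

/* Hypothetical helper: build a handle from 8 raw bytes. */
static nbd_handle_t nbd_handle_from_bytes(const uint8_t bytes[8])
{
    nbd_handle_t handle;
    assert(bytes != NULL);
    memcpy(handle.b, bytes, sizeof(handle.b));
    return handle;
}

/* Hypothetical helper: handles are opaque, so a plain integer
 * comparison of all 64 bits is enough and needs no byte swapping. */
static int nbd_handle_equal(nbd_handle_t a, nbd_handle_t b)
{
    return a.w == b.w;
}
```

One practical difference from a strncmp()-based check: strncmp() stops at the first zero byte, so two handles that differ only after a zero byte would compare as equal. Comparing the whole word, or using memcmp(), does not have that blind spot.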

@@ -41,7 +41,7 @@ int socket_connect(struct sockaddr* to, struct sockaddr* from)
return fd;
}
int nbd_check_hello( struct nbd_init_raw* init_raw, off64_t* out_size )
int nbd_check_hello( struct nbd_init_raw* init_raw, uint64_t* out_size )
{
if ( strncmp( init_raw->passwd, INIT_PASSWD, 8 ) != 0 ) {
warn( "wrong passwd" );
@@ -62,7 +62,7 @@ fail:
}
int socket_nbd_read_hello( int fd, off64_t* out_size )
int socket_nbd_read_hello( int fd, uint64_t* out_size )
{
struct nbd_init_raw init_raw;
@@ -101,12 +101,11 @@ int socket_nbd_write_hello(int fd, off64_t out_size)
return 1;
}
void fill_request(struct nbd_request *request, int type, off64_t from, int len)
void fill_request(struct nbd_request *request, int type, uint64_t from, uint32_t len)
{
request->magic = htobe32(REQUEST_MAGIC);
request->type = htobe32(type);
((int*) request->handle)[0] = rand();
((int*) request->handle)[1] = rand();
request->handle.w = (((uint64_t)rand()) << 32) | ((uint64_t)rand());
request->from = htobe64(from);
request->len = htobe32(len);
}
@@ -126,7 +125,7 @@ void read_reply(int fd, struct nbd_request *request, struct nbd_reply *reply)
if (reply->error != 0) {
error("Server replied with error %d", reply->error);
}
if (strncmp(request->handle, reply->handle, 8) != 0) {
if (request->handle.w != reply->handle.w) {
error("Did not reply with correct handle");
}
}
@@ -149,7 +148,7 @@ void wait_for_data( int fd, int timeout_secs )
}
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, int timeout_secs)
void socket_nbd_read(int fd, uint64_t from, uint32_t len, int out_fd, void* out_buf, int timeout_secs)
{
struct nbd_request request;
struct nbd_reply reply;
@@ -173,7 +172,7 @@ void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, i
}
}
void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf, int timeout_secs)
void socket_nbd_write(int fd, uint64_t from, uint32_t len, int in_fd, void* in_buf, int timeout_secs)
{
struct nbd_request request;
struct nbd_reply reply;
@@ -213,10 +212,12 @@ int socket_nbd_disconnect( int fd )
}
#define CHECK_RANGE(error_type) { \
off64_t size;\
uint64_t size;\
int success = socket_nbd_read_hello(params->client, &size); \
if ( success ) {\
if (params->from < 0 || (params->from + params->len) > size) {\
uint64_t endpoint = params->from + params->len; \
if (endpoint > size || \
endpoint < params->from ) { /* this happens on overflow */ \
fatal(error_type \
" request %d+%d is out of range given size %d", \
params->from, params->len, size\
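
With from and size now unsigned, the CHECK_RANGE macro can no longer test for a negative offset and instead detects wrap-around of from + len. The same check written as a standalone function might look like this; the function name is an assumption, and the logic mirrors the two conditions in the macro.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: returns 1 if [from, from + len) lies within a
 * device of `size` bytes, 0 otherwise. */
static int range_in_bounds(uint64_t from, uint32_t len, uint64_t size)
{
    uint64_t endpoint = from + len;  /* wraps modulo 2^64 on overflow */

    if (endpoint < from) {
        return 0;  /* the addition wrapped around */
    }
    return endpoint <= size;
}
```

Unsigned overflow is well defined in C, so the endpoint < from test is a reliable way to notice that the sum did not fit in 64 bits.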

@@ -7,17 +7,17 @@
#include "nbdtypes.h"
int socket_connect(struct sockaddr* to, struct sockaddr* from);
int socket_nbd_read_hello(int fd, off64_t * size);
int socket_nbd_write_hello(int fd, off64_t size);
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, int timeout_secs);
void socket_nbd_write(int fd, off64_t from, int len, int out_fd, void* out_buf, int timeout_secs);
int socket_nbd_read_hello(int fd, uint64_t* size);
int socket_nbd_write_hello(int fd, uint64_t size);
void socket_nbd_read(int fd, uint64_t from, uint32_t len, int out_fd, void* out_buf, int timeout_secs);
void socket_nbd_write(int fd, uint64_t from, uint32_t len, int out_fd, void* out_buf, int timeout_secs);
int socket_nbd_disconnect( int fd );
/* as you can see, we're slowly accumulating code that should really be in an
* NBD library */
void nbd_hello_to_buf( struct nbd_init_raw* buf, off64_t out_size );
int nbd_check_hello( struct nbd_init_raw* init_raw, off64_t* out_size );
void nbd_hello_to_buf( struct nbd_init_raw* buf, uint64_t out_size );
int nbd_check_hello( struct nbd_init_raw* init_raw, uint64_t* out_size );
#endif

@@ -63,7 +63,5 @@ void do_remote_command(char* command, char* socket_name, int argc, char** argv)
print_response( response );
exit(atoi(response));
close(remote);
}

@@ -51,7 +51,6 @@ struct self_pipe * self_pipe_create(void)
{
struct self_pipe *sig = xmalloc( sizeof( struct self_pipe ) );
int fds[2];
int fcntl_err;
if ( NULL == sig ) { return NULL; }
@@ -62,7 +61,7 @@ struct self_pipe * self_pipe_create(void)
}
if ( fcntl( fds[0], F_SETFL, O_NONBLOCK ) || fcntl( fds[1], F_SETFL, O_NONBLOCK ) ) {
fcntl_err = errno;
int fcntl_err = errno;
while( close( fds[0] ) == -1 && errno == EINTR );
while( close( fds[1] ) == -1 && errno == EINTR );
free( sig );

@@ -39,7 +39,6 @@ const char* sockaddr_address_string( const struct sockaddr* sa, char* dest, size
struct sockaddr_un* un = ( struct sockaddr_un* ) sa;
unsigned short real_port = ntohs( in->sin_port ); // common to in and in6
size_t size;
const char* ret = NULL;
memset( dest, 0, len );
@@ -57,7 +56,7 @@ const char* sockaddr_address_string( const struct sockaddr* sa, char* dest, size
}
if ( NULL != ret && real_port > 0 && sa->sa_family != AF_UNIX ) {
size = strlen( dest );
size_t size = strlen( dest );
snprintf( dest + size, len - size, " port %d", real_port );
}
@@ -75,6 +74,11 @@ int sock_set_tcp_nodelay( int fd, int optval )
return setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval) );
}
int sock_set_tcp_cork( int fd, int optval )
{
return setsockopt( fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval) );
}
int sock_set_nonblock( int fd, int optval )
{
int flags = fcntl( fd, F_GETFL );
@@ -96,7 +100,7 @@ int sock_try_bind( int fd, const struct sockaddr* sa )
{
int bind_result;
char s_address[256];
int retry = 1;
int retry = 10;
sockaddr_address_string( sa, &s_address[0], 256 );
@@ -122,8 +126,11 @@ int sock_try_bind( int fd, const struct sockaddr* sa )
* will cope with it.
*/
case EADDRNOTAVAIL:
retry--;
if (retry) {
debug( "retrying" );
sleep( 1 );
}
continue;
case EADDRINUSE:
warn( "%s in use, giving up.", s_address );
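
The sock_try_bind() hunk raises the retry count from 1 to 10 and decrements it on every EADDRNOTAVAIL, so the loop can no longer spin forever. The loop condition itself is not visible in the hunk, so the following is only a sketch of the general bounded-retry pattern, with a callback standing in for bind(); all names here are hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* An attempt returns 0 on success, or -1 with errno set. */
typedef int (*attempt_fn)(void *ctx);

/* Hypothetical helper: retry while the failure is EADDRNOTAVAIL,
 * but never more than max_tries times in total. */
static int retry_bounded(attempt_fn attempt, void *ctx, int max_tries)
{
    assert(attempt != NULL);

    for (int tries_left = max_tries; tries_left > 0; tries_left--) {
        if (attempt(ctx) == 0) {
            return 0;
        }
        if (errno != EADDRNOTAVAIL) {
            return -1;  /* not a transient error: give up at once */
        }
        /* A real caller would sleep here before trying again. */
    }
    return -1;  /* retry budget exhausted */
}

/* Stand-in for bind(): fails with EADDRNOTAVAIL until *ctx reaches 0. */
static int fail_until_zero(void *ctx)
{
    int *failures_left = ctx;
    if (*failures_left > 0) {
        (*failures_left)--;
        errno = EADDRNOTAVAIL;
        return -1;
    }
    return 0;
}
```

With a budget of 10, an address that becomes available after three failures still binds, while one that never becomes available costs at most ten attempts.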

@@ -20,8 +20,8 @@ int sock_set_reuseaddr(int fd, int optval);
/* Set the tcp_nodelay option */
int sock_set_tcp_nodelay(int fd, int optval);
/* TODO: Set the tcp_cork option */
// int sock_set_cork(int fd, int optval);
/* Set the tcp_cork option */
int sock_set_tcp_cork(int fd, int optval);
int sock_set_nonblock(int fd, int optval);
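
The commit messages describe corking a socket around a multi-part write so that partial writes are not sent as separate packets. A rough sketch of that usage is below. sock_set_tcp_cork() has the same shape as the helper added in sockutil.c; write_all() and send_corked() are hypothetical names introduced for the example, and TCP_CORK is Linux-specific.

```c
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Same shape as the helper in the diff: 1 corks, 0 uncorks. */
static int sock_set_tcp_cork(int fd, int optval)
{
    return setsockopt(fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
}

/* Hypothetical helper: write the whole buffer, continuing after
 * short writes and interrupted calls. */
static int write_all(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t written = write(fd, buf, len);
        if (written < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        buf += written;
        len -= (size_t) written;
    }
    return 0;
}

/* Hypothetical helper: send a header and a payload as one corked burst. */
static int send_corked(int fd, const char *header, size_t header_len,
                       const char *payload, size_t payload_len)
{
    assert(header != NULL && payload != NULL);

    if (sock_set_tcp_cork(fd, 1) != 0) {
        return -1;
    }
    int result = write_all(fd, header, header_len);
    if (result == 0) {
        result = write_all(fd, payload, payload_len);
    }
    /* Always uncork, so anything already queued is flushed. */
    if (sock_set_tcp_cork(fd, 0) != 0) {
        result = -1;
    }
    return result;
}
```

Uncorking on every path matters: data queued while the socket is corked is held back until the cork is removed or the kernel's cork timeout expires.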

@@ -116,6 +116,7 @@ uint64_t monotonic_time_ms(void);
#define fatal(msg, ...) do { \
myloglev(4, msg, ##__VA_ARGS__); \
error_handler(1); \
exit(1); /* never-reached, this is to make static code analizer happy */ \
} while(0)

@@ -2,12 +2,16 @@
#include "mode.h"
#include <signal.h>
#include <stdlib.h>
#include <time.h>
int main(int argc, char** argv)
{
signal(SIGPIPE, SIG_IGN); /* calls to splice() unhelpfully throw this */
error_init();
srand(time(NULL));
if (argc < 2) {
exit_err( help_help_text );
}
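
The srand(time(NULL)) call added here matters because request handles are built from rand(). Without a seed, rand() behaves as if srand(1) had been called, so every run would produce the same handle sequence. The sketch below illustrates this; the helper names are hypothetical, and the handle construction follows the expression used in fill_request().

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>

/* Hypothetical helper: combine two rand() results into one 64-bit value.
 * rand() returns at most RAND_MAX, so typically not all 64 bits vary. */
static uint64_t random_handle(void)
{
    uint64_t high = (uint64_t) rand();
    uint64_t low  = (uint64_t) rand();
    return (high << 32) | low;
}

/* Hypothetical helper: seed the generator once, early in main(). */
static void seed_handles(void)
{
    srand((unsigned int) time(NULL));
}
```

Seeding from the clock is enough to make handles differ between runs that start at different seconds. It is not a source of unpredictable values, which is acceptable here only because handles are used for matching replies to requests and not for security.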

@@ -1,4 +1,6 @@
#include <signal.h>
#include <stdlib.h>
#include <time.h>
#include "mode.h"
#include "util.h"
@@ -12,6 +14,7 @@ static struct option proxy_options[] = {
GETOPT_CONNECT_ADDR,
GETOPT_CONNECT_PORT,
GETOPT_BIND,
GETOPT_CACHE,
GETOPT_QUIET,
GETOPT_VERBOSE,
{0}
@@ -27,22 +30,25 @@ static char proxy_help_text[] =
"\t--" OPT_CONNECT_ADDR ",-C <ADDR>\tAddress of the proxied server.\n"
"\t--" OPT_CONNECT_PORT ",-P <PORT>\tPort of the proxied server.\n"
"\t--" OPT_BIND ",-b <ADDR>\tThe address we connect from, as a proxy.\n"
"\t--" OPT_CACHE ",-c[=<CACHE-BYTES>]\tUse a RAM read cache of the given size.\n"
QUIET_LINE
VERBOSE_LINE;
static char proxy_default_cache_size[] = "4096";
void read_proxy_param(
int c,
char **downstream_addr,
char **downstream_port,
char **upstream_addr,
char **upstream_port,
char **bind_addr )
char **bind_addr,
char **cache_bytes)
{
switch( c ) {
case 'h' :
fprintf( stdout, "%s\n", proxy_help_text );
exit( 0 );
break;
case 'l':
*downstream_addr = optarg;
break;
@@ -58,6 +64,9 @@ void read_proxy_param(
case 'b':
*bind_addr = optarg;
break;
case 'c':
*cache_bytes = optarg ? optarg : proxy_default_cache_size;
break;
case 'q':
log_level = QUIET_LOG_LEVEL;
break;
@@ -89,6 +98,7 @@ int main( int argc, char *argv[] )
char *upstream_addr = NULL;
char *upstream_port = NULL;
char *bind_addr = NULL;
char *cache_bytes = NULL;
int success;
sigset_t mask;
@@ -103,6 +113,8 @@ int main( int argc, char *argv[] )
exit_action.sa_mask = mask;
exit_action.sa_flags = 0;
srand(time(NULL));
while (1) {
c = getopt_long( argc, argv, proxy_short_options, proxy_options, NULL );
if ( -1 == c ) { break; }
@@ -111,7 +123,8 @@ int main( int argc, char *argv[] )
&downstream_port,
&upstream_addr,
&upstream_port,
&bind_addr
&bind_addr,
&cache_bytes
);
}
@@ -128,7 +141,8 @@ int main( int argc, char *argv[] )
downstream_port,
upstream_addr,
upstream_port,
bind_addr
bind_addr,
cache_bytes
);
/* Set these *after* proxy has been assigned to */
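Note on the new `--cache` option: the help text `-c[=<CACHE-BYTES>]` suggests an option with an optional argument that falls back to `proxy_default_cache_size` when none is given. The definition of `GETOPT_CACHE` is not shown here, so the sketch below assumes it uses `optional_argument`. The function name and option table are hypothetical.

```c
#include <getopt.h>
#include <stdlib.h>

/* Assumed default, mirroring proxy_default_cache_size in the diff. */
static char example_default_cache_size[] = "4096";

/* Hypothetical sketch: returns the requested cache size in bytes,
 * or -1 when --cache / -c was not given at all. */
long example_parse_cache_bytes(int argc, char *argv[])
{
    static const struct option opts[] = {
        { "cache", optional_argument, NULL, 'c' },
        { 0, 0, 0, 0 }
    };
    const char *value = NULL;
    int c;

    optind = 1; /* restart scanning so the function can be called again */
    while ((c = getopt_long(argc, argv, "c::", opts, NULL)) != -1) {
        if (c == 'c') {
            /* optarg is NULL when the optional argument was omitted */
            value = optarg ? optarg : example_default_cache_size;
        }
    }
    return value ? strtol(value, NULL, 10) : -1;
}
```

With an optional argument, the value has to be attached to the option (`--cache=8192` or `-c8192`); a separate word after `--cache` is not consumed as its argument.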

src/proxy/prefetch.c (new file, 68 lines)

@@ -0,0 +1,68 @@
#include "prefetch.h"
#include "util.h"
struct prefetch* prefetch_create( size_t size_bytes ){
struct prefetch* out = xmalloc( sizeof( struct prefetch ) );
NULLCHECK( out );
out->buffer = xmalloc( size_bytes );
NULLCHECK( out->buffer );
out->size = size_bytes;
out->is_full = 0;
out->from = 0;
out->len = 0;
return out;
}
void prefetch_destroy( struct prefetch *prefetch ) {
if( prefetch ) {
free( prefetch->buffer );
free( prefetch );
}
}
size_t prefetch_size( struct prefetch *prefetch){
if ( prefetch ) {
return prefetch->size;
} else {
return 0;
}
}
void prefetch_set_is_empty( struct prefetch *prefetch ){
prefetch_set_full( prefetch, 0 );
}
void prefetch_set_is_full( struct prefetch *prefetch ){
prefetch_set_full( prefetch, 1 );
}
void prefetch_set_full( struct prefetch *prefetch, int val ){
if( prefetch ) {
prefetch->is_full = val;
}
}
int prefetch_is_full( struct prefetch *prefetch ){
if( prefetch ) {
return prefetch->is_full;
} else {
return 0;
}
}
int prefetch_contains( struct prefetch *prefetch, uint64_t from, uint32_t len ){
NULLCHECK( prefetch );
return from >= prefetch->from &&
from + len <= prefetch->from + prefetch->len;
}
char *prefetch_offset( struct prefetch *prefetch, uint64_t from ){
NULLCHECK( prefetch );
return prefetch->buffer + (from - prefetch->from);
}
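Note on `prefetch_contains` and `prefetch_offset`: the cache now answers any read that falls entirely inside the buffered window, rather than only an exact match on `from` and `len`. The standalone sketch below mirrors that range check, assuming a simplified window struct. The `example_` names are hypothetical.

```c
#include <stdint.h>

/* Hypothetical, simplified stand-in for the from/len pair in struct prefetch. */
struct example_window {
    uint64_t from; /* start offset of the buffered data */
    uint32_t len;  /* number of buffered bytes */
};

/* Returns 1 if [from, from + len) lies entirely inside the window. */
int example_window_contains(const struct example_window *w,
                            uint64_t from, uint32_t len)
{
    return from >= w->from &&
           from + len <= w->from + w->len;
}

/* Byte offset into the buffer at which data for `from` starts.
 * Only meaningful when example_window_contains() returned 1. */
uint64_t example_window_offset(const struct example_window *w, uint64_t from)
{
    return from - w->from;
}
```

For a window covering 4096 bytes from offset 4096, a 2048-byte read at offset 6144 is a hit at buffer offset 2048, while a 4096-byte read at the same offset runs past the end and is a miss.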


@@ -1,14 +1,33 @@
#ifndef PREFETCH_H
#define PREFETCH_H
#include <stdint.h>
#include <stddef.h>
#define PREFETCH_BUFSIZE 4096
struct prefetch {
/* True if there is data in the buffer. */
int is_full;
__be64 from;
__be32 len;
/* The start point of the current content of buffer */
uint64_t from;
/* The length of the current content of buffer */
uint32_t len;
char buffer[PREFETCH_BUFSIZE];
/* The total size of the buffer, in bytes. */
size_t size;
char *buffer;
};
struct prefetch* prefetch_create( size_t size_bytes );
void prefetch_destroy( struct prefetch *prefetch );
size_t prefetch_size( struct prefetch *);
void prefetch_set_is_empty( struct prefetch *prefetch );
void prefetch_set_is_full( struct prefetch *prefetch );
void prefetch_set_full( struct prefetch *prefetch, int val );
int prefetch_is_full( struct prefetch *prefetch );
int prefetch_contains( struct prefetch *prefetch, uint64_t from, uint32_t len );
char *prefetch_offset( struct prefetch *prefetch, uint64_t from );
#endif


@@ -1,9 +1,7 @@
#include "proxy.h"
#include "readwrite.h"
#ifdef PREFETCH
#include "prefetch.h"
#endif
#include "ioutil.h"
@@ -20,7 +18,8 @@ struct proxier* proxy_create(
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind )
char* s_upstream_bind,
char* s_cache_bytes )
{
struct proxier* out;
out = xmalloc( sizeof( struct proxier ) );
@@ -65,9 +64,16 @@ struct proxier* proxy_create(
out->downstream_fd = -1;
out->upstream_fd = -1;
#ifdef PREFETCH
out->prefetch = xmalloc( sizeof( struct prefetch ) );
#endif
out->prefetch = NULL;
if ( s_cache_bytes ){
int cache_bytes = atoi( s_cache_bytes );
/* leaving this off or setting a cache size of zero or
* less results in no cache.
*/
if ( cache_bytes >= 0 ) {
out->prefetch = prefetch_create( cache_bytes );
}
}
out->init.buf = xmalloc( sizeof( struct nbd_init_raw ) );
out->req.buf = xmalloc( NBD_MAX_SIZE );
@@ -76,20 +82,28 @@ struct proxier* proxy_create(
return out;
}
int proxy_prefetches( struct proxier* proxy ) {
NULLCHECK( proxy );
return proxy->prefetch != NULL;
}
int proxy_prefetch_bufsize( struct proxier* proxy ){
NULLCHECK( proxy );
return prefetch_size( proxy->prefetch );
}
void proxy_destroy( struct proxier* proxy )
{
free( proxy->init.buf );
free( proxy->req.buf );
free( proxy->rsp.buf );
#ifdef PREFETCH
free( proxy->prefetch );
#endif
prefetch_destroy( proxy->prefetch );
free( proxy );
}
/* Shared between our two different connect_to_upstream paths */
void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size );
void proxy_finish_connect_to_upstream( struct proxier *proxy, uint64_t size );
/* Try to establish a connection to our upstream server. Return 1 on success,
* 0 on failure. This is a blocking call that returns a non-blocking socket.
@@ -102,7 +116,7 @@ int proxy_connect_to_upstream( struct proxier* proxy )
}
int fd = socket_connect( &proxy->connect_to.generic, connect_from );
off64_t size = 0;
uint64_t size = 0;
if ( -1 == fd ) {
return 0;
@@ -174,7 +188,7 @@ error:
return;
}
void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size ) {
void proxy_finish_connect_to_upstream( struct proxier *proxy, uint64_t size ) {
if ( proxy->upstream_size == 0 ) {
info( "Size of upstream image is %"PRIu64" bytes", size );
@@ -186,6 +200,13 @@ void proxy_finish_connect_to_upstream( struct proxier *proxy, off64_t size ) {
}
proxy->upstream_size = size;
if ( AF_UNIX != proxy->connect_to.family ) {
if ( sock_set_tcp_nodelay( proxy->upstream_fd, 1 ) == -1 ) {
warn( SHOW_ERRNO( "Failed to set TCP_NODELAY" ) );
}
}
info( "Connected to upstream on fd %i", proxy->upstream_fd );
return;
@@ -272,10 +293,9 @@ static inline int proxy_state_upstream( int state )
state == WRITE_TO_UPSTREAM || state == READ_FROM_UPSTREAM;
}
#ifdef PREFETCH
int proxy_prefetch_for_request( struct proxier* proxy, int state )
{
NULLCHECK( proxy );
struct nbd_request* req = &proxy->req_hdr;
struct nbd_reply* rsp = &proxy->rsp_hdr;
@@ -284,23 +304,11 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
int is_read = ( req->type & REQUEST_MASK ) == REQUEST_READ;
int prefetch_start = req->from;
int prefetch_end = req->from + ( req->len * 2 );
/* We only want to consider prefetching if we know we're not
* getting too much data back, if it's a read request, and if
* the prefetch won't try to read past the end of the file.
*/
int prefetching = req->len <= PREFETCH_BUFSIZE && is_read &&
prefetch_start < prefetch_end && prefetch_end <= proxy->upstream_size;
if ( is_read ) {
/* See if we can respond with what's in our prefetch
* cache */
if ( proxy->prefetch->is_full &&
req->from == proxy->prefetch->from &&
req->len == proxy->prefetch->len ) {
if ( prefetch_is_full( proxy->prefetch ) &&
prefetch_contains( proxy->prefetch, req->from, req->len ) ) {
/* HUZZAH! A match! */
debug( "Prefetch hit!" );
@@ -315,10 +323,11 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
/* and the data */
memcpy(
proxy->rsp.buf + NBD_REPLY_SIZE,
proxy->prefetch->buffer, proxy->prefetch->len
prefetch_offset( proxy->prefetch, req->from ),
req->len
);
proxy->rsp.size = NBD_REPLY_SIZE + proxy->prefetch->len;
proxy->rsp.size = NBD_REPLY_SIZE + req->len;
proxy->rsp.needle = 0;
/* return early, our work here is done */
@@ -332,11 +341,24 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
* whether we can keep it or not.
*/
debug( "Blowing away prefetch cache on type %d request.", req->type );
proxy->prefetch->is_full = 0;
prefetch_set_is_empty( proxy->prefetch );
}
debug( "Prefetch cache MISS!");
uint64_t prefetch_start = req->from;
/* We prefetch what we expect to be the next request. */
uint64_t prefetch_end = req->from + ( req->len * 2 );
/* We only want to consider prefetching if we know we're not
* getting too much data back, if it's a read request, and if
* the prefetch won't try to read past the end of the file.
*/
int prefetching =
req->len <= prefetch_size( proxy->prefetch ) &&
is_read &&
prefetch_start < prefetch_end &&
prefetch_end <= proxy->upstream_size;
/* We pull the request out of the proxy struct, rewrite the
* request size, and write it back.
@@ -347,7 +369,8 @@ int proxy_prefetch_for_request( struct proxier* proxy, int state )
req->len *= 2;
debug( "Prefetching %"PRIu32" bytes", req->len - proxy->prefetch_req_orig_len );
debug( "Prefetching additional %"PRIu32" bytes",
req->len - proxy->prefetch_req_orig_len );
nbd_h2r_request( req, req_raw );
}
@@ -364,10 +387,10 @@ int proxy_prefetch_for_reply( struct proxier* proxy, int state )
prefetched_bytes = proxy->req_hdr.len - proxy->prefetch_req_orig_len;
debug( "Prefetched %d bytes", prefetched_bytes );
debug( "Prefetched additional %d bytes", prefetched_bytes );
memcpy(
proxy->rsp.buf + proxy->prefetch_req_orig_len,
&(proxy->prefetch->buffer),
proxy->prefetch->buffer,
proxy->rsp.buf + proxy->prefetch_req_orig_len + NBD_REPLY_SIZE,
prefetched_bytes
);
@@ -382,13 +405,12 @@ int proxy_prefetch_for_reply( struct proxier* proxy, int state )
proxy->rsp.size -= prefetched_bytes;
/* And we need to reset these */
proxy->prefetch->is_full = 1;
prefetch_set_is_full( proxy->prefetch );
proxy->is_prefetch_req = 0;
return state;
}
#endif
int proxy_read_from_downstream( struct proxier *proxy, int state )
@@ -469,10 +491,8 @@ int proxy_continue_connecting_to_upstream( struct proxier* proxy, int state )
return state;
}
#ifdef PREFETCH
/* Data may have changed while we were disconnected */
proxy->prefetch->is_full = 0;
#endif
prefetch_set_is_empty( proxy->prefetch );
info( "Connected to upstream on fd %i", proxy->upstream_fd );
return READ_INIT_FROM_UPSTREAM;
@@ -492,7 +512,7 @@ int proxy_read_init_from_upstream( struct proxier* proxy, int state )
}
if ( proxy->init.needle == proxy->init.size ) {
off64_t upstream_size;
uint64_t upstream_size;
if ( !nbd_check_hello( (struct nbd_init_raw*) proxy->init.buf, &upstream_size ) ) {
warn( "Upstream sent invalid init" );
goto disconnect;
@@ -518,11 +538,22 @@ int proxy_write_to_upstream( struct proxier* proxy, int state )
ssize_t count;
// assert( state == WRITE_TO_UPSTREAM );
/* FIXME: We may set cork=1 multiple times as a result of this idiom.
* Not a serious problem, but we could do better
*/
if ( proxy->req.needle == 0 && AF_UNIX != proxy->connect_to.family ) {
if ( sock_set_tcp_cork( proxy->upstream_fd, 1 ) == -1 ) {
warn( SHOW_ERRNO( "Failed to set TCP_CORK" ) );
}
}
count = iobuf_write( proxy->upstream_fd, &proxy->req );
if ( count == -1 ) {
warn( SHOW_ERRNO( "Failed to send request to upstream" ) );
proxy->req.needle = 0;
// We're throwing the socket away so no need to uncork
return CONNECT_TO_UPSTREAM;
}
@@ -531,6 +562,14 @@ int proxy_write_to_upstream( struct proxier* proxy, int state )
* still need req.size if reading the reply fails - we disconnect
* and resend the reply in that case - so keep it around for now. */
proxy->req.needle = 0;
if ( AF_UNIX != proxy->connect_to.family ) {
if ( sock_set_tcp_cork( proxy->upstream_fd, 0 ) == -1 ) {
warn( SHOW_ERRNO( "Failed to unset TCP_CORK" ) );
// TODO: should we return to CONNECT_TO_UPSTREAM in this instance?
}
}
return READ_FROM_UPSTREAM;
}
@@ -670,7 +709,7 @@ void proxy_session( struct proxier* proxy )
state_started = monotonic_time_ms();
debug(
"State transitition from %s to %s",
"State transition from %s to %s",
proxy_session_state_names[old_state],
proxy_session_state_names[state]
);
@@ -736,14 +775,12 @@ void proxy_session( struct proxier* proxy )
case READ_FROM_DOWNSTREAM:
if ( FD_ISSET( proxy->downstream_fd, &rfds ) ) {
state = proxy_read_from_downstream( proxy, state );
#ifdef PREFETCH
/* Check if we can fulfil the request from prefetch, or
* rewrite the request to fill the prefetch buffer if needed
*/
if ( state == WRITE_TO_UPSTREAM ) {
if ( proxy_prefetches( proxy ) && state == WRITE_TO_UPSTREAM ) {
state = proxy_prefetch_for_request( proxy, state );
}
#endif
}
break;
case CONNECT_TO_UPSTREAM:
@@ -774,12 +811,10 @@ void proxy_session( struct proxier* proxy )
if ( FD_ISSET( proxy->upstream_fd, &rfds ) ) {
state = proxy_read_from_upstream( proxy, state );
}
# ifdef PREFETCH
/* Fill the prefetch buffer and rewrite the reply, if needed */
if ( state == WRITE_TO_DOWNSTREAM ) {
if ( proxy_prefetches( proxy ) && state == WRITE_TO_DOWNSTREAM ) {
state = proxy_prefetch_for_reply( proxy, state );
}
#endif
break;
case WRITE_TO_DOWNSTREAM:
if ( FD_ISSET( proxy->downstream_fd, &wfds ) ) {


@@ -5,7 +5,6 @@
#include <unistd.h>
#include "ioutil.h"
#include "flexnbd.h"
#include "parse.h"
#include "nbdtypes.h"
#include "self_pipe.h"
@@ -21,9 +20,6 @@
#define UPSTREAM_TIMEOUT 30 * 1000
struct proxier {
/* The flexnbd wrapper this proxier is attached to */
struct flexnbd* flexnbd;
/** address/port to bind to */
union mysockaddr listen_on;
@@ -48,7 +44,7 @@ struct proxier {
int upstream_fd;
/* This is the size we advertise to the downstream server */
off64_t upstream_size;
uint64_t upstream_size;
/* We transform the raw request header into here */
struct nbd_request req_hdr;
@@ -73,7 +69,8 @@ struct proxier {
uint64_t req_count;
int hello_sent;
#ifdef PREFETCH
/** These are only used if we pass --cache on the command line */
/* While the in-flight request has been munged by prefetch, these two are
* set to true, and the original length of the request, respectively */
int is_prefetch_req;
@@ -81,7 +78,8 @@ struct proxier {
/* And here, we actually store the prefetched data once it's returned */
struct prefetch *prefetch;
#endif
/** */
};
struct proxier* proxy_create(
@@ -89,7 +87,8 @@ struct proxier* proxy_create(
char* s_downstream_port,
char* s_upstream_address,
char* s_upstream_port,
char* s_upstream_bind );
char* s_upstream_bind,
char* s_cache_bytes);
int do_proxy( struct proxier* proxy );
void proxy_cleanup( struct proxier* proxy );
void proxy_destroy( struct proxier* proxy );


@@ -7,43 +7,64 @@
#include <string.h>
#include <pthread.h>
/*
* Make the bitfield words 'opaque' to prevent code
* poking at the bits directly without using these
* accessors/macros
*/
typedef uint64_t bitfield_word_t;
typedef bitfield_word_t * bitfield_p;
static inline char char_with_bit_set(uint64_t num) { return 1<<(num%8); }
#define BITFIELD_WORD_SIZE sizeof(bitfield_word_t)
#define BITS_PER_WORD (BITFIELD_WORD_SIZE * 8)
#define BIT_MASK(_idx) \
(1LL << ((_idx) & (BITS_PER_WORD - 1)))
#define BIT_WORD(_b, _idx) \
((bitfield_word_t*)(_b))[(_idx) / BITS_PER_WORD]
/* Calculates the number of words needed to store _bytes number of bytes
* this is added to accommodate code that wants to use bytes sizes
*/
#define BIT_WORDS_FOR_SIZE(_bytes) \
((_bytes + (BITFIELD_WORD_SIZE-1)) / BITFIELD_WORD_SIZE)
/** Return the bit value ''idx'' in array ''b'' */
static inline int bit_get(bitfield_p b, uint64_t idx) {
return (BIT_WORD(b, idx) >> (idx & (BITS_PER_WORD-1))) & 1;
}
/** Return 1 if the bit at ''idx'' in array ''b'' is set */
static inline int bit_is_set(char* b, uint64_t idx) {
return (b[idx/8] & char_with_bit_set(idx)) != 0;
static inline int bit_is_set(bitfield_p b, uint64_t idx) {
return bit_get(b, idx);
}
/** Return 1 if the bit at ''idx'' in array ''b'' is clear */
static inline int bit_is_clear(char* b, uint64_t idx) {
return !bit_is_set(b, idx);
static inline int bit_is_clear(bitfield_p b, uint64_t idx) {
return !bit_get(b, idx);
}
/** Tests whether the bit at ''idx'' in array ''b'' has value ''value'' */
static inline int bit_has_value(char* b, uint64_t idx, int value) {
if (value) { return bit_is_set(b, idx); }
else { return bit_is_clear(b, idx); }
static inline int bit_has_value(bitfield_p b, uint64_t idx, int value) {
return bit_get(b, idx) == !!value;
}
/** Sets the bit ''idx'' in array ''b'' */
static inline void bit_set(char* b, uint64_t idx) {
b[idx/8] |= char_with_bit_set(idx);
//__sync_fetch_and_or(b+(idx/8), char_with_bit_set(idx));
static inline void bit_set(bitfield_p b, uint64_t idx) {
BIT_WORD(b, idx) |= BIT_MASK(idx);
}
/** Clears the bit ''idx'' in array ''b'' */
static inline void bit_clear(char* b, uint64_t idx) {
b[idx/8] &= ~char_with_bit_set(idx);
//__sync_fetch_and_nand(b+(idx/8), char_with_bit_set(idx));
static inline void bit_clear(bitfield_p b, uint64_t idx) {
BIT_WORD(b, idx) &= ~BIT_MASK(idx);
}
/** Sets ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_set_range(char* b, uint64_t from, uint64_t len)
static inline void bit_set_range(bitfield_p b, uint64_t from, uint64_t len)
{
for ( ; from%8 != 0 && len > 0 ; len-- ) {
for ( ; (from % BITS_PER_WORD) != 0 && len > 0 ; len-- ) {
bit_set( b, from++ );
}
if (len >= 8) {
memset(b+(from/8), 255, len/8 );
if (len >= BITS_PER_WORD) {
memset(&BIT_WORD(b, from), 0xff, len / 8 );
from += len;
len = (len%8);
len = len % BITS_PER_WORD;
from -= len;
}
@@ -52,16 +73,16 @@ static inline void bit_set_range(char* b, uint64_t from, uint64_t len)
}
}
/** Clears ''len'' bits in array ''b'' starting at offset ''from'' */
static inline void bit_clear_range(char* b, uint64_t from, uint64_t len)
static inline void bit_clear_range(bitfield_p b, uint64_t from, uint64_t len)
{
for ( ; from%8 != 0 && len > 0 ; len-- ) {
for ( ; (from % BITS_PER_WORD) != 0 && len > 0 ; len-- ) {
bit_clear( b, from++ );
}
if (len >= 8) {
memset(b+(from/8), 0, len/8 );
if (len >= BITS_PER_WORD) {
memset(&BIT_WORD(b, from), 0, len / 8 );
from += len;
len = (len%8);
len = len % BITS_PER_WORD;
from -= len;
}
@@ -75,34 +96,33 @@ static inline void bit_clear_range(char* b, uint64_t from, uint64_t len)
* bits that are the same as the first one specified. If ''run_is_set'' is
* non-NULL, the value of that bit is placed into it.
*/
static inline uint64_t bit_run_count(char* b, uint64_t from, uint64_t len, int *run_is_set) {
uint64_t* current_block;
static inline uint64_t bit_run_count(bitfield_p b, uint64_t from, uint64_t len, int *run_is_set) {
uint64_t count = 0;
int first_value = bit_is_set(b, from);
int first_value = bit_get(b, from);
bitfield_word_t word_match = first_value ? -1 : 0;
if ( run_is_set != NULL ) {
*run_is_set = first_value;
}
for ( ; (from+count) % 64 != 0 && len > 0; len--) {
if (bit_has_value(b, from+count, first_value)) {
for ( ; ((from + count) % BITS_PER_WORD) != 0 && len > 0; len--) {
if (bit_has_value(b, from + count, first_value)) {
count++;
} else {
return count;
}
}
for ( ; len >= 64 ; len -= 64 ) {
current_block = (uint64_t*) (b + ((from+count)/8));
if (*current_block == ( first_value ? UINT64_MAX : 0 ) ) {
count += 64;
for ( ; len >= BITS_PER_WORD ; len -= BITS_PER_WORD ) {
if (BIT_WORD(b, from + count) == word_match) {
count += BITS_PER_WORD;
} else {
break;
}
}
for ( ; len > 0; len-- ) {
if ( bit_has_value(b, from+count, first_value) ) {
if ( bit_has_value(b, from + count, first_value) ) {
count++;
}
}
@@ -154,7 +174,7 @@ struct bitset {
int resolution;
struct bitset_stream *stream;
int stream_enabled;
char bits[];
bitfield_word_t bits[];
};
/** Allocate a bitset for a file of the given size, and chunks of the
@@ -162,9 +182,12 @@ struct bitset {
*/
static inline struct bitset *bitset_alloc( uint64_t size, int resolution )
{
struct bitset *bitset = xmalloc(
sizeof( struct bitset ) + ( size + resolution - 1 ) / resolution
);
// calculate a size to allocate that is a multiple of the size of the
// bitfield word
size_t bitfield_size =
BIT_WORDS_FOR_SIZE((( size + resolution - 1 ) / resolution)) * sizeof( bitfield_word_t );
struct bitset *bitset = xmalloc(sizeof( struct bitset ) + ( bitfield_size / 8 ) );
bitset->size = size;
bitset->resolution = resolution;
/* don't actually need to call pthread_mutex_destroy */
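Note on the move from `char*` to `bitfield_word_t`: bits are now addressed as (word index, bit within word) on 64-bit words instead of (byte index, bit within byte). The sketch below illustrates that addressing with functions in place of the `BIT_WORD` and `BIT_MASK` macros. The `example_` names are hypothetical.

```c
#include <stdint.h>

typedef uint64_t example_word_t;

#define EXAMPLE_BITS_PER_WORD (sizeof(example_word_t) * 8)

/* Returns the value (0 or 1) of bit idx in the word array b. */
static inline int example_bit_get(const example_word_t *b, uint64_t idx)
{
    return (int)((b[idx / EXAMPLE_BITS_PER_WORD]
                  >> (idx % EXAMPLE_BITS_PER_WORD)) & 1);
}

/* Sets bit idx in the word array b. */
static inline void example_bit_set(example_word_t *b, uint64_t idx)
{
    b[idx / EXAMPLE_BITS_PER_WORD] |=
        (example_word_t)1 << (idx % EXAMPLE_BITS_PER_WORD);
}

/* Clears bit idx in the word array b. */
static inline void example_bit_clear(example_word_t *b, uint64_t idx)
{
    b[idx / EXAMPLE_BITS_PER_WORD] &=
        ~((example_word_t)1 << (idx % EXAMPLE_BITS_PER_WORD));
}
```

Working on whole words lets a run scan compare 64 bits at a time against all-ones or zero, as `bit_run_count` does above, without casting a byte pointer to a wider type.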


@@ -126,7 +126,9 @@ void write_not_zeroes(struct client* client, uint64_t from, uint64_t len)
debug("(run adjusted to %d)", run);
}
if (0) /* useful but expensive */
/*
// Useful but expensive
if (0)
{
uint64_t i;
fprintf(stderr, "full map resolution=%d: ", map->resolution);
@@ -139,6 +141,7 @@ void write_not_zeroes(struct client* client, uint64_t from, uint64_t len)
}
fprintf(stderr, "\n");
}
*/
#define DO_READ(dst, len) ERROR_IF_NEGATIVE( \
readloop( \
@@ -249,14 +252,14 @@ int client_read_request( struct client * client , struct nbd_request *out_reques
return 1;
}
int fd_write_reply( int fd, char *handle, int error )
int fd_write_reply( int fd, uint64_t handle, int error )
{
struct nbd_reply reply;
struct nbd_reply_raw reply_raw;
reply.magic = REPLY_MAGIC;
reply.error = error;
memcpy( reply.handle, handle, 8 );
reply.handle.w = handle;
nbd_h2r_reply( &reply, &reply_raw );
debug( "Replying with handle=0x%08X, error=%"PRIu32, handle, error );
@@ -288,7 +291,7 @@ int fd_write_reply( int fd, char *handle, int error )
*/
int client_write_reply( struct client * client, struct nbd_request *request, int error )
{
return fd_write_reply( client->socket, request->handle, error);
return fd_write_reply( client->socket, request->handle.w, error);
}
@@ -297,7 +300,7 @@ void client_write_init( struct client * client, uint64_t size )
struct nbd_init init = {{0}};
struct nbd_init_raw init_raw = {{0}};
memcpy( init.passwd, INIT_PASSWD, sizeof( INIT_PASSWD ) );
memcpy( init.passwd, INIT_PASSWD, sizeof( init.passwd ) );
init.magic = INIT_MAGIC;
init.size = size;
memset( init.reserved, 0, 128 );
@@ -416,9 +419,9 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
{
off64_t offset;
// TODO: cork
debug("request read %ld+%d", request.from, request.len);
client_write_reply( client, &request, 0);
sock_set_tcp_cork( client->socket, 1 );
client_write_reply( client, &request, 0 );
offset = request.from;
@@ -435,7 +438,7 @@ void client_reply_to_read( struct client* client, struct nbd_request request )
offset,
request.len);
// TODO: uncork
sock_set_tcp_cork( client->socket, 0 );
}


@@ -139,7 +139,7 @@ enum mirror_state mirror_get_state( struct mirror * mirror )
void mirror_init( struct mirror * mirror, const char * filename )
{
int map_fd;
off64_t size;
uint64_t size;
NULLCHECK( mirror );
NULLCHECK( filename );
@@ -270,7 +270,7 @@ void mirror_cleanup( struct server * serve,
}
int mirror_connect( struct mirror * mirror, off64_t local_size )
int mirror_connect( struct mirror * mirror, uint64_t local_size )
{
struct sockaddr * connect_from = NULL;
int connected = 0;
@@ -292,7 +292,7 @@ int mirror_connect( struct mirror * mirror, off64_t local_size )
"Select failed." );
if( FD_ISSET( mirror->client, &fds ) ){
off64_t remote_size;
uint64_t remote_size;
if ( socket_nbd_read_hello( mirror->client, &remote_size ) ) {
if( remote_size == local_size ){
connected = 1;
@@ -412,7 +412,7 @@ int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
struct nbd_request req = {
.magic = REQUEST_MAGIC,
.type = REQUEST_WRITE,
.handle = ".MIRROR.",
.handle.b = ".MIRROR.",
.from = current,
.len = run
};
@@ -462,6 +462,12 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
debug( "Mirror write callback invoked with events %d. fd: %i", revents, ctrl->mirror->client );
/* FIXME: We can end up corking multiple times in unusual circumstances; this
* is annoying, but harmless */
if ( xfer->written == 0 ) {
sock_set_tcp_cork( ctrl->mirror->client, 1 );
}
if ( xfer->written < hdr_size ) {
data_loc = ( (char*) &xfer->hdr.req_raw ) + ctrl->xfer.written;
to_write = hdr_size - xfer->written;
@@ -489,6 +495,7 @@ static void mirror_write_cb( struct ev_loop *loop, ev_io *w, int revents )
// All bytes written, so now we need to read the NBD reply back.
if ( ctrl->xfer.written == ctrl->xfer.len + hdr_size ) {
sock_set_tcp_cork( ctrl->mirror->client, 0 ) ;
ev_io_start( loop, &ctrl->read_watcher );
ev_io_stop( loop, &ctrl->write_watcher );
}
@@ -561,7 +568,7 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
return;
}
if ( memcmp( ".MIRROR.", &rsp.handle[0], 8 ) != 0 ) {
if ( memcmp( ".MIRROR.", rsp.handle.b, 8 ) != 0 ) {
warn( "Bad handle returned from listener" );
ev_break( loop, EVBREAK_ONE );
return;
@@ -636,7 +643,7 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
return;
}
void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)), int revents )
static void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)), int revents )
{
if ( !(revents & EV_TIMER ) ) {
warn( "Mirror timeout called but no timer event signalled" );
@@ -648,7 +655,7 @@ void mirror_timeout_cb( struct ev_loop *loop, ev_timer *w __attribute__((unused)
return;
}
void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
static void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
{
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
NULLCHECK( ctrl );
@@ -666,7 +673,7 @@ void mirror_abandon_cb( struct ev_loop *loop, ev_io *w, int revents )
}
void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
static void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
{
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
NULLCHECK( ctrl );
@@ -694,7 +701,7 @@ void mirror_limit_cb( struct ev_loop *loop, ev_timer *w, int revents )
* if it has, start migrating. If it's not finished, then enabling the bitset
* stream does not go well for us.
*/
void mirror_begin_cb( struct ev_loop *loop, ev_timer *w, int revents )
static void mirror_begin_cb( struct ev_loop *loop, ev_timer *w, int revents )
{
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
NULLCHECK( ctrl );
@@ -750,7 +757,8 @@ void mirror_run( struct server *serve )
ctrl.ev_loop = EV_DEFAULT;
/* gcc warns on -O2. clang is fine. Seems to be the fault of ev.h */
/* gcc warns with -Wstrict-aliasing on -O2. clang doesn't
* implement this warning. Seems to be the fault of ev.h */
ev_init( &ctrl.begin_watcher, mirror_begin_cb );
ctrl.begin_watcher.repeat = 1.0; // We check bps every second. seems sane.
ctrl.begin_watcher.data = (void*) &ctrl;
@@ -914,7 +922,7 @@ void* mirror_runner(void* serve_params_uncast)
* for us ). But if we've failed and are going to retry on the next run, we
* must close this socket here to have any chance of it succeeding.
*/
if ( !mirror->client < 0 ) {
if ( !(mirror->client < 0) ) {
sock_try_close( mirror->client );
mirror->client = -1;
}
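Note on the `!mirror->client < 0` fix: unary `!` binds tighter than `<`, so the old condition compared the result of `!fd` (always 0 or 1) against zero and could never be true, which means the socket was never closed on that path. The sketch below contrasts the two forms. The function names are hypothetical.

```c
/* How the compiler parses the old condition `!fd < 0`:
 * `!fd` is 0 or 1, so the comparison is false for every fd. */
int example_old_check(int fd)
{
    return (!fd) < 0;
}

/* The corrected condition: true for any valid (non-negative) descriptor. */
int example_new_check(int fd)
{
    return !(fd < 0);
}
```

This matches the merge description in the commit list, which ties the close-socket fix to the migration-cancel crash.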


@@ -220,7 +220,6 @@ void read_serve_param( int c, char **ip_addr, char **ip_port, char **file, char
case 'h':
fprintf(stdout, "%s\n", serve_help_text );
exit( 0 );
break;
case 'l':
*ip_addr = optarg;
break;
@@ -263,7 +262,6 @@ void read_listen_param( int c,
case 'h':
fprintf(stdout, "%s\n", listen_help_text );
exit(0);
break;
case 'l':
*ip_addr = optarg;
break;
@@ -297,7 +295,6 @@ void read_readwrite_param( int c, char **ip_addr, char **ip_port, char **bind_ad
case 'h':
fprintf(stdout, "%s\n", err_text );
exit( 0 );
break;
case 'l':
*ip_addr = optarg;
break;
@@ -331,7 +328,6 @@ void read_sock_param( int c, char **sock, char *help_text )
case 'h':
fprintf( stdout, "%s\n", help_text );
exit( 0 );
break;
case 's':
*sock = optarg;
break;
@@ -362,7 +358,6 @@ void read_mirror_speed_param(
case 'h':
fprintf( stdout, "%s\n", mirror_speed_help_text );
exit( 0 );
break;
case 's':
*sock = optarg;
break;
@@ -394,7 +389,6 @@ void read_mirror_param(
case 'h':
fprintf( stdout, "%s\n", mirror_help_text );
exit( 0 );
break;
case 's':
*sock = optarg;
break;
@@ -428,7 +422,6 @@ void read_break_param( int c, char **sock )
case 'h':
fprintf( stdout, "%s\n", break_help_text );
exit( 0 );
break;
case 's':
*sock = optarg;
break;
@@ -580,7 +573,10 @@ void params_readwrite(
parse_port( s_port, &out->connect_to.v4 );
out->from = atol(s_from);
long signed_from = atol(s_from);
FATAL_IF_NEGATIVE( signed_from,
"Can't read from a negative offset %ld.", signed_from);
out->from = signed_from;
if (write_not_read) {
if (s_length_or_filename[0]-48 < 10) {
@@ -592,9 +588,10 @@ void params_readwrite(
s_length_or_filename, O_RDONLY);
FATAL_IF_NEGATIVE(out->data_fd,
"Couldn't open %s", s_length_or_filename);
out->len = lseek64(out->data_fd, 0, SEEK_END);
FATAL_IF_NEGATIVE(out->len,
off64_t signed_len = lseek64(out->data_fd, 0, SEEK_END);
FATAL_IF_NEGATIVE(signed_len,
"Couldn't find length of %s", s_length_or_filename);
out->len = signed_len;
FATAL_IF_NEGATIVE(
lseek64(out->data_fd, 0, SEEK_SET),
"Couldn't rewind %s", s_length_or_filename
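Note on the `signed_from` / `signed_len` temporaries: once `from` and `len` become unsigned, a negative-value check on the struct field can never fire, so the value has to be validated while still signed and only then stored. The sketch below shows the pattern, assuming a plain return code in place of the project's `FATAL_IF_NEGATIVE` macro. The function name is hypothetical.

```c
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical sketch: parse a decimal offset into an unsigned field.
 * Returns 0 on success, -1 if the parsed value is negative
 * (in which case *out is left untouched). */
int example_parse_offset(const char *s, uint64_t *out)
{
    long signed_from = atol(s);

    if (signed_from < 0) {
        return -1; /* check while the value is still signed */
    }
    *out = (uint64_t)signed_from;
    return 0;
}
```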


@@ -233,7 +233,6 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
int was_closed = 0;
void * status=NULL;
int join_errno;
if (entry->thread != 0) {
char s_client_address[128];
@@ -241,7 +240,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
sockaddr_address_string( &entry->address.generic, &s_client_address[0], 128 );
debug( "%s(%p,...)", joinfunc == pthread_join ? "joining" : "tryjoining", entry->thread );
join_errno = joinfunc(entry->thread, &status);
int join_errno = joinfunc(entry->thread, &status);
/* join_errno can legitimately be ESRCH if the thread is
* already dead, but the client still needs tidying up. */
@@ -598,7 +597,6 @@ int server_accept( struct server * params )
{
NULLCHECK( params );
debug("accept loop starting");
int client_fd;
union mysockaddr client_address;
fd_set fds;
socklen_t socklen=sizeof(client_address);
@@ -638,7 +636,7 @@ int server_accept( struct server * params )
}
if ( FD_ISSET( params->server_fd, &fds ) ){
client_fd = accept( params->server_fd, &client_address.generic, &socklen );
int client_fd = accept( params->server_fd, &client_address.generic, &socklen );
if ( params->allow_new_clients ) {
debug("Accepted nbd client socket fd %d", client_fd);
@@ -741,11 +739,11 @@ void server_join_clients( struct server * serve ) {
for (i=0; i < serve->max_nbd_clients; i++) {
pthread_t thread_id = serve->nbd_client[i].thread;
int err = 0;
if (thread_id != 0) {
debug( "joining thread %p", thread_id );
if ( 0 == (err = pthread_join( thread_id, &status ) ) ) {
int err = pthread_join( thread_id, &status );
if ( 0 == err ) {
serve->nbd_client[i].thread = 0;
} else {
warn( "Error %s (%i) joining thread %p", strerror( err ), err, thread_id );


@@ -154,8 +154,10 @@ int do_serve( struct server *, struct self_pipe * );
struct mode_readwrite_params {
union mysockaddr connect_to;
union mysockaddr connect_from;
off64_t from;
off64_t len;
uint64_t from;
uint32_t len;
int data_fd;
int client;
};


@@ -21,6 +21,11 @@ class Environment
@fake_pid = nil
end
def prefetch_proxy!
@nbd1.prefetch_proxy = true
@nbd2.prefetch_proxy = true
end
def proxy1(port=@port2)
@nbd1.proxy(@ip, port)
end


@@ -198,6 +198,8 @@ module FlexNBD
end
end
attr_accessor :prefetch_proxy
def initialize( bin, ip, port )
@bin = bin
@do_debug = ENV['DEBUG']
@@ -208,6 +210,7 @@ module FlexNBD
@ip = ip
@port = port
@kill = []
@prefetch_proxy = false
end
@@ -247,6 +250,7 @@ module FlexNBD
"--port #{port} "\
"--conn-addr #{connect_ip} "\
"--conn-port #{connect_port} "\
"#{prefetch_proxy ? "--cache " : ""}"\
"#{@debug}"
end


@@ -138,7 +138,7 @@ module FlexNBD
end
def accept( err_msg = "Timed out waiting for a connection", timeout = 2)
def accept( err_msg = "Timed out waiting for a connection", timeout = 5)
client_sock = nil
begin


@@ -0,0 +1,190 @@
# encoding: utf-8
require 'flexnbd/fake_source'
require 'flexnbd/fake_dest'
module ProxyTests
def with_proxied_client( override_size = nil )
@env.serve1 unless @server_up
@env.proxy2 unless @proxy_up
@env.nbd2.can_die(0)
client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy")
begin
result = client.read_hello
assert_equal "NBDMAGIC", result[:magic]
assert_equal override_size || @env.file1.size, result[:size]
yield client
ensure
client.close rescue nil
end
end
def test_exits_with_error_when_cannot_connect_to_upstream_on_start
assert_raises(RuntimeError) { @env.proxy1 }
end
def test_read_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write_read_request(offset, 4096, "myhandle")
rsp = client.read_response
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
orig_data = @env.file1.read(offset, 4096)
data = client.read_raw(4096)
assert_equal 4096, orig_data.size
assert_equal 4096, data.size
assert_equal( orig_data, data,
"Returned data does not match on request #{n+1}" )
end
end
end
def test_write_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write(offset, "\xFF" * 4096)
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
data = @env.file1.read(offset, 4096)
assert_equal( ( "\xFF" * 4096 ), data, "Data not written correctly (offset is #{n})" )
end
end
end
def make_fake_server
server = FlexNBD::FakeDest.new(@env.ip, @env.port1)
@server_up = true
# We return a thread here because accept() and connect() both block for us
Thread.new do
sc = server.accept # just tell the supervisor we're up
sc.write_hello
[ server, sc ]
end
end
def test_read_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write_read_request( 0, 4096 )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_READ, req1[:type]
assert_equal 0, req1[:from]
assert_not_equal 0, req1[:len]
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
sc2.write_data( "\xFF" * 4096 )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
data = client.read_raw( 4096 )
assert_equal( ("\xFF" * 4096), data, "Wrong data returned" )
sc2.close
server.close
end
end
def test_write_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the write request to the proxy
client.write( 0, ( "\xFF" * 4096 ) )
# ensure we're given the write request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal 4096, req1[:len]
data1 = sc1.read_data( 4096 )
assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" )
# Kill the server again, now we're sure the write request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
data2 = sc2.read_data( 4096 )
assert_equal data1, data2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
sc2.close
server.close
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |client|
c2 = nil
assert_raises(Timeout::Error) do
timeout(1) do
c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)")
c2.read_hello
end
end
c2.close rescue nil if c2
end
end
end


@@ -0,0 +1,22 @@
require 'test/unit'
require 'environment'
require 'proxy_tests'
class TestPrefetchProxyMode < Test::Unit::TestCase
include ProxyTests
def setup
super
@env = Environment.new
@env.prefetch_proxy!
@env.writefile1( "f" * 16 )
end
def teardown
@env.cleanup
super
end
end


@@ -1,200 +1,20 @@
require 'test/unit'
require 'environment'
require 'flexnbd/fake_source'
require 'flexnbd/fake_dest'
require 'proxy_tests'
class TestProxyMode < Test::Unit::TestCase
include ProxyTests
def setup
super
@env = Environment.new
@env.writefile1( "0" * 16 )
@env.writefile1( "f" * 16 )
end
def teardown
@env.cleanup
super
end
def with_proxied_client( override_size = nil )
@env.serve1 unless @server_up
@env.proxy2 unless @proxy_up
@env.nbd2.can_die(0)
client = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy")
begin
result = client.read_hello
assert_equal "NBDMAGIC", result[:magic]
assert_equal override_size || @env.file1.size, result[:size]
yield client
ensure
client.close rescue nil
end
end
def test_exits_with_error_when_cannot_connect_to_upstream_on_start
assert_raises(RuntimeError) { @env.proxy1 }
end
def test_read_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write_read_request(offset, 4096, "myhandle")
rsp = client.read_response
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
orig_data = @env.file1.read(offset, 4096)
data = client.read_raw(4096)
assert_equal 4096, orig_data.size
assert_equal 4096, data.size
assert_equal( orig_data, data, "Returned data does not match" )
end
end
end
def test_write_requests_successfully_proxied
with_proxied_client do |client|
(0..3).each do |n|
offset = n * 4096
client.write(offset, "\xFF" * 4096)
rsp = client.read_response
assert_equal FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal "myhandle", rsp[:handle]
assert_equal 0, rsp[:error]
data = @env.file1.read(offset, 4096)
assert_equal( ( "\xFF" * 4096 ), data, "Data not written correctly (offset is #{n})" )
end
end
end
def make_fake_server
server = FlexNBD::FakeDest.new(@env.ip, @env.port1)
@server_up = true
# We return a thread here because accept() and connect() both block for us
Thread.new do
sc = server.accept # just tell the supervisor we're up
sc.write_hello
[ server, sc ]
end
end
def test_read_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write_read_request( 0, 4096 )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_READ, req1[:type]
assert_equal 0, req1[:from]
assert_not_equal 0, req1[:len]
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
sc2.write_data( "\xFF" * 4096 )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
data = client.read_raw( 4096 )
assert_equal( ("\xFF" * 4096), data, "Wrong data returned" )
sc2.close
server.close
end
end
def test_write_request_retried_when_upstream_dies_partway
maker = make_fake_server
with_proxied_client(4096) do |client|
server, sc1 = maker.value
# Send the read request to the proxy
client.write( 0, ( "\xFF" * 4096 ) )
# ensure we're given the read request
req1 = sc1.read_request
assert_equal ::FlexNBD::REQUEST_MAGIC, req1[:magic]
assert_equal ::FlexNBD::REQUEST_WRITE, req1[:type]
assert_equal 0, req1[:from]
assert_equal 4096, req1[:len]
data1 = sc1.read_data( 4096 )
assert_equal( ( "\xFF" * 4096 ), data1, "Data not proxied successfully" )
# Kill the server again, now we're sure the read request has been sent once
sc1.close
# We expect the proxy to reconnect without our client doing anything.
sc2 = server.accept
sc2.write_hello
# And once reconnected, it should resend an identical request.
req2 = sc2.read_request
assert_equal req1, req2
data2 = sc2.read_data( 4096 )
assert_equal data1, data2
# The reply should be proxied back to the client.
sc2.write_reply( req2[:handle] )
# Check it to make sure it's correct
rsp = timeout(15) { client.read_response }
assert_equal ::FlexNBD::REPLY_MAGIC, rsp[:magic]
assert_equal 0, rsp[:error]
assert_equal req1[:handle], rsp[:handle]
sc2.close
server.close
end
end
def test_only_one_client_can_connect_to_proxy_at_a_time
with_proxied_client do |client|
c2 = nil
assert_raises(Timeout::Error) do
timeout(1) do
c2 = FlexNBD::FakeSource.new(@env.ip, @env.port2, "Couldn't connect to proxy (2)")
c2.read_hello
end
end
c2.close rescue nil if c2
end
end
end


@@ -10,7 +10,7 @@
START_TEST(test_bit_set)
{
uint64_t num = 0;
char *bits = (char*) &num;
bitfield_p bits = (bitfield_p) &num;
#define TEST_BIT_SET(bit, newvalue) \
bit_set(bits, (bit)); \
@@ -27,7 +27,7 @@ END_TEST
START_TEST(test_bit_clear)
{
uint64_t num = 0xffffffffffffffff;
char *bits = (char*) &num;
bitfield_p bits = (bitfield_p) &num;
#define TEST_BIT_CLEAR(bit, newvalue) \
bit_clear(bits, (bit)); \
@@ -44,7 +44,7 @@ END_TEST
START_TEST(test_bit_tests)
{
uint64_t num = 0x5555555555555555;
char *bits = (char*) &num;
bitfield_p bits = (bitfield_p) &num;
fail_unless(bit_has_value(bits, 0, 1), "bit_has_value malfunction");
fail_unless(bit_has_value(bits, 1, 0), "bit_has_value malfunction");
@@ -58,7 +58,7 @@ END_TEST
START_TEST(test_bit_ranges)
{
char buffer[4160];
bitfield_word_t buffer[BIT_WORDS_FOR_SIZE(4160)];
uint64_t *longs = (unsigned long*) buffer;
uint64_t i;
@@ -84,7 +84,7 @@ END_TEST
START_TEST(test_bit_runs)
{
char buffer[256];
bitfield_word_t buffer[BIT_WORDS_FOR_SIZE(256)];
int i, ptr=0, runs[] = {
56,97,22,12,83,1,45,80,85,51,64,40,63,67,75,64,94,81,79,62
};
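
The test hunks above replace `char` buffers with arrays of `bitfield_word_t` sized by `BIT_WORDS_FOR_SIZE(...)`. The definitions of those names are not part of this diff, so the following is only a sketch of how a word-based bitfield of that kind might be laid out. It assumes a 64-bit word and that the macro argument is a byte count, which the change from `char buffer[4160]` suggests. All `sketch_` and `SKETCH_` names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed word type; the real bitfield_word_t may differ. */
typedef uint64_t sketch_word_t;

#define SKETCH_WORD_BYTES (sizeof(sketch_word_t))
#define SKETCH_WORD_BITS  (SKETCH_WORD_BYTES * 8)

/* Number of whole words needed to hold `bytes` bytes, rounding up. */
#define SKETCH_WORDS_FOR_SIZE(bytes) \
    (((bytes) + SKETCH_WORD_BYTES - 1) / SKETCH_WORD_BYTES)

/* Set bit `idx`, counting from the least significant bit of word 0. */
void sketch_bit_set(sketch_word_t *bits, uint64_t idx)
{
    assert(bits != NULL);
    bits[idx / SKETCH_WORD_BITS] |= (sketch_word_t)1 << (idx % SKETCH_WORD_BITS);
}

/* Clear bit `idx`. */
void sketch_bit_clear(sketch_word_t *bits, uint64_t idx)
{
    assert(bits != NULL);
    bits[idx / SKETCH_WORD_BITS] &= ~((sketch_word_t)1 << (idx % SKETCH_WORD_BITS));
}

/* Return 1 if bit `idx` is set, 0 otherwise. */
int sketch_bit_is_set(const sketch_word_t *bits, uint64_t idx)
{
    assert(bits != NULL);
    return (int)((bits[idx / SKETCH_WORD_BITS] >> (idx % SKETCH_WORD_BITS)) & 1);
}
```

Declaring the buffer as an array of words rather than `char` gives it the alignment that word-sized loads and stores need, which is one likely reason for the type change in the tests.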


@@ -57,7 +57,7 @@ void * responder( void *respond_uncast )
fd_write_reply( sock_fd, wrong_handle, 0 );
}
else {
fd_write_reply( sock_fd, resp->received.handle, 0 );
fd_write_reply( sock_fd, (char*)resp->received.handle.b, 0 );
}
write( sock_fd, "12345678", 8 );
}
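
The hunk above passes `resp->received.handle.b`, which suggests the 8-byte NBD handle is now a union with a byte-array view. The commit message for this series notes that an 8-byte handle can be compared as a single 64-bit integer instead of with `memcmp()`. The sketch below shows one way such a union could look. Only the member name `b` appears in the diff; the member `w`, the type name, and both helper functions are assumptions made for illustration.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical handle type: two views of the same 8 bytes. */
typedef union {
    uint8_t  b[8]; /* byte view, as used when writing to the wire */
    uint64_t w;    /* word view, assumed name, used only for comparison */
} sketch_handle_t;

/* Copy exactly 8 bytes from `src` into the handle. */
void sketch_handle_from_bytes(sketch_handle_t *h, const char *src)
{
    assert(h != NULL && src != NULL);
    memcpy(h->b, src, sizeof(h->b));
}

/* Compare two handles with one integer comparison instead of memcmp(). */
int sketch_handles_equal(const sketch_handle_t *x, const sketch_handle_t *y)
{
    assert(x != NULL && y != NULL);
    return x->w == y->w;
}
```

Because the handle is opaque and only ever compared for equality, the byte order of the `w` view does not matter here.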


@@ -93,7 +93,7 @@ END_TEST
int connect_client( char *addr, int actual_port, char *source_addr )
{
int client_fd;
int client_fd = -1;
struct addrinfo hint;
struct addrinfo *ailist, *aip;