42 Commits

Author SHA1 Message Date
Patrick J Cherry
e547696d0d Merge branch 'dev-michel' of gitlab.bytemark.co.uk:open-source/flexnbd-c into dev-michel 2017-07-14 17:05:17 +01:00
James F. Carter
59f264184b Merge pull request #1 from BytemarkHosting/better-stats
Calculate and return bytes_left in migration statistics
2017-07-14 16:36:50 +01:00
Chris Elsworth
42d206cfb7 Update test 2017-07-14 16:26:25 +01:00
Chris Elsworth
ab3106202a Also return migration_bytes_left 2017-07-14 16:18:34 +01:00
James Carter
e04dead5ce Merge branch 'update-changelog' into 'develop'
Updated changelog.

See merge request !30
2017-04-13 12:52:00 +01:00
Patrick J Cherry
88bc5f0643 Updated changelog. 2017-04-13 12:49:55 +01:00
James Carter
e89c87e2b9 Merge branch 'fix-compiler-flags' into 'develop'
Remove lots of per-cpu compiler flags.

See merge request !28
2017-02-23 12:11:25 +00:00
Patrick J Cherry
9d2ac3f403 Remove lots of per-cpu compiler flags.
These flags appear to cause SIGILL when flexnbd starts on some CPUs.
2017-02-22 17:52:52 +00:00
James Carter
67823bf85b Merge branch '32-package-and-publish-in-gitlab-ci-retire-maker2-job' into 'master'
Resolve "package and publish in gitlab-ci - retire maker2 job"

Closes #32 and #21

See merge request !27
2017-01-23 14:04:43 +00:00
Patrick J Cherry
17d30b86ad Updated build-deps to have libsubunit and ruby-test-unit 2017-01-23 14:00:09 +00:00
Patrick J Cherry
b97bcd6f51 Don't test separately from packaging. Also use correct source "format" 2017-01-23 13:58:04 +00:00
Patrick J Cherry
4d3c15a4d0 Switch to native from quilted packaging 2017-01-23 13:52:22 +00:00
Patrick J Cherry
83d6872a8d Add ruby test dependency 2017-01-23 13:48:19 +00:00
Patrick J Cherry
ab8470aef3 Modernise gitlab-ci 2017-01-23 13:46:42 +00:00
Patrick J Cherry
716df32fd6 Merge remote-tracking branch 'origin/debian' into 32-package-and-publish-in-gitlab-ci-retire-maker2-job 2017-01-23 13:44:44 +00:00
Michel Pollet
c19901cf10 mbox: Simplified
Removed the existing mbox that is used to pass transactions from one
thread to the other, use a non-locking FIFO and a simple semaphore
instead.

Removed all notion of void* from the FIFO system, use the now generic
FIFO. Also started to rework the mirror.c code to use typedefs for
structs enums etc.

DO NOT USE. check_mbox is borken and needs changing. Currently
'functional' otherwise like this, but requires more testing.

Signed-off-by: Michel Pollet <buserror@gmail.com>
2016-11-02 12:22:19 +00:00
Michel Pollet
781a91fe3d fifo: Add fifo_declare.h
My own implementation, used in countless projects

Signed-off-by: Michel Pollet <buserror@gmail.com>
2016-11-02 12:21:55 +00:00
Michel Pollet
90e8b13df5 fifo: Split the bitset.h
And fix all dependencies

Signed-off-by: Michel Pollet <buserror@gmail.com>
2016-11-02 12:21:55 +00:00
Michel Pollet
9ab1af8dff tools: semtest: new tool
Piece of code I used to validate that socketpair is a lot quicker than
pthread_semaphores.

Signed-off-by: Michel Pollet <buserror@gmail.com>
2016-11-02 12:21:55 +00:00
Michel Pollet
c265d7fe3f tools: holemap: new tool
Tool to list how a file is 'sparse' and calculate how 'sparse' it
/could/ be.

Signed-off-by: Michel Pollet <buserror@gmail.com>
2016-11-02 12:21:55 +00:00
Michel Pollet
1a768d5e9c Merge branch '29-fix-linker-issue' into 'master'
Link against subunit for testing.

This fixes the problems in Debian stretch+.

Closes #29

See merge request !26
2016-10-13 16:47:37 +01:00
Patrick J Cherry
72992c76ac Added libsubunit to the gitlab-ci 2016-10-13 16:42:21 +01:00
Patrick J Cherry
cace8123f4 Link against subunit for testing.
This fixes the problems in Debian stretch+.
2016-10-13 16:39:20 +01:00
Patrick J Cherry
c3b241464a Updated changelog 2016-10-07 12:26:52 +01:00
Patrick J Cherry
4f956e4b9d Merge branch 'master' of gitlab.bytemark.co.uk:open-source/flexnbd-c into debian 2016-10-07 12:24:51 +01:00
James Carter
b4cb2d9240 Merge branch 'fix-wrong-handle-type' into 'master'
Fix up "wrong" handle type from char* to uint64_t

Following from the NBD handle comparison simplifications.

See merge request !25
2016-10-07 10:20:35 +01:00
James Carter
1efb7bada6 Merge branch 'fix-unsigned-longs-in-bitset-test' into 'master'
fix check_bitset test on 32-bit platforms

The use of `unsigned long` and `UL` suffices caused this test to fail
on 32 bit platforms, where these are just 4, not 8 bits long.

```
tests/unit/check_bitset.c:73:F:bit:test_bit_ranges:0: longs[32] = 0 SHOULD BE ffffffff
```

See merge request !24
2016-10-07 10:20:08 +01:00
James Carter
6bc2a4c0b9 Merge branch 'fix-cast-from-pointer-to-wrong-size-integer-in-serve' into 'master'
This fixes the compiler warning pointer-to-int-cast in serve.c

```
In file included from src/server/bitset.h:4:0,
                 from src/server/mirror.h:8,
                 from src/server/flexnbd.h:5,
                 from src/server/serve.h:8,
                 from src/server/serve.c:1:
src/server/serve.c: In function 'tryjoin_client_thread':
src/server/serve.c:258:6: warning: cast from pointer to integer of different size [-Wpointer-to-int-cast]
      (uint64_t)status);
      ^
```

See merge request !23
2016-10-07 09:59:24 +01:00
James Carter
59de76c50c Merge branch 'skip-large-file-test-on-i386' into 'master'
Skip large file test on 32-bit platforms

This test cannot run on 32-bit machines as they cannot access files
large than 2G.  Makes flexnbd on 32-bit a bit useless really..

See merge request !22
2016-10-07 09:57:09 +01:00
Patrick J Cherry
209da655b3 Skip large file test on 32-bit platforms
This test cannot run on 32-bit machines as they cannot access files
large than 2G.  Makes flexnbd on 32-bit a bit useless really..
2016-10-06 21:42:52 +01:00
Patrick J Cherry
52b45e6b40 fix check_bitset test on 32-bit platforms
The use of `unsigned long` and `UL` suffices caused this test to fail
on 32 bit platforms, where these are just 4, not 8 bits long.

```
tests/unit/check_bitset.c:73:F:bit:test_bit_ranges:0: longs[32] = 0 SHOULD BE ffffffff
```
2016-10-06 21:22:53 +01:00
Patrick J Cherry
d279eb7570 Fix up "wrong" handle type from char* to uint64_t
Following from the NBD handle comparison simplifications.
2016-10-06 21:19:15 +01:00
Patrick J Cherry
c07df76ede This fixes the compiler warning pointer-to-int-cast in serve.c
```
In file included from src/server/bitset.h:4:0,
                 from src/server/mirror.h:8,
                 from src/server/flexnbd.h:5,
                 from src/server/serve.h:8,
                 from src/server/serve.c:1:
src/server/serve.c: In function 'tryjoin_client_thread':
src/server/serve.c:258:6: warning: cast from pointer to integer of different size [-Wpointer-to-int-cast]
      (uint64_t)status);
      ^
```
2016-10-06 21:16:07 +01:00
Patrick J Cherry
e7e99b099c Updated debian packaging, adding in new build-deps. 2016-10-06 16:02:15 +01:00
Patrick J Cherry
b2edd0734a Merge branch 'master' of gitlab.bytemark.co.uk:open-source/flexnbd-c into debian 2016-10-06 16:00:14 +01:00
James Carter
e19d005636 Merge branch '26-fix-function-definition' into 'master'
OK removed the cast and fixed the function def in the test

This should definitely clear the warning.

Closes #26

See merge request !21
2016-10-06 15:59:57 +01:00
Patrick J Cherry
8fed794fe7 Merge branch 'master' of gitlab.bytemark.co.uk:open-source/flexnbd-c into debian 2016-10-06 15:47:25 +01:00
Patrick J Cherry
3571d3f82e Added net-tools to the build-deps for testing
Fixes #21
2016-10-05 09:27:10 +01:00
Patrick J Cherry
4cd7e764bb Updated changelog 2016-10-04 21:22:07 +01:00
Patrick J Cherry
4f535fbb02 Merge branch 'master' of gitlab.bytemark.co.uk:open-source/flexnbd-c into debian 2016-10-04 21:14:26 +01:00
Patrick J Cherry
4ed8d49b2c Updated rules to skip ruby tests, and just use the normal make check 2016-08-31 10:06:07 +01:00
Patrick J Cherry
3af0e84f5f Updated Debian packaging to be in a separate branch.
This should allow us to use git-buildpackage to build our packages.
2016-08-30 21:57:00 +01:00
33 changed files with 4017 additions and 502 deletions

View File

@@ -1,10 +1,27 @@
image: "ruby:2.1"
before_script:
- apt-get update; apt-get install -y check libev-dev net-tools dpkg-dev
unit_test:
stages:
- package
- publish
package:jessie: &package
stage: package
image: $CI_REGISTRY/docker-images/layers:$DISTRO-deb
variables:
DISTRO: jessie
script:
- make clean
- make build
- make test
- package
artifacts:
paths:
- pkg/
package:stretch:
<<: *package
variables:
DISTRO: stretch
publish:
stage: publish
tags:
- shell
script:
- publish

View File

@@ -11,28 +11,9 @@ ifdef DEBUG
else
CFLAGS_EXTRA=-O2
endif
CFLAGS_EXTRA += -fPIC --std=gnu99
LDFLAGS_EXTRA += -Wl,--relax,--gc-sections
TOOLCHAIN := $(shell $(CC) --version|awk '/Debian/ {print "debian";exit;}')
#
# This bit adds extra flags depending of the distro, and the
# architecture. To make sure debian packages have the right
# set of 'native' flags on them
#
ifeq ($(TOOLCHAIN),debian)
DEBARCH := $(shell dpkg-architecture -qDEB_BUILD_ARCH)
ifeq ($(DEBARCH),$(filter $(DEBARCH),amd64 i386))
CFLAGS_EXTRA += -march=native
endif
ifeq ($(DEBARCH),armhf)
CFLAGS_EXTRA += -march=armv7-a -mtune=cortex-a8 -mfpu=neon
endif
LDFLAGS_EXTRA += -L$(LIB) -Wl,-rpath,${shell readlink -f ${LIB}}
else
LDFLAGS_EXTRA += -L$(LIB) -Wl,-rpath-link,$(LIB)
endif
LDFLAGS_EXTRA += -Wl,--relax,--gc-sections -L$(LIB) -Wl,-rpath-link,$(LIB)
# The -Wunreachable-code warning is only implemented in clang, but it
# doesn't break anything for gcc to see it.
@@ -42,10 +23,10 @@ WARNINGS=-Wall \
-Wstrict-prototypes \
-Wno-missing-field-initializers \
-Wunreachable-code
CCFLAGS=-D_GNU_SOURCE=1 $(WARNINGS) $(CFLAGS_EXTRA) $(CFLAGS)
LLDFLAGS=-lm -lrt -lev $(LDFLAGS_EXTRA) $(LDFLAGS)
CC?=gcc
LIBS=-lpthread
@@ -94,7 +75,7 @@ CHECK_OBJ := $(CHECK_SRC:tests/unit/%.c=build/%.o)
CHECK_BINS := $(CHECK_SRC:tests/unit/%.c=build/%)
build/check_%: build/check_%.o
$(LINK) $^ -o $@ $(COMMON_OBJ) $(SERVER_OBJ) -lcheck
$(LINK) $^ -o $@ $(COMMON_OBJ) $(SERVER_OBJ) -lcheck -lsubunit
check_objs: $(CHECK_OBJ)

2804
debian/changelog vendored Normal file

File diff suppressed because it is too large Load Diff

1
debian/compat vendored Normal file
View File

@@ -0,0 +1 @@
7

25
debian/control vendored Normal file
View File

@@ -0,0 +1,25 @@
Source: flexnbd
Section: web
Priority: extra
Maintainer: Patrick J Cherry <patrick@bytemark.co.uk>
Build-Depends: debhelper (>= 7.0.50), ruby, gcc, libev-dev, txt2man, check, net-tools, libsubunit-dev, ruby-test-unit
Standards-Version: 3.8.1
Homepage: https://github.com/BytemarkHosting/flexnbd-c
Package: flexnbd
Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}, libev4 | libev3
Description: FlexNBD server
An NBD server offering push-mirroring and intelligent sparse file handling
Package: flexnbd-dbg
Architecture: any
Section: debug
Priority: extra
Depends:
flexnbd (= ${binary:Version}),
${misc:Depends}
Description: debugging symbols for flexnbd
An NBD server offering push-mirroring and intelligent sparse file handling
.
This package contains the debugging symbols for flexnbd.

53
debian/copyright vendored Normal file
View File

@@ -0,0 +1,53 @@
This work was packaged for Debian by:
Alex Young <alex@bytemark.co.uk> on Wed, 30 May 2012 16:46:58 +0100
It was downloaded from:
<url://example.com>
Upstream Author(s):
<put author's name and email here>
<likewise for another author>
Copyright:
<Copyright (C) YYYY Firstname Lastname>
<likewise for another author>
License:
### SELECT: ###
This package is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
### OR ###
This package is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2 as
published by the Free Software Foundation.
##########
This package is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
On Debian systems, the complete text of the GNU General
Public License version 2 can be found in "/usr/share/common-licenses/GPL-2".
The Debian packaging is:
Copyright (C) 2012 Alex Young <alex@bytemark.co.uk>
you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
# Please also look if there are files or directories which have a
# different copyright/license attached and list them here.

3
debian/flexnbd.install vendored Normal file
View File

@@ -0,0 +1,3 @@
build/flexnbd usr/bin
build/flexnbd-proxy usr/bin

2
debian/flexnbd.manpages vendored Normal file
View File

@@ -0,0 +1,2 @@
build/flexnbd.1.gz
build/flexnbd-proxy.1.gz

19
debian/rules vendored Executable file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/make -f
# -*- makefile -*-
# Uncomment this to turn on verbose mode.
#export DH_VERBOSE=1
%:
dh $@
override_dh_strip:
dh_strip --dbg-package=flexnbd-dbg
#
# TODO: The ruby test suites don't work during buiding in a chroot, so leave
# them out for now.
#
#override_dh_auto_test:
# rake test:run

1
debian/source/format vendored Normal file
View File

@@ -0,0 +1 @@
3.0 (native)

189
src/common/fifo_declare.h Normal file
View File

@@ -0,0 +1,189 @@
/*
fido_declare.h
Copyright (C) 2003-2012 Michel Pollet <buserror@gmail.com>
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
* FIFO helpers, aka circular buffers
*
* these macros define accessories for FIFOs of any name, type and
* any (power of two) size
*/
#ifndef __FIFO_DECLARE__
#define __FIFO_DECLARE__
#ifdef __cplusplus
extern "C" {
#endif
/*
doing a :
DECLARE_FIFO(uint8_t, myfifo, 128);
will declare :
enum : myfifo_overflow_f
type : myfifo_t
functions:
// write a byte into the fifo, return 1 if there was room, 0 if there wasn't
int myfifo_write(myfifo_t *c, uint8_t b);
// reads a byte from the fifo, return 0 if empty. Use myfifo_isempty() to check beforehand
uint8_t myfifo_read(myfifo_t *c);
int myfifo_isfull(myfifo_t *c);
int myfifo_isempty(myfifo_t *c);
// returns number of items to read now
uint16_t myfifo_get_read_size(myfifo_t *c);
// read item at offset o from read cursor, no cursor advance
uint8_t myfifo_read_at(myfifo_t *c, uint16_t o);
// write b at offset o compared to current write cursor, no cursor advance
void myfifo_write_at(myfifo_t *c, uint16_t o, uint8_t b);
In your .c you need to 'implement' the fifo:
DEFINE_FIFO(uint8_t, myfifo)
To use the fifo, you must declare at least one :
myfifo_t fifo = FIFO_NULL;
while (!myfifo_isfull(&fifo))
myfifo_write(&fifo, 0xaa);
....
while (!myfifo_isempty(&fifo))
b = myfifo_read(&fifo);
*/
#include <stdint.h>
#if __AVR__
#define FIFO_CURSOR_TYPE uint8_t
#define FIFO_BOOL_TYPE char
#define FIFO_INLINE
#define FIFO_SYNC
#endif
#ifndef FIFO_CURSOR_TYPE
#define FIFO_CURSOR_TYPE uint16_t
#endif
#ifndef FIFO_BOOL_TYPE
#define FIFO_BOOL_TYPE int
#endif
#ifndef FIFO_INLINE
#define FIFO_INLINE inline
#endif
/* We should not need volatile */
#ifndef FIFO_VOLATILE
#define FIFO_VOLATILE
#endif
#ifndef FIFO_SYNC
#define FIFO_SYNC __sync_synchronize()
#endif
#ifndef FIFO_ZERO_INIT
#define FIFO_ZERO_INIT {0}
#endif
#define FIFO_NULL { FIFO_ZERO_INIT, 0, 0, 0 }
/* New compilers don't like unused static functions. However,
* we do like 'static inlines' for these small accessors,
* so we mark them as 'unused'. It stops it complaining */
#ifdef __GNUC__
#define FIFO_DECL static __attribute__ ((unused))
#else
#define FIFO_DECL static
#endif
#define DECLARE_FIFO(__type, __name, __size) \
enum { __name##_overflow_f = (1 << 0) }; \
enum { __name##_fifo_size = (__size) }; \
typedef struct __name##_t { \
__type buffer[__name##_fifo_size]; \
FIFO_VOLATILE FIFO_CURSOR_TYPE read; \
FIFO_VOLATILE FIFO_CURSOR_TYPE write; \
FIFO_VOLATILE uint8_t flags; \
} __name##_t
#define DEFINE_FIFO(__type, __name) \
FIFO_DECL FIFO_INLINE FIFO_BOOL_TYPE __name##_write(__name##_t * c, __type b)\
{\
FIFO_CURSOR_TYPE now = c->write;\
FIFO_CURSOR_TYPE next = (now + 1) & (__name##_fifo_size-1);\
if (c->read != next) { \
c->buffer[now] = b;\
FIFO_SYNC; \
c->write = next;\
return 1;\
}\
return 0;\
}\
FIFO_DECL FIFO_INLINE FIFO_BOOL_TYPE __name##_isfull(__name##_t *c)\
{\
FIFO_CURSOR_TYPE next = (c->write + 1) & (__name##_fifo_size-1);\
return c->read == next;\
}\
FIFO_DECL FIFO_INLINE FIFO_BOOL_TYPE __name##_isempty(__name##_t * c)\
{\
return c->read == c->write;\
}\
FIFO_DECL FIFO_INLINE __type __name##_read(__name##_t * c)\
{\
__type res = FIFO_ZERO_INIT; \
FIFO_CURSOR_TYPE read = c->read;\
if (read == c->write)\
return res;\
res = c->buffer[read];\
FIFO_SYNC; \
c->read = (read + 1) & (__name##_fifo_size-1);\
return res;\
}\
FIFO_DECL FIFO_INLINE FIFO_CURSOR_TYPE __name##_get_read_size(__name##_t *c)\
{\
return ((c->write + __name##_fifo_size) - c->read) & (__name##_fifo_size-1);\
}\
FIFO_DECL FIFO_INLINE FIFO_CURSOR_TYPE __name##_get_write_size(__name##_t *c)\
{\
return (__name##_fifo_size-1) - __name##_get_read_size(c);\
}\
FIFO_DECL FIFO_INLINE void __name##_read_offset(__name##_t *c, FIFO_CURSOR_TYPE o)\
{\
FIFO_SYNC; \
c->read = (c->read + o) & (__name##_fifo_size-1);\
}\
FIFO_DECL FIFO_INLINE __type __name##_read_at(__name##_t *c, FIFO_CURSOR_TYPE o)\
{\
return c->buffer[(c->read + o) & (__name##_fifo_size-1)];\
}\
FIFO_DECL FIFO_INLINE void __name##_write_at(__name##_t *c, FIFO_CURSOR_TYPE o, __type b)\
{\
c->buffer[(c->write + o) & (__name##_fifo_size-1)] = b;\
}\
FIFO_DECL FIFO_INLINE void __name##_write_offset(__name##_t *c, FIFO_CURSOR_TYPE o)\
{\
FIFO_SYNC; \
c->write = (c->write + o) & (__name##_fifo_size-1);\
}\
FIFO_DECL FIFO_INLINE void __name##_reset(__name##_t *c)\
{\
FIFO_SYNC; \
c->read = c->write = c->flags = 0;\
}\
struct __name##_t
#ifdef __cplusplus
};
#endif
#endif

View File

@@ -9,7 +9,7 @@
#include <fcntl.h>
#include "util.h"
#include "bitset.h"
#include "bitstream.h"
#include "ioutil.h"

View File

@@ -130,309 +130,6 @@ static inline uint64_t bit_run_count(bitfield_p b, uint64_t from, uint64_t len,
return count;
}
enum bitset_stream_events {
BITSET_STREAM_UNSET = 0,
BITSET_STREAM_SET = 1,
BITSET_STREAM_ON = 2,
BITSET_STREAM_OFF = 3
};
#define BITSET_STREAM_EVENTS_ENUM_SIZE 4
struct bitset_stream_entry {
enum bitset_stream_events event;
uint64_t from;
uint64_t len;
};
/** Limit the stream size to 1MB for now.
*
* If this is too small, it'll cause requests to stall as the migration lags
* behind the changes made by those requests.
*/
#define BITSET_STREAM_SIZE ( ( 1024 * 1024 ) / sizeof( struct bitset_stream_entry ) )
struct bitset_stream {
struct bitset_stream_entry entries[BITSET_STREAM_SIZE];
int in;
int out;
int size;
pthread_mutex_t mutex;
pthread_cond_t cond_not_full;
pthread_cond_t cond_not_empty;
uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE];
};
/** An application of a bitset - a bitset mapping represents a file of ''size''
* broken down into ''resolution''-sized chunks. The bit set is assumed to
* represent one bit per chunk. We also bundle a lock so that the set can be
* written reliably by multiple threads.
*/
struct bitset {
pthread_mutex_t lock;
uint64_t size;
int resolution;
struct bitset_stream *stream;
int stream_enabled;
bitfield_word_t bits[];
};
/** Allocate a bitset for a file of the given size, and chunks of the
* given resolution.
*/
static inline struct bitset *bitset_alloc( uint64_t size, int resolution )
{
// calculate a size to allocate that is a multiple of the size of the
// bitfield word
size_t bitfield_size =
BIT_WORDS_FOR_SIZE((( size + resolution - 1 ) / resolution)) * sizeof( bitfield_word_t );
struct bitset *bitset = xmalloc(sizeof( struct bitset ) + ( bitfield_size / 8 ) );
bitset->size = size;
bitset->resolution = resolution;
/* don't actually need to call pthread_mutex_destroy '*/
pthread_mutex_init(&bitset->lock, NULL);
bitset->stream = xmalloc( sizeof( struct bitset_stream ) );
pthread_mutex_init( &bitset->stream->mutex, NULL );
/* Technically don't need to call pthread_cond_destroy either */
pthread_cond_init( &bitset->stream->cond_not_full, NULL );
pthread_cond_init( &bitset->stream->cond_not_empty, NULL );
return bitset;
}
static inline void bitset_free( struct bitset * set )
{
/* TODO: free our mutex... */
free( set->stream );
set->stream = NULL;
free( set );
}
#define INT_FIRST_AND_LAST \
uint64_t first = from/set->resolution, \
last = ((from+len)-1)/set->resolution, \
bitlen = (last-first)+1
#define BITSET_LOCK \
FATAL_IF_NEGATIVE(pthread_mutex_lock(&set->lock), "Error locking bitset")
#define BITSET_UNLOCK \
FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset")
static inline void bitset_stream_enqueue(
struct bitset * set,
enum bitset_stream_events event,
uint64_t from,
uint64_t len
)
{
struct bitset_stream * stream = set->stream;
pthread_mutex_lock( &stream->mutex );
while ( stream->size == BITSET_STREAM_SIZE ) {
pthread_cond_wait( &stream->cond_not_full, &stream->mutex );
}
stream->entries[stream->in].event = event;
stream->entries[stream->in].from = from;
stream->entries[stream->in].len = len;
stream->queued_bytes[event] += len;
stream->size++;
stream->in++;
stream->in %= BITSET_STREAM_SIZE;
pthread_mutex_unlock( & stream->mutex );
pthread_cond_signal( &stream->cond_not_empty );
return;
}
static inline void bitset_stream_dequeue(
struct bitset * set,
struct bitset_stream_entry * out
)
{
struct bitset_stream * stream = set->stream;
struct bitset_stream_entry * dequeued;
pthread_mutex_lock( &stream->mutex );
while ( stream->size == 0 ) {
pthread_cond_wait( &stream->cond_not_empty, &stream->mutex );
}
dequeued = &stream->entries[stream->out];
if ( out != NULL ) {
out->event = dequeued->event;
out->from = dequeued->from;
out->len = dequeued->len;
}
stream->queued_bytes[dequeued->event] -= dequeued->len;
stream->size--;
stream->out++;
stream->out %= BITSET_STREAM_SIZE;
pthread_mutex_unlock( &stream->mutex );
pthread_cond_signal( &stream->cond_not_full );
return;
}
static inline size_t bitset_stream_size( struct bitset * set )
{
size_t size;
pthread_mutex_lock( &set->stream->mutex );
size = set->stream->size;
pthread_mutex_unlock( &set->stream->mutex );
return size;
}
static inline uint64_t bitset_stream_queued_bytes(
struct bitset * set,
enum bitset_stream_events event
)
{
uint64_t total;
pthread_mutex_lock( &set->stream->mutex );
total = set->stream->queued_bytes[event];
pthread_mutex_unlock( &set->stream->mutex );
return total;
}
static inline void bitset_enable_stream( struct bitset * set )
{
BITSET_LOCK;
set->stream_enabled = 1;
bitset_stream_enqueue( set, BITSET_STREAM_ON, 0, set->size );
BITSET_UNLOCK;
}
static inline void bitset_disable_stream( struct bitset * set )
{
BITSET_LOCK;
bitset_stream_enqueue( set, BITSET_STREAM_OFF, 0, set->size );
set->stream_enabled = 0;
BITSET_UNLOCK;
}
/** Set the bits in a bitset which correspond to the given bytes in the larger
* file.
*/
static inline void bitset_set_range(
struct bitset * set,
uint64_t from,
uint64_t len)
{
INT_FIRST_AND_LAST;
BITSET_LOCK;
bit_set_range(set->bits, first, bitlen);
if ( set->stream_enabled ) {
bitset_stream_enqueue( set, BITSET_STREAM_SET, from, len );
}
BITSET_UNLOCK;
}
/** Set every bit in the bitset. */
static inline void bitset_set( struct bitset * set )
{
bitset_set_range(set, 0, set->size);
}
/** Clear the bits in a bitset which correspond to the given bytes in the
* larger file.
*/
static inline void bitset_clear_range(
struct bitset * set,
uint64_t from,
uint64_t len)
{
INT_FIRST_AND_LAST;
BITSET_LOCK;
bit_clear_range(set->bits, first, bitlen);
if ( set->stream_enabled ) {
bitset_stream_enqueue( set, BITSET_STREAM_UNSET, from, len );
}
BITSET_UNLOCK;
}
/** Clear every bit in the bitset. */
static inline void bitset_clear( struct bitset * set )
{
bitset_clear_range(set, 0, set->size);
}
/** As per bitset_run_count but also tells you whether the run it found was set
* or unset, atomically.
*/
static inline uint64_t bitset_run_count_ex(
struct bitset * set,
uint64_t from,
uint64_t len,
int* run_is_set
)
{
uint64_t run;
/* Clip our requests to the end of the bitset, avoiding uint underflow. */
if ( from > set->size ) {
return 0;
}
len = ( len + from ) > set->size ? ( set->size - from ) : len;
INT_FIRST_AND_LAST;
BITSET_LOCK;
run = bit_run_count(set->bits, first, bitlen, run_is_set) * set->resolution;
run -= (from % set->resolution);
BITSET_UNLOCK;
return run;
}
/** Counts the number of contiguous bytes that are represented as a run in
* the bit field.
*/
static inline uint64_t bitset_run_count(
struct bitset * set,
uint64_t from,
uint64_t len)
{
return bitset_run_count_ex( set, from, len, NULL );
}
/** Tests whether the bit field is clear for the given file offset.
*/
static inline int bitset_is_clear_at( struct bitset * set, uint64_t at )
{
return bit_is_clear(set->bits, at/set->resolution);
}
/** Tests whether the bit field is set for the given file offset.
*/
static inline int bitset_is_set_at( struct bitset * set, uint64_t at )
{
return bit_is_set(set->bits, at/set->resolution);
}
#endif

317
src/server/bitstream.h Normal file
View File

@@ -0,0 +1,317 @@
/*
* bitstream.h
*
* Created on: 13 Oct 2016
* Author: michel
*/
#ifndef SRC_SERVER_BITSTREAM_H_
#define SRC_SERVER_BITSTREAM_H_
#include "bitset.h"
enum bitset_stream_events {
BITSET_STREAM_UNSET = 0,
BITSET_STREAM_SET = 1,
BITSET_STREAM_ON = 2,
BITSET_STREAM_OFF = 3
};
#define BITSET_STREAM_EVENTS_ENUM_SIZE 4
struct bitset_stream_entry {
enum bitset_stream_events event;
uint64_t from;
uint64_t len;
};
/** Limit the stream size to 1MB for now.
*
* If this is too small, it'll cause requests to stall as the migration lags
* behind the changes made by those requests.
*/
#define BITSET_STREAM_SIZE ( ( 1024 * 1024 ) / sizeof( struct bitset_stream_entry ) )
struct bitset_stream {
struct bitset_stream_entry entries[BITSET_STREAM_SIZE];
int in;
int out;
int size;
pthread_mutex_t mutex;
pthread_cond_t cond_not_full;
pthread_cond_t cond_not_empty;
uint64_t queued_bytes[BITSET_STREAM_EVENTS_ENUM_SIZE];
};
/** An application of a bitset - a bitset mapping represents a file of ''size''
* broken down into ''resolution''-sized chunks. The bit set is assumed to
* represent one bit per chunk. We also bundle a lock so that the set can be
* written reliably by multiple threads.
*/
struct bitset {
pthread_mutex_t lock;
uint64_t size;
int resolution;
struct bitset_stream *stream;
int stream_enabled;
bitfield_word_t bits[];
};
/** Allocate a bitset for a file of the given size, and chunks of the
* given resolution.
*/
static inline struct bitset *bitset_alloc( uint64_t size, int resolution )
{
// calculate a size to allocate that is a multiple of the size of the
// bitfield word
size_t bitfield_size =
BIT_WORDS_FOR_SIZE((( size + resolution - 1 ) / resolution)) * sizeof( bitfield_word_t );
struct bitset *bitset = xmalloc(sizeof( struct bitset ) + ( bitfield_size / 8 ) );
bitset->size = size;
bitset->resolution = resolution;
/* don't actually need to call pthread_mutex_destroy '*/
pthread_mutex_init(&bitset->lock, NULL);
bitset->stream = xmalloc( sizeof( struct bitset_stream ) );
pthread_mutex_init( &bitset->stream->mutex, NULL );
/* Technically don't need to call pthread_cond_destroy either */
pthread_cond_init( &bitset->stream->cond_not_full, NULL );
pthread_cond_init( &bitset->stream->cond_not_empty, NULL );
return bitset;
}
static inline void bitset_free( struct bitset * set )
{
/* TODO: free our mutex... */
free( set->stream );
set->stream = NULL;
free( set );
}
#define INT_FIRST_AND_LAST \
uint64_t first = from/set->resolution, \
last = ((from+len)-1)/set->resolution, \
bitlen = (last-first)+1
#define BITSET_LOCK \
FATAL_IF_NEGATIVE(pthread_mutex_lock(&set->lock), "Error locking bitset")
#define BITSET_UNLOCK \
FATAL_IF_NEGATIVE(pthread_mutex_unlock(&set->lock), "Error unlocking bitset")
static inline void bitset_stream_enqueue(
struct bitset * set,
enum bitset_stream_events event,
uint64_t from,
uint64_t len
)
{
struct bitset_stream * stream = set->stream;
pthread_mutex_lock( &stream->mutex );
while ( stream->size == BITSET_STREAM_SIZE ) {
pthread_cond_wait( &stream->cond_not_full, &stream->mutex );
}
stream->entries[stream->in].event = event;
stream->entries[stream->in].from = from;
stream->entries[stream->in].len = len;
stream->queued_bytes[event] += len;
stream->size++;
stream->in++;
stream->in %= BITSET_STREAM_SIZE;
pthread_mutex_unlock( & stream->mutex );
pthread_cond_signal( &stream->cond_not_empty );
return;
}
static inline void bitset_stream_dequeue(
struct bitset * set,
struct bitset_stream_entry * out
)
{
struct bitset_stream * stream = set->stream;
struct bitset_stream_entry * dequeued;
pthread_mutex_lock( &stream->mutex );
while ( stream->size == 0 ) {
pthread_cond_wait( &stream->cond_not_empty, &stream->mutex );
}
dequeued = &stream->entries[stream->out];
if ( out != NULL ) {
out->event = dequeued->event;
out->from = dequeued->from;
out->len = dequeued->len;
}
stream->queued_bytes[dequeued->event] -= dequeued->len;
stream->size--;
stream->out++;
stream->out %= BITSET_STREAM_SIZE;
pthread_mutex_unlock( &stream->mutex );
pthread_cond_signal( &stream->cond_not_full );
return;
}
static inline size_t bitset_stream_size( struct bitset * set )
{
size_t size;
pthread_mutex_lock( &set->stream->mutex );
size = set->stream->size;
pthread_mutex_unlock( &set->stream->mutex );
return size;
}
static inline uint64_t bitset_stream_queued_bytes(
struct bitset * set,
enum bitset_stream_events event
)
{
uint64_t total;
pthread_mutex_lock( &set->stream->mutex );
total = set->stream->queued_bytes[event];
pthread_mutex_unlock( &set->stream->mutex );
return total;
}
static inline void bitset_enable_stream( struct bitset * set )
{
BITSET_LOCK;
set->stream_enabled = 1;
bitset_stream_enqueue( set, BITSET_STREAM_ON, 0, set->size );
BITSET_UNLOCK;
}
static inline void bitset_disable_stream( struct bitset * set )
{
BITSET_LOCK;
bitset_stream_enqueue( set, BITSET_STREAM_OFF, 0, set->size );
set->stream_enabled = 0;
BITSET_UNLOCK;
}
/** Set the bits in a bitset which correspond to the given bytes in the larger
* file.
*/
static inline void bitset_set_range(
struct bitset * set,
uint64_t from,
uint64_t len)
{
INT_FIRST_AND_LAST;
BITSET_LOCK;
bit_set_range(set->bits, first, bitlen);
if ( set->stream_enabled ) {
bitset_stream_enqueue( set, BITSET_STREAM_SET, from, len );
}
BITSET_UNLOCK;
}
/** Set every bit in the bitset. */
static inline void bitset_set( struct bitset * set )
{
bitset_set_range(set, 0, set->size);
}
/** Clear the bits in a bitset which correspond to the given bytes in the
* larger file.
*/
static inline void bitset_clear_range(
struct bitset * set,
uint64_t from,
uint64_t len)
{
INT_FIRST_AND_LAST;
BITSET_LOCK;
bit_clear_range(set->bits, first, bitlen);
if ( set->stream_enabled ) {
bitset_stream_enqueue( set, BITSET_STREAM_UNSET, from, len );
}
BITSET_UNLOCK;
}
/** Clear every bit in the bitset. */
static inline void bitset_clear( struct bitset * set )
{
bitset_clear_range(set, 0, set->size);
}
/** As per bitset_run_count but also tells you whether the run it found was set
* or unset, atomically.
*/
static inline uint64_t bitset_run_count_ex(
struct bitset * set,
uint64_t from,
uint64_t len,
int* run_is_set
)
{
uint64_t run;
/* Clip our requests to the end of the bitset, avoiding uint underflow. */
if ( from > set->size ) {
return 0;
}
len = ( len + from ) > set->size ? ( set->size - from ) : len;
INT_FIRST_AND_LAST;
BITSET_LOCK;
run = bit_run_count(set->bits, first, bitlen, run_is_set) * set->resolution;
run -= (from % set->resolution);
BITSET_UNLOCK;
return run;
}
/** Counts the number of contiguous bytes that are represented as a run in
* the bit field.
*/
static inline uint64_t bitset_run_count(
struct bitset * set,
uint64_t from,
uint64_t len)
{
return bitset_run_count_ex( set, from, len, NULL );
}
/** Tests whether the bit field is clear for the given file offset.
*/
static inline int bitset_is_clear_at( struct bitset * set, uint64_t at )
{
return bit_is_clear(set->bits, at/set->resolution);
}
/** Tests whether the bit field is set for the given file offset.
*/
static inline int bitset_is_set_at( struct bitset * set, uint64_t at )
{
return bit_is_set(set->bits, at/set->resolution);
}
#endif /* SRC_SERVER_BITSTREAM_H_ */

View File

@@ -3,7 +3,7 @@
#include "ioutil.h"
#include "sockutil.h"
#include "util.h"
#include "bitset.h"
#include "bitstream.h"
#include "nbdtypes.h"
#include "self_pipe.h"

View File

@@ -83,7 +83,7 @@ void control_destroy( struct control * control )
struct control_client * control_client_create(
struct flexnbd * flexnbd,
int client_fd ,
struct mbox * state_mbox )
struct mbox_t * state_mbox )
{
NULLCHECK( flexnbd );
@@ -256,7 +256,7 @@ void * control_runner( void * control_uncast )
#define write_socket(msg) write(client_fd, (msg "\n"), strlen((msg))+1)
void control_write_mirror_response( enum mirror_state mirror_state, int client_fd )
void control_write_mirror_response( mirror_state_t mirror_state, int client_fd )
{
switch (mirror_state) {
case MS_INIT:
@@ -290,22 +290,16 @@ void control_write_mirror_response( enum mirror_state mirror_state, int client_f
/* Call this in the thread where you want to receive the mirror state */
enum mirror_state control_client_mirror_wait(
mirror_state_t control_client_mirror_wait(
struct control_client* client)
{
NULLCHECK( client );
NULLCHECK( client->mirror_state_mbox );
struct mbox * mbox = client->mirror_state_mbox;
enum mirror_state mirror_state;
enum mirror_state * contents;
struct mbox_t * mbox = client->mirror_state_mbox;
mirror_state_t mirror_state;
contents = (enum mirror_state*)mbox_receive( mbox );
NULLCHECK( contents );
mirror_state = *contents;
free( contents );
mirror_state = mbox_receive( mbox ).i;
return mirror_state;
}
@@ -425,7 +419,7 @@ int control_mirror(struct control_client* client, int linesc, char** lines)
);
debug("Control thread mirror super waiting");
enum mirror_state state =
mirror_state_t state =
control_client_mirror_wait( client );
debug("Control thread writing response");
control_write_mirror_response( state, client->socket );

View File

@@ -31,7 +31,7 @@ struct control {
* process (and we can only have a mirror thread if the control
* thread has started it).
*/
struct mbox * mirror_state_mbox;
struct mbox_t * mirror_state_mbox;
};
struct control_client{
@@ -41,7 +41,7 @@ struct control_client{
/* Passed in on creation. We know it's all right to do this
* because we know there's only ever one control_client.
*/
struct mbox * mirror_state_mbox;
struct mbox_t * mirror_state_mbox;
};
struct control * control_create(

View File

@@ -1,77 +1,73 @@
#include "mbox.h"
#include "util.h"
#include <sys/socket.h>
#include <pthread.h>
struct mbox * mbox_create( void )
DEFINE_FIFO(mbox_item_t, mbox_fifo);
#define ARRAY_SIZE(w) (sizeof(w) / sizeof((w)[0]))
mbox_p mbox_create( void )
{
struct mbox * mbox = xmalloc( sizeof( struct mbox ) );
FATAL_UNLESS( 0 == pthread_cond_init( &mbox->filled_cond, NULL ),
"Failed to initialise a condition variable" );
FATAL_UNLESS( 0 == pthread_cond_init( &mbox->emptied_cond, NULL ),
"Failed to initialise a condition variable" );
FATAL_UNLESS( 0 == pthread_mutex_init( &mbox->mutex, NULL ),
"Failed to initialise a mutex" );
mbox_p mbox = xmalloc( sizeof( struct mbox_t ) );
int sv[2];
FATAL_UNLESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0,
"Failed to socketpair");
mbox->signalw = sv[0];
mbox->signalr = sv[1];
return mbox;
}
void mbox_post( struct mbox * mbox, void * contents )
void mbox_post( mbox_p mbox, mbox_item_t item )
{
pthread_mutex_lock( &mbox->mutex );
mbox_fifo_write(&mbox->fifo, item);
{
if (mbox->full){
pthread_cond_wait( &mbox->emptied_cond, &mbox->mutex );
}
mbox->contents = contents;
mbox->full = 1;
while( 0 != pthread_cond_signal( &mbox->filled_cond ) );
uint8_t w;
FATAL_UNLESS((write(mbox->signalw, &w, 1)) == 1,
"Write to socketpair");
}
pthread_mutex_unlock( &mbox->mutex );
}
void * mbox_contents( struct mbox * mbox )
mbox_item_t mbox_contents( mbox_p mbox )
{
return mbox->contents;
const mbox_item_t zero = {0};
return mbox_fifo_isempty(&mbox->fifo) ?
zero :
mbox_fifo_read_at(&mbox->fifo, 0);
}
int mbox_is_full( struct mbox * mbox )
int mbox_is_full( mbox_p mbox )
{
return mbox->full;
return mbox_fifo_isfull(&mbox->fifo);
}
void * mbox_receive( struct mbox * mbox )
{
NULLCHECK( mbox );
void * result;
pthread_mutex_lock( &mbox->mutex );
{
if ( !mbox->full ) {
pthread_cond_wait( &mbox->filled_cond, &mbox->mutex );
}
mbox->full = 0;
result = mbox->contents;
mbox->contents = NULL;
while( 0 != pthread_cond_signal( &mbox->emptied_cond));
}
pthread_mutex_unlock( &mbox->mutex );
return result;
}
void mbox_destroy( struct mbox * mbox )
mbox_item_t mbox_receive( mbox_p mbox )
{
NULLCHECK( mbox );
while( 0 != pthread_cond_destroy( &mbox->emptied_cond ) );
while( 0 != pthread_cond_destroy( &mbox->filled_cond ) );
while (mbox_fifo_isempty(&mbox->fifo)) {
uint8_t w;
FATAL_UNLESS((read(mbox->signalr, &w, 1)) == 1,
"Read from socketpair");
}
while( 0 != pthread_mutex_destroy( &mbox->mutex ) );
return mbox_fifo_read(&mbox->fifo);
}
void mbox_destroy( mbox_p mbox )
{
NULLCHECK( mbox );
close(mbox->signalw);
close(mbox->signalr);
free( mbox );
}

View File

@@ -11,45 +11,43 @@
#include <pthread.h>
#include <stdint.h>
#include "fifo_declare.h"
typedef union {
uint64_t i;
void * p;
} mbox_item_t;
DECLARE_FIFO(mbox_item_t, mbox_fifo, 8);
typedef struct mbox_t {
mbox_fifo_t fifo;
// socketpair() ends
int signalw, signalr;
} mbox_t, *mbox_p;
struct mbox {
void * contents;
/** Marker to tell us if there's content in the box.
* Keeping this separate allows us to use NULL for the contents.
*/
int full;
/** This gets signaled by mbox_post, and waited on by
* mbox_receive */
pthread_cond_t filled_cond;
/** This is signaled by mbox_receive, and waited on by mbox_post */
pthread_cond_t emptied_cond;
pthread_mutex_t mutex;
};
/* Create an mbox. */
struct mbox * mbox_create(void);
/* Create an mbox_t. */
mbox_p mbox_create(void);
/* Put something in the mbox, blocking if it's already full.
* That something can be NULL if you want.
*/
void mbox_post( struct mbox *, void *);
void mbox_post( mbox_p , mbox_item_t item);
/* See what's in the mbox. This isn't thread-safe. */
void * mbox_contents( struct mbox *);
mbox_item_t mbox_contents( mbox_p );
/* See if anything has been put into the mbox. This isn't thread-safe.
* */
int mbox_is_full( struct mbox *);
int mbox_is_full( mbox_p );
/* Get the contents from the mbox, blocking if there's nothing there. */
void * mbox_receive( struct mbox *);
mbox_item_t mbox_receive( mbox_p );
/* Free the mbox and destroy the associated pthread bits. */
void mbox_destroy( struct mbox *);
void mbox_destroy( mbox_p );
#endif

View File

@@ -22,7 +22,7 @@
#include "sockutil.h"
#include "parse.h"
#include "readwrite.h"
#include "bitset.h"
#include "bitstream.h"
#include "self_pipe.h"
#include "status.h"
@@ -66,7 +66,7 @@ struct xfer {
struct mirror_ctrl {
struct server *serve;
struct mirror *mirror;
mirror_p mirror;
/* libev stuff */
struct ev_loop *ev_loop;
@@ -90,16 +90,16 @@ struct mirror_ctrl {
};
struct mirror * mirror_alloc(
mirror_p mirror_alloc(
union mysockaddr * connect_to,
union mysockaddr * connect_from,
uint64_t max_Bps,
enum mirror_finish_action action_at_finish,
struct mbox * commit_signal)
mirror_finish_action_t action_at_finish,
mbox_p commit_signal)
{
struct mirror * mirror;
mirror_p mirror;
mirror = xmalloc(sizeof(struct mirror));
mirror = xmalloc(sizeof(mirror_t));
mirror->connect_to = connect_to;
mirror->connect_from = connect_from;
mirror->max_bytes_per_second = max_Bps;
@@ -116,7 +116,7 @@ struct mirror * mirror_alloc(
return mirror;
}
void mirror_set_state_f( struct mirror * mirror, enum mirror_state state )
void mirror_set_state_f( mirror_p mirror, mirror_state_t state )
{
NULLCHECK( mirror );
mirror->commit_state = state;
@@ -127,7 +127,7 @@ void mirror_set_state_f( struct mirror * mirror, enum mirror_state state )
mirror_set_state_f( mirror, state );\
} while(0)
enum mirror_state mirror_get_state( struct mirror * mirror )
mirror_state_t mirror_get_state( mirror_p mirror )
{
NULLCHECK( mirror );
return mirror->commit_state;
@@ -136,7 +136,7 @@ enum mirror_state mirror_get_state( struct mirror * mirror )
#define mirror_state_is( mirror, state ) mirror_get_state( mirror ) == state
void mirror_init( struct mirror * mirror, const char * filename )
void mirror_init( mirror_p mirror, const char * filename )
{
int map_fd;
uint64_t size;
@@ -163,7 +163,7 @@ void mirror_init( struct mirror * mirror, const char * filename )
/* Call this before a mirror attempt. */
void mirror_reset( struct mirror * mirror )
void mirror_reset( mirror_p mirror )
{
NULLCHECK( mirror );
mirror_set_state( mirror, MS_INIT );
@@ -176,16 +176,16 @@ void mirror_reset( struct mirror * mirror )
}
struct mirror * mirror_create(
mirror_p mirror_create(
const char * filename,
union mysockaddr * connect_to,
union mysockaddr * connect_from,
uint64_t max_Bps,
int action_at_finish,
struct mbox * commit_signal)
mbox_p commit_signal)
{
/* FIXME: shouldn't map_fd get closed? */
struct mirror * mirror;
mirror_p mirror;
mirror = mirror_alloc( connect_to,
connect_from,
@@ -201,7 +201,7 @@ struct mirror * mirror_create(
}
void mirror_destroy( struct mirror *mirror )
void mirror_destroy( mirror_p mirror )
{
NULLCHECK( mirror );
self_pipe_destroy( mirror->abandon_signal );
@@ -254,7 +254,7 @@ void mirror_cleanup( struct server * serve,
int fatal __attribute__((unused)))
{
NULLCHECK( serve );
struct mirror * mirror = serve->mirror;
mirror_p mirror = serve->mirror;
NULLCHECK( mirror );
info( "Cleaning up mirror thread");
@@ -270,7 +270,7 @@ void mirror_cleanup( struct server * serve,
}
int mirror_connect( struct mirror * mirror, uint64_t local_size )
int mirror_connect( mirror_p mirror, uint64_t local_size )
{
struct sockaddr * connect_from = NULL;
int connected = 0;
@@ -325,7 +325,7 @@ int mirror_connect( struct mirror * mirror, uint64_t local_size )
}
int mirror_should_quit( struct mirror * mirror )
int mirror_should_quit( mirror_p mirror )
{
switch( mirror->action_at_finish ) {
case ACTION_EXIT:
@@ -359,7 +359,7 @@ int mirror_should_wait( struct mirror_ctrl *ctrl )
* next transfer, then puts it together. */
int mirror_setup_next_xfer( struct mirror_ctrl *ctrl )
{
struct mirror* mirror = ctrl->mirror;
mirror_p mirror = ctrl->mirror;
struct server* serve = ctrl->serve;
struct bitset_stream_entry e = { .event = BITSET_STREAM_UNSET };
uint64_t current = mirror->offset, run = 0, size = serve->size;
@@ -508,7 +508,7 @@ static void mirror_read_cb( struct ev_loop *loop, ev_io *w, int revents )
struct mirror_ctrl* ctrl = (struct mirror_ctrl*) w->data;
NULLCHECK( ctrl );
struct mirror *m = ctrl->mirror;
mirror_p m = ctrl->mirror;
NULLCHECK( m );
struct xfer *xfer = &ctrl->xfer;
@@ -733,7 +733,7 @@ void mirror_run( struct server *serve )
NULLCHECK( serve );
NULLCHECK( serve->mirror );
struct mirror *m = serve->mirror;
mirror_p m = serve->mirror;
m->migration_started = monotonic_time_ms();
info("Starting mirror" );
@@ -849,18 +849,17 @@ void mirror_run( struct server *serve )
}
void mbox_post_mirror_state( struct mbox * mbox, enum mirror_state st )
void mbox_post_mirror_state( mbox_p mbox, mirror_state_t st )
{
NULLCHECK( mbox );
enum mirror_state * contents = xmalloc( sizeof( enum mirror_state ) );
*contents = st;
mbox_item_t ste = { .i = st };
mbox_post( mbox, contents );
mbox_post( mbox, ste );
}
void mirror_signal_commit( struct mirror * mirror )
void mirror_signal_commit( mirror_p mirror )
{
NULLCHECK( mirror );
@@ -887,7 +886,7 @@ void* mirror_runner(void* serve_params_uncast)
NULLCHECK( serve );
NULLCHECK( serve->mirror );
struct mirror * mirror = serve->mirror;
mirror_p mirror = serve->mirror;
error_set_handler( (cleanup_handler *) mirror_cleanup, serve );
@@ -932,15 +931,15 @@ abandon_mirror:
}
struct mirror_super * mirror_super_create(
mirror_super_p mirror_super_create(
const char * filename,
union mysockaddr * connect_to,
union mysockaddr * connect_from,
uint64_t max_Bps,
enum mirror_finish_action action_at_finish,
struct mbox * state_mbox)
mirror_finish_action_t action_at_finish,
mbox_p state_mbox)
{
struct mirror_super * super = xmalloc( sizeof( struct mirror_super) );
mirror_super_p super = xmalloc( sizeof( struct mirror_super_t) );
super->mirror = mirror_create(
filename,
connect_to,
@@ -955,8 +954,8 @@ struct mirror_super * mirror_super_create(
/* Post the current state of the mirror into super->state_mbox.*/
void mirror_super_signal_committed(
struct mirror_super * super ,
enum mirror_state commit_state )
mirror_super_p super ,
mirror_state_t commit_state )
{
NULLCHECK( super );
NULLCHECK( super->state_mbox );
@@ -967,7 +966,7 @@ void mirror_super_signal_committed(
}
void mirror_super_destroy( struct mirror_super * super )
void mirror_super_destroy( mirror_super_p super )
{
NULLCHECK( super );
@@ -993,8 +992,8 @@ void * mirror_super_runner( void * serve_uncast )
int should_retry = 0;
int success = 0, abandoned = 0;
struct mirror * mirror = serve->mirror;
struct mirror_super * super = serve->mirror_super;
mirror_p mirror = serve->mirror;
mirror_super_p super = serve->mirror_super;
do {
FATAL_IF( 0 != pthread_create(
@@ -1005,8 +1004,8 @@ void * mirror_super_runner( void * serve_uncast )
"Failed to create mirror thread");
debug("Supervisor waiting for commit signal");
enum mirror_state * commit_state =
mbox_receive( mirror->commit_signal );
mirror_state_t commit_state =
mbox_receive( mirror->commit_signal ).i;
debug( "Supervisor got commit signal" );
if ( first_pass ) {
@@ -1015,18 +1014,14 @@ void * mirror_super_runner( void * serve_uncast )
* retry behind the scenes. This may race with migration completing
* but since we "shouldn't retry" in that case either, that's fine
*/
should_retry = *commit_state == MS_GO;
should_retry = commit_state == MS_GO;
/* Only send this signal the first time */
mirror_super_signal_committed(
super,
*commit_state);
commit_state);
debug("Mirror supervisor committed");
}
/* We only care about the value of the commit signal on
* the first pass, so this is ok
*/
free( commit_state );
debug("Supervisor waiting for mirror thread" );
pthread_join( mirror->thread, NULL );

View File

@@ -7,7 +7,7 @@
#include "bitset.h"
#include "self_pipe.h"
enum mirror_state;
#include "serve.h"
#include "mbox.h"
@@ -57,14 +57,14 @@ enum mirror_state;
#define MS_REQUEST_LIMIT_SECS 60
#define MS_REQUEST_LIMIT_SECS_F 60.0
enum mirror_finish_action {
ACTION_EXIT,
typedef enum {
ACTION_EXIT = 0,
ACTION_UNLINK,
ACTION_NOTHING
};
} mirror_finish_action_t;
enum mirror_state {
MS_UNKNOWN,
typedef enum {
MS_UNKNOWN = 0,
MS_INIT,
MS_GO,
MS_ABANDONED,
@@ -73,9 +73,9 @@ enum mirror_state {
MS_FAIL_REJECTED,
MS_FAIL_NO_HELLO,
MS_FAIL_SIZE_MISMATCH
};
} mirror_state_t;
struct mirror {
typedef struct mirror_t {
pthread_t thread;
/* Signal to this then join the thread if you want to abandon mirroring */
@@ -90,19 +90,19 @@ struct mirror {
* over the network) are considered */
uint64_t max_bytes_per_second;
enum mirror_finish_action action_at_finish;
mirror_finish_action_t action_at_finish;
char *mapped;
/* We need to send every byte at least once; we do so by */
uint64_t offset;
enum mirror_state commit_state;
mirror_state_t commit_state;
/* commit_signal is sent immediately after attempting to connect
* and checking the remote size, whether successful or not.
*/
struct mbox * commit_signal;
struct mbox_t * commit_signal;
/* The time (from monotonic_time_ms()) the migration was started. Can be
* used to calculate bps, etc. */
@@ -110,14 +110,14 @@ struct mirror {
/* Running count of all bytes we've transferred */
uint64_t all_dirty;
};
} mirror_t, *mirror_p;
struct mirror_super {
struct mirror * mirror;
typedef struct mirror_super_t {
mirror_p mirror;
pthread_t thread;
struct mbox * state_mbox;
};
struct mbox_t * state_mbox;
} mirror_super_t, *mirror_super_p;
@@ -127,13 +127,13 @@ struct mirror_super {
struct server;
struct flexnbd;
struct mirror_super * mirror_super_create(
mirror_super_p mirror_super_create(
const char * filename,
union mysockaddr * connect_to,
union mysockaddr * connect_from,
uint64_t max_Bps,
enum mirror_finish_action action_at_finish,
struct mbox * state_mbox
mirror_finish_action_t action_at_finish,
struct mbox_t * state_mbox
);
void * mirror_super_runner( void * serve_uncast );

View File

@@ -4,7 +4,7 @@
#include "ioutil.h"
#include "sockutil.h"
#include "util.h"
#include "bitset.h"
#include "bitstream.h"
#include "control.h"
#include "self_pipe.h"
@@ -255,7 +255,7 @@ int tryjoin_client_thread( struct client_tbl_entry *entry, int (*joinfunc)(pthre
debug("nbd thread %016x exited (%s) with status %ld",
entry->thread,
s_client_address,
(uint64_t)status);
(uintptr_t)status);
client_destroy( entry->client );
entry->client = NULL;
entry->thread = 0;

View File

@@ -53,8 +53,8 @@ struct server {
* shutting down on a SIGTERM. */
struct flexthread_mutex * l_start_mirror;
struct mirror* mirror;
struct mirror_super * mirror_super;
struct mirror_t * mirror;
struct mirror_super_t * mirror_super;
/* This is used to stop the mirror from starting after we
* receive a SIGTERM */
int mirror_can_start;

View File

@@ -31,6 +31,7 @@ struct status * status_create( struct server * serve )
status->migration_speed_limit = serve->mirror->max_bytes_per_second;
status->migration_seconds_left = server_mirror_eta( serve );
status->migration_bytes_left = server_mirror_bytes_remaining( serve );
}
server_unlock_start_mirror( serve );
@@ -60,6 +61,7 @@ int status_write( struct status * status, int fd )
PRINT_UINT64( migration_speed );
PRINT_UINT64( migration_duration );
PRINT_UINT64( migration_seconds_left );
PRINT_UINT64( migration_bytes_left );
if ( status->migration_speed_limit < UINT64_MAX ) {
PRINT_UINT64( migration_speed_limit );
};

View File

@@ -64,6 +64,8 @@
* Our current best estimate of how many seconds are left before the migration
* migration is finished.
*
* migration_bytes_left:
* The number of bytes remaining to migrate.
*/
@@ -84,6 +86,7 @@ struct status {
uint64_t migration_speed;
uint64_t migration_speed_limit;
uint64_t migration_seconds_left;
uint64_t migration_bytes_left;
};
/** Create a status object for the given server. */

View File

@@ -115,6 +115,11 @@ class TestHappyPath < Test::Unit::TestCase
def test_write_to_high_block
#
# This test does not work on 32 bit platforms.
#
skip("Not relevant on 32-bit platforms") if ( ["a"].pack("p").size < 8 )
# Create a large file, then try to write to somewhere after the 2G boundary
@env.truncate1 "4G"
@env.serve1

View File

@@ -59,7 +59,7 @@ END_TEST
START_TEST(test_bit_ranges)
{
bitfield_word_t buffer[BIT_WORDS_FOR_SIZE(4160)];
uint64_t *longs = (unsigned long*) buffer;
uint64_t *longs = (uint64_t *) buffer;
uint64_t i;
memset(buffer, 0, 4160);
@@ -67,9 +67,9 @@ START_TEST(test_bit_ranges)
for (i=0; i<64; i++) {
bit_set_range(buffer, i*64, i);
fail_unless(
longs[i] == (1UL<<i)-1,
longs[i] == (1ULL<<i)-1,
"longs[%ld] = %lx SHOULD BE %lx",
i, longs[i], (1L<<i)-1
i, longs[i], (1ULL<<i)-1
);
fail_unless(longs[i+1] == 0, "bit_set_range overshot at i=%d", i);

View File

@@ -6,7 +6,7 @@
START_TEST( test_allocs_cvar )
{
struct mbox * mbox = mbox_create();
struct mbox_t * mbox = mbox_create();
fail_if( NULL == mbox, "Nothing allocated" );
pthread_cond_t cond_zero;
@@ -22,7 +22,7 @@ END_TEST
START_TEST( test_post_stores_value )
{
struct mbox * mbox = mbox_create();
struct mbox_t * mbox = mbox_create();
void * deadbeef = (void *)0xDEADBEEF;
mbox_post( mbox, deadbeef );
@@ -33,19 +33,18 @@ START_TEST( test_post_stores_value )
END_TEST
void * mbox_receive_runner( void * mbox_uncast )
mbox_item_t mbox_receive_runner( void * mbox_uncast )
{
struct mbox * mbox = (struct mbox *)mbox_uncast;
struct mbox_t * mbox = (struct mbox_t *)mbox_uncast;
void * contents = NULL;
contents = mbox_receive( mbox );
return contents;
return mbox_receive( mbox );
}
START_TEST( test_receive_blocks_until_post )
{
struct mbox * mbox = mbox_create();
struct mbox_t * mbox = mbox_create();
pthread_t receiver;
pthread_create( &receiver, NULL, mbox_receive_runner, mbox );

View File

@@ -46,8 +46,7 @@ void * responder( void *respond_uncast )
struct respond * resp = (struct respond *) respond_uncast;
int sock_fd = resp->sock_fds[1];
struct nbd_request_raw request_raw;
char wrong_handle[] = "WHOOPSIE";
uint64_t wrong_handle = 0x80;
if( fd_read_request( sock_fd, &request_raw ) == -1){
fprintf(stderr, "Problem with fd_read_request\n");

View File

@@ -308,6 +308,7 @@ START_TEST( test_renders_migration_statistics )
status.migration_speed = 40000000;
status.migration_speed_limit = 40000001;
status.migration_seconds_left = 1;
status.migration_bytes_left = 5000;
status_write( &status, fds[1] );
fail_if_rendered( fds[0], "migration_duration" );
@@ -335,6 +336,9 @@ START_TEST( test_renders_migration_statistics )
status_write( &status, fds[1] );
fail_unless_rendered( fds[0], "migration_seconds_left=1" );
status_write( &status, fds[1] );
fail_unless_rendered( fds[0], "migration_bytes_left=5000" );
status.migration_speed_limit = UINT64_MAX;
status_write( &status, fds[1] );

18
tools/Makefile Normal file
View File

@@ -0,0 +1,18 @@
CPPFLAGS = -Wall -I../src/server -I../src/common -I../src/proxy \
-Doff64_t=__off64_t \
-Wunused-const-variable=0 -Wformat=0
CFLAGS = $(CPPFLAGS) -g -O3 -std=gnu99
all: holemap semtest
# holemap requires libmhash-dev for md5 calculation
holemap: holemap.c ../src/server/*.h
$(CC) $(CFLAGS) -o $@ $^ ${shell pkg-config mhash --cflags --libs}
semtest: semtest.c
$(CC) $(CFLAGS) -o $@ $^ -lpthread
clean:
rm -f holemap semtest

271
tools/holemap.c Normal file
View File

@@ -0,0 +1,271 @@
#define _GNU_SOURCE
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include "bitset.h"
#include <mhash.h>
static int verbose = 0;
static const char * fstr(const char *fname, const char *error) {
char * ret;
asprintf(&ret, "%s:%s", fname, error);
return strdup(ret); // leaks on purpose; it's before we exit() anyway
}
static void
fill_check_pages(
uint8_t *map,
bitfield_p zmap,
uint64_t current,
uint64_t count,
uint64_t blksize)
{
uint64_t * base = (uint64_t*)(map + (current * blksize));
const int wordcount = blksize / sizeof(uint64_t);
while (count--) {
int is_zero = 1;
if (is_zero) {
uint64_t *cur = base;
for (int z = 0; z < wordcount && is_zero; z++)
is_zero = 0 == *cur++;
}
if (!is_zero)
bit_set(zmap, current);
base += wordcount;
current++;
}
}
static int
read_file_data_ranges(
int fd,
bitfield_p bmap,
struct stat st)
{
off_t off = 0;
int whence = SEEK_DATA;
while (off < st.st_size) {
if (verbose > 1)
printf("offset %llu\n", off);
off_t res = lseek(fd, off, whence);
if (res == -1) {
// end of file is OK
if (errno == ENXIO)
return 0;
fprintf(stderr, "lseek fails at %llu/%llu\n", off, st.st_size);
return res;
}
if (res > off && verbose)
printf("%s from %llu to %llu\n",
whence == SEEK_DATA ? "hole" : "data", off, res);
if (whence != SEEK_DATA) {
if (res > off) {
uint64_t from = off / st.st_blksize,
cnt = ((res + st.st_blksize - 1) - off) / st.st_blksize;
bit_set_range(bmap, from, cnt);
if (verbose > 1)
printf("set bits from %u cnt %u\n", from, cnt);
}
}
whence = whence == SEEK_DATA ? SEEK_HOLE : SEEK_DATA;
off = res;
}
return 0;
}
int main(int argc, const char *argv[])
{
int dohash = 0;
for (int i = 1; i < argc; i++) {
if (argv[i][0] == '-') {
if (!strcmp(argv[i], "-v") || !strcmp(argv[i], "--verbose")) {
verbose++;
continue;
} else if (!strcmp(argv[i], "-h") || !strcmp(argv[i], "--hash")) {
dohash++;
continue;
}
fprintf(stderr, "%s: invalid argument: %s\n", argv[0], argv[i]);
exit(1);
}
const char *fname = argv[i];
struct stat st;
if (stat(fname, &st)) {
perror(fname);
exit(1);
}
int fd = open(fname, O_RDONLY);
if (fd == -1) {
perror(fname);
exit(1);
}
if (verbose)
printf("%s is %u (%u times %u)\n", fname,
st.st_size, st.st_blocks, st.st_blksize);
/* number of used blocks for this file size, also number of bits in bitmaps */
size_t maxblocks = (st.st_size + st.st_blksize -1) / st.st_blksize;
/* this is the original bitmap made using the operating system */
bitfield_p bmap = (bitfield_p)calloc(
1 + (maxblocks / BITS_PER_WORD), BITFIELD_WORD_SIZE);
if (read_file_data_ranges(fd, bmap, st)) {
perror(fstr(fname, "read_file_data_ranges"));
exit(1);
}
if (verbose > 3) {
printf("original map: ");
for (int z = 0; z <= (maxblocks / BITS_PER_WORD); z++)
printf("%016llx ", bmap[z]);
printf("\n");
}
uint8_t * map = mmap(NULL, maxblocks * st.st_blksize,
PROT_READ, MAP_PRIVATE,
fd, 0);
if (map == MAP_FAILED) {
perror(fname);
exit(1);
}
/* This one bitmap will have a subset, or be a copy of the bmap one when
* we're done
*/
bitfield_p zmap = (bitfield_p)calloc(
1 + (maxblocks / BITS_PER_WORD), BITFIELD_WORD_SIZE);
{
uint64_t current = 0, count;
int which;
while (current < maxblocks &&
(count = bit_run_count(bmap, current, maxblocks - current, &which)) > 0) {
if (verbose)
printf("map is %s from %u to %u\n",
which ? "set" : "zero",
current * st.st_blksize,
(current+count) * st.st_blksize);
/* if bits were sets in the extents, double check them */
if (which)
fill_check_pages(map, zmap, current, count, st.st_blksize);
current += count;
}
}
if (verbose > 2) {
printf("optimized map: ");
for (int z = 0; z <= (maxblocks / BITS_PER_WORD); z++)
printf("%016llx ", zmap[z]);
printf("\n");
}
{
bitfield_p b[2] = { bmap, zmap };
uint64_t set[2] = {0};
for (int bs = 0; bs < 2; bs++) {
for (int z = 0; z <= (maxblocks / BITS_PER_WORD); z++)
set[bs] += __builtin_popcountll(b[bs][z]);
}
printf("%s is %d%% sparse, saving %llu blocks (%lluMB)\n", fname,
100 - ((set[0] * 100) / maxblocks),
100 - (maxblocks - set[0]),
((maxblocks - set[0]) * st.st_blksize) / (1024*1024));
if (set[1] < set[0])
printf("%s could be %d%% sparse, saving %llu blocks (%lluMB)\n", fname,
100 - ((set[1] * 100) / maxblocks),
100 - (maxblocks - set[1]),
((maxblocks - set[1]) * st.st_blksize) / (1024*1024));
}
if (dohash) {
const int hs = mhash_get_block_size(MHASH_MD5);
struct {
char h[hs + 1];
} hash[3];
MHASH td;
if (verbose)
printf("%s: calculating hashes, be patient\n", argv[i]);
td = mhash_init(MHASH_MD5);
mhash(td, map, maxblocks * st.st_blksize);
mhash_deinit(td, hash[0].h);
if (verbose)
printf("%s: hash 1/3 done\n", argv[i]);
{
uint64_t current = 0, count;
int which;
td = mhash_init(MHASH_MD5);
while (current < maxblocks &&
(count = bit_run_count(bmap, current, maxblocks - current, &which)) > 0) {
/* if bits were sets in the extents, double check them */
if (which)
mhash(td, map + (current * st.st_blksize),
count * st.st_blksize);
current += count;
}
mhash_deinit(td, hash[1].h);
}
if (verbose)
printf("%s: hash 2/3 done\n", argv[i]);
{
uint64_t current = 0, count;
int which;
td = mhash_init(MHASH_MD5);
while (current < maxblocks &&
(count = bit_run_count(zmap, current, maxblocks - current, &which)) > 0) {
/* if bits were sets in the extents, double check them */
if (which)
mhash(td, map + (current * st.st_blksize),
count * st.st_blksize);
current += count;
}
mhash_deinit(td, hash[2].h);
}
if (memcmp(hash[0].h, hash[1].h, hs) || memcmp(hash[0].h, hash[2].h, hs)) {
fprintf(stderr, "%s: hash mismatch: ", argv[i]);
for (int h = 0; h < 3; h++) {
for (int hb = 0; hb < hs; hb++)
fprintf(stderr, "%.2x", (uint8_t)hash[h].h[hb]);
fprintf(stderr, " ");
}
fprintf(stderr, "\n");
} else if (verbose) {
fprintf(stdout, "%s: hash 3/3 match: ", argv[i]);
for (int hb = 0; hb < hs; hb++)
fprintf(stdout, "%.2x", (uint8_t)hash[0].h[hb]);
fprintf(stdout, "\n");
}
}
munmap(map, maxblocks * st.st_blksize);
close(fd);
free(bmap);
free(zmap);
}
}

122
tools/semtest.c Normal file
View File

@@ -0,0 +1,122 @@
/*
* Latency test for pthread 'conditions' vs using a good ol'
* pair of linked sockets.
* gcc -o semtest -O2 -g --std=c99 semtest.c -lpthread
*
*/
#include <sys/socket.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#define TESTCOUNT 10000000
pthread_mutex_t mutex;
pthread_cond_t kidwake, dadwake;
int flag_passed = 0;
static void *thread_semaphore(void * param)
{
printf("kid started\n");
pthread_mutex_lock(&mutex);
pthread_cond_signal( &dadwake );
pthread_mutex_unlock(&mutex);
do {
pthread_mutex_lock(&mutex);
while (flag_passed == 0)
pthread_cond_wait( &kidwake, &mutex );
flag_passed = 0;
pthread_cond_signal( &dadwake );
pthread_mutex_unlock(&mutex);
} while (1);
return NULL;
}
int sock[2];
static void * thread_socket(void * param)
{
printf("kid started\n");
do {
char b = 0;
int r = read(sock[1], &b, 1);
if (r != 1)
perror("read");
b = 0;
write(sock[1], &b, 1); // tell the kid
} while (1);
return NULL;
}
int main(int argc, char ** argv)
{
int test = 0;
for (int i = 1; i < argc; i++) {
if (!strcmp(argv[i], "socket"))
test = 1;
else if (!strcmp(argv[i], "semaphore"))
test = 2;
else
goto usage;
}
if (!test)
goto usage;
if (test == 1) {
if (socketpair(PF_LOCAL, SOCK_STREAM, 0, sock)) {
perror("socketpair");
exit(1);
}
printf("Socket test start:\n");
pthread_t t;
pthread_create(&t, NULL, thread_socket, NULL);
for (int ti = 0; ti < TESTCOUNT; ti++) {
char b = 1;
write(sock[0], &b, 1); // tell the kid
int r = read(sock[0], &b, 1);
if (r != 1)
perror("read");
if (!(ti % 100000)) {
printf("."); fflush(stdout);
}
}
} else {
pthread_mutex_init(&mutex, NULL);
pthread_cond_init( &kidwake, NULL );
pthread_cond_init( &dadwake, NULL );
printf("Semaphore test start:\n");
pthread_t t;
pthread_create(&t, NULL, thread_semaphore, NULL);
pthread_mutex_lock(&mutex);
printf("dad wait for kid\n");
pthread_cond_wait( &dadwake, &mutex );
pthread_mutex_unlock(&mutex);
printf("dad has kid ready\n");
for (int ti = 0; ti < TESTCOUNT; ti++) {
pthread_mutex_lock(&mutex);
flag_passed = 1;
pthread_cond_signal( &kidwake );
while (flag_passed == 1)
pthread_cond_wait( &dadwake, &mutex );
pthread_mutex_unlock(&mutex);
if (!(ti % 100000)) {
printf("."); fflush(stdout);
}
}
}
printf("\nDone\n");
exit(0);
usage:
fprintf(stderr, "%s socket -- run semaphore latency test with sockets\n", argv[0]);
fprintf(stderr, "%s semaphore -- run semaphore latency test with sockets\n", argv[0]);
exit(1);
}