diff --git a/src/mirror.c b/src/mirror.c index 71faf6f..a30dc67 100644 --- a/src/mirror.c +++ b/src/mirror.c @@ -189,7 +189,8 @@ int mirror_pass(struct server * serve, int should_lock, uint64_t *written) current, run, 0, - serve->mirror->mapped + current); + serve->mirror->mapped + current, + MS_REQUEST_LIMIT_SECS); /* now mark it clean */ bitset_clear_range(map, current, run); diff --git a/src/mirror.h b/src/mirror.h index 9e4ee0c..c1fdbf9 100644 --- a/src/mirror.h +++ b/src/mirror.h @@ -10,12 +10,14 @@ enum mirror_state; #include "serve.h" + /* MS_CONNECT_TIME_SECS * The length of time after which the sender will assume a connect() to * the destination has failed. */ #define MS_CONNECT_TIME_SECS 60 + /* MS_HELLO_TIME_SECS * The length of time the sender will wait for the NBD hello message * after connect() before aborting the connection attempt. @@ -29,6 +31,16 @@ enum mirror_state; */ #define MS_RETRY_DELAY_SECS 1 + +/* MS_REQUEST_LIMIT_SECS + * We must receive a reply to a request within this time. For a read + * request, this is the time between the end of the NBD request and the + * start of the NBD reply. For a write request, this is the time + * between the end of the written data and the start of the NBD reply. + */ +#define MS_REQUEST_LIMIT_SECS 4 + + enum mirror_finish_action { ACTION_EXIT, ACTION_NOTHING diff --git a/src/readwrite.c b/src/readwrite.c index 86be4e4..d4b2f91 100644 --- a/src/readwrite.c +++ b/src/readwrite.c @@ -81,7 +81,24 @@ void read_reply(int fd, struct nbd_request *request, struct nbd_reply *reply) } } -void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf) +void wait_for_data( int fd, int timeout_secs ) +{ + fd_set fds; + struct timeval tv = {timeout_secs, 0}; + int selected; + + FD_ZERO( &fds ); + FD_SET( fd, &fds ); + selected = select( FD_SETSIZE, + &fds, NULL, NULL, + timeout_secs >=0 ? &tv : NULL ); + + FATAL_IF( -1 == selected, "Select failed" ); + ERROR_IF( 0 == selected, "Timed out waiting for reply" ); +} + + +void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, int timeout_secs) { struct nbd_request request; struct nbd_reply reply; @@ -89,6 +106,8 @@ void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf) fill_request(&request, REQUEST_READ, from, len); FATAL_IF_NEGATIVE(writeloop(fd, &request, sizeof(request)), "Couldn't write request"); + + wait_for_data( fd, timeout_secs ); read_reply(fd, &request, &reply); if (out_buf) { @@ -103,7 +122,7 @@ void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf) } } -void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf) +void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf, int timeout_secs) { struct nbd_request request; struct nbd_reply reply; @@ -123,6 +142,7 @@ void socket_nbd_write(int fd, off64_t from, int len, int in_fd, void* in_buf) ); } + wait_for_data( fd, timeout_secs ); read_reply(fd, &request, &reply); } @@ -175,7 +195,7 @@ void do_read(struct mode_readwrite_params* params) FATAL_IF_NEGATIVE( params->client, "Couldn't connect." ); CHECK_RANGE("read"); socket_nbd_read(params->client, params->from, params->len, - params->data_fd, NULL); + params->data_fd, NULL, 10); close(params->client); } @@ -185,7 +205,7 @@ void do_write(struct mode_readwrite_params* params) FATAL_IF_NEGATIVE( params->client, "Couldn't connect." ); CHECK_RANGE("write"); socket_nbd_write(params->client, params->from, params->len, - params->data_fd, NULL); + params->data_fd, NULL, 10); close(params->client); } diff --git a/src/readwrite.h b/src/readwrite.h index cb07e68..b8df087 100644 --- a/src/readwrite.h +++ b/src/readwrite.h @@ -7,8 +7,8 @@ int socket_connect(struct sockaddr* to, struct sockaddr* from); int socket_nbd_read_hello(int fd, off64_t * size); -void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf); -void socket_nbd_write(int fd, off64_t from, int len, int out_fd, void* out_buf); +void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf, int timeout_secs); +void socket_nbd_write(int fd, off64_t from, int len, int out_fd, void* out_buf, int timeout_secs); void socket_nbd_entrust(int fd); int socket_nbd_disconnect( int fd ); diff --git a/tests/fakes/dest/hang_after_write.rb b/tests/fakes/dest/hang_after_write.rb new file mode 100755 index 0000000..7c31729 --- /dev/null +++ b/tests/fakes/dest/hang_after_write.rb @@ -0,0 +1,27 @@ +#!/usr/bin/env ruby +# encoding: utf-8 + +# Open a socket, say hello, receive a write, then sleep for > 6 +# seconds. This should tell the source that the write has gone MIA, +# and we expect a reconnect. + +require 'flexnbd/fake_dest' +include FlexNBD::FakeDest + +sock = serve( *ARGV ) +client_sock1 = accept( sock ) +write_hello( client_sock1 ) +read_request( client_sock1 ) + +t = Thread.start do + client_sock2 = accept( sock, "Timed out waiting for a reconnection", + FlexNBD::MS_REQUEST_LIMIT_SECS + 1 ) + client_sock2.close +end + +sleep( FlexNBD::MS_REQUEST_LIMIT_SECS + 2 ) +client_sock1.close + +t.join + +exit(0) diff --git a/tests/flexnbd/fake_dest.rb b/tests/flexnbd/fake_dest.rb index fc6d290..e525084 100644 --- a/tests/flexnbd/fake_dest.rb +++ b/tests/flexnbd/fake_dest.rb @@ -13,7 +13,7 @@ module FlexNBD end - def accept( sock, err_msg, timeout=2 ) + def accept( sock, err_msg = "Timed out waiting for a connection", timeout=2 ) client_sock = nil begin @@ -47,5 +47,11 @@ module FlexNBD client_sock.write( "\x00" * 128 ) end - end + + def read_request( client_sock ) + client_sock.read(28) + end + + + end # module FakeDest end # module FlexNBD diff --git a/tests/nbd_scenarios b/tests/nbd_scenarios index 5a033fe..947894e 100644 --- a/tests/nbd_scenarios +++ b/tests/nbd_scenarios @@ -274,6 +274,14 @@ class NBDConnectSourceFailureScenarios < Test::Unit::TestCase end + def test_write_times_out_causes_retry + @env.run_fake( "dest/hang_after_write", @env.ip, @env.port2 ) + stdout, stderr = @env.mirror12_unchecked + + assert @env.fake_reports_success, "Fake failed." + end + + end