Added working read via splice syscall.
This commit is contained in:
113
flexnbd.c
113
flexnbd.c
@@ -136,11 +136,11 @@ struct mode_serve_params {
|
||||
};
|
||||
|
||||
struct mode_readwrite_params {
|
||||
int writeNotRead;
|
||||
union mysockaddr connect_to;
|
||||
off64_t from;
|
||||
off64_t len;
|
||||
int fd;
|
||||
int data_fd;
|
||||
int client;
|
||||
};
|
||||
|
||||
struct client_params {
|
||||
@@ -193,6 +193,30 @@ int sendfileloop(int out_fd, int in_fd, off64_t *offset, size_t count)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int splice_via_pipe_loop(int fd_in, int fd_out, size_t len)
|
||||
{
|
||||
int pipefd[2];
|
||||
size_t spliced=0;
|
||||
|
||||
if (pipe(pipefd) == -1)
|
||||
return -1;
|
||||
|
||||
while (spliced < len) {
|
||||
size_t r1,r2;
|
||||
r1 = splice(fd_in, NULL, pipefd[1], NULL, len-spliced, 0);
|
||||
if (r1 <= 0)
|
||||
break;
|
||||
r2 = splice(pipefd[0], NULL, fd_out, NULL, r1, 0);
|
||||
if (r1 != r2)
|
||||
break;
|
||||
spliced += r1;
|
||||
}
|
||||
close(pipefd[0]);
|
||||
close(pipefd[1]);
|
||||
|
||||
return spliced < len ? -1 : 0;
|
||||
}
|
||||
|
||||
int client_serve_request(struct client_params* client)
|
||||
{
|
||||
off64_t offset;
|
||||
@@ -442,14 +466,77 @@ void do_serve(struct mode_serve_params* params)
|
||||
serve_accept_loop(params);
|
||||
}
|
||||
|
||||
int socket_connect(struct sockaddr* to)
|
||||
{
|
||||
int fd = socket(PF_INET, SOCK_STREAM, 0);
|
||||
SERVER_ERROR_ON_FAILURE(fd, "Couldn't create client socket");
|
||||
SERVER_ERROR_ON_FAILURE(connect(fd, to, sizeof(*to)),
|
||||
"connect failed");
|
||||
return fd;
|
||||
}
|
||||
|
||||
off64_t socket_nbd_read_hello(int fd)
|
||||
{
|
||||
struct nbd_init init;
|
||||
SERVER_ERROR_ON_FAILURE(readloop(fd, &init, sizeof(init)),
|
||||
"Couldn't read init");
|
||||
if (strncmp(init.passwd, INIT_PASSWD, 8) != 0)
|
||||
SERVER_ERROR("wrong passwd");
|
||||
if (be64toh(init.magic) != INIT_MAGIC)
|
||||
SERVER_ERROR("wrong magic (%x)", be64toh(init.magic));
|
||||
return be64toh(init.size);
|
||||
}
|
||||
|
||||
void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf)
|
||||
{
|
||||
struct nbd_request request;
|
||||
struct nbd_reply reply;
|
||||
|
||||
request.magic = htobe32(REQUEST_MAGIC);
|
||||
request.type = htobe32(REQUEST_READ);
|
||||
((int*) request.handle)[0] = rand();
|
||||
((int*) request.handle)[1] = rand();
|
||||
request.from = htobe64(from);
|
||||
request.len = htobe32(len);
|
||||
|
||||
SERVER_ERROR_ON_FAILURE(writeloop(fd, &request, sizeof(request)),
|
||||
"Couldn't write request");
|
||||
SERVER_ERROR_ON_FAILURE(readloop(fd, &reply, sizeof(reply)),
|
||||
"Couldn't read reply");
|
||||
if (be32toh(reply.magic) != REPLY_MAGIC)
|
||||
SERVER_ERROR("Reply magic incorrect (%p)", reply.magic);
|
||||
if (be32toh(reply.error) != 0)
|
||||
SERVER_ERROR("Server replied with error %d", reply.error);
|
||||
|
||||
if (out_buf) {
|
||||
SERVER_ERROR_ON_FAILURE(readloop(fd, out_buf, len),
|
||||
"Read failed");
|
||||
}
|
||||
else {
|
||||
SERVER_ERROR_ON_FAILURE(
|
||||
splice_via_pipe_loop(fd, out_fd, len),
|
||||
"Splice failed"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
void do_read(struct mode_readwrite_params* params)
|
||||
{
|
||||
|
||||
off64_t size;
|
||||
params->client = socket_connect(¶ms->connect_to.generic);
|
||||
size = socket_nbd_read_hello(params->client);
|
||||
if (params->from < 0 || (params->from + params->len) >= size)
|
||||
SERVER_ERROR("Read request %d+%d is out of range given size %d",
|
||||
params->from, params->len, size);
|
||||
socket_nbd_read(params->client, params->from, params->len,
|
||||
params->data_fd, NULL);
|
||||
close(params->client);
|
||||
}
|
||||
|
||||
void do_write(struct mode_readwrite_params* params)
|
||||
{
|
||||
|
||||
params->client = socket_connect(¶ms->connect_to.generic);
|
||||
close(params->client);
|
||||
}
|
||||
|
||||
#define IS_IP_VALID_CHAR(x) ( ((x) >= '0' && (x) <= '9' ) || \
|
||||
@@ -473,8 +560,6 @@ int parse_ip_to_sockaddr(struct sockaddr* out, char* src)
|
||||
temp[j] = 0;
|
||||
}
|
||||
|
||||
debug("temp='%s'", temp);
|
||||
|
||||
if (temp[0] == '0' && temp[1] == '\0') {
|
||||
v4->sin_family = AF_INET;
|
||||
v4->sin_addr.s_addr = INADDR_ANY;
|
||||
@@ -579,14 +664,20 @@ void params_readwrite(
|
||||
SERVER_ERROR("No IP address supplied");
|
||||
if (s_port == NULL)
|
||||
SERVER_ERROR("No port number supplied");
|
||||
if (s_from == NULL);
|
||||
if (s_from == NULL)
|
||||
SERVER_ERROR("No from supplied");
|
||||
if (s_length == NULL);
|
||||
if (s_length == NULL)
|
||||
SERVER_ERROR("No length supplied");
|
||||
|
||||
if (parse_ip_to_sockaddr(&out->connect_to.generic, s_ip_address) == 0)
|
||||
SERVER_ERROR("Couldn't parse connection address '%s'");
|
||||
|
||||
out->connect_to.v4.sin_port = atoi(s_port);
|
||||
if (out->connect_to.v4.sin_port < 0 || out->connect_to.v4.sin_port > 65535)
|
||||
SERVER_ERROR("Port number must be >= 0 and <= 65535");
|
||||
out->connect_to.v4.sin_port = htobe16(out->connect_to.v4.sin_port);
|
||||
|
||||
|
||||
out->from = atol(s_from);
|
||||
out->len = atol(s_length);
|
||||
}
|
||||
@@ -605,8 +696,9 @@ void mode(char* mode, int argc, char **argv)
|
||||
}
|
||||
}
|
||||
else if (strcmp(mode, "read") == 0 ) {
|
||||
if (argc != 4) {
|
||||
if (argc == 4) {
|
||||
params_readwrite(¶ms.readwrite, argv[0], argv[1], argv[2], argv[3]);
|
||||
params.readwrite.data_fd = 1;
|
||||
do_read(¶ms.readwrite);
|
||||
}
|
||||
else {
|
||||
@@ -614,8 +706,9 @@ void mode(char* mode, int argc, char **argv)
|
||||
}
|
||||
}
|
||||
else if (strcmp(mode, "write") == 0 ) {
|
||||
if (argc != 4) {
|
||||
if (argc == 4) {
|
||||
params_readwrite(¶ms.readwrite, argv[0], argv[1], argv[2], argv[3]);
|
||||
params.readwrite.data_fd = 0;
|
||||
do_write(¶ms.readwrite);
|
||||
}
|
||||
else {
|
||||
|
Reference in New Issue
Block a user