From b1aa942b3d0d1f1749ad1be87a83ecd18122e4a6 Mon Sep 17 00:00:00 2001 From: Matthew Bloch Date: Wed, 16 May 2012 03:20:09 +0100 Subject: [PATCH] Added working read via splice syscall. --- flexnbd.c | 113 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 103 insertions(+), 10 deletions(-) diff --git a/flexnbd.c b/flexnbd.c index dcacf3c..ada99d4 100644 --- a/flexnbd.c +++ b/flexnbd.c @@ -136,11 +136,11 @@ struct mode_serve_params { }; struct mode_readwrite_params { - int writeNotRead; union mysockaddr connect_to; off64_t from; off64_t len; - int fd; + int data_fd; + int client; }; struct client_params { @@ -193,6 +193,30 @@ int sendfileloop(int out_fd, int in_fd, off64_t *offset, size_t count) return 0; } +int splice_via_pipe_loop(int fd_in, int fd_out, size_t len) +{ + int pipefd[2]; + size_t spliced=0; + + if (pipe(pipefd) == -1) + return -1; + + while (spliced < len) { + size_t r1,r2; + r1 = splice(fd_in, NULL, pipefd[1], NULL, len-spliced, 0); + if (r1 <= 0) + break; + r2 = splice(pipefd[0], NULL, fd_out, NULL, r1, 0); + if (r1 != r2) + break; + spliced += r1; + } + close(pipefd[0]); + close(pipefd[1]); + + return spliced < len ? -1 : 0; +} + int client_serve_request(struct client_params* client) { off64_t offset; @@ -442,14 +466,77 @@ void do_serve(struct mode_serve_params* params) serve_accept_loop(params); } +int socket_connect(struct sockaddr* to) +{ + int fd = socket(PF_INET, SOCK_STREAM, 0); + SERVER_ERROR_ON_FAILURE(fd, "Couldn't create client socket"); + SERVER_ERROR_ON_FAILURE(connect(fd, to, sizeof(*to)), + "connect failed"); + return fd; +} + +off64_t socket_nbd_read_hello(int fd) +{ + struct nbd_init init; + SERVER_ERROR_ON_FAILURE(readloop(fd, &init, sizeof(init)), + "Couldn't read init"); + if (strncmp(init.passwd, INIT_PASSWD, 8) != 0) + SERVER_ERROR("wrong passwd"); + if (be64toh(init.magic) != INIT_MAGIC) + SERVER_ERROR("wrong magic (%x)", be64toh(init.magic)); + return be64toh(init.size); +} + +void socket_nbd_read(int fd, off64_t from, int len, int out_fd, void* out_buf) +{ + struct nbd_request request; + struct nbd_reply reply; + + request.magic = htobe32(REQUEST_MAGIC); + request.type = htobe32(REQUEST_READ); + ((int*) request.handle)[0] = rand(); + ((int*) request.handle)[1] = rand(); + request.from = htobe64(from); + request.len = htobe32(len); + + SERVER_ERROR_ON_FAILURE(writeloop(fd, &request, sizeof(request)), + "Couldn't write request"); + SERVER_ERROR_ON_FAILURE(readloop(fd, &reply, sizeof(reply)), + "Couldn't read reply"); + if (be32toh(reply.magic) != REPLY_MAGIC) + SERVER_ERROR("Reply magic incorrect (%p)", reply.magic); + if (be32toh(reply.error) != 0) + SERVER_ERROR("Server replied with error %d", reply.error); + + if (out_buf) { + SERVER_ERROR_ON_FAILURE(readloop(fd, out_buf, len), + "Read failed"); + } + else { + SERVER_ERROR_ON_FAILURE( + splice_via_pipe_loop(fd, out_fd, len), + "Splice failed" + ); + } +} + void do_read(struct mode_readwrite_params* params) { - + off64_t size; + params->client = socket_connect(¶ms->connect_to.generic); + size = socket_nbd_read_hello(params->client); + if (params->from < 0 || (params->from + params->len) >= size) + SERVER_ERROR("Read request %d+%d is out of range given size %d", + params->from, params->len, size); + socket_nbd_read(params->client, params->from, params->len, + params->data_fd, NULL); + close(params->client); } void do_write(struct mode_readwrite_params* params) { - + params->client = socket_connect(¶ms->connect_to.generic); + close(params->client); } #define IS_IP_VALID_CHAR(x) ( ((x) >= '0' && (x) <= '9' ) || \ @@ -473,8 +560,6 @@ int parse_ip_to_sockaddr(struct sockaddr* out, char* src) temp[j] = 0; } - debug("temp='%s'", temp); - if (temp[0] == '0' && temp[1] == '\0') { v4->sin_family = AF_INET; v4->sin_addr.s_addr = INADDR_ANY; @@ -579,14 +664,20 @@ void params_readwrite( SERVER_ERROR("No IP address supplied"); if (s_port == NULL) SERVER_ERROR("No port number supplied"); - if (s_from == NULL); + if (s_from == NULL) SERVER_ERROR("No from supplied"); - if (s_length == NULL); + if (s_length == NULL) SERVER_ERROR("No length supplied"); if (parse_ip_to_sockaddr(&out->connect_to.generic, s_ip_address) == 0) SERVER_ERROR("Couldn't parse connection address '%s'"); + out->connect_to.v4.sin_port = atoi(s_port); + if (out->connect_to.v4.sin_port < 0 || out->connect_to.v4.sin_port > 65535) + SERVER_ERROR("Port number must be >= 0 and <= 65535"); + out->connect_to.v4.sin_port = htobe16(out->connect_to.v4.sin_port); + + out->from = atol(s_from); out->len = atol(s_length); } @@ -605,8 +696,9 @@ void mode(char* mode, int argc, char **argv) } } else if (strcmp(mode, "read") == 0 ) { - if (argc != 4) { + if (argc == 4) { params_readwrite(¶ms.readwrite, argv[0], argv[1], argv[2], argv[3]); + params.readwrite.data_fd = 1; do_read(¶ms.readwrite); } else { @@ -614,8 +706,9 @@ void mode(char* mode, int argc, char **argv) } } else if (strcmp(mode, "write") == 0 ) { - if (argc != 4) { + if (argc == 4) { params_readwrite(¶ms.readwrite, argv[0], argv[1], argv[2], argv[3]); + params.readwrite.data_fd = 0; do_write(¶ms.readwrite); } else {