In preparation for parallel processing, we need to be sure that
two threads belonging to the same connection cannot interleave
their I/O except at message boundaries. Add a pair of mutexes, one
for reads and one for writes, around all I/O that must occur as a
group (for now, there is no contention for either mutex).
This commit is best viewed with 'git diff -b', because using macros
for scoped locking, combined with needing two lock scopes in one
function, forces indentation changes. Splitting the function wouldn't
be any prettier, as we would then have to coordinate ownership of the
malloc'd buffer for NBD_CMD_READ/WRITE across the pieces.
Signed-off-by: Eric Blake <eblake(a)redhat.com>
---
src/connections.c | 151 +++++++++++++++++++++++++++++-------------------------
1 file changed, 82 insertions(+), 69 deletions(-)
diff --git a/src/connections.c b/src/connections.c
index 27d685e..f779903 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -1,5 +1,5 @@
/* nbdkit
- * Copyright (C) 2013-2016 Red Hat Inc.
+ * Copyright (C) 2013-2017 Red Hat Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -62,6 +62,8 @@
/* Connection structure. */
struct connection {
pthread_mutex_t request_lock;
+ pthread_mutex_t read_lock;
+ pthread_mutex_t write_lock;
void *handle;
void *crypto_session;
@@ -206,6 +208,8 @@ new_connection (int sockin, int sockout)
conn->sockin = sockin;
conn->sockout = sockout;
pthread_mutex_init (&conn->request_lock, NULL);
+ pthread_mutex_init (&conn->read_lock, NULL);
+ pthread_mutex_init (&conn->write_lock, NULL);
conn->recv = raw_recv;
conn->send = raw_send;
@@ -223,6 +227,8 @@ free_connection (struct connection *conn)
conn->close (conn);
pthread_mutex_destroy (&conn->request_lock);
+ pthread_mutex_destroy (&conn->read_lock);
+ pthread_mutex_destroy (&conn->write_lock);
/* Don't call the plugin again if quit has been set because the main
* thread will be in the process of unloading it. The plugin.unload
@@ -874,65 +880,69 @@ recv_request_send_reply (struct connection *conn)
CLEANUP_FREE char *buf = NULL;
/* Read the request packet. */
- r = conn->recv (conn, &request, sizeof request);
- if (r == -1) {
- nbdkit_error ("read request: %m");
- return -1;
- }
- if (r == 0) {
- debug ("client closed input socket, closing connection");
- return 0; /* disconnect */
- }
-
- magic = be32toh (request.magic);
- if (magic != NBD_REQUEST_MAGIC) {
- nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)", magic);
- return -1;
- }
+ {
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn->read_lock);
+ r = conn->recv (conn, &request, sizeof request);
+ if (r == -1) {
+ nbdkit_error ("read request: %m");
+ return -1;
+ }
+ if (r == 0) {
+ debug ("client closed input socket, closing connection");
+ return 0; /* disconnect */
+ }
- cmd = be32toh (request.type);
- flags = cmd & ~NBD_CMD_MASK_COMMAND;
- cmd &= NBD_CMD_MASK_COMMAND;
+ magic = be32toh (request.magic);
+ if (magic != NBD_REQUEST_MAGIC) {
+ nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)",
+ magic);
+ return -1;
+ }
- offset = be64toh (request.offset);
- count = be32toh (request.count);
+ cmd = be32toh (request.type);
+ flags = cmd & ~NBD_CMD_MASK_COMMAND;
+ cmd &= NBD_CMD_MASK_COMMAND;
- if (cmd == NBD_CMD_DISC) {
- debug ("client sent disconnect command, closing connection");
- return 0; /* disconnect */
- }
+ offset = be64toh (request.offset);
+ count = be32toh (request.count);
- /* Validate the request. */
- if (!validate_request (conn, cmd, flags, offset, count, &error)) {
- if (cmd == NBD_CMD_WRITE &&
- skip_over_write_buffer (conn->sockin, count) < 0)
- return -1;
- goto send_reply;
- }
+ if (cmd == NBD_CMD_DISC) {
+ debug ("client sent disconnect command, closing connection");
+ return 0; /* disconnect */
+ }
- /* Allocate the data buffer used for either read or write requests. */
- if (cmd == NBD_CMD_READ || cmd == NBD_CMD_WRITE) {
- buf = malloc (count);
- if (buf == NULL) {
- perror ("malloc");
- error = ENOMEM;
+ /* Validate the request. */
+ if (!validate_request (conn, cmd, flags, offset, count, &error)) {
if (cmd == NBD_CMD_WRITE &&
skip_over_write_buffer (conn->sockin, count) < 0)
return -1;
goto send_reply;
}
- }
- /* Receive the write data buffer. */
- if (cmd == NBD_CMD_WRITE) {
- r = conn->recv (conn, buf, count);
- if (r == 0) {
- errno = EBADMSG;
- r = -1;
+ /* Allocate the data buffer used for either read or write requests. */
+ if (cmd == NBD_CMD_READ || cmd == NBD_CMD_WRITE) {
+ buf = malloc (count);
+ if (buf == NULL) {
+ perror ("malloc");
+ error = ENOMEM;
+ if (cmd == NBD_CMD_WRITE &&
+ skip_over_write_buffer (conn->sockin, count) < 0)
+ return -1;
+ goto send_reply;
+ }
}
- if (r == -1) {
- nbdkit_error ("read data: %m");
- return -1;
+
+ /* Receive the write data buffer. */
+ if (cmd == NBD_CMD_WRITE) {
+ r = conn->recv (conn, buf, count);
+ if (r == 0) {
+ errno = EBADMSG;
+ r = -1;
+ }
+ if (r == -1) {
+ nbdkit_error ("read data: %m");
+ return -1;
+ }
}
}
@@ -948,32 +958,35 @@ recv_request_send_reply (struct connection *conn)
/* Send the reply packet. */
send_reply:
- reply.magic = htobe32 (NBD_REPLY_MAGIC);
- reply.handle = request.handle;
- reply.error = htobe32 (nbd_errno (error));
+ {
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn->write_lock);
+ reply.magic = htobe32 (NBD_REPLY_MAGIC);
+ reply.handle = request.handle;
+ reply.error = htobe32 (nbd_errno (error));
- if (error != 0) {
- /* Since we're about to send only the limited NBD_E* errno to the
- * client, don't lose the information about what really happened
- * on the server side. Make sure there is a way for the operator
- * to retrieve the real error.
- */
- debug ("sending error reply: %s", strerror (error));
- }
+ if (error != 0) {
+ /* Since we're about to send only the limited NBD_E* errno to the
+ * client, don't lose the information about what really happened
+ * on the server side. Make sure there is a way for the operator
+ * to retrieve the real error.
+ */
+ debug ("sending error reply: %s", strerror (error));
+ }
- r = conn->send (conn, &reply, sizeof reply);
- if (r == -1) {
- nbdkit_error ("write reply: %m");
- return -1;
- }
-
- /* Send the read data buffer. */
- if (cmd == NBD_CMD_READ && !error) {
- r = conn->send (conn, buf, count);
+ r = conn->send (conn, &reply, sizeof reply);
if (r == -1) {
- nbdkit_error ("write data: %m");
+ nbdkit_error ("write reply: %m");
return -1;
}
+
+ /* Send the read data buffer. */
+ if (cmd == NBD_CMD_READ && !error) {
+ r = conn->send (conn, buf, count);
+ if (r == -1) {
+ nbdkit_error ("write data: %m");
+ return -1;
+ }
+ }
}
return 1; /* command processed ok */
--
2.13.6