In preparation for parallel processing, we need to be sure that
two threads belonging to the same connection cannot interleave
their I/O except at message boundaries. Add one mutex around the
reads and another around the writes that must occur as a group
(for now, there is no contention for either mutex).
Signed-off-by: Eric Blake <eblake(a)redhat.com>
---
src/connections.c | 27 +++++++++++++++++++++++++--
1 file changed, 25 insertions(+), 2 deletions(-)
diff --git a/src/connections.c b/src/connections.c
index dada9aa..dd43a9a 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -62,6 +62,8 @@
/* Connection structure. */
struct connection {
pthread_mutex_t request_lock;
+ pthread_mutex_t read_lock;
+ pthread_mutex_t write_lock;
void *handle;
void *crypto_session;
@@ -206,6 +208,8 @@ new_connection (int sockin, int sockout)
conn->sockin = sockin;
conn->sockout = sockout;
pthread_mutex_init (&conn->request_lock, NULL);
+ pthread_mutex_init (&conn->read_lock, NULL);
+ pthread_mutex_init (&conn->write_lock, NULL);
conn->recv = raw_recv;
conn->send = raw_send;
@@ -223,6 +227,8 @@ free_connection (struct connection *conn)
conn->close (conn);
pthread_mutex_destroy (&conn->request_lock);
+ pthread_mutex_destroy (&conn->read_lock);
+ pthread_mutex_destroy (&conn->write_lock);
/* Don't call the plugin again if quit has been set because the main
* thread will be in the process of unloading it. The plugin.unload
@@ -888,19 +894,23 @@ recv_request_send_reply (struct connection *conn)
CLEANUP_FREE char *buf = NULL;
/* Read the request packet. */
+ pthread_mutex_lock (&conn->read_lock);
r = conn->recv (conn, &request, sizeof request);
if (r == -1) {
nbdkit_error ("read request: %m");
+ pthread_mutex_unlock (&conn->read_lock);
return -1;
}
if (r == 0) {
debug ("client closed input socket, closing connection");
+ pthread_mutex_unlock (&conn->read_lock);
return 0; /* disconnect */
}
magic = be32toh (request.magic);
if (magic != NBD_REQUEST_MAGIC) {
nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)",
magic);
+ pthread_mutex_unlock (&conn->read_lock);
return -1;
}
@@ -913,14 +923,18 @@ recv_request_send_reply (struct connection *conn)
if (cmd == NBD_CMD_DISC) {
debug ("client sent disconnect command, closing connection");
+ pthread_mutex_unlock (&conn->read_lock);
return 0; /* disconnect */
}
/* Validate the request. */
if (!validate_request (conn, cmd, flags, offset, count, &error)) {
if (cmd == NBD_CMD_WRITE &&
- skip_over_write_buffer (conn->sockin, count) < 0)
+ skip_over_write_buffer (conn->sockin, count) < 0) {
+ pthread_mutex_unlock (&conn->read_lock);
return -1;
+ }
+ pthread_mutex_unlock (&conn->read_lock);
goto send_reply;
}
@@ -931,8 +945,11 @@ recv_request_send_reply (struct connection *conn)
perror ("malloc");
error = ENOMEM;
if (cmd == NBD_CMD_WRITE &&
- skip_over_write_buffer (conn->sockin, count) < 0)
+ skip_over_write_buffer (conn->sockin, count) < 0) {
+ pthread_mutex_unlock (&conn->read_lock);
return -1;
+ }
+ pthread_mutex_unlock (&conn->read_lock);
goto send_reply;
}
}
@@ -946,9 +963,11 @@ recv_request_send_reply (struct connection *conn)
}
if (r == -1) {
nbdkit_error ("read data: %m");
+ pthread_mutex_unlock (&conn->read_lock);
return -1;
}
}
+ pthread_mutex_unlock (&conn->read_lock);
/* Perform the request. Only this part happens inside the request lock. */
if (quit) {
@@ -962,6 +981,7 @@ recv_request_send_reply (struct connection *conn)
/* Send the reply packet. */
send_reply:
+ pthread_mutex_lock (&conn->write_lock);
reply.magic = htobe32 (NBD_REPLY_MAGIC);
reply.handle = request.handle;
reply.error = htobe32 (nbd_errno (error));
@@ -978,6 +998,7 @@ recv_request_send_reply (struct connection *conn)
r = conn->send (conn, &reply, sizeof reply);
if (r == -1) {
nbdkit_error ("write reply: %m");
+ pthread_mutex_unlock (&conn->write_lock);
return -1;
}
@@ -986,9 +1007,11 @@ recv_request_send_reply (struct connection *conn)
r = conn->send (conn, buf, count);
if (r == -1) {
nbdkit_error ("write data: %m");
+ pthread_mutex_unlock (&conn->write_lock);
return -1;
}
}
+ pthread_mutex_unlock (&conn->write_lock);
return 1; /* command processed ok */
}
--
2.13.6