Once we have multiple threads during parallel processing, we need
to be sure that any I/O error flagged by one thread prevents the
next thread from attempting I/O. Although we already have separate
locks for reads and writes, it's easier if the status is shared by
both actions, which needs yet another mutex; however, we can
optimize (via accessor functions) so that the mutex is only taken
when there are actually multiple threads at work.
The next thing to notice is that, because we now update the status
at all important points, the return value of
_handle_single_connection() matches the latest status, which will
come in handy later: it avoids having to coordinate a return value
across multiple threads.
Signed-off-by: Eric Blake <eblake@redhat.com>
---
src/connections.c | 82 +++++++++++++++++++++++++++++++++++++------------------
1 file changed, 56 insertions(+), 26 deletions(-)
diff --git a/src/connections.c b/src/connections.c
index f779903..9d95e7f 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -64,8 +64,11 @@ struct connection {
pthread_mutex_t request_lock;
pthread_mutex_t read_lock;
pthread_mutex_t write_lock;
+ pthread_mutex_t status_lock;
+ int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */
void *handle;
void *crypto_session;
+ int nworkers; /* TODO set up a thread pool for parallel workers */
uint64_t exportsize;
int readonly;
@@ -146,40 +149,60 @@ connection_set_close (struct connection *conn,
connection_close_function close)
}
static int
+get_status (struct connection *conn)
+{
+ int r;
+
+ if (conn->nworkers)
+ pthread_mutex_lock (&conn->status_lock);
+ r = conn->status;
+ if (conn->nworkers)
+ pthread_mutex_unlock (&conn->status_lock);
+ return r;
+}
+
+/* Update the status if the new value is lower than the existing value.
+ * For convenience, return the incoming value. */
+static int
+set_status (struct connection *conn, int value)
+{
+ if (conn->nworkers)
+ pthread_mutex_lock (&conn->status_lock);
+ if (value < conn->status)
+ conn->status = value;
+ if (conn->nworkers)
+ pthread_mutex_unlock (&conn->status_lock);
+ return value;
+}
+
+static int
_handle_single_connection (int sockin, int sockout)
{
- int r;
+ int r = -1;
struct connection *conn = new_connection (sockin, sockout);
if (!conn)
- goto err;
+ goto done;
if (plugin_open (conn, readonly) == -1)
- goto err;
+ goto done;
threadlocal_set_name (plugin_name ());
/* Handshake. */
if (negotiate_handshake (conn) == -1)
- goto err;
+ goto done;
/* Process requests. XXX Allow these to be dispatched in parallel using
* a thread pool.
*/
- while (!quit) {
- r = recv_request_send_reply (conn);
- if (r == -1)
- goto err;
- if (r == 0)
- break;
- }
+ while (!quit && get_status (conn) > 0)
+ recv_request_send_reply (conn);
+ r = get_status (conn);
+ done:
free_connection (conn);
- return 0;
-
- err:
- free_connection (conn);
- return -1;
+ return r;
}
int
@@ -205,11 +228,13 @@ new_connection (int sockin, int sockout)
return NULL;
}
+ conn->status = 1;
conn->sockin = sockin;
conn->sockout = sockout;
pthread_mutex_init (&conn->request_lock, NULL);
pthread_mutex_init (&conn->read_lock, NULL);
pthread_mutex_init (&conn->write_lock, NULL);
+ pthread_mutex_init (&conn->status_lock, NULL);
conn->recv = raw_recv;
conn->send = raw_send;
@@ -229,6 +254,7 @@ free_connection (struct connection *conn)
pthread_mutex_destroy (&conn->request_lock);
pthread_mutex_destroy (&conn->read_lock);
pthread_mutex_destroy (&conn->write_lock);
+ pthread_mutex_destroy (&conn->status_lock);
/* Don't call the plugin again if quit has been set because the main
* thread will be in the process of unloading it. The plugin.unload
@@ -882,21 +908,23 @@ recv_request_send_reply (struct connection *conn)
/* Read the request packet. */
{
ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn->read_lock);
+ if (get_status (conn) < 0)
+ return -1;
r = conn->recv (conn, &request, sizeof request);
if (r == -1) {
nbdkit_error ("read request: %m");
- return -1;
+ return set_status (conn, -1);
}
if (r == 0) {
debug ("client closed input socket, closing connection");
- return 0; /* disconnect */
+ return set_status (conn, 0); /* disconnect */
}
magic = be32toh (request.magic);
if (magic != NBD_REQUEST_MAGIC) {
nbdkit_error ("invalid request: 'magic' field is incorrect
(0x%x)",
magic);
- return -1;
+ return set_status (conn, -1);
}
cmd = be32toh (request.type);
@@ -908,14 +936,14 @@ recv_request_send_reply (struct connection *conn)
if (cmd == NBD_CMD_DISC) {
debug ("client sent disconnect command, closing connection");
- return 0; /* disconnect */
+ return set_status (conn, 0); /* disconnect */
}
/* Validate the request. */
if (!validate_request (conn, cmd, flags, offset, count, &error)) {
if (cmd == NBD_CMD_WRITE &&
skip_over_write_buffer (conn->sockin, count) < 0)
- return -1;
+ return set_status (conn, -1);
goto send_reply;
}
@@ -927,7 +955,7 @@ recv_request_send_reply (struct connection *conn)
error = ENOMEM;
if (cmd == NBD_CMD_WRITE &&
skip_over_write_buffer (conn->sockin, count) < 0)
- return -1;
+ return set_status (conn, -1);
goto send_reply;
}
}
@@ -941,13 +969,13 @@ recv_request_send_reply (struct connection *conn)
}
if (r == -1) {
nbdkit_error ("read data: %m");
- return -1;
+ return set_status (conn, -1);
}
}
}
/* Perform the request. Only this part happens inside the request lock. */
- if (quit) {
+ if (quit || !get_status (conn)) {
error = ESHUTDOWN;
}
else {
@@ -960,6 +988,8 @@ recv_request_send_reply (struct connection *conn)
send_reply:
{
ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn->write_lock);
+ if (get_status (conn) < 0)
+ return -1;
reply.magic = htobe32 (NBD_REPLY_MAGIC);
reply.handle = request.handle;
reply.error = htobe32 (nbd_errno (error));
@@ -976,7 +1006,7 @@ recv_request_send_reply (struct connection *conn)
r = conn->send (conn, &reply, sizeof reply);
if (r == -1) {
nbdkit_error ("write reply: %m");
- return -1;
+ return set_status (conn, -1);
}
/* Send the read data buffer. */
@@ -984,7 +1014,7 @@ recv_request_send_reply (struct connection *conn)
r = conn->send (conn, buf, count);
if (r == -1) {
nbdkit_error ("write data: %m");
- return -1;
+ return set_status (conn, -1);
}
}
}
--
2.13.6