In preparation for allowing interleaved responses, refactor the
nbd forwarder so that writes are still done from the thread
handling the original request from the client, but all reads are
done by a dedicated reader thread. Control between the two
threads is gated by a mutex for storing the transaction
information, coupled with a pipe for the reader thread to send
the final status back to the handler thread.
Signed-off-by: Eric Blake <eblake(a)redhat.com>
---
plugins/nbd/nbd.c | 117 ++++++++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 100 insertions(+), 17 deletions(-)
diff --git a/plugins/nbd/nbd.c b/plugins/nbd/nbd.c
index 35f2781..770fb71 100644
--- a/plugins/nbd/nbd.c
+++ b/plugins/nbd/nbd.c
@@ -44,6 +44,7 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <assert.h>
+#include <pthread.h>
#include <nbdkit-plugin.h>
#include "protocol.h"
@@ -119,18 +120,23 @@ nbd_config_complete (void)
/* The per-transaction details */
struct transaction {
- /* TODO: protocol allows 64-bit handle, but until we allow
- interleaved transactions, 31 bits with wraparound is plenty */
- int cookie;
+ union {
+ uint64_t cookie;
+ int fds[2];
+ } u;
void *buf;
uint32_t count;
};
/* The per-connection handle */
struct handle {
+ /* These fields are read-only once initialized */
int fd;
int flags;
int64_t size;
+ pthread_t reader;
+
+ pthread_mutex_t lock; /* Covers access to all fields below */
/* Our choice of THREAD_MODEL means at most one outstanding transaction */
struct transaction trans;
bool dead;
@@ -179,6 +185,18 @@ write_full (int fd, const void *buf, size_t len)
return 0;
}
+static void nbd_lock (struct handle *h)
+{
+ int r = pthread_mutex_lock (&h->lock);
+ assert (!r);
+}
+
+static void nbd_unlock (struct handle *h)
+{
+ int r = pthread_mutex_unlock (&h->lock);
+ assert (!r);
+}
+
/* Called during transmission phases when there is no hope of
* resynchronizing with the server, and all further requests from the
* client will fail. Returns -1 for convenience. */
@@ -187,6 +205,7 @@ nbd_mark_dead (struct handle *h)
{
int err = errno;
+ nbd_lock (h);
if (!h->dead) {
nbdkit_debug ("permanent failure while talking to server %s: %m",
sockname);
@@ -194,6 +213,7 @@ nbd_mark_dead (struct handle *h)
}
else if (!err)
errno = ESHUTDOWN;
+ nbd_unlock (h);
/* NBD only accepts a limited set of errno values over the wire, and
nbdkit converts all other values to EINVAL. If we died due to an
errno value that cannot transmit over the wire, translate it to
@@ -223,22 +243,34 @@ nbd_request_raw (struct handle *h, uint32_t type, uint64_t offset,
}
/* Perform the request half of a transaction. On success, return the
- non-negative cookie to match to the reply; on error return -1. */
+ non-negative fd for reading the reply; on error return -1. */
static int
nbd_request_full (struct handle *h, uint32_t type, uint64_t offset,
uint32_t count, const void *req_buf, void *rep_buf)
{
+ int err;
+
if (h->dead)
return nbd_mark_dead (h);
+ if (pipe (h->trans.u.fds)) {
+ nbdkit_error ("unable to create pipe: %m");
+ /* Still in sync with server, so don't mark connection dead */
+ return -1;
+ }
h->trans.buf = rep_buf;
h->trans.count = rep_buf ? count : 0;
- if (++h->trans.cookie > INT_MAX)
- h->trans.cookie = 1;
- if (nbd_request_raw (h, type, offset, count, h->trans.cookie) < 0)
- return nbd_mark_dead (h);
+ if (nbd_request_raw (h, type, offset, count, h->trans.u.cookie) < 0)
+ goto err;
if (req_buf && write_full (h->fd, req_buf, count) < 0)
- return nbd_mark_dead (h);
- return h->trans.cookie;
+ goto err;
+ return h->trans.u.fds[0];
+
+ err:
+ err = errno;
+ close (h->trans.u.fds[0]);
+ close (h->trans.u.fds[1]);
+ errno = err;
+ return nbd_mark_dead (h);
}
/* Shorthand for nbd_request_full when no extra buffers are involved. */
@@ -258,9 +290,11 @@ nbd_reply_raw (struct handle *h, struct transaction *trans)
if (read_full (h->fd, &rep, sizeof rep) < 0)
return nbd_mark_dead (h);
+ nbd_lock (h);
*trans = h->trans;
+ nbd_unlock (h);
nbdkit_debug ("received reply for cookie %#" PRIx64, rep.handle);
- if (be32toh (rep.magic) != NBD_REPLY_MAGIC || rep.handle != trans->cookie)
+ if (be32toh (rep.magic) != NBD_REPLY_MAGIC || rep.handle != trans->u.cookie)
return nbd_mark_dead (h);
switch (be32toh (rep.error)) {
case NBD_SUCCESS:
@@ -295,17 +329,52 @@ nbd_reply_raw (struct handle *h, struct transaction *trans)
}
}
+/* Reader loop. */
+void *nbd_reader (void *handle)
+{
+ struct handle *h = handle;
+ bool done = false;
+
+ while (!done) {
+ struct transaction trans;
+ int r;
+ int fd;
+
+ r = nbd_reply_raw (h, &trans);
+ fd = trans.u.fds[1];
+ trans.u.fds[1] = -1;
+ if (r >= 0) {
+ if (write (fd, &r, sizeof r) != sizeof r) {
+ nbdkit_error ("failed to write pipe: %m");
+ abort ();
+ }
+ }
+ close (fd);
+ nbd_lock (h);
+ done = h->dead;
+ nbd_unlock (h);
+ }
+ return NULL;
+}
+
/* Perform the reply half of a transaction. */
static int
-nbd_reply (struct handle *h, int cookie)
+nbd_reply (struct handle *h, int fd)
{
int err;
- struct transaction trans;
- err = nbd_reply_raw (h, &trans);
- assert (err < 0 || cookie == trans.cookie);
- if (err > 0)
- errno = err;
+ if (read (fd, &err, sizeof err) != sizeof err) {
+ nbdkit_debug ("failed to read pipe: %m");
+ err = EIO;
+ }
+ nbd_lock (h);
+ /* TODO This check is just for sanity that the reader thread concept
+ works; it won't work once we allow interleaved requests */
+ assert (fd == h->trans.u.fds[0]);
+ h->trans.u.fds[0] = -1;
+ nbd_unlock (h);
+ close (fd);
+ errno = err;
return err ? -1 : 0;
}
@@ -396,6 +465,17 @@ nbd_open (int readonly)
goto err;
}
+ /* Spawn a dedicated reader thread */
+ if ((errno = pthread_mutex_init (&h->lock, NULL))) {
+ nbdkit_error ("failed to initialize mutex");
+ goto err;
+ }
+ if ((errno = pthread_create (&h->reader, NULL, nbd_reader, h))) {
+ nbdkit_error ("failed to initialize reader thread");
+ pthread_mutex_destroy (&h->lock);
+ goto err;
+ }
+
return h;
err:
@@ -412,6 +492,9 @@ nbd_close (void *handle)
if (!h->dead)
nbd_request_raw (h, NBD_CMD_DISC, 0, 0, 0);
close (h->fd);
+ if ((errno = pthread_join (h->reader, NULL)))
+ nbdkit_debug ("failed to join reader thread: %m");
+ pthread_mutex_destroy (&h->lock);
free (h);
}
--
2.13.6