In preparation for allowing interleaved responses, refactor the
nbd forwarder so that writes are still done from the thread
handling the original request from the client, but all reads are
done by a dedicated reader thread. Access to the transaction
information shared by the two threads is guarded by a new mutex,
and each transaction gets a pipe over which the reader thread sends
the final status back to the handler thread.
Signed-off-by: Eric Blake <eblake(a)redhat.com>
---
plugins/nbd/nbd.c | 145 ++++++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 119 insertions(+), 26 deletions(-)
diff --git a/plugins/nbd/nbd.c b/plugins/nbd/nbd.c
index 739eecd..53bf104 100644
--- a/plugins/nbd/nbd.c
+++ b/plugins/nbd/nbd.c
@@ -44,6 +44,7 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <assert.h>
+#include <pthread.h>
#include <nbdkit-plugin.h>
#include "protocol.h"
@@ -119,18 +120,23 @@ nbd_config_complete (void)
/* The per-transaction details */
struct transaction {
- /* TODO: the protocol uses a 64-bit handle, but until we allow
- interleaved transactions, 31 bits with wraparound is plenty */
- int cookie;
+ union { /* The handle sent to the server is the transaction's pipe fds */
+ uint64_t cookie; /* Opaque handle, compared against the server's reply */
+ int fds[2]; /* From pipe(): [0] read by the handler, [1] written by the reader thread */
+ } u; /* NOTE(review): assumes 2 * sizeof (int) == sizeof (uint64_t), so pipe() sets every cookie byte - confirm */
void *buf;
uint32_t count;
};
/* The per-connection handle */
struct handle {
+ /* These fields are read-only once initialized */
int fd;
int flags;
int64_t size;
+ pthread_t reader; /* Thread running nbd_reader, created in nbd_open */
+
+ pthread_mutex_t trans_lock; /* Covers access to all fields below */
/* Our choice of THREAD_MODEL means at most one outstanding transaction */
struct transaction trans;
bool dead;
@@ -179,6 +185,34 @@ write_full (int fd, const void *buf, size_t len)
   return 0;
 }
 
+/* Acquire h->trans_lock.  A failure means the mutex itself is broken,
+ * so abort rather than carry on without mutual exclusion; a bare
+ * assert would be compiled out under NDEBUG, silently ignoring it. */
+static void
+nbd_lock (struct handle *h)
+{
+  int r = pthread_mutex_lock (&h->trans_lock);
+
+  if (r) {
+    errno = r;
+    nbdkit_error ("failed to lock transaction mutex: %m");
+    abort ();
+  }
+}
+
+/* Release h->trans_lock, aborting on failure for the same reason. */
+static void
+nbd_unlock (struct handle *h)
+{
+  int r = pthread_mutex_unlock (&h->trans_lock);
+
+  if (r) {
+    errno = r;
+    nbdkit_error ("failed to unlock transaction mutex: %m");
+    abort ();
+  }
+}
+
 /* Called during transmission phases when there is no hope of
  * resynchronizing with the server, and all further requests from the
  * client will fail. Returns -1 for convenience. */
@@ -187,6 +207,7 @@ nbd_mark_dead (struct handle *h)
{
int err = errno;
+ nbd_lock (h); /* h->dead is shared with the reader thread; callers must not already hold trans_lock */
if (!h->dead) {
nbdkit_debug ("permanent failure while talking to server %s: %m",
sockname);
@@ -194,6 +215,7 @@ nbd_mark_dead (struct handle *h)
}
else if (!err)
errno = ESHUTDOWN;
+ nbd_unlock (h); /* Pairs with the nbd_lock at the top of this function */
/* NBD only accepts a limited set of errno values over the wire, and
nbdkit converts all other values to EINVAL. If we died due to an
errno value that cannot transmit over the wire, translate it to
@@ -223,22 +245,45 @@ nbd_request_raw (struct handle *h, uint32_t type, uint64_t offset,
 }
 
 /* Perform the request half of a transaction. On success, return the
-   non-negative cookie to match to the reply; on error return -1. */
+   non-negative fd for reading the reply; on error return -1. */
 static int
 nbd_request_full (struct handle *h, uint32_t type, uint64_t offset,
                   uint32_t count, const void *req_buf, void *rep_buf)
 {
-  if (h->dead)
-    return nbd_mark_dead (h);
-  h->trans.buf = rep_buf;
-  h->trans.count = rep_buf ? count : 0;
-  if (++h->trans.cookie > INT_MAX)
-    h->trans.cookie = 1;
-  if (nbd_request_raw (h, type, offset, count, h->trans.cookie) < 0)
+  int err;
+  struct transaction trans;
+
+  nbd_lock (h);
+  if (h->dead) {
+    nbd_unlock (h);
     return nbd_mark_dead (h);
+  }
+  nbd_unlock (h);
+  if (pipe (trans.u.fds)) {
+    nbdkit_error ("unable to create pipe: %m");
+    /* Still in sync with server, so don't mark connection dead */
+    return -1;
+  }
+  trans.buf = rep_buf;
+  trans.count = rep_buf ? count : 0;
+  /* Fill in the transaction locally, then publish it under trans_lock
+     (which struct handle says covers h->trans) before the request goes
+     out, so the reader thread never sees a half-written transaction. */
+  nbd_lock (h);
+  h->trans = trans;
+  nbd_unlock (h);
+  if (nbd_request_raw (h, type, offset, count, trans.u.cookie) < 0)
+    goto err;
   if (req_buf && write_full (h->fd, req_buf, count) < 0)
-    return nbd_mark_dead (h);
-  return h->trans.cookie;
+    goto err;
+  return trans.u.fds[0];
+
+ err:
+  err = errno;
+  close (trans.u.fds[0]);
+  close (trans.u.fds[1]);
+  errno = err;
+  return nbd_mark_dead (h);
 }
 
 /* Shorthand for nbd_request_full when no extra buffers are involved. */
@@ -248,23 +288,28 @@ nbd_request (struct handle *h, uint32_t type, uint64_t offset,
uint32_t count)
return nbd_request_full (h, type, offset, count, NULL, NULL);
}
-/* Read a reply, and look up the corresponding transaction. Return
- the server's non-negative answer (converted to local errno value)
- on success, or -1 on read failure. */
+/* Read a reply, and store the write end of the matching transaction's
+ pipe in *fd (left at -1 if the reply header cannot be read). Return the
+ server's non-negative answer (converted to local errno value) or -1. */
static int
-nbd_reply_raw (struct handle *h, struct transaction *trans)
+nbd_reply_raw (struct handle *h, int *fd)
{
struct reply rep;
+ struct transaction trans; /* Snapshot of h->trans taken under trans_lock */
+ *fd = -1; /* NOTE(review): stays -1 if the header read fails, so nothing wakes a handler blocked on the pending transaction's pipe (if any) */
if (read_full (h->fd, &rep, sizeof rep) < 0)
return nbd_mark_dead (h);
- *trans = h->trans;
+ nbd_lock (h);
+ trans = h->trans;
+ nbd_unlock (h);
+ *fd = trans.u.fds[1]; /* Set before validation, so even a bad reply closes the handler's pipe */
nbdkit_debug ("received reply for cookie %#" PRIx64, rep.handle);
- if (be32toh (rep.magic) != NBD_REPLY_MAGIC || rep.handle != trans->cookie)
+ if (be32toh (rep.magic) != NBD_REPLY_MAGIC || rep.handle != trans.u.cookie)
return nbd_mark_dead (h);
switch (be32toh (rep.error)) {
case NBD_SUCCESS:
- if (trans->buf && read_full (h->fd, trans->buf, trans->count) < 0)
+ if (trans.buf && read_full (h->fd, trans.buf, trans.count) < 0)
return nbd_mark_dead (h);
return 0;
case NBD_EPERM:
@@ -295,17 +340,66 @@ nbd_reply_raw (struct handle *h, struct transaction *trans)
   }
 }
 
+/* Reader loop, run by the dedicated thread that nbd_open creates.
+ * Each iteration reads one reply from the server, writes the decoded
+ * status (0 or a positive errno value) to the write end of the
+ * matching transaction's pipe, and closes that end.  When
+ * nbd_reply_raw fails after identifying the transaction, the pipe is
+ * closed without writing, so the handler sees end of file and
+ * reports EIO.  The loop stops once the connection is marked dead.
+ *
+ * NOTE(review): if reading the reply header itself fails,
+ * nbd_reply_raw leaves fd at -1, so a handler already blocked in
+ * nbd_reply on the pending transaction's pipe is never woken.
+ * Fixing that needs a reliable way to tell whether a transaction is
+ * still outstanding.
+ */
+static void *
+nbd_reader (void *handle)
+{
+  struct handle *h = handle;
+  bool done = false;
+
+  while (!done) {
+    int r;
+    int fd;
+
+    r = nbd_reply_raw (h, &fd);
+    if (r >= 0) {
+      if (write (fd, &r, sizeof r) != sizeof r) {
+        nbdkit_error ("failed to write pipe: %m");
+        abort ();
+      }
+    }
+    if (fd >= 0)
+      close (fd);
+    nbd_lock (h);
+    done = h->dead;
+    nbd_unlock (h);
+  }
+  return NULL;
+}
+
 /* Perform the reply half of a transaction. */
 static int
-nbd_reply (struct handle *h, int cookie)
+nbd_reply (struct handle *h, int fd)
 {
   int err;
-  struct transaction trans = { 0 };
 
-  err = nbd_reply_raw (h, &trans);
-  assert (err < 0 || cookie == trans.cookie);
-  if (err > 0)
-    errno = err;
+  /* Wait for the reader thread to write the status; end of file or a
+     short read means the reply could not be decoded. */
+  if (read (fd, &err, sizeof err) != sizeof err) {
+    nbdkit_debug ("failed to read pipe: %m");
+    err = EIO;
+  }
+  nbd_lock (h);
+  /* TODO This check is just for sanity that the reader thread concept
+     works; it won't work once we allow interleaved requests */
+  assert (fd == h->trans.u.fds[0]);
+  h->trans.u.fds[0] = -1;
+  nbd_unlock (h);
+  close (fd);
+  errno = err;
   return err ? -1 : 0;
 }
 
@@ -396,6 +475,17 @@ nbd_open (int readonly)
goto err;
}
+ /* Initialize the transaction lock, then spawn the dedicated reader thread */
+ if ((errno = pthread_mutex_init (&h->trans_lock, NULL))) { /* pthread functions return the error number, so store it in errno for %m */
+ nbdkit_error ("failed to initialize transaction mutex: %m");
+ goto err;
+ }
+ if ((errno = pthread_create (&h->reader, NULL, nbd_reader, h))) {
+ nbdkit_error ("failed to initialize reader thread: %m");
+ pthread_mutex_destroy (&h->trans_lock); /* Undo the mutex init above before bailing out */
+ goto err;
+ }
+
return h;
err:
@@ -412,6 +502,18 @@ nbd_close (void *handle)
-  if (!h->dead)
+  bool dead;
+
+  nbd_lock (h);
+  dead = h->dead;
+  nbd_unlock (h);
+  if (!dead)
     nbd_request_raw (h, NBD_CMD_DISC, 0, 0, 0);
-  close (h->fd);
+  /* Wake the reader thread if it is still blocked reading the socket
+     (e.g. when a failed write marked the connection dead), and join it
+     before closing the fd it reads from.  Best effort only. */
+  shutdown (h->fd, SHUT_RDWR);
+  if ((errno = pthread_join (h->reader, NULL)))
+    nbdkit_debug ("failed to join reader thread: %m");
+  close (h->fd);
+  pthread_mutex_destroy (&h->trans_lock);
   free (h);
 }
 
--
2.13.6