On top of the previous commit which enabled multithreading but
continued to use the synchronous libnbd API, this allows each thread
to issue commands asynchronously. Because there is still a single
handle, this introduces a single background thread to poll the file
descriptor and dispatch the commands.
This is only a little bit faster (compare to results in previous
commit message):
READ: bw=250MiB/s (262MB/s), 62.4MiB/s-62.4MiB/s (65.4MB/s-65.5MB/s), io=4096MiB
(4295MB), run=16398-16411msec
A future multi-conn version of nbdfuse would likely use multiple
background threads (one per connection) to do the same job, but that
is left for future work.
---
fuse/nbdfuse.c | 5 ++
fuse/nbdfuse.h | 1 +
fuse/operations.c | 181 +++++++++++++++++++++++++++++++++++++++++-----
3 files changed, 169 insertions(+), 18 deletions(-)
diff --git a/fuse/nbdfuse.c b/fuse/nbdfuse.c
index fa35080..f91ff7f 100644
--- a/fuse/nbdfuse.c
+++ b/fuse/nbdfuse.c
@@ -426,6 +426,11 @@ main (int argc, char *argv[])
if (nbd_is_read_only (nbd) > 0)
readonly = true;
+ /* Create the background thread which is used to dispatch NBD
+ * operations.
+ */
+ start_operations_thread ();
+
/* This is just used to give an unchanging time when they stat in
* the mountpoint.
*/
diff --git a/fuse/nbdfuse.h b/fuse/nbdfuse.h
index 1f8f703..016c325 100644
--- a/fuse/nbdfuse.h
+++ b/fuse/nbdfuse.h
@@ -36,5 +36,6 @@ extern char *filename;
extern uint64_t size;
extern struct fuse_operations nbdfuse_operations;
+extern void start_operations_thread (void);
#endif /* LIBNBD_NBDFUSE_H */
diff --git a/fuse/operations.c b/fuse/operations.c
index 4da701e..1e81593 100644
--- a/fuse/operations.c
+++ b/fuse/operations.c
@@ -39,6 +39,7 @@
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <pthread.h>
#include <libnbd.h>
@@ -47,14 +48,90 @@
#define MAX_REQUEST_SIZE (32 * 1024 * 1024)
-/* Wraps calls to libnbd functions and automatically checks for error,
- * returning errors in the format required by FUSE. It also prints
- * out the full error message on stderr, so that we don't lose it.
+/* Number of seconds to wait for commands to complete when closing the file. */
+#define RELEASE_TIMEOUT 5
+
+/* This operations background thread runs while nbdfuse is running and
+ * is responsible for dispatching AIO commands.
+ *
+ * The commands themselves are initiated by the FUSE threads (by
+ * calling eg. nbd_aio_pread), and then those threads call
+ * wait_for_completion() which waits for the command to retire.
+ *
+ * A condition variable is signalled by any FUSE thread when it has
+ * started a new AIO command and wants the operations thread to start
+ * processing (if it isn't doing so already). To signal completion we
+ * use a completion callback which signals a per-thread completion
+ * condition.
*/
-#define CHECK_NBD_ERROR(CALL) \
- do { if ((CALL) == -1) return check_nbd_error (); } while (0)
+static void *operations_thread (void *);
+
+void
+start_operations_thread (void)
+{
+ int err;
+ pthread_t t;
+
+ err = pthread_create (&t, NULL, operations_thread, NULL);
+ if (err != 0) {
+ errno = err;
+ perror ("nbdfuse: pthread_create");
+ exit (EXIT_FAILURE);
+ }
+}
+
+static pthread_mutex_t start_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t start_cond = PTHREAD_COND_INITIALIZER;
+
+struct completion {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ bool completed;
+} completion;
+
+static void *
+operations_thread (void *arg)
+{
+ while (1) {
+ /* Sleep until a command is in flight. */
+ pthread_mutex_lock (&start_mutex);
+ while (nbd_aio_in_flight (nbd) == 0)
+ pthread_cond_wait (&start_cond, &start_mutex);
+ pthread_mutex_unlock (&start_mutex);
+
+ /* Dispatch work while there are commands in flight. */
+ while (nbd_aio_in_flight (nbd) > 0)
+ nbd_poll (nbd, -1);
+ }
+
+ /*NOTREACHED*/
+ return NULL;
+}
+
+/* Completion callback - called from the operations thread when a
+ * command completes.
+ */
+static int
+completion_callback (void *vp, int *error)
+{
+ struct completion *completion = vp;
+
+ /* Mark the command as completed. */
+ completion->completed = true;
+
+ pthread_mutex_lock (&completion->mutex);
+ pthread_cond_signal (&completion->cond);
+ pthread_mutex_unlock (&completion->mutex);
+
+ /* Don't retire the command. We want to get the error indication in
+ * the FUSE thread.
+ */
+ return 0;
+}
+
+/* Report an NBD error and return -errno. */
static int
-check_nbd_error (void)
+report_nbd_error (void)
{
int err;
@@ -66,6 +143,55 @@ check_nbd_error (void)
return -EIO;
}
+static int
+wait_for_completion (struct completion *completion, int64_t cookie)
+{
+ int r;
+
+ /* Signal to the operations thread to start work, in case it is sleeping. */
+ pthread_mutex_lock (&start_mutex);
+ pthread_cond_signal (&start_cond);
+ pthread_mutex_unlock (&start_mutex);
+
+ /* Wait until the completion_callback sets the completed flag.
+ *
+ * We cannot call nbd_aio_command_completed yet because that can
+ * lead to a possible deadlock where completion_callback holds the
+ * NBD handle lock and we try to acquire it by calling
+ * nbd_aio_command_completed. That is the reason for the
+ * completion.completed flag.
+ */
+ pthread_mutex_lock (&completion->mutex);
+ while (!completion->completed)
+ pthread_cond_wait (&completion->cond, &completion->mutex);
+ pthread_mutex_unlock (&completion->mutex);
+
+ /* nbd_aio_command_completed returns:
+ * 0 => command still in flight (should be impossible)
+ * 1 => completed successfully
+ * -1 => error
+ */
+ r = nbd_aio_command_completed (nbd, cookie);
+ assert (r != 0);
+ return r;
+}
+
+/* Wrap calls to any asynch command and check the error. */
+#define CHECK_NBD_ASYNC_ERROR(CALL) \
+ do { \
+ struct completion completion = \
+ { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false }; \
+ nbd_completion_callback cb = \
+ { .callback = completion_callback, .user_data = &completion }; \
+ int64_t cookie = (CALL); \
+ if (cookie == -1 || wait_for_completion (&completion, cookie) == -1) \
+ return report_nbd_error (); \
+ } while (0)
+
+/* Wraps calls to sync libnbd functions and check the error. */
+#define CHECK_NBD_SYNC_ERROR(CALL) \
+ do { if ((CALL) == -1) return report_nbd_error (); } while (0)
+
static int
nbdfuse_getattr (const char *path, struct stat *statbuf,
struct fuse_file_info *fi)
@@ -150,7 +276,7 @@ nbdfuse_read (const char *path, char *buf,
if (offset + count > size)
count = size - offset;
- CHECK_NBD_ERROR (nbd_pread (nbd, buf, count, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_pread (nbd, buf, count, offset, cb, 0));
return (int) count;
}
@@ -176,7 +302,7 @@ nbdfuse_write (const char *path, const char *buf,
if (offset + count > size)
count = size - offset;
- CHECK_NBD_ERROR (nbd_pwrite (nbd, buf, count, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_pwrite (nbd, buf, count, offset, cb, 0));
return (int) count;
}
@@ -191,21 +317,40 @@ nbdfuse_fsync (const char *path, int datasync, struct fuse_file_info
*fi)
* silently ignored.
*/
if (nbd_can_flush (nbd))
- CHECK_NBD_ERROR (nbd_flush (nbd, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_flush (nbd, cb, 0));
return 0;
}
-/* This is called on the last close of a file. We do a flush here to
- * be on the safe side, but it's not strictly necessary.
- */
+/* This is called on the last close of a file. */
static int
nbdfuse_release (const char *path, struct fuse_file_info *fi)
{
- if (readonly)
- return 0;
+ time_t st;
- return nbdfuse_fsync (path, 0, fi);
+ /* We do a synchronous flush here to be on the safe side, but it's
+ * not strictly necessary.
+ */
+ if (!readonly && nbd_can_flush (nbd))
+ CHECK_NBD_SYNC_ERROR (nbd_flush (nbd, 0));
+
+ /* Wait until there are no more commands in flight or until a
+ * timeout is reached.
+ */
+ time (&st);
+ while (1) {
+ if (nbd_aio_in_flight (nbd) == 0)
+ break;
+ if (time (NULL) - st > RELEASE_TIMEOUT)
+ break;
+
+ /* Signal to the operations thread to work. */
+ pthread_mutex_lock (&start_mutex);
+ pthread_cond_signal (&start_cond);
+ pthread_mutex_unlock (&start_mutex);
+ }
+
+ return 0;
}
/* Punch a hole or write zeros. */
@@ -220,7 +365,7 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t
len,
if (!nbd_can_trim (nbd))
return -EOPNOTSUPP; /* Trim not supported. */
else {
- CHECK_NBD_ERROR (nbd_trim (nbd, len, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_trim (nbd, len, offset, cb, 0));
return 0;
}
}
@@ -236,13 +381,13 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t
len,
while (len > 0) {
off_t n = MIN (len, sizeof zerobuf);
- CHECK_NBD_ERROR (nbd_pwrite (nbd, zerobuf, n, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_pwrite (nbd, zerobuf, n, offset, cb, 0));
len -= n;
}
return 0;
}
else {
- CHECK_NBD_ERROR (nbd_zero (nbd, len, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_zero (nbd, len, offset, cb, 0));
return 0;
}
}
--
2.31.1