On top of the previous commit, which enabled multithreading but
continued to use the synchronous libnbd API, this allows each thread
to issue commands asynchronously.  Because there is still a single
handle, this introduces a single background thread to poll the file
descriptor and dispatch the commands.

This is only a little bit faster (compare to the results in the
previous commit message):

   READ: bw=250MiB/s (262MB/s), 62.4MiB/s-62.4MiB/s (65.4MB/s-65.5MB/s), io=4096MiB (4295MB), run=16398-16411msec

A future multi-conn version of nbdfuse would likely use multiple
background threads (one per connection) to do the same job, but that
is left for future work.
---
 fuse/nbdfuse.c    |   5 ++
 fuse/nbdfuse.h    |   1 +
 fuse/operations.c | 181 +++++++++++++++++++++++++++++++++++++++++-----
 3 files changed, 169 insertions(+), 18 deletions(-)
diff --git a/fuse/nbdfuse.c b/fuse/nbdfuse.c
index fa35080..f91ff7f 100644
--- a/fuse/nbdfuse.c
+++ b/fuse/nbdfuse.c
@@ -426,6 +426,11 @@ main (int argc, char *argv[])
   if (nbd_is_read_only (nbd) > 0)
     readonly = true;
 
+  /* Start the background thread that dispatches the asynchronous NBD
+   * commands issued by the FUSE threads.
+   */
+  start_operations_thread ();
+
   /* This is just used to give an unchanging time when they stat in
    * the mountpoint.
    */
diff --git a/fuse/nbdfuse.h b/fuse/nbdfuse.h
index 1f8f703..016c325 100644
--- a/fuse/nbdfuse.h
+++ b/fuse/nbdfuse.h
@@ -36,5 +36,8 @@ extern char *filename;
 extern uint64_t size;

 extern struct fuse_operations nbdfuse_operations;
+
+/* Start the background thread that dispatches NBD AIO commands. */
+extern void start_operations_thread (void);

 #endif /* LIBNBD_NBDFUSE_H */
diff --git a/fuse/operations.c b/fuse/operations.c
index 4da701e..1e81593 100644
--- a/fuse/operations.c
+++ b/fuse/operations.c
@@ -39,6 +39,7 @@
 #include <assert.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <pthread.h>
 
 #include <libnbd.h>
 
@@ -47,14 +48,90 @@
 
 #define MAX_REQUEST_SIZE (32 * 1024 * 1024)
 
-/* Wraps calls to libnbd functions and automatically checks for error,
- * returning errors in the format required by FUSE.  It also prints
- * out the full error message on stderr, so that we don't lose it.
+/* Seconds nbdfuse_release waits for in-flight commands to finish. */
+enum { RELEASE_TIMEOUT = 5 };
+
+/* This operations background thread runs while nbdfuse is running and
+ * is responsible for dispatching AIO commands.
+ *
+ * The commands themselves are initiated by the FUSE threads (by
+ * calling eg. nbd_aio_pread), and then those threads call
+ * wait_for_completion() to wait for the command and retire it.
+ *
+ * start_cond is signalled by any FUSE thread when it has started a
+ * new AIO command and wants the operations thread to start
+ * processing (if it isn't doing so already).  To signal completion
+ * we use a completion callback which signals a per-command
+ * completion condition.
  */
-#define CHECK_NBD_ERROR(CALL)                                   \
-  do { if ((CALL) == -1) return check_nbd_error (); } while (0)
+static void *operations_thread (void *);
+
+void
+start_operations_thread (void)
+{
+  /* Start operations_thread; it loops for the life of the process. */
+  pthread_t thread;
+  const int r = pthread_create (&thread, NULL, operations_thread, NULL);
+
+  if (r == 0)
+    return;
+  errno = r;                    /* pthread_create does not set errno. */
+  perror ("nbdfuse: pthread_create");
+  exit (EXIT_FAILURE);
+}
+
+static pthread_mutex_t start_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t start_cond = PTHREAD_COND_INITIALIZER;
+
+struct completion {             /* One per in-flight command. */
+  pthread_mutex_t mutex;
+  pthread_cond_t cond;
+  bool completed;               /* Set by completion_callback. */
+};
+
+static void *
+operations_thread (void *arg)
+{
+  while (1) {
+    /* Sleep on start_cond until at least one command is in flight. */
+    pthread_mutex_lock (&start_mutex);
+    while (nbd_aio_in_flight (nbd) == 0)
+      pthread_cond_wait (&start_cond, &start_mutex);
+    pthread_mutex_unlock (&start_mutex);
+
+    /* Poll the handle, dispatching work, until nothing is in flight. */
+    while (nbd_aio_in_flight (nbd) > 0)
+      nbd_poll (nbd, -1);
+  }
+
+  /*NOTREACHED*/
+  return NULL;
+}
+
+/* Completion callback - called from the operations thread when a
+ * command completes.
+ */
+static int
+completion_callback (void *vp, int *error)
+{
+  struct completion *completion = vp;
+
+  /* Set the flag with the mutex held: once the waiting FUSE thread
+   * sees it, that thread may return and *completion (which lives on
+   * its stack) goes out of scope.
+   */
+  pthread_mutex_lock (&completion->mutex);
+  completion->completed = true;
+  pthread_cond_signal (&completion->cond);
+  pthread_mutex_unlock (&completion->mutex);
+
+  /* Don't retire: the FUSE thread retires it to get the error. */
+  return 0;
+}
+
+/* Report an NBD error and return -errno. */
 static int
-check_nbd_error (void)
+report_nbd_error (void)
 {
   int err;
 
@@ -66,6 +143,55 @@ check_nbd_error (void)
     return -EIO;
 }
 
+/* Wake the operations thread, block until completion_callback has
+ * flagged COMPLETION, then retire the command identified by COOKIE.
+ * Returns 1 if the command succeeded or -1 if it failed.
+ */
+static int
+wait_for_completion (struct completion *completion, int64_t cookie)
+{
+  int r;
+
+  /* Signal to the operations thread to start work, in case it is sleeping. */
+  pthread_mutex_lock (&start_mutex);
+  pthread_cond_signal (&start_cond);
+  pthread_mutex_unlock (&start_mutex);
+
+  /* Wait until the completion_callback sets the completed flag.
+   *
+   * We cannot call nbd_aio_command_completed yet because that can
+   * lead to a possible deadlock where completion_callback holds the
+   * NBD handle lock and we try to acquire it by calling
+   * nbd_aio_command_completed.  That is the reason for the
+   * completion.completed flag.
+   */
+  pthread_mutex_lock (&completion->mutex);
+  while (!completion->completed)
+    pthread_cond_wait (&completion->cond, &completion->mutex);
+  pthread_mutex_unlock (&completion->mutex);
+
+  /* 0 (still in flight) should be impossible; 1 = ok, -1 = error. */
+  r = nbd_aio_command_completed (nbd, cookie);
+  assert (r != 0);
+  return r;
+}
+
+/* Issue an async command (CALL must pass 'cb') and wait for it. */
+#define CHECK_NBD_ASYNC_ERROR(CALL)                                     \
+  do {                                                                  \
+    struct completion completion =                                      \
+      { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false };   \
+    nbd_completion_callback cb =                                        \
+      { .callback = completion_callback, .user_data = &completion };    \
+    int64_t cookie = (CALL);                                            \
+    if (cookie == -1 || wait_for_completion (&completion, cookie) == -1) \
+      return report_nbd_error ();                                       \
+  } while (0)
+
+/* Wrap a sync libnbd call; on failure return report_nbd_error (). */
+#define CHECK_NBD_SYNC_ERROR(CALL)                                      \
+  do { if ((CALL) == -1) return report_nbd_error (); } while (0)
+
 static int
 nbdfuse_getattr (const char *path, struct stat *statbuf,
                  struct fuse_file_info *fi)
@@ -150,7 +276,7 @@ nbdfuse_read (const char *path, char *buf,
   if (offset + count > size)
     count = size - offset;
 
-  CHECK_NBD_ERROR (nbd_pread (nbd, buf, count, offset, 0));
+  CHECK_NBD_ASYNC_ERROR (nbd_aio_pread (nbd, buf, count, offset, cb, 0));
 
   return (int) count;
 }
@@ -176,7 +302,7 @@ nbdfuse_write (const char *path, const char *buf,
   if (offset + count > size)
     count = size - offset;
 
-  CHECK_NBD_ERROR (nbd_pwrite (nbd, buf, count, offset, 0));
+  CHECK_NBD_ASYNC_ERROR (nbd_aio_pwrite (nbd, buf, count, offset, cb, 0));
 
   return (int) count;
 }
@@ -191,21 +317,40 @@ nbdfuse_fsync (const char *path, int datasync, struct fuse_file_info *fi)
    * silently ignored.
    */
   if (nbd_can_flush (nbd))
-    CHECK_NBD_ERROR (nbd_flush (nbd, 0));
+    CHECK_NBD_ASYNC_ERROR (nbd_aio_flush (nbd, cb, 0));
 
   return 0;
 }
 
-/* This is called on the last close of a file.  We do a flush here to
- * be on the safe side, but it's not strictly necessary.
- */
+/* This is called on the last close of a file. */
 static int
 nbdfuse_release (const char *path, struct fuse_file_info *fi)
 {
-  if (readonly)
-    return 0;
+  time_t st;

-  return nbdfuse_fsync (path, 0, fi);
+  /* We do a synchronous flush here to be on the safe side, but it's
+   * not strictly necessary.
+   */
+  if (!readonly && nbd_can_flush (nbd))
+    CHECK_NBD_SYNC_ERROR (nbd_flush (nbd, 0));
+
+  /* Wait until there are no more commands in flight or until the
+   * timeout is reached.  Sleep briefly between checks instead of
+   * spinning on the CPU.
+   */
+  time (&st);
+  while (nbd_aio_in_flight (nbd) > 0 &&
+         time (NULL) - st <= RELEASE_TIMEOUT) {
+    const struct timespec delay = { .tv_sec = 0, .tv_nsec = 10000000 };
+    /* Signal to the operations thread to work, in case it is sleeping. */
+    pthread_mutex_lock (&start_mutex);
+    pthread_cond_signal (&start_cond);
+    pthread_mutex_unlock (&start_mutex);
+
+    nanosleep (&delay, NULL);   /* 10ms */
+  }
+
+  return 0;
 }
 
 /* Punch a hole or write zeros. */
@@ -220,7 +365,7 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t len,
     if (!nbd_can_trim (nbd))
       return -EOPNOTSUPP;       /* Trim not supported. */
     else {
-      CHECK_NBD_ERROR (nbd_trim (nbd, len, offset, 0));
+      CHECK_NBD_ASYNC_ERROR (nbd_aio_trim (nbd, len, offset, cb, 0));
       return 0;
     }
   }
@@ -236,13 +381,14 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t len,

       while (len > 0) {
         off_t n = MIN (len, sizeof zerobuf);
-        CHECK_NBD_ERROR (nbd_pwrite (nbd, zerobuf, n, offset, 0));
-        len -= n;
+        CHECK_NBD_ASYNC_ERROR (nbd_aio_pwrite (nbd, zerobuf, n, offset, cb, 0));
+        len -= n;
+        offset += n;            /* Advance to the next chunk. */
       }
       return 0;
     }
     else {
-      CHECK_NBD_ERROR (nbd_zero (nbd, len, offset, 0));
+      CHECK_NBD_ASYNC_ERROR (nbd_aio_zero (nbd, len, offset, cb, 0));
       return 0;
     }
   }
-- 
2.31.1