On Fri, May 14, 2021 at 12:07 PM Richard W.M. Jones <rjones(a)redhat.com> wrote:
On top of the previous commit which enabled multithreading but
continued to use the synchronous libnbd API, this allows each thread
to issue commands asynchronously. Because there is still a single
handle, this introduces a single background thread to poll the file
descriptor and dispatch the commands.
This is only a little bit faster (compare to results in previous
commit message):
READ: bw=250MiB/s (262MB/s), 62.4MiB/s-62.4MiB/s (65.4MB/s-65.5MB/s), io=4096MiB
(4295MB), run=16398-16411msec
A future multi-conn version of nbdfuse would likely use multiple
background threads (one per connection) to do the same job, but that
is left for future work.
---
fuse/nbdfuse.c | 5 ++
fuse/nbdfuse.h | 1 +
fuse/operations.c | 181 +++++++++++++++++++++++++++++++++++++++++-----
3 files changed, 169 insertions(+), 18 deletions(-)
diff --git a/fuse/nbdfuse.c b/fuse/nbdfuse.c
index fa35080..f91ff7f 100644
--- a/fuse/nbdfuse.c
+++ b/fuse/nbdfuse.c
@@ -426,6 +426,11 @@ main (int argc, char *argv[])
if (nbd_is_read_only (nbd) > 0)
readonly = true;
+ /* Create the background thread which is used to dispatch NBD
+ * operations.
+ */
+ start_operations_thread ();
+
/* This is just used to give an unchanging time when they stat in
* the mountpoint.
*/
diff --git a/fuse/nbdfuse.h b/fuse/nbdfuse.h
index 1f8f703..016c325 100644
--- a/fuse/nbdfuse.h
+++ b/fuse/nbdfuse.h
@@ -36,5 +36,6 @@ extern char *filename;
extern uint64_t size;
extern struct fuse_operations nbdfuse_operations;
+extern void start_operations_thread (void);
#endif /* LIBNBD_NBDFUSE_H */
diff --git a/fuse/operations.c b/fuse/operations.c
index 4da701e..1e81593 100644
--- a/fuse/operations.c
+++ b/fuse/operations.c
@@ -39,6 +39,7 @@
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <pthread.h>
#include <libnbd.h>
@@ -47,14 +48,90 @@
#define MAX_REQUEST_SIZE (32 * 1024 * 1024)
-/* Wraps calls to libnbd functions and automatically checks for error,
- * returning errors in the format required by FUSE. It also prints
- * out the full error message on stderr, so that we don't lose it.
+/* Number of seconds to wait for commands to complete when closing the file. */
+#define RELEASE_TIMEOUT 5
+
+/* This operations background thread runs while nbdfuse is running and
+ * is responsible for dispatching AIO commands.
+ *
+ * The commands themselves are initiated by the FUSE threads (by
+ * calling eg. nbd_aio_pread), and then those threads call
+ * wait_for_completion() which waits for the command to retire.
+ *
+ * A condition variable is signalled by any FUSE thread when it has
+ * started a new AIO command and wants the operations thread to start
+ * processing (if it isn't doing so already). To signal completion we
+ * use a completion callback which signals a per-thread completion
+ * condition.
*/
-#define CHECK_NBD_ERROR(CALL) \
- do { if ((CALL) == -1) return check_nbd_error (); } while (0)
+static void *operations_thread (void *);
+
+void
+start_operations_thread (void)
+{
+ int err;
+ pthread_t t;
+
+ err = pthread_create (&t, NULL, operations_thread, NULL);
+ if (err != 0) {
+ errno = err;
+ perror ("nbdfuse: pthread_create");
+ exit (EXIT_FAILURE);
+ }
+}
+
+static pthread_mutex_t start_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t start_cond = PTHREAD_COND_INITIALIZER;
+
+struct completion {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ bool completed;
+} completion;
+
+static void *
+operations_thread (void *arg)
+{
+ while (1) {
+ /* Sleep until a command is in flight. */
+ pthread_mutex_lock (&start_mutex);
+ while (nbd_aio_in_flight (nbd) == 0)
+ pthread_cond_wait (&start_cond, &start_mutex);
+ pthread_mutex_unlock (&start_mutex);
+
+ /* Dispatch work while there are commands in flight. */
+ while (nbd_aio_in_flight (nbd) > 0)
+ nbd_poll (nbd, -1);
Just to make sure I understand this correctly - does this run all the
completion callbacks?
It would be nice to mention this in the comment.
+ }
+
+ /*NOTREACHED*/
+ return NULL;
+}
+
+/* Completion callback - called from the operations thread when a
+ * command completes.
+ */
+static int
+completion_callback (void *vp, int *error)
+{
+ struct completion *completion = vp;
+
+ /* Mark the command as completed. */
+ completion->completed = true;
I think it would be better to change this after locking the completion
mutex. The mutex will ensure proper ordering of things, and
it matches the code that checks this flag while the mutex is locked.
+
+ pthread_mutex_lock (&completion->mutex);
+ pthread_cond_signal (&completion->cond);
+ pthread_mutex_unlock (&completion->mutex);
+
+ /* Don't retire the command. We want to get the error indication in
+ * the FUSE thread.
+ */
+ return 0;
How does retiring the command affect the error indicator?
libnbd errors are stored in thread-local storage - how does it
work when another thread is polling?
+}
+
+/* Report an NBD error and return -errno. */
static int
-check_nbd_error (void)
+report_nbd_error (void)
{
int err;
@@ -66,6 +143,55 @@ check_nbd_error (void)
return -EIO;
}
+static int
+wait_for_completion (struct completion *completion, int64_t cookie)
+{
+ int r;
+
+ /* Signal to the operations thread to start work, in case it is sleeping. */
+ pthread_mutex_lock (&start_mutex);
+ pthread_cond_signal (&start_cond);
+ pthread_mutex_unlock (&start_mutex);
+
+ /* Wait until the completion_callback sets the completed flag.
+ *
+ * We cannot call nbd_aio_command_completed yet because that can
+ * lead to a possible deadlock where completion_callback holds the
+ * NBD handle lock and we try to acquire it by calling
+ * nbd_aio_command_completed. That is the reason for the
+ * completion.completed flag.
+ */
+ pthread_mutex_lock (&completion->mutex);
+ while (!completion->completed)
+ pthread_cond_wait (&completion->cond, &completion->mutex);
+ pthread_mutex_unlock (&completion->mutex);
+
+ /* nbd_aio_command_completed returns:
+ * 0 => command still in flight (should be impossible)
+ * 1 => completed successfully
+ * -1 => error
+ */
+ r = nbd_aio_command_completed (nbd, cookie);
+ assert (r != 0);
+ return r;
+}
+
+/* Wrap calls to any asynch command and check the error. */
+#define CHECK_NBD_ASYNC_ERROR(CALL)
+ do { \
+ struct completion completion = \
+ { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false }; \
+ nbd_completion_callback cb = \
+ { .callback = completion_callback, .user_data = &completion }; \
+ int64_t cookie = (CALL); \
+ if (cookie == -1 || wait_for_completion (&completion, cookie) == -1) \
+ return report_nbd_error ();
This runs in the FUSE thread, but the error was detected in the operations
thread. Do we pass the error to the thread that started the async I/O request
in some way?
Looks good, I'm not sure there is a better way to use a single handle from
multiple threads.
It looks like a generic infrastructure that could be part of the library like
nbd_poll().
Nir
\
+ } while (0)
+
+/* Wraps calls to sync libnbd functions and check the error. */
+#define CHECK_NBD_SYNC_ERROR(CALL) \
+ do { if ((CALL) == -1) return report_nbd_error (); } while (0)
+
static int
nbdfuse_getattr (const char *path, struct stat *statbuf,
struct fuse_file_info *fi)
@@ -150,7 +276,7 @@ nbdfuse_read (const char *path, char *buf,
if (offset + count > size)
count = size - offset;
- CHECK_NBD_ERROR (nbd_pread (nbd, buf, count, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_pread (nbd, buf, count, offset, cb, 0));
return (int) count;
}
@@ -176,7 +302,7 @@ nbdfuse_write (const char *path, const char *buf,
if (offset + count > size)
count = size - offset;
- CHECK_NBD_ERROR (nbd_pwrite (nbd, buf, count, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_pwrite (nbd, buf, count, offset, cb, 0));
return (int) count;
}
@@ -191,21 +317,40 @@ nbdfuse_fsync (const char *path, int datasync, struct
fuse_file_info *fi)
* silently ignored.
*/
if (nbd_can_flush (nbd))
- CHECK_NBD_ERROR (nbd_flush (nbd, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_flush (nbd, cb, 0));
return 0;
}
-/* This is called on the last close of a file. We do a flush here to
- * be on the safe side, but it's not strictly necessary.
- */
+/* This is called on the last close of a file. */
static int
nbdfuse_release (const char *path, struct fuse_file_info *fi)
{
- if (readonly)
- return 0;
+ time_t st;
- return nbdfuse_fsync (path, 0, fi);
+ /* We do a synchronous flush here to be on the safe side, but it's
+ * not strictly necessary.
+ */
+ if (!readonly && nbd_can_flush (nbd))
+ CHECK_NBD_SYNC_ERROR (nbd_flush (nbd, 0));
+
+ /* Wait until there are no more commands in flight or until a
+ * timeout is reached.
+ */
+ time (&st);
+ while (1) {
+ if (nbd_aio_in_flight (nbd) == 0)
+ break;
+ if (time (NULL) - st > RELEASE_TIMEOUT)
+ break;
+
+ /* Signal to the operations thread to work. */
+ pthread_mutex_lock (&start_mutex);
+ pthread_cond_signal (&start_cond);
+ pthread_mutex_unlock (&start_mutex);
+ }
+
+ return 0;
}
/* Punch a hole or write zeros. */
@@ -220,7 +365,7 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t
len,
if (!nbd_can_trim (nbd))
return -EOPNOTSUPP; /* Trim not supported. */
else {
- CHECK_NBD_ERROR (nbd_trim (nbd, len, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_trim (nbd, len, offset, cb, 0));
return 0;
}
}
@@ -236,13 +381,13 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t
len,
while (len > 0) {
off_t n = MIN (len, sizeof zerobuf);
- CHECK_NBD_ERROR (nbd_pwrite (nbd, zerobuf, n, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_pwrite (nbd, zerobuf, n, offset, cb, 0));
len -= n;
}
return 0;
}
else {
- CHECK_NBD_ERROR (nbd_zero (nbd, len, offset, 0));
+ CHECK_NBD_ASYNC_ERROR (nbd_aio_zero (nbd, len, offset, cb, 0));
return 0;
}
}
--
2.31.1
_______________________________________________
Libguestfs mailing list
Libguestfs(a)redhat.com
https://listman.redhat.com/mailman/listinfo/libguestfs