With this patch things are a little faster (compared to the previous
commit):
READ: bw=276MiB/s (290MB/s), 69.1MiB/s-69.3MiB/s (72.5MB/s-72.6MB/s), io=4096MiB
(4295MB), run=14781-14814msec
---
fuse/operations.c | 158 ++++++++++++++++++++++++++++++++++++++++++----
1 file changed, 145 insertions(+), 13 deletions(-)
diff --git a/fuse/operations.c b/fuse/operations.c
index 4da701e..a8e6f81 100644
--- a/fuse/operations.c
+++ b/fuse/operations.c
@@ -35,10 +35,12 @@
#include <limits.h>
#include <fcntl.h>
#include <unistd.h>
+#include <poll.h>
#include <errno.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <pthread.h>
#include <libnbd.h>
@@ -47,6 +49,11 @@
#define MAX_REQUEST_SIZE (32 * 1024 * 1024)
+/* Number of seconds to wait for commands to complete when closing the
+ * file.
+ */
+#define RELEASE_TIMEOUT 5
+
/* Wraps calls to libnbd functions and automatically checks for error,
* returning errors in the format required by FUSE. It also prints
* out the full error message on stderr, so that we don't lose it.
@@ -133,11 +140,106 @@ nbdfuse_open (const char *path, struct fuse_file_info *fi)
return 0;
}
+/* Because we're called on multiple threads and are using a single nbd
+ * handle, we want to issue multiple commands in parallel. We
+ * therefore cannot use the synchronous APIs (nbd_pread etc) since
+ * those lock the handle while they are waiting for the response.
+ *
+ * Instead we start the command using an AIO call (eg. nbd_aio_pread),
+ * and wait for it to complete by calling this function which does not
+ * hold the handle lock. Thus commands in other threads can run in
+ * parallel.
+ *
+ * Note that we are intentionally calling poll(2) on the same file
+ * descriptor from multiple threads. This means it's likely that
+ * another thread will see events related to our command and do
+ * processing for us. This is (mostly) OK, but it means that the
+ * handle direction will change unexpectedly, so we need to be
+ * prepared for that. The alternative is a more complex and slower
+ * design involving a separate polling thread.
+ */
+static int poll_socket (struct pollfd *, int timeout);
+
+static int
+wait_for_completion (int64_t cookie)
+{
+ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+ int r;
+ unsigned dir;
+ struct pollfd fds[1];
+
+ /* nbd_aio_command_completed returns:
+ * 0 => command still in flight, we must wait in the loop
+ * 1 => completed successfully
+ * -1 => error
+ */
+ while ((r = nbd_aio_command_completed (nbd, cookie)) == 0) {
+ /* Don't poll forever here since other threads may finish our
+ * command for us.
+ */
+ if (poll_socket (fds, 100) == -1)
+ return -1;
+
+ /* Direction may have changed in another thread, so check it
+ * again. We also have to check the socket revents again.
+ * Protect the whole lot with a global lock.
+ */
+ pthread_mutex_lock (&lock);
+ r = poll_socket (fds, 0);
+ if (r >= 0) {
+ dir = nbd_aio_get_direction (nbd);
+ r = 0;
+ if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
+ (fds[0].revents & POLLIN) != 0)
+ r = nbd_aio_notify_read (nbd);
+ else if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0 &&
+ (fds[0].revents & POLLOUT) != 0)
+ r = nbd_aio_notify_write (nbd);
+ else if ((fds[0].revents & (POLLHUP | POLLERR | POLLNVAL)) != 0) {
+ fprintf (stderr, "nbdfuse: server closed socket unexpectedly\n");
+ r = -1;
+ }
+ }
+ pthread_mutex_unlock (&lock);
+ if (r == -1)
+ return -1;
+ }
+
+ return r;
+}
+
+static int
+poll_socket (struct pollfd *fds, int timeout)
+{
+ int r;
+ unsigned dir;
+
+ fds[0].fd = nbd_aio_get_fd (nbd);
+ if (fds[0].fd == -1)
+ return -1;
+ fds[0].events = fds[0].revents = 0;
+ dir = nbd_aio_get_direction (nbd);
+ if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
+ fds[0].events |= POLLIN;
+ if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0)
+ fds[0].events |= POLLOUT;
+
+ r = poll (fds, 1, timeout);
+ if (r == -1) {
+ perror ("nbdfuse: poll");
+ return -1;
+ }
+
+ return r;
+}
+
static int
nbdfuse_read (const char *path, char *buf,
size_t count, off_t offset,
struct fuse_file_info *fi)
{
+ int64_t cookie;
+
if (!file_mode && (path[0] != '/' || strcmp (path+1, filename) != 0))
return -ENOENT;
@@ -150,7 +252,9 @@ nbdfuse_read (const char *path, char *buf,
if (offset + count > size)
count = size - offset;
- CHECK_NBD_ERROR (nbd_pread (nbd, buf, count, offset, 0));
+ CHECK_NBD_ERROR (cookie = nbd_aio_pread (nbd, buf, count, offset,
+ NBD_NULL_COMPLETION, 0));
+ CHECK_NBD_ERROR (wait_for_completion (cookie));
return (int) count;
}
@@ -160,6 +264,8 @@ nbdfuse_write (const char *path, const char *buf,
size_t count, off_t offset,
struct fuse_file_info *fi)
{
+ int64_t cookie;
+
/* Probably shouldn't happen because of nbdfuse_open check. */
if (readonly)
return -EACCES;
@@ -176,7 +282,9 @@ nbdfuse_write (const char *path, const char *buf,
if (offset + count > size)
count = size - offset;
- CHECK_NBD_ERROR (nbd_pwrite (nbd, buf, count, offset, 0));
+ CHECK_NBD_ERROR (cookie = nbd_aio_pwrite (nbd, buf, count, offset,
+ NBD_NULL_COMPLETION, 0));
+ CHECK_NBD_ERROR (wait_for_completion (cookie));
return (int) count;
}
@@ -184,28 +292,44 @@ nbdfuse_write (const char *path, const char *buf,
static int
nbdfuse_fsync (const char *path, int datasync, struct fuse_file_info *fi)
{
+ int64_t cookie;
+
if (readonly)
return 0;
/* If the server doesn't support flush then the operation is
* silently ignored.
*/
- if (nbd_can_flush (nbd))
- CHECK_NBD_ERROR (nbd_flush (nbd, 0));
+ if (nbd_can_flush (nbd)) {
+ CHECK_NBD_ERROR (cookie = nbd_aio_flush (nbd, NBD_NULL_COMPLETION, 0));
+ CHECK_NBD_ERROR (wait_for_completion (cookie));
+ }
return 0;
}
-/* This is called on the last close of a file. We do a flush here to
- * be on the safe side, but it's not strictly necessary.
- */
+/* This is called on the last close of a file. */
static int
nbdfuse_release (const char *path, struct fuse_file_info *fi)
{
- if (readonly)
- return 0;
+ time_t st;
- return nbdfuse_fsync (path, 0, fi);
+ /* We do a synchronous flush here to be on the safe side, but it's
+ * not strictly necessary.
+ */
+ if (!readonly && nbd_can_flush (nbd))
+ CHECK_NBD_ERROR (nbd_flush (nbd, 0));
+
+ /* Wait until there are no more commands in flight or until a
+ * timeout is reached.
+ */
+ time (&st);
+ while (nbd_aio_in_flight (nbd) > 0 &&
+ time (NULL) - st <= RELEASE_TIMEOUT &&
+ nbd_poll (nbd, 1000) >= 0)
+ ;
+
+ return 0;
}
/* Punch a hole or write zeros. */
@@ -213,6 +337,8 @@ static int
nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t len,
struct fuse_file_info *fi)
{
+ int64_t cookie;
+
if (readonly)
return -EACCES;
@@ -220,7 +346,9 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t len,
if (!nbd_can_trim (nbd))
return -EOPNOTSUPP; /* Trim not supported. */
else {
- CHECK_NBD_ERROR (nbd_trim (nbd, len, offset, 0));
+ CHECK_NBD_ERROR (cookie = nbd_aio_trim (nbd, len, offset,
+ NBD_NULL_COMPLETION, 0));
+ CHECK_NBD_ERROR (wait_for_completion (cookie));
return 0;
}
}
@@ -236,13 +364,17 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t len,
while (len > 0) {
off_t n = MIN (len, sizeof zerobuf);
- CHECK_NBD_ERROR (nbd_pwrite (nbd, zerobuf, n, offset, 0));
+ CHECK_NBD_ERROR (cookie = nbd_aio_pwrite (nbd, zerobuf, n, offset,
+ NBD_NULL_COMPLETION,0));
+ CHECK_NBD_ERROR (wait_for_completion (cookie));
len -= n;
}
return 0;
}
else {
- CHECK_NBD_ERROR (nbd_zero (nbd, len, offset, 0));
+ CHECK_NBD_ERROR (cookie = nbd_aio_zero (nbd, len, offset,
+ NBD_NULL_COMPLETION,0));
+ CHECK_NBD_ERROR (wait_for_completion (cookie));
return 0;
}
}
--
2.31.1