Callers may now optionally set up a concurrent writer thread. The
outline of this idea is explained here:
https://www.redhat.com/archives/libguestfs/2019-June/msg00010.html
The change is quite small, but here are some points which are true but
may not be obvious:
* All writes return immediately with success (unless we run out of
memory), and writes never block.
* When going down the READY -> ISSUE_COMMAND.START (etc) path, because
writes never block, we never stop for a read notification, so we
always send all commands in the issue queue before starting to read
any replies. (However this only means that the raw commands are
enqueued in the writer thread, which can still take its merry time
writing the requests to the socket.)
* Commands can be counted as "in flight" when they haven't been
written to the socket yet. This is another reason for increasing
the in flight limits. Does this matter? Probably not. The kernel
might also queue up packets before sending them, or they might be in
flight across the internet or queued at the receiver. Nothing about
"in flight" ever meant that the server has received and is
processing those packets.
* Even with this change, full parallelism isn't quite achievable.
It's still possible to be in a state such as
REPLY.STRUCTURED_REPLY.RECV_*, waiting for an unusually fast writer /
slow reader server. If you then decide to send yet more commands,
those commands will only be enqueued in the handle, not dispatched to
the writer thread. To avoid this it is best to issue as many commands
as possible, as early as possible, before entering poll, but to a
certain extent this is unavoidable while there is only one state
machine.
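To illustrate the last point, a caller aiming for maximum parallelism
would structure its loop roughly like this (a sketch only: finished,
in_flight, MAX_IN_FLIGHT, have_work, issue_one_command,
wait_for_nbd_events and collect_completions are all hypothetical
application code, not part of libnbd):

  while (!finished) {
    /* Writes never block, so enqueue as much work as possible
     * before dropping back into poll(2).
     */
    while (have_work () && in_flight < MAX_IN_FLIGHT) {
      issue_one_command (nbd);          /* nbd_aio_pread etc. */
      in_flight++;
    }

    wait_for_nbd_events (nbd);          /* poll + nbd_aio_notify_* */
    in_flight -= collect_completions (nbd);
  }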
---
docs/libnbd.pod | 73 +++++++++++++++++++++++++++++++++++++++++++++
generator/generator | 29 ++++++++++++++++++
lib/handle.c | 32 ++++++++++++++++++++
lib/internal.h | 7 +++++
lib/socket.c | 27 ++++++++++++++---
podwrapper.pl.in | 3 +-
6 files changed, 166 insertions(+), 5 deletions(-)
diff --git a/docs/libnbd.pod b/docs/libnbd.pod
index ede2539..07d259f 100644
--- a/docs/libnbd.pod
+++ b/docs/libnbd.pod
@@ -400,6 +400,79 @@ If you are issuing multiple in-flight requests (see above) and
limiting the number, then the limit should be applied to each
individual NBD connection.
+=head2 Concurrent writer thread
+
+To achieve the maximum possible performance from libnbd and NBD
+servers, in addition to the above techniques you must also use a
+concurrent writer thread. This feature allows requests to be issued
+on the NBD socket at the same time that replies are being read from
+the socket. In other words L<send(2)> and L<recv(2)> calls will be
+running at the same time on the same socket from different threads.
+
+There is a full example using a concurrent writer available at
+L<https://github.com/libguestfs/libnbd/blob/master/examples/concurrent-writer.c>
+
+To implement this, you change your ordinary AIO code in four ways:
+
+=over 4
+
+=item 1. Call nbd_set_concurrent_writer
+
+ struct writer_data {
+   struct nbd_handle *nbd;
+   /* other data here as required */
+ } data;
+
+ nbd_set_concurrent_writer (nbd, &data, writer);
+
+This function can be called on the handle at any time, either after
+the handle is created or after the connection and handshaking have
+completed.
+
+=item 2. Implement non-blocking writer callback
+
+C<writer> is a I<non-blocking> callback which enqueues the buffer into
+a ring or similar FIFO structure:
+
+ struct ring_item {
+   struct writer_data *data;
+   void *buf;
+   size_t len;
+ };
+
+ int writer (void *data, const void *buf, size_t len)
+ {
+   struct ring_item item;
+
+   /* Take a copy of the data and add it to the shared ring. */
+   item.data = data;
+   item.buf = malloc (len);
+   if (item.buf == NULL)
+     return -1;                /* malloc sets errno for the caller */
+   memcpy (item.buf, buf, len);
+   item.len = len;
+   ring_add (&item);
+
+   writer_signal ();           /* kick the writer thread */
+   return 0;
+ }
+
+The callback should return 0 on success, or -1 (setting C<errno>) if
+it fails, for example because it runs out of memory.
+
+=item 3. Implement writer thread
+
+You must also supply another thread which picks up data off the ring
+and writes it to the socket (see C<nbd_aio_get_fd>). If there is an
+error when writing to the socket, call C<nbd_concurrent_writer_error>
+with the C<errno>.
+
+You have a choice of whether to implement one thread per nbd_handle or
+one thread shared between all handles.
+
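+For example, the writer thread might look like this sketch, where
+C<ring_get> and C<writer_wait> are hypothetical helpers operating on
+the shared ring from the previous step (they are not part of libnbd):
+
+ static void *
+ writer_thread (void *vp)
+ {
+   struct writer_data *data = vp;
+   int fd = nbd_aio_get_fd (data->nbd);
+   struct ring_item item;
+
+   for (;;) {
+     writer_wait ();              /* block until writer_signal () */
+     while (ring_get (&item)) {   /* pop the next queued buffer */
+       const char *p = item.buf;
+       size_t n = item.len;
+
+       while (n > 0) {
+         ssize_t r = send (fd, p, n, 0);
+         if (r == -1) {
+           if (errno == EAGAIN || errno == EWOULDBLOCK) {
+             /* The socket is non-blocking: wait until writable. */
+             struct pollfd pfd = { .fd = fd, .events = POLLOUT };
+             poll (&pfd, 1, -1);
+             continue;
+           }
+           nbd_concurrent_writer_error (data->nbd, errno);
+           free (item.buf);
+           return NULL;
+         }
+         p += r;
+         n -= r;
+       }
+       free (item.buf);          /* free the copy made in writer () */
+     }
+   }
+ }
+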
+=item 4. Modify main loop
+
+Finally, your main loop can unconditionally call
+C<nbd_aio_notify_write> when C<nbd_aio_get_direction> returns C<WRITE>
+or C<BOTH> (since the concurrent writer thread can always enqueue more
+data and so is always "ready to write").
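+
+For example, a L<poll(2)>-based main loop might look like this sketch
+(error handling and the C<quit> condition are left to the
+application):
+
+ int fd = nbd_aio_get_fd (nbd);
+ struct pollfd fds[1];
+ unsigned dir;
+
+ while (!quit) {
+   dir = nbd_aio_get_direction (nbd);
+   fds[0].fd = fd;
+   fds[0].events = 0;
+   fds[0].revents = 0;
+   if (dir & LIBNBD_AIO_DIRECTION_READ)
+     fds[0].events |= POLLIN;
+
+   poll (fds, 1, -1);
+
+   if ((dir & LIBNBD_AIO_DIRECTION_READ) &&
+       (fds[0].revents & POLLIN))
+     nbd_aio_notify_read (nbd);
+
+   /* The writer thread can always accept more data, so notify the
+    * write side unconditionally instead of waiting for POLLOUT.
+    */
+   if (dir & LIBNBD_AIO_DIRECTION_WRITE)
+     nbd_aio_notify_write (nbd);
+ }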
+
+=back
+
=head1 ENCRYPTION AND AUTHENTICATION
The NBD protocol and libnbd supports TLS (sometimes incorrectly called
diff --git a/generator/generator b/generator/generator
index ff6075d..718e253 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1094,6 +1094,35 @@ C<\"qemu:dirty-bitmap:...\"> for qemu-nbd
(see qemu-nbd I<-B> option). See also C<nbd_block_status>.";
};
+ "set_concurrent_writer", {
+ default_call with
+ args = [ Opaque "data";
+ CallbackPersist ("writer", [Opaque "data";
+ BytesIn ("buf", "len")]) ];
+ ret = RErr;
+ permitted_states = [ Created; Connecting; Connected ];
+ shortdesc = "set a concurrent writer thread";
+ longdesc = "\
+Provide an optional concurrent writer thread for better performance.
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+ };
+
+ "concurrent_writer_error", {
+ default_call with
+ args = [ Int "err" ]; ret = RErr;
+ shortdesc = "signal an error from the concurrent writer thread";
+ longdesc = "\
+This can be called from the concurrent writer thread to signal
+that there was an error writing to the socket. As there is no
+way to recover from such errors, the connection will move to the
+dead state soon after.
+
+The parameter is the C<errno> returned by the failed L<send(2)> call.
+It must be non-zero.
+
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+ };
+
"connect_uri", {
default_call with
args = [ String "uri" ]; ret = RErr;
diff --git a/lib/handle.c b/lib/handle.c
index cc311ba..cc5d40f 100644
--- a/lib/handle.c
+++ b/lib/handle.c
@@ -215,6 +215,38 @@ nbd_add_close_callback (struct nbd_handle *h, nbd_close_callback cb,
void *data)
return ret;
}
+int
+nbd_unlocked_set_concurrent_writer (struct nbd_handle *h,
+                                    void *data, writer_cb writer)
+{
+  /* I suppose we could allow this, but it seems more likely that
+   * it's an error rather than intentional.
+   */
+  if (h->writer != NULL) {
+    set_error (EINVAL, "concurrent writer was already set for this handle");
+    return -1;
+  }
+
+  h->writer = writer;
+  h->writer_data = data;
+  return 0;
+}
+
+int
+nbd_unlocked_concurrent_writer_error (struct nbd_handle *h, int err)
+{
+  if (err == 0) {
+    set_error (EINVAL, "concurrent writer error parameter must be non-zero");
+    return -1;
+  }
+
+  /* Ignore second and subsequent calls, record only the first error. */
+  if (h->writer_error == 0)
+    h->writer_error = err;
+
+  return 0;
+}
+
const char *
nbd_unlocked_get_package_name (struct nbd_handle *h)
{
diff --git a/lib/internal.h b/lib/internal.h
index c1a57ac..380302d 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -43,6 +43,8 @@ struct close_callback;
struct socket;
struct command_in_flight;
+typedef int (*writer_cb) (void *data, const void *buf, size_t len);
+
struct nbd_handle {
/* Lock protecting concurrent access to the handle. */
pthread_mutex_t lock;
@@ -90,6 +92,11 @@ struct nbd_handle {
/* The socket or a wrapper if using GnuTLS. */
struct socket *sock;
+  /* Writer callback if using concurrent writer. */
+  void *writer_data;
+  writer_cb writer;
+  int writer_error;
+
/* Generic way to read into a buffer - set rbuf to point to a
* buffer, rlen to the amount of data you expect, and in the state
* machine call recv_into_rbuf.
diff --git a/lib/socket.c b/lib/socket.c
index f48e455..77a45cd 100644
--- a/lib/socket.c
+++ b/lib/socket.c
@@ -46,10 +46,29 @@ socket_send (struct nbd_handle *h,
{
ssize_t r;
-  r = send (sock->u.fd, buf, len, 0);
-  if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
-    set_error (errno, "send");
-  return r;
+  if (!h->writer) {
+    r = send (sock->u.fd, buf, len, 0);
+    if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
+      set_error (errno, "send");
+    return r;
+  }
+  else if (h->writer_error) {
+    /* Concurrent writer thread signaled an error earlier, so
+     * return that here.
+     */
+    errno = h->writer_error;
+    set_error (errno, "concurrent writer thread error");
+    return -1;
+  }
+  else {
+    /* Pass the buffer to the concurrent writer thread. */
+    if (h->writer (h->writer_data, buf, len) == -1) {
+      if (errno == 0) errno = EIO;
+      set_error (errno, "submitting to concurrent writer thread");
+      return -1;
+    }
+    return len;
+  }
}
static int
diff --git a/podwrapper.pl.in b/podwrapper.pl.in
index 2471807..ecff2d6 100755
--- a/podwrapper.pl.in
+++ b/podwrapper.pl.in
@@ -324,7 +324,8 @@ foreach (@lines) {
die "$progname: $input: line too long:\n$_\n"
if length $_ > 76 &&
substr ($_, 0, 1) ne ' ' &&
- ! m/https?:/;
+ ! m/https?:/ &&
+ ! m/connected and finished handshaking/;
}
# Output man page.
--
2.21.0