---
docs/libnbd.pod | 73 +++++++++++++++++++++++++++++++++++++++++++++
generator/generator | 52 +++++++++++++++++++++++++++-----
lib/handle.c | 32 ++++++++++++++++++++
lib/internal.h | 7 +++++
lib/socket.c | 22 +++++++++++---
podwrapper.pl.in | 3 +-
6 files changed, 177 insertions(+), 12 deletions(-)
diff --git a/docs/libnbd.pod b/docs/libnbd.pod
index 7cbb9cd..ab74be3 100644
--- a/docs/libnbd.pod
+++ b/docs/libnbd.pod
@@ -385,6 +385,79 @@ If you are issuing multiple in-flight requests (see above) and
limiting the number, then the limit should be applied to each
individual NBD connection.
+=head2 Concurrent writer thread
+
+To achieve the maximum possible performance from libnbd and NBD
+servers, in addition to the techniques above you must also use a
+concurrent writer thread.  This feature allows requests to be issued
+on the NBD socket at the same time that replies are being read from
+the socket.  In other words L<send(2)> and L<recv(2)> calls will be
+running at the same time on the same socket.
+
+There is a full example using a concurrent writer available at
+L<https://github.com/libguestfs/libnbd/blob/master/examples/concurrent-writer.c>
+
+To implement this, you change your ordinary AIO code in four ways:
+
+=over 4
+
+=item 1. Call nbd_set_concurrent_writer
+
+ struct writer_data {
+ struct nbd_handle *nbd;
+ /* other data here as required */
+ } data;
+
+ nbd_set_concurrent_writer (nbd, &data, writer);
+
+This function can be called on the handle at any time, either after
+the handle is created or after the connection and handshaking have
+completed.
+
+=item 2. Implement non-blocking writer callback
+
+C<writer> is a I<non-blocking> callback which enqueues the buffer into
+a ring or similar FIFO structure:
+
+ struct ring_item {
+ struct writer_data *data;
+ const void *buf;
+ size_t len;
+ };
+
+ void writer (void *data, const void *buf, size_t len)
+ {
+ struct ring_item item;
+ void *copy;
+
+ /* libnbd treats buf as sent once writer returns, so copy it */
+ copy = malloc (len);
+ if (copy == NULL)
+ abort ();
+ memcpy (copy, buf, len);
+
+ /* add (data, copy, len) to a shared ring */
+ item.data = data;
+ item.buf = copy;
+ item.len = len;
+ ring_add (&item);
+
+ writer_signal (); /* kick the writer thread */
+ }
+
+=item 3. Implement writer thread
+
+You must also supply another thread which picks up data off the ring
+and writes it to the socket (see C<nbd_aio_get_fd>).  If there is an
+error when writing to the socket, call C<nbd_concurrent_writer_error>
+with the (non-zero) C<errno>.
+
+You have a choice of whether to implement one thread per nbd_handle or
+one thread shared between all handles.
+
+=item 4. Modify main loop
+
+Finally your main loop can unconditionally call
+C<nbd_aio_notify_write> when C<nbd_aio_get_direction> returns C<WRITE>
+or C<BOTH> (since the concurrent thread can always enqueue more data
+and so is always "ready to write").
+
+=back
+
=head1 ENCRYPTION AND AUTHENTICATION
The NBD protocol and libnbd supports TLS (sometimes incorrectly called
diff --git a/generator/generator b/generator/generator
index db7c10f..2b48c67 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1094,6 +1094,35 @@ C<\"qemu:dirty-bitmap:...\"> for qemu-nbd
(see qemu-nbd I<-B> option). See also C<nbd_block_status>.";
};
+ "set_concurrent_writer", {
+ default_call with
+ args = [ Opaque "data";
+ CallbackPersist ("writer", [Opaque "data";
+ BytesIn ("buf", "len")]) ];
+ ret = RErr;
+ permitted_states = [ Created; Connecting; Connected ];
+ shortdesc = "set a concurrent writer thread";
+ longdesc = "\
+Provide an optional concurrent writer thread for better performance.
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+ };
+
+ "concurrent_writer_error", {
+ default_call with
+ args = [ Int "err" ]; ret = RErr;
+ shortdesc = "signal an error from the concurrent writer thread";
+ longdesc = "\
+This can be called from the concurrent writer thread to signal
+that there was an error writing to the socket. As there is no
+way to recover from such errors, the connection will move to the
+dead state soon after.
+
+The parameter is the C<errno> returned by the failed L<send(2)> call.
+It must be non-zero.
+
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+ };
+
"connect_uri", {
default_call with
args = [ String "uri" ]; ret = RErr;
@@ -3157,12 +3186,13 @@ let print_python_binding name { args; ret } =
pr " PyObject *py_%s = PyList_New (%s);\n" n len;
pr " for (size_t i = 0; i < %s; ++i)\n" len;
pr " PyList_SET_ITEM (py_%s, i, PyLong_FromUnsignedLong
(%s[i]));\n" n n
+ | BytesIn _ -> ()
| Opaque n ->
pr " struct %s_%s_data *_data = %s;\n" name cb_name n
| String n
| UInt64 n -> ()
(* The following not yet implemented for callbacks XXX *)
- | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+ | ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
| Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
@@ -3173,11 +3203,12 @@ let print_python_binding name { args; ret } =
List.iter (
function
| ArrayAndLen (UInt32 n, len) -> pr " \"O\""
+ | BytesIn (n, len) -> pr " \"y#\""
| Opaque n -> pr " \"O\""
| String n -> pr " \"s\""
| UInt64 n -> pr " \"K\""
(* The following not yet implemented for callbacks XXX *)
- | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+ | ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
| Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
@@ -3186,11 +3217,12 @@ let print_python_binding name { args; ret } =
List.iter (
function
| ArrayAndLen (UInt32 n, _) -> pr ", py_%s" n
+ | BytesIn (n, len) -> pr ", %s, (int) %s" n len
| Opaque _ -> pr ", _data->data"
| String n
| UInt64 n -> pr ", %s" n
(* The following not yet implemented for callbacks XXX *)
- | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+ | ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
| Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
@@ -3217,11 +3249,12 @@ let print_python_binding name { args; ret } =
function
| ArrayAndLen (UInt32 n, _) ->
pr " Py_DECREF (py_%s);\n" n
+ | BytesIn _
| String _
| UInt64 _
| Opaque _ -> ()
(* The following not yet implemented for callbacks XXX *)
- | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+ | ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
| Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
@@ -3899,10 +3932,11 @@ let print_ocaml_binding (name, { args; ret }) =
let argnames =
List.map (
function
- | ArrayAndLen (UInt32 n, _) | String n | UInt64 n | Opaque n ->
+ | ArrayAndLen (UInt32 n, _) | BytesIn (n, _)
+ | String n | UInt64 n | Opaque n ->
n ^ "v"
(* The following not yet implemented for callbacks XXX *)
- | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+ | ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _
| Callback _ | CallbackPersist _
| Flags _ | Int _ | Int64 _ | Path _
@@ -3928,6 +3962,9 @@ let print_ocaml_binding (name, { args; ret }) =
| ArrayAndLen (UInt32 n, count) ->
pr " %sv = nbd_internal_ocaml_alloc_int32_array (%s, %s);\n"
n n count;
+ | BytesIn (n, len) ->
+ pr " %sv = caml_alloc_string (%s);\n" n len;
+ pr " memcpy (String_val (%sv), %s, %s);\n" n n len
| String n ->
pr " %sv = caml_copy_string (%s);\n" n n
| UInt64 n ->
@@ -3937,7 +3974,7 @@ let print_ocaml_binding (name, { args; ret }) =
pr " fnv = *_%s->cb;\n" n;
pr " %sv = *_%s->data;\n" n n
(* The following not yet implemented for callbacks XXX *)
- | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+ | ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
| Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
@@ -4192,6 +4229,7 @@ let generate_ocaml_nbd_c () =
pr "\n";
pr "#include <stdio.h>\n";
pr "#include <stdlib.h>\n";
+ pr "#include <string.h>\n";
pr "\n";
pr "#include <libnbd.h>\n";
pr "\n";
diff --git a/lib/handle.c b/lib/handle.c
index cc311ba..cc5d40f 100644
--- a/lib/handle.c
+++ b/lib/handle.c
@@ -215,6 +215,38 @@ nbd_add_close_callback (struct nbd_handle *h, nbd_close_callback cb,
void *data)
return ret;
}
+/* Register a concurrent writer callback on this handle.
+ *
+ * Once registered, socket_send hands each outgoing buffer to 'writer'
+ * (passing 'data' back as its first argument) instead of calling
+ * send(2) itself.  After a writer has been registered, further calls
+ * fail with EINVAL: replacing the writer is more likely a caller bug
+ * than something done on purpose.
+ *
+ * Returns 0 on success, -1 (with the error set) on failure.
+ */
+int
+nbd_unlocked_set_concurrent_writer (struct nbd_handle *h,
+                                    void *data, writer_cb writer)
+{
+  if (h->writer) {
+    set_error (EINVAL, "concurrent writer was already set for this handle");
+    return -1;
+  }
+
+  h->writer_data = data;
+  h->writer = writer;
+  return 0;
+}
+
+int
+nbd_unlocked_concurrent_writer_error (struct nbd_handle *h, int err)
+{
+ if (err != 0) {
+ set_error (EINVAL, "concurrent writer error parameter must be non-zero");
+ return -1;
+ }
+
+ /* Ignore second and subsequent calls, record only the first error. */
+ if (h->writer_error == 0)
+ h->writer_error = err;
+
+ return 0;
+}
+
const char *
nbd_unlocked_get_package_name (struct nbd_handle *h)
{
diff --git a/lib/internal.h b/lib/internal.h
index c8e5094..c41741d 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -43,6 +43,8 @@ struct close_callback;
struct socket;
struct command_in_flight;
+/* Concurrent writer callback, registered with nbd_set_concurrent_writer.
+ * When one is set, socket_send passes each outgoing buffer (buf, len)
+ * to it instead of calling send(2); 'data' is the opaque pointer that
+ * was supplied at registration.
+ */
+typedef void (*writer_cb) (void *data, const void *buf, size_t len);
+
struct nbd_handle {
/* Lock protecting concurrent access to the handle. */
pthread_mutex_t lock;
@@ -90,6 +92,11 @@ struct nbd_handle {
/* The socket or a wrapper if using GnuTLS. */
struct socket *sock;
+ /* Writer callback if using concurrent writer. */
+ void *writer_data;
+ writer_cb writer;
+ int writer_error;
+
/* Generic way to read into a buffer - set rbuf to point to a
* buffer, rlen to the amount of data you expect, and in the state
* machine call recv_into_rbuf.
diff --git a/lib/socket.c b/lib/socket.c
index f48e455..c6fba6d 100644
--- a/lib/socket.c
+++ b/lib/socket.c
@@ -46,10 +46,24 @@ socket_send (struct nbd_handle *h,
{
ssize_t r;
- r = send (sock->u.fd, buf, len, 0);
- if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
- set_error (errno, "send");
- return r;
+  /* With a concurrent writer registered, the socket is never written
+   * here: the data is handed off to the writer callback instead.
+   */
+  if (h->writer) {
+    if (h->writer_error) {
+      /* The writer thread reported a failure earlier; surface it now. */
+      set_error (h->writer_error, "concurrent writer thread error");
+      return -1;
+    }
+
+    /* The whole buffer counts as sent once the callback has it. */
+    h->writer (h->writer_data, buf, len);
+    return len;
+  }
+
+  /* No concurrent writer: write directly to the socket.  EAGAIN and
+   * EWOULDBLOCK are returned to the caller without setting an error.
+   */
+  r = send (sock->u.fd, buf, len, 0);
+  if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
+    set_error (errno, "send");
+  return r;
}
static int
diff --git a/podwrapper.pl.in b/podwrapper.pl.in
index 2471807..ecff2d6 100755
--- a/podwrapper.pl.in
+++ b/podwrapper.pl.in
@@ -324,7 +324,8 @@ foreach (@lines) {
die "$progname: $input: line too long:\n$_\n"
if length $_ > 76 &&
substr ($_, 0, 1) ne ' ' &&
- ! m/https?:/;
+ ! m/https?:/ &&
+ ! m/connected and finished handshaking/;
}
# Output man page.
--
2.21.0