Time to let users react to structured reads as the chunks come in,
exposing the framework added in the last patch.
I chose to use an int for the callback status rather than an enum for
ease of wiring things up to the various language bindings. Still, it
required quite a few tweaks to the generator to support an Int type in
a callback.
It's easy to test callback behavior for LIBNBD_READ_DATA/HOLE (the
next patch will add interop tests with qemu-nbd), but at present, I
don't know of any server that responds with
NBD_REPLY_TYPE_ERROR_OFFSET, which in turn makes testing
LIBNBD_READ_ERROR difficult. I used the following hack on top of
qemu.git commit d1bf88e5 to trigger an offset error for local testing:
| diff --git i/nbd/server.c w/nbd/server.c
| index 10faedcfc55d..804dced0bc8d 100644
| --- i/nbd/server.c
| +++ w/nbd/server.c
| @@ -1838,12 +1838,30 @@ static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
|
| trace_nbd_co_send_structured_read_hole(handle, offset + progress,
| pnum);
| - set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
| + set_be_chunk(&chunk.h, 0,
| NBD_REPLY_TYPE_OFFSET_HOLE,
| handle, sizeof(chunk) - sizeof(chunk.h));
| stq_be_p(&chunk.offset, offset + progress);
| stl_be_p(&chunk.length, pnum);
| ret = nbd_co_send_iov(client, iov, 1, errp);
| + if (ret == 0) {
| + NBDStructuredError chunk;
| + int64_t off;
| + struct iovec iov[] = {
| + {.iov_base = &chunk, .iov_len = sizeof(chunk)},
| + {.iov_base = (char*)"MSG", .iov_len = 3},
| + {.iov_base = &off, .iov_len = sizeof(off)},
| + };
| +
| + set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
| + NBD_REPLY_TYPE_ERROR_OFFSET,
| + handle, sizeof(chunk) - sizeof(chunk.h) +
| + 3 + sizeof(off));
| + stl_be_p(&chunk.error, NBD_EPERM);
| + stw_be_p(&chunk.message_length, 3);
| + stq_be_p(&off, offset + progress);
| + ret = nbd_co_send_iov(client, iov, 3, errp);
| + }
| } else {
| ret = blk_pread(exp->blk, offset + progress + exp->dev_offset,
| data + progress, pnum);
---
generator/generator | 113 +++++++++++++++++++++++++++++++++++++++-----
lib/rw.c | 32 +++++++++++++
2 files changed, 134 insertions(+), 11 deletions(-)
diff --git a/generator/generator b/generator/generator
index 2614689..ce77f17 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1305,7 +1305,72 @@ Issue a read command to the NBD server for the range starting
at C<offset> and ending at C<offset> + C<count> - 1. NBD
can only read all or nothing using this call. The call
returns when the data has been read fully into C<buf> or there is an
-error.
+error. See also C<nbd_pread_callback>, if finer visibility into the
+server's replies is required.
+
+The C<flags> parameter must be C<0> for now (it exists for future NBD
+protocol extensions).";
+ };
+
+ "pread_callback", {
+ default_call with
+ args = [ BytesOut ("buf", "count"); UInt64 "offset";
+ Opaque "data";
+ Callback ("chunk", [Opaque "data";
+ BytesIn ("buf", "count"); UInt64
"offset";
+ Int "status";]);
+ Flags "flags" ];
+ ret = RErr;
+ permitted_states = [ Connected ];
+ shortdesc = "read from the NBD server";
+ longdesc = "\
+Issue a read command to the NBD server for the range starting
+at C<offset> and ending at C<offset> + C<count> - 1. The server's
+response may be subdivided into chunks which may arrive out of order
+before reassembly into the original buffer; the C<chunk> callback
+is used for notification after each chunk arrives, and may perform
+additional sanity checking on the server's reply. The callback must
+not call C<nbd_*> APIs on the same handle: the handle lock is held
+while it runs, so such a call would deadlock. If the callback returns
+C<-1> and no earlier error has been detected, the overall read command
+will fail with the value of C<errno> left by the failing callback;
+however, any further chunks will still invoke the callback.
+
+The C<chunk> function is called once per chunk of data received.
+The C<buf> and C<count> parameters represent the subset of the original
+buffer which has just been populated by results from the server. The
+C<offset> parameter represents the absolute offset at which C<buf>
+begins within the image (note that this is not the relative offset
+of C<buf> within the original buffer). The C<status> parameter is
+one of
+
+=over 4
+
+=item C<LIBNBD_READ_DATA> = 1
+
+C<buf> was populated with C<count> bytes of data.
+
+=item C<LIBNBD_READ_HOLE> = 2
+
+C<buf> represents a hole, and contains C<count> NUL bytes.
+
+=item C<LIBNBD_READ_ERROR> = 3
+
+C<count> is 0, and C<buf> represents the offset where an error
+occurred. The error is visible in C<errno> or by calling
+C<nbd_get_errno>.
+
+=back
+
+It is possible for the C<chunk> function to be called more times than
+you expect (if the server is buggy). It is also possible that the
+C<chunk> function is not called at all (in particular,
+C<LIBNBD_READ_ERROR> is used only when an error is associated with a
+particular offset), but you are guaranteed that the callback was
+called at least once if the overall read succeeds. Libnbd does not
+validate that the server obeyed the requirement that a read call must
+not have overlapping chunks and must not succeed without enough chunks
+to cover the entire request.
The C<flags> parameter must be C<0> for now (it exists for future NBD
protocol extensions).";
@@ -1591,6 +1656,26 @@ C<buf> is valid until the command has completed. Other
parameters behave as documented in C<nbd_pread>.";
};
+ "aio_pread_callback", {
+ default_call with
+ args = [ BytesPersistOut ("buf", "count"); UInt64
"offset";
+ Opaque "data";
+ CallbackPersist ("chunk", [Opaque "data";
+ BytesIn ("buf", "count");
+ UInt64 "offset";
+ Int "status";]);
+ Flags "flags" ];
+ ret = RInt64;
+ permitted_states = [ Connected ];
+ shortdesc = "read from the NBD server";
+ longdesc = "\
+Issue a read command to the NBD server. This returns the
+unique positive 64 bit handle for this command, or C<-1> on
+error. To check if the command completed, call
+C<nbd_aio_command_completed>. Parameters behave as documented
+in C<nbd_pread_callback>.";
+ };
+
"aio_pwrite", {
default_call with
args = [ BytesPersistIn ("buf", "count"); UInt64
"offset"; Flags "flags" ];
@@ -3264,7 +3349,8 @@ let print_python_binding name { args; ret } =
pr " PyObject *py_%s = PyList_New (%s);\n" n len;
pr " for (size_t i = 0; i < %s; ++i)\n" len;
pr " PyList_SET_ITEM (py_%s, i, PyLong_FromUnsignedLong
(%s[i]));\n" n n
- | BytesIn _ -> ()
+ | BytesIn _
+ | Int _ -> ()
| Opaque n ->
pr " struct %s_%s_data *_data = %s;\n" name cb_name n
| String n
@@ -3272,7 +3358,7 @@ let print_python_binding name { args; ret } =
(* The following not yet implemented for callbacks XXX *)
| ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
- | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+ | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
) args;
pr "\n";
@@ -3282,13 +3368,14 @@ let print_python_binding name { args; ret } =
function
| ArrayAndLen (UInt32 n, len) -> pr " \"O\""
| BytesIn (n, len) -> pr " \"y#\""
+ | Int n -> pr " \"i\""
| Opaque n -> pr " \"O\""
| String n -> pr " \"s\""
| UInt64 n -> pr " \"K\""
(* The following not yet implemented for callbacks XXX *)
| ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
- | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+ | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
) args;
pr " \")\"";
@@ -3297,12 +3384,13 @@ let print_python_binding name { args; ret } =
| ArrayAndLen (UInt32 n, _) -> pr ", py_%s" n
| BytesIn (n, len) -> pr ", %s, (int) %s" n len
| Opaque _ -> pr ", _data->data"
+ | Int n
| String n
| UInt64 n -> pr ", %s" n
(* The following not yet implemented for callbacks XXX *)
| ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
- | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+ | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
) args;
pr ");\n";
@@ -3332,13 +3420,14 @@ let print_python_binding name { args; ret } =
| ArrayAndLen (UInt32 n, _) ->
pr " Py_DECREF (py_%s);\n" n
| BytesIn _
+ | Int _
+ | Opaque _
| String _
- | UInt64 _
- | Opaque _ -> ()
+ | UInt64 _ -> ()
(* The following not yet implemented for callbacks XXX *)
| ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
- | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+ | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
) args;
pr " return ret;\n";
@@ -3989,13 +4078,13 @@ let print_ocaml_binding (name, { args; ret }) =
List.map (
function
| ArrayAndLen (UInt32 n, _) | BytesIn (n, _)
- | String n | UInt64 n | Opaque n ->
+ | Int n | Opaque n | String n | UInt64 n ->
n ^ "v"
(* The following not yet implemented for callbacks XXX *)
| ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _
| Callback _ | CallbackPersist _
- | Flags _ | Int _ | Int64 _ | Path _
+ | Flags _ | Int64 _ | Path _
| SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
) args in
@@ -4021,6 +4110,8 @@ let print_ocaml_binding (name, { args; ret }) =
| BytesIn (n, len) ->
pr " %sv = caml_alloc_string (%s);\n" n len;
pr " memcpy (String_val (%sv), %s, %s);\n" n n len
+ | Int n -> (* OCaml int is an immediate value: Val_int, not a boxed caml_copy_int32 *)
+ pr " %sv = Val_int (%s);\n" n n
| String n ->
pr " %sv = caml_copy_string (%s);\n" n n
| UInt64 n ->
@@ -4032,7 +4123,7 @@ let print_ocaml_binding (name, { args; ret }) =
(* The following not yet implemented for callbacks XXX *)
| ArrayAndLen _ | Bool _ | BytesOut _
| BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
- | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+ | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
| UInt _ | UInt32 _ -> assert false
) args;
diff --git a/lib/rw.c b/lib/rw.c
index dc81c57..669987e 100644
--- a/lib/rw.c
+++ b/lib/rw.c
@@ -55,6 +55,22 @@ nbd_unlocked_pread (struct nbd_handle *h, void *buf,
return wait_for_command (h, ch);
}
+/* Issue a read command with callbacks and wait for the reply. */
+int
+nbd_unlocked_pread_callback (struct nbd_handle *h, void *buf,
+ size_t count, uint64_t offset,
+ void *opaque, read_fn read, uint32_t flags)
+{
+ int64_t ch;
+
+ ch = nbd_unlocked_aio_pread_callback (h, buf, count, offset,
+ opaque, read, flags);
+ if (ch == -1)
+ return -1;
+
+ return wait_for_command (h, ch);
+}
+
/* Issue a write command and wait for the reply. */
int
nbd_unlocked_pwrite (struct nbd_handle *h, const void *buf,
@@ -231,6 +247,22 @@ nbd_unlocked_aio_pread (struct nbd_handle *h, void *buf,
buf, NULL);
}
+int64_t
+nbd_unlocked_aio_pread_callback (struct nbd_handle *h, void *buf,
+ size_t count, uint64_t offset,
+ void *opaque, read_fn read, uint32_t flags)
+{
+ struct command_cb cb = { .opaque = opaque, .fn.read = read, };
+
+ if (flags != 0) {
+ set_error (EINVAL, "invalid flag: %" PRIu32, flags);
+ return -1;
+ }
+
+ return nbd_internal_command_common (h, 0, NBD_CMD_READ, offset, count,
+ buf, &cb);
+}
+
int64_t
nbd_unlocked_aio_pwrite (struct nbd_handle *h, const void *buf,
size_t count, uint64_t offset,
--
2.20.1