When a server supports structured replies, knowing where the holes are
in a read response can improve the client's behavior. Furthermore, since
the server can send data out of order or interleaved with other
replies, a client may be able to start operating on the partial
results as soon as they are available rather than waiting for the
entire buffer to be reconstructed. As such, it makes sense to have an
optional callback invoked each time we finish reading a chunk (for
simple replies, we behave as if everything were a single data chunk).
The callback is told what kind of chunk it is handling via one of the
new status constants LIBNBD_READ_DATA, LIBNBD_READ_HOLE, or
LIBNBD_READ_ERROR.

This also requires special handling of NBD_REPLY_TYPE_ERROR_OFFSET, as
that is the one situation where we want to invoke the callback (with
status LIBNBD_READ_ERROR) to inform the client that the server pointed
out a specific offset for the error; all other errors continue to
affect only the final pread return value without any use of the
callback. If any callback fails, and if no prior error was set, then
the callback's failure becomes the failure reason for the overall read:
the callback's errno is used, falling back to EPROTO (or, for an error
chunk, to the server's reported error) if the callback left errno as 0.

Nothing actually passes a callback function yet, so for now there is no
functional change; but this will make it possible for the next patch
to add an 'nbd_aio_pread_callback' API.
---
generator/generator | 4 +++
generator/states-reply-simple.c | 15 +++++++++-
generator/states-reply-structured.c | 43 +++++++++++++++++++++++++++--
lib/internal.h | 4 ++-
4 files changed, 62 insertions(+), 4 deletions(-)
diff --git a/generator/generator b/generator/generator
index 4c81859..2614689 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1934,6 +1934,10 @@ let constants = [
"CMD_FLAG_FUA", 1 lsl 0;
"CMD_FLAG_NO_HOLE", 1 lsl 1;
"CMD_FLAG_REQ_ONE", 1 lsl 3;
+
+ "READ_DATA", 1;
+ "READ_HOLE", 2;
+ "READ_ERROR", 3;
]
(*----------------------------------------------------------------------*)
diff --git a/generator/states-reply-simple.c b/generator/states-reply-simple.c
index 935f6d2..ddc91ce 100644
--- a/generator/states-reply-simple.c
+++ b/generator/states-reply-simple.c
@@ -49,9 +49,22 @@
return 0;
REPLY.SIMPLE_REPLY.RECV_READ_PAYLOAD:
+ struct command_in_flight *cmd = h->reply_cmd;
+
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
- case 0: SET_NEXT_STATE (%^FINISH_COMMAND);
+ case 0:
+ /* guaranteed by START */
+ assert (cmd);
+ if (cmd->cb.fn.read) {
+ assert (cmd->error == 0);
+ errno = 0;
+ if (cmd->cb.fn.read (cmd->cb.opaque, cmd->data, cmd->count,
+ cmd->offset, LIBNBD_READ_DATA) == -1)
+ cmd->error = errno ? errno : EPROTO;
+ }
+
+ SET_NEXT_STATE (%^FINISH_COMMAND);
}
return 0;
diff --git a/generator/states-reply-structured.c b/generator/states-reply-structured.c
index 657106e..00659de 100644
--- a/generator/states-reply-structured.c
+++ b/generator/states-reply-structured.c
@@ -225,7 +225,9 @@
assert (cmd); /* guaranteed by CHECK */
- /* Sanity check that any error offset is in range */
+ /* Sanity check that any error offset is in range, then invoke
+ * user callback if present.
+ */
if (type == NBD_REPLY_TYPE_ERROR_OFFSET) {
offset = be64toh (h->sbuf.sr.payload.error.offset);
if (offset < cmd->offset || offset >= cmd->offset + cmd->count) {
@@ -237,6 +239,18 @@
offset, cmd->offset, cmd->count);
return -1;
}
+ if (cmd->cb.fn.read) {
+ /* Different from successful reads: if the callback clears
+ * errno but still fails, then use the server's error below.
+ */
+ errno = nbd_internal_errno_of_nbd_error (error);
+ set_error (errno, "server reported read failure at offset 0x%" PRIx64,
+ offset);
+ if (cmd->cb.fn.read (cmd->cb.opaque, cmd->data + (offset -
cmd->offset),
+ 0, offset, LIBNBD_READ_ERROR) == -1)
+ if (cmd->error == 0 && errno)
+ cmd->error = errno;
+ }
}
/* Preserve first error encountered */
@@ -304,9 +318,27 @@
return 0;
REPLY.STRUCTURED_REPLY.RECV_OFFSET_DATA_DATA:
+ struct command_in_flight *cmd = h->reply_cmd;
+ uint64_t offset;
+ uint32_t length;
+
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
- case 0: SET_NEXT_STATE (%FINISH);
+ case 0:
+ length = be32toh (h->sbuf.sr.structured_reply.length);
+ offset = be64toh (h->sbuf.sr.payload.offset_data.offset);
+
+ assert (cmd); /* guaranteed by CHECK */
+ if (cmd->cb.fn.read) {
+ errno = 0;
+ if (cmd->cb.fn.read (cmd->cb.opaque, cmd->data + (offset -
cmd->offset),
+ length - sizeof offset, offset,
+ LIBNBD_READ_DATA) == -1)
+ if (cmd->error == 0)
+ cmd->error = errno ? errno : EPROTO;
+ }
+
+ SET_NEXT_STATE (%FINISH);
}
return 0;
@@ -357,6 +389,13 @@
}
memset (cmd->data + offset, 0, length);
+ if (cmd->cb.fn.read) {
+ errno = 0;
+ if (cmd->cb.fn.read (cmd->cb.opaque, cmd->data + offset, length,
+ cmd->offset + offset, LIBNBD_READ_HOLE) == -1)
+ if (cmd->error == 0)
+ cmd->error = errno ? errno : EPROTO;
+ }
SET_NEXT_STATE(%FINISH);
}
diff --git a/lib/internal.h b/lib/internal.h
index cb0e170..a1e27df 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -233,12 +233,14 @@ struct socket {
typedef int (*extent_fn) (void *data, const char *metacontext, uint64_t offset,
uint32_t *entries, size_t nr_entries);
+typedef int (*read_fn) (void *data, const void *buf, size_t count,
+ uint64_t offset, int status);
struct command_cb {
void *opaque;
union {
extent_fn extent;
- /* More to come */
+ read_fn read;
} fn;
};
--
2.20.1