When processing a server reply within the REPLY subgroup, we will
often hit a situation where recv() requires us to block until the next
NotifyRead. But since NotifyRead is the only permitted external action
while in this group, we are effectively blocking CmdIssue and
NotifyWrite events from happening until the server finishes the
in-progress reply, even though those events have no strict dependence
on the server's progress.
The solution is similar to commit dd101bde - any time we need to pause
the reply cycle, we save enough information to recall where we left
off and return to the READY state; REPLY.START is then taught to check
whether we are starting a fresh reply or resuming an earlier one.
The state machine will still be blocked until the next POLLIN, but it
is now in a position to also accept CmdIssue and NotifyWrite without
delay. With this patch in place, the only time is_state_processing is
true is during the ISSUE_COMMAND group when it is blocked on
NotifyWrite. Thus, once handshaking is complete, we can reliably
equate nbd_aio_get_direction() == DIRECTION_READ with is_ready(), and
nbd_aio_get_direction() == DIRECTION_BOTH with is_processing() in the
ISSUE_COMMAND substate.
Oddly enough, I am not seeing any measurable performance difference
when running examples/threaded-reads-and-writes against nbdkit with
this patch applied. My explanation is that in the common case, once
a server has something to send, it is going to send the entire reply
as fast as it can, rather than sending a partial reply and waiting;
and our attempts to keep up to 64 commands in flight mean that our
real bottleneck is not sending our next command (even if we have to
wait for the server's reply to finish), but the server itself (if we
are saturating the server's limit on in-flight requests, the server
won't read our next request until it has finished its current reply).
Still, I'm sure that it is possible to construct corner cases where
this shows more of an effect.
---
Applies on top of my series to add nbd_aio_pread_callback.
generator/generator | 22 +++++++++---------
generator/states-reply-simple.c | 4 ++++
generator/states-reply-structured.c | 32 +++++++++++++++++++++++++
generator/states-reply.c | 36 ++++++++++++++++++++++++++++-
lib/internal.h | 1 +
5 files changed, 83 insertions(+), 12 deletions(-)
diff --git a/generator/generator b/generator/generator
index 630260b..68a4fdf 100755
--- a/generator/generator
+++ b/generator/generator
@@ -682,14 +682,14 @@ and reply_state_machine = [
default_state with
name = "START";
comment = "Prepare to receive a reply from the remote server";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
default_state with
name = "RECV_REPLY";
comment = "Receive a reply from the remote server";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
@@ -723,7 +723,7 @@ and simple_reply_state_machine = [
default_state with
name = "RECV_READ_PAYLOAD";
comment = "Receiving the read payload for a simple reply";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
]
@@ -740,7 +740,7 @@ and structured_reply_state_machine = [
default_state with
name = "RECV_REMAINING";
comment = "Receiving the remaining part of a structured reply";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
@@ -754,49 +754,49 @@ and structured_reply_state_machine = [
default_state with
name = "RECV_ERROR";
comment = "Receive a structured reply error header";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
default_state with
name = "RECV_ERROR_MESSAGE";
comment = "Receive a structured reply error message";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
default_state with
name = "RECV_ERROR_TAIL";
comment = "Receive a structured reply error tail";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
default_state with
name = "RECV_OFFSET_DATA";
comment = "Receive a structured reply offset-data header";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
default_state with
name = "RECV_OFFSET_DATA_DATA";
comment = "Receive a structured reply offset-data block of data";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
default_state with
name = "RECV_OFFSET_HOLE";
comment = "Receive a structured reply offset-hole header";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
default_state with
name = "RECV_BS_ENTRIES";
comment = "Receive a structured reply block-status payload";
- external_events = [ NotifyRead, "" ];
+ external_events = [];
};
State {
diff --git a/generator/states-reply-simple.c b/generator/states-reply-simple.c
index ddc91ce..3b63d07 100644
--- a/generator/states-reply-simple.c
+++ b/generator/states-reply-simple.c
@@ -53,6 +53,10 @@
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1:
+ save_reply_state (h);
+ SET_NEXT_STATE (%.READY);
+ return 0;
case 0:
/* guaranteed by START */
assert (cmd);
diff --git a/generator/states-reply-structured.c b/generator/states-reply-structured.c
index 00659de..594525e 100644
--- a/generator/states-reply-structured.c
+++ b/generator/states-reply-structured.c
@@ -38,6 +38,10 @@
REPLY.STRUCTURED_REPLY.RECV_REMAINING:
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1:
+ save_reply_state (h);
+ SET_NEXT_STATE (%.READY);
+ return 0;
case 0: SET_NEXT_STATE (%CHECK);
}
return 0;
@@ -154,6 +158,10 @@
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1:
+ save_reply_state (h);
+ SET_NEXT_STATE (%.READY);
+ return 0;
case 0:
length = be32toh (h->sbuf.sr.structured_reply.length);
msglen = be16toh (h->sbuf.sr.payload.error.error.len);
@@ -176,6 +184,10 @@
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1:
+ save_reply_state (h);
+ SET_NEXT_STATE (%.READY);
+ return 0;
case 0:
length = be32toh (h->sbuf.sr.structured_reply.length);
msglen = be16toh (h->sbuf.sr.payload.error.error.len);
@@ -219,6 +231,10 @@
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1:
+ save_reply_state (h);
+ SET_NEXT_STATE (%.READY);
+ return 0;
case 0:
error = be32toh (h->sbuf.sr.payload.error.error.error);
type = be16toh (h->sbuf.sr.structured_reply.type);
@@ -268,6 +284,10 @@
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1:
+ save_reply_state (h);
+ SET_NEXT_STATE (%.READY);
+ return 0;
case 0:
length = be32toh (h->sbuf.sr.structured_reply.length);
offset = be64toh (h->sbuf.sr.payload.offset_data.offset);
@@ -324,6 +344,10 @@
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1:
+ save_reply_state (h);
+ SET_NEXT_STATE (%.READY);
+ return 0;
case 0:
length = be32toh (h->sbuf.sr.structured_reply.length);
offset = be64toh (h->sbuf.sr.payload.offset_data.offset);
@@ -349,6 +373,10 @@
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1:
+ save_reply_state (h);
+ SET_NEXT_STATE (%.READY);
+ return 0;
case 0:
offset = be64toh (h->sbuf.sr.payload.offset_hole.offset);
length = be32toh (h->sbuf.sr.payload.offset_hole.length);
@@ -410,6 +438,10 @@
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1:
+ save_reply_state (h);
+ SET_NEXT_STATE (%.READY);
+ return 0;
case 0:
length = be32toh (h->sbuf.sr.structured_reply.length);
diff --git a/generator/states-reply.c b/generator/states-reply.c
index 54f98c5..99e54f6 100644
--- a/generator/states-reply.c
+++ b/generator/states-reply.c
@@ -16,10 +16,43 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
-/* State machine for receiving reply messages from the server. */
+#include <assert.h>
+
+/* State machine for receiving reply messages from the server.
+ *
+ * Note that we never block while in this sub-group. If there is
+ * insufficient data to finish parsing a reply, requiring us to block
+ * until POLLIN, we instead track where in the state machine we left
+ * off, then return to READY to actually block. Then, on entry to
+ * REPLY.START, we can tell if this is the start of a new reply (rlen
+ * is 0, stay put), a continuation of the preamble (reply_cmd is NULL,
+ * resume with RECV_REPLY), or a continuation from any other location
+ * (reply_cmd contains the state to jump to).
+ */
+
+static void
+save_reply_state (struct nbd_handle *h)
+{
+ assert (h->reply_cmd);
+ assert (h->rlen);
+ h->reply_cmd->state = get_next_state (h);
+}
+
+/*----- End of prologue. -----*/
/* STATE MACHINE */ {
REPLY.START:
+ /* If rlen is non-zero, we are resuming an earlier reply cycle. */
+ if (h->rlen > 0) {
+ if (h->reply_cmd) {
+ assert (nbd_internal_is_state_processing (h->reply_cmd->state));
+ SET_NEXT_STATE (h->reply_cmd->state);
+ }
+ else
+ SET_NEXT_STATE (%RECV_REPLY);
+ return 0;
+ }
+
/* This state is entered when a read notification is received in the
* READY state. Therefore we know the socket is readable here.
* Reading a zero length now would indicate that the socket has been
@@ -66,6 +99,7 @@
REPLY.RECV_REPLY:
switch (recv_into_rbuf (h)) {
case -1: SET_NEXT_STATE (%.DEAD); return -1;
+ case 1: SET_NEXT_STATE (%.READY); return 0;
case 0: SET_NEXT_STATE (%CHECK_SIMPLE_OR_STRUCTURED_REPLY);
}
return 0;
diff --git a/lib/internal.h b/lib/internal.h
index a1e27df..662ff7a 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -253,6 +253,7 @@ struct command_in_flight {
uint32_t count;
void *data; /* Buffer for read/write */
struct command_cb cb;
+ enum state state; /* State to resume with on next POLLIN */
bool data_seen; /* For read, true if at least one data chunk seen */
uint32_t error; /* Local errno value */
};
--
2.20.1