As already noted in our state machine, a client that batches up a
large read followed by large writes, coupled with a server that only
processes commands in order, can result in deadlock (the server won't
read more until we unblock its ability to write out its reply to our
first command; but we aren't willing to read until we are done writing
out our second command). Break the deadlock by teaching the generator
that while we are in the middle of writing a command, we must remain
responsive to NotifyRead events; if the server has data for us to
read, we should consume that before resuming the command we were in
the middle of issuing. With the new overlapping execution, we also have
to rearrange the storage - a reply will scribble over conn->sbuf, but
we must not corrupt the bytes being sent during SEND_REQUEST, so we
have to move the request storage out of sbuf into a separate part of
conn.
---
generator/generator | 26 ++++++++++++++------
generator/states-issue-command.c | 42 +++++++++++++++++++++++++-------
lib/internal.h | 7 +++++-
3 files changed, 57 insertions(+), 18 deletions(-)
diff --git a/generator/generator b/generator/generator
index f8d59a0..e37c71b 100755
--- a/generator/generator
+++ b/generator/generator
@@ -621,12 +621,6 @@ and issue_command_state_machine = [
State {
default_state with
name = "START";
- (* XXX There's a possible deadlock here if a server cannot
- * handle multiple requests pipelined on a single connection.
- * We could try to issue a command and block, but reads might
- * be available. It should be possible to break this with
- * another state.
- *)
comment = "Begin issuing a command to the remote server";
external_events = [];
};
@@ -635,7 +629,15 @@ and issue_command_state_machine = [
default_state with
name = "SEND_REQUEST";
comment = "Sending a request to the remote server";
- external_events = [ NotifyWrite, "" ];
+ external_events = [ NotifyWrite, "";
+ NotifyRead, "PAUSE_SEND_REQUEST" ];
+ };
+
+ State {
+ default_state with
+ name = "PAUSE_SEND_REQUEST";
+ comment = "Interrupt send request to receive an earlier command's reply";
+ external_events = [];
};
State {
@@ -649,7 +651,15 @@ and issue_command_state_machine = [
default_state with
name = "SEND_WRITE_PAYLOAD";
comment = "Sending the write payload to the remote server";
- external_events = [ NotifyWrite, "" ];
+ external_events = [ NotifyWrite, "";
+ NotifyRead, "PAUSE_WRITE_PAYLOAD" ];
+ };
+
+State {
+ default_state with
+ name = "PAUSE_WRITE_PAYLOAD";
+ comment = "Interrupt write payload to receive an earlier command's reply";
+ external_events = [];
};
State {
diff --git a/generator/states-issue-command.c b/generator/states-issue-command.c
index e24ea34..5f00aa7 100644
--- a/generator/states-issue-command.c
+++ b/generator/states-issue-command.c
@@ -25,14 +25,23 @@
assert (conn->cmds_to_issue != NULL);
cmd = conn->cmds_to_issue;
- conn->sbuf.request.magic = htobe32 (NBD_REQUEST_MAGIC);
- conn->sbuf.request.flags = htobe16 (cmd->flags);
- conn->sbuf.request.type = htobe16 (cmd->type);
- conn->sbuf.request.handle = htobe64 (cmd->handle);
- conn->sbuf.request.offset = htobe64 (cmd->offset);
- conn->sbuf.request.count = htobe32 ((uint32_t) cmd->count);
- conn->wbuf = &conn->sbuf;
- conn->wlen = sizeof (conn->sbuf.request);
+ /* Were we interrupted by reading a reply to an earlier command? */
+ if (conn->wlen) {
+ if (conn->in_write_payload)
+ SET_NEXT_STATE(%SEND_WRITE_PAYLOAD);
+ else
+ SET_NEXT_STATE(%SEND_REQUEST);
+ return 0;
+ }
+
+ conn->request.magic = htobe32 (NBD_REQUEST_MAGIC);
+ conn->request.flags = htobe16 (cmd->flags);
+ conn->request.type = htobe16 (cmd->type);
+ conn->request.handle = htobe64 (cmd->handle);
+ conn->request.offset = htobe64 (cmd->offset);
+ conn->request.count = htobe32 ((uint32_t) cmd->count);
+ conn->wbuf = &conn->request;
+ conn->wlen = sizeof (conn->request);
SET_NEXT_STATE (%SEND_REQUEST);
return 0;
@@ -43,12 +52,19 @@
}
return 0;
+ ISSUE_COMMAND.PAUSE_SEND_REQUEST:
+ assert (conn->wlen);
+ assert (conn->cmds_to_issue != NULL);
+ conn->in_write_payload = false;
+ SET_NEXT_STATE (%^REPLY.START);
+ return 0;
+
ISSUE_COMMAND.PREPARE_WRITE_PAYLOAD:
struct command_in_flight *cmd;
assert (conn->cmds_to_issue != NULL);
cmd = conn->cmds_to_issue;
- assert (cmd->handle == be64toh (conn->sbuf.request.handle));
+ assert (cmd->handle == be64toh (conn->request.handle));
if (cmd->type == NBD_CMD_WRITE) {
conn->wbuf = cmd->data;
conn->wlen = cmd->count;
@@ -65,9 +81,17 @@
}
return 0;
+ ISSUE_COMMAND.PAUSE_WRITE_PAYLOAD:
+ assert (conn->wlen);
+ assert (conn->cmds_to_issue != NULL);
+ conn->in_write_payload = true;
+ SET_NEXT_STATE (%^REPLY.START);
+ return 0;
+
ISSUE_COMMAND.FINISH:
struct command_in_flight *cmd;
+ assert (!conn->wlen);
assert (conn->cmds_to_issue != NULL);
cmd = conn->cmds_to_issue;
conn->cmds_to_issue = cmd->next;
diff --git a/lib/internal.h b/lib/internal.h
index de9b8bc..13832d7 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -133,7 +133,6 @@ struct nbd_connection {
} payload;
} __attribute__((packed)) or;
struct nbd_export_name_option_reply export_name_reply;
- struct nbd_request request;
struct nbd_simple_reply simple_reply;
struct {
struct nbd_structured_reply structured_reply;
@@ -149,6 +148,12 @@ struct nbd_connection {
uint32_t nrqueries;
} sbuf;
+ /* Issuing a command must use a buffer separate from sbuf, for the
+ * case when we interrupt a request to service a reply.
+ */
+ struct nbd_request request;
+ bool in_write_payload;
+
/* When connecting, this stores the socket address. */
struct sockaddr_storage connaddr;
socklen_t connaddrlen;
--
2.20.1