---
This is what I used to provoke the deadlocks; before my patch series,
it was succeeding for a fully-parallel server:
 nbdkit -U - memory 2M --run './deadlock $unixsocket'
as well as for a serialized server that didn't trip up Linux kernel
buffering limits:
 nbdkit -U - --filter=noparallel memory 256k --run './deadlock $unixsocket'
but reliably hung both client and server once the buffers were larger:
 nbdkit -U - --filter=noparallel memory 512k --run './deadlock $unixsocket'
Post-patch, you can see the state machine now shift from
ISSUE_COMMAND.PAUSE_WRITE_PAYLOAD through an entire
REPLY.START..REPLY.FINISH sequence, and then resume in the middle of
ISSUE_COMMAND.SEND_WRITE_PAYLOAD.
 .gitignore           |   1 +
 examples/Makefile.am |  10 +++
 examples/deadlock.c  | 200 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 211 insertions(+)
 create mode 100644 examples/deadlock.c
diff --git a/.gitignore b/.gitignore
index 66ff811..c135c26 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,6 +30,7 @@ Makefile.in
 /docs/libnbd.3
 /docs/libnbd-api.3
 /docs/libnbd-api.pod
+/examples/deadlock
 /examples/threaded-reads-and-writes
 /examples/simple-fetch-first-sector
 /examples/simple-reads-and-writes
diff --git a/examples/Makefile.am b/examples/Makefile.am
index be3f21d..16b3804 100644
--- a/examples/Makefile.am
+++ b/examples/Makefile.am
@@ -18,6 +18,7 @@
 include $(top_srcdir)/subdir-rules.mk
 noinst_PROGRAMS = \
+	deadlock \
 	simple-fetch-first-sector \
 	simple-reads-and-writes \
 	threaded-reads-and-writes
@@ -50,3 +51,12 @@ threaded_reads_and_writes_CFLAGS = \
 threaded_reads_and_writes_LDADD = \
 	$(top_builddir)/lib/libnbd.la \
 	$(PTHREAD_LIBS)
+
+deadlock_SOURCES = \
+	deadlock.c
+deadlock_CPPFLAGS = \
+	-I$(top_srcdir)/include
+deadlock_CFLAGS = \
+	$(WARNINGS_CFLAGS)
+deadlock_LDADD = \
+	$(top_builddir)/lib/libnbd.la
diff --git a/examples/deadlock.c b/examples/deadlock.c
new file mode 100644
index 0000000..1c9be8d
--- /dev/null
+++ b/examples/deadlock.c
@@ -0,0 +1,200 @@
+/* This example can be copied, used and modified for any purpose
+ * without restrictions.
+ *
+ * Example usage with nbdkit:
+ *
+ * nbdkit -U - --filter=noparallel memory 2M --run './deadlock $unixsocket'
+ *
+ * This will attempt to create a deadlock by sending a large read
+ * request, immediately followed by a large write request, prior to
+ * waiting for any command replies from the server. If the server does
+ * not support reading a second command until after the response to
+ * the first one has been sent, then this could deadlock with the
+ * server waiting for libnbd to finish reading the read response,
+ * while libnbd is waiting for the server to finish reading the write
+ * request.  Fixing the deadlock requires that the client prioritize
+ * reads over writes when more than one command is in flight.
+ *
+ * To run it against a remote server over TCP:
+ *
+ * ./deadlock hostname port
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <poll.h>
+#include <time.h>
+#include <assert.h>
+
+#include <libnbd.h>
+
+/* The single NBD handle. */
+static struct nbd_handle *nbd;
+
+/* Buffers used for the test. */
+static char *in, *out;
+static int64_t packetsize;
+
+/* Queue a read and a write back to back, then poll until both retire.
+ * Returns 0 on success, -1 on failure; ARG is ignored. */
+static int
+try_deadlock (void *arg)
+{
+  struct pollfd fds[1];
+  struct nbd_connection *conn;
+  int64_t handles[2];
+  size_t in_flight;        /* counts number of requests in flight */
+  int dir, r;
+
+  /* The single thread "owns" the connection. */
+  nbd_set_debug (nbd, true);
+  conn = nbd_get_connection (nbd, 0);
+
+  /* Issue commands. */
+  in_flight = 0;
+  fprintf (stderr, " * before aio_pread\n");
+  handles[0] = nbd_aio_pread (conn, in, packetsize, 0);
+  if (handles[0] == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    goto error;
+  }
+  fprintf (stderr, " * after aio_pread\n");
+  in_flight++;
+  fprintf (stderr, " * before aio_pwrite\n");
+  handles[1] = nbd_aio_pwrite (conn, out, packetsize, packetsize, 0);
+  if (handles[1] == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    goto error;
+  }
+  fprintf (stderr, " * after aio_pwrite\n");
+  in_flight++;
+
+  /* Now wait for commands to retire, or for deadlock to occur */
+  while (in_flight > 0) {
+    if (nbd_aio_is_dead (conn) || nbd_aio_is_closed (conn)) {
+      fprintf (stderr, "connection is dead or closed\n");
+      goto error;
+    }
+
+    fds[0].fd = nbd_aio_get_fd (conn);
+    fds[0].events = 0;
+    fds[0].revents = 0;
+    dir = nbd_aio_get_direction (conn);
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
+      fds[0].events |= POLLIN;
+    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0)
+      fds[0].events |= POLLOUT;
+
+    if (poll (fds, 1, -1) == -1) {
+      perror ("poll");
+      goto error;
+    }
+
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
+        (fds[0].revents & POLLIN) != 0)
+      nbd_aio_notify_read (conn);
+    else if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0 &&
+             (fds[0].revents & POLLOUT) != 0)
+      nbd_aio_notify_write (conn);
+
+    /* Retire completed commands; j only advances past live entries. */
+    for (size_t j = 0; j < in_flight; ) {
+      r = nbd_aio_command_completed (conn, handles[j]);
+      if (r == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        goto error;
+      }
+      if (r) {
+        memmove (&handles[j], &handles[j+1],
+                 sizeof (handles[0]) * (in_flight - j - 1));
+        in_flight--;
+      }
+      else
+        j++;
+    }
+  }
+
+  printf ("finished OK\n");
+
+  return 0;
+
+ error:
+  fprintf (stderr, "failed\n");
+  return -1;
+}
+
+/* Connect, size the two buffers, preload OUT, then run the test. */
+int
+main (int argc, char *argv[])
+{
+  int r;
+  int64_t exportsize;
+
+  if (argc < 2 || argc > 3) {
+    fprintf (stderr, "%s socket | hostname port\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  nbd = nbd_create ();
+  if (nbd == NULL) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  /* Connect synchronously as this is simpler. */
+  r = argc == 2 ? nbd_connect_unix (nbd, argv[1])
+                : nbd_connect_tcp (nbd, argv[1], argv[2]);
+  if (r == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  if (nbd_read_only (nbd) == 1) {
+    fprintf (stderr, "%s: error: this NBD export is read-only\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  exportsize = nbd_get_size (nbd);
+  if (exportsize == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+  packetsize = exportsize / 2;
+  if (packetsize > 2 * 1024 * 1024)
+    packetsize = 2 * 1024 * 1024;
+  /* An export under 2 bytes gives packetsize 0; malloc (0) may be NULL. */
+  if (packetsize == 0) {
+    fprintf (stderr, "%s: error: this NBD export is too small\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  in = malloc (packetsize);
+  out = malloc (packetsize);
+  if (!in || !out) {
+    fprintf (stderr, "insufficient memory\n");
+    exit (EXIT_FAILURE);
+  }
+
+  /* Attempt to be non-destructive, by writing what file already contains */
+  if (nbd_pread (nbd, out, packetsize, packetsize) == -1) {
+    fprintf (stderr, "sync read failed: %s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  if (try_deadlock (NULL) == -1)
+    exit (EXIT_FAILURE);
+
+  if (nbd_shutdown (nbd) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  nbd_close (nbd);
+
+  return EXIT_SUCCESS;
+}
-- 
2.20.1