As mentioned in the previous patch, there are situations where an aio
client wants instant notification when a given command is complete,
rather than having to maintain a separate data structure to track all
in-flight commands and then iterate over that structure to learn which
commands are complete.  It's also desirable when writing a server
validation program (such as for checking structured reads for
compliance) to be able to clean up the associated opaque data and have
a final chance to change the overall command status.
Introduce new nbd_aio_FOO_notify functions for each command. Rewire
the existing nbd_aio_FOO to forward to the new command.  (Perhaps the
generator could reduce some of the boilerplate duplication, if a later
patch wants to refactor this).
---
 docs/libnbd.pod     |  22 +++-
 generator/generator | 278 +++++++++++++++++++++++++++++++++++++++++---
 lib/rw.c            |  99 ++++++++++++++--
 3 files changed, 374 insertions(+), 25 deletions(-)
diff --git a/docs/libnbd.pod b/docs/libnbd.pod
index ede2539..93e80d4 100644
--- a/docs/libnbd.pod
+++ b/docs/libnbd.pod
@@ -169,7 +169,27 @@ has completed:
  }
 For almost all high level synchronous calls (eg. C<nbd_pread>) there
-is a low level asynchronous equivalent (eg. C<nbd_aio_pread>).
+are two low level asynchronous equivalents (eg. C<nbd_aio_pread> for
+starting a command, and C<nbd_aio_pread_notify> for also registering
+a callback to be invoked right before the command is complete).
+
+=head1 CALLBACKS
+
+Some of the high-level commands (C<nbd_pread_structured>,
+C<nbd_block_status>) involve the use of a callback function invoked by
+the state machine at appropriate points in the server's reply before
+the overall command is complete. Also, all of the low-level commands
+have a notify variant that registers a callback function used right
+before the command is marked complete.  These callback functions
+include a parameter C<error> containing the value of any error
+detected so far; if the callback function fails, it should assign back
+into C<error> and return C<-1> to change the resulting error of the
+overall command.
+
+The callbacks are invoked at a point where the libnbd lock is held; as
+such, it is unsafe for the callback to call any C<nbd_*> APIs on the
+same nbd object, as it would cause deadlock.  Functions that take two
+callback pointers share the same opaque data for both calls.
 =head1 ERROR HANDLING
diff --git a/generator/generator b/generator/generator
index c5988e2..fe73c15 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1708,9 +1708,40 @@ on the connection.";
 Issue a read command to the NBD server.  This returns the
 unique positive 64 bit handle for this command, or C<-1> on
 error.  To check if the command completed, call
-C<nbd_aio_command_completed>.  Note that you must ensure
+C<nbd_aio_command_completed>, or use C<nbd_aio_pread_notify>.
+Note that you must ensure C<buf> is valid until the command
+has completed.  Other parameters behave as documented in
+C<nbd_pread>.";
+  };
+
+  "aio_pread_notify", {
+    default_call with
+    args = [ BytesPersistOut ("buf", "count"); UInt64
"offset";
+             Opaque "data";
+             CallbackPersist ("notify", [ Opaque "data"; Int64
"handle";
+                                          Mutable (Int "error") ]);
+             Flags "flags" ];
+    ret = RInt64;
+    permitted_states = [ Connected ];
+    shortdesc = "read from the NBD server, and notify on completion";
+    longdesc = "\
+Issue a read command to the NBD server.  This returns the
+unique positive 64 bit handle for this command, or C<-1> on
+error.  If this command returns a handle, then the C<notify>
+callback will be called when the server is done replying,
+although you must still use C<nbd_aio_command_completed> after
+the callback to retire the command.  Note that you must ensure
 C<buf> is valid until the command has completed.  Other
-parameters behave as documented in C<nbd_pread>.";
+parameters behave as documented in C<nbd_pread>.
+
+The C<notify> callback is called with the same C<data> passed to
+this function, C<handle> set to the return value of this function,
+and C<error> containing the command's result so far. The callback
+may modify the overall status of the command by storing into
+C<error> and returning C<-1>, although attempts to undo non-zero
+status back to zero are ignored. The callback cannot call C<nbd_*>
+APIs on the same handle since it holds the handle lock and will
+cause a deadlock.";
   };
   "aio_pread_structured", {
@@ -1730,8 +1761,43 @@ parameters behave as documented in C<nbd_pread>.";
 Issue a read command to the NBD server.  This returns the
 unique positive 64 bit handle for this command, or C<-1> on
 error.  To check if the command completed, call
-C<nbd_aio_command_completed>.  Parameters behave as documented
-in C<nbd_pread_structured>.";
+C<nbd_aio_command_completed>, or use
+C<nbd_aio_pread_structured_notify>.  Parameters behave as
+documented in C<nbd_pread_structured>.";
+  };
+
+  "aio_pread_structured_notify", {
+    default_call with
+    args = [ BytesPersistOut ("buf", "count"); UInt64
"offset";
+             Opaque "data";
+             CallbackPersist ("chunk", [ Opaque "data";
+                                         BytesIn ("subbuf",
"count");
+                                         UInt64 "offset";
+                                         Mutable (Int "error");
+                                         Int "status" ]);
+             CallbackPersist ("notify", [ Opaque "data"; Int64
"handle";
+                                          Mutable (Int "error") ]);
+             Flags "flags" ];
+    ret = RInt64;
+    permitted_states = [ Connected ];
+    shortdesc = "read from the NBD server, and notify on completion";
+    longdesc = "\
+Issue a read command to the NBD server.  This returns the
+unique positive 64 bit handle for this command, or C<-1> on
+error.  If this command returns a handle, then the C<notify>
+callback will be called when the server is done replying,
+although you must still use C<nbd_aio_command_completed> after
+the callback to retire the command.  Other parameters behave as
+documented in C<nbd_pread_structured>.
+
+The C<notify> callback is called with the same C<data> passed to
+this function, C<handle> set to the return value of this function,
+and C<error> containing the command's result so far. The callback
+may modify the overall status of the command by storing into
+C<error> and returning C<-1>, although attempts to undo non-zero
+status back to zero are ignored. The callback cannot call C<nbd_*>
+APIs on the same handle since it holds the handle lock and will
+cause a deadlock.";
   };
   "aio_pwrite", {
@@ -1744,9 +1810,40 @@ in C<nbd_pread_structured>.";
 Issue a write command to the NBD server.  This returns the
 unique positive 64 bit handle for this command, or C<-1> on
 error.  To check if the command completed, call
-C<nbd_aio_command_completed>.  Note that you must ensure
+C<nbd_aio_command_completed>, or use C<nbd_aio_pwrite_notify>.
+Note that you must ensure C<buf> is valid until the command
+has completed.  Other parameters behave as documented in
+C<nbd_pwrite>.";
+  };
+
+  "aio_pwrite_notify", {
+    default_call with
+    args = [ BytesPersistIn ("buf", "count"); UInt64
"offset";
+             Opaque "data";
+             CallbackPersist ("notify", [ Opaque "data"; Int64
"handle";
+                                          Mutable (Int "error") ]);
+             Flags "flags" ];
+    ret = RInt64;
+    permitted_states = [ Connected ];
+    shortdesc = "write to the NBD server, and notify on completion";
+    longdesc = "\
+Issue a write command to the NBD server.  This returns the
+unique positive 64 bit handle for this command, or C<-1> on
+error.    If this command returns a handle, then the C<notify>
+callback will be called when the server is done replying,
+although you must still use C<nbd_aio_command_completed> after
+the callback to retire the command.  Note that you must ensure
 C<buf> is valid until the command has completed.  Other
-parameters behave as documented in C<nbd_pwrite>.";
+parameters behave as documented in C<nbd_pwrite>.
+
+The C<notify> callback is called with the same C<data> passed to
+this function, C<handle> set to the return value of this function,
+and C<error> containing the command's result so far. The callback
+may modify the overall status of the command by storing into
+C<error> and returning C<-1>, although attempts to undo non-zero
+status back to zero are ignored. The callback cannot call C<nbd_*>
+APIs on the same handle since it holds the handle lock and will
+cause a deadlock.";
   };
   "aio_disconnect", {
@@ -1780,8 +1877,36 @@ however, C<nbd_shutdown> will call this function if
appropriate.";
 Issue the flush command to the NBD server.  This returns the
 unique positive 64 bit handle for this command, or C<-1> on
 error.  To check if the command completed, call
-C<nbd_aio_command_completed>.  Parameters behave as documented
-in C<nbd_flush>.";
+C<nbd_aio_command_completed>, or use C<nbd_aio_flush_notify>.
+Parameters behave as documented in C<nbd_flush>.";
+  };
+
+  "aio_flush_notify", {
+    default_call with
+    args = [ Opaque "data";
+             CallbackPersist ("notify", [ Opaque "data"; Int64
"handle";
+                                          Mutable (Int "error") ]);
+             Flags "flags" ];
+    ret = RInt64;
+    permitted_states = [ Connected ];
+    shortdesc = "send flush command to the NBD server, and notify on
completion";
+    longdesc = "\
+Issue the flush command to the NBD server.  This returns the
+unique positive 64 bit handle for this command, or C<-1> on
+error.  If this command returns a handle, then the C<notify>
+callback will be called when the server is done replying,
+although you must still use C<nbd_aio_command_completed> after
+the callback to retire the command.  Other parameters behave as
+documented in C<nbd_flush>.
+
+The C<notify> callback is called with the same C<data> passed to
+this function, C<handle> set to the return value of this function,
+and C<error> containing the command's result so far. The callback
+may modify the overall status of the command by storing into
+C<error> and returning C<-1>, although attempts to undo non-zero
+status back to zero are ignored. The callback cannot call C<nbd_*>
+APIs on the same handle since it holds the handle lock and will
+cause a deadlock.";
   };
   "aio_trim", {
@@ -1794,8 +1919,37 @@ in C<nbd_flush>.";
 Issue a trim command to the NBD server.  This returns the
 unique positive 64 bit handle for this command, or C<-1> on
 error.  To check if the command completed, call
-C<nbd_aio_command_completed>.  Parameters behave as documented
-in C<nbd_trim>.";
+C<nbd_aio_command_completed>, or use C<nbd_aio_trim_notify>.
+Parameters behave as documented in C<nbd_trim>.";
+  };
+
+  "aio_trim_notify", {
+    default_call with
+    args = [ UInt64 "count"; UInt64 "offset";
+             Opaque "data";
+             CallbackPersist ("notify", [ Opaque "data"; Int64
"handle";
+                                          Mutable (Int "error") ]);
+             Flags "flags" ];
+    ret = RInt64;
+    permitted_states = [ Connected ];
+    shortdesc = "send trim command to the NBD server, and notify on
completion";
+    longdesc = "\
+Issue a trim command to the NBD server.  This returns the
+unique positive 64 bit handle for this command, or C<-1> on
+error.  If this command returns a handle, then the C<notify>
+callback will be called when the server is done replying,
+although you must still use C<nbd_aio_command_completed> after
+the callback to retire the command.  Other parameters behave as
+documented in C<nbd_trim>.
+
+The C<notify> callback is called with the same C<data> passed to
+this function, C<handle> set to the return value of this function,
+and C<error> containing the command's result so far. The callback
+may modify the overall status of the command by storing into
+C<error> and returning C<-1>, although attempts to undo non-zero
+status back to zero are ignored. The callback cannot call C<nbd_*>
+APIs on the same handle since it holds the handle lock and will
+cause a deadlock.";
   };
   "aio_cache", {
@@ -1808,8 +1962,37 @@ in C<nbd_trim>.";
 Issue the cache (prefetch) command to the NBD server.  This
 returns the unique positive 64 bit handle for this command, or
 C<-1> on error.  To check if the command completed, call
-C<nbd_aio_command_completed>.  Parameters behave as documented
-in C<nbd_cache>.";
+C<nbd_aio_command_completed>, or use C<nbd_aio_cache_notify>.
+Parameters behave as documented in C<nbd_cache>.";
+  };
+
+  "aio_cache_notify", {
+    default_call with
+    args = [ UInt64 "count"; UInt64 "offset";
+             Opaque "data";
+             CallbackPersist ("notify", [ Opaque "data"; Int64
"handle";
+                                          Mutable (Int "error") ]);
+             Flags "flags" ];
+    ret = RInt64;
+    permitted_states = [ Connected ];
+    shortdesc = "send cache (prefetch) command to the NBD server, and notify on
completion";
+    longdesc = "\
+Issue the cache (prefetch) command to the NBD server.  This
+returns the unique positive 64 bit handle for this command, or
+C<-1> on error.  If this command returns a handle, then the C<notify>
+callback will be called when the server is done replying,
+although you must still use C<nbd_aio_command_completed> after
+the callback to retire the command.  Other parameters behave as
+documented in C<nbd_cache>.
+
+The C<notify> callback is called with the same C<data> passed to
+this function, C<handle> set to the return value of this function,
+and C<error> containing the command's result so far. The callback
+may modify the overall status of the command by storing into
+C<error> and returning C<-1>, although attempts to undo non-zero
+status back to zero are ignored. The callback cannot call C<nbd_*>
+APIs on the same handle since it holds the handle lock and will
+cause a deadlock.";
   };
   "aio_zero", {
@@ -1822,8 +2005,37 @@ in C<nbd_cache>.";
 Issue a write zeroes command to the NBD server.  This returns the
 unique positive 64 bit handle for this command, or C<-1> on
 error.  To check if the command completed, call
-C<nbd_aio_command_completed>.  Parameters behave as documented
-in C<nbd_zero>.";
+C<nbd_aio_command_completed>, or use C<nbd_aio_zero_notify>.
+Parameters behave as documented in C<nbd_zero>.";
+  };
+
+  "aio_zero_notify", {
+    default_call with
+    args = [ UInt64 "count"; UInt64 "offset";
+             Opaque "data";
+             CallbackPersist ("notify", [ Opaque "data"; Int64
"handle";
+                                          Mutable (Int "error") ]);
+             Flags "flags" ];
+    ret = RInt64;
+    permitted_states = [ Connected ];
+    shortdesc = "send write zeroes command to the NBD server, and notify on
completion";
+    longdesc = "\
+Issue a write zeroes command to the NBD server.  This returns the
+unique positive 64 bit handle for this command, or C<-1> on
+error.  If this command returns a handle, then the C<notify>
+callback will be called when the server is done replying,
+although you must still use C<nbd_aio_command_completed> after
+the callback to retire the command.  Other parameters behave as
+documented in C<nbd_zero>.
+
+The C<notify> callback is called with the same C<data> passed to
+this function, C<handle> set to the return value of this function,
+and C<error> containing the command's result so far. The callback
+may modify the overall status of the command by storing into
+C<error> and returning C<-1>, although attempts to undo non-zero
+status back to zero are ignored. The callback cannot call C<nbd_*>
+APIs on the same handle since it holds the handle lock and will
+cause a deadlock.";
   };
   "aio_block_status", {
@@ -1843,8 +2055,42 @@ in C<nbd_zero>.";
 Send the block status command to the NBD server.  This returns the
 unique positive 64 bit handle for this command, or C<-1> on
 error.  To check if the command completed, call
-C<nbd_aio_command_completed>.  Parameters behave as documented
-in C<nbd_block_status>.";
+C<nbd_aio_command_completed>, or use C<nbd_aio_block_status_notify>.
+Parameters behave as documented in C<nbd_block_status>.";
+  };
+
+  "aio_block_status_notify", {
+    default_call with
+    args = [ UInt64 "count"; UInt64 "offset";
+             Opaque "data";
+             CallbackPersist ("extent", [Opaque "data"; String
"metacontext";
+                                         UInt64 "offset";
+                                         ArrayAndLen (UInt32 "entries",
+                                                      "nr_entries");
+                                         Mutable (Int "error") ]);
+             CallbackPersist ("notify", [ Opaque "data"; Int64
"handle";
+                                          Mutable (Int "error") ]);
+             Flags "flags" ];
+    ret = RInt64;
+    permitted_states = [ Connected ];
+    shortdesc = "send block status command to the NBD server, and notify on
completion";
+    longdesc = "\
+Send the block status command to the NBD server.  This returns the
+unique positive 64 bit handle for this command, or C<-1> on
+error.  If this command returns a handle, then the C<notify>
+callback will be called when the server is done replying,
+although you must still use C<nbd_aio_command_completed> after
+the callback to retire the command.  Other parameters behave as
+documented in C<nbd_block_status>.
+
+The C<notify> callback is called with the same C<data> passed to
+this function, C<handle> set to the return value of this function,
+and C<error> containing the command's result so far. The callback
+may modify the overall status of the command by storing into
+C<error> and returning C<-1>, although attempts to undo non-zero
+status back to zero are ignored. The callback cannot call C<nbd_*>
+APIs on the same handle since it holds the handle lock and will
+cause a deadlock.";
   };
   "aio_get_fd", {
diff --git a/lib/rw.c b/lib/rw.c
index 53cd521..93388a9 100644
--- a/lib/rw.c
+++ b/lib/rw.c
@@ -249,6 +249,17 @@ int64_t
 nbd_unlocked_aio_pread (struct nbd_handle *h, void *buf,
                         size_t count, uint64_t offset, uint32_t flags)
 {
+  return nbd_unlocked_aio_pread_notify (h, buf, count, offset, NULL, NULL,
+                                        flags);
+}
+
+int64_t
+nbd_unlocked_aio_pread_notify (struct nbd_handle *h, void *buf,
+                               size_t count, uint64_t offset,
+                               void *opaque, notify_fn notify, uint32_t flags)
+{
+  struct command_cb cb = { .opaque = opaque, .notify = notify, };
+
   /* We could silently accept flag DF, but it really only makes sense
    * with callbacks, because otherwise there is no observable change
    * except that the server may fail where it would otherwise succeed.
@@ -259,7 +270,7 @@ nbd_unlocked_aio_pread (struct nbd_handle *h, void *buf,
   }
   return nbd_internal_command_common (h, 0, NBD_CMD_READ, offset, count,
-                                      buf, NULL);
+                                      buf, &cb);
 }
 int64_t
@@ -267,7 +278,18 @@ nbd_unlocked_aio_pread_structured (struct nbd_handle *h, void *buf,
                                    size_t count, uint64_t offset,
                                    void *opaque, read_fn read, uint32_t flags)
 {
-  struct command_cb cb = { .opaque = opaque, .fn.read = read, };
+  return nbd_unlocked_aio_pread_structured_notify (h, buf, count, offset,
+                                                   opaque, read, NULL, flags);
+}
+
+int64_t
+nbd_unlocked_aio_pread_structured_notify (struct nbd_handle *h, void *buf,
+                                          size_t count, uint64_t offset,
+                                          void *opaque, read_fn read,
+                                          notify_fn notify, uint32_t flags)
+{
+  struct command_cb cb = { .opaque = opaque, .fn.read = read,
+                           .notify = notify, };
   if ((flags & ~LIBNBD_CMD_FLAG_DF) != 0) {
     set_error (EINVAL, "invalid flag: %" PRIu32, flags);
@@ -289,6 +311,17 @@ nbd_unlocked_aio_pwrite (struct nbd_handle *h, const void *buf,
                          size_t count, uint64_t offset,
                          uint32_t flags)
 {
+  return nbd_unlocked_aio_pwrite_notify (h, buf, count, offset, NULL, NULL,
+                                         flags);
+}
+
+int64_t
+nbd_unlocked_aio_pwrite_notify (struct nbd_handle *h, const void *buf,
+                                size_t count, uint64_t offset,
+                                void *opaque, notify_fn notify, uint32_t flags)
+{
+  struct command_cb cb = { .opaque = opaque, .notify = notify, };
+
   if (nbd_unlocked_read_only (h) == 1) {
     set_error (EINVAL, "server does not support write operations");
     return -1;
@@ -306,12 +339,21 @@ nbd_unlocked_aio_pwrite (struct nbd_handle *h, const void *buf,
   }
   return nbd_internal_command_common (h, flags, NBD_CMD_WRITE, offset, count,
-                                      (void *) buf, NULL);
+                                      (void *) buf, &cb);
 }
 int64_t
 nbd_unlocked_aio_flush (struct nbd_handle *h, uint32_t flags)
 {
+  return nbd_unlocked_aio_flush_notify (h, NULL, NULL, flags);
+}
+
+int64_t
+nbd_unlocked_aio_flush_notify (struct nbd_handle *h, void *opaque,
+                               notify_fn notify, uint32_t flags)
+{
+  struct command_cb cb = { .opaque = opaque, .notify = notify, };
+
   if (nbd_unlocked_can_flush (h) != 1) {
     set_error (EINVAL, "server does not support flush operations");
     return -1;
@@ -323,7 +365,7 @@ nbd_unlocked_aio_flush (struct nbd_handle *h, uint32_t flags)
   }
   return nbd_internal_command_common (h, 0, NBD_CMD_FLUSH, 0, 0,
-                                      NULL, NULL);
+                                      NULL, &cb);
 }
 int64_t
@@ -331,6 +373,16 @@ nbd_unlocked_aio_trim (struct nbd_handle *h,
                        uint64_t count, uint64_t offset,
                        uint32_t flags)
 {
+  return nbd_unlocked_aio_trim_notify (h, count, offset, NULL, NULL, flags);
+}
+
+int64_t
+nbd_unlocked_aio_trim_notify (struct nbd_handle *h,
+                              uint64_t count, uint64_t offset,
+                              void *opaque, notify_fn notify, uint32_t flags)
+{
+  struct command_cb cb = { .opaque = opaque, .notify = notify, };
+
   if (nbd_unlocked_read_only (h) == 1) {
     set_error (EINVAL, "server does not support write operations");
     return -1;
@@ -353,13 +405,23 @@ nbd_unlocked_aio_trim (struct nbd_handle *h,
   }
   return nbd_internal_command_common (h, flags, NBD_CMD_TRIM, offset, count,
-                                      NULL, NULL);
+                                      NULL, &cb);
 }
 int64_t
 nbd_unlocked_aio_cache (struct nbd_handle *h,
                         uint64_t count, uint64_t offset, uint32_t flags)
 {
+  return nbd_unlocked_aio_cache_notify (h, count, offset, NULL, NULL, flags);
+}
+
+int64_t
+nbd_unlocked_aio_cache_notify (struct nbd_handle *h,
+                               uint64_t count, uint64_t offset,
+                               void *opaque, notify_fn notify, uint32_t flags)
+{
+  struct command_cb cb = { .opaque = opaque, .notify = notify, };
+
   /* Actually according to the NBD protocol document, servers do exist
    * that support NBD_CMD_CACHE but don't advertise the
    * NBD_FLAG_SEND_CACHE bit, but we ignore those.
@@ -375,7 +437,7 @@ nbd_unlocked_aio_cache (struct nbd_handle *h,
   }
   return nbd_internal_command_common (h, 0, NBD_CMD_CACHE, offset, count,
-                                      NULL, NULL);
+                                      NULL, &cb);
 }
 int64_t
@@ -383,6 +445,16 @@ nbd_unlocked_aio_zero (struct nbd_handle *h,
                        uint64_t count, uint64_t offset,
                        uint32_t flags)
 {
+  return nbd_unlocked_aio_zero_notify (h, count, offset, NULL, NULL, flags);
+}
+
+int64_t
+nbd_unlocked_aio_zero_notify (struct nbd_handle *h,
+                              uint64_t count, uint64_t offset,
+                              void *opaque, notify_fn notify, uint32_t flags)
+{
+  struct command_cb cb = { .opaque = opaque, .notify = notify, };
+
   if (nbd_unlocked_read_only (h) == 1) {
     set_error (EINVAL, "server does not support write operations");
     return -1;
@@ -405,7 +477,7 @@ nbd_unlocked_aio_zero (struct nbd_handle *h,
   }
   return nbd_internal_command_common (h, flags, NBD_CMD_WRITE_ZEROES, offset,
-                                      count, NULL, NULL);
+                                      count, NULL, &cb);
 }
 int64_t
@@ -414,7 +486,18 @@ nbd_unlocked_aio_block_status (struct nbd_handle *h,
                                void *data, extent_fn extent,
                                uint32_t flags)
 {
-  struct command_cb cb = { .opaque = data, .fn.extent = extent, };
+  return nbd_unlocked_aio_block_status_notify (h, count, offset, data, extent,
+                                               NULL, flags);
+}
+
+int64_t
+nbd_unlocked_aio_block_status_notify (struct nbd_handle *h,
+                                      uint64_t count, uint64_t offset,
+                                      void *data, extent_fn extent,
+                                      notify_fn notify, uint32_t flags)
+{
+  struct command_cb cb = { .opaque = data, .fn.extent = extent,
+                           .notify = notify, };
   if (!h->structured_replies) {
     set_error (ENOTSUP, "server does not support structured replies");
-- 
2.20.1