I went ahead and implemented two new hooks, async_pread and async_pwrite, on the existing nbdkit_plugin struct to get some testable numbers.
The test is set up with an all-in-memory block device (data is copied to and from a std::vector to simulate reading and writing).
Every read and write op has a 64ms wait before completing (simulated device latency).
These tests use an nbd-client running on the same host as the nbd-server, with a Unix domain socket for communication.
Read and write performance are both around 2 MB/s, which is exactly a single 128k read or write every 64 ms.
Read performance was capped at around 825 MiB/s for a sequential read of one file, but when reading two files in parallel the read throughput was 1.6 GB/s, and for four files in parallel it was 1.9 GB/s.
Below is the patch for changes made to nbdkit to support this.
diff --git a/include/nbdkit-plugin.h b/include/nbdkit-plugin.h
index 95cba8d..2e88cad 100644
--- a/include/nbdkit-plugin.h
+++ b/include/nbdkit-plugin.h
@@ -38,15 +38,13 @@
#include <stdarg.h>
#include <stdint.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include <stdbool.h>
#define NBDKIT_THREAD_MODEL_SERIALIZE_CONNECTIONS 0
#define NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS 1
#define NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS 2
#define NBDKIT_THREAD_MODEL_PARALLEL 3
+#define NBDKIT_THREAD_MODEL_ASYNC 4
#define NBDKIT_API_VERSION 1
@@ -94,28 +92,35 @@ struct nbdkit_plugin {
int errno_is_preserved;
+ int (*async_pread) (void *conn, uint64_t reqid, bool flush, void *handle, void *buf, uint32_t count, uint64_t offset);
+ int (*async_pwrite) (void *conn, uint64_t reqid, bool flush, void *handle, const void *buf, uint32_t count, uint64_t offset);
/* int (*set_exportname) (void *handle, const char *exportname); */
};
-extern void nbdkit_set_error (int err);
-extern void nbdkit_error (const char *msg, ...)
- __attribute__((format (printf, 1, 2)));
-extern void nbdkit_verror (const char *msg, va_list args);
-extern void nbdkit_debug (const char *msg, ...)
- __attribute__((format (printf, 1, 2)));
-extern void nbdkit_vdebug (const char *msg, va_list args);
-
-extern char *nbdkit_absolute_path (const char *path);
-extern int64_t nbdkit_parse_size (const char *str);
-
#ifdef __cplusplus
-#define NBDKIT_CXX_LANG_C extern "C"
+#define NBDKIT_CXX_LANG_C "C"
#else
#define NBDKIT_CXX_LANG_C /* nothing */
#endif
+extern NBDKIT_CXX_LANG_C void nbdkit_set_error (int err);
+extern NBDKIT_CXX_LANG_C void nbdkit_error (const char *msg, ...)
+ __attribute__((format (printf, 1, 2)));
+extern NBDKIT_CXX_LANG_C void nbdkit_verror (const char *msg, va_list args);
+extern NBDKIT_CXX_LANG_C void nbdkit_debug (const char *msg, ...)
+ __attribute__((format (printf, 1, 2)));
+extern NBDKIT_CXX_LANG_C void nbdkit_vdebug (const char *msg, va_list args);
+
+extern NBDKIT_CXX_LANG_C char *nbdkit_absolute_path (const char *path);
+extern NBDKIT_CXX_LANG_C int64_t nbdkit_parse_size (const char *str);
+
+extern NBDKIT_CXX_LANG_C int nbdkit_async_reply (void *conn, uint64_t reqid);
+extern NBDKIT_CXX_LANG_C int nbdkit_async_reply_read (void *conn, uint64_t reqid, uint32_t count, void *buf);
+extern NBDKIT_CXX_LANG_C int nbdkit_async_reply_error (void *conn, uint64_t reqid);
+
+
#define NBDKIT_REGISTER_PLUGIN(plugin) \
- NBDKIT_CXX_LANG_C \
+ extern NBDKIT_CXX_LANG_C \
struct nbdkit_plugin * \
plugin_init (void) \
{ \
@@ -125,8 +130,4 @@ extern int64_t nbdkit_parse_size (const char *str);
return &(plugin); \
}
-#ifdef __cplusplus
-}
-#endif
-
#endif /* NBDKIT_PLUGIN_H */
diff --git a/src/connections.c b/src/connections.c
index a0d689a..e03a0f6 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -62,7 +62,8 @@
static struct connection *new_connection (int sockin, int sockout);
static void free_connection (struct connection *conn);
static int negotiate_handshake (struct connection *conn);
-static int recv_request_send_reply (struct connection *conn);
+static int recv_request (struct connection *conn);
+static int send_reply (struct connection *conn, uint64_t handle, uint32_t count, void *buf, uint32_t error);
static int
_handle_single_connection (int sockin, int sockout)
@@ -86,7 +87,7 @@ _handle_single_connection (int sockin, int sockout)
* a thread pool.
*/
while (!quit) {
- r = recv_request_send_reply (conn);
+ r = recv_request (conn);
if (r == -1)
goto err;
if (r == 0)
@@ -127,6 +128,7 @@ new_connection (int sockin, int sockout)
conn->sockin = sockin;
conn->sockout = sockout;
pthread_mutex_init (&conn->request_lock, NULL);
+ pthread_mutex_init (&conn->reply_lock, NULL);
return conn;
}
@@ -143,6 +145,7 @@ free_connection (struct connection *conn)
close (conn->sockout);
pthread_mutex_destroy (&conn->request_lock);
+ pthread_mutex_destroy (&conn->reply_lock);
if (conn->handle)
plugin_close (conn);
@@ -626,13 +629,13 @@ get_error (struct connection *conn)
* On read/write errors, sets *error appropriately and returns 0.
*/
static int
-_handle_request (struct connection *conn,
+_handle_request (struct connection *conn, uint64_t handle,
uint32_t cmd, uint32_t flags, uint64_t offset, uint32_t count,
- void *buf,
- uint32_t *error)
+ void *buf)
{
- bool flush_after_command;
int r;
+ uint32_t error = 0;
+ bool flush_after_command;
/* Flush after command performed? */
flush_after_command = (flags & NBD_CMD_FLAG_FUA) != 0;
@@ -645,42 +648,54 @@ _handle_request (struct connection *conn,
switch (cmd) {
case NBD_CMD_READ:
- r = plugin_pread (conn, buf, count, offset);
+ if (plugin_can_async_read (conn)) {
+ r = plugin_async_pread (conn, handle, flush_after_command, buf, count, offset);
+ if (r == 0)
+ return 0; // plugin now has responsibility of sending response
+ }
+ else
+ r = plugin_pread (conn, buf, count, offset);
if (r == -1) {
- *error = get_error (conn);
- return 0;
+ error = get_error (conn);
+ return send_reply (conn, handle, 0, NULL, error);
}
break;
case NBD_CMD_WRITE:
- r = plugin_pwrite (conn, buf, count, offset);
+ if (plugin_can_async_write (conn)) {
+ r = plugin_async_pwrite (conn, handle, flush_after_command, buf, count, offset);
+ if (r == 0)
+ return 0; // plugin now has responsibility of sending response
+ }
+ else
+ r = plugin_pwrite (conn, buf, count, offset);
if (r == -1) {
- *error = get_error (conn);
- return 0;
+ error = get_error (conn);
+ return send_reply (conn, handle, 0, NULL, error);
}
break;
case NBD_CMD_FLUSH:
r = plugin_flush (conn);
if (r == -1) {
- *error = get_error (conn);
- return 0;
+ error = get_error (conn);
+ return send_reply (conn, handle, 0, NULL, error);
}
break;
case NBD_CMD_TRIM:
r = plugin_trim (conn, count, offset);
if (r == -1) {
- *error = get_error (conn);
- return 0;
+ error = get_error (conn);
+ return send_reply (conn, handle, 0, NULL, error);
}
break;
case NBD_CMD_WRITE_ZEROES:
r = plugin_zero (conn, count, offset, !(flags & NBD_CMD_FLAG_NO_HOLE));
if (r == -1) {
- *error = get_error (conn);
- return 0;
+ error = get_error (conn);
+ return send_reply (conn, handle, 0, NULL, error);
}
break;
@@ -691,24 +706,28 @@ _handle_request (struct connection *conn,
if (flush_after_command) {
r = plugin_flush (conn);
if (r == -1) {
- *error = get_error (conn);
- return 0;
+ error = get_error (conn);
+ return send_reply (conn, handle, 0, NULL, error);
}
}
- return 0;
+ if (cmd == NBD_CMD_READ)
+ r = send_reply (conn, handle, count, buf, error);
+ else
+ r = send_reply (conn, handle, 0, NULL, error);
+
+ return r;
}
static int
-handle_request (struct connection *conn,
+handle_request (struct connection *conn, uint64_t handle,
uint32_t cmd, uint32_t flags, uint64_t offset, uint32_t count,
- void *buf,
- uint32_t *error)
+ void *buf)
{
int r;
plugin_lock_request (conn);
- r = _handle_request (conn, cmd, flags, offset, count, buf, error);
+ r = _handle_request (conn, handle, cmd, flags, offset, count, buf);
plugin_unlock_request (conn);
return r;
@@ -763,11 +782,10 @@ nbd_errno (int error)
}
static int
-recv_request_send_reply (struct connection *conn)
+recv_request (struct connection *conn)
{
int r;
struct request request;
- struct reply reply;
uint32_t magic, cmd, flags, count, error = 0;
uint64_t offset;
CLEANUP_FREE char *buf = NULL;
@@ -808,7 +826,10 @@ recv_request_send_reply (struct connection *conn)
if (r == 0) { /* request not valid */
if (cmd == NBD_CMD_WRITE)
skip_over_write_buffer (conn->sockin, count);
- goto send_reply;
+ r = send_reply (conn, request.handle, 0, NULL, error);
+ if (r == -1)
+ return -1;
+ return 1;
}
/* Allocate the data buffer used for either read or write requests. */
@@ -819,7 +840,10 @@ recv_request_send_reply (struct connection *conn)
error = ENOMEM;
if (cmd == NBD_CMD_WRITE)
skip_over_write_buffer (conn->sockin, count);
- goto send_reply;
+ r = send_reply (conn, request.handle, 0, NULL, error);
+ if (r == -1)
+ return -1;
+ return 1; /* error reply already sent; must not fall through with buf == NULL */
}
}
@@ -837,14 +860,20 @@ recv_request_send_reply (struct connection *conn)
}
/* Perform the request. Only this part happens inside the request lock. */
- r = handle_request (conn, cmd, flags, offset, count, buf, &error);
+ r = handle_request (conn, request.handle, cmd, flags, offset, count, buf);
if (r == -1)
return -1;
- /* Send the reply packet. */
- send_reply:
+ return 1; /* command processed ok */
+}
+
+static int
+_send_reply (struct connection *conn, uint64_t handle, uint32_t count, void *buf, uint32_t error)
+{
+ int r;
+ struct reply reply;
reply.magic = htobe32 (NBD_REPLY_MAGIC);
- reply.handle = request.handle;
+ reply.handle = handle;
reply.error = htobe32 (nbd_errno (error));
if (error != 0) {
@@ -862,8 +891,7 @@ recv_request_send_reply (struct connection *conn)
return -1;
}
- /* Send the read data buffer. */
- if (cmd == NBD_CMD_READ) {
+ if (error == 0 && buf != NULL) { /* Send the read data buffer. */
r = xwrite (conn->sockout, buf, count);
if (r == -1) {
nbdkit_error ("write data: %m");
@@ -871,5 +899,37 @@ recv_request_send_reply (struct connection *conn)
}
}
- return 1; /* command processed ok */
+ return 0;
+}
+
+static int
+send_reply (struct connection *conn, uint64_t handle, uint32_t count, void *buf, uint32_t error)
+{
+ int r;
+
+ plugin_lock_reply (conn);
+ r = _send_reply (conn, handle, count, buf, error);
+ plugin_unlock_reply (conn);
+
+ return r;
+}
+
+int
+nbdkit_async_reply (void *conn, uint64_t reqid)
+{
+ return send_reply (conn, reqid, 0, NULL, 0);
+}
+
+int
+nbdkit_async_reply_read (void *conn, uint64_t reqid, uint32_t count, void *buf)
+{
+
+ return send_reply (conn, reqid, count, buf, 0);
+}
+
+int
+nbdkit_async_reply_error (void *conn, uint64_t reqid)
+{
+ uint32_t error = get_error (conn);
+ return send_reply (conn, reqid, 0, NULL, error);
}
diff --git a/src/internal.h b/src/internal.h
index e73edf1..93d32e9 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -114,6 +114,7 @@ extern void cleanup_free (void *ptr);
struct connection {
int sockin, sockout;
pthread_mutex_t request_lock;
+ pthread_mutex_t reply_lock;
void *handle;
uint64_t exportsize;
int readonly;
@@ -140,6 +141,8 @@ extern void plugin_lock_connection (void);
extern void plugin_unlock_connection (void);
extern void plugin_lock_request (struct connection *conn);
extern void plugin_unlock_request (struct connection *conn);
+extern void plugin_lock_reply (struct connection *conn);
+extern void plugin_unlock_reply (struct connection *conn);
extern int plugin_errno_is_preserved (void);
extern int plugin_open (struct connection *conn, int readonly);
extern void plugin_close (struct connection *conn);
@@ -148,8 +151,12 @@ extern int plugin_can_write (struct connection *conn);
extern int plugin_can_flush (struct connection *conn);
extern int plugin_is_rotational (struct connection *conn);
extern int plugin_can_trim (struct connection *conn);
+extern int plugin_can_async_read (struct connection *conn);
extern int plugin_pread (struct connection *conn, void *buf, uint32_t count, uint64_t offset);
+extern int plugin_async_pread (struct connection *conn, uint64_t handle, bool flush, void *buf, uint32_t count, uint64_t offset);
+extern int plugin_can_async_write (struct connection *conn);
extern int plugin_pwrite (struct connection *conn, void *buf, uint32_t count, uint64_t offset);
+extern int plugin_async_pwrite (struct connection *conn, uint64_t handle, bool flush, void *buf, uint32_t count, uint64_t offset);
extern int plugin_flush (struct connection *conn);
extern int plugin_trim (struct connection *conn, uint32_t count, uint64_t offset);
extern int plugin_zero (struct connection *conn, uint32_t count, uint64_t offset, int may_trim);
diff --git a/src/plugins.c b/src/plugins.c
index eeed8a9..9da30db 100644
--- a/src/plugins.c
+++ b/src/plugins.c
@@ -121,8 +121,9 @@ plugin_register (const char *_filename,
program_name, filename);
exit (EXIT_FAILURE);
}
- if (plugin.pread == NULL) {
- fprintf (stderr, "%s: %s: plugin must have a .pread callback\n",
+ if (plugin.pread == NULL &&
+ (plugin._thread_model != NBDKIT_THREAD_MODEL_ASYNC || plugin.async_pread == NULL)) {
+ fprintf (stderr, "%s: %s: plugin must have either a .pread or .async_pread callback\n",
program_name, filename);
exit (EXIT_FAILURE);
}
@@ -231,6 +232,9 @@ plugin_dump_fields (void)
case NBDKIT_THREAD_MODEL_PARALLEL:
printf ("parallel");
break;
+ case NBDKIT_THREAD_MODEL_ASYNC:
+ printf ("async");
+ break;
default:
printf ("%d # unknown thread model!", plugin._thread_model);
break;
@@ -258,6 +262,8 @@ plugin_dump_fields (void)
HAS (flush);
HAS (trim);
HAS (zero);
+ HAS (async_pread);
+ HAS (async_pwrite);
#undef HAS
}
@@ -350,6 +356,28 @@ plugin_unlock_request (struct connection *conn)
}
}
+void
+plugin_lock_reply (struct connection *conn)
+{
+ assert (dl);
+
+ if (plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL) {
+ debug ("acquire per-connection reply lock");
+ pthread_mutex_lock (&conn->reply_lock);
+ }
+}
+
+void
+plugin_unlock_reply (struct connection *conn)
+{
+ assert (dl);
+
+ if (plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL) {
+ debug ("release per-connection reply lock");
+ pthread_mutex_unlock (&conn->reply_lock);
+ }
+}
+
int
plugin_errno_is_preserved (void)
{
@@ -414,7 +442,8 @@ plugin_can_write (struct connection *conn)
if (plugin.can_write)
return plugin.can_write (conn->handle);
else
- return plugin.pwrite != NULL;
+ return plugin.pwrite != NULL ||
+ (plugin._thread_model == NBDKIT_THREAD_MODEL_ASYNC && plugin.async_pwrite != NULL);
}
int
@@ -460,6 +489,16 @@ plugin_can_trim (struct connection *conn)
}
int
+plugin_can_async_read (struct connection *conn)
+{
+ assert (dl);
+ assert (conn->handle);
+
+ return ((plugin._thread_model == NBDKIT_THREAD_MODEL_ASYNC) &&
+ (plugin.async_pread != NULL));
+}
+
+int
plugin_pread (struct connection *conn,
void *buf, uint32_t count, uint64_t offset)
{
@@ -473,6 +512,27 @@ plugin_pread (struct connection *conn,
}
int
+plugin_async_pread (struct connection *conn, uint64_t handle, bool flush,
+ void *buf, uint32_t count, uint64_t offset)
+{
+ assert (dl);
+ assert (conn->handle);
+
+ debug ("async_pread count=%" PRIu32 " offset=%" PRIu64, count, offset);
+
+ return plugin.async_pread (conn, handle, flush, conn->handle, buf, count, offset);
+}
+
+int
+plugin_can_async_write (struct connection *conn)
+{
+ assert (dl);
+ assert (conn->handle);
+
+ return (plugin._thread_model == NBDKIT_THREAD_MODEL_ASYNC) && (plugin.async_pwrite != NULL);
+}
+
+int
plugin_pwrite (struct connection *conn,
void *buf, uint32_t count, uint64_t offset)
{
@@ -490,23 +550,44 @@ plugin_pwrite (struct connection *conn,
}
int
+plugin_async_pwrite (struct connection *conn, uint64_t handle, bool flush,
+ void *buf, uint32_t count, uint64_t offset)
+{
+ assert (dl);
+ assert (conn->handle);
+
+ debug ("async_pwrite count=%" PRIu32 " offset=%" PRIu64, count, offset);
+
+ if (plugin.async_pwrite != NULL)
+ return plugin.async_pwrite (conn, handle, flush, conn->handle, buf, count, offset);
+ else {
+ errno = EROFS;
+ return -1;
+ }
+}
+
+int
plugin_flush (struct connection *conn)
{
+ int r;
assert (dl);
assert (conn->handle);
debug ("flush");
if (plugin.flush != NULL)
- return plugin.flush (conn->handle);
+ r = plugin.flush (conn->handle);
else {
errno = EINVAL;
- return -1;
+ r = -1;
}
+
+ return r;
}
int
-plugin_trim (struct connection *conn, uint32_t count, uint64_t offset)
+plugin_trim (struct connection *conn,
+ uint32_t count, uint64_t offset)
{
assert (dl);
assert (conn->handle);