It's finally time to implement one of the TODO items: we want to
support a thread pool of parallel readers from the client, in
order to allow multiple in-flight operations with potential
out-of-order completion. We also need at least one plugin that
supports parallel processing for testing the option; the file
plugin fits the bill.
Add and document a new command line option, -t/--threads=N,
which controls how many threads to create per connection (although
we only ever spawn multiple threads if the plugin is parallel,
since otherwise, at most one thread is running at a time anyway).
Setting -t 1 forces a parallel plugin to behave serialized,
setting to other values allows tuning for performance; the
default of 16 matches the choice of MAX_NBD_REQUESTS used in qemu.
One easy way to test:
term1$ echo hello > junk
term1$ ./nbdkit -f -v -r file file=junk rdelay=2s wdelay=1s
term2$ qemu-io -f raw nbd://localhost:10809/ --trace='nbd_*' \
-c 'aio_read 0 1' -c 'aio_write -P 0x6c 2 2' -c 'aio_flush'
If the write completes before the read, then nbdkit was properly
handling things in parallel with out-of-order replies.
Signed-off-by: Eric Blake <eblake(a)redhat.com>
---
TODO | 7 -------
docs/nbdkit.pod | 12 +++++++++++-
nbdkit.in | 2 +-
plugins/file/file.c | 2 +-
src/connections.c | 10 +++++++---
src/internal.h | 2 ++
src/main.c | 20 ++++++++++++++++++--
src/plugins.c | 8 ++++++++
8 files changed, 48 insertions(+), 15 deletions(-)
diff --git a/TODO b/TODO
index 6c5bb5b..db7469b 100644
--- a/TODO
+++ b/TODO
@@ -12,10 +12,3 @@
* Glance and/or cinder plugins.
* Performance - measure and improve it.
-
-* Implement true parallel request handling. Currently
- NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS and
- NBDKIT_THREAD_MODEL_PARALLEL are the same, because we handle
- requests within each connection synchronously one at a time. We
- could (and should) be able to handle them in parallel by having
- another thread pool for requests.
diff --git a/docs/nbdkit.pod b/docs/nbdkit.pod
index e3043ba..4593391 100644
--- a/docs/nbdkit.pod
+++ b/docs/nbdkit.pod
@@ -9,7 +9,7 @@ nbdkit - A toolkit for creating NBD servers
nbdkit [-e EXPORTNAME] [--exit-with-parent] [-f]
[-g GROUP] [-i IPADDR]
[--newstyle] [--oldstyle] [-P PIDFILE] [-p PORT] [-r]
- [--run CMD] [-s] [--selinux-label LABEL]
+ [--run CMD] [-s] [--selinux-label LABEL] [-t THREADS]
[--tls=off|on|require] [--tls-certificates /path/to/certificates]
[--tls-verify-peer]
[-U SOCKET] [-u USER] [-v] [-V]
@@ -230,6 +230,16 @@ Unix domain sockets:
nbdkit --selinux-label system_u:object_r:svirt_t:s0 ...
+=item B<-t> THREADS
+
+= item B<--threads> THREADS
+
+Set the number of threads to be used per connection, which in turn
+controls the number of outstanding requests that can be processed at
+once. Only matters for plugins with thread_model=parallel (where it
+defaults to 16). To force serialized behavior (useful if the client
+is not prepared for out-of-order responses), set this to 1.
+
=item B<--tls=off>
=item B<--tls=on>
diff --git a/nbdkit.in b/nbdkit.in
index 6be89ec..9c3d625 100644
--- a/nbdkit.in
+++ b/nbdkit.in
@@ -65,7 +65,7 @@ verbose=
while [ $# -gt 0 ]; do
case "$1" in
# Flags that take an argument. We must not rewrite the argument.
- -e | --export* | -g | --group | -i | --ip* | -P | --pid* | -p | --port | --run |
--selinux-label | --tls | --tls-certificates | -U | --unix | -u | --user)
+ -e | --export* | -g | --group | -i | --ip* | -P | --pid* | -p | --port | --run |
--selinux-label | -t | --threads | --tls | --tls-certificates | -U | --unix | -u |
--user)
args[$i]="$1"
((++i))
args[$i]="$2"
diff --git a/plugins/file/file.c b/plugins/file/file.c
index a603be8..ef5da3d 100644
--- a/plugins/file/file.c
+++ b/plugins/file/file.c
@@ -200,7 +200,7 @@ file_close (void *handle)
free (h);
}
-#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS
+#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL
/* Get the file size. */
static int64_t
diff --git a/src/connections.c b/src/connections.c
index 5257032..2d184b0 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -59,6 +59,9 @@
/* Maximum length of any option data (bytes). */
#define MAX_OPTION_LENGTH 4096
+/* Default number of parallel requests. */
+#define DEFAULT_PARALLEL_REQUESTS 16
+
/* Connection structure. */
struct connection {
pthread_mutex_t request_lock;
@@ -203,9 +206,11 @@ _handle_single_connection (int sockin, int sockout)
{
int r = -1;
struct connection *conn;
- int nworkers = 1; /* TODO default to 16 for parallel plugins, with command-line
override */
+ int nworkers = threads ? threads : DEFAULT_PARALLEL_REQUESTS;
pthread_t *workers = NULL;
+ if (!plugin_is_parallel())
+ nworkers = 0;
conn = new_connection (sockin, sockout, nworkers);
if (!conn)
goto done;
@@ -219,10 +224,9 @@ _handle_single_connection (int sockin, int sockout)
if (negotiate_handshake (conn) == -1)
goto done;
- if (nworkers == 1) {
+ if (!nworkers) {
/* No need for a separate thread. */
debug ("handshake complete, processing requests serially");
- nworkers = conn->nworkers = 0;
while (!quit && get_status (conn) > 0)
recv_request_send_reply (conn);
}
diff --git a/src/internal.h b/src/internal.h
index 1fc5d69..b79c12c 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -103,6 +103,7 @@ extern const char *tls_certificates_dir;
extern int tls_verify_peer;
extern char *unixsocket;
extern int verbose;
+extern int threads;
extern volatile int quit;
@@ -151,6 +152,7 @@ extern void plugin_lock_connection (void);
extern void plugin_unlock_connection (void);
extern void plugin_lock_request (struct connection *conn);
extern void plugin_unlock_request (struct connection *conn);
+extern bool plugin_is_parallel (void);
extern int plugin_errno_is_preserved (void);
extern int plugin_open (struct connection *conn, int readonly);
extern void plugin_close (struct connection *conn);
diff --git a/src/main.c b/src/main.c
index c9f08ab..cc5e9e3 100644
--- a/src/main.c
+++ b/src/main.c
@@ -84,6 +84,7 @@ int readonly; /* -r */
char *run; /* --run */
int listen_stdin; /* -s */
const char *selinux_label; /* --selinux-label */
+int threads; /* -t */
int tls; /* --tls : 0=off 1=on 2=require */
const char *tls_certificates_dir; /* --tls-certificates */
int tls_verify_peer; /* --tls-verify-peer */
@@ -99,7 +100,7 @@ static char *random_fifo = NULL;
enum { HELP_OPTION = CHAR_MAX + 1 };
-static const char *short_options = "e:fg:i:nop:P:rsu:U:vV";
+static const char *short_options = "e:fg:i:nop:P:rst:u:U:vV";
static const struct option long_options[] = {
{ "help", 0, NULL, HELP_OPTION },
{ "dump-config",0, NULL, 0 },
@@ -126,6 +127,7 @@ static const struct option long_options[] = {
{ "selinux-label", 1, NULL, 0 },
{ "single", 0, NULL, 's' },
{ "stdin", 0, NULL, 's' },
+ { "threads", 1, NULL, 't' },
{ "tls", 1, NULL, 0 },
{ "tls-certificates", 1, NULL, 0 },
{ "tls-verify-peer", 0, NULL, 0 },
@@ -143,7 +145,7 @@ usage (void)
" [-e EXPORTNAME] [--exit-with-parent] [-f]\n"
" [-g GROUP] [-i IPADDR]\n"
" [--newstyle] [--oldstyle] [-P PIDFILE] [-p PORT] [-r]\n"
- " [--run CMD] [-s] [--selinux-label LABEL]\n"
+ " [--run CMD] [-s] [--selinux-label LABEL] [-t THREADS]\n"
" [--tls=off|on|require] [--tls-certificates
/path/to/certificates]\n"
" [--tls-verify-peer]\n"
" [-U SOCKET] [-u USER] [-v] [-V]\n"
@@ -331,6 +333,20 @@ main (int argc, char *argv[])
listen_stdin = 1;
break;
+ case 't':
+ {
+ char *end;
+
+ errno = 0;
+ threads = strtoul (optarg, &end, 0);
+ if (errno || *end) {
+ fprintf (stderr, "%s: cannot parse '%s' into threads\n",
+ program_name, optarg);
+ exit (EXIT_FAILURE);
+ }
+ /* XXX Worth a maximimum limit on threads? */
+ }
+
case 'U':
if (socket_activation) {
fprintf (stderr, "%s: cannot use socket activation with -U flag\n",
diff --git a/src/plugins.c b/src/plugins.c
index e8c6b28..47c4fa5 100644
--- a/src/plugins.c
+++ b/src/plugins.c
@@ -360,6 +360,14 @@ plugin_unlock_request (struct connection *conn)
}
}
+bool
+plugin_is_parallel (void)
+{
+ assert (dl);
+
+ return plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL;
+}
+
int
plugin_errno_is_preserved (void)
{
--
2.13.6