Finish plumbing up everything we will need to process multiple
client requests in parallel after the handshake is complete. Since
status is now connection-wide, and properly protected by a mutex,
all of the threads will quit soon after any of them notices EOF
or nbdkit detects a signal.
For ease of review, the framework for configuring threads is
done separately from the low-level work of utilizing the threads,
so this patch causes no behavior change (because we hard-code
nworkers to 1), although it takes only a one-line hack to test
that a larger nworkers still behaves the same even for a
non-parallel plugin.
Signed-off-by: Eric Blake <eblake(a)redhat.com>
---
Still RFC, because with this patch and nworkers changed to 2,
I can rather frequently get test-socket-activation to hang.
It looks like the extra load of creating multiple new threads
after negotiate_handshake() claims success (because we wrote
the entire oldstyle header into our end of the socket), coupled
with the SIGTERM that the testsuite sends after reading only 8
bytes off the socket, can cause us to miss the change to the
global 'quit' and instead hang in a poll() waiting for a second
client that will never connect.
The fix for that hang may belong in main.c/sockets.c while
this patch is fine as-is, but I need more time to convince
myself of that.
---
src/connections.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++------
1 file changed, 83 insertions(+), 10 deletions(-)
diff --git a/src/connections.c b/src/connections.c
index 75d8884..5257032 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -1,5 +1,5 @@
/* nbdkit
- * Copyright (C) 2013-2016 Red Hat Inc.
+ * Copyright (C) 2013-2017 Red Hat Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -68,7 +68,7 @@ struct connection {
int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */
void *handle;
void *crypto_session;
- int nworkers; /* TODO set up a thread pool for parallel workers */
+ int nworkers;
uint64_t exportsize;
int readonly;
@@ -83,7 +83,8 @@ struct connection {
connection_close_function close;
};
-static struct connection *new_connection (int sockin, int sockout);
+static struct connection *new_connection (int sockin, int sockout,
+ int nworkers);
static void free_connection (struct connection *conn);
static int negotiate_handshake (struct connection *conn);
static void recv_request_send_reply (struct connection *conn);
@@ -173,12 +174,39 @@ set_status (struct connection *conn, int value)
pthread_mutex_unlock (&conn->status_lock);
}
+struct worker_data {  /* Start-up arguments passed to one worker thread. */
+  struct connection *conn;  /* connection the worker processes requests for */
+  char *name;  /* thread name; the worker frees it before exiting */
+};
+/* Thread entry point for a connection worker.
+ *
+ * DATA points to a struct worker_data.  The struct is freed here once
+ * its fields have been copied into locals; the name string is freed
+ * just before the thread returns.  Always returns NULL.
+ */
+static void *
+connection_worker (void *data)
+{
+  struct worker_data *worker = data;
+  struct connection *conn = worker->conn;
+  char *name = worker->name;
+
+  debug ("starting worker thread %s", name);
+  threadlocal_new_server_thread ();
+  threadlocal_set_name (name);
+  free (worker);  /* conn and name were already copied out above */
+  /* Keep serving requests until quit is set or the connection status
+   * is no longer positive (0 for shutdown, -1 on error).
+   */
+  while (!quit && get_status (conn) > 0)
+    recv_request_send_reply (conn);
+  debug ("exiting worker thread %s", threadlocal_get_name ());
+  free (name);  /* NOTE(review): assumes threadlocal_set_name() does not
+                 * keep using this pointer after the thread exits --
+                 * confirm against threadlocal.c. */
+  return NULL;
+}
+
static int
_handle_single_connection (int sockin, int sockout)
{
int r = -1;
- struct connection *conn = new_connection (sockin, sockout);
+ struct connection *conn;
+ int nworkers = 1; /* TODO default to 16 for parallel plugins, with
+ command-line override */
+ pthread_t *workers = NULL;
+ conn = new_connection (sockin, sockout, nworkers);
if (!conn)
goto done;
@@ -191,11 +219,55 @@ _handle_single_connection (int sockin, int sockout)
if (negotiate_handshake (conn) == -1)
goto done;
- /* Process requests. XXX Allow these to be dispatched in parallel using
- * a thread pool.
- */
- while (!quit && get_status (conn) > 0)
- recv_request_send_reply (conn);
+  if (nworkers == 1) {
+    /* No need for a separate thread. */
+    debug ("handshake complete, processing requests serially");
+    /* Record that no worker threads are in use for this connection. */
+    nworkers = conn->nworkers = 0;
+    while (!quit && get_status (conn) > 0)
+      recv_request_send_reply (conn);
+  }
+  else {
+    /* Create thread pool to process requests. */
+    debug ("handshake complete, processing requests with %d threads",
+           nworkers);
+    workers = calloc (nworkers, sizeof *workers);
+    if (!workers) {
+      perror ("malloc");
+      goto done;
+    }
+
+    /* From here on nworkers counts the threads actually started, so
+     * the join loop below waits for exactly those threads, while
+     * conn->nworkers keeps the requested pool size.
+     */
+    for (nworkers = 0; nworkers < conn->nworkers; nworkers++) {
+      struct worker_data *worker = malloc (sizeof *worker);
+      int err;
+
+      if (!worker) {
+        perror ("malloc");
+        set_status (conn, -1);
+        goto wait;
+      }
+      if (asprintf (&worker->name, "%s.%d", plugin_name (), nworkers) < 0) {
+        perror ("asprintf");
+        set_status (conn, -1);
+        free (worker);
+        goto wait;
+      }
+      worker->conn = conn;
+      err = pthread_create (&workers[nworkers], NULL, connection_worker,
+                            worker);
+      if (err) {
+        /* pthread_create reports failure via its return value, not errno. */
+        errno = err;
+        perror ("pthread_create");
+        set_status (conn, -1);
+        /* The thread never started, so nobody else will release the
+         * worker or its name; free both here to avoid leaking the
+         * string allocated by asprintf.
+         */
+        free (worker->name);
+        free (worker);
+        goto wait;
+      }
+    }
+
+  wait:
+    /* Join every thread that was successfully created, newest first. */
+    while (nworkers)
+      pthread_join (workers[--nworkers], NULL);
+    free (workers);
+  }
r = get_status (conn);
done:
@@ -216,7 +288,7 @@ handle_single_connection (int sockin, int sockout)
}
static struct connection *
-new_connection (int sockin, int sockout)
+new_connection (int sockin, int sockout, int nworkers)
{
struct connection *conn;
@@ -227,6 +299,7 @@ new_connection (int sockin, int sockout)
}
conn->status = 1;
+ conn->nworkers = nworkers;
conn->sockin = sockin;
conn->sockout = sockout;
pthread_mutex_init (&conn->request_lock, NULL);
--
2.13.6