Finish plumbing up everything we will need to process multiple
client requests in parallel after handshake is complete. Since
status is now global, and properly protected by a mutex, all
of the threads will eventually quit as soon as any of them
notices EOF or nbdkit detects a signal.
For ease of review, the framework for configuring threads is
done separately from the low-level work of utilizing the threads,
so this patch sees no behavior change (because we hard-code
conn->nworkers to 0); although it's a one-line hack to test that
a larger nworkers still behaves the same even for a non-parallel
plugin (in fact, such a hack was how I found and squashed several
thread-safety bugs in the previous patches, exposed from running
test-socket-activation in a loop).
Signed-off-by: Eric Blake <eblake(a)redhat.com>
---
src/connections.c | 91 +++++++++++++++++++++++++++++++++++++++++++++++++------
1 file changed, 82 insertions(+), 9 deletions(-)
diff --git a/src/connections.c b/src/connections.c
index 9d95e7f..41371fb 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -68,7 +68,7 @@ struct connection {
int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */
void *handle;
void *crypto_session;
- int nworkers; /* TODO set up a thread pool for parallel workers */
+ int nworkers;
uint64_t exportsize;
int readonly;
@@ -83,7 +83,8 @@ struct connection {
connection_close_function close;
};
-static struct connection *new_connection (int sockin, int sockout);
+static struct connection *new_connection (int sockin, int sockout,
+ int nworkers);
static void free_connection (struct connection *conn);
static int negotiate_handshake (struct connection *conn);
static int recv_request_send_reply (struct connection *conn);
@@ -175,12 +176,39 @@ set_status (struct connection *conn, int value)
return value;
}
+struct worker_data {
+ struct connection *conn;
+ char *name;
+};
+
+static void *
+connection_worker (void *data)
+{
+ struct worker_data *worker = data;
+ struct connection *conn = worker->conn;
+ char *name = worker->name;
+
+ debug ("starting worker thread %s", name);
+ threadlocal_new_server_thread ();
+ threadlocal_set_name (name);
+ free (worker);
+
+ while (!quit && get_status (conn) > 0)
+ recv_request_send_reply (conn);
+ debug ("exiting worker thread %s", threadlocal_get_name ());
+ free (name);
+ return NULL;
+}
+
static int
_handle_single_connection (int sockin, int sockout)
{
int r = -1;
- struct connection *conn = new_connection (sockin, sockout);
+ struct connection *conn;
+ int nworkers = 1; /* TODO default to 16 for parallel plugins, with command-line
override */
+ pthread_t *workers = NULL;
+ conn = new_connection (sockin, sockout, nworkers);
if (!conn)
goto done;
@@ -193,11 +221,55 @@ _handle_single_connection (int sockin, int sockout)
if (negotiate_handshake (conn) == -1)
goto done;
- /* Process requests. XXX Allow these to be dispatched in parallel using
- * a thread pool.
- */
- while (!quit && get_status (conn) > 0)
- recv_request_send_reply (conn);
+ if (nworkers <= 1) {
+ /* No need for a separate thread. */
+ debug ("handshake complete, processing requests serially");
+ conn->nworkers = 0;
+ while (!quit && get_status (conn) > 0)
+ recv_request_send_reply (conn);
+ }
+ else {
+ /* Create thread pool to process requests. */
+ debug ("handshake complete, processing requests with %d threads",
+ nworkers);
+ workers = calloc (nworkers, sizeof *workers);
+ if (!workers) {
+ perror ("malloc");
+ goto done;
+ }
+
+ for (nworkers = 0; nworkers < conn->nworkers; nworkers++) {
+ struct worker_data *worker = malloc (sizeof *worker);
+ int err;
+
+ if (!worker) {
+ perror ("malloc");
+ set_status (conn, -1);
+ goto wait;
+ }
+ if (asprintf (&worker->name, "%s.%d", plugin_name (), nworkers)
< 0) {
+ perror ("asprintf");
+ set_status (conn, -1);
+ free (worker);
+ goto wait;
+ }
+ worker->conn = conn;
+ err = pthread_create (&workers[nworkers], NULL, connection_worker,
+ worker);
+ if (err) {
+ errno = err;
+ perror ("pthread_create");
+ set_status (conn, -1);
+ free (worker);
+ goto wait;
+ }
+ }
+
+ wait:
+ while (nworkers)
+ pthread_join (workers[--nworkers], NULL);
+ free (workers);
+ }
r = get_status (conn);
done:
@@ -218,7 +290,7 @@ handle_single_connection (int sockin, int sockout)
}
static struct connection *
-new_connection (int sockin, int sockout)
+new_connection (int sockin, int sockout, int nworkers)
{
struct connection *conn;
@@ -229,6 +301,7 @@ new_connection (int sockin, int sockout)
}
conn->status = 1;
+ conn->nworkers = nworkers;
conn->sockin = sockin;
conn->sockout = sockout;
pthread_mutex_init (&conn->request_lock, NULL);
--
2.13.6