This makes a very large difference to performance compared with the
previous implementation. Note that for the tests below I also applied
the next commit, which changes the behaviour of the connections parameter.
Using this test case:
$ nbdkit -r -U - curl
https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64... \
ipresolve=v4 --run 'nbdcopy -p $uri null'
The times are as follows:
multi, connections=0 19s (*)
multi, connections=32 26s
multi, connections=16 50s
before this commit 2m46s
(*) Not documented, but setting CURLMOPT_MAX_TOTAL_CONNECTIONS = 0
means allow unlimited connections. In practice the number of
connections reaches 64.
---
plugins/curl/curldefs.h | 35 +++-
plugins/curl/config.c | 66 +-----
plugins/curl/curl.c | 146 +++++++++-----
plugins/curl/pool.c | 431 ++++++++++++++++++++++++++++++++--------
4 files changed, 482 insertions(+), 196 deletions(-)
diff --git a/plugins/curl/curldefs.h b/plugins/curl/curldefs.h
index 9169b256d..db384f055 100644
--- a/plugins/curl/curldefs.h
+++ b/plugins/curl/curldefs.h
@@ -76,12 +76,6 @@ struct curl_handle {
/* The underlying curl handle. */
CURL *c;
- /* Index of this handle in the pool (for debugging). */
- size_t i;
-
- /* True if the handle is in use by a thread. */
- bool in_use;
-
/* These fields are used/initialized when we create the handle. */
bool accept_range;
int64_t exportsize;
@@ -100,6 +94,26 @@ struct curl_handle {
/* Used by scripts.c */
struct curl_slist *headers_copy;
+
+ /* Used by pool.c */
+ struct command *cmd;
+};
+
+/* Asynchronous commands that can be sent to the pool thread. */
+enum command_type { GET_SIZE, PREAD, PWRITE, STOP };
+struct command {
+ /* These fields are set by the caller. */
+ enum command_type type; /* command */
+ void *ptr; /* for GET_SIZE: points to the int64_t receiving the size */
+ struct curl_handle *ch; /* for PREAD/PWRITE: the prepared easy handle */
+
+ /* This field is set to a unique value by send_command_and_wait. */
+ uint64_t id; /* serial number, printed in debug messages */
+
+ /* These fields are used to signal back that the command finished.
+ * The mutex and condition are initialized and destroyed by
+ * send_command_and_wait.
+ */
+ pthread_mutex_t mutex; /* completion mutex */
+ pthread_cond_t cond; /* completion condition */
+ enum { SUBMITTED, SUCCEEDED, FAILED } status; /* SUBMITTED == 0: pending */
};
/* config.c */
@@ -109,12 +123,13 @@ extern const char *curl_config_help;
extern void unload_config (void);
extern struct curl_handle *allocate_handle (void);
extern void free_handle (struct curl_handle *);
+extern int get_content_length_accept_range (struct curl_handle *ch);
/* pool.c */
-extern void load_pool (void);
-extern void unload_pool (void);
-extern struct curl_handle *get_handle (void);
-extern void put_handle (struct curl_handle *ch);
+extern int curl_get_ready (void);
+extern int curl_after_fork (void);
+extern void curl_cleanup (void);
+extern int send_command_and_wait (struct command *cmd);
/* scripts.c */
extern int do_scripts (struct curl_handle *ch);
diff --git a/plugins/curl/config.c b/plugins/curl/config.c
index 742d60809..5cda46031 100644
--- a/plugins/curl/config.c
+++ b/plugins/curl/config.c
@@ -89,9 +89,6 @@ static const char *user_agent = NULL;
static int debug_cb (CURL *handle, curl_infotype type,
const char *data, size_t size, void *);
-static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
-static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
-static int get_content_length_accept_range (struct curl_handle *ch);
static bool try_fallback_GET_method (struct curl_handle *ch);
static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
@@ -668,17 +665,9 @@ allocate_handle (void)
if (user_agent)
curl_easy_setopt (ch->c, CURLOPT_USERAGENT, user_agent);
- if (get_content_length_accept_range (ch) == -1)
- goto err;
-
/* Get set up for reading and writing. */
curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, NULL);
curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, NULL);
- curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
- curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
- /* These are only used if !readonly but we always register them. */
- curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
- curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
return ch;
@@ -743,59 +732,10 @@ debug_cb (CURL *handle, curl_infotype type,
return 0;
}
-/* NB: The terminology used by libcurl is confusing!
- *
- * WRITEFUNCTION / write_cb is used when reading from the remote server
- * READFUNCTION / read_cb is used when writing to the remote server.
- *
- * We use the same terminology as libcurl here.
- */
-
-static size_t
-write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
-{
- struct curl_handle *ch = opaque;
- size_t orig_realsize = size * nmemb;
- size_t realsize = orig_realsize;
-
- assert (ch->write_buf);
-
- /* Don't read more than the requested amount of data, even if the
- * server or libcurl sends more.
- */
- if (realsize > ch->write_count)
- realsize = ch->write_count;
-
- memcpy (ch->write_buf, ptr, realsize);
-
- ch->write_count -= realsize;
- ch->write_buf += realsize;
-
- return orig_realsize;
-}
-
-static size_t
-read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
-{
- struct curl_handle *ch = opaque;
- size_t realsize = size * nmemb;
-
- assert (ch->read_buf);
- if (realsize > ch->read_count)
- realsize = ch->read_count;
-
- memcpy (ptr, ch->read_buf, realsize);
-
- ch->read_count -= realsize;
- ch->read_buf += realsize;
-
- return realsize;
-}
-
/* Get the file size and also whether the remote HTTP server
* supports byte ranges.
*/
-static int
+int
get_content_length_accept_range (struct curl_handle *ch)
{
CURLcode r;
@@ -821,6 +761,10 @@ get_content_length_accept_range (struct curl_handle *ch)
curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
+ curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
+ curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
+ curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
+ curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
r = curl_easy_perform (ch->c);
update_times (ch->c);
if (r != CURLE_OK) {
diff --git a/plugins/curl/curl.c b/plugins/curl/curl.c
index 99a7e00b5..c29b5d0cb 100644
--- a/plugins/curl/curl.c
+++ b/plugins/curl/curl.c
@@ -48,8 +48,6 @@
#include <nbdkit-plugin.h>
-#include "cleanup.h"
-
#include "curldefs.h"
const char *cookie_script = NULL;
@@ -67,8 +65,6 @@ curl_load (void)
nbdkit_error ("libcurl initialization failed: %d", (int) r);
exit (EXIT_FAILURE);
}
-
- load_pool ();
}
static void
@@ -76,7 +72,6 @@ curl_unload (void)
{
unload_config ();
scripts_unload ();
- unload_pool ();
display_times ();
curl_global_cleanup ();
}
@@ -108,30 +103,17 @@ curl_close (void *handle)
#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL
-/* Calls get_handle() ... put_handle() to get a handle for the length
- * of the current scope.
- */
-#define GET_HANDLE_FOR_CURRENT_SCOPE(ch) \
- CLEANUP_PUT_HANDLE struct curl_handle *ch = get_handle ();
-#define CLEANUP_PUT_HANDLE __attribute__ ((cleanup (cleanup_put_handle)))
-static void
-cleanup_put_handle (void *chp)
-{
- struct curl_handle *ch = * (struct curl_handle **) chp;
-
- if (ch != NULL)
- put_handle (ch);
-}
-
/* Get the file size. */
static int64_t
curl_get_size (void *handle)
{
- GET_HANDLE_FOR_CURRENT_SCOPE (ch);
- if (ch == NULL)
+ int64_t size; /* filled in by the worker thread through cmd.ptr */
+ struct command cmd = { .type = GET_SIZE, .ptr = &size };
+
+ if (send_command_and_wait (&cmd) == -1) /* blocks until the command is retired */
return -1;
- return ch->exportsize;
+ return size;
}
/* Multi-conn is safe for read-only connections, but HTTP does not
@@ -146,23 +128,56 @@ curl_can_multi_conn (void *handle)
return !! h->readonly;
}
+/* NB: The terminology used by libcurl is confusing!
+ *
+ * WRITEFUNCTION / write_cb is used when reading from the remote server
+ * READFUNCTION / read_cb is used when writing to the remote server.
+ *
+ * We use the same terminology as libcurl here.
+ */
+
+static size_t
+write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
+{
+  struct curl_handle *h = opaque;
+  const size_t received = size * nmemb;
+  size_t wanted;
+
+  assert (h->write_buf);
+
+  /* Copy no more than the caller still expects; anything beyond that
+   * which the server or libcurl delivers is dropped.
+   */
+  wanted = received <= h->write_count ? received : h->write_count;
+  memcpy (h->write_buf, ptr, wanted);
+  h->write_buf += wanted;
+  h->write_count -= wanted;
+
+  /* Report the whole chunk as consumed, whatever was copied. */
+  return received;
+}
+
/* Read data from the remote server. */
static int
curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
{
- CURLcode r;
+ struct curl_handle *ch;
char range[128];
- GET_HANDLE_FOR_CURRENT_SCOPE (ch);
- if (ch == NULL)
- return -1;
+ /* Get a curl easy handle. */
+ ch = allocate_handle ();
+ if (ch == NULL) goto err;
/* Run the scripts if necessary and set headers in the handle. */
- if (do_scripts (ch) == -1) return -1;
+ if (do_scripts (ch) == -1) goto err;
/* Tell the write_cb where we want the data to be written. write_cb
* will update this if the data comes in multiple sections.
*/
+ curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
+ curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
ch->write_buf = buf;
ch->write_count = count;
@@ -173,12 +188,14 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
offset, offset + count);
curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
- /* The assumption here is that curl will look after timeouts. */
- r = curl_easy_perform (ch->c);
- if (r != CURLE_OK) {
- display_curl_error (ch, r, "pread: curl_easy_perform");
- return -1;
- }
+ /* Send the command to the worker thread and wait. */
+ struct command cmd = {
+ .type = PREAD,
+ .ch = ch,
+ };
+
+ if (send_command_and_wait (&cmd) == -1)
+ goto err;
update_times (ch->c);
/* Could use curl_easy_getinfo here to obtain further information
@@ -188,42 +205,70 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
/* As far as I understand the cURL API, this should never happen. */
assert (ch->write_count == 0);
+ free_handle (ch);
return 0;
+
+ err:
+ if (ch)
+ free_handle (ch);
+ return -1;
+}
+
+static size_t
+read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
+{
+  struct curl_handle *h = opaque;
+  size_t n = size * nmemb;
+
+  assert (h->read_buf);
+
+  /* Hand over no more than what is left of the caller's buffer. */
+  if (h->read_count < n)
+    n = h->read_count;
+
+  memcpy (ptr, h->read_buf, n);
+  h->read_buf += n;
+  h->read_count -= n;
+
+  return n;
}
/* Write data to the remote server. */
static int
curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
{
- CURLcode r;
+ struct curl_handle *ch;
char range[128];
- GET_HANDLE_FOR_CURRENT_SCOPE (ch);
- if (ch == NULL)
- return -1;
+ /* Get a fresh curl easy handle; it is freed again on every exit path. */
+ ch = allocate_handle ();
+ if (ch == NULL) goto err;
/* Run the scripts if necessary and set headers in the handle. */
- if (do_scripts (ch) == -1) return -1;
+ if (do_scripts (ch) == -1) goto err;
/* Tell the read_cb where we want the data to be read from. read_cb
* will update this if the data comes in multiple sections.
*/
+ curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
+ curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
ch->read_buf = buf;
ch->read_count = count;
curl_easy_setopt (ch->c, CURLOPT_UPLOAD, 1L);
/* Make an HTTP range request. */
snprintf (range, sizeof range, "%" PRIu64 "-%" PRIu64,
offset, offset + count);
curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
- /* The assumption here is that curl will look after timeouts. */
- r = curl_easy_perform (ch->c);
- if (r != CURLE_OK) {
- display_curl_error (ch, r, "pwrite: curl_easy_perform");
- return -1;
- }
+ /* Send the command to the worker thread and wait for it to be retired. */
+ struct command cmd = {
+ .type = PWRITE,
+ .ch = ch,
+ };
+
+ if (send_command_and_wait (&cmd) == -1)
+ goto err;
update_times (ch->c);
/* Could use curl_easy_getinfo here to obtain further information
@@ -233,7 +278,13 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
/* As far as I understand the cURL API, this should never happen. */
assert (ch->read_count == 0);
+ free_handle (ch);
return 0;
+
+ err:
+ if (ch)
+ free_handle (ch);
+ return -1;
}
static struct nbdkit_plugin plugin = {
@@ -249,6 +300,9 @@ static struct nbdkit_plugin plugin = {
*/
//.config_help = curl_config_help,
.magic_config_key = "url",
+ .get_ready = curl_get_ready,
+ .after_fork = curl_after_fork,
+ .cleanup = curl_cleanup,
.open = curl_open,
.close = curl_close,
.get_size = curl_get_size,
diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
index 91e56f070..2835f1664 100644
--- a/plugins/curl/pool.c
+++ b/plugins/curl/pool.c
@@ -30,11 +30,29 @@
* SUCH DAMAGE.
*/
-/* Curl handle pool.
+/* Worker thread which processes the curl multi interface.
*
- * To get a libcurl handle, call get_handle(). When you hold the
- * handle, it is yours exclusively to use. After you have finished
- * with the handle, put it back into the pool by calling put_handle().
+ * The main nbdkit threads (see curl.c) create curl easy handles
+ * initialized with the work they want to carry out. Note there is
+ * one easy handle per task (eg. per pread/pwrite request). The easy
+ * handles are not reused.
+ *
+ * The commands + optional easy handle are submitted to the worker
+ * thread over a self-pipe (it's easy to use a pipe here because the
+ * way curl multi works is it can listen on an extra fd, but not on
+ * anything else like a pthread condition). The curl multi performs
+ * the work of the outstanding easy handles.
+ *
+ * When an easy handle finishes work or errors, we retire the command
+ * by signalling back to the waiting nbdkit thread using a pthread
+ * condition.
+ *
+ * In my experiments, we're almost always I/O bound so I haven't seen
+ * any strong need to use more than one curl multi / worker thread,
+ * although it would be possible to add more in future.
+ *
+ * See also this extremely useful thread:
+ * https://curl.se/mail/lib-2019-03/0100.html
*/
#include <config.h>
@@ -45,9 +63,19 @@
#include <stdint.h>
#include <inttypes.h>
#include <string.h>
+#include <unistd.h>
#include <assert.h>
#include <pthread.h>
+#ifdef HAVE_STDATOMIC_H
+#include <stdatomic.h>
+#else
+/* Some old platforms lack atomic types. The command ID counter then
+ * falls back to a plain integer; it is only used in debug messages.
+ */
+#define _Atomic /**/
+#endif
+
#include <curl/curl.h>
#include <nbdkit-plugin.h>
@@ -62,109 +90,354 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
unsigned connections = 4;
-/* This lock protects access to the curl_handles vector below. */
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+/* Pipe used to notify background thread that a command is pending in
+ * the queue. A pointer to the 'struct command' is sent over the
+ * pipe.
+ */
+static int self_pipe[2] = { -1, -1 };
-/* List of curl handles. This is allocated dynamically as more
- * handles are requested. Currently it does not shrink. It may grow
- * up to 'connections' in length.
+/* The curl multi handle. */
+static CURLM *multi;
+
+/* List of running easy handles. We only need to maintain this so we
+ * can remove them from the multi handle when cleaning up.
*/
DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
static curl_handle_list curl_handles = empty_vector;
-/* The condition is used when the curl handles vector is full and
- * we're waiting for a thread to put_handle.
- */
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static size_t in_use = 0, waiting = 0;
+static const char *
+command_type_to_string (enum command_type type)
+{
+  /* Printable names, indexed by enum command_type. */
+  static const char *const names[] = {
+    [GET_SIZE] = "GET_SIZE",
+    [PREAD] = "PREAD",
+    [PWRITE] = "PWRITE",
+    [STOP] = "STOP",
+  };
+
+  /* Any value outside the enum is a programming error. */
+  if ((size_t) type >= sizeof names / sizeof names[0])
+    abort ();
+  return names[type];
+}
-/* Initialize pool structures. */
-void
-load_pool (void)
+int
+curl_get_ready (void)
+{
+  /* Create the multi handle that the worker thread will drive. */
+  multi = curl_multi_init ();
+  if (multi != NULL)
+    return 0;
+
+  nbdkit_error ("curl_multi_init failed: %m");
+  return -1;
+}
+
+/* Start and stop the background thread. */
+static pthread_t thread;
+static bool thread_running;
+static void *pool_worker (void *);
+
+int
+curl_after_fork (void)
{
+  int rc;
+
+  /* Commands reach the worker as pointers written to this pipe. */
+  if (pipe (self_pipe) == -1) {
+    nbdkit_error ("pipe: %m");
+    return -1;
+  }
+
+  /* Launch the background thread where all the curl work is done. */
+  rc = pthread_create (&thread, NULL, pool_worker, NULL);
+  if (rc == 0) {
+    thread_running = true;
+    return 0;
+  }
+
+  /* pthread_create returns the error code rather than setting errno. */
+  errno = rc;
+  nbdkit_error ("pthread_create: %m");
+  return -1;
}
-/* Close and free all handles in the pool. */
+/* Unload the background thread. */
void
-unload_pool (void)
+curl_cleanup (void)
{
- size_t i;
+ if (thread_running) {
+ /* Stop the background thread and wait for it to exit. */
+ struct command cmd = { .type = STOP };
+ send_command_and_wait (&cmd);
+ pthread_join (thread, NULL);
+ thread_running = false;
+ }
- if (curl_debug_pool)
- nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
- curl_handles.len);
+ if (self_pipe[0] >= 0) {
+ close (self_pipe[0]);
+ self_pipe[0] = -1;
+ }
+ if (self_pipe[1] >= 0) {
+ close (self_pipe[1]);
+ self_pipe[1] = -1;
+ }
- for (i = 0; i < curl_handles.len; ++i)
- free_handle (curl_handles.ptr[i]);
- curl_handle_list_reset (&curl_handles);
+ if (multi) {
+ size_t i;
+
+ /* Remove and free any easy handles in the multi. */
+ for (i = 0; i < curl_handles.len; ++i) {
+ curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
+ free_handle (curl_handles.ptr[i]);
+ }
+ /* Forget the freed handles so the list holds no dangling pointers. */
+ curl_handle_list_reset (&curl_handles);
+
+ curl_multi_cleanup (multi);
+ multi = NULL;
+ }
}
-/* Get a handle from the pool.
+/* Command queue. */
+static _Atomic uint64_t id; /* next command ID */
+
+/* Send command to the background thread and wait for completion.
+ * This is only called by one of the nbdkit threads.
*
- * It is owned exclusively by the caller until they call put_handle.
+ * Returns 0 for OK
+ * On error, calls nbdkit_error and returns -1.
*/
-struct curl_handle *
-get_handle (void)
+int
+send_command_and_wait (struct command *cmd)
{
- ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
- size_t i;
- struct curl_handle *ch;
-
- again:
- /* Look for a handle which is not in_use. */
- for (i = 0; i < curl_handles.len; ++i) {
- ch = curl_handles.ptr[i];
- if (!ch->in_use) {
- ch->in_use = true;
- in_use++;
+ cmd->id = id++;
+
+ /* Mark the command as pending explicitly rather than relying on
+ * every caller having zero-initialized the status field.
+ */
+ cmd->status = SUBMITTED;
+
+ /* This will be used to signal command completion back to us. */
+ pthread_mutex_init (&cmd->mutex, NULL);
+ pthread_cond_init (&cmd->cond, NULL);
+
+ /* Send the command to the background thread. Only the pointer
+ * travels over the pipe, so *cmd must stay valid until retired.
+ */
+ if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
+ abort ();
+
+ /* Wait for the command to be completed by the background thread. */
+ {
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
+ while (cmd->status == SUBMITTED)
+ pthread_cond_wait (&cmd->cond, &cmd->mutex);
+ }
+
+ pthread_mutex_destroy (&cmd->mutex);
+ pthread_cond_destroy (&cmd->cond);
+
+ /* On error the background thread will call nbdkit_error. */
+ switch (cmd->status) {
+ case SUCCEEDED: return 0;
+ case FAILED: return -1;
+ default: abort ();
+ }
+}
+
+/* The background thread. */
+static void check_for_finished_handles (void);
+static void retire_command (struct command *cmd, int status);
+static void do_get_size (struct command *cmd);
+static void do_pread (struct command *cmd);
+static void do_pwrite (struct command *cmd);
+
+static void *
+pool_worker (void *vp)
+{
+ bool stop = false;
+
+ if (curl_debug_pool)
+ nbdkit_debug ("curl: background thread started");
+
+ /* Each iteration drives the multi until one command arrives on the
+ * self-pipe, then dispatches that command.
+ */
+ while (!stop) {
+ struct command *cmd = NULL;
+ struct curl_waitfd extra_fds[1] =
+ { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
+ CURLMcode mc;
+ int numfds, running_handles, repeats = 0;
+
+ do {
+ /* Process the multi handle. */
+ mc = curl_multi_perform (multi, &running_handles);
+ if (mc != CURLM_OK) {
+ nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));
+ abort (); /* XXX We don't expect this to happen */
+ }
+
+ check_for_finished_handles ();
+
+ mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
+ if (mc != CURLM_OK) {
+ nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
+ abort (); /* XXX We don't expect this to happen */
+ }
+
if (curl_debug_pool)
- nbdkit_debug ("get_handle: %zu", ch->i);
- return ch;
- }
- }
+ nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d",
+ running_handles, numfds);
+
+ if (numfds == 0) {
+ /* Nothing was ready; sleep after the second consecutive empty wait. */
+ repeats++;
+ if (repeats > 1)
+ nbdkit_nanosleep (1, 0);
+ }
+ else {
+ repeats = 0;
+ /* revents is a bitmask, so test the POLLIN bit rather than equality. */
+ if (extra_fds[0].revents & CURL_WAIT_POLLIN) {
+ /* There's a command waiting. */
+ if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
+ abort ();
+ }
+ }
+ } while (!cmd);
- /* If more connections are allowed, then allocate a new handle. */
- if (curl_handles.len < connections) {
- ch = allocate_handle ();
- if (ch == NULL)
- return NULL;
- if (curl_handle_list_append (&curl_handles, ch) == -1) {
- free_handle (ch);
- return NULL;
- }
- ch->i = curl_handles.len - 1;
- ch->in_use = true;
- in_use++;
if (curl_debug_pool)
- nbdkit_debug ("get_handle: %zu", ch->i);
- return ch;
- }
+ nbdkit_debug ("curl: dispatching %s command %" PRIu64,
+ command_type_to_string (cmd->type), cmd->id);
+
+ switch (cmd->type) {
+ case STOP:
+ stop = true;
+ retire_command (cmd, SUCCEEDED);
+ break;
+
+ case GET_SIZE:
+ do_get_size (cmd);
+ break;
+
+ case PREAD:
+ do_pread (cmd);
+ break;
+
+ case PWRITE:
+ do_pwrite (cmd);
+ break;
+ }
+ } /* while (!stop) */
- /* Otherwise we have run out of connections so we must wait until
- * another thread calls put_handle.
- */
- assert (in_use == connections);
- waiting++;
- while (in_use == connections)
- pthread_cond_wait (&cond, &lock);
- waiting--;
+ if (curl_debug_pool)
+ nbdkit_debug ("curl: background thread stopped");
- goto again;
+ return NULL;
}
-/* Return the handle to the pool. */
-void
-put_handle (struct curl_handle *ch)
+/* This checks if any easy handles in the multi have
+ * finished and retires the associated commands.
+ */
+static void
+check_for_finished_handles (void)
{
- ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
+  CURLMsg *msg;
+  int msgs_in_queue;
+
+  while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
+    size_t i;
+    struct curl_handle *ch = NULL;
+    CURLcode result;
+
+    /* Only completed transfers are of interest. */
+    if (msg->msg != CURLMSG_DONE)
+      continue;
+
+    /* Copy the result now: libcurl documents that the message does
+     * not survive curl_multi_remove_handle, which is called below.
+     */
+    result = msg->data.result;
+
+    /* Find this curl_handle and take it off the running list. */
+    for (i = 0; i < curl_handles.len; ++i) {
+      if (curl_handles.ptr[i]->c == msg->easy_handle) {
+        ch = curl_handles.ptr[i];
+        curl_handle_list_remove (&curl_handles, i);
+        break; /* stop: the list has just been modified */
+      }
+    }
+    if (ch == NULL) abort ();
+
+    if (result != CURLE_OK)
+      display_curl_error (ch, result, "curl");
+
+    /* Detach from the multi before waking the submitter, which frees
+     * the handle once send_command_and_wait returns.
+     */
+    curl_multi_remove_handle (multi, ch->c);
+    retire_command (ch->cmd, result == CURLE_OK ? SUCCEEDED : FAILED);
+  }
+}
+
+/* Retire a command: record its final status and wake the thread
+ * blocked in send_command_and_wait. status is SUCCEEDED | FAILED
+ */
+static void
+retire_command (struct command *cmd, int status)
+{
if (curl_debug_pool)
- nbdkit_debug ("put_handle: %zu", ch->i);
+ nbdkit_debug ("curl: retiring %s command %" PRIu64,
+ command_type_to_string (cmd->type), cmd->id);
+
+ /* The waiter re-checks status under this same mutex. Callers keep
+ * the command on their stack, so cmd is not touched after this point
+ * beyond the signal itself.
+ */
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
+ cmd->status = status;
+ pthread_cond_signal (&cmd->cond);
+}
+
+/* We handle get_size synchronously, the first time we are called.
+ * We assume the size never changes (you must restart nbdkit).
+ */
+static struct curl_handle *get_size_ch;
+
+static void
+do_get_size (struct command *cmd)
+{
+  int result = FAILED;
+
+  /* Probe the server only once; later calls reuse the cached handle. */
+  if (get_size_ch == NULL) {
+    struct curl_handle *fresh = allocate_handle ();
+
+    if (fresh != NULL && get_content_length_accept_range (fresh) == -1) {
+      free_handle (fresh);
+      fresh = NULL;
+    }
+    get_size_ch = fresh;
+  }
+
+  /* An export size of -1 is treated as a failure. */
+  if (get_size_ch != NULL && get_size_ch->exportsize != -1) {
+    * (int64_t *) cmd->ptr = get_size_ch->exportsize;
+    result = SUCCEEDED;
+  }
+
+  retire_command (cmd, result);
+}
+
+static void
+do_pread (struct command *cmd)
+{
+  CURLMcode mc;
+
+  /* Lets check_for_finished_handles map the easy handle back to cmd. */
+  cmd->ch->cmd = cmd;
+
+  /* Add the handle to the multi. */
+  mc = curl_multi_add_handle (multi, cmd->ch->c);
+  if (mc != CURLM_OK) {
+    nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
+    goto err;
+  }
+
+  if (curl_handle_list_append (&curl_handles, cmd->ch) == -1) {
+    /* The submitter frees the easy handle when the command fails, so
+     * it must not stay registered with the multi.
+     */
+    curl_multi_remove_handle (multi, cmd->ch->c);
+    nbdkit_error ("curl: failed to add handle to the running list");
+    goto err;
+  }
+  return;
+
+ err:
+  retire_command (cmd, FAILED);
+}
+
+static void
+do_pwrite (struct command *cmd)
+{
+  CURLMcode mc;
+
+  /* Lets check_for_finished_handles map the easy handle back to cmd. */
+  cmd->ch->cmd = cmd;
+
+  /* Add the handle to the multi. */
+  mc = curl_multi_add_handle (multi, cmd->ch->c);
+  if (mc != CURLM_OK) {
+    nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
+    goto err;
+  }
- ch->in_use = false;
- in_use--;
+  if (curl_handle_list_append (&curl_handles, cmd->ch) == -1) {
+    /* The submitter frees the easy handle when the command fails, so
+     * it must not stay registered with the multi.
+     */
+    curl_multi_remove_handle (multi, cmd->ch->c);
+    nbdkit_error ("curl: failed to add handle to the running list");
+    goto err;
+  }
+  return;
- /* Signal the next thread which is waiting. */
- if (waiting > 0)
- pthread_cond_signal (&cond);
+ err:
+  retire_command (cmd, FAILED);
}
--
2.41.0