On 8/4/23 20:04, Richard W.M. Jones wrote:
Right, that's exactly what I got curious about.
In my opinion, writing to the self-pipe is mostly safe. Reading from the
self-pipe could be slightly polished.
Here are the POSIX pages on write() and read():
The first one (write) says that if you attempt to write at most PIPE_BUF
bytes, then writes will not be interleaved with other concurrent writes,
so in a sense the write will be atomic. (And in this case, O_NONBLOCK
only changes the behavior for the case when the buffer cannot be written
in entirety: with O_NONBLOCK clear, the writer thread will block, with
O_NONBLOCK set, the writer thread will see -1/EAGAIN.)
Now, PIPE_BUF is "variable" in a sense:
but it is greater than or equal to _POSIX_PIPE_BUF, which is 512. Our
pointers are 8 bytes in size, so the <=PIPE_BUF condition is surely
satisfied.
... The only case I see possible for a short write is the delivery of a
signal after the transfer has started. *If* nbdkit catches some signal such
that the signal handler actually returns, then this could be a
(theoretical) problem, calling for xwrite() or similar.
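For illustration, such a retry loop might look like this (a purely
hypothetical helper, assuming the write end stays blocking, i.e. with
O_NONBLOCK clear; needs <unistd.h> and <errno.h>):

  /* Hypothetical xwrite(): retry the pipe write on EINTR and on a
   * short write (only possible here if a signal handler returns
   * mid-transfer).
   */
  static int
  xwrite (int fd, const void *buf, size_t len)
  {
    const char *p = buf;

    while (len > 0) {
      ssize_t r = write (fd, p, len);
      if (r == -1) {
        if (errno == EINTR) continue;
        return -1;
      }
      p += r;
      len -= r;
    }
    return 0;
  }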
The background worker thread picks the 'struct command *' up from the
pipe in the same event loop that it uses to process ongoing requests
on the multi handle. It takes the easy handle and adds it to the
multi handle.
The spec on read() does not seem to have the same kind of
non-interleaving / "atomicity" language that write() does. The rationale
section says:
"The standard developers considered adding atomicity requirements to a
pipe or FIFO, but recognized that due to the nature of pipes and FIFOs
there could be no guarantee of atomicity of reads of {PIPE_BUF} or any
other size that would be an aid to applications portability."
Now given that the writer side is free of interleaving (because, in the
first place: there is only a single writer!), I think we need not worry
about data corruption. However, it does feel like read() may return
fewer than 8 bytes in one go, "just because" (not only because of a
signal being delivered midway).
And that may be a problem with readiness reporting via
curl_multi_wait(); even if you get CURL_WAIT_POLLIN, the whole command
pointer may not yet be available.
Now I do think a split read is extremely unlikely, maybe even impossible
on Linux. If we choose to be pedantic, then the curl_multi_wait() loop
might want to expect "cmd" to be populated only over multiple iterations
-- like use "cmd_ptr_bytes" or something similar for tracking the size
already available, and only consider "cmd" usable when cmd_ptr_bytes
reaches sizeof cmd.
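To make that concrete, the pedantic version of the read might look
something like this (a sketch only; cmd_ptr_bytes and the dispatch
step are hypothetical):

  /* Sketch: tolerate a split read of the 8-byte pointer.  The
   * pointer is only usable once all sizeof cmd bytes have arrived.
   */
  static struct command *cmd;
  static size_t cmd_ptr_bytes;   /* bytes of 'cmd' received so far */

  if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
    ssize_t r = read (self_pipe[0],
                      (char *) &cmd + cmd_ptr_bytes,
                      sizeof cmd - cmd_ptr_bytes);
    if (r == -1)
      abort ();                  /* or retry on EINTR */
    cmd_ptr_bytes += r;
    if (cmd_ptr_bytes == sizeof cmd) {
      cmd_ptr_bytes = 0;
      /* ... now safe to dispatch cmd ... */
    }
  }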
Yet another topic that comes up is visibility / ordering. Transferring a
pointer via write()/read() between threads does not seem to guarantee
ordering / visibility regarding the *pointed-to* area per spec. Pthread
mutex APIs (and C11 thread and atomics APIs) include the CPU
instructions for ensuring proper memory visibility.
*BUT* I think it would be insane for any POSIX implementation to have a
write()+read() combination that's *weaker* regarding data consistency,
(i.e., that's more racy) than mutexes. Write() and read() are
heavy-weight syscalls, so I can't imagine a publish-subscribe pattern
not working with them (i.e., the pointed-to area not "settling" until
the pointer is consumed).
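If one wanted to make the intended ordering explicit anyway, a
belt-and-braces sketch with C11 fences could look like this (purely
illustrative, not something the patch does, and arguably redundant
given the syscall boundary):

  #include <stdatomic.h>

  /* Publisher (nbdkit request thread): fill in *cmd first, then
   * publish the pointer.
   */
  atomic_thread_fence (memory_order_release);
  write (self_pipe[1], &cmd, sizeof cmd);

  /* Consumer (worker thread): acquire after receiving the pointer,
   * before touching *cmd.
   */
  read (self_pipe[0], &cmd, sizeof cmd);
  atomic_thread_fence (memory_order_acquire);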
Laszlo
When the easy handle work has finished, the worker thread removes it
from the multi handle and signals the nbdkit request thread to wake up
(using cmd->mutex + cmd->cond). At that point possession passes back
to the request thread, which will usually free up both the command and
the easy handle.
>> +++ b/plugins/curl/config.c
>
>> +++ b/plugins/curl/curl.c
>>
>> +/* Get the file size. */
>> +static int get_content_length_accept_range (struct curl_handle *ch);
>> +static bool try_fallback_GET_method (struct curl_handle *ch);
>> +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
>> +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
>> +
>> +static int64_t
>> +curl_get_size (void *handle)
>> +{
>> + struct curl_handle *ch;
>> + CURLcode r;
>> + long code;
>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
>> + curl_off_t o;
>> +#else
>> + double d;
>> +#endif
>> + int64_t exportsize;
>> +
>> + /* Get a curl easy handle. */
>> + ch = allocate_handle ();
>> + if (ch == NULL) goto err;
>> +
>> + /* Prepare to read the headers. */
>> + if (get_content_length_accept_range (ch) == -1)
>> + goto err;
>> +
>> + /* Send the command to the worker thread and wait. */
>> + struct command cmd = {
>> + .type = EASY_HANDLE,
>> + .ch = ch,
>> + };
>> +
>> + r = send_command_and_wait (&cmd);
>> + update_times (ch->c);
>> + if (r != CURLE_OK) {
>> + display_curl_error (ch, r,
>> +                        "problem doing HEAD request to fetch size of URL [%s]",
>> + url);
>> +
>> + /* Get the HTTP status code, if available. */
>> + r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
>> + if (r == CURLE_OK)
>> + nbdkit_debug ("HTTP status code: %ld", code);
>> + else
>> + code = -1;
>> +
>> + /* See comment on try_fallback_GET_method below. */
>> + if (code != 403 || !try_fallback_GET_method (ch))
>> + goto err;
>> + }
>> +
>> + /* Get the content length.
>> + *
>> + * Note there is some subtlety here: For web servers using chunked
>> + * encoding, either the Content-Length header will not be present,
>> + * or if present it should be ignored. (For such servers the only
>> + * way to find out the true length would be to read all of the
>> + * content, which we don't want to do).
>> + *
>> + * Curl itself resolves this for us. It will ignore the
>> + * Content-Length header if chunked encoding is used, returning the
>> + * length as -1 which we check below (see also
>> + * curl:lib/http.c:Curl_http_size).
>> + */
>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
>> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
>> + if (r != CURLE_OK) {
>> + display_curl_error (ch, r,
>> +                        "could not get length of remote file [%s]", url);
>> + goto err;
>> + }
>> +
>> + if (o == -1) {
>> + nbdkit_error ("could not get length of remote file [%s], "
>> + "is the URL correct?", url);
>> + goto err;
>> + }
>> +
>> + exportsize = o;
>> +#else
>> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
>> + if (r != CURLE_OK) {
>> + display_curl_error (ch, r,
>> +                        "could not get length of remote file [%s]", url);
>> + goto err;
>> + }
>> +
>> + if (d == -1) {
>> + nbdkit_error ("could not get length of remote file [%s], "
>> + "is the URL correct?", url);
>> + goto err;
>> + }
>> +
>> + exportsize = d;
>
> Does curl guarantee that the double d will contain a value assignable
> to int64_t without overflow/truncation? For particularly large sizes,
> double has insufficient precision for all possible file sizes, but I
> doubt someone is exposing such large files over HTTP.
No, I don't believe a 'double' is sufficient. This is why newer
versions of curl have HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T. Note
this code is just copied from the old curl plugin.
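For reference, the precision limit is easy to demonstrate with a
standalone sketch (not part of the patch): double has a 53-bit
mantissa, so the first file size it cannot represent exactly is
2^53 + 1 bytes (about 9 PB).

  #include <stdio.h>
  #include <stdint.h>
  #include <inttypes.h>

  int
  main (void)
  {
    /* 2^53 + 1 is the first integer a double cannot represent. */
    int64_t size = (INT64_C (1) << 53) + 1;
    double d = size;

    /* Prints: 9007199254740993 != 9007199254740992 */
    printf ("%" PRIi64 " != %.0f\n", size, d);
    return 0;
  }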
>> +#endif
>> + nbdkit_debug ("content length: %" PRIi64, exportsize);
>> +
>> + /* If this is HTTP, check that byte ranges are supported. */
>> +  if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
>> +      ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
>> + if (!ch->accept_range) {
>> +      nbdkit_error ("server does not support 'range' (byte range) requests");
>> + goto err;
>> + }
>> +
>> + nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
>> + }
>> +
>> + free_handle (ch);
>> + return exportsize;
>> +
>> + err:
>> + if (ch)
>> + free_handle (ch);
>> + return -1;
>> +}
>> +
>> +/* Get the file size and also whether the remote HTTP server
>> + * supports byte ranges.
>> + */
>> +static int
>> +get_content_length_accept_range (struct curl_handle *ch)
>> +{
>> + /* We must run the scripts if necessary and set headers in the
>> + * handle.
>> + */
>> + if (do_scripts (ch) == -1)
>> + return -1;
>> +
>> + /* Set this flag in the handle to false. The callback should set it
>> + * to true if byte ranges are supported, which we check below.
>> + */
>> + ch->accept_range = false;
>> +
>> + /* No Body, not nobody! This forces a HEAD request. */
>> + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
>> + return 0;
>> +}
>> +
>> +/* S3 servers can return 403 Forbidden for HEAD but still respond
>> + * to GET, so we give it a second chance in that case.
>> + * https://github.com/kubevirt/containerized-data-importer/issues/2737
>> + *
>> + * This function issues a GET request with a writefunction that always
>> + * returns an error, thus effectively getting the headers but
>> + * abandoning the transfer as soon as possible after.
>> + */
>> +static bool
>> +try_fallback_GET_method (struct curl_handle *ch)
>> +{
>> + CURLcode r;
>> +
>> + nbdkit_debug ("attempting to fetch headers using GET method");
>> +
>> + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
>> +
>> + struct command cmd = {
>> + .type = EASY_HANDLE,
>> + .ch = ch,
>> + };
>> +
>> + r = send_command_and_wait (&cmd);
>> + update_times (ch->c);
>> +
>> + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
>> + * (eg if the remote has zero length). Other errors might happen
>> + * but we ignore them since it is a fallback path.
>> + */
>> + return r == CURLE_OK || r == CURLE_WRITE_ERROR;
>> +}
>> +
>> +static size_t
>> +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
>> +{
>> + struct curl_handle *ch = opaque;
>> + size_t realsize = size * nmemb;
>> + const char *header = ptr;
>> + const char *end = header + realsize;
>> + const char *accept_ranges = "accept-ranges:";
>> + const char *bytes = "bytes";
>> +
>> + if (realsize >= strlen (accept_ranges) &&
>> + ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
>> + const char *p = strchr (header, ':') + 1;
>> +
>> + /* Skip whitespace between the header name and value. */
>> + while (p < end && *p && ascii_isspace (*p))
>
> Technically, '*p && ascii_isspace (*p)' can be shortened to
> 'ascii_isspace (*p)', since the NUL byte is not ascii space. I don't
> know if the compiler is smart enough to make that optimization on your
> behalf.
Ah indeed.
>> + p++;
>> +
>> + if (end - p >= strlen (bytes)
>> + && strncmp (p, bytes, strlen (bytes)) == 0) {
>> + /* Check that there is nothing but whitespace after the value. */
>> + p += strlen (bytes);
>> + while (p < end && *p && ascii_isspace (*p))
>
> Another spot of the same.
>
>> + p++;
>> +
>> + if (p == end || !*p)
>> + ch->accept_range = true;
>> + }
>> + }
>> +
>> + return realsize;
>> +}
>> +
>> +static size_t
>> +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
>> +{
>> +#ifdef CURL_WRITEFUNC_ERROR
>> + return CURL_WRITEFUNC_ERROR;
>> +#else
>> + return 0; /* in older curl, any size < requested will also be an error */
>> +#endif
>> +}
>> +
>> /* Read data from the remote server. */
>> +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
>> +
>> static int
>> curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>> {
>> CURLcode r;
>> + struct curl_handle *ch;
>> char range[128];
>>
>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch);
>> - if (ch == NULL)
>> - return -1;
>> + /* Get a curl easy handle. */
>> + ch = allocate_handle ();
>> + if (ch == NULL) goto err;
>>
>> /* Run the scripts if necessary and set headers in the handle. */
>> - if (do_scripts (ch) == -1) return -1;
>> + if (do_scripts (ch) == -1) goto err;
>>
>> /* Tell the write_cb where we want the data to be written. write_cb
>> * will update this if the data comes in multiple sections.
>> */
>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
>> ch->write_buf = buf;
>> ch->write_count = count;
>>
>> @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>> offset, offset + count);
>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>>
>> - /* The assumption here is that curl will look after timeouts. */
>> - r = curl_easy_perform (ch->c);
>> + /* Send the command to the worker thread and wait. */
>> + struct command cmd = {
>> + .type = EASY_HANDLE,
>> + .ch = ch,
>> + };
>> +
>> + r = send_command_and_wait (&cmd);
>> if (r != CURLE_OK) {
>> - display_curl_error (ch, r, "pread: curl_easy_perform");
>> - return -1;
>> + display_curl_error (ch, r, "pread");
>> + goto err;
>> }
>> update_times (ch->c);
>>
>> @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>> /* As far as I understand the cURL API, this should never happen. */
>> assert (ch->write_count == 0);
>>
>> + free_handle (ch);
>> return 0;
>> +
>> + err:
>> + if (ch)
>> + free_handle (ch);
>> + return -1;
>> +}
>> +
>> +/* NB: The terminology used by libcurl is confusing!
>> + *
>> + * WRITEFUNCTION / write_cb is used when reading from the remote server
>> + * READFUNCTION / read_cb is used when writing to the remote server.
>> + *
>> + * We use the same terminology as libcurl here.
>> + */
>> +static size_t
>> +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
>> +{
>> + struct curl_handle *ch = opaque;
>> + size_t orig_realsize = size * nmemb;
>> + size_t realsize = orig_realsize;
>
> Do we have to worry about overflow when compiling on 32-bit machines?
> Asked differently, should we be using off_t instead of size_t in any
> of this code? Thankfully, for now, we know NBD .pread and .pwrite
> requests are capped at 64M, so I think you're okay (we aren't ever
> going to ask curl for gigabytes in one request),
It's a good question ... I suspect that even if we requested it, web
servers probably wouldn't want to serve gigabytes of data in a range
request, but as you point out we shouldn't ever request it right now.
> but maybe a comment
> or assert() is worth it?
I'll add a comment, but could we do this with one of the overflow
macros? I'm not sure ...
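For what it's worth, a sketch using the GCC/Clang builtin at the top
of write_cb (hypothetical, not in the patch):

  size_t orig_realsize;

  /* Catch size * nmemb overflowing size_t, conceivable on 32-bit
   * hosts.  __builtin_mul_overflow is a GCC/Clang extension.
   */
  if (__builtin_mul_overflow (size, nmemb, &orig_realsize))
    return 0;  /* any short return is treated as an error by curl */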
>> +
>> + assert (ch->write_buf);
>> +
>> + /* Don't read more than the requested amount of data, even if the
>> + * server or libcurl sends more.
>> + */
>> + if (realsize > ch->write_count)
>> + realsize = ch->write_count;
>> +
>> + memcpy (ch->write_buf, ptr, realsize);
>> +
>> + ch->write_count -= realsize;
>> + ch->write_buf += realsize;
>> +
>> + return orig_realsize;
>
> [1]
>
>> }
>>
>> /* Write data to the remote server. */
>> +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
>> +
>> static int
>> curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>> {
>> CURLcode r;
>> + struct curl_handle *ch;
>> char range[128];
>>
>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch);
>> - if (ch == NULL)
>> - return -1;
>> + /* Get a curl easy handle. */
>> + ch = allocate_handle ();
>> + if (ch == NULL) goto err;
>>
>> /* Run the scripts if necessary and set headers in the handle. */
>> - if (do_scripts (ch) == -1) return -1;
>> + if (do_scripts (ch) == -1) goto err;
>>
>> /* Tell the read_cb where we want the data to be read from. read_cb
>> * will update this if the data comes in multiple sections.
>> */
>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
>> ch->read_buf = buf;
>> ch->read_count = count;
>>
>> @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>> offset, offset + count);
>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>>
>> - /* The assumption here is that curl will look after timeouts. */
>> - r = curl_easy_perform (ch->c);
>> + /* Send the command to the worker thread and wait. */
>> + struct command cmd = {
>> + .type = EASY_HANDLE,
>> + .ch = ch,
>> + };
>> +
>> + r = send_command_and_wait (&cmd);
>> if (r != CURLE_OK) {
>> - display_curl_error (ch, r, "pwrite: curl_easy_perform");
>> - return -1;
>> + display_curl_error (ch, r, "pwrite");
>> + goto err;
>> }
>> update_times (ch->c);
>>
>> @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>> /* As far as I understand the cURL API, this should never happen. */
>> assert (ch->read_count == 0);
>>
>> + free_handle (ch);
>> return 0;
>> +
>> + err:
>> + if (ch)
>> + free_handle (ch);
>> + return -1;
>> +}
>> +
>> +static size_t
>> +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
>> +{
>> + struct curl_handle *ch = opaque;
>> + size_t realsize = size * nmemb;
>> +
>> + assert (ch->read_buf);
>> + if (realsize > ch->read_count)
>> + realsize = ch->read_count;
>> +
>> + memcpy (ptr, ch->read_buf, realsize);
>> +
>> + ch->read_count -= realsize;
>> + ch->read_buf += realsize;
>> +
>> + return realsize;
>
> Why does write_cb in [1] above return orig_realsize, but read_cb
> returns the potentially modified realsize?
It's a good question (this code was copied from the old plugin, which
had been working for years). It definitely works now. Note that writes
in this plugin are probably never used: they require a web server that
supports "Range PUTs", which probably describes no server in existence.
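(As far as I understand libcurl, the asymmetry is inherent in the API:
a WRITEFUNCTION returning fewer bytes than it was passed signals an
error and aborts the transfer, whereas a READFUNCTION may legitimately
return fewer bytes than the buffer size - curl simply calls it again
for the remainder.)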
>> }
>>
>> static struct nbdkit_plugin plugin = {
>> diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
>> index eb2d330e1..2974cda3f 100644
>> --- a/plugins/curl/pool.c
>> +++ b/plugins/curl/pool.c
>> @@ -30,11 +30,29 @@
>> * SUCH DAMAGE.
>> */
>>
>> -/* Curl handle pool.
>> +/* Worker thread which processes the curl multi interface.
>> *
>> - * To get a libcurl handle, call get_handle(). When you hold the
>> - * handle, it is yours exclusively to use. After you have finished
>> - * with the handle, put it back into the pool by calling put_handle().
>> + * The main nbdkit threads (see curl.c) create curl easy handles
>> + * initialized with the work they want to carry out. Note there is
>> + * one easy handle per task (eg. per pread/pwrite request). The easy
>> + * handles are not reused.
>> + *
>> + * The commands + optional easy handle are submitted to the worker
>> + * thread over a self-pipe (it's easy to use a pipe here because the
>> + * way curl multi works is it can listen on an extra fd, but not on
>> + * anything else like a pthread condition). The curl multi performs
>> + * the work of the outstanding easy handles.
>> + *
>> + * When an easy handle finishes work or errors, we retire the command
>> + * by signalling back to the waiting nbdkit thread using a pthread
>> + * condition.
>> + *
>> + * In my experiments, we're almost always I/O bound so I haven't seen
>> + * any strong need to use more than one curl multi / worker thread,
>> + * although it would be possible to add more in future.
>> + *
>> + * See also this extremely useful thread:
>> + * https://curl.se/mail/lib-2019-03/0100.html
>
> Very useful comment (and link).
>
>> */
>>
>> #include <config.h>
>> @@ -45,9 +63,19 @@
>> #include <stdint.h>
>> #include <inttypes.h>
>> #include <string.h>
>> +#include <unistd.h>
>> #include <assert.h>
>> #include <pthread.h>
>>
>> +#ifdef HAVE_STDATOMIC_H
>> +#include <stdatomic.h>
>> +#else
>> +/* Some old platforms lack atomic types, but 32 bit ints are usually
>> + * "atomic enough".
>> + */
>> +#define _Atomic /**/
>> +#endif
>> +
>> #include <curl/curl.h>
>>
>> #include <nbdkit-plugin.h>
>> @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
>>
>> unsigned connections = 4;
>>
>> -/* This lock protects access to the curl_handles vector below. */
>> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
>> +/* Pipe used to notify background thread that a command is pending in
>> + * the queue. A pointer to the 'struct command' is sent over the
>> + * pipe.
>> + */
>> +static int self_pipe[2] = { -1, -1 };
>>
>> -/* List of curl handles. This is allocated dynamically as more
>> - * handles are requested. Currently it does not shrink. It may grow
>> - * up to 'connections' in length.
>> +/* The curl multi handle. */
>> +static CURLM *multi;
>> +
>> +/* List of running easy handles. We only need to maintain this so we
>> + * can remove them from the multi handle when cleaning up.
>> */
>> DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
>> static curl_handle_list curl_handles = empty_vector;
>>
>> -/* The condition is used when the curl handles vector is full and
>> - * we're waiting for a thread to put_handle.
>> - */
>> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
>> -static size_t in_use = 0, waiting = 0;
>> +static const char *
>> +command_type_to_string (enum command_type type)
>> +{
>> + switch (type) {
>> + case EASY_HANDLE: return "EASY_HANDLE";
>> + case STOP: return "STOP";
>> + default: abort ();
>> + }
>> +}
>>
>> int
>> pool_get_ready (void)
>> {
>> + multi = curl_multi_init ();
>> + if (multi == NULL) {
>> + nbdkit_error ("curl_multi_init failed: %m");
>> + return -1;
>> + }
>> +
>> return 0;
>> }
>>
>> +/* Start and stop the background thread. */
>> +static pthread_t thread;
>> +static bool thread_running;
>> +static void *pool_worker (void *);
>> +
>> int
>> pool_after_fork (void)
>> {
>> + int err;
>> +
>> + if (pipe (self_pipe) == -1) {
>> + nbdkit_error ("pipe: %m");
>> + return -1;
>> + }
>> +
>> + /* Start the pool background thread where all the curl work is done. */
>> + err = pthread_create (&thread, NULL, pool_worker, NULL);
>> + if (err != 0) {
>> + errno = err;
>> + nbdkit_error ("pthread_create: %m");
>> + return -1;
>> + }
>> + thread_running = true;
>> +
>> return 0;
>> }
>>
>> -/* Close and free all handles in the pool. */
>> +/* Unload the background thread. */
>> void
>> pool_unload (void)
>> {
>> - size_t i;
>> + if (thread_running) {
>> + /* Stop the background thread. */
>> + struct command cmd = { .type = STOP };
>> + send_command_and_wait (&cmd);
>> + pthread_join (thread, NULL);
>> + thread_running = false;
>> + }
>>
>> - if (curl_debug_pool)
>> -    nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
>> - curl_handles.len);
>> + if (self_pipe[0] >= 0) {
>> + close (self_pipe[0]);
>> + self_pipe[0] = -1;
>> + }
>> + if (self_pipe[1] >= 0) {
>> + close (self_pipe[1]);
>> + self_pipe[1] = -1;
>> + }
>>
>> - for (i = 0; i < curl_handles.len; ++i)
>> - free_handle (curl_handles.ptr[i]);
>> - curl_handle_list_reset (&curl_handles);
>> + if (multi) {
>> + size_t i;
>> +
>> + /* Remove and free any easy handles in the multi. */
>> + for (i = 0; i < curl_handles.len; ++i) {
>> + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
>> + free_handle (curl_handles.ptr[i]);
>> + }
>> +
>> + curl_multi_cleanup (multi);
>> + multi = NULL;
>> + }
>> }
>>
>> -/* Get a handle from the pool.
>> - *
>> - * It is owned exclusively by the caller until they call put_handle.
>> +/* Command queue. */
>> +static _Atomic uint64_t id; /* next command ID */
>> +
>> +/* Send command to the background thread and wait for completion.
>> + * This is only called by one of the nbdkit threads.
>> */
>> -struct curl_handle *
>> -get_handle (void)
>> +CURLcode
>> +send_command_and_wait (struct command *cmd)
>> {
>> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
>> - size_t i;
>> - struct curl_handle *ch;
>> -
>> - again:
>> - /* Look for a handle which is not in_use. */
>> - for (i = 0; i < curl_handles.len; ++i) {
>> - ch = curl_handles.ptr[i];
>> - if (!ch->in_use) {
>> - ch->in_use = true;
>> - in_use++;
>> + cmd->id = id++;
>> +
>> + /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
>> + * indicate that the command has not yet been completed and status
>> + * set.
>> + */
>> + cmd->status = -1;
>> +
>> + /* This will be used to signal command completion back to us. */
>> + pthread_mutex_init (&cmd->mutex, NULL);
>> + pthread_cond_init (&cmd->cond, NULL);
>> +
>> + /* Send the command to the background thread. */
>> + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
>> + abort ();
>> +
>> + /* Wait for the command to be completed by the background thread. */
>> + {
>> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
>> + while (cmd->status == -1) /* for -1, see above */
>> + pthread_cond_wait (&cmd->cond, &cmd->mutex);
>> + }
>> +
>> + pthread_mutex_destroy (&cmd->mutex);
>> + pthread_cond_destroy (&cmd->cond);
>> +
>> + /* Note the main thread must call nbdkit_error on error! */
>> + return cmd->status;
>> +}
>> +
>> +/* The background thread. */
>> +static void check_for_finished_handles (void);
>> +static void retire_command (struct command *cmd, CURLcode code);
>> +static void do_easy_handle (struct command *cmd);
>> +
>> +static void *
>> +pool_worker (void *vp)
>> +{
>> + bool stop = false;
>> +
>> + if (curl_debug_pool)
>> + nbdkit_debug ("curl: background thread started");
>> +
>> + while (!stop) {
>> + struct command *cmd = NULL;
>> + struct curl_waitfd extra_fds[1] =
>> + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
>> + CURLMcode mc;
>> + int numfds, running_handles, repeats = 0;
>> +
>> + do {
>> + /* Process the multi handle. */
>> + mc = curl_multi_perform (multi, &running_handles);
>> + if (mc != CURLM_OK) {
>> +        nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));
>
> Since nbdkit_error() stores its string in thread-local storage, is
> there anything that ever extracts this error over to the nbdkit thread
> that issued the original request to the worker thread?...
>
>> + abort (); /* XXX We don't expect this to happen */
>
> ...Then again, if we abort, it doesn't matter.
I was unsure what to do here.  The final code does:

  while (!stop) {
    ...
    cmd = process_multi_handle ();
    if (cmd == NULL)
      continue; /* or die?? */
with process_multi_handle still calling nbdkit_error. I felt it might
be better to keep trying than just abort, as presumably some (most?)
errors are transient.
nbdkit_error will ensure the error message is written out.
>> + }
>
>> +
>> + check_for_finished_handles ();
>> +
>> + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
>> + if (mc != CURLM_OK) {
>> +        nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
>> + abort (); /* XXX We don't expect this to happen */
>> + }
>> +
>> if (curl_debug_pool)
>> - nbdkit_debug ("get_handle: %zu", ch->i);
>> - return ch;
>> - }
>> - }
>> +        nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d",
>> + running_handles, numfds);
>> +
>> + if (numfds == 0) {
>> + repeats++;
>> + if (repeats > 1)
>> + nbdkit_nanosleep (1, 0);
>> + }
>> + else {
>> + repeats = 0;
>> + if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
>> + /* There's a command waiting. */
>> + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
>> + abort ();
>> + }
>> + }
>> + } while (!cmd);
>>
>> - /* If more connections are allowed, then allocate a new handle. */
>> - if (curl_handles.len < connections) {
>> - ch = allocate_handle ();
>> - if (ch == NULL)
>> - return NULL;
>> - if (curl_handle_list_append (&curl_handles, ch) == -1) {
>> - free_handle (ch);
>> - return NULL;
>> - }
>> - ch->i = curl_handles.len - 1;
>> - ch->in_use = true;
>> - in_use++;
>> if (curl_debug_pool)
>> - nbdkit_debug ("get_handle: %zu", ch->i);
>> - return ch;
>> - }
>> + nbdkit_debug ("curl: dispatching %s command %" PRIu64,
>> + command_type_to_string (cmd->type), cmd->id);
>> +
>> + switch (cmd->type) {
>> + case STOP:
>> + stop = true;
>> + retire_command (cmd, CURLE_OK);
>> + break;
>>
>> - /* Otherwise we have run out of connections so we must wait until
>> - * another thread calls put_handle.
>> - */
>> - assert (in_use == connections);
>> - waiting++;
>> - while (in_use == connections)
>> - pthread_cond_wait (&cond, &lock);
>> - waiting--;
>> + case EASY_HANDLE:
>> + do_easy_handle (cmd);
>> + break;
>> + }
>> + } /* while (!stop) */
>>
>> - goto again;
>> + if (curl_debug_pool)
>> + nbdkit_debug ("curl: background thread stopped");
>> +
>> + return NULL;
>> }
>>
>> -/* Return the handle to the pool. */
>> -void
>> -put_handle (struct curl_handle *ch)
>> +/* This checks if any easy handles in the multi have
>> + * finished and retires the associated commands.
>> + */
>> +static void
>> +check_for_finished_handles (void)
>> {
>> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
>> + CURLMsg *msg;
>> + int msgs_in_queue;
>> +
>> + while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
>> + size_t i;
>> + struct curl_handle *ch = NULL;
>> +
>> + if (msg->msg == CURLMSG_DONE) {
>> + /* Find this curl_handle. */
>> + for (i = 0; i < curl_handles.len; ++i) {
>> + if (curl_handles.ptr[i]->c == msg->easy_handle) {
>> + ch = curl_handles.ptr[i];
>> + curl_handle_list_remove (&curl_handles, i);
>> + }
>> + }
>> + if (ch == NULL) abort ();
>> + curl_multi_remove_handle (multi, ch->c);
>> +
>> + retire_command (ch->cmd, msg->data.result);
>> + }
>> + }
>> +}
>>
>> +/* Retire a command. status is a CURLcode. */
>> +static void
>> +retire_command (struct command *cmd, CURLcode status)
>> +{
>> if (curl_debug_pool)
>> - nbdkit_debug ("put_handle: %zu", ch->i);
>> + nbdkit_debug ("curl: retiring %s command %" PRIu64,
>> + command_type_to_string (cmd->type), cmd->id);
>> +
>> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
>> + cmd->status = status;
>> + pthread_cond_signal (&cmd->cond);
>> +}
>> +
>> +static void
>> +do_easy_handle (struct command *cmd)
>> +{
>> + CURLMcode mc;
>> +
>> + cmd->ch->cmd = cmd;
>> +
>> + /* Add the handle to the multi. */
>> + mc = curl_multi_add_handle (multi, cmd->ch->c);
>> + if (mc != CURLM_OK) {
>> +    nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
>> + goto err;
>> + }
>>
>> - ch->in_use = false;
>> - in_use--;
>> + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
>> + goto err;
>> + return;
>>
>> - /* Signal the next thread which is waiting. */
>> - if (waiting > 0)
>> - pthread_cond_signal (&cond);
>> + err:
>> + retire_command (cmd, CURLE_OUT_OF_MEMORY);
>> }
>> --
>> 2.41.0
>
> Overall looks nice, and I learned more about curl in the process.
For me, too much :-(
Thanks,
Rich.