On 8/4/23 20:04, Richard W.M. Jones wrote:
> On Fri, Aug 04, 2023 at 11:38:03AM -0500, Eric Blake wrote:
>> On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote:
>>> See the comment at the top of plugins/curl/pool.c for general
>>> information about how this works.
>>>
>>> This makes a very large difference to performance over the previous
>>> implementation. Note for the tests below I also applied the next
>>> commit changing the behaviour of the connections parameter.
>>>
>>> Using this test case:
>>>
>>> $
uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-a...
>>> $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri
null'
>>>
>>> The times are as follows:
>>>
>>> multi, connections=64 21.5s
>>> multi, connections=32 30.2s
>>> multi, connections=16 56.0s
>>> before this commit 166s
>>
>> Awesome performance improvements! As painful as this series has been
>> for you to write and debug, it is showing its worth.
>>
>>> ---
>>> plugins/curl/curldefs.h | 35 ++--
>>> plugins/curl/config.c | 246 ---------------------------
>>> plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++++-----
>>> plugins/curl/pool.c | 346 ++++++++++++++++++++++++++++---------
>>> 4 files changed, 616 insertions(+), 377 deletions(-)
>>
>> Finally taking time to review this, even though it is already in-tree.
>>
>>> @@ -98,8 +88,30 @@ struct curl_handle {
>>> const char *read_buf;
>>> uint32_t read_count;
>>>
>>> + /* This field is used by curl_get_size. */
>>> + bool accept_range;
>>> +
>>> /* Used by scripts.c */
>>> struct curl_slist *headers_copy;
>>> +
>>> + /* Used by pool.c */
>>> + struct command *cmd;
>>> +};
>>> +
>>> +/* Asynchronous commands that can be sent to the pool thread. */
>>> +enum command_type { EASY_HANDLE, STOP };
>>> +struct command {
>>> + /* These fields are set by the caller. */
>>> + enum command_type type; /* command */
>>> + struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */
>>> +
>>> + /* This field is set to a unique value by send_command_and_wait. */
>>> + uint64_t id; /* serial number */
>>> +
>>> + /* These fields are used to signal back that the command finished. */
>>> + pthread_mutex_t mutex; /* completion mutex */
>>> + pthread_cond_t cond; /* completion condition */
>>> + CURLcode status; /* status code (CURLE_OK = succeeded) */
>>> };
>>
>> Makes sense. The two types are mutually recursive (curl_handle
>> includes a struct command *; command includes a struct curl_handle *);
>> hopefully you have proper locking when altering multiple objects to
>> adjust how they point to one another.
>
> Actually locking is not needed. Let me document it through ...
>
> We create both the curl easy handle and the associated EASY_HANDLE
> command in the nbdkit thread that gets the request, eg. in the curl
> .pread method. That of course requires no locking.
>
> There is a single background worker thread.
>
> A "self pipe" passes pointers to 'struct command *' to this
worker
> thread simple by writing the 8 byte pointer onto the pipe (hopefully
> atomic ...) The nbdkit request thread then blocks on the mutex/cond
> in the command handle.
Right, that's exactly that I got curious about.
In my opinion, writing to the self-pipe is mostly safe. Reading from the
self-pipe could be slightly polished.
Here are the POSIX pages on write() and read():
https://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html
https://pubs.opengroup.org/onlinepubs/9699919799/functions/read.html
The first one (write) says that if you attempt to write at most PIPE_BUF
bytes, then writes will not be interleaved with other concurrent writes,
so in a sense the write will be atomic. (And in this case, O_NONBLOCK
only changes the behavior for the case when the buffer cannot be written
in entirety: with O_NONBLOCK clear, the writer thread will block, with
O_NONBLOCK set, the writer thread will see -1/EAGAIN.)
Now, PIPE_BUF is "variable" in a sense:
https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/limits.h.html
but it is greater than or equal to _POSIX_PIPE_BUF, which is 512. Our
pointers are 8 bytes in size, so the <=PIPE_BUF condition is surely
satisfied.
... The only case I see possible for a short write is the delivery of a
signal after transfer has started. *If* nbdkit catches some signal such
that the signal handler actually returns, then this could be a
(theoretical) problem, calling for xwrite() or similar.
>
> The background worker thread picks the 'struct command *' up from the
> pipe in the same event loop that it uses to process ongoing requests
> on the multi handle. It takes the easy handle and adds it to the
> multi handle.
The spec on read() does not seem to have the same kind of
non-interleaving / "atomicity" language that write() does. The rationale
section says:
"The standard developers considered adding atomicity requirements to a
pipe or FIFO, but recognized that due to the nature of pipes and FIFOs
there could be no guarantee of atomicity of reads of {PIPE_BUF} or any
other size that would be an aid to applications portability."
Now given that the writer side is free of interleaving (because, in the
first place: there is only a single writer!), I think we need not worry
about data corruption. However, it does feel like read() may return
fewer than 8 bytes in one go, "just because" (not only because of a
signal being delivered midway).
So there are actually multiple writer threads, but only a single
reader thread. Also I should say that I didn't set O_NONBLOCK
(actually I completely forgot), but that might be a benefit in this
case since it makes it less likely for the 8 byte read to be broken up.
And that may be a problem with readiness reporting via
curl_multi_wait(); even if you get CURL_WAIT_POLLIN, the whole command
pointer may not yet be available.
Now I do think a split read is extremely unlikely, maybe even impossible
on Linux. If we choose to be pedantic, then the curl_multi_wait() loop
might want to expect "cmd" to be populated only over multiple iterations
-- like use "cmd_ptr_bytes" or something similar for tracking the size
already available, and only consider "cmd" usable when cmd_ptr_bytes
reaches sizeof cmd.
Yet another topic that comes up is visibility / ordering. Transfering a
pointer via write()/read() between threads does not seem to guarantee
ordering / visibility regarding the *pointed-to* area per spec. Pthread
mutex APIs (and C11 thread and atomics APIs) include the CPU
instructions for ensuring proper memory visibility.
*BUT* I think it would be insane for any POSIX implementation to have a
write()+read() combination that's *weaker* regarding data consistency,
(i.e., that's more racy) than mutexes. Write() and read() are
heavy-weight syscalls, so I can't imagine a publish-subscribe pattern
not working with them (i.e., the pointed-to area not "settling" until
the pointer is consumed).
Yup, it seems unlikely, but maybe it could happen on architectures
with weaker store ordering like Arm? I wonder if adding a memory
barrier before the write would be a good idea?
Thanks,
Rich.
Laszlo
>
> When the easy handle work has finished, the worker thread removes it
> from the multi handle and signals the nbdkit request thread to wake up
> (using cmd->mutex + cmd->lock). At which point possession passes back
> to the request thread which will usually free up both the command and
> easy handle.
>
>>> +++ b/plugins/curl/config.c
>>
>>> +++ b/plugins/curl/curl.c
>>>
>>> +/* Get the file size. */
>>> +static int get_content_length_accept_range (struct curl_handle *ch);
>>> +static bool try_fallback_GET_method (struct curl_handle *ch);
>>> +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void
*opaque);
>>> +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void
*opaque);
>>> +
>>> +static int64_t
>>> +curl_get_size (void *handle)
>>> +{
>>> + struct curl_handle *ch;
>>> + CURLcode r;
>>> + long code;
>>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
>>> + curl_off_t o;
>>> +#else
>>> + double d;
>>> +#endif
>>> + int64_t exportsize;
>>> +
>>> + /* Get a curl easy handle. */
>>> + ch = allocate_handle ();
>>> + if (ch == NULL) goto err;
>>> +
>>> + /* Prepare to read the headers. */
>>> + if (get_content_length_accept_range (ch) == -1)
>>> + goto err;
>>> +
>>> + /* Send the command to the worker thread and wait. */
>>> + struct command cmd = {
>>> + .type = EASY_HANDLE,
>>> + .ch = ch,
>>> + };
>>> +
>>> + r = send_command_and_wait (&cmd);
>>> + update_times (ch->c);
>>> + if (r != CURLE_OK) {
>>> + display_curl_error (ch, r,
>>> + "problem doing HEAD request to fetch size of
URL [%s]",
>>> + url);
>>> +
>>> + /* Get the HTTP status code, if available. */
>>> + r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
>>> + if (r == CURLE_OK)
>>> + nbdkit_debug ("HTTP status code: %ld", code);
>>> + else
>>> + code = -1;
>>> +
>>> + /* See comment on try_fallback_GET_method below. */
>>> + if (code != 403 || !try_fallback_GET_method (ch))
>>> + goto err;
>>> + }
>>> +
>>> + /* Get the content length.
>>> + *
>>> + * Note there is some subtlety here: For web servers using chunked
>>> + * encoding, either the Content-Length header will not be present,
>>> + * or if present it should be ignored. (For such servers the only
>>> + * way to find out the true length would be to read all of the
>>> + * content, which we don't want to do).
>>> + *
>>> + * Curl itself resolves this for us. It will ignore the
>>> + * Content-Length header if chunked encoding is used, returning the
>>> + * length as -1 which we check below (see also
>>> + * curl:lib/http.c:Curl_http_size).
>>> + */
>>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
>>> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T,
&o);
>>> + if (r != CURLE_OK) {
>>> + display_curl_error (ch, r,
>>> + "could not get length of remote file
[%s]", url);
>>> + goto err;
>>> + }
>>> +
>>> + if (o == -1) {
>>> + nbdkit_error ("could not get length of remote file [%s], "
>>> + "is the URL correct?", url);
>>> + goto err;
>>> + }
>>> +
>>> + exportsize = o;
>>> +#else
>>> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD,
&d);
>>> + if (r != CURLE_OK) {
>>> + display_curl_error (ch, r,
>>> + "could not get length of remote file
[%s]", url);
>>> + goto err;
>>> + }
>>> +
>>> + if (d == -1) {
>>> + nbdkit_error ("could not get length of remote file [%s], "
>>> + "is the URL correct?", url);
>>> + goto err;
>>> + }
>>> +
>>> + exportsize = d;
>>
>> Does curl guarantee that the double d will contain a value assignable
>> to int64_t without overflow/truncation? For particularly large sizes,
>> double has insufficient precision for all possible file sizes, but I
>> doubt someone is exposing such large files over HTTP.
>
> No, I don't believe a 'double' is sufficient. This is why newer
> versions of curl have HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T. Note
> this code is just copied from the old curl plugin.
>
>>> +#endif
>>> + nbdkit_debug ("content length: %" PRIi64, exportsize);
>>> +
>>> + /* If this is HTTP, check that byte ranges are supported. */
>>> + if (ascii_strncasecmp (url, "http://", strlen
("http://")) == 0 ||
>>> + ascii_strncasecmp (url, "https://", strlen
("https://")) == 0) {
>>> + if (!ch->accept_range) {
>>> + nbdkit_error ("server does not support 'range' (byte
range) requests");
>>> + goto err;
>>> + }
>>> +
>>> + nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
>>> + }
>>> +
>>> + free_handle (ch);
>>> + return exportsize;
>>> +
>>> + err:
>>> + if (ch)
>>> + free_handle (ch);
>>> + return -1;
>>> +}
>>> +
>>> +/* Get the file size and also whether the remote HTTP server
>>> + * supports byte ranges.
>>> + */
>>> +static int
>>> +get_content_length_accept_range (struct curl_handle *ch)
>>> +{
>>> + /* We must run the scripts if necessary and set headers in the
>>> + * handle.
>>> + */
>>> + if (do_scripts (ch) == -1)
>>> + return -1;
>>> +
>>> + /* Set this flag in the handle to false. The callback should set it
>>> + * to true if byte ranges are supported, which we check below.
>>> + */
>>> + ch->accept_range = false;
>>> +
>>> + /* No Body, not nobody! This forces a HEAD request. */
>>> + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
>>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
>>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
>>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
>>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
>>> + return 0;
>>> +}
>>> +
>>> +/* S3 servers can return 403 Forbidden for HEAD but still respond
>>> + * to GET, so we give it a second chance in that case.
>>> + *
https://github.com/kubevirt/containerized-data-importer/issues/2737
>>> + *
>>> + * This function issues a GET request with a writefunction that always
>>> + * returns an error, thus effectively getting the headers but
>>> + * abandoning the transfer as soon as possible after.
>>> + */
>>> +static bool
>>> +try_fallback_GET_method (struct curl_handle *ch)
>>> +{
>>> + CURLcode r;
>>> +
>>> + nbdkit_debug ("attempting to fetch headers using GET method");
>>> +
>>> + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
>>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
>>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
>>> +
>>> + struct command cmd = {
>>> + .type = EASY_HANDLE,
>>> + .ch = ch,
>>> + };
>>> +
>>> + r = send_command_and_wait (&cmd);
>>> + update_times (ch->c);
>>> +
>>> + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
>>> + * (eg if the remote has zero length). Other errors might happen
>>> + * but we ignore them since it is a fallback path.
>>> + */
>>> + return r == CURLE_OK || r == CURLE_WRITE_ERROR;
>>> +}
>>> +
>>> +static size_t
>>> +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
>>> +{
>>> + struct curl_handle *ch = opaque;
>>> + size_t realsize = size * nmemb;
>>> + const char *header = ptr;
>>> + const char *end = header + realsize;
>>> + const char *accept_ranges = "accept-ranges:";
>>> + const char *bytes = "bytes";
>>> +
>>> + if (realsize >= strlen (accept_ranges) &&
>>> + ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) ==
0) {
>>> + const char *p = strchr (header, ':') + 1;
>>> +
>>> + /* Skip whitespace between the header name and value. */
>>> + while (p < end && *p && ascii_isspace (*p))
>>
>> Technically, '*p && ascii_isspace (*p)' can be shortened to
>> 'ascii_isspace (*p)', since the NUL byte is not ascii space. I
don't
>> know if the compiler is smart enough to make that optimization on your
>> behalf.
>
> Ah indeed.
>
>>> + p++;
>>> +
>>> + if (end - p >= strlen (bytes)
>>> + && strncmp (p, bytes, strlen (bytes)) == 0) {
>>> + /* Check that there is nothing but whitespace after the value. */
>>> + p += strlen (bytes);
>>> + while (p < end && *p && ascii_isspace (*p))
>>
>> Another spot of the same.
>>
>>> + p++;
>>> +
>>> + if (p == end || !*p)
>>> + ch->accept_range = true;
>>> + }
>>> + }
>>> +
>>> + return realsize;
>>> +}
>>> +
>>> +static size_t
>>> +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
>>> +{
>>> +#ifdef CURL_WRITEFUNC_ERROR
>>> + return CURL_WRITEFUNC_ERROR;
>>> +#else
>>> + return 0; /* in older curl, any size < requested will also be an error
*/
>>> +#endif
>>> +}
>>> +
>>> /* Read data from the remote server. */
>>> +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void
*opaque);
>>> +
>>> static int
>>> curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>>> {
>>> CURLcode r;
>>> + struct curl_handle *ch;
>>> char range[128];
>>>
>>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch);
>>> - if (ch == NULL)
>>> - return -1;
>>> + /* Get a curl easy handle. */
>>> + ch = allocate_handle ();
>>> + if (ch == NULL) goto err;
>>>
>>> /* Run the scripts if necessary and set headers in the handle. */
>>> - if (do_scripts (ch) == -1) return -1;
>>> + if (do_scripts (ch) == -1) goto err;
>>>
>>> /* Tell the write_cb where we want the data to be written. write_cb
>>> * will update this if the data comes in multiple sections.
>>> */
>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
>>> ch->write_buf = buf;
>>> ch->write_count = count;
>>>
>>> @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count,
uint64_t offset)
>>> offset, offset + count);
>>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>>>
>>> - /* The assumption here is that curl will look after timeouts. */
>>> - r = curl_easy_perform (ch->c);
>>> + /* Send the command to the worker thread and wait. */
>>> + struct command cmd = {
>>> + .type = EASY_HANDLE,
>>> + .ch = ch,
>>> + };
>>> +
>>> + r = send_command_and_wait (&cmd);
>>> if (r != CURLE_OK) {
>>> - display_curl_error (ch, r, "pread: curl_easy_perform");
>>> - return -1;
>>> + display_curl_error (ch, r, "pread");
>>> + goto err;
>>> }
>>> update_times (ch->c);
>>>
>>> @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count,
uint64_t offset)
>>> /* As far as I understand the cURL API, this should never happen. */
>>> assert (ch->write_count == 0);
>>>
>>> + free_handle (ch);
>>> return 0;
>>> +
>>> + err:
>>> + if (ch)
>>> + free_handle (ch);
>>> + return -1;
>>> +}
>>> +
>>> +/* NB: The terminology used by libcurl is confusing!
>>> + *
>>> + * WRITEFUNCTION / write_cb is used when reading from the remote server
>>> + * READFUNCTION / read_cb is used when writing to the remote server.
>>> + *
>>> + * We use the same terminology as libcurl here.
>>> + */
>>> +static size_t
>>> +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
>>> +{
>>> + struct curl_handle *ch = opaque;
>>> + size_t orig_realsize = size * nmemb;
>>> + size_t realsize = orig_realsize;
>>
>> Do we have to worry about overflow when compiling on 32-bit machines?
>> Asked differently, should we be using off_t instead of size_t in any
>> of this code? Thankfully, for now, we know NBD .pread and .pwrite
>> requests are capped at 64M, so I think you're okay (we aren't ever
>> going to ask curl for gigabytes in one request),
>
> It's a good question ... I suspect that even if we requested it, web
> servers probably wouldn't want to serve gigabytes of data in a range
> request, but as you point out we shouldn't ever request it right now.
>
>> but maybe a comment
>> or assert() is worth it?
>
> I'll add a comment, but could we do this with one of the overflow
> macros? I'm not sure ...
>
>>> +
>>> + assert (ch->write_buf);
>>> +
>>> + /* Don't read more than the requested amount of data, even if the
>>> + * server or libcurl sends more.
>>> + */
>>> + if (realsize > ch->write_count)
>>> + realsize = ch->write_count;
>>> +
>>> + memcpy (ch->write_buf, ptr, realsize);
>>> +
>>> + ch->write_count -= realsize;
>>> + ch->write_buf += realsize;
>>> +
>>> + return orig_realsize;
>>
>> [1]
>>
>>> }
>>>
>>> /* Write data to the remote server. */
>>> +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void
*opaque);
>>> +
>>> static int
>>> curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t
offset)
>>> {
>>> CURLcode r;
>>> + struct curl_handle *ch;
>>> char range[128];
>>>
>>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch);
>>> - if (ch == NULL)
>>> - return -1;
>>> + /* Get a curl easy handle. */
>>> + ch = allocate_handle ();
>>> + if (ch == NULL) goto err;
>>>
>>> /* Run the scripts if necessary and set headers in the handle. */
>>> - if (do_scripts (ch) == -1) return -1;
>>> + if (do_scripts (ch) == -1) goto err;
>>>
>>> /* Tell the read_cb where we want the data to be read from. read_cb
>>> * will update this if the data comes in multiple sections.
>>> */
>>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
>>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
>>> ch->read_buf = buf;
>>> ch->read_count = count;
>>>
>>> @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t
count, uint64_t offset)
>>> offset, offset + count);
>>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>>>
>>> - /* The assumption here is that curl will look after timeouts. */
>>> - r = curl_easy_perform (ch->c);
>>> + /* Send the command to the worker thread and wait. */
>>> + struct command cmd = {
>>> + .type = EASY_HANDLE,
>>> + .ch = ch,
>>> + };
>>> +
>>> + r = send_command_and_wait (&cmd);
>>> if (r != CURLE_OK) {
>>> - display_curl_error (ch, r, "pwrite: curl_easy_perform");
>>> - return -1;
>>> + display_curl_error (ch, r, "pwrite");
>>> + goto err;
>>> }
>>> update_times (ch->c);
>>>
>>> @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t
count, uint64_t offset)
>>> /* As far as I understand the cURL API, this should never happen. */
>>> assert (ch->read_count == 0);
>>>
>>> + free_handle (ch);
>>> return 0;
>>> +
>>> + err:
>>> + if (ch)
>>> + free_handle (ch);
>>> + return -1;
>>> +}
>>> +
>>> +static size_t
>>> +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
>>> +{
>>> + struct curl_handle *ch = opaque;
>>> + size_t realsize = size * nmemb;
>>> +
>>> + assert (ch->read_buf);
>>> + if (realsize > ch->read_count)
>>> + realsize = ch->read_count;
>>> +
>>> + memcpy (ptr, ch->read_buf, realsize);
>>> +
>>> + ch->read_count -= realsize;
>>> + ch->read_buf += realsize;
>>> +
>>> + return realsize;
>>
>> Why does write_cb in [1] above return orig_realsize, but read_cb
>> returns the potentially modified realsize?
>
> It's a good question (this code was copied from the old plugin which
> was working for years). It definitely works now. Note that writes in
> this plugin are probably never used. They require a web server that
> supports "Range PUTs" which is, probably, not any server in existence.
>
>>> }
>>>
>>> static struct nbdkit_plugin plugin = {
>>> diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
>>> index eb2d330e1..2974cda3f 100644
>>> --- a/plugins/curl/pool.c
>>> +++ b/plugins/curl/pool.c
>>> @@ -30,11 +30,29 @@
>>> * SUCH DAMAGE.
>>> */
>>>
>>> -/* Curl handle pool.
>>> +/* Worker thread which processes the curl multi interface.
>>> *
>>> - * To get a libcurl handle, call get_handle(). When you hold the
>>> - * handle, it is yours exclusively to use. After you have finished
>>> - * with the handle, put it back into the pool by calling put_handle().
>>> + * The main nbdkit threads (see curl.c) create curl easy handles
>>> + * initialized with the work they want to carry out. Note there is
>>> + * one easy handle per task (eg. per pread/pwrite request). The easy
>>> + * handles are not reused.
>>> + *
>>> + * The commands + optional easy handle are submitted to the worker
>>> + * thread over a self-pipe (it's easy to use a pipe here because the
>>> + * way curl multi works is it can listen on an extra fd, but not on
>>> + * anything else like a pthread condition). The curl multi performs
>>> + * the work of the outstanding easy handles.
>>> + *
>>> + * When an easy handle finishes work or errors, we retire the command
>>> + * by signalling back to the waiting nbdkit thread using a pthread
>>> + * condition.
>>> + *
>>> + * In my experiments, we're almost always I/O bound so I haven't
seen
>>> + * any strong need to use more than one curl multi / worker thread,
>>> + * although it would be possible to add more in future.
>>> + *
>>> + * See also this extremely useful thread:
>>> + *
https://curl.se/mail/lib-2019-03/0100.html
>>
>> Very useful comment (and link).
>>
>>> */
>>>
>>> #include <config.h>
>>> @@ -45,9 +63,19 @@
>>> #include <stdint.h>
>>> #include <inttypes.h>
>>> #include <string.h>
>>> +#include <unistd.h>
>>> #include <assert.h>
>>> #include <pthread.h>
>>>
>>> +#ifdef HAVE_STDATOMIC_H
>>> +#include <stdatomic.h>
>>> +#else
>>> +/* Some old platforms lack atomic types, but 32 bit ints are usually
>>> + * "atomic enough".
>>> + */
>>> +#define _Atomic /**/
>>> +#endif
>>> +
>>> #include <curl/curl.h>
>>>
>>> #include <nbdkit-plugin.h>
>>> @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
>>>
>>> unsigned connections = 4;
>>>
>>> -/* This lock protects access to the curl_handles vector below. */
>>> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
>>> +/* Pipe used to notify background thread that a command is pending in
>>> + * the queue. A pointer to the 'struct command' is sent over the
>>> + * pipe.
>>> + */
>>> +static int self_pipe[2] = { -1, -1 };
>>>
>>> -/* List of curl handles. This is allocated dynamically as more
>>> - * handles are requested. Currently it does not shrink. It may grow
>>> - * up to 'connections' in length.
>>> +/* The curl multi handle. */
>>> +static CURLM *multi;
>>> +
>>> +/* List of running easy handles. We only need to maintain this so we
>>> + * can remove them from the multi handle when cleaning up.
>>> */
>>> DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
>>> static curl_handle_list curl_handles = empty_vector;
>>>
>>> -/* The condition is used when the curl handles vector is full and
>>> - * we're waiting for a thread to put_handle.
>>> - */
>>> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
>>> -static size_t in_use = 0, waiting = 0;
>>> +static const char *
>>> +command_type_to_string (enum command_type type)
>>> +{
>>> + switch (type) {
>>> + case EASY_HANDLE: return "EASY_HANDLE";
>>> + case STOP: return "STOP";
>>> + default: abort ();
>>> + }
>>> +}
>>>
>>> int
>>> pool_get_ready (void)
>>> {
>>> + multi = curl_multi_init ();
>>> + if (multi == NULL) {
>>> + nbdkit_error ("curl_multi_init failed: %m");
>>> + return -1;
>>> + }
>>> +
>>> return 0;
>>> }
>>>
>>> +/* Start and stop the background thread. */
>>> +static pthread_t thread;
>>> +static bool thread_running;
>>> +static void *pool_worker (void *);
>>> +
>>> int
>>> pool_after_fork (void)
>>> {
>>> + int err;
>>> +
>>> + if (pipe (self_pipe) == -1) {
>>> + nbdkit_error ("pipe: %m");
>>> + return -1;
>>> + }
>>> +
>>> + /* Start the pool background thread where all the curl work is done. */
>>> + err = pthread_create (&thread, NULL, pool_worker, NULL);
>>> + if (err != 0) {
>>> + errno = err;
>>> + nbdkit_error ("pthread_create: %m");
>>> + return -1;
>>> + }
>>> + thread_running = true;
>>> +
>>> return 0;
>>> }
>>>
>>> -/* Close and free all handles in the pool. */
>>> +/* Unload the background thread. */
>>> void
>>> pool_unload (void)
>>> {
>>> - size_t i;
>>> + if (thread_running) {
>>> + /* Stop the background thread. */
>>> + struct command cmd = { .type = STOP };
>>> + send_command_and_wait (&cmd);
>>> + pthread_join (thread, NULL);
>>> + thread_running = false;
>>> + }
>>>
>>> - if (curl_debug_pool)
>>> - nbdkit_debug ("unload_pool: number of curl handles allocated:
%zu",
>>> - curl_handles.len);
>>> + if (self_pipe[0] >= 0) {
>>> + close (self_pipe[0]);
>>> + self_pipe[0] = -1;
>>> + }
>>> + if (self_pipe[1] >= 0) {
>>> + close (self_pipe[1]);
>>> + self_pipe[1] = -1;
>>> + }
>>>
>>> - for (i = 0; i < curl_handles.len; ++i)
>>> - free_handle (curl_handles.ptr[i]);
>>> - curl_handle_list_reset (&curl_handles);
>>> + if (multi) {
>>> + size_t i;
>>> +
>>> + /* Remove and free any easy handles in the multi. */
>>> + for (i = 0; i < curl_handles.len; ++i) {
>>> + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
>>> + free_handle (curl_handles.ptr[i]);
>>> + }
>>> +
>>> + curl_multi_cleanup (multi);
>>> + multi = NULL;
>>> + }
>>> }
>>>
>>> -/* Get a handle from the pool.
>>> - *
>>> - * It is owned exclusively by the caller until they call put_handle.
>>> +/* Command queue. */
>>> +static _Atomic uint64_t id; /* next command ID */
>>> +
>>> +/* Send command to the background thread and wait for completion.
>>> + * This is only called by one of the nbdkit threads.
>>> */
>>> -struct curl_handle *
>>> -get_handle (void)
>>> +CURLcode
>>> +send_command_and_wait (struct command *cmd)
>>> {
>>> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
>>> - size_t i;
>>> - struct curl_handle *ch;
>>> -
>>> - again:
>>> - /* Look for a handle which is not in_use. */
>>> - for (i = 0; i < curl_handles.len; ++i) {
>>> - ch = curl_handles.ptr[i];
>>> - if (!ch->in_use) {
>>> - ch->in_use = true;
>>> - in_use++;
>>> + cmd->id = id++;
>>> +
>>> + /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
>>> + * indicate that the command has not yet been completed and status
>>> + * set.
>>> + */
>>> + cmd->status = -1;
>>> +
>>> + /* This will be used to signal command completion back to us. */
>>> + pthread_mutex_init (&cmd->mutex, NULL);
>>> + pthread_cond_init (&cmd->cond, NULL);
>>> +
>>> + /* Send the command to the background thread. */
>>> + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
>>> + abort ();
>>> +
>>> + /* Wait for the command to be completed by the background thread. */
>>> + {
>>> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
>>> + while (cmd->status == -1) /* for -1, see above */
>>> + pthread_cond_wait (&cmd->cond, &cmd->mutex);
>>> + }
>>> +
>>> + pthread_mutex_destroy (&cmd->mutex);
>>> + pthread_cond_destroy (&cmd->cond);
>>> +
>>> + /* Note the main thread must call nbdkit_error on error! */
>>> + return cmd->status;
>>> +}
>>> +
>>> +/* The background thread. */
>>> +static void check_for_finished_handles (void);
>>> +static void retire_command (struct command *cmd, CURLcode code);
>>> +static void do_easy_handle (struct command *cmd);
>>> +
>>> +static void *
>>> +pool_worker (void *vp)
>>> +{
>>> + bool stop = false;
>>> +
>>> + if (curl_debug_pool)
>>> + nbdkit_debug ("curl: background thread started");
>>> +
>>> + while (!stop) {
>>> + struct command *cmd = NULL;
>>> + struct curl_waitfd extra_fds[1] =
>>> + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
>>> + CURLMcode mc;
>>> + int numfds, running_handles, repeats = 0;
>>> +
>>> + do {
>>> + /* Process the multi handle. */
>>> + mc = curl_multi_perform (multi, &running_handles);
>>> + if (mc != CURLM_OK) {
>>> + nbdkit_error ("curl_multi_perform: %s",
curl_multi_strerror (mc));
>>
>> Since nbdkit_error() stores its string in thread-local storage, is
>> there anything that ever extracts this error over to the nbdkit thread
>> that issued the original request to the worker thread?...
>>
>>> + abort (); /* XXX We don't expect this to happen */
>>
>> ...Then again, if we abort, it doesn't matter.
>
> I was unclear what to do here. The final code does:
>
> while (!stop) {
> ...
> cmd = process_multi_handle ();
> if (cmd == NULL)
> continue; /* or die?? */
>
> with process_multi_handle still calling nbdkit_error. I felt it might
> be better to keep trying than just abort, as presumably some (most?)
> errors are transient.
>
> nbdkit_error will ensure the error message is written out.
>
>>> + }
>>
>>> +
>>> + check_for_finished_handles ();
>>> +
>>> + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
>>> + if (mc != CURLM_OK) {
>>> + nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror
(mc));
>>> + abort (); /* XXX We don't expect this to happen */
>>> + }
>>> +
>>> if (curl_debug_pool)
>>> - nbdkit_debug ("get_handle: %zu", ch->i);
>>> - return ch;
>>> - }
>>> - }
>>> + nbdkit_debug ("curl_multi_wait returned: running_handles=%d
numfds=%d",
>>> + running_handles, numfds);
>>> +
>>> + if (numfds == 0) {
>>> + repeats++;
>>> + if (repeats > 1)
>>> + nbdkit_nanosleep (1, 0);
>>> + }
>>> + else {
>>> + repeats = 0;
>>> + if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
>>> + /* There's a command waiting. */
>>> + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
>>> + abort ();
>>> + }
>>> + }
>>> + } while (!cmd);
>>>
>>> - /* If more connections are allowed, then allocate a new handle. */
>>> - if (curl_handles.len < connections) {
>>> - ch = allocate_handle ();
>>> - if (ch == NULL)
>>> - return NULL;
>>> - if (curl_handle_list_append (&curl_handles, ch) == -1) {
>>> - free_handle (ch);
>>> - return NULL;
>>> - }
>>> - ch->i = curl_handles.len - 1;
>>> - ch->in_use = true;
>>> - in_use++;
>>> if (curl_debug_pool)
>>> - nbdkit_debug ("get_handle: %zu", ch->i);
>>> - return ch;
>>> - }
>>> + nbdkit_debug ("curl: dispatching %s command %" PRIu64,
>>> + command_type_to_string (cmd->type), cmd->id);
>>> +
>>> + switch (cmd->type) {
>>> + case STOP:
>>> + stop = true;
>>> + retire_command (cmd, CURLE_OK);
>>> + break;
>>>
>>> - /* Otherwise we have run out of connections so we must wait until
>>> - * another thread calls put_handle.
>>> - */
>>> - assert (in_use == connections);
>>> - waiting++;
>>> - while (in_use == connections)
>>> - pthread_cond_wait (&cond, &lock);
>>> - waiting--;
>>> + case EASY_HANDLE:
>>> + do_easy_handle (cmd);
>>> + break;
>>> + }
>>> + } /* while (!stop) */
>>>
>>> - goto again;
>>> + if (curl_debug_pool)
>>> + nbdkit_debug ("curl: background thread stopped");
>>> +
>>> + return NULL;
>>> }
>>>
>>> -/* Return the handle to the pool. */
>>> -void
>>> -put_handle (struct curl_handle *ch)
>>> +/* This checks if any easy handles in the multi have
>>> + * finished and retires the associated commands.
>>> + */
>>> +static void
>>> +check_for_finished_handles (void)
>>> {
>>> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
>>> + CURLMsg *msg;
>>> + int msgs_in_queue;
>>> +
>>> + while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL)
{
>>> + size_t i;
>>> + struct curl_handle *ch = NULL;
>>> +
>>> + if (msg->msg == CURLMSG_DONE) {
>>> + /* Find this curl_handle. */
>>> + for (i = 0; i < curl_handles.len; ++i) {
>>> + if (curl_handles.ptr[i]->c == msg->easy_handle) {
>>> + ch = curl_handles.ptr[i];
>>> + curl_handle_list_remove (&curl_handles, i);
>>> + }
>>> + }
>>> + if (ch == NULL) abort ();
>>> + curl_multi_remove_handle (multi, ch->c);
>>> +
>>> + retire_command (ch->cmd, msg->data.result);
>>> + }
>>> + }
>>> +}
>>>
>>> +/* Retire a command. status is a CURLcode. */
>>> +static void
>>> +retire_command (struct command *cmd, CURLcode status)
>>> +{
>>> if (curl_debug_pool)
>>> - nbdkit_debug ("put_handle: %zu", ch->i);
>>> + nbdkit_debug ("curl: retiring %s command %" PRIu64,
>>> + command_type_to_string (cmd->type), cmd->id);
>>> +
>>> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
>>> + cmd->status = status;
>>> + pthread_cond_signal (&cmd->cond);
>>> +}
>>> +
>>> +static void
>>> +do_easy_handle (struct command *cmd)
>>> +{
>>> + CURLMcode mc;
>>> +
>>> + cmd->ch->cmd = cmd;
>>> +
>>> + /* Add the handle to the multi. */
>>> + mc = curl_multi_add_handle (multi, cmd->ch->c);
>>> + if (mc != CURLM_OK) {
>>> + nbdkit_error ("curl_multi_add_handle: %s",
curl_multi_strerror (mc));
>>> + goto err;
>>> + }
>>>
>>> - ch->in_use = false;
>>> - in_use--;
>>> + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
>>> + goto err;
>>> + return;
>>>
>>> - /* Signal the next thread which is waiting. */
>>> - if (waiting > 0)
>>> - pthread_cond_signal (&cond);
>>> + err:
>>> + retire_command (cmd, CURLE_OUT_OF_MEMORY);
>>> }
>>> --
>>> 2.41.0
>>
>> Overall looks nice, and I learned more about curl in the process.
>
> For me, too much :-(
>
> Thanks,
>
> Rich.
>
Fedora Windows cross-compiler. Compile Windows programs, test, and
build Windows installers. Over 100 libraries supported.