On 8/7/23 13:08, Richard W.M. Jones wrote:
On Mon, Aug 07, 2023 at 12:57:02PM +0200, Laszlo Ersek wrote:
> On 8/4/23 20:04, Richard W.M. Jones wrote:
>> On Fri, Aug 04, 2023 at 11:38:03AM -0500, Eric Blake wrote:
>>> On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote:
>>>> See the comment at the top of plugins/curl/pool.c for general
>>>> information about how this works.
>>>>
>>>> This makes a very large difference to performance over the previous
>>>> implementation. Note for the tests below I also applied the next
>>>> commit changing the behaviour of the connections parameter.
>>>>
>>>> Using this test case:
>>>>
>>>> $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-a...
>>>> $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null'
>>>>
>>>> The times are as follows:
>>>>
>>>> multi, connections=64 21.5s
>>>> multi, connections=32 30.2s
>>>> multi, connections=16 56.0s
>>>> before this commit 166s
>>>
>>> Awesome performance improvements! As painful as this series has been
>>> for you to write and debug, it is showing its worth.
>>>
>>>> ---
>>>> plugins/curl/curldefs.h | 35 ++--
>>>> plugins/curl/config.c | 246 ---------------------------
>>>> plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++++-----
>>>> plugins/curl/pool.c | 346 ++++++++++++++++++++++++++++---------
>>>> 4 files changed, 616 insertions(+), 377 deletions(-)
>>>
>>> Finally taking time to review this, even though it is already in-tree.
>>>
>>>> @@ -98,8 +88,30 @@ struct curl_handle {
>>>> const char *read_buf;
>>>> uint32_t read_count;
>>>>
>>>> + /* This field is used by curl_get_size. */
>>>> + bool accept_range;
>>>> +
>>>> /* Used by scripts.c */
>>>> struct curl_slist *headers_copy;
>>>> +
>>>> + /* Used by pool.c */
>>>> + struct command *cmd;
>>>> +};
>>>> +
>>>> +/* Asynchronous commands that can be sent to the pool thread. */
>>>> +enum command_type { EASY_HANDLE, STOP };
>>>> +struct command {
>>>> + /* These fields are set by the caller. */
>>>> + enum command_type type; /* command */
>>>> + struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */
>>>> +
>>>> + /* This field is set to a unique value by send_command_and_wait. */
>>>> + uint64_t id; /* serial number */
>>>> +
>>>> + /* These fields are used to signal back that the command finished. */
>>>> + pthread_mutex_t mutex; /* completion mutex */
>>>> + pthread_cond_t cond; /* completion condition */
>>>> +  CURLcode status;              /* status code (CURLE_OK = succeeded) */
>>>> };
>>>
>>> Makes sense. The two types are mutually recursive (curl_handle
>>> includes a struct command *; command includes a struct curl_handle *);
>>> hopefully you have proper locking when altering multiple objects to
>>> adjust how they point to one another.
>>
>> Actually locking is not needed. Let me document it through ...
>>
>> We create both the curl easy handle and the associated EASY_HANDLE
>> command in the nbdkit thread that gets the request, eg. in the curl
>> .pread method. That of course requires no locking.
>>
>> There is a single background worker thread.
>>
>> A "self pipe" passes pointers to 'struct command *' to this worker
>> thread simply by writing the 8 byte pointer onto the pipe (hopefully
>> atomic ...)  The nbdkit request thread then blocks on the mutex/cond
>> in the command handle.
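As a self-contained sketch of that handoff (hypothetical names such as `send_ptr`/`recv_ptr`; the real code lives in plugins/curl/pool.c), passing the pointer through the pipe looks roughly like:

```c
#include <assert.h>
#include <stdint.h>
#include <unistd.h>

struct command { uint64_t id; };

/* Writer side: publish a command pointer to the worker thread.  An
 * 8-byte write is well under PIPE_BUF, so per POSIX it cannot be
 * interleaved with pointers written by other threads.
 */
static void
send_ptr (int wfd, struct command *cmd)
{
  ssize_t r = write (wfd, &cmd, sizeof cmd);
  assert (r == sizeof cmd);
}

/* Reader side: the single worker thread picks the pointer back up. */
static struct command *
recv_ptr (int rfd)
{
  struct command *cmd;
  ssize_t r = read (rfd, &cmd, sizeof cmd);
  assert (r == sizeof cmd);
  return cmd;
}
```

Note only the pointer value crosses the pipe; the pointed-to command stays in the sender's memory until completion is signalled back.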
>
> Right, that's exactly what I got curious about.
>
> In my opinion, writing to the self-pipe is mostly safe. Reading from the
> self-pipe could be slightly polished.
>
> Here are the POSIX pages on write() and read():
>
> https://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html
> https://pubs.opengroup.org/onlinepubs/9699919799/functions/read.html
>
> The first one (write) says that if you attempt to write at most PIPE_BUF
> bytes, then writes will not be interleaved with other concurrent writes,
> so in a sense the write will be atomic. (And in this case, O_NONBLOCK
> only changes the behavior for the case when the buffer cannot be written
> in entirety: with O_NONBLOCK clear, the writer thread will block, with
> O_NONBLOCK set, the writer thread will see -1/EAGAIN.)
>
> Now, PIPE_BUF is "variable" in a sense:
>
> https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/limits.h.html
>
> but it is greater than or equal to _POSIX_PIPE_BUF, which is 512. Our
> pointers are 8 bytes in size, so the <=PIPE_BUF condition is surely
> satisfied.
>
> ... The only case I see possible for a short write is the delivery of a
> signal after transfer has started. *If* nbdkit catches some signal such
> that the signal handler actually returns, then this could be a
> (theoretical) problem, calling for xwrite() or similar.
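A minimal xwrite()-style loop covering that theoretical short-write case might look like this (a sketch, not nbdkit's actual utility function):

```c
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Write exactly len bytes, retrying on EINTR and on short writes.
 * Returns 0 on success, -1 on error (errno set by write).
 */
static int
full_write (int fd, const void *buf, size_t len)
{
  const char *p = buf;

  while (len > 0) {
    ssize_t r = write (fd, p, len);
    if (r == -1) {
      if (errno == EINTR)
        continue;               /* signal delivered before any transfer */
      return -1;
    }
    p += r;                     /* short write: advance and retry */
    len -= r;
  }
  return 0;
}
```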
>
>>
>> The background worker thread picks the 'struct command *' up from the
>> pipe in the same event loop that it uses to process ongoing requests
>> on the multi handle. It takes the easy handle and adds it to the
>> multi handle.
>
> The spec on read() does not seem to have the same kind of
> non-interleaving / "atomicity" language that write() does. The rationale
> section says:
>
> "The standard developers considered adding atomicity requirements to a
> pipe or FIFO, but recognized that due to the nature of pipes and FIFOs
> there could be no guarantee of atomicity of reads of {PIPE_BUF} or any
> other size that would be an aid to applications portability."
>
> Now given that the writer side is free of interleaving (because, in the
> first place: there is only a single writer!), I think we need not worry
> about data corruption. However, it does feel like read() may return
> fewer than 8 bytes in one go, "just because" (not only because of a
> signal being delivered midway).
> So there are actually multiple writer threads, but only a single
> reader thread.  Also I should say that I didn't set O_NONBLOCK
> (actually I completely forgot), but that might be a benefit in this
> case since it makes it less likely for the 8 byte read to be broken up.

Yes, if the writers are safe from interleaving, then the one reader will
only read pristine pointers (in unspecified order of course).
> And that may be a problem with readiness reporting via
> curl_multi_wait(); even if you get CURL_WAIT_POLLIN, the whole command
> pointer may not yet be available.
>
> Now I do think a split read is extremely unlikely, maybe even impossible
> on Linux. If we choose to be pedantic, then the curl_multi_wait() loop
> might want to expect "cmd" to be populated only over multiple iterations
> -- like use "cmd_ptr_bytes" or something similar for tracking the size
> already available, and only consider "cmd" usable when cmd_ptr_bytes
> reaches sizeof cmd.
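That pedantic accumulating reader could be sketched like this (hypothetical names such as `read_ptr_incremental`, following the "cmd_ptr_bytes" idea; not actual nbdkit code):

```c
#include <stdbool.h>
#include <stddef.h>
#include <unistd.h>

struct command;

/* Accumulate a pointer that may arrive over several short reads.
 * *nbytes tracks how much of the pointer has arrived so far; the
 * pointer stored in *cmd is only usable once this returns true.
 */
static bool
read_ptr_incremental (int rfd, struct command **cmd, size_t *nbytes)
{
  char *p = (char *) cmd;       /* fill the pointer object byte by byte */
  ssize_t r = read (rfd, p + *nbytes, sizeof *cmd - *nbytes);

  if (r > 0)
    *nbytes += r;
  return *nbytes == sizeof *cmd;
}
```

The caller would keep invoking this from the event loop each time the pipe reports readable, dispatching the command only when the function finally returns true.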
>
>
> Yet another topic that comes up is visibility / ordering. Transferring a
> pointer via write()/read() between threads does not seem to guarantee
> ordering / visibility regarding the *pointed-to* area per spec. Pthread
> mutex APIs (and C11 thread and atomics APIs) include the CPU
> instructions for ensuring proper memory visibility.
>
> *BUT* I think it would be insane for any POSIX implementation to have a
> write()+read() combination that's *weaker* regarding data consistency,
> (i.e., that's more racy) than mutexes. Write() and read() are
> heavy-weight syscalls, so I can't imagine a publish-subscribe pattern
> not working with them (i.e., the pointed-to area not "settling" until
> the pointer is consumed).
> Yup, it seems unlikely, but maybe it could happen on architectures
> with weaker store ordering like Arm?  I wonder if adding a memory
> barrier before the write would be a good idea?

We'd need one before the write, and another (matching one) after the
read, but I'm unsure what types precisely, and how we'd actually express
them in nbdkit!  Paolo should be able to advise.
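If C11 stdatomic is available, one plausible shape is a release fence before the write paired with an acquire fence after the read. This is a sketch only: strictly speaking C11 fences synchronize through atomic operations rather than through syscalls, and in practice the write()/read() pair almost certainly already provides the ordering, so treat the fences as belt-and-braces:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>
#include <unistd.h>

struct command { uint64_t id; };

/* Publish: make all stores to *cmd visible before the pointer itself
 * can be observed by the consumer.
 */
static void
publish (int wfd, struct command *cmd)
{
  atomic_thread_fence (memory_order_release);
  ssize_t r = write (wfd, &cmd, sizeof cmd);
  assert (r == sizeof cmd);
}

/* Consume: pairs with the release fence above so the pointed-to
 * fields are seen fully initialized.
 */
static struct command *
consume (int rfd)
{
  struct command *cmd;
  ssize_t r = read (rfd, &cmd, sizeof cmd);
  assert (r == sizeof cmd);
  atomic_thread_fence (memory_order_acquire);
  return cmd;
}
```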
Laszlo
> Thanks,
> Rich.
> Laszlo
>
>>
>> When the easy handle work has finished, the worker thread removes it
>> from the multi handle and signals the nbdkit request thread to wake up
>> (using cmd->mutex + cmd->cond).  At which point possession passes back
>> to the request thread which will usually free up both the command and
>> easy handle.
>>
>>>> +++ b/plugins/curl/config.c
>>>
>>>> +++ b/plugins/curl/curl.c
>>>>
>>>> +/* Get the file size. */
>>>> +static int get_content_length_accept_range (struct curl_handle *ch);
>>>> +static bool try_fallback_GET_method (struct curl_handle *ch);
>>>> +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
>>>> +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
>>>> +
>>>> +static int64_t
>>>> +curl_get_size (void *handle)
>>>> +{
>>>> + struct curl_handle *ch;
>>>> + CURLcode r;
>>>> + long code;
>>>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
>>>> + curl_off_t o;
>>>> +#else
>>>> + double d;
>>>> +#endif
>>>> + int64_t exportsize;
>>>> +
>>>> + /* Get a curl easy handle. */
>>>> + ch = allocate_handle ();
>>>> + if (ch == NULL) goto err;
>>>> +
>>>> + /* Prepare to read the headers. */
>>>> + if (get_content_length_accept_range (ch) == -1)
>>>> + goto err;
>>>> +
>>>> + /* Send the command to the worker thread and wait. */
>>>> + struct command cmd = {
>>>> + .type = EASY_HANDLE,
>>>> + .ch = ch,
>>>> + };
>>>> +
>>>> + r = send_command_and_wait (&cmd);
>>>> + update_times (ch->c);
>>>> + if (r != CURLE_OK) {
>>>> +    display_curl_error (ch, r,
>>>> +                        "problem doing HEAD request to fetch size of URL [%s]",
>>>> +                        url);
>>>> +
>>>> + /* Get the HTTP status code, if available. */
>>>> +    r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
>>>> + if (r == CURLE_OK)
>>>> + nbdkit_debug ("HTTP status code: %ld", code);
>>>> + else
>>>> + code = -1;
>>>> +
>>>> + /* See comment on try_fallback_GET_method below. */
>>>> + if (code != 403 || !try_fallback_GET_method (ch))
>>>> + goto err;
>>>> + }
>>>> +
>>>> + /* Get the content length.
>>>> + *
>>>> + * Note there is some subtlety here: For web servers using chunked
>>>> + * encoding, either the Content-Length header will not be present,
>>>> + * or if present it should be ignored. (For such servers the only
>>>> + * way to find out the true length would be to read all of the
>>>> + * content, which we don't want to do).
>>>> + *
>>>> + * Curl itself resolves this for us. It will ignore the
>>>> + * Content-Length header if chunked encoding is used, returning the
>>>> + * length as -1 which we check below (see also
>>>> + * curl:lib/http.c:Curl_http_size).
>>>> + */
>>>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
>>>> +  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
>>>> + if (r != CURLE_OK) {
>>>> +    display_curl_error (ch, r,
>>>> +                        "could not get length of remote file [%s]", url);
>>>> + goto err;
>>>> + }
>>>> +
>>>> + if (o == -1) {
>>>> +    nbdkit_error ("could not get length of remote file [%s], "
>>>> +                  "is the URL correct?", url);
>>>> + goto err;
>>>> + }
>>>> +
>>>> + exportsize = o;
>>>> +#else
>>>> +  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
>>>> + if (r != CURLE_OK) {
>>>> +    display_curl_error (ch, r,
>>>> +                        "could not get length of remote file [%s]", url);
>>>> + goto err;
>>>> + }
>>>> +
>>>> + if (d == -1) {
>>>> +    nbdkit_error ("could not get length of remote file [%s], "
>>>> +                  "is the URL correct?", url);
>>>> + goto err;
>>>> + }
>>>> +
>>>> + exportsize = d;
>>>
>>> Does curl guarantee that the double d will contain a value assignable
>>> to int64_t without overflow/truncation? For particularly large sizes,
>>> double has insufficient precision for all possible file sizes, but I
>>> doubt someone is exposing such large files over HTTP.
>>
>> No, I don't believe a 'double' is sufficient. This is why newer
>> versions of curl have HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T. Note
>> this code is just copied from the old curl plugin.
>>
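The precision limit is easy to demonstrate: a double carries a 53-bit significand, so any size above 2^53 bytes silently rounds. A tiny sketch of the old double-based path (hypothetical helper, simulating `exportsize = d`):

```c
#include <assert.h>
#include <stdint.h>

/* Simulate the CURLINFO_CONTENT_LENGTH_DOWNLOAD (double) path: for
 * files larger than 2^53 bytes the round-trip through double loses
 * the low bits and the size we report is wrong.
 */
static int64_t
size_via_double (int64_t real_size)
{
  double d = (double) real_size;   /* what older curl hands back */
  return (int64_t) d;              /* exportsize = d; */
}
```

2^53 bytes is about 9 PB, so as noted nobody is likely serving such a file over HTTP, but the `_T` (curl_off_t) variant avoids the problem entirely.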
>>>> +#endif
>>>> + nbdkit_debug ("content length: %" PRIi64, exportsize);
>>>> +
>>>> + /* If this is HTTP, check that byte ranges are supported. */
>>>> +  if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
>>>> +      ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
>>>> + if (!ch->accept_range) {
>>>> +      nbdkit_error ("server does not support 'range' (byte range) requests");
>>>> + goto err;
>>>> + }
>>>> +
>>>> + nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
>>>> + }
>>>> +
>>>> + free_handle (ch);
>>>> + return exportsize;
>>>> +
>>>> + err:
>>>> + if (ch)
>>>> + free_handle (ch);
>>>> + return -1;
>>>> +}
>>>> +
>>>> +/* Get the file size and also whether the remote HTTP server
>>>> + * supports byte ranges.
>>>> + */
>>>> +static int
>>>> +get_content_length_accept_range (struct curl_handle *ch)
>>>> +{
>>>> + /* We must run the scripts if necessary and set headers in the
>>>> + * handle.
>>>> + */
>>>> + if (do_scripts (ch) == -1)
>>>> + return -1;
>>>> +
>>>> + /* Set this flag in the handle to false. The callback should set it
>>>> + * to true if byte ranges are supported, which we check below.
>>>> + */
>>>> + ch->accept_range = false;
>>>> +
>>>> + /* No Body, not nobody! This forces a HEAD request. */
>>>> + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
>>>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
>>>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
>>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
>>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
>>>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
>>>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
>>>> + return 0;
>>>> +}
>>>> +
>>>> +/* S3 servers can return 403 Forbidden for HEAD but still respond
>>>> + * to GET, so we give it a second chance in that case.
>>>> + * https://github.com/kubevirt/containerized-data-importer/issues/2737
>>>> + *
>>>> + * This function issues a GET request with a writefunction that always
>>>> + * returns an error, thus effectively getting the headers but
>>>> + * abandoning the transfer as soon as possible after.
>>>> + */
>>>> +static bool
>>>> +try_fallback_GET_method (struct curl_handle *ch)
>>>> +{
>>>> + CURLcode r;
>>>> +
>>>> +  nbdkit_debug ("attempting to fetch headers using GET method");
>>>> +
>>>> + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
>>>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
>>>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
>>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
>>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
>>>> +
>>>> + struct command cmd = {
>>>> + .type = EASY_HANDLE,
>>>> + .ch = ch,
>>>> + };
>>>> +
>>>> + r = send_command_and_wait (&cmd);
>>>> + update_times (ch->c);
>>>> +
>>>> + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
>>>> + * (eg if the remote has zero length). Other errors might happen
>>>> + * but we ignore them since it is a fallback path.
>>>> + */
>>>> + return r == CURLE_OK || r == CURLE_WRITE_ERROR;
>>>> +}
>>>> +
>>>> +static size_t
>>>> +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
>>>> +{
>>>> + struct curl_handle *ch = opaque;
>>>> + size_t realsize = size * nmemb;
>>>> + const char *header = ptr;
>>>> + const char *end = header + realsize;
>>>> + const char *accept_ranges = "accept-ranges:";
>>>> + const char *bytes = "bytes";
>>>> +
>>>> + if (realsize >= strlen (accept_ranges) &&
>>>> +      ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
>>>> + const char *p = strchr (header, ':') + 1;
>>>> +
>>>> + /* Skip whitespace between the header name and value. */
>>>> + while (p < end && *p && ascii_isspace (*p))
>>>
>>> Technically, '*p && ascii_isspace (*p)' can be shortened to
>>> 'ascii_isspace (*p)', since the NUL byte is not ascii space.  I don't
>>> know if the compiler is smart enough to make that optimization on your
>>> behalf.
>>
>> Ah indeed.
>>
>>>> + p++;
>>>> +
>>>> + if (end - p >= strlen (bytes)
>>>> + && strncmp (p, bytes, strlen (bytes)) == 0) {
>>>> + /* Check that there is nothing but whitespace after the value. */
>>>> + p += strlen (bytes);
>>>> + while (p < end && *p && ascii_isspace (*p))
>>>
>>> Another spot of the same.
>>>
>>>> + p++;
>>>> +
>>>> + if (p == end || !*p)
>>>> + ch->accept_range = true;
>>>> + }
>>>> + }
>>>> +
>>>> + return realsize;
>>>> +}
>>>> +
>>>> +static size_t
>>>> +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
>>>> +{
>>>> +#ifdef CURL_WRITEFUNC_ERROR
>>>> + return CURL_WRITEFUNC_ERROR;
>>>> +#else
>>>> +  return 0; /* in older curl, any size < requested will also be an error */
>>>> +#endif
>>>> +}
>>>> +
>>>> /* Read data from the remote server. */
>>>> +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
>>>> +
>>>> static int
>>>> curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>>>> {
>>>> CURLcode r;
>>>> + struct curl_handle *ch;
>>>> char range[128];
>>>>
>>>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch);
>>>> - if (ch == NULL)
>>>> - return -1;
>>>> + /* Get a curl easy handle. */
>>>> + ch = allocate_handle ();
>>>> + if (ch == NULL) goto err;
>>>>
>>>> /* Run the scripts if necessary and set headers in the handle. */
>>>> - if (do_scripts (ch) == -1) return -1;
>>>> + if (do_scripts (ch) == -1) goto err;
>>>>
>>>> /* Tell the write_cb where we want the data to be written. write_cb
>>>> * will update this if the data comes in multiple sections.
>>>> */
>>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
>>>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
>>>> ch->write_buf = buf;
>>>> ch->write_count = count;
>>>>
>>>> @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>>>> offset, offset + count);
>>>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>>>>
>>>> - /* The assumption here is that curl will look after timeouts. */
>>>> - r = curl_easy_perform (ch->c);
>>>> + /* Send the command to the worker thread and wait. */
>>>> + struct command cmd = {
>>>> + .type = EASY_HANDLE,
>>>> + .ch = ch,
>>>> + };
>>>> +
>>>> + r = send_command_and_wait (&cmd);
>>>> if (r != CURLE_OK) {
>>>> - display_curl_error (ch, r, "pread: curl_easy_perform");
>>>> - return -1;
>>>> + display_curl_error (ch, r, "pread");
>>>> + goto err;
>>>> }
>>>> update_times (ch->c);
>>>>
>>>> @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>>>> /* As far as I understand the cURL API, this should never happen. */
>>>> assert (ch->write_count == 0);
>>>>
>>>> + free_handle (ch);
>>>> return 0;
>>>> +
>>>> + err:
>>>> + if (ch)
>>>> + free_handle (ch);
>>>> + return -1;
>>>> +}
>>>> +
>>>> +/* NB: The terminology used by libcurl is confusing!
>>>> + *
>>>> + * WRITEFUNCTION / write_cb is used when reading from the remote server
>>>> + * READFUNCTION / read_cb is used when writing to the remote server.
>>>> + *
>>>> + * We use the same terminology as libcurl here.
>>>> + */
>>>> +static size_t
>>>> +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
>>>> +{
>>>> + struct curl_handle *ch = opaque;
>>>> + size_t orig_realsize = size * nmemb;
>>>> + size_t realsize = orig_realsize;
>>>
>>> Do we have to worry about overflow when compiling on 32-bit machines?
>>> Asked differently, should we be using off_t instead of size_t in any
>>> of this code? Thankfully, for now, we know NBD .pread and .pwrite
>>> requests are capped at 64M, so I think you're okay (we aren't ever
>>> going to ask curl for gigabytes in one request),
>>
>> It's a good question ... I suspect that even if we requested it, web
>> servers probably wouldn't want to serve gigabytes of data in a range
>> request, but as you point out we shouldn't ever request it right now.
>>
>>> but maybe a comment
>>> or assert() is worth it?
>>
>> I'll add a comment, but could we do this with one of the overflow
>> macros? I'm not sure ...
>>
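For reference, a sketch using the GCC/Clang overflow builtin (assumption: nbdkit is built with a compiler that provides `__builtin_mul_overflow`; the hypothetical helper name is mine):

```c
#include <stdbool.h>
#include <stddef.h>

/* Compute size * nmemb as libcurl callbacks must, but detect
 * overflow instead of silently wrapping (GCC/Clang builtin).
 * Returns true and stores the product on success, false on overflow.
 */
static bool
callback_size (size_t size, size_t nmemb, size_t *result)
{
  return !__builtin_mul_overflow (size, nmemb, result);
}
```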
>>>> +
>>>> + assert (ch->write_buf);
>>>> +
>>>> + /* Don't read more than the requested amount of data, even if the
>>>> + * server or libcurl sends more.
>>>> + */
>>>> + if (realsize > ch->write_count)
>>>> + realsize = ch->write_count;
>>>> +
>>>> + memcpy (ch->write_buf, ptr, realsize);
>>>> +
>>>> + ch->write_count -= realsize;
>>>> + ch->write_buf += realsize;
>>>> +
>>>> + return orig_realsize;
>>>
>>> [1]
>>>
>>>> }
>>>>
>>>> /* Write data to the remote server. */
>>>> +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
>>>> +
>>>> static int
>>>> curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>>>> {
>>>> CURLcode r;
>>>> + struct curl_handle *ch;
>>>> char range[128];
>>>>
>>>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch);
>>>> - if (ch == NULL)
>>>> - return -1;
>>>> + /* Get a curl easy handle. */
>>>> + ch = allocate_handle ();
>>>> + if (ch == NULL) goto err;
>>>>
>>>> /* Run the scripts if necessary and set headers in the handle. */
>>>> - if (do_scripts (ch) == -1) return -1;
>>>> + if (do_scripts (ch) == -1) goto err;
>>>>
>>>> /* Tell the read_cb where we want the data to be read from. read_cb
>>>> * will update this if the data comes in multiple sections.
>>>> */
>>>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
>>>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
>>>> ch->read_buf = buf;
>>>> ch->read_count = count;
>>>>
>>>> @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>>>> offset, offset + count);
>>>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>>>>
>>>> - /* The assumption here is that curl will look after timeouts. */
>>>> - r = curl_easy_perform (ch->c);
>>>> + /* Send the command to the worker thread and wait. */
>>>> + struct command cmd = {
>>>> + .type = EASY_HANDLE,
>>>> + .ch = ch,
>>>> + };
>>>> +
>>>> + r = send_command_and_wait (&cmd);
>>>> if (r != CURLE_OK) {
>>>> - display_curl_error (ch, r, "pwrite: curl_easy_perform");
>>>> - return -1;
>>>> + display_curl_error (ch, r, "pwrite");
>>>> + goto err;
>>>> }
>>>> update_times (ch->c);
>>>>
>>>> @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>>>> /* As far as I understand the cURL API, this should never happen. */
>>>> assert (ch->read_count == 0);
>>>>
>>>> + free_handle (ch);
>>>> return 0;
>>>> +
>>>> + err:
>>>> + if (ch)
>>>> + free_handle (ch);
>>>> + return -1;
>>>> +}
>>>> +
>>>> +static size_t
>>>> +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
>>>> +{
>>>> + struct curl_handle *ch = opaque;
>>>> + size_t realsize = size * nmemb;
>>>> +
>>>> + assert (ch->read_buf);
>>>> + if (realsize > ch->read_count)
>>>> + realsize = ch->read_count;
>>>> +
>>>> + memcpy (ptr, ch->read_buf, realsize);
>>>> +
>>>> + ch->read_count -= realsize;
>>>> + ch->read_buf += realsize;
>>>> +
>>>> + return realsize;
>>>
>>> Why does write_cb in [1] above return orig_realsize, but read_cb
>>> returns the potentially modified realsize?
>>
>> It's a good question (this code was copied from the old plugin which
>> was working for years). It definitely works now. Note that writes in
>> this plugin are probably never used. They require a web server that
>> supports "Range PUTs" which is, probably, not any server in existence.
>>
>>>> }
>>>>
>>>> static struct nbdkit_plugin plugin = {
>>>> diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
>>>> index eb2d330e1..2974cda3f 100644
>>>> --- a/plugins/curl/pool.c
>>>> +++ b/plugins/curl/pool.c
>>>> @@ -30,11 +30,29 @@
>>>> * SUCH DAMAGE.
>>>> */
>>>>
>>>> -/* Curl handle pool.
>>>> +/* Worker thread which processes the curl multi interface.
>>>> *
>>>> - * To get a libcurl handle, call get_handle(). When you hold the
>>>> - * handle, it is yours exclusively to use. After you have finished
>>>> - * with the handle, put it back into the pool by calling put_handle().
>>>> + * The main nbdkit threads (see curl.c) create curl easy handles
>>>> + * initialized with the work they want to carry out. Note there is
>>>> + * one easy handle per task (eg. per pread/pwrite request). The easy
>>>> + * handles are not reused.
>>>> + *
>>>> + * The commands + optional easy handle are submitted to the worker
>>>> + * thread over a self-pipe (it's easy to use a pipe here because the
>>>> + * way curl multi works is it can listen on an extra fd, but not on
>>>> + * anything else like a pthread condition). The curl multi performs
>>>> + * the work of the outstanding easy handles.
>>>> + *
>>>> + * When an easy handle finishes work or errors, we retire the command
>>>> + * by signalling back to the waiting nbdkit thread using a pthread
>>>> + * condition.
>>>> + *
>>>> + * In my experiments, we're almost always I/O bound so I haven't seen
>>>> + * any strong need to use more than one curl multi / worker thread,
>>>> + * although it would be possible to add more in future.
>>>> + *
>>>> + * See also this extremely useful thread:
>>>> + *
https://curl.se/mail/lib-2019-03/0100.html
>>>
>>> Very useful comment (and link).
>>>
>>>> */
>>>>
>>>> #include <config.h>
>>>> @@ -45,9 +63,19 @@
>>>> #include <stdint.h>
>>>> #include <inttypes.h>
>>>> #include <string.h>
>>>> +#include <unistd.h>
>>>> #include <assert.h>
>>>> #include <pthread.h>
>>>>
>>>> +#ifdef HAVE_STDATOMIC_H
>>>> +#include <stdatomic.h>
>>>> +#else
>>>> +/* Some old platforms lack atomic types, but 32 bit ints are usually
>>>> + * "atomic enough".
>>>> + */
>>>> +#define _Atomic /**/
>>>> +#endif
>>>> +
>>>> #include <curl/curl.h>
>>>>
>>>> #include <nbdkit-plugin.h>
>>>> @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
>>>>
>>>> unsigned connections = 4;
>>>>
>>>> -/* This lock protects access to the curl_handles vector below. */
>>>> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
>>>> +/* Pipe used to notify background thread that a command is pending in
>>>> + * the queue.  A pointer to the 'struct command' is sent over the
>>>> + * pipe.
>>>> + */
>>>> +static int self_pipe[2] = { -1, -1 };
>>>>
>>>> -/* List of curl handles. This is allocated dynamically as more
>>>> - * handles are requested. Currently it does not shrink. It may grow
>>>> - * up to 'connections' in length.
>>>> +/* The curl multi handle. */
>>>> +static CURLM *multi;
>>>> +
>>>> +/* List of running easy handles. We only need to maintain this so we
>>>> + * can remove them from the multi handle when cleaning up.
>>>> */
>>>> DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
>>>> static curl_handle_list curl_handles = empty_vector;
>>>>
>>>> -/* The condition is used when the curl handles vector is full and
>>>> - * we're waiting for a thread to put_handle.
>>>> - */
>>>> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
>>>> -static size_t in_use = 0, waiting = 0;
>>>> +static const char *
>>>> +command_type_to_string (enum command_type type)
>>>> +{
>>>> + switch (type) {
>>>> + case EASY_HANDLE: return "EASY_HANDLE";
>>>> + case STOP: return "STOP";
>>>> + default: abort ();
>>>> + }
>>>> +}
>>>>
>>>> int
>>>> pool_get_ready (void)
>>>> {
>>>> + multi = curl_multi_init ();
>>>> + if (multi == NULL) {
>>>> + nbdkit_error ("curl_multi_init failed: %m");
>>>> + return -1;
>>>> + }
>>>> +
>>>> return 0;
>>>> }
>>>>
>>>> +/* Start and stop the background thread. */
>>>> +static pthread_t thread;
>>>> +static bool thread_running;
>>>> +static void *pool_worker (void *);
>>>> +
>>>> int
>>>> pool_after_fork (void)
>>>> {
>>>> + int err;
>>>> +
>>>> + if (pipe (self_pipe) == -1) {
>>>> + nbdkit_error ("pipe: %m");
>>>> + return -1;
>>>> + }
>>>> +
>>>> +  /* Start the pool background thread where all the curl work is done. */
>>>> + err = pthread_create (&thread, NULL, pool_worker, NULL);
>>>> + if (err != 0) {
>>>> + errno = err;
>>>> + nbdkit_error ("pthread_create: %m");
>>>> + return -1;
>>>> + }
>>>> + thread_running = true;
>>>> +
>>>> return 0;
>>>> }
>>>>
>>>> -/* Close and free all handles in the pool. */
>>>> +/* Unload the background thread. */
>>>> void
>>>> pool_unload (void)
>>>> {
>>>> - size_t i;
>>>> + if (thread_running) {
>>>> + /* Stop the background thread. */
>>>> + struct command cmd = { .type = STOP };
>>>> + send_command_and_wait (&cmd);
>>>> + pthread_join (thread, NULL);
>>>> + thread_running = false;
>>>> + }
>>>>
>>>> - if (curl_debug_pool)
>>>> -    nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
>>>> -                  curl_handles.len);
>>>> + if (self_pipe[0] >= 0) {
>>>> + close (self_pipe[0]);
>>>> + self_pipe[0] = -1;
>>>> + }
>>>> + if (self_pipe[1] >= 0) {
>>>> + close (self_pipe[1]);
>>>> + self_pipe[1] = -1;
>>>> + }
>>>>
>>>> - for (i = 0; i < curl_handles.len; ++i)
>>>> - free_handle (curl_handles.ptr[i]);
>>>> - curl_handle_list_reset (&curl_handles);
>>>> + if (multi) {
>>>> + size_t i;
>>>> +
>>>> + /* Remove and free any easy handles in the multi. */
>>>> + for (i = 0; i < curl_handles.len; ++i) {
>>>> + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
>>>> + free_handle (curl_handles.ptr[i]);
>>>> + }
>>>> +
>>>> + curl_multi_cleanup (multi);
>>>> + multi = NULL;
>>>> + }
>>>> }
>>>>
>>>> -/* Get a handle from the pool.
>>>> - *
>>>> - * It is owned exclusively by the caller until they call put_handle.
>>>> +/* Command queue. */
>>>> +static _Atomic uint64_t id; /* next command ID */
>>>> +
>>>> +/* Send command to the background thread and wait for completion.
>>>> + * This is only called by one of the nbdkit threads.
>>>> */
>>>> -struct curl_handle *
>>>> -get_handle (void)
>>>> +CURLcode
>>>> +send_command_and_wait (struct command *cmd)
>>>> {
>>>> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
>>>> - size_t i;
>>>> - struct curl_handle *ch;
>>>> -
>>>> - again:
>>>> - /* Look for a handle which is not in_use. */
>>>> - for (i = 0; i < curl_handles.len; ++i) {
>>>> - ch = curl_handles.ptr[i];
>>>> - if (!ch->in_use) {
>>>> - ch->in_use = true;
>>>> - in_use++;
>>>> + cmd->id = id++;
>>>> +
>>>> + /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
>>>> + * indicate that the command has not yet been completed and status
>>>> + * set.
>>>> + */
>>>> + cmd->status = -1;
>>>> +
>>>> + /* This will be used to signal command completion back to us. */
>>>> + pthread_mutex_init (&cmd->mutex, NULL);
>>>> + pthread_cond_init (&cmd->cond, NULL);
>>>> +
>>>> + /* Send the command to the background thread. */
>>>> + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
>>>> + abort ();
>>>> +
>>>> + /* Wait for the command to be completed by the background thread. */
>>>> + {
>>>> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
>>>> + while (cmd->status == -1) /* for -1, see above */
>>>> + pthread_cond_wait (&cmd->cond, &cmd->mutex);
>>>> + }
>>>> +
>>>> + pthread_mutex_destroy (&cmd->mutex);
>>>> + pthread_cond_destroy (&cmd->cond);
>>>> +
>>>> + /* Note the main thread must call nbdkit_error on error! */
>>>> + return cmd->status;
>>>> +}
>>>> +
>>>> +/* The background thread. */
>>>> +static void check_for_finished_handles (void);
>>>> +static void retire_command (struct command *cmd, CURLcode code);
>>>> +static void do_easy_handle (struct command *cmd);
>>>> +
>>>> +static void *
>>>> +pool_worker (void *vp)
>>>> +{
>>>> + bool stop = false;
>>>> +
>>>> + if (curl_debug_pool)
>>>> + nbdkit_debug ("curl: background thread started");
>>>> +
>>>> + while (!stop) {
>>>> + struct command *cmd = NULL;
>>>> + struct curl_waitfd extra_fds[1] =
>>>> + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
>>>> + CURLMcode mc;
>>>> + int numfds, running_handles, repeats = 0;
>>>> +
>>>> + do {
>>>> + /* Process the multi handle. */
>>>> + mc = curl_multi_perform (multi, &running_handles);
>>>> + if (mc != CURLM_OK) {
>>>> +        nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));
>>>
>>> Since nbdkit_error() stores its string in thread-local storage, is
>>> there anything that ever extracts this error over to the nbdkit thread
>>> that issued the original request to the worker thread?...
>>>
>>>> + abort (); /* XXX We don't expect this to happen */
>>>
>>> ...Then again, if we abort, it doesn't matter.
>>
>> I was unclear what to do here. The final code does:
>>
>> while (!stop) {
>> ...
>> cmd = process_multi_handle ();
>> if (cmd == NULL)
>> continue; /* or die?? */
>>
>> with process_multi_handle still calling nbdkit_error. I felt it might
>> be better to keep trying than just abort, as presumably some (most?)
>> errors are transient.
>>
>> nbdkit_error will ensure the error message is written out.
>>
>>>> + }
>>>
>>>> +
>>>> + check_for_finished_handles ();
>>>> +
>>>> + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
>>>> + if (mc != CURLM_OK) {
>>>> +      nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
>>>> + abort (); /* XXX We don't expect this to happen */
>>>> + }
>>>> +
>>>> if (curl_debug_pool)
>>>> - nbdkit_debug ("get_handle: %zu", ch->i);
>>>> - return ch;
>>>> - }
>>>> - }
>>>> +      nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d",
>>>> + running_handles, numfds);
>>>> +
>>>> + if (numfds == 0) {
>>>> + repeats++;
>>>> + if (repeats > 1)
>>>> + nbdkit_nanosleep (1, 0);
>>>> + }
>>>> + else {
>>>> + repeats = 0;
>>>> + if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
>>>> + /* There's a command waiting. */
>>>> + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
>>>> + abort ();
>>>> + }
>>>> + }
>>>> + } while (!cmd);
>>>>
>>>> - /* If more connections are allowed, then allocate a new handle. */
>>>> - if (curl_handles.len < connections) {
>>>> - ch = allocate_handle ();
>>>> - if (ch == NULL)
>>>> - return NULL;
>>>> - if (curl_handle_list_append (&curl_handles, ch) == -1) {
>>>> - free_handle (ch);
>>>> - return NULL;
>>>> - }
>>>> - ch->i = curl_handles.len - 1;
>>>> - ch->in_use = true;
>>>> - in_use++;
>>>> if (curl_debug_pool)
>>>> - nbdkit_debug ("get_handle: %zu", ch->i);
>>>> - return ch;
>>>> - }
>>>> + nbdkit_debug ("curl: dispatching %s command %" PRIu64,
>>>> + command_type_to_string (cmd->type), cmd->id);
>>>> +
>>>> + switch (cmd->type) {
>>>> + case STOP:
>>>> + stop = true;
>>>> + retire_command (cmd, CURLE_OK);
>>>> + break;
>>>>
>>>> - /* Otherwise we have run out of connections so we must wait until
>>>> - * another thread calls put_handle.
>>>> - */
>>>> - assert (in_use == connections);
>>>> - waiting++;
>>>> - while (in_use == connections)
>>>> - pthread_cond_wait (&cond, &lock);
>>>> - waiting--;
>>>> + case EASY_HANDLE:
>>>> + do_easy_handle (cmd);
>>>> + break;
>>>> + }
>>>> + } /* while (!stop) */
>>>>
>>>> - goto again;
>>>> + if (curl_debug_pool)
>>>> + nbdkit_debug ("curl: background thread stopped");
>>>> +
>>>> + return NULL;
>>>> }
>>>>
>>>> -/* Return the handle to the pool. */
>>>> -void
>>>> -put_handle (struct curl_handle *ch)
>>>> +/* This checks if any easy handles in the multi have
>>>> + * finished and retires the associated commands.
>>>> + */
>>>> +static void
>>>> +check_for_finished_handles (void)
>>>> {
>>>> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
>>>> + CURLMsg *msg;
>>>> + int msgs_in_queue;
>>>> +
>>>> +  while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
>>>> + size_t i;
>>>> + struct curl_handle *ch = NULL;
>>>> +
>>>> + if (msg->msg == CURLMSG_DONE) {
>>>> + /* Find this curl_handle. */
>>>> + for (i = 0; i < curl_handles.len; ++i) {
>>>> + if (curl_handles.ptr[i]->c == msg->easy_handle) {
>>>> + ch = curl_handles.ptr[i];
>>>> + curl_handle_list_remove (&curl_handles, i);
>>>> + }
>>>> + }
>>>> + if (ch == NULL) abort ();
>>>> + curl_multi_remove_handle (multi, ch->c);
>>>> +
>>>> + retire_command (ch->cmd, msg->data.result);
>>>> + }
>>>> + }
>>>> +}
>>>>
>>>> +/* Retire a command. status is a CURLcode. */
>>>> +static void
>>>> +retire_command (struct command *cmd, CURLcode status)
>>>> +{
>>>> if (curl_debug_pool)
>>>> - nbdkit_debug ("put_handle: %zu", ch->i);
>>>> + nbdkit_debug ("curl: retiring %s command %" PRIu64,
>>>> + command_type_to_string (cmd->type), cmd->id);
>>>> +
>>>> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
>>>> + cmd->status = status;
>>>> + pthread_cond_signal (&cmd->cond);
>>>> +}
>>>> +
>>>> +static void
>>>> +do_easy_handle (struct command *cmd)
>>>> +{
>>>> + CURLMcode mc;
>>>> +
>>>> + cmd->ch->cmd = cmd;
>>>> +
>>>> + /* Add the handle to the multi. */
>>>> + mc = curl_multi_add_handle (multi, cmd->ch->c);
>>>> + if (mc != CURLM_OK) {
>>>> +    nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
>>>> + goto err;
>>>> + }
>>>>
>>>> - ch->in_use = false;
>>>> - in_use--;
>>>> + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
>>>> + goto err;
>>>> + return;
>>>>
>>>> - /* Signal the next thread which is waiting. */
>>>> - if (waiting > 0)
>>>> - pthread_cond_signal (&cond);
>>>> + err:
>>>> + retire_command (cmd, CURLE_OUT_OF_MEMORY);
>>>> }
>>>> --
>>>> 2.41.0
>>>
>>> Overall looks nice, and I learned more about curl in the process.
>>
>> For me, too much :-(
>>
>> Thanks,
>>
>> Rich.
>>