There was a theoretic race in this example: If the server was very
fast at handling commands then it's possible that in a call such as:
cookie = nbd_aio_pread_callback (..., callback, ...);
buffers[i].cookie = cookie;
nbd_aio_pread_callback finished and calls the callback before
returning. buffers[i].cookie would therefore not be set, but the
callback() function was checking the list of buffers for the cookie.
This would have caused an abort() in the existing code, although we
have never observed that.
The new code, instead of relying on the cookie, passes a pointer to
&buffers[i], so the callback no longer needs to search the list of
buffers (this is also quicker).
This necessitated another change however: previously we were copying
buffers around to ensure that the next free buffer was always at
&buffers[nr_buffers]. Instead of that introduce a new state
(BUFFER_UNUSED) and search the short list of buffers once when looking
for a free buffer.
Thanks: Eric Blake.
---
examples/glib-main-loop.c | 96 +++++++++++++--------------------------
1 file changed, 32 insertions(+), 64 deletions(-)
diff --git a/examples/glib-main-loop.c b/examples/glib-main-loop.c
index 9f98033..826651e 100644
--- a/examples/glib-main-loop.c
+++ b/examples/glib-main-loop.c
@@ -247,6 +247,7 @@ static const char *dest_args[] = {
#define BUFFER_SIZE 65536
enum buffer_state {
+ BUFFER_UNUSED = 0,
BUFFER_READING,
BUFFER_READ_COMPLETED,
BUFFER_WRITING,
@@ -254,13 +255,6 @@ enum buffer_state {
struct buffer {
uint64_t offset;
- /* Note that command cookies are only unique per libnbd handle.
- * Since we have two handles but we must look up completed commands
- * in the buffer table by cookie we must maintain separate read and
- * write cookies.
- */
- int64_t rcookie;
- int64_t wcookie;
enum buffer_state state;
char *data;
};
@@ -355,7 +349,7 @@ static gboolean
read_data (gpointer user_data)
{
static uint64_t posn = 0;
- const size_t i = nr_buffers;
+ size_t i;
if (gssrc == NULL)
return FALSE;
@@ -367,16 +361,21 @@ read_data (gpointer user_data)
return FALSE;
}
+ /* Find a free buffer. */
+ for (i = 0; i < MAX_BUFFERS; ++i)
+ if (buffers[i].state == BUFFER_UNUSED)
+ goto found;
+
/* If too many read requests are in flight, return FALSE so this
* idle callback is unregistered. It will be registered by the
* write callback when nr_buffers decreases.
*/
- if (nr_buffers >= MAX_BUFFERS) {
- DEBUG (gssrc, "read_data: buffer full, pausing reads from source");
- reader_paused = true;
- return FALSE;
- }
+ assert (nr_buffers == MAX_BUFFERS);
+ DEBUG (gssrc, "read_data: buffer full, pausing reads from source");
+ reader_paused = true;
+ return FALSE;
+ found:
/* Begin reading into the new buffer. */
assert (buffers[i].data == NULL);
buffers[i].data = g_new (char, BUFFER_SIZE);
@@ -385,11 +384,9 @@ read_data (gpointer user_data)
nr_buffers++;
posn += BUFFER_SIZE;
- buffers[i].rcookie =
- nbd_aio_pread_callback (gssrc->nbd, buffers[i].data,
- BUFFER_SIZE, buffers[i].offset,
- finished_read, NULL, 0);
- if (buffers[i].rcookie == -1) {
+ if (nbd_aio_pread_callback (gssrc->nbd, buffers[i].data,
+ BUFFER_SIZE, buffers[i].offset,
+ finished_read, &buffers[i], 0) == -1) {
fprintf (stderr, "%s\n", nbd_get_error ());
exit (EXIT_FAILURE);
}
@@ -399,9 +396,9 @@ read_data (gpointer user_data)
/* This callback is called from libnbd when any read command finishes. */
static int
-finished_read (unsigned valid_flag, void *vp, int64_t rcookie, int *error)
+finished_read (unsigned valid_flag, void *vp, int64_t unused, int *error)
{
- size_t i;
+ struct buffer *buffer = vp;
if (!(valid_flag & LIBNBD_CALLBACK_VALID))
return 0;
@@ -411,19 +408,11 @@ finished_read (unsigned valid_flag, void *vp, int64_t rcookie, int
*error)
DEBUG (gssrc, "finished_read: read completed");
- /* Find the corresponding buffer and mark it as completed. */
- for (i = 0; i < nr_buffers; ++i) {
- if (buffers[i].rcookie == rcookie)
- goto found;
- }
- /* This should never happen. */
- abort ();
-
- found:
- buffers[i].state = BUFFER_READ_COMPLETED;
+ assert (buffer->state == BUFFER_READING);
+ buffer->state = BUFFER_READ_COMPLETED;
/* Create a writer idle handler. */
- g_idle_add (write_data, NULL);
+ g_idle_add (write_data, buffer);
return 1;
}
@@ -432,31 +421,19 @@ finished_read (unsigned valid_flag, void *vp, int64_t rcookie, int
*error)
static gboolean
write_data (gpointer user_data)
{
- size_t i;
+ struct buffer *buffer = user_data;
if (gsdest == NULL)
return FALSE;
- /* Find the first read-completed buffer and schedule it to be
- * written.
- */
- for (i = 0; i < nr_buffers; ++i) {
- if (buffers[i].state == BUFFER_READ_COMPLETED)
- goto found;
- }
- /* This should never happen. */
- abort ();
-
- found:
- buffers[i].wcookie =
- nbd_aio_pwrite_callback (gsdest->nbd, buffers[i].data,
- BUFFER_SIZE, buffers[i].offset,
- finished_write, NULL, 0);
- if (buffers[i].wcookie == -1) {
+ assert (buffer->state == BUFFER_READ_COMPLETED);
+ buffer->state = BUFFER_WRITING;
+ if (nbd_aio_pwrite_callback (gsdest->nbd, buffer->data,
+ BUFFER_SIZE, buffer->offset,
+ finished_write, buffer, 0) == -1) {
fprintf (stderr, "%s\n", nbd_get_error ());
exit (EXIT_FAILURE);
}
- buffers[i].state = BUFFER_WRITING;
/* We always unregister this idle handler because the read side
* creates a new idle handler for every buffer that has to be
@@ -467,9 +444,9 @@ write_data (gpointer user_data)
/* This callback is called from libnbd when any write command finishes. */
static int
-finished_write (unsigned valid_flag, void *vp, int64_t wcookie, int *error)
+finished_write (unsigned valid_flag, void *vp, int64_t unused, int *error)
{
- size_t i;
+ struct buffer *buffer = vp;
if (!(valid_flag & LIBNBD_CALLBACK_VALID))
return 0;
@@ -479,20 +456,11 @@ finished_write (unsigned valid_flag, void *vp, int64_t wcookie, int
*error)
DEBUG (gsdest, "finished_write: write completed");
- /* Find the corresponding buffer and free it. */
- for (i = 0; i < nr_buffers; ++i) {
- if (buffers[i].wcookie == wcookie)
- goto found;
- }
- /* This should never happen. */
- abort ();
-
- found:
- g_free (buffers[i].data);
- memmove (&buffers[i], &buffers[i+1],
- sizeof (struct buffer) * (nr_buffers-(i+1)));
+ assert (buffer->state == BUFFER_WRITING);
+ g_free (buffer->data);
+ buffer->data = NULL;
+ buffer->state = BUFFER_UNUSED;
nr_buffers--;
- buffers[nr_buffers].data = NULL;
/* If the number of buffers was MAX_BUFFERS and has now gone down to
* MAX_BUFFERS-1 then we need to restart the read handler.
--
2.22.0