On 6/4/19 4:59 AM, Richard W.M. Jones wrote:
 ---
  .gitignore                   |   1 +
  examples/Makefile.am         |  12 +
  examples/concurrent-writer.c | 450 +++++++++++++++++++++++++++++++++++
  3 files changed, 463 insertions(+)
  
 @@ -0,0 +1,450 @@
 +/* Example usage with nbdkit:
 + *
 + * nbdkit -U - memory 100M --run './concurrent-writer $unixsocket'
 + *
 + * This will read and write randomly over the first megabyte of the 
Stale comment.
 + * plugin using multi-conn, multiple threads, multiple requests in
 + * flight on each connection, and concurrent writer threads.
 + *
 + * To run it against a remote server over TCP:
 + *
 + * ./concurrent-writer hostname port
 + *  or
 + * ./concurrent-writer nbd://hostname:port
 + */
 + 
 +
 +/* Number of simultaneous connections to the NBD server.  The number
 + * of threads is NR_MULTI_CONN * 2 because there is one thread reading
 + * plus a concurrent writer thread.  Note that some servers only
 + * support a limited number of simultaneous connections, and/or have a
 + * configurable thread pool internally, and if you exceed those limits
 + * then something will break. 
Possibly a stale comment.  More likely, rather than anything breaking
outright, you'll just reach a point of diminishing returns.
 +
 +  /* Make sure the number of requests that were required matches what
 +   * we expect.
 +   */
 +  assert (requests == NR_MULTI_CONN * NR_CYCLES);
 +
 +  printf ("most requests seen in flight = %u (per thread) "
 +          "vs MAX_IN_FLIGHT = %d\n",
 +          most_in_flight, MAX_IN_FLIGHT); 
Now that we queue commands without regard to whether the server has
received them yet, this should always equal MAX_IN_FLIGHT.  But it
doesn't hurt to keep printing it as a sanity check.
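(If we ever wanted to enforce that claim rather than just eyeball it, a
one-liner along these lines would do; untested, and only worth it if the
"always equals" reasoning above really holds for every server:

   assert (most_in_flight == MAX_IN_FLIGHT);

but printing is fine for an example.)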
 +
 +  exit (errors == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
 +}
 +
 +struct queue {
 +  struct queue *next;
 +  void *buf;
 +  size_t len;
 +};
 +
 +/* Concurrent writer thread (one per libnbd handle). */
 +struct writer_data {
 +  size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
 +  struct nbd_handle *nbd;       /* NBD handle. */
 +  struct queue *q, *q_end;      /* Queue of items to write. */
 +  pthread_mutex_t q_lock;       /* Lock on queue. */
 +  pthread_cond_t q_cond;        /* Condition on queue. */ 
I'm half-wondering if we could use sem_t instead of pthread_cond_t for
the same effect, and if it would have any noticeable timing differences.
But that should be a separate experiment on top of this one.
 +};
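To expand on the sem_t idea, roughly what I have in mind is to let a
semaphore count the queued items and keep the mutex only for the queue
pointers.  Untested sketch, same names as above, plus a
sem_init (&writer_data->q_sem, 0, 0) wherever the mutex/cond are
currently initialized:

  #include <semaphore.h>

  struct writer_data {
    size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
    struct nbd_handle *nbd;       /* NBD handle. */
    struct queue *q, *q_end;      /* Queue of items to write. */
    pthread_mutex_t q_lock;       /* Lock on queue pointers. */
    sem_t q_sem;                  /* Counts items in the queue. */
  };

  /* Producer (the writer callback), after appending item under q_lock: */
  sem_post (&writer_data->q_sem);

  /* Consumer (start_writer_thread), instead of the cond_wait loop: */
  sem_wait (&writer_data->q_sem);       /* blocks until an item exists */
  pthread_mutex_lock (&writer_data->q_lock);
  item = writer_data->q;
  writer_data->q = item->next;
  if (writer_data->q == NULL)
    writer_data->q_end = NULL;
  pthread_mutex_unlock (&writer_data->q_lock);

Whether that buys anything measurable over pthread_cond_t is exactly the
separate experiment you mention.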
 +
 +static void *start_writer_thread (void *arg);
 +static int writer (void *data, const void *buf, size_t len);
 +
 +static void *
 +start_reader_thread (void *arg)
 +{ 
 +
 +    dir = nbd_aio_get_direction (nbd);
 +    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) {
 +      /* The concurrent writer is always writable, we don't have to
 +       * test the socket in poll.  Since calling nbd_aio_notify_write
 +       * can change the state, after doing it we must restart the
 +       * loop.
 +       */
 +      nbd_aio_notify_write (nbd);
 +      continue;
 +    } 
I'm still not convinced we can ever see DIRECTION_WRITE here, but I
agree that leaving this in for safety doesn't hurt.
 +
 +    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
 +      fds[0].events |= POLLIN;
 + 
Should this ALWAYS poll for POLLIN, rather than only when DIRECTION_READ
is set?  I'm worried that we might deadlock if poll() is called with
fds[0].events == 0 because we happened to sniff nbd_aio_get_direction()
at a moment when the state machine was transiently not in a state that
blocks on read.  For this example, the thread posting nbd_aio_pread is
the same thread that calls poll(), so I guess that shouldn't happen
(it's more of a concern for my nbdkit-nbd usage of libnbd).
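Concretely, the paranoid variant I have in mind is just (untested):

    fds[0].fd = fd;
    fds[0].events = POLLIN;     /* always, not only when DIRECTION_READ */
    fds[0].revents = 0;

leaving the nbd_aio_notify_read() gating below as-is.  Whether the state
machine is happy being woken in that transient window is a separate
question, so take this as a sketch only.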
 +    if (poll (fds, 1, -1) == -1) {
 +      perror ("poll");
 +      goto error;
 +    }
 +
 +    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
 +        (fds[0].revents & POLLIN) != 0)
 +      nbd_aio_notify_read (nbd);
 +
 +    /* If a command is ready to retire, retire it. */
 +    for (j = 0; j < in_flight; ++j) {
 +      r = nbd_aio_command_completed (nbd, handles[j]);
 +      if (r == -1) {
 +        fprintf (stderr, "%s\n", nbd_get_error ());
 +        goto error;
 +      }
 +      if (r) {
 +        memmove (&handles[j], &handles[j+1],
 +                 sizeof (handles[0]) * (in_flight - j - 1));
 +        j--;
 +        in_flight--;
 +        status->requests++;
 +      }
 +    }
 +  }
 +
 +  if (nbd_shutdown (nbd) == -1) {
 +    fprintf (stderr, "%s\n", nbd_get_error ());
 +    exit (EXIT_FAILURE);
 +  }
 +
 +  nbd_close (nbd);
 +
 +  printf ("thread %zu: finished OK\n", status->i);
 + 
Still no cleanup of the writer thread.
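For what it's worth, the shape of cleanup I'd expect here is roughly the
following (untested; stop_writer_thread and the zero-length sentinel are
my invention, start_writer_thread would have to treat item->buf == NULL
as "exit after draining", and the pthread_t would come from wherever the
snipped setup code stored it):

  static void
  stop_writer_thread (struct writer_data *w, pthread_t tid)
  {
    struct queue *sentinel = calloc (1, sizeof *sentinel);

    if (sentinel == NULL)
      return;                    /* no worse than today: the thread leaks */
    pthread_mutex_lock (&w->q_lock);
    if (w->q_end == NULL)
      w->q = w->q_end = sentinel;
    else {
      w->q_end->next = sentinel;
      w->q_end = sentinel;
    }
    pthread_cond_signal (&w->q_cond);
    pthread_mutex_unlock (&w->q_lock);
    pthread_join (tid, NULL);
  }

Doing something like that before nbd_close() would also close the
use-after-free window I mention at the end of the mail.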
 +  free (buf);
 +  status->status = 0;
 +  pthread_exit (status);
 +
 + error:
 +  free (buf);
 +  fprintf (stderr, "thread %zu: failed\n", status->i);
 +  status->status = -1;
 +  pthread_exit (status);
 +}
 +
 +/* This runs in the reader thread and enqueues the data which will be
 + * picked up by the writer thread.
 + */
 +static int
 +writer (void *data, const void *buf, size_t len)
 +{ 
This may change if we introduce a flags parameter (per your other thread
about the potential races/deadlocks you are noticing).
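(i.e. the callback might grow to something like the purely hypothetical

    static int writer (void *data, const void *buf, size_t len,
                       unsigned flags);

which is not anything libnbd has today, just where that thread seems to
be heading.)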
 +  struct writer_data *writer_data = data;
 +  struct queue *item;
 +
 +  item = malloc (sizeof *item);
 +  if (!item) return -1;
 +  item->next = NULL;
 +  item->buf = malloc (len);
 +  if (item->buf == NULL) {
 +    free (item);
 +    return -1;
 +  }
 +  memcpy (item->buf, buf, len);
 +  item->len = len;
 +
 +  /* Enqueue the item and signal the writer thread. */
 +  pthread_mutex_lock (&writer_data->q_lock);
 +  if (writer_data->q_end == NULL)
 +    writer_data->q = writer_data->q_end = item;
 +  else {
 +    writer_data->q_end->next = item;
 +    writer_data->q_end = item;
 +  }
 +  pthread_cond_signal (&writer_data->q_cond);
 +  pthread_mutex_unlock (&writer_data->q_lock);
 +
 +  return 0;
 +}
 +
 +static void *
 +start_writer_thread (void *arg)
 +{
 +  struct writer_data *writer_data = arg;
 +  struct nbd_handle *nbd = writer_data->nbd;
 +  struct queue *item;
 +  int fd;
 +  struct pollfd fds[1];
 +  ssize_t r;
 +  void *p;
 +
 +  fd = nbd_aio_get_fd (nbd);
 +  if (fd == -1) {
 +    fprintf (stderr, "%s\n", nbd_get_error ());
 +    exit (EXIT_FAILURE);
 +  } 
You already mentioned the potential deadlock here if the writer thread
is started before nbd_connect_*.
 +
 +  for (;;) {
 +    /* Pick next job off the queue. */
 +    pthread_mutex_lock (&writer_data->q_lock);
 +    while (writer_data->q == NULL)
 +      pthread_cond_wait (&writer_data->q_cond, &writer_data->q_lock);
 +    item = writer_data->q;
 +    writer_data->q = item->next;
 +    if (writer_data->q == NULL)
 +      writer_data->q_end = NULL;
 +    pthread_mutex_unlock (&writer_data->q_lock);
 +
 +    p = item->buf;
 +    while (item->len > 0) {
 +      /* Wait for the socket to become ready to write. */
 +      fds[0].fd = fd;
 +      fds[0].events = POLLOUT;
 +      fds[0].revents = 0;
 +
 +      if (poll (fds, 1, -1) == -1) goto error;
 +
 +      r = send (fd, p, item->len, 0);
 +      if (r == -1) goto error;
 +
 +      p += r;
 +      item->len -= r;
 +    }
 +
 +    free (item->buf);
 +    free (item);
 +  }
 +
 + error:
 +  nbd_concurrent_writer_error (nbd, errno); 
Potential use-after-free: if the reader thread calls nbd_close() without
joining this thread first, the handle may already be gone by the time
this thread notices that the fd is no longer pollable and reaches this
call.
-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org