On Mon, Feb 22, 2021 at 5:42 PM Richard W.M. Jones <rjones(a)redhat.com> wrote:
 Make this a (fairly) abstract structure.  At least hide the subtype
 fields from the main program.  This change is pure refactoring and
 doesn’t change the semantics. 
Nicer this way, although a little less type safe.
 ---
  copy/file-ops.c             | 164 +++++++++++++++++--
  copy/main.c                 | 315 ++++++++----------------------------
  copy/multi-thread-copying.c | 104 ++++++------
  copy/nbd-ops.c              | 248 +++++++++++++++++++++++++---
  copy/nbdcopy.h              |  53 +++---
  copy/null-ops.c             |  50 +++++-
  copy/pipe-ops.c             |  64 +++++++-
  copy/synch-copying.c        |  30 ++--
  8 files changed, 649 insertions(+), 379 deletions(-)
 diff --git a/copy/file-ops.c b/copy/file-ops.c
 index 2a239d0..81b08ce 100644
 --- a/copy/file-ops.c
 +++ b/copy/file-ops.c
 @@ -36,35 +36,159 @@
  #include "isaligned.h"
  #include "nbdcopy.h"
 +static struct rw_ops file_ops;
 +
 +struct rw_file {
 +  struct rw rw;
 +  int fd;
 +  struct stat stat;
 +  bool seek_hole_supported;
 +  int sector_size;
 +};
 +
 +static bool
 +seek_hole_supported (int fd)
 +{
 +#ifndef SEEK_HOLE
 +  return false;
 +#else
 +  off_t r = lseek (fd, 0, SEEK_HOLE);
 +  return r >= 0;
 +#endif
 +}
 +
 +struct rw *
 +file_create (const char *name, const struct stat *stat, int fd)
 +{
 +  struct rw_file *rwf = calloc (1, sizeof *rwf);
 +  if (rwf == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
 +
 +  rwf->rw.ops = &file_ops;
 +  rwf->rw.name = name;
 +  rwf->stat = *stat;
 +  rwf->fd = fd;
 +
 +  if (S_ISBLK (stat->st_mode)) {
 +    /* Block device. */
 +    rwf->rw.size = lseek (fd, 0, SEEK_END);
 +    if (rwf->rw.size == -1) {
 +      perror ("lseek");
 +      exit (EXIT_FAILURE);
 +    }
 +    if (lseek (fd, 0, SEEK_SET) == -1) {
 +      perror ("lseek");
 +      exit (EXIT_FAILURE);
 +    }
 +    rwf->seek_hole_supported = seek_hole_supported (fd);
 +    rwf->sector_size = 4096;
 +#ifdef BLKSSZGET
 +    if (ioctl (fd, BLKSSZGET, &rwf->sector_size))
 +      fprintf (stderr, "warning: cannot get sector size: %s: %m", name);
 +#endif
 +  }
 +  else if (S_ISREG (stat->st_mode)) {
 +    /* Regular file. */
 +    rwf->rw.size = stat->st_size;
 +    rwf->seek_hole_supported = seek_hole_supported (fd);
 +  }
 +  else
 +    abort ();
 +
 +  return (struct rw *) rwf; 
We can avoid the cast here by returning &(rwf->rw)
 +}
 +
  static void
  file_close (struct rw *rw)
  {
 -  if (close (rw->u.local.fd) == -1) {
 +  struct rw_file *rwf = (struct rw_file *)rw;
 +
 +  if (close (rwf->fd) == -1) {
      fprintf (stderr, "%s: close: %m\n", rw->name);
      exit (EXIT_FAILURE);
    }
 +  free (rw);
 +}
 +
 +static void
 +file_truncate (struct rw *rw, int64_t size)
 +{
 +  struct rw_file *rwf = (struct rw_file *) rw;
 +
 +  /* If the destination is an ordinary file then the original file
 +   * size doesn't matter.  Truncate it to the source size.  But
 +   * truncate it to zero first so the file is completely empty and
 +   * sparse.
 +   */
 +  if (! S_ISREG (rwf->stat.st_mode))
 +    return;
 +
 +  if (ftruncate (rwf->fd, 0) == -1 ||
 +      ftruncate (rwf->fd, size) == -1) {
 +    perror ("truncate");
 +    exit (EXIT_FAILURE);
 +  }
 +  rwf->rw.size = size;
 +
 +  /* We can assume the destination is zero. */
 +  destination_is_zero = true;
  }
  static void
  file_flush (struct rw *rw)
  {
 -  if ((S_ISREG (rw->u.local.stat.st_mode) ||
 -       S_ISBLK (rw->u.local.stat.st_mode)) &&
 -      fsync (rw->u.local.fd) == -1) {
 +  struct rw_file *rwf = (struct rw_file *)rw;
 +
 +  if ((S_ISREG (rwf->stat.st_mode) ||
 +       S_ISBLK (rwf->stat.st_mode)) &&
 +      fsync (rwf->fd) == -1) {
      perror (rw->name);
      exit (EXIT_FAILURE);
    }
  }
 +static bool
 +file_is_read_only (struct rw *rw)
 +{
 +  /* Permissions are hard, and this is only used as an early check
 +   * before the copy.  Proceed with the copy and fail if it fails.
 +   */
 +  return false;
 +}
 +
 +static bool
 +file_can_extents (struct rw *rw)
 +{
 +#ifdef SEEK_HOLE
 +  return true;
 +#else
 +  return false;
 +#endif
 +}
 +
 +static bool
 +file_can_multi_conn (struct rw *rw)
 +{
 +  return true;
 +}
 +
 +static void
 +file_start_multi_conn (struct rw *rw)
 +{
 +  /* Don't need to do anything for files since we can read/write on a
 +   * single file descriptor.
 +   */
 +}
 +
  static size_t
  file_synch_read (struct rw *rw,
                   void *data, size_t len, uint64_t offset)
  {
 +  struct rw_file *rwf = (struct rw_file *)rw;
    size_t n = 0;
    ssize_t r;
    while (len > 0) {
 -    r = pread (rw->u.local.fd, data, len, offset);
 +    r = pread (rwf->fd, data, len, offset);
      if (r == -1) {
        perror (rw->name);
        exit (EXIT_FAILURE);
 @@ -85,10 +209,11 @@ static void
  file_synch_write (struct rw *rw,
                    const void *data, size_t len, uint64_t offset)
  {
 +  struct rw_file *rwf = (struct rw_file *)rw;
    ssize_t r;
    while (len > 0) {
 -    r = pwrite (rw->u.local.fd, data, len, offset);
 +    r = pwrite (rwf->fd, data, len, offset);
      if (r == -1) {
        perror (rw->name);
        exit (EXIT_FAILURE);
 @@ -103,7 +228,8 @@ static bool
  file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
  {
  #ifdef FALLOC_FL_PUNCH_HOLE
 -  int fd = rw->u.local.fd;
 +  struct rw_file *rwf = (struct rw_file *)rw;
 +  int fd = rwf->fd;
    int r;
    r = fallocate (fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
 @@ -121,9 +247,11 @@ file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
  static bool
  file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count)
  {
 -  if (S_ISREG (rw->u.local.stat.st_mode)) {
 +  struct rw_file *rwf = (struct rw_file *)rw;
 +
 +  if (S_ISREG (rwf->stat.st_mode)) {
  #ifdef FALLOC_FL_ZERO_RANGE
 -    int fd = rw->u.local.fd;
 +    int fd = rwf->fd;
      int r;
      r = fallocate (fd, FALLOC_FL_ZERO_RANGE, offset, count);
 @@ -134,10 +262,10 @@ file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count)
      return true;
  #endif
    }
 -  else if (S_ISBLK (rw->u.local.stat.st_mode) &&
 -           IS_ALIGNED (offset | count, rw->u.local.sector_size)) {
 +  else if (S_ISBLK (rwf->stat.st_mode) &&
 +           IS_ALIGNED (offset | count, rwf->sector_size)) {
  #ifdef BLKZEROOUT
 -    int fd = rw->u.local.fd;
 +    int fd = rwf->fd;
      int r;
      uint64_t range[2] = {offset, count};
 @@ -223,11 +351,12 @@ file_get_extents (struct rw *rw, uintptr_t index,
    ret->size = 0;
  #ifdef SEEK_HOLE
 +  struct rw_file *rwf = (struct rw_file *)rw;
    static pthread_mutex_t lseek_lock = PTHREAD_MUTEX_INITIALIZER;
 -  if (rw->u.local.seek_hole_supported) {
 +  if (rwf->seek_hole_supported) {
      uint64_t end = offset + count;
 -    int fd = rw->u.local.fd;
 +    int fd = rwf->fd;
      off_t pos;
      struct extent e;
      size_t last;
 @@ -302,9 +431,14 @@ file_get_extents (struct rw *rw, uintptr_t index,
    default_get_extents (rw, index, offset, count, ret);
  }
 -struct rw_ops file_ops = {
 +static struct rw_ops file_ops = {
    .ops_name = "file_ops",
    .close = file_close,
 +  .is_read_only = file_is_read_only,
 +  .can_extents = file_can_extents,
 +  .can_multi_conn = file_can_multi_conn,
 +  .start_multi_conn = file_start_multi_conn,
 +  .truncate = file_truncate,
    .flush = file_flush,
    .synch_read = file_synch_read,
    .synch_write = file_synch_write,
 diff --git a/copy/main.c b/copy/main.c
 index 68a6030..6fdc6fd 100644
 --- a/copy/main.c
 +++ b/copy/main.c
 @@ -53,19 +53,11 @@ int progress_fd = -1;           /* --progress=FD */
  unsigned sparse_size = 4096;    /* --sparse */
  bool synchronous;               /* --synchronous flag */
  unsigned threads;               /* --threads */
 -struct rw src, dst;             /* The source and destination. */
 +struct rw *src, *dst;           /* The source and destination. */
  bool verbose;                   /* --verbose flag */
  static bool is_nbd_uri (const char *s);
 -static bool seek_hole_supported (int fd);
 -static void open_null (struct rw *rw);
 -static int open_local (const char *prog,
 -                       const char *filename, bool writing, struct rw *rw);
 -static void open_nbd_uri (const char *prog,
 -                          const char *uri, bool writing, struct rw *rw);
 -static void open_nbd_subprocess (const char *prog,
 -                                 const char **argv, size_t argc,
 -                                 bool writing, struct rw *rw);
 +static struct rw *open_local (const char *filename, bool writing);
  static void print_rw (struct rw *rw, const char *prefix, FILE *fp);
  static void __attribute__((noreturn))
 @@ -242,18 +234,18 @@ main (int argc, char *argv[])
    found1:
      connections = 1;            /* multi-conn not supported */
 -    src.name = argv[optind+1];
 -    open_nbd_subprocess (argv[0],
 -                         (const char **) &argv[optind+1], i-optind-1,
 -                         false, &src);
 +    src =
 +      nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1,
 +                                false);
      optind = i+1;
    }
    else {                        /* Source is not [...]. */
 -    src.name = argv[optind++];
 -    if (! is_nbd_uri (src.name))
 -      src.u.local.fd = open_local (argv[0], src.name, false, &src);
 +    const char *src_name = argv[optind++];
 +
 +    if (! is_nbd_uri (src_name))
 +      src = open_local (src_name, false);
      else
 -      open_nbd_uri (argv[0], src.name, false, &src);
 +      src = nbd_rw_create_uri (src_name, src_name, false);
    }
    if (optind >= argc)
 @@ -267,48 +259,46 @@ main (int argc, char *argv[])
    found2:
      connections = 1;            /* multi-conn not supported */
 -    dst.name = argv[optind+1];
 -    open_nbd_subprocess (argv[0],
 -                         (const char **) &argv[optind+1], i-optind-1,
 -                         true, &dst);
 +    dst =
 +      nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1,
 +                                true);
      optind = i+1;
    }
    else {                        /* Destination is not [...] */
 -    dst.name = argv[optind++];
 -    if (strcmp (dst.name, "null:") == 0)
 -      open_null (&dst);
 -    else if (! is_nbd_uri (dst.name))
 -      dst.u.local.fd = open_local (argv[0], dst.name, true /* writing */, &dst);
 -    else {
 -      open_nbd_uri (argv[0], dst.name, true, &dst);
 +    const char *dst_name = argv[optind++];
 -      /* Obviously this is not going to work if the server is
 -       * advertising read-only, so fail early with a nice error message.
 -       */
 -      if (nbd_is_read_only (dst.u.nbd.handles.ptr[0])) {
 -        fprintf (stderr, "%s: %s: "
 -                 "this NBD server is read-only, cannot write to it\n",
 -                 argv[0], dst.name);
 -        exit (EXIT_FAILURE);
 -      }
 -    }
 +    if (strcmp (dst_name, "null:") == 0)
 +      dst = null_create (dst_name);
 +    else if (! is_nbd_uri (dst_name))
 +      dst = open_local (dst_name, true /* writing */);
 +    else
 +      dst = nbd_rw_create_uri (dst_name, dst_name, true);
    }
    /* There must be no extra parameters. */
    if (optind != argc)
      usage (stderr, EXIT_FAILURE);
 -  /* Check we've set the fields of src and dst. */
 -  assert (src.ops);
 -  assert (src.name);
 -  assert (dst.ops);
 -  assert (dst.name);
 +  /* Check we've created src and dst and set the expected fields. */
 +  assert (src != NULL);
 +  assert (dst != NULL);
 +  assert (src->ops != NULL);
 +  assert (src->name != NULL);
 +  assert (dst->ops != NULL);
 +  assert (dst->name != NULL);
 +
 +  /* Obviously this is not going to work if the destination is
 +   * read-only, so fail early with a nice error message.
 +   */
 +  if (dst->ops->is_read_only (dst)) {
 +    fprintf (stderr, "%s: %s: "
 +             "the destination is read-only, cannot write to it\n",
 +             argv[0], dst->name);
 +    exit (EXIT_FAILURE);
 +  }
    /* If multi-conn is not supported, force connections to 1. */
 -  if ((src.ops == &nbd_ops &&
 -       ! nbd_can_multi_conn (src.u.nbd.handles.ptr[0])) ||
 -      (dst.ops == &nbd_ops &&
 -       ! nbd_can_multi_conn (dst.u.nbd.handles.ptr[0])))
 +  if (! src->ops->can_multi_conn (src) || ! dst->ops->can_multi_conn (dst))
      connections = 1;
    /* Calculate the number of threads from the number of connections. */
 @@ -335,44 +325,17 @@ main (int argc, char *argv[])
    if (threads < connections)
      connections = threads;
 -  /* Calculate the source and destination sizes.  We set these to -1
 -   * if the size is not known (because it's a stream).  Note that for
 -   * local types, open_local set something in *.size already.
 +  /* Truncate the destination to the same size as the source.  Only
 +   * has an effect on regular files.
     */
 -  if (src.ops == &nbd_ops) {
 -    src.size = nbd_get_size (src.u.nbd.handles.ptr[0]);
 -    if (src.size == -1) {
 -      fprintf (stderr, "%s: %s: %s\n", argv[0], src.name, nbd_get_error ());
 -      exit (EXIT_FAILURE);
 -    }
 -  }
 -  if (dst.ops != &nbd_ops && S_ISREG (dst.u.local.stat.st_mode)) {
 -    /* If the destination is an ordinary file then the original file
 -     * size doesn't matter.  Truncate it to the source size.  But
 -     * truncate it to zero first so the file is completely empty and
 -     * sparse.
 -     */
 -    dst.size = src.size;
 -    if (ftruncate (dst.u.local.fd, 0) == -1 ||
 -        ftruncate (dst.u.local.fd, dst.size) == -1) {
 -      perror ("truncate");
 -      exit (EXIT_FAILURE);
 -    }
 -    destination_is_zero = true;
 -  }
 -  else if (dst.ops == &nbd_ops) {
 -    dst.size = nbd_get_size (dst.u.nbd.handles.ptr[0]);
 -    if (dst.size == -1) {
 -      fprintf (stderr, "%s: %s: %s\n", argv[0], dst.name, nbd_get_error ());
 -      exit (EXIT_FAILURE);
 -    }
 -  }
 +  if (dst->ops->truncate)
 +    dst->ops->truncate (dst, src->size);
    /* Check if the source is bigger than the destination, since that
     * would truncate (ie. lose) data.  Copying from smaller to larger
     * is OK.
     */
 -  if (src.size >= 0 && dst.size >= 0 && src.size > dst.size) {
 +  if (src->size >= 0 && dst->size >= 0 && src->size >
dst->size) {
      fprintf (stderr,
               "nbdcopy: error: destination size is smaller than source
size\n");
      exit (EXIT_FAILURE);
 @@ -383,37 +346,29 @@ main (int argc, char *argv[])
     * settings.
     */
    if (verbose) {
 -    print_rw (&src, "nbdcopy: src", stderr);
 -    print_rw (&dst, "nbdcopy: dst", stderr);
 +    print_rw (src, "nbdcopy: src", stderr);
 +    print_rw (dst, "nbdcopy: dst", stderr);
      fprintf (stderr, "nbdcopy: connections=%u requests=%u threads=%u "
               "synchronous=%s\n",
               connections, max_requests, threads,
               synchronous ? "true" : "false");
    }
 -  /* If #connections > 1 then multi-conn is enabled at both ends and
 -   * we need to open further connections.
 +  /* If multi-conn is enabled on either side, then at this point we
 +   * need to ask the backend to open the extra connections.
     */
    if (connections > 1) {
      assert (threads == connections);
 -
 -    if (src.ops == &nbd_ops) {
 -      for (i = 1; i < connections; ++i)
 -        open_nbd_uri (argv[0], src.name, false, &src);
 -      assert (src.u.nbd.handles.size == connections);
 -    }
 -    if (dst.ops == &nbd_ops) {
 -      for (i = 1; i < connections; ++i)
 -        open_nbd_uri (argv[0], dst.name, true, &dst);
 -      assert (dst.u.nbd.handles.size == connections);
 -    }
 +    if (src->ops->can_multi_conn (src))
 +      src->ops->start_multi_conn (src);
 +    if (dst->ops->can_multi_conn (dst))
 +      dst->ops->start_multi_conn (dst);
    }
    /* If the source is NBD and we couldn't negotiate meta
     * base:allocation then turn off extents.
     */
 -  if (src.ops == &nbd_ops &&
 -      !nbd_can_meta_context (src.u.nbd.handles.ptr[0], "base:allocation"))
 +  if (! src->ops->can_extents (src))
      extents = false;
    /* Always set the progress bar to 0% at the start of the copy. */
 @@ -429,12 +384,12 @@ main (int argc, char *argv[])
    progress_bar (1, 1);
    /* Shut down the source side. */
 -  src.ops->close (&src);
 +  src->ops->close (src);
    /* Shut down the destination side. */
    if (flush)
 -    dst.ops->flush (&dst);
 -  dst.ops->close (&dst);
 +    dst->ops->flush (dst);
 +  dst->ops->close (dst);
    exit (EXIT_SUCCESS);
  }
 @@ -452,33 +407,25 @@ is_nbd_uri (const char *s)
      strncmp (s, "nbds+vsock:", 11) == 0;
  }
 -/* Open null: (destination only). */
 -static void
 -open_null (struct rw *rw)
 -{
 -  rw->ops = &null_ops;
 -  rw->size = INT64_MAX;
 -}
 -
  /* Open a local (non-NBD) file, ie. a file, device, or "-" for stdio.
 - * Returns the open file descriptor which the caller must close.
 + * Returns the struct rw * which the caller must close.
   *
   * “writing” is true if this is the destination parameter.
   * “rw->u.local.stat” and “rw->size” return the file stat and size,
   * but size can be returned as -1 if we don't know the size (if it's a
   * pipe or stdio).
   */
 -static int
 -open_local (const char *prog,
 -            const char *filename, bool writing, struct rw *rw)
 +static struct rw *
 +open_local (const char *filename, bool writing)
  {
    int flags, fd;
 +  struct stat stat;
    if (strcmp (filename, "-") == 0) {
      synchronous = true;
      fd = writing ? STDOUT_FILENO : STDIN_FILENO;
      if (writing && isatty (fd)) {
 -      fprintf (stderr, "%s: refusing to write to tty\n", prog);
 +      fprintf (stderr, "%s: refusing to write to tty\n", "nbdcopy");
Is it intended to replace prog with "nbdcopy"?
        exit (EXIT_FAILURE);
      }
    }
 @@ -502,146 +449,17 @@ open_local (const char *prog,
      }
    }
 -  if (fstat (fd, &rw->u.local.stat) == -1) {
 +  if (fstat (fd, &stat) == -1) {
      perror (filename);
      exit (EXIT_FAILURE);
    }
 -  if (S_ISBLK (rw->u.local.stat.st_mode)) {
 -    /* Block device. */
 -    rw->ops = &file_ops;
 -    rw->size = lseek (fd, 0, SEEK_END);
 -    if (rw->size == -1) {
 -      perror ("lseek");
 -      exit (EXIT_FAILURE);
 -    }
 -    if (lseek (fd, 0, SEEK_SET) == -1) {
 -      perror ("lseek");
 -      exit (EXIT_FAILURE);
 -    }
 -    rw->u.local.seek_hole_supported = seek_hole_supported (fd);
 -    rw->u.local.sector_size = 4096;
 -#ifdef BLKSSZGET
 -    if (ioctl (fd, BLKSSZGET, &rw->u.local.sector_size))
 -      fprintf (stderr, "warning: cannot get sector size: %s: %m",
rw->name);
 -#endif
 -  }
 -  else if (S_ISREG (rw->u.local.stat.st_mode)) {
 -    /* Regular file. */
 -    rw->ops = &file_ops;
 -    rw->size = rw->u.local.stat.st_size;
 -    rw->u.local.seek_hole_supported = seek_hole_supported (fd);
 -  }
 +  if (S_ISBLK (stat.st_mode) || S_ISREG (stat.st_mode))
 +    return file_create (filename, &stat, fd);
    else {
 -    /* Probably stdin/stdout, a pipe or a socket.  Set size == -1
 -     * which means don't know, and force synchronous mode.
 -     */
 -    synchronous = true;
 -    rw->ops = &pipe_ops;
 -    rw->size = -1;
 -    rw->u.local.seek_hole_supported = false;
 +    /* Probably stdin/stdout, a pipe or a socket. */
 +    synchronous = true;        /* Force synchronous mode for pipes. */
 +    return pipe_create (filename, fd);
    }
 -
 -  return fd;
 -}
 -
 -static bool
 -seek_hole_supported (int fd)
 -{
 -#ifndef SEEK_HOLE
 -  return false;
 -#else
 -  off_t r = lseek (fd, 0, SEEK_HOLE);
 -  return r >= 0;
 -#endif
 -}
 -
 -static void
 -open_nbd_uri (const char *prog,
 -              const char *uri, bool writing, struct rw *rw)
 -{
 -  struct nbd_handle *nbd;
 -
 -  rw->ops = &nbd_ops;
 -  nbd = nbd_create ();
 -  if (nbd == NULL) {
 -    fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
 -    exit (EXIT_FAILURE);
 -  }
 -  nbd_set_debug (nbd, verbose);
 -  nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */
 -  if (extents && !writing &&
 -      nbd_add_meta_context (nbd, "base:allocation") == -1) {
 -    fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
 -    exit (EXIT_FAILURE);
 -  }
 -
 -  if (handles_append (&rw->u.nbd.handles, nbd) == -1) {
 -    perror ("realloc");
 -    exit (EXIT_FAILURE);
 -  }
 -
 -  if (nbd_connect_uri (nbd, uri) == -1) {
 -    fprintf (stderr, "%s: %s: %s\n", prog, uri, nbd_get_error ());
 -    exit (EXIT_FAILURE);
 -  }
 -
 -  /* Cache these.  We assume with multi-conn that each handle will act
 -   * the same way.
 -   */
 -  rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0;
 -  rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0;
 -}
 -
 -DEFINE_VECTOR_TYPE (const_string_vector, const char *);
 -
 -static void
 -open_nbd_subprocess (const char *prog,
 -                     const char **argv, size_t argc,
 -                     bool writing, struct rw *rw)
 -{
 -  struct nbd_handle *nbd;
 -  const_string_vector copy = empty_vector;
 -  size_t i;
 -
 -  rw->ops = &nbd_ops;
 -  nbd = nbd_create ();
 -  if (nbd == NULL) {
 -    fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
 -    exit (EXIT_FAILURE);
 -  }
 -  nbd_set_debug (nbd, verbose);
 -  if (extents && !writing &&
 -      nbd_add_meta_context (nbd, "base:allocation") == -1) {
 -    fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
 -    exit (EXIT_FAILURE);
 -  }
 -
 -  if (handles_append (&rw->u.nbd.handles, nbd) == -1) {
 -  memory_error:
 -    perror ("realloc");
 -    exit (EXIT_FAILURE);
 -  }
 -
 -  /* We have to copy the args so we can null-terminate them. */
 -  for (i = 0; i < argc; ++i) {
 -    if (const_string_vector_append (©, argv[i]) == -1)
 -      goto memory_error;
 -  }
 -  if (const_string_vector_append (©, NULL) == -1)
 -    goto memory_error;
 -
 -  if (nbd_connect_systemd_socket_activation (nbd, (char **) copy.ptr) == -1) {
 -    fprintf (stderr, "%s: %s: %s\n", prog, argv[0], nbd_get_error ());
 -    exit (EXIT_FAILURE);
 -  }
 -
 -  /* Cache these.  We assume with multi-conn that each handle will act
 -   * the same way.
 -   */
 -  rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0;
 -  rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0;
 -
 -  free (copy.ptr);
  }
  /* Print an rw struct, used in --verbose mode. */
 @@ -650,7 +468,6 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp)
  {
    fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name,
rw->name);
    fprintf (fp, "%s: size=%" PRIi64 "\n", prefix, rw->size);
 -  /* Could print other stuff here, but that's enough for debugging. */
  }
  /* Default implementation of rw->ops->get_extents for backends which
 diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
 index 98b4056..4f57054 100644
 --- a/copy/multi-thread-copying.c
 +++ b/copy/multi-thread-copying.c
 @@ -50,13 +50,13 @@ get_next_offset (uint64_t *offset, uint64_t *count)
    bool r = false;               /* returning false means no more work */
    pthread_mutex_lock (&lock);
 -  if (next_offset < src.size) {
 +  if (next_offset < src->size) {
      *offset = next_offset;
      /* Work out how large this range is.  The last range may be
       * smaller than THREAD_WORK_SIZE.
       */
 -    *count = src.size - *offset;
 +    *count = src->size - *offset;
      if (*count > THREAD_WORK_SIZE)
        *count = THREAD_WORK_SIZE;
 @@ -69,7 +69,7 @@ get_next_offset (uint64_t *offset, uint64_t *count)
       * are called from threads and not necessarily in monotonic order
       * so the progress bar would move erratically.
       */
 -    progress_bar (*offset, dst.size);
 +    progress_bar (*offset, dst->size);
    }
    pthread_mutex_unlock (&lock);
    return r;
 @@ -89,11 +89,13 @@ multi_thread_copying (void)
     */
    assert (threads > 0);
    assert (threads == connections);
 +/*
    if (src.ops == &nbd_ops)
      assert (src.u.nbd.handles.size == connections);
    if (dst.ops == &nbd_ops)
      assert (dst.u.nbd.handles.size == connections);
 -  assert (src.size != -1);
 +*/
 +  assert (src->size != -1);
    workers = malloc (sizeof (pthread_t) * threads);
    if (workers == NULL) {
 @@ -147,9 +149,9 @@ worker_thread (void *indexp)
      assert (0 < count && count <= THREAD_WORK_SIZE);
      if (extents)
 -      src.ops->get_extents (&src, index, offset, count, &exts);
 +      src->ops->get_extents (src, index, offset, count, &exts);
      else
 -      default_get_extents (&src, index, offset, count, &exts);
 +      default_get_extents (src, index, offset, count, &exts);
      for (i = 0; i < exts.size; ++i) {
        struct command *command;
 @@ -208,11 +210,11 @@ worker_thread (void *indexp)
            wait_for_request_slots (index);
            /* Begin the asynch read operation. */
 -          src.ops->asynch_read (&src, command,
 -                                (nbd_completion_callback) {
 -                                  .callback = finished_read,
 -                                  .user_data = command,
 -                                });
 +          src->ops->asynch_read (src, command,
 +                                 (nbd_completion_callback) {
 +                                   .callback = finished_read,
 +                                   .user_data = command,
 +                                 });
            exts.ptr[i].offset += len;
            exts.ptr[i].length -= len;
 @@ -254,7 +256,7 @@ wait_for_request_slots (uintptr_t index)
  static unsigned
  in_flight (uintptr_t index)
  {
 -  return src.ops->in_flight (&src, index) + dst.ops->in_flight (&dst,
index);
 +  return src->ops->in_flight (src, index) + dst->ops->in_flight (dst,
index);
  }
  /* Poll (optional) NBD src and NBD dst, moving the state machine(s)
 @@ -271,7 +273,7 @@ poll_both_ends (uintptr_t index)
    /* Note: if polling is not supported, this function will
     * set fd == -1 which poll ignores.
     */
 -  src.ops->get_polling_fd (&src, index, &fds[0].fd, &direction);
 +  src->ops->get_polling_fd (src, index, &fds[0].fd, &direction);
    if (fds[0].fd >= 0) {
      switch (direction) {
      case LIBNBD_AIO_DIRECTION_READ:
 @@ -286,7 +288,7 @@ poll_both_ends (uintptr_t index)
      }
    }
 -  dst.ops->get_polling_fd (&dst, index, &fds[1].fd, &direction);
 +  dst->ops->get_polling_fd (dst, index, &fds[1].fd, &direction);
    if (fds[1].fd >= 0) {
      switch (direction) {
      case LIBNBD_AIO_DIRECTION_READ:
 @@ -311,24 +313,24 @@ poll_both_ends (uintptr_t index)
    if (fds[0].fd >= 0) {
      if ((fds[0].revents & (POLLIN | POLLHUP)) != 0)
 -      src.ops->asynch_notify_read (&src, index);
 +      src->ops->asynch_notify_read (src, index);
      else if ((fds[0].revents & POLLOUT) != 0)
 -      src.ops->asynch_notify_write (&src, index);
 +      src->ops->asynch_notify_write (src, index);
      else if ((fds[0].revents & (POLLERR | POLLNVAL)) != 0) {
        errno = ENOTCONN;
 -      perror (src.name);
 +      perror (src->name);
        exit (EXIT_FAILURE);
      }
    }
    if (fds[1].fd >= 0) {
      if ((fds[1].revents & (POLLIN | POLLHUP)) != 0)
 -      dst.ops->asynch_notify_read (&dst, index);
 +      dst->ops->asynch_notify_read (dst, index);
      else if ((fds[1].revents & POLLOUT) != 0)
 -      dst.ops->asynch_notify_write (&dst, index);
 +      dst->ops->asynch_notify_write (dst, index);
      else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) {
        errno = ENOTCONN;
 -      perror (dst.name);
 +      perror (dst->name);
        exit (EXIT_FAILURE);
      }
    }
 @@ -377,11 +379,11 @@ finished_read (void *vp, int *error)
      /* If sparseness detection (see below) is turned off then we write
       * the whole command.
       */
 -    dst.ops->asynch_write (&dst, command,
 -                           (nbd_completion_callback) {
 -                             .callback = free_command,
 -                             .user_data = command,
 -                           });
 +    dst->ops->asynch_write (dst, command,
 +                            (nbd_completion_callback) {
 +                              .callback = free_command,
 +                              .user_data = command,
 +                            });
    }
    else {                               /* Sparseness detection. */
      const uint64_t start = command->offset;
 @@ -408,11 +410,11 @@ finished_read (void *vp, int *error)
              newcommand = copy_subcommand (command,
                                            last_offset, i - last_offset,
                                            false);
 -            dst.ops->asynch_write (&dst, newcommand,
 -                                   (nbd_completion_callback) {
 -                                     .callback = free_command,
 -                                     .user_data = newcommand,
 -                                   });
 +            dst->ops->asynch_write (dst, newcommand,
 +                                    (nbd_completion_callback) {
 +                                      .callback = free_command,
 +                                      .user_data = newcommand,
 +                                    });
            }
            /* Start the new hole. */
            last_offset = i;
 @@ -445,11 +447,11 @@ finished_read (void *vp, int *error)
          newcommand = copy_subcommand (command,
                                        last_offset, i - last_offset,
                                        false);
 -        dst.ops->asynch_write (&dst, newcommand,
 -                               (nbd_completion_callback) {
 -                                 .callback = free_command,
 -                                 .user_data = newcommand,
 -                               });
 +        dst->ops->asynch_write (dst, newcommand,
 +                                (nbd_completion_callback) {
 +                                  .callback = free_command,
 +                                  .user_data = newcommand,
 +                                });
        }
        else {
          newcommand = copy_subcommand (command,
 @@ -462,11 +464,11 @@ finished_read (void *vp, int *error)
      /* There may be an unaligned tail, so write that. */
      if (end - i > 0) {
        newcommand = copy_subcommand (command, i, end - i, false);
 -      dst.ops->asynch_write (&dst, newcommand,
 -                             (nbd_completion_callback) {
 -                               .callback = free_command,
 -                               .user_data = newcommand,
 -                             });
 +      dst->ops->asynch_write (dst, newcommand,
 +                              (nbd_completion_callback) {
 +                                .callback = free_command,
 +                                .user_data = newcommand,
 +                              });
      }
      /* Free the original command since it has been split into
 @@ -503,20 +505,20 @@ fill_dst_range_with_zeroes (struct command *command)
    if (!allocated) {
      /* Try trimming. */
 -    if (dst.ops->asynch_trim (&dst, command,
 -                              (nbd_completion_callback) {
 -                                .callback = free_command,
 -                                .user_data = command,
 -                              }))
 +    if (dst->ops->asynch_trim (dst, command,
 +                               (nbd_completion_callback) {
 +                                 .callback = free_command,
 +                                 .user_data = command,
 +                               }))
        return;
    }
    /* Try efficient zeroing. */
 -  if (dst.ops->asynch_zero (&dst, command,
 -                            (nbd_completion_callback) {
 -                              .callback = free_command,
 -                              .user_data = command,
 -                            }))
 +  if (dst->ops->asynch_zero (dst, command,
 +                             (nbd_completion_callback) {
 +                               .callback = free_command,
 +                               .user_data = command,
 +                             }))
      return;
    /* Fall back to loop writing zeroes.  This is going to be slow
 @@ -533,7 +535,7 @@ fill_dst_range_with_zeroes (struct command *command)
      if (len > MAX_REQUEST_SIZE)
        len = MAX_REQUEST_SIZE;
 -    dst.ops->synch_write (&dst, data, len, command->offset);
 +    dst->ops->synch_write (dst, data, len, command->offset);
      command->slice.len -= len;
      command->offset += len;
    }
 diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c
 index 7b48cbc..24970c2 100644
 --- a/copy/nbd-ops.c
 +++ b/copy/nbd-ops.c
 @@ -27,45 +27,221 @@
  #include "nbdcopy.h"
 +static struct rw_ops nbd_ops;
 +
 +DEFINE_VECTOR_TYPE (handles, struct nbd_handle *)
 +DEFINE_VECTOR_TYPE (const_string_vector, const char *);
 +
 +struct rw_nbd {
 +  struct rw rw;
 +
 +  /* Because of multi-conn we have to remember enough state in this
 +   * handle in order to be able to open another connection with the
 +   * same parameters after nbd_rw_create* has been called once.
 +   */
 +  enum { CREATE_URI, CREATE_SUBPROCESS } create_t;
 +  const char *uri;              /* For CREATE_URI */
 +  const_string_vector argv;     /* For CREATE_SUBPROCESS */
 +  bool writing;
 +
 +  handles handles;              /* One handle per connection. */
 +  bool can_trim, can_zero;      /* Cached nbd_can_trim, nbd_can_zero. */
 +};
 +
 +static void
 +open_one_nbd_handle (struct rw_nbd *rwn)
 +{
 +  struct nbd_handle *nbd;
 +
 +  nbd = nbd_create ();
 +  if (nbd == NULL) {
 +    fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ());
 +    exit (EXIT_FAILURE);
 +  }
 +
 +  nbd_set_debug (nbd, verbose);
 +
 +  if (extents && !rwn->writing &&
 +      nbd_add_meta_context (nbd, "base:allocation") == -1) {
 +    fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ());
 +    exit (EXIT_FAILURE);
 +  }
 +
 +  switch (rwn->create_t) {
 +  case CREATE_URI:
 +    nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */
 +
 +    if (nbd_connect_uri (nbd, rwn->uri) == -1) {
 +      fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->uri,
nbd_get_error ());
 +      exit (EXIT_FAILURE);
 +    }
 +    break;
 +
 +  case CREATE_SUBPROCESS:
 +    if (nbd_connect_systemd_socket_activation (nbd,
 +                                               (char **) rwn->argv.ptr)
 +        == -1) {
 +      fprintf (stderr, "%s: %s: %s\n", "nbdcopy",
rwn->argv.ptr[0],
 +               nbd_get_error ());
 +      exit (EXIT_FAILURE);
 +    }
 +  }
 +
 +  /* Cache these.  We assume with multi-conn that each handle will act
 +   * the same way.
 +   */
 +  if (rwn->handles.size == 0) {
 +    rwn->can_trim = nbd_can_trim (nbd) > 0;
 +    rwn->can_zero = nbd_can_zero (nbd) > 0;
 +    rwn->rw.size = nbd_get_size (nbd);
 +    if (rwn->rw.size == -1) {
 +      fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->rw.name,
 +               nbd_get_error ());
 +      exit (EXIT_FAILURE);
 +    }
 +  }
 +
 +  if (handles_append (&rwn->handles, nbd) == -1) {
 +    perror ("realloc");
 +    exit (EXIT_FAILURE);
 +  }
 +}
 +
 +struct rw *
 +nbd_rw_create_uri (const char *name, const char *uri, bool writing)
 +{
 +  struct rw_nbd *rwn = calloc (1, sizeof *rwn);
 +  if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
 +
 +  rwn->rw.ops = &nbd_ops;
 +  rwn->rw.name = name;
 +  rwn->create_t = CREATE_URI;
 +  rwn->uri = uri;
 +  rwn->writing = writing;
 +
 +  open_one_nbd_handle (rwn);
 +
 +  return (struct rw *) rwn;
 +}
 +
 +struct rw *
 +nbd_rw_create_subprocess (const char **argv, size_t argc, bool writing)
 +{
 +  size_t i;
 +  struct rw_nbd *rwn = calloc (1, sizeof *rwn);
 +  if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
 +
 +  rwn->rw.ops = &nbd_ops;
 +  rwn->rw.name = argv[0];
 +  rwn->create_t = CREATE_SUBPROCESS;
 +  rwn->writing = writing;
 +
 +  /* We have to copy the args so we can null-terminate them. */
 +  for (i = 0; i < argc; ++i) {
 +    if (const_string_vector_append (&rwn->argv, argv[i]) == -1) {
 +    memory_error:
 +      perror ("realloc");
 +      exit (EXIT_FAILURE);
 +    }
 +  }
 +  if (const_string_vector_append (&rwn->argv, NULL) == -1)
 +    goto memory_error;
 +
 +  open_one_nbd_handle (rwn);
 +
 +  return (struct rw *) rwn;
 +}
 +
  static void
  nbd_ops_close (struct rw *rw)
  {
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
    size_t i;
 -  for (i = 0; i < rw->u.nbd.handles.size; ++i) {
 -    if (nbd_shutdown (rw->u.nbd.handles.ptr[i], 0) == -1) {
 +  for (i = 0; i < rwn->handles.size; ++i) {
 +    if (nbd_shutdown (rwn->handles.ptr[i], 0) == -1) {
        fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
        exit (EXIT_FAILURE);
      }
 -    nbd_close (rw->u.nbd.handles.ptr[i]);
 +    nbd_close (rwn->handles.ptr[i]);
    }
 -  handles_reset (&rw->u.nbd.handles);
 +  handles_reset (&rwn->handles);
 +  const_string_vector_reset (&rwn->argv);
 +  free (rw);
  }
  static void
  nbd_ops_flush (struct rw *rw)
  {
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
    size_t i;
 -  for (i = 0; i < rw->u.nbd.handles.size; ++i) {
 -    if (nbd_flush (rw->u.nbd.handles.ptr[i], 0) == -1) {
 +  for (i = 0; i < rwn->handles.size; ++i) {
 +    if (nbd_flush (rwn->handles.ptr[i], 0) == -1) {
        fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
        exit (EXIT_FAILURE);
      }
    }
  }
 +static bool
 +nbd_ops_is_read_only (struct rw *rw)
 +{
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (rwn->handles.size > 0)
 +    return nbd_is_read_only (rwn->handles.ptr[0]);
 +  else
 +    return false;
 +}
 +
 +static bool
 +nbd_ops_can_extents (struct rw *rw)
 +{
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (rwn->handles.size > 0)
 +    return nbd_can_meta_context (rwn->handles.ptr[0], "base:allocation");
 +  else
 +    return false;
 +}
 +
 +static bool
 +nbd_ops_can_multi_conn (struct rw *rw)
 +{
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (rwn->handles.size > 0)
 +    return nbd_can_multi_conn (rwn->handles.ptr[0]);
 +  else
 +    return false;
 +}
 +
 +static void
 +nbd_ops_start_multi_conn (struct rw *rw)
 +{
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +  size_t i;
 +
 +  for (i = 1; i < connections; ++i)
 +    open_one_nbd_handle (rwn);
 +
 +  assert (rwn->handles.size == connections);
 +}
 +
  static size_t
  nbd_ops_synch_read (struct rw *rw,
                  void *data, size_t len, uint64_t offset)
  {
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
    if (len > rw->size - offset)
      len = rw->size - offset;
    if (len == 0)
      return 0;
 -  if (nbd_pread (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) {
 +  if (nbd_pread (rwn->handles.ptr[0], data, len, offset, 0) == -1) {
      fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
      exit (EXIT_FAILURE);
    }
 @@ -77,7 +253,9 @@ static void
  nbd_ops_synch_write (struct rw *rw,
                   const void *data, size_t len, uint64_t offset)
  {
 -  if (nbd_pwrite (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) {
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (nbd_pwrite (rwn->handles.ptr[0], data, len, offset, 0) == -1) {
      fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
      exit (EXIT_FAILURE);
    }
 @@ -86,10 +264,12 @@ nbd_ops_synch_write (struct rw *rw,
  static bool
  nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
  {
 -  if (!rw->u.nbd.can_trim)
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (!rwn->can_trim)
      return false;
 -  if (nbd_trim (rw->u.nbd.handles.ptr[0], count, offset, 0) == -1) {
 +  if (nbd_trim (rwn->handles.ptr[0], count, offset, 0) == -1) {
      fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
      exit (EXIT_FAILURE);
    }
 @@ -99,10 +279,12 @@ nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
  static bool
  nbd_ops_synch_zero (struct rw *rw, uint64_t offset, uint64_t count)
  {
 -  if (!rw->u.nbd.can_zero)
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (!rwn->can_zero)
      return false;
 -  if (nbd_zero (rw->u.nbd.handles.ptr[0],
 +  if (nbd_zero (rwn->handles.ptr[0],
                  count, offset, LIBNBD_CMD_FLAG_NO_HOLE) == -1) {
      fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
      exit (EXIT_FAILURE);
 @@ -115,7 +297,9 @@ nbd_ops_asynch_read (struct rw *rw,
                       struct command *command,
                       nbd_completion_callback cb)
  {
 -  if (nbd_aio_pread (rw->u.nbd.handles.ptr[command->index],
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (nbd_aio_pread (rwn->handles.ptr[command->index],
                       slice_ptr (command->slice),
                       command->slice.len, command->offset,
                       cb, 0) == -1) {
 @@ -129,7 +313,9 @@ nbd_ops_asynch_write (struct rw *rw,
                        struct command *command,
                        nbd_completion_callback cb)
  {
 -  if (nbd_aio_pwrite (rw->u.nbd.handles.ptr[command->index],
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (nbd_aio_pwrite (rwn->handles.ptr[command->index],
                        slice_ptr (command->slice),
                        command->slice.len, command->offset,
                        cb, 0) == -1) {
 @@ -142,12 +328,14 @@ static bool
  nbd_ops_asynch_trim (struct rw *rw, struct command *command,
                       nbd_completion_callback cb)
  {
 -  if (!rw->u.nbd.can_trim)
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (!rwn->can_trim)
      return false;
    assert (command->slice.len <= UINT32_MAX);
 -  if (nbd_aio_trim (rw->u.nbd.handles.ptr[command->index],
 +  if (nbd_aio_trim (rwn->handles.ptr[command->index],
                      command->slice.len, command->offset,
                      cb, 0) == -1) {
      fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
 @@ -160,12 +348,14 @@ static bool
  nbd_ops_asynch_zero (struct rw *rw, struct command *command,
                       nbd_completion_callback cb)
  {
 -  if (!rw->u.nbd.can_zero)
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
 +  if (!rwn->can_zero)
      return false;
    assert (command->slice.len <= UINT32_MAX);
 -  if (nbd_aio_zero (rw->u.nbd.handles.ptr[command->index],
 +  if (nbd_aio_zero (rwn->handles.ptr[command->index],
                      command->slice.len, command->offset,
                      cb, LIBNBD_CMD_FLAG_NO_HOLE) == -1) {
      fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
 @@ -212,19 +402,22 @@ add_extent (void *vp, const char *metacontext,
  static unsigned
  nbd_ops_in_flight (struct rw *rw, uintptr_t index)
  {
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +
    /* Since the commands are auto-retired in the callbacks we don't
     * need to count "done" commands.
     */
 -  return nbd_aio_in_flight (rw->u.nbd.handles.ptr[index]);
 +  return nbd_aio_in_flight (rwn->handles.ptr[index]);
  }
  static void
  nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
                          int *fd, int *direction)
  {
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
    struct nbd_handle *nbd;
 -  nbd = rw->u.nbd.handles.ptr[index];
 +  nbd = rwn->handles.ptr[index];
    *fd = nbd_aio_get_fd (nbd);
    if (*fd == -1) {
 @@ -240,7 +433,8 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
  static void
  nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index)
  {
 -  if (nbd_aio_notify_read (rw->u.nbd.handles.ptr[index]) == -1) {
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +  if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) {
      fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
      exit (EXIT_FAILURE);
    }
 @@ -249,7 +443,8 @@ nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index)
  static void
  nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index)
  {
 -  if (nbd_aio_notify_write (rw->u.nbd.handles.ptr[index]) == -1) {
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
 +  if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) {
      fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
      exit (EXIT_FAILURE);
    }
 @@ -266,10 +461,11 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index,
                       uint64_t offset, uint64_t count,
                       extent_list *ret)
  {
 +  struct rw_nbd *rwn = (struct rw_nbd *) rw;
    extent_list exts = empty_vector;
    struct nbd_handle *nbd;
 -  nbd = rw->u.nbd.handles.ptr[index];
 +  nbd = rwn->handles.ptr[index];
    ret->size = 0;
 @@ -331,9 +527,13 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index,
    free (exts.ptr);
  }
 -struct rw_ops nbd_ops = {
 +static struct rw_ops nbd_ops = {
    .ops_name = "nbd_ops",
    .close = nbd_ops_close,
 +  .is_read_only = nbd_ops_is_read_only,
 +  .can_extents = nbd_ops_can_extents,
 +  .can_multi_conn = nbd_ops_can_multi_conn,
 +  .start_multi_conn = nbd_ops_start_multi_conn,
    .flush = nbd_ops_flush,
    .synch_read = nbd_ops_synch_read,
    .synch_write = nbd_ops_synch_write,
 diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
 index 69fac2a..94fbdeb 100644
 --- a/copy/nbdcopy.h
 +++ b/copy/nbdcopy.h
 @@ -36,8 +36,6 @@
   */
  #define THREAD_WORK_SIZE (128 * 1024 * 1024)
 -DEFINE_VECTOR_TYPE (handles, struct nbd_handle *)
 -
  /* Abstracts the input (src) and output (dst) parameters on the
   * command line.
   */
 @@ -45,21 +43,20 @@ struct rw {
    struct rw_ops *ops;           /* Operations. */
    const char *name;             /* Printable name, for error messages etc. */
    int64_t size;                 /* May be -1 for streams. */
 -  union {
 -    struct {                    /* For files and pipes. */
 -      int fd;
 -      struct stat stat;
 -      bool seek_hole_supported;
 -      int sector_size;
 -    } local;
 -    struct {
 -      handles handles;          /* For NBD, one handle per connection. */
 -      bool can_trim, can_zero;  /* Cached nbd_can_trim, nbd_can_zero. */
 -    } nbd;
 -  } u;
 +  /* Followed by private data for the particular subtype. */
  };
 -extern struct rw src, dst;
 +extern struct rw *src, *dst;
 +
 +/* Create subtypes. */
 +extern struct rw *file_create (const char *name,
 +                               const struct stat *stat, int fd);
 +extern struct rw *nbd_rw_create_uri (const char *name,
 +                                     const char *uri, bool writing);
 +extern struct rw *nbd_rw_create_subprocess (const char **argv, size_t argc,
 +                                            bool writing);
 +extern struct rw *null_create (const char *name);
 +extern struct rw *pipe_create (const char *name, int fd);
  /* Underlying data buffers. */
  struct buffer {
 @@ -117,6 +114,28 @@ struct rw_ops {
    /* Close the connection and free up associated resources. */
    void (*close) (struct rw *rw);
 +  /* Return true if this is a read-only connection. */
 +  bool (*is_read_only) (struct rw *rw);
 +
 +  /* For source only, does it support reading extents? */
 +  bool (*can_extents) (struct rw *rw);
 +
 +  /* Return true if the connection can do multi-conn.  This is true
 +   * for files, false for streams, and passed through for NBD.
 +   */
 +  bool (*can_multi_conn) (struct rw *rw);
 +
 +  /* For multi-conn capable backends, before copying we must call this
 +   * to begin multi-conn.  For NBD this means opening the additional
 +   * connections.
 +   */
 +  void (*start_multi_conn) (struct rw *rw);
 +
 +  /* Truncate, only called on output files.  This callback can be NULL
 +   * for types that don't support this.
 +   */
 +  void (*truncate) (struct rw *rw, int64_t size);
 +
    /* Flush pending writes to permanent storage. */
    void (*flush) (struct rw *rw);
 @@ -188,10 +207,6 @@ struct rw_ops {
                         uint64_t offset, uint64_t count,
                         extent_list *ret);
  };
 -extern struct rw_ops file_ops;
 -extern struct rw_ops nbd_ops;
 -extern struct rw_ops pipe_ops;
 -extern struct rw_ops null_ops;
  extern void default_get_extents (struct rw *rw, uintptr_t index,
                                   uint64_t offset, uint64_t count,
 diff --git a/copy/null-ops.c b/copy/null-ops.c
 index b2ca66f..3262fb5 100644
 --- a/copy/null-ops.c
 +++ b/copy/null-ops.c
 @@ -30,10 +30,28 @@
   * and fast zeroing.
   */
 +static struct rw_ops null_ops;
 +
 +struct rw_null {
 +  struct rw rw;
 +};
 +
 +struct rw *
 +null_create (const char *name)
 +{
 +  struct rw_null *rw = calloc (1, sizeof *rw);
 +  if (rw == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
 +
 +  rw->rw.ops = &null_ops;
 +  rw->rw.name = name;
 +  rw->rw.size = INT64_MAX;
 +  return (struct rw *) rw;
 +}
 +
  static void
  null_close (struct rw *rw)
  {
 -  /* nothing */
 +  free (rw);
  }
  static void
 @@ -42,6 +60,30 @@ null_flush (struct rw *rw)
    /* nothing */
  }
 +static bool
 +null_is_read_only (struct rw *rw)
 +{
 +  return false;
 +}
 +
 +static bool
 +null_can_extents (struct rw *rw)
 +{
 +  return false;
 +}
 +
 +static bool
 +null_can_multi_conn (struct rw *rw)
 +{
 +  return true;
 +}
 +
 +static void
 +null_start_multi_conn (struct rw *rw)
 +{
 +  /* nothing */
 +}
 +
  static size_t
  null_synch_read (struct rw *rw,
                   void *data, size_t len, uint64_t offset)
 @@ -126,9 +168,13 @@ null_get_extents (struct rw *rw, uintptr_t index,
    abort ();
  }
 -struct rw_ops null_ops = {
 +static struct rw_ops null_ops = {
    .ops_name = "null_ops",
    .close = null_close,
 +  .is_read_only = null_is_read_only,
 +  .can_extents = null_can_extents,
 +  .can_multi_conn = null_can_multi_conn,
 +  .start_multi_conn = null_start_multi_conn,
    .flush = null_flush,
    .synch_read = null_synch_read,
    .synch_write = null_synch_write,
 diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c
 index d127dad..286e6c0 100644
 --- a/copy/pipe-ops.c
 +++ b/copy/pipe-ops.c
 @@ -26,10 +26,33 @@
  #include "nbdcopy.h"
 +static struct rw_ops pipe_ops;
 +
 +struct rw_pipe {
 +  struct rw rw;
 +  int fd;
 +};
 +
 +struct rw *
 +pipe_create (const char *name, int fd)
 +{
 +  struct rw_pipe *rwp = calloc (1, sizeof *rwp);
 +  if (rwp == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
 +
 +  /* Set size == -1 which means don't know. */
 +  rwp->rw.ops = &pipe_ops;
 +  rwp->rw.name = name;
 +  rwp->rw.size = -1;
 +  rwp->fd = fd;
 +  return (struct rw *) rwp;
 +}
 +
  static void
  pipe_close (struct rw *rw)
  {
 -  if (close (rw->u.local.fd) == -1) {
 +  struct rw_pipe *rwp = (struct rw_pipe *) rw;
 +
 +  if (close (rwp->fd) == -1) {
      fprintf (stderr, "%s: close: %m\n", rw->name);
      exit (EXIT_FAILURE);
    }
 @@ -43,13 +66,39 @@ pipe_flush (struct rw *rw)
     */
  }
 +static bool
 +pipe_is_read_only (struct rw *rw)
 +{
 +  return false;
 +}
 +
 +static bool
 +pipe_can_extents (struct rw *rw)
 +{
 +  return false;
 +}
 +
 +static bool
 +pipe_can_multi_conn (struct rw *rw)
 +{
 +  return false;
 +}
 +
 +static void
 +pipe_start_multi_conn (struct rw *rw)
 +{
 +  /* Should never be called. */
 +  abort ();
 +}
 +
  static size_t
  pipe_synch_read (struct rw *rw,
                   void *data, size_t len, uint64_t offset)
  {
 +  struct rw_pipe *rwp = (struct rw_pipe *) rw;
    ssize_t r;
 -  r = read (rw->u.local.fd, data, len);
 +  r = read (rwp->fd, data, len);
    if (r == -1) {
      perror (rw->name);
      exit (EXIT_FAILURE);
 @@ -61,10 +110,11 @@ static void
  pipe_synch_write (struct rw *rw,
                    const void *data, size_t len, uint64_t offset)
  {
 +  struct rw_pipe *rwp = (struct rw_pipe *) rw;
    ssize_t r;
    while (len > 0) {
 -    r = write (rw->u.local.fd, data, len);
 +    r = write (rwp->fd, data, len);
      if (r == -1) {
        perror (rw->name);
        exit (EXIT_FAILURE);
 @@ -109,10 +159,16 @@ pipe_in_flight (struct rw *rw, uintptr_t index)
    return 0;
  }
 -struct rw_ops pipe_ops = {
 +static struct rw_ops pipe_ops = {
    .ops_name = "pipe_ops",
    .close = pipe_close,
 +
 +  .is_read_only = pipe_is_read_only,
 +  .can_extents = pipe_can_extents,
 +  .can_multi_conn = pipe_can_multi_conn,
 +  .start_multi_conn = pipe_start_multi_conn,
 +
    .flush = pipe_flush,
    .synch_read = pipe_synch_read,
 diff --git a/copy/synch-copying.c b/copy/synch-copying.c
 index 2712c10..985f005 100644
 --- a/copy/synch-copying.c
 +++ b/copy/synch-copying.c
 @@ -38,13 +38,13 @@ synch_copying (void)
    /* If the source size is unknown then we copy data and cannot use
     * extent information.
     */
 -  if (src.size == -1) {
 +  if (src->size == -1) {
      size_t r;
 -    while ((r = src.ops->synch_read (&src, buf, sizeof buf, offset)) > 0) {
 -      dst.ops->synch_write (&dst, buf, r, offset);
 +    while ((r = src->ops->synch_read (src, buf, sizeof buf, offset)) > 0) {
 +      dst->ops->synch_write (dst, buf, r, offset);
        offset += r;
 -      progress_bar (offset, src.size);
 +      progress_bar (offset, src->size);
      }
    }
 @@ -52,47 +52,47 @@ synch_copying (void)
     * blocks and use extent information to optimize the case.
     */
    else {
 -    while (offset < src.size) {
 +    while (offset < src->size) {
        extent_list exts = empty_vector;
 -      uint64_t count = src.size - offset;
 +      uint64_t count = src->size - offset;
        size_t i, r;
        if (count > sizeof buf)
          count = sizeof buf;
        if (extents)
 -        src.ops->get_extents (&src, 0, offset, count, &exts);
 +        src->ops->get_extents (src, 0, offset, count, &exts);
        else
 -        default_get_extents (&src, 0, offset, count, &exts);
 +        default_get_extents (src, 0, offset, count, &exts);
        for (i = 0; i < exts.size; ++i) {
          assert (exts.ptr[i].length <= count);
          if (exts.ptr[i].zero) {
 -          if (!dst.ops->synch_trim (&dst, offset, exts.ptr[i].length) &&
 -              !dst.ops->synch_zero (&dst, offset, exts.ptr[i].length)) {
 +          if (!dst->ops->synch_trim (dst, offset, exts.ptr[i].length) &&
 +              !dst->ops->synch_zero (dst, offset, exts.ptr[i].length)) {
              /* If neither trimming nor efficient zeroing are possible,
               * write zeroes the hard way.
               */
              memset (buf, 0, exts.ptr[i].length);
 -            dst.ops->synch_write (&dst, buf, exts.ptr[i].length, offset);
 +            dst->ops->synch_write (dst, buf, exts.ptr[i].length, offset);
            }
            offset += exts.ptr[i].length;
          }
          else /* data */ {
 -          r = src.ops->synch_read (&src, buf, exts.ptr[i].length, offset);
 +          r = src->ops->synch_read (src, buf, exts.ptr[i].length, offset);
            /* These cases should never happen unless the file is
             * truncated underneath us.
             */
            if (r == 0 || r < exts.ptr[i].length) {
 -            fprintf (stderr, "%s: unexpected end of file\n", src.name);
 +            fprintf (stderr, "%s: unexpected end of file\n", src->name);
              exit (EXIT_FAILURE);
            }
 -          dst.ops->synch_write (&dst, buf, r, offset);
 +          dst->ops->synch_write (dst, buf, r, offset);
            offset += r;
 -          progress_bar (offset, src.size);
 +          progress_bar (offset, src->size);
          }
        }
 --
 2.29.0.rc2
 
I looked at it briefly, this is a large change, but generally it
looks good.
Nir