Make this a (fairly) abstract structure. At least hide the subtype
fields from the main program. This change is pure refactoring and
doesn’t change the semantics.
---
copy/file-ops.c | 164 +++++++++++++++++--
copy/main.c | 315 ++++++++----------------------------
copy/multi-thread-copying.c | 104 ++++++------
copy/nbd-ops.c | 248 +++++++++++++++++++++++++---
copy/nbdcopy.h | 53 +++---
copy/null-ops.c | 50 +++++-
copy/pipe-ops.c | 64 +++++++-
copy/synch-copying.c | 30 ++--
8 files changed, 649 insertions(+), 379 deletions(-)
diff --git a/copy/file-ops.c b/copy/file-ops.c
index 2a239d0..81b08ce 100644
--- a/copy/file-ops.c
+++ b/copy/file-ops.c
@@ -36,35 +36,159 @@
#include "isaligned.h"
#include "nbdcopy.h"
+static struct rw_ops file_ops;
+
+struct rw_file {
+ struct rw rw;
+ int fd;
+ struct stat stat;
+ bool seek_hole_supported;
+ int sector_size;
+};
+
+static bool
+seek_hole_supported (int fd)
+{
+#ifndef SEEK_HOLE
+ return false;
+#else
+ off_t r = lseek (fd, 0, SEEK_HOLE);
+ return r >= 0;
+#endif
+}
+
+struct rw *
+file_create (const char *name, const struct stat *stat, int fd)
+{
+ struct rw_file *rwf = calloc (1, sizeof *rwf);
+ if (rwf == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+ rwf->rw.ops = &file_ops;
+ rwf->rw.name = name;
+ rwf->stat = *stat;
+ rwf->fd = fd;
+
+ if (S_ISBLK (stat->st_mode)) {
+ /* Block device. */
+ rwf->rw.size = lseek (fd, 0, SEEK_END);
+ if (rwf->rw.size == -1) {
+ perror ("lseek");
+ exit (EXIT_FAILURE);
+ }
+ if (lseek (fd, 0, SEEK_SET) == -1) {
+ perror ("lseek");
+ exit (EXIT_FAILURE);
+ }
+ rwf->seek_hole_supported = seek_hole_supported (fd);
+ rwf->sector_size = 4096;
+#ifdef BLKSSZGET
+ if (ioctl (fd, BLKSSZGET, &rwf->sector_size))
+ fprintf (stderr, "warning: cannot get sector size: %s: %m", name);
+#endif
+ }
+ else if (S_ISREG (stat->st_mode)) {
+ /* Regular file. */
+ rwf->rw.size = stat->st_size;
+ rwf->seek_hole_supported = seek_hole_supported (fd);
+ }
+ else
+ abort ();
+
+ return (struct rw *) rwf;
+}
+
static void
file_close (struct rw *rw)
{
- if (close (rw->u.local.fd) == -1) {
+ struct rw_file *rwf = (struct rw_file *)rw;
+
+ if (close (rwf->fd) == -1) {
fprintf (stderr, "%s: close: %m\n", rw->name);
exit (EXIT_FAILURE);
}
+ free (rw);
+}
+
+static void
+file_truncate (struct rw *rw, int64_t size)
+{
+ struct rw_file *rwf = (struct rw_file *) rw;
+
+ /* If the destination is an ordinary file then the original file
+ * size doesn't matter. Truncate it to the source size. But
+ * truncate it to zero first so the file is completely empty and
+ * sparse.
+ */
+ if (! S_ISREG (rwf->stat.st_mode))
+ return;
+
+ if (ftruncate (rwf->fd, 0) == -1 ||
+ ftruncate (rwf->fd, size) == -1) {
+ perror ("truncate");
+ exit (EXIT_FAILURE);
+ }
+ rwf->rw.size = size;
+
+ /* We can assume the destination is zero. */
+ destination_is_zero = true;
}
static void
file_flush (struct rw *rw)
{
- if ((S_ISREG (rw->u.local.stat.st_mode) ||
- S_ISBLK (rw->u.local.stat.st_mode)) &&
- fsync (rw->u.local.fd) == -1) {
+ struct rw_file *rwf = (struct rw_file *)rw;
+
+ if ((S_ISREG (rwf->stat.st_mode) ||
+ S_ISBLK (rwf->stat.st_mode)) &&
+ fsync (rwf->fd) == -1) {
perror (rw->name);
exit (EXIT_FAILURE);
}
}
+static bool
+file_is_read_only (struct rw *rw)
+{
+ /* Permissions are hard, and this is only used as an early check
+ * before the copy. Proceed with the copy and fail if it fails.
+ */
+ return false;
+}
+
+static bool
+file_can_extents (struct rw *rw)
+{
+#ifdef SEEK_HOLE
+ return true;
+#else
+ return false;
+#endif
+}
+
+static bool
+file_can_multi_conn (struct rw *rw)
+{
+ return true;
+}
+
+static void
+file_start_multi_conn (struct rw *rw)
+{
+ /* Don't need to do anything for files since we can read/write on a
+ * single file descriptor.
+ */
+}
+
static size_t
file_synch_read (struct rw *rw,
void *data, size_t len, uint64_t offset)
{
+ struct rw_file *rwf = (struct rw_file *)rw;
size_t n = 0;
ssize_t r;
while (len > 0) {
- r = pread (rw->u.local.fd, data, len, offset);
+ r = pread (rwf->fd, data, len, offset);
if (r == -1) {
perror (rw->name);
exit (EXIT_FAILURE);
@@ -85,10 +209,11 @@ static void
file_synch_write (struct rw *rw,
const void *data, size_t len, uint64_t offset)
{
+ struct rw_file *rwf = (struct rw_file *)rw;
ssize_t r;
while (len > 0) {
- r = pwrite (rw->u.local.fd, data, len, offset);
+ r = pwrite (rwf->fd, data, len, offset);
if (r == -1) {
perror (rw->name);
exit (EXIT_FAILURE);
@@ -103,7 +228,8 @@ static bool
file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
{
#ifdef FALLOC_FL_PUNCH_HOLE
- int fd = rw->u.local.fd;
+ struct rw_file *rwf = (struct rw_file *)rw;
+ int fd = rwf->fd;
int r;
r = fallocate (fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
@@ -121,9 +247,11 @@ file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
static bool
file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count)
{
- if (S_ISREG (rw->u.local.stat.st_mode)) {
+ struct rw_file *rwf = (struct rw_file *)rw;
+
+ if (S_ISREG (rwf->stat.st_mode)) {
#ifdef FALLOC_FL_ZERO_RANGE
- int fd = rw->u.local.fd;
+ int fd = rwf->fd;
int r;
r = fallocate (fd, FALLOC_FL_ZERO_RANGE, offset, count);
@@ -134,10 +262,10 @@ file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count)
return true;
#endif
}
- else if (S_ISBLK (rw->u.local.stat.st_mode) &&
- IS_ALIGNED (offset | count, rw->u.local.sector_size)) {
+ else if (S_ISBLK (rwf->stat.st_mode) &&
+ IS_ALIGNED (offset | count, rwf->sector_size)) {
#ifdef BLKZEROOUT
- int fd = rw->u.local.fd;
+ int fd = rwf->fd;
int r;
uint64_t range[2] = {offset, count};
@@ -223,11 +351,12 @@ file_get_extents (struct rw *rw, uintptr_t index,
ret->size = 0;
#ifdef SEEK_HOLE
+ struct rw_file *rwf = (struct rw_file *)rw;
static pthread_mutex_t lseek_lock = PTHREAD_MUTEX_INITIALIZER;
- if (rw->u.local.seek_hole_supported) {
+ if (rwf->seek_hole_supported) {
uint64_t end = offset + count;
- int fd = rw->u.local.fd;
+ int fd = rwf->fd;
off_t pos;
struct extent e;
size_t last;
@@ -302,9 +431,14 @@ file_get_extents (struct rw *rw, uintptr_t index,
default_get_extents (rw, index, offset, count, ret);
}
-struct rw_ops file_ops = {
+static struct rw_ops file_ops = {
.ops_name = "file_ops",
.close = file_close,
+ .is_read_only = file_is_read_only,
+ .can_extents = file_can_extents,
+ .can_multi_conn = file_can_multi_conn,
+ .start_multi_conn = file_start_multi_conn,
+ .truncate = file_truncate,
.flush = file_flush,
.synch_read = file_synch_read,
.synch_write = file_synch_write,
diff --git a/copy/main.c b/copy/main.c
index 68a6030..6fdc6fd 100644
--- a/copy/main.c
+++ b/copy/main.c
@@ -53,19 +53,11 @@ int progress_fd = -1; /* --progress=FD */
unsigned sparse_size = 4096; /* --sparse */
bool synchronous; /* --synchronous flag */
unsigned threads; /* --threads */
-struct rw src, dst; /* The source and destination. */
+struct rw *src, *dst; /* The source and destination. */
bool verbose; /* --verbose flag */
static bool is_nbd_uri (const char *s);
-static bool seek_hole_supported (int fd);
-static void open_null (struct rw *rw);
-static int open_local (const char *prog,
- const char *filename, bool writing, struct rw *rw);
-static void open_nbd_uri (const char *prog,
- const char *uri, bool writing, struct rw *rw);
-static void open_nbd_subprocess (const char *prog,
- const char **argv, size_t argc,
- bool writing, struct rw *rw);
+static struct rw *open_local (const char *filename, bool writing);
static void print_rw (struct rw *rw, const char *prefix, FILE *fp);
static void __attribute__((noreturn))
@@ -242,18 +234,18 @@ main (int argc, char *argv[])
found1:
connections = 1; /* multi-conn not supported */
- src.name = argv[optind+1];
- open_nbd_subprocess (argv[0],
- (const char **) &argv[optind+1], i-optind-1,
- false, &src);
+ src =
+ nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1,
+ false);
optind = i+1;
}
else { /* Source is not [...]. */
- src.name = argv[optind++];
- if (! is_nbd_uri (src.name))
- src.u.local.fd = open_local (argv[0], src.name, false, &src);
+ const char *src_name = argv[optind++];
+
+ if (! is_nbd_uri (src_name))
+ src = open_local (src_name, false);
else
- open_nbd_uri (argv[0], src.name, false, &src);
+ src = nbd_rw_create_uri (src_name, src_name, false);
}
if (optind >= argc)
@@ -267,48 +259,46 @@ main (int argc, char *argv[])
found2:
connections = 1; /* multi-conn not supported */
- dst.name = argv[optind+1];
- open_nbd_subprocess (argv[0],
- (const char **) &argv[optind+1], i-optind-1,
- true, &dst);
+ dst =
+ nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1,
+ true);
optind = i+1;
}
else { /* Destination is not [...] */
- dst.name = argv[optind++];
- if (strcmp (dst.name, "null:") == 0)
- open_null (&dst);
- else if (! is_nbd_uri (dst.name))
- dst.u.local.fd = open_local (argv[0], dst.name, true /* writing */, &dst);
- else {
- open_nbd_uri (argv[0], dst.name, true, &dst);
+ const char *dst_name = argv[optind++];
- /* Obviously this is not going to work if the server is
- * advertising read-only, so fail early with a nice error message.
- */
- if (nbd_is_read_only (dst.u.nbd.handles.ptr[0])) {
- fprintf (stderr, "%s: %s: "
- "this NBD server is read-only, cannot write to it\n",
- argv[0], dst.name);
- exit (EXIT_FAILURE);
- }
- }
+ if (strcmp (dst_name, "null:") == 0)
+ dst = null_create (dst_name);
+ else if (! is_nbd_uri (dst_name))
+ dst = open_local (dst_name, true /* writing */);
+ else
+ dst = nbd_rw_create_uri (dst_name, dst_name, true);
}
/* There must be no extra parameters. */
if (optind != argc)
usage (stderr, EXIT_FAILURE);
- /* Check we've set the fields of src and dst. */
- assert (src.ops);
- assert (src.name);
- assert (dst.ops);
- assert (dst.name);
+ /* Check we've created src and dst and set the expected fields. */
+ assert (src != NULL);
+ assert (dst != NULL);
+ assert (src->ops != NULL);
+ assert (src->name != NULL);
+ assert (dst->ops != NULL);
+ assert (dst->name != NULL);
+
+ /* Obviously this is not going to work if the destination is
+ * read-only, so fail early with a nice error message.
+ */
+ if (dst->ops->is_read_only (dst)) {
+ fprintf (stderr, "%s: %s: "
+ "the destination is read-only, cannot write to it\n",
+ argv[0], dst->name);
+ exit (EXIT_FAILURE);
+ }
/* If multi-conn is not supported, force connections to 1. */
- if ((src.ops == &nbd_ops &&
- ! nbd_can_multi_conn (src.u.nbd.handles.ptr[0])) ||
- (dst.ops == &nbd_ops &&
- ! nbd_can_multi_conn (dst.u.nbd.handles.ptr[0])))
+ if (! src->ops->can_multi_conn (src) || ! dst->ops->can_multi_conn (dst))
connections = 1;
/* Calculate the number of threads from the number of connections. */
@@ -335,44 +325,17 @@ main (int argc, char *argv[])
if (threads < connections)
connections = threads;
- /* Calculate the source and destination sizes. We set these to -1
- * if the size is not known (because it's a stream). Note that for
- * local types, open_local set something in *.size already.
+ /* Truncate the destination to the same size as the source. Only
+ * has an effect on regular files.
*/
- if (src.ops == &nbd_ops) {
- src.size = nbd_get_size (src.u.nbd.handles.ptr[0]);
- if (src.size == -1) {
- fprintf (stderr, "%s: %s: %s\n", argv[0], src.name, nbd_get_error ());
- exit (EXIT_FAILURE);
- }
- }
- if (dst.ops != &nbd_ops && S_ISREG (dst.u.local.stat.st_mode)) {
- /* If the destination is an ordinary file then the original file
- * size doesn't matter. Truncate it to the source size. But
- * truncate it to zero first so the file is completely empty and
- * sparse.
- */
- dst.size = src.size;
- if (ftruncate (dst.u.local.fd, 0) == -1 ||
- ftruncate (dst.u.local.fd, dst.size) == -1) {
- perror ("truncate");
- exit (EXIT_FAILURE);
- }
- destination_is_zero = true;
- }
- else if (dst.ops == &nbd_ops) {
- dst.size = nbd_get_size (dst.u.nbd.handles.ptr[0]);
- if (dst.size == -1) {
- fprintf (stderr, "%s: %s: %s\n", argv[0], dst.name, nbd_get_error ());
- exit (EXIT_FAILURE);
- }
- }
+ if (dst->ops->truncate)
+ dst->ops->truncate (dst, src->size);
/* Check if the source is bigger than the destination, since that
* would truncate (ie. lose) data. Copying from smaller to larger
* is OK.
*/
- if (src.size >= 0 && dst.size >= 0 && src.size > dst.size) {
+ if (src->size >= 0 && dst->size >= 0 && src->size >
dst->size) {
fprintf (stderr,
"nbdcopy: error: destination size is smaller than source
size\n");
exit (EXIT_FAILURE);
@@ -383,37 +346,29 @@ main (int argc, char *argv[])
* settings.
*/
if (verbose) {
- print_rw (&src, "nbdcopy: src", stderr);
- print_rw (&dst, "nbdcopy: dst", stderr);
+ print_rw (src, "nbdcopy: src", stderr);
+ print_rw (dst, "nbdcopy: dst", stderr);
fprintf (stderr, "nbdcopy: connections=%u requests=%u threads=%u "
"synchronous=%s\n",
connections, max_requests, threads,
synchronous ? "true" : "false");
}
- /* If #connections > 1 then multi-conn is enabled at both ends and
- * we need to open further connections.
+ /* If multi-conn is enabled on either side, then at this point we
+ * need to ask the backend to open the extra connections.
*/
if (connections > 1) {
assert (threads == connections);
-
- if (src.ops == &nbd_ops) {
- for (i = 1; i < connections; ++i)
- open_nbd_uri (argv[0], src.name, false, &src);
- assert (src.u.nbd.handles.size == connections);
- }
- if (dst.ops == &nbd_ops) {
- for (i = 1; i < connections; ++i)
- open_nbd_uri (argv[0], dst.name, true, &dst);
- assert (dst.u.nbd.handles.size == connections);
- }
+ if (src->ops->can_multi_conn (src))
+ src->ops->start_multi_conn (src);
+ if (dst->ops->can_multi_conn (dst))
+ dst->ops->start_multi_conn (dst);
}
/* If the source is NBD and we couldn't negotiate meta
* base:allocation then turn off extents.
*/
- if (src.ops == &nbd_ops &&
- !nbd_can_meta_context (src.u.nbd.handles.ptr[0], "base:allocation"))
+ if (! src->ops->can_extents (src))
extents = false;
/* Always set the progress bar to 0% at the start of the copy. */
@@ -429,12 +384,12 @@ main (int argc, char *argv[])
progress_bar (1, 1);
/* Shut down the source side. */
- src.ops->close (&src);
+ src->ops->close (src);
/* Shut down the destination side. */
if (flush)
- dst.ops->flush (&dst);
- dst.ops->close (&dst);
+ dst->ops->flush (dst);
+ dst->ops->close (dst);
exit (EXIT_SUCCESS);
}
@@ -452,33 +407,25 @@ is_nbd_uri (const char *s)
strncmp (s, "nbds+vsock:", 11) == 0;
}
-/* Open null: (destination only). */
-static void
-open_null (struct rw *rw)
-{
- rw->ops = &null_ops;
- rw->size = INT64_MAX;
-}
-
/* Open a local (non-NBD) file, ie. a file, device, or "-" for stdio.
- * Returns the open file descriptor which the caller must close.
+ * Returns the struct rw * which the caller must close.
*
* “writing” is true if this is the destination parameter.
* “rw->u.local.stat” and “rw->size” return the file stat and size,
* but size can be returned as -1 if we don't know the size (if it's a
* pipe or stdio).
*/
-static int
-open_local (const char *prog,
- const char *filename, bool writing, struct rw *rw)
+static struct rw *
+open_local (const char *filename, bool writing)
{
int flags, fd;
+ struct stat stat;
if (strcmp (filename, "-") == 0) {
synchronous = true;
fd = writing ? STDOUT_FILENO : STDIN_FILENO;
if (writing && isatty (fd)) {
- fprintf (stderr, "%s: refusing to write to tty\n", prog);
+ fprintf (stderr, "%s: refusing to write to tty\n", "nbdcopy");
exit (EXIT_FAILURE);
}
}
@@ -502,146 +449,17 @@ open_local (const char *prog,
}
}
- if (fstat (fd, &rw->u.local.stat) == -1) {
+ if (fstat (fd, &stat) == -1) {
perror (filename);
exit (EXIT_FAILURE);
}
- if (S_ISBLK (rw->u.local.stat.st_mode)) {
- /* Block device. */
- rw->ops = &file_ops;
- rw->size = lseek (fd, 0, SEEK_END);
- if (rw->size == -1) {
- perror ("lseek");
- exit (EXIT_FAILURE);
- }
- if (lseek (fd, 0, SEEK_SET) == -1) {
- perror ("lseek");
- exit (EXIT_FAILURE);
- }
- rw->u.local.seek_hole_supported = seek_hole_supported (fd);
- rw->u.local.sector_size = 4096;
-#ifdef BLKSSZGET
- if (ioctl (fd, BLKSSZGET, &rw->u.local.sector_size))
- fprintf (stderr, "warning: cannot get sector size: %s: %m",
rw->name);
-#endif
- }
- else if (S_ISREG (rw->u.local.stat.st_mode)) {
- /* Regular file. */
- rw->ops = &file_ops;
- rw->size = rw->u.local.stat.st_size;
- rw->u.local.seek_hole_supported = seek_hole_supported (fd);
- }
+ if (S_ISBLK (stat.st_mode) || S_ISREG (stat.st_mode))
+ return file_create (filename, &stat, fd);
else {
- /* Probably stdin/stdout, a pipe or a socket. Set size == -1
- * which means don't know, and force synchronous mode.
- */
- synchronous = true;
- rw->ops = &pipe_ops;
- rw->size = -1;
- rw->u.local.seek_hole_supported = false;
+ /* Probably stdin/stdout, a pipe or a socket. */
+ synchronous = true; /* Force synchronous mode for pipes. */
+ return pipe_create (filename, fd);
}
-
- return fd;
-}
-
-static bool
-seek_hole_supported (int fd)
-{
-#ifndef SEEK_HOLE
- return false;
-#else
- off_t r = lseek (fd, 0, SEEK_HOLE);
- return r >= 0;
-#endif
-}
-
-static void
-open_nbd_uri (const char *prog,
- const char *uri, bool writing, struct rw *rw)
-{
- struct nbd_handle *nbd;
-
- rw->ops = &nbd_ops;
- nbd = nbd_create ();
- if (nbd == NULL) {
- fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
- exit (EXIT_FAILURE);
- }
- nbd_set_debug (nbd, verbose);
- nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */
- if (extents && !writing &&
- nbd_add_meta_context (nbd, "base:allocation") == -1) {
- fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
- exit (EXIT_FAILURE);
- }
-
- if (handles_append (&rw->u.nbd.handles, nbd) == -1) {
- perror ("realloc");
- exit (EXIT_FAILURE);
- }
-
- if (nbd_connect_uri (nbd, uri) == -1) {
- fprintf (stderr, "%s: %s: %s\n", prog, uri, nbd_get_error ());
- exit (EXIT_FAILURE);
- }
-
- /* Cache these. We assume with multi-conn that each handle will act
- * the same way.
- */
- rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0;
- rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0;
-}
-
-DEFINE_VECTOR_TYPE (const_string_vector, const char *);
-
-static void
-open_nbd_subprocess (const char *prog,
- const char **argv, size_t argc,
- bool writing, struct rw *rw)
-{
- struct nbd_handle *nbd;
- const_string_vector copy = empty_vector;
- size_t i;
-
- rw->ops = &nbd_ops;
- nbd = nbd_create ();
- if (nbd == NULL) {
- fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
- exit (EXIT_FAILURE);
- }
- nbd_set_debug (nbd, verbose);
- if (extents && !writing &&
- nbd_add_meta_context (nbd, "base:allocation") == -1) {
- fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
- exit (EXIT_FAILURE);
- }
-
- if (handles_append (&rw->u.nbd.handles, nbd) == -1) {
- memory_error:
- perror ("realloc");
- exit (EXIT_FAILURE);
- }
-
- /* We have to copy the args so we can null-terminate them. */
- for (i = 0; i < argc; ++i) {
- if (const_string_vector_append (©, argv[i]) == -1)
- goto memory_error;
- }
- if (const_string_vector_append (©, NULL) == -1)
- goto memory_error;
-
- if (nbd_connect_systemd_socket_activation (nbd, (char **) copy.ptr) == -1) {
- fprintf (stderr, "%s: %s: %s\n", prog, argv[0], nbd_get_error ());
- exit (EXIT_FAILURE);
- }
-
- /* Cache these. We assume with multi-conn that each handle will act
- * the same way.
- */
- rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0;
- rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0;
-
- free (copy.ptr);
}
/* Print an rw struct, used in --verbose mode. */
@@ -650,7 +468,6 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp)
{
fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name,
rw->name);
fprintf (fp, "%s: size=%" PRIi64 "\n", prefix, rw->size);
- /* Could print other stuff here, but that's enough for debugging. */
}
/* Default implementation of rw->ops->get_extents for backends which
diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 98b4056..4f57054 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -50,13 +50,13 @@ get_next_offset (uint64_t *offset, uint64_t *count)
bool r = false; /* returning false means no more work */
pthread_mutex_lock (&lock);
- if (next_offset < src.size) {
+ if (next_offset < src->size) {
*offset = next_offset;
/* Work out how large this range is. The last range may be
* smaller than THREAD_WORK_SIZE.
*/
- *count = src.size - *offset;
+ *count = src->size - *offset;
if (*count > THREAD_WORK_SIZE)
*count = THREAD_WORK_SIZE;
@@ -69,7 +69,7 @@ get_next_offset (uint64_t *offset, uint64_t *count)
* are called from threads and not necessarily in monotonic order
* so the progress bar would move erratically.
*/
- progress_bar (*offset, dst.size);
+ progress_bar (*offset, dst->size);
}
pthread_mutex_unlock (&lock);
return r;
@@ -89,11 +89,13 @@ multi_thread_copying (void)
*/
assert (threads > 0);
assert (threads == connections);
+/*
if (src.ops == &nbd_ops)
assert (src.u.nbd.handles.size == connections);
if (dst.ops == &nbd_ops)
assert (dst.u.nbd.handles.size == connections);
- assert (src.size != -1);
+*/
+ assert (src->size != -1);
workers = malloc (sizeof (pthread_t) * threads);
if (workers == NULL) {
@@ -147,9 +149,9 @@ worker_thread (void *indexp)
assert (0 < count && count <= THREAD_WORK_SIZE);
if (extents)
- src.ops->get_extents (&src, index, offset, count, &exts);
+ src->ops->get_extents (src, index, offset, count, &exts);
else
- default_get_extents (&src, index, offset, count, &exts);
+ default_get_extents (src, index, offset, count, &exts);
for (i = 0; i < exts.size; ++i) {
struct command *command;
@@ -208,11 +210,11 @@ worker_thread (void *indexp)
wait_for_request_slots (index);
/* Begin the asynch read operation. */
- src.ops->asynch_read (&src, command,
- (nbd_completion_callback) {
- .callback = finished_read,
- .user_data = command,
- });
+ src->ops->asynch_read (src, command,
+ (nbd_completion_callback) {
+ .callback = finished_read,
+ .user_data = command,
+ });
exts.ptr[i].offset += len;
exts.ptr[i].length -= len;
@@ -254,7 +256,7 @@ wait_for_request_slots (uintptr_t index)
static unsigned
in_flight (uintptr_t index)
{
- return src.ops->in_flight (&src, index) + dst.ops->in_flight (&dst,
index);
+ return src->ops->in_flight (src, index) + dst->ops->in_flight (dst,
index);
}
/* Poll (optional) NBD src and NBD dst, moving the state machine(s)
@@ -271,7 +273,7 @@ poll_both_ends (uintptr_t index)
/* Note: if polling is not supported, this function will
* set fd == -1 which poll ignores.
*/
- src.ops->get_polling_fd (&src, index, &fds[0].fd, &direction);
+ src->ops->get_polling_fd (src, index, &fds[0].fd, &direction);
if (fds[0].fd >= 0) {
switch (direction) {
case LIBNBD_AIO_DIRECTION_READ:
@@ -286,7 +288,7 @@ poll_both_ends (uintptr_t index)
}
}
- dst.ops->get_polling_fd (&dst, index, &fds[1].fd, &direction);
+ dst->ops->get_polling_fd (dst, index, &fds[1].fd, &direction);
if (fds[1].fd >= 0) {
switch (direction) {
case LIBNBD_AIO_DIRECTION_READ:
@@ -311,24 +313,24 @@ poll_both_ends (uintptr_t index)
if (fds[0].fd >= 0) {
if ((fds[0].revents & (POLLIN | POLLHUP)) != 0)
- src.ops->asynch_notify_read (&src, index);
+ src->ops->asynch_notify_read (src, index);
else if ((fds[0].revents & POLLOUT) != 0)
- src.ops->asynch_notify_write (&src, index);
+ src->ops->asynch_notify_write (src, index);
else if ((fds[0].revents & (POLLERR | POLLNVAL)) != 0) {
errno = ENOTCONN;
- perror (src.name);
+ perror (src->name);
exit (EXIT_FAILURE);
}
}
if (fds[1].fd >= 0) {
if ((fds[1].revents & (POLLIN | POLLHUP)) != 0)
- dst.ops->asynch_notify_read (&dst, index);
+ dst->ops->asynch_notify_read (dst, index);
else if ((fds[1].revents & POLLOUT) != 0)
- dst.ops->asynch_notify_write (&dst, index);
+ dst->ops->asynch_notify_write (dst, index);
else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) {
errno = ENOTCONN;
- perror (dst.name);
+ perror (dst->name);
exit (EXIT_FAILURE);
}
}
@@ -377,11 +379,11 @@ finished_read (void *vp, int *error)
/* If sparseness detection (see below) is turned off then we write
* the whole command.
*/
- dst.ops->asynch_write (&dst, command,
- (nbd_completion_callback) {
- .callback = free_command,
- .user_data = command,
- });
+ dst->ops->asynch_write (dst, command,
+ (nbd_completion_callback) {
+ .callback = free_command,
+ .user_data = command,
+ });
}
else { /* Sparseness detection. */
const uint64_t start = command->offset;
@@ -408,11 +410,11 @@ finished_read (void *vp, int *error)
newcommand = copy_subcommand (command,
last_offset, i - last_offset,
false);
- dst.ops->asynch_write (&dst, newcommand,
- (nbd_completion_callback) {
- .callback = free_command,
- .user_data = newcommand,
- });
+ dst->ops->asynch_write (dst, newcommand,
+ (nbd_completion_callback) {
+ .callback = free_command,
+ .user_data = newcommand,
+ });
}
/* Start the new hole. */
last_offset = i;
@@ -445,11 +447,11 @@ finished_read (void *vp, int *error)
newcommand = copy_subcommand (command,
last_offset, i - last_offset,
false);
- dst.ops->asynch_write (&dst, newcommand,
- (nbd_completion_callback) {
- .callback = free_command,
- .user_data = newcommand,
- });
+ dst->ops->asynch_write (dst, newcommand,
+ (nbd_completion_callback) {
+ .callback = free_command,
+ .user_data = newcommand,
+ });
}
else {
newcommand = copy_subcommand (command,
@@ -462,11 +464,11 @@ finished_read (void *vp, int *error)
/* There may be an unaligned tail, so write that. */
if (end - i > 0) {
newcommand = copy_subcommand (command, i, end - i, false);
- dst.ops->asynch_write (&dst, newcommand,
- (nbd_completion_callback) {
- .callback = free_command,
- .user_data = newcommand,
- });
+ dst->ops->asynch_write (dst, newcommand,
+ (nbd_completion_callback) {
+ .callback = free_command,
+ .user_data = newcommand,
+ });
}
/* Free the original command since it has been split into
@@ -503,20 +505,20 @@ fill_dst_range_with_zeroes (struct command *command)
if (!allocated) {
/* Try trimming. */
- if (dst.ops->asynch_trim (&dst, command,
- (nbd_completion_callback) {
- .callback = free_command,
- .user_data = command,
- }))
+ if (dst->ops->asynch_trim (dst, command,
+ (nbd_completion_callback) {
+ .callback = free_command,
+ .user_data = command,
+ }))
return;
}
/* Try efficient zeroing. */
- if (dst.ops->asynch_zero (&dst, command,
- (nbd_completion_callback) {
- .callback = free_command,
- .user_data = command,
- }))
+ if (dst->ops->asynch_zero (dst, command,
+ (nbd_completion_callback) {
+ .callback = free_command,
+ .user_data = command,
+ }))
return;
/* Fall back to loop writing zeroes. This is going to be slow
@@ -533,7 +535,7 @@ fill_dst_range_with_zeroes (struct command *command)
if (len > MAX_REQUEST_SIZE)
len = MAX_REQUEST_SIZE;
- dst.ops->synch_write (&dst, data, len, command->offset);
+ dst->ops->synch_write (dst, data, len, command->offset);
command->slice.len -= len;
command->offset += len;
}
diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c
index 7b48cbc..24970c2 100644
--- a/copy/nbd-ops.c
+++ b/copy/nbd-ops.c
@@ -27,45 +27,221 @@
#include "nbdcopy.h"
+static struct rw_ops nbd_ops;
+
+DEFINE_VECTOR_TYPE (handles, struct nbd_handle *)
+DEFINE_VECTOR_TYPE (const_string_vector, const char *);
+
+struct rw_nbd {
+ struct rw rw;
+
+ /* Because of multi-conn we have to remember enough state in this
+ * handle in order to be able to open another connection with the
+ * same parameters after nbd_rw_create* has been called once.
+ */
+ enum { CREATE_URI, CREATE_SUBPROCESS } create_t;
+ const char *uri; /* For CREATE_URI */
+ const_string_vector argv; /* For CREATE_SUBPROCESS */
+ bool writing;
+
+ handles handles; /* One handle per connection. */
+ bool can_trim, can_zero; /* Cached nbd_can_trim, nbd_can_zero. */
+};
+
+static void
+open_one_nbd_handle (struct rw_nbd *rwn)
+{
+ struct nbd_handle *nbd;
+
+ nbd = nbd_create ();
+ if (nbd == NULL) {
+ fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ());
+ exit (EXIT_FAILURE);
+ }
+
+ nbd_set_debug (nbd, verbose);
+
+ if (extents && !rwn->writing &&
+ nbd_add_meta_context (nbd, "base:allocation") == -1) {
+ fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ());
+ exit (EXIT_FAILURE);
+ }
+
+ switch (rwn->create_t) {
+ case CREATE_URI:
+ nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */
+
+ if (nbd_connect_uri (nbd, rwn->uri) == -1) {
+ fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->uri,
nbd_get_error ());
+ exit (EXIT_FAILURE);
+ }
+ break;
+
+ case CREATE_SUBPROCESS:
+ if (nbd_connect_systemd_socket_activation (nbd,
+ (char **) rwn->argv.ptr)
+ == -1) {
+ fprintf (stderr, "%s: %s: %s\n", "nbdcopy",
rwn->argv.ptr[0],
+ nbd_get_error ());
+ exit (EXIT_FAILURE);
+ }
+ }
+
+ /* Cache these. We assume with multi-conn that each handle will act
+ * the same way.
+ */
+ if (rwn->handles.size == 0) {
+ rwn->can_trim = nbd_can_trim (nbd) > 0;
+ rwn->can_zero = nbd_can_zero (nbd) > 0;
+ rwn->rw.size = nbd_get_size (nbd);
+ if (rwn->rw.size == -1) {
+ fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->rw.name,
+ nbd_get_error ());
+ exit (EXIT_FAILURE);
+ }
+ }
+
+ if (handles_append (&rwn->handles, nbd) == -1) {
+ perror ("realloc");
+ exit (EXIT_FAILURE);
+ }
+}
+
+struct rw *
+nbd_rw_create_uri (const char *name, const char *uri, bool writing)
+{
+ struct rw_nbd *rwn = calloc (1, sizeof *rwn);
+ if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+ rwn->rw.ops = &nbd_ops;
+ rwn->rw.name = name;
+ rwn->create_t = CREATE_URI;
+ rwn->uri = uri;
+ rwn->writing = writing;
+
+ open_one_nbd_handle (rwn);
+
+ return (struct rw *) rwn;
+}
+
+struct rw *
+nbd_rw_create_subprocess (const char **argv, size_t argc, bool writing)
+{
+ size_t i;
+ struct rw_nbd *rwn = calloc (1, sizeof *rwn);
+ if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+ rwn->rw.ops = &nbd_ops;
+ rwn->rw.name = argv[0];
+ rwn->create_t = CREATE_SUBPROCESS;
+ rwn->writing = writing;
+
+ /* We have to copy the args so we can null-terminate them. */
+ for (i = 0; i < argc; ++i) {
+ if (const_string_vector_append (&rwn->argv, argv[i]) == -1) {
+ memory_error:
+ perror ("realloc");
+ exit (EXIT_FAILURE);
+ }
+ }
+ if (const_string_vector_append (&rwn->argv, NULL) == -1)
+ goto memory_error;
+
+ open_one_nbd_handle (rwn);
+
+ return (struct rw *) rwn;
+}
+
static void
nbd_ops_close (struct rw *rw)
{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
size_t i;
- for (i = 0; i < rw->u.nbd.handles.size; ++i) {
- if (nbd_shutdown (rw->u.nbd.handles.ptr[i], 0) == -1) {
+ for (i = 0; i < rwn->handles.size; ++i) {
+ if (nbd_shutdown (rwn->handles.ptr[i], 0) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
- nbd_close (rw->u.nbd.handles.ptr[i]);
+ nbd_close (rwn->handles.ptr[i]);
}
- handles_reset (&rw->u.nbd.handles);
+ handles_reset (&rwn->handles);
+ const_string_vector_reset (&rwn->argv);
+ free (rw);
}
static void
nbd_ops_flush (struct rw *rw)
{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
size_t i;
- for (i = 0; i < rw->u.nbd.handles.size; ++i) {
- if (nbd_flush (rw->u.nbd.handles.ptr[i], 0) == -1) {
+ for (i = 0; i < rwn->handles.size; ++i) {
+ if (nbd_flush (rwn->handles.ptr[i], 0) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
}
}
+static bool
+nbd_ops_is_read_only (struct rw *rw)
+{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (rwn->handles.size > 0)
+ return nbd_is_read_only (rwn->handles.ptr[0]);
+ else
+ return false;
+}
+
+static bool
+nbd_ops_can_extents (struct rw *rw)
+{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (rwn->handles.size > 0)
+ return nbd_can_meta_context (rwn->handles.ptr[0], "base:allocation");
+ else
+ return false;
+}
+
+static bool
+nbd_ops_can_multi_conn (struct rw *rw)
+{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (rwn->handles.size > 0)
+ return nbd_can_multi_conn (rwn->handles.ptr[0]);
+ else
+ return false;
+}
+
+static void
+nbd_ops_start_multi_conn (struct rw *rw)
+{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+ size_t i;
+
+ for (i = 1; i < connections; ++i)
+ open_one_nbd_handle (rwn);
+
+ assert (rwn->handles.size == connections);
+}
+
static size_t
nbd_ops_synch_read (struct rw *rw,
void *data, size_t len, uint64_t offset)
{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
if (len > rw->size - offset)
len = rw->size - offset;
if (len == 0)
return 0;
- if (nbd_pread (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) {
+ if (nbd_pread (rwn->handles.ptr[0], data, len, offset, 0) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
@@ -77,7 +253,9 @@ static void
nbd_ops_synch_write (struct rw *rw,
const void *data, size_t len, uint64_t offset)
{
- if (nbd_pwrite (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) {
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (nbd_pwrite (rwn->handles.ptr[0], data, len, offset, 0) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
@@ -86,10 +264,12 @@ nbd_ops_synch_write (struct rw *rw,
static bool
nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
{
- if (!rw->u.nbd.can_trim)
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (!rwn->can_trim)
return false;
- if (nbd_trim (rw->u.nbd.handles.ptr[0], count, offset, 0) == -1) {
+ if (nbd_trim (rwn->handles.ptr[0], count, offset, 0) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
@@ -99,10 +279,12 @@ nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
static bool
nbd_ops_synch_zero (struct rw *rw, uint64_t offset, uint64_t count)
{
- if (!rw->u.nbd.can_zero)
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (!rwn->can_zero)
return false;
- if (nbd_zero (rw->u.nbd.handles.ptr[0],
+ if (nbd_zero (rwn->handles.ptr[0],
count, offset, LIBNBD_CMD_FLAG_NO_HOLE) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
@@ -115,7 +297,9 @@ nbd_ops_asynch_read (struct rw *rw,
struct command *command,
nbd_completion_callback cb)
{
- if (nbd_aio_pread (rw->u.nbd.handles.ptr[command->index],
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (nbd_aio_pread (rwn->handles.ptr[command->index],
slice_ptr (command->slice),
command->slice.len, command->offset,
cb, 0) == -1) {
@@ -129,7 +313,9 @@ nbd_ops_asynch_write (struct rw *rw,
struct command *command,
nbd_completion_callback cb)
{
- if (nbd_aio_pwrite (rw->u.nbd.handles.ptr[command->index],
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (nbd_aio_pwrite (rwn->handles.ptr[command->index],
slice_ptr (command->slice),
command->slice.len, command->offset,
cb, 0) == -1) {
@@ -142,12 +328,14 @@ static bool
nbd_ops_asynch_trim (struct rw *rw, struct command *command,
nbd_completion_callback cb)
{
- if (!rw->u.nbd.can_trim)
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (!rwn->can_trim)
return false;
assert (command->slice.len <= UINT32_MAX);
- if (nbd_aio_trim (rw->u.nbd.handles.ptr[command->index],
+ if (nbd_aio_trim (rwn->handles.ptr[command->index],
command->slice.len, command->offset,
cb, 0) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
@@ -160,12 +348,14 @@ static bool
nbd_ops_asynch_zero (struct rw *rw, struct command *command,
nbd_completion_callback cb)
{
- if (!rw->u.nbd.can_zero)
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+ if (!rwn->can_zero)
return false;
assert (command->slice.len <= UINT32_MAX);
- if (nbd_aio_zero (rw->u.nbd.handles.ptr[command->index],
+ if (nbd_aio_zero (rwn->handles.ptr[command->index],
command->slice.len, command->offset,
cb, LIBNBD_CMD_FLAG_NO_HOLE) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
@@ -212,19 +402,22 @@ add_extent (void *vp, const char *metacontext,
static unsigned
nbd_ops_in_flight (struct rw *rw, uintptr_t index)
{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
/* Since the commands are auto-retired in the callbacks we don't
* need to count "done" commands.
*/
- return nbd_aio_in_flight (rw->u.nbd.handles.ptr[index]);
+ return nbd_aio_in_flight (rwn->handles.ptr[index]);
}
static void
nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
int *fd, int *direction)
{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
struct nbd_handle *nbd;
- nbd = rw->u.nbd.handles.ptr[index];
+ nbd = rwn->handles.ptr[index];
*fd = nbd_aio_get_fd (nbd);
if (*fd == -1) {
@@ -240,7 +433,8 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
static void
nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index)
{
- if (nbd_aio_notify_read (rw->u.nbd.handles.ptr[index]) == -1) {
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+ if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
@@ -249,7 +443,8 @@ nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index)
static void
nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index)
{
- if (nbd_aio_notify_write (rw->u.nbd.handles.ptr[index]) == -1) {
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
+ if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
@@ -266,10 +461,11 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index,
uint64_t offset, uint64_t count,
extent_list *ret)
{
+ struct rw_nbd *rwn = (struct rw_nbd *) rw;
extent_list exts = empty_vector;
struct nbd_handle *nbd;
- nbd = rw->u.nbd.handles.ptr[index];
+ nbd = rwn->handles.ptr[index];
ret->size = 0;
@@ -331,9 +527,13 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index,
free (exts.ptr);
}
-struct rw_ops nbd_ops = {
+static struct rw_ops nbd_ops = {
.ops_name = "nbd_ops",
.close = nbd_ops_close,
+ .is_read_only = nbd_ops_is_read_only,
+ .can_extents = nbd_ops_can_extents,
+ .can_multi_conn = nbd_ops_can_multi_conn,
+ .start_multi_conn = nbd_ops_start_multi_conn,
.flush = nbd_ops_flush,
.synch_read = nbd_ops_synch_read,
.synch_write = nbd_ops_synch_write,
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index 69fac2a..94fbdeb 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -36,8 +36,6 @@
*/
#define THREAD_WORK_SIZE (128 * 1024 * 1024)
-DEFINE_VECTOR_TYPE (handles, struct nbd_handle *)
-
/* Abstracts the input (src) and output (dst) parameters on the
* command line.
*/
@@ -45,21 +43,20 @@ struct rw {
struct rw_ops *ops; /* Operations. */
const char *name; /* Printable name, for error messages etc. */
int64_t size; /* May be -1 for streams. */
- union {
- struct { /* For files and pipes. */
- int fd;
- struct stat stat;
- bool seek_hole_supported;
- int sector_size;
- } local;
- struct {
- handles handles; /* For NBD, one handle per connection. */
- bool can_trim, can_zero; /* Cached nbd_can_trim, nbd_can_zero. */
- } nbd;
- } u;
+ /* Followed by private data for the particular subtype. */
};
-extern struct rw src, dst;
+extern struct rw *src, *dst;
+
+/* Create subtypes. */
+extern struct rw *file_create (const char *name,
+ const struct stat *stat, int fd);
+extern struct rw *nbd_rw_create_uri (const char *name,
+ const char *uri, bool writing);
+extern struct rw *nbd_rw_create_subprocess (const char **argv, size_t argc,
+ bool writing);
+extern struct rw *null_create (const char *name);
+extern struct rw *pipe_create (const char *name, int fd);
/* Underlying data buffers. */
struct buffer {
@@ -117,6 +114,28 @@ struct rw_ops {
/* Close the connection and free up associated resources. */
void (*close) (struct rw *rw);
+ /* Return true if this is a read-only connection. */
+ bool (*is_read_only) (struct rw *rw);
+
+ /* For source only, does it support reading extents? */
+ bool (*can_extents) (struct rw *rw);
+
+ /* Return true if the connection can do multi-conn. This is true
+ * for files, false for streams, and passed through for NBD.
+ */
+ bool (*can_multi_conn) (struct rw *rw);
+
+ /* For multi-conn capable backends, before copying we must call this
+ * to begin multi-conn. For NBD this means opening the additional
+ * connections.
+ */
+ void (*start_multi_conn) (struct rw *rw);
+
+ /* Truncate, only called on output files. This callback can be NULL
+ * for types that don't support this.
+ */
+ void (*truncate) (struct rw *rw, int64_t size);
+
/* Flush pending writes to permanent storage. */
void (*flush) (struct rw *rw);
@@ -188,10 +207,6 @@ struct rw_ops {
uint64_t offset, uint64_t count,
extent_list *ret);
};
-extern struct rw_ops file_ops;
-extern struct rw_ops nbd_ops;
-extern struct rw_ops pipe_ops;
-extern struct rw_ops null_ops;
extern void default_get_extents (struct rw *rw, uintptr_t index,
uint64_t offset, uint64_t count,
diff --git a/copy/null-ops.c b/copy/null-ops.c
index b2ca66f..3262fb5 100644
--- a/copy/null-ops.c
+++ b/copy/null-ops.c
@@ -30,10 +30,28 @@
* and fast zeroing.
*/
+static struct rw_ops null_ops;
+
+struct rw_null {
+ struct rw rw;
+};
+
+struct rw *
+null_create (const char *name)
+{
+ struct rw_null *rw = calloc (1, sizeof *rw);
+ if (rw == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+ rw->rw.ops = &null_ops;
+ rw->rw.name = name;
+ rw->rw.size = INT64_MAX;
+ return (struct rw *) rw;
+}
+
static void
null_close (struct rw *rw)
{
- /* nothing */
+ free (rw);
}
static void
@@ -42,6 +60,30 @@ null_flush (struct rw *rw)
/* nothing */
}
+static bool
+null_is_read_only (struct rw *rw)
+{
+ return false;
+}
+
+static bool
+null_can_extents (struct rw *rw)
+{
+ return false;
+}
+
+static bool
+null_can_multi_conn (struct rw *rw)
+{
+ return true;
+}
+
+static void
+null_start_multi_conn (struct rw *rw)
+{
+ /* nothing */
+}
+
static size_t
null_synch_read (struct rw *rw,
void *data, size_t len, uint64_t offset)
@@ -126,9 +168,13 @@ null_get_extents (struct rw *rw, uintptr_t index,
abort ();
}
-struct rw_ops null_ops = {
+static struct rw_ops null_ops = {
.ops_name = "null_ops",
.close = null_close,
+ .is_read_only = null_is_read_only,
+ .can_extents = null_can_extents,
+ .can_multi_conn = null_can_multi_conn,
+ .start_multi_conn = null_start_multi_conn,
.flush = null_flush,
.synch_read = null_synch_read,
.synch_write = null_synch_write,
diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c
index d127dad..286e6c0 100644
--- a/copy/pipe-ops.c
+++ b/copy/pipe-ops.c
@@ -26,10 +26,33 @@
#include "nbdcopy.h"
+static struct rw_ops pipe_ops;
+
+struct rw_pipe {
+ struct rw rw;
+ int fd;
+};
+
+struct rw *
+pipe_create (const char *name, int fd)
+{
+ struct rw_pipe *rwp = calloc (1, sizeof *rwp);
+ if (rwp == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+ /* Set size == -1 which means don't know. */
+ rwp->rw.ops = &pipe_ops;
+ rwp->rw.name = name;
+ rwp->rw.size = -1;
+ rwp->fd = fd;
+ return (struct rw *) rwp;
+}
+
static void
pipe_close (struct rw *rw)
{
- if (close (rw->u.local.fd) == -1) {
+ struct rw_pipe *rwp = (struct rw_pipe *) rw;
+
+ if (close (rwp->fd) == -1) {
fprintf (stderr, "%s: close: %m\n", rw->name);
exit (EXIT_FAILURE);
}
@@ -43,13 +66,39 @@ pipe_flush (struct rw *rw)
*/
}
+static bool
+pipe_is_read_only (struct rw *rw)
+{
+ return false;
+}
+
+static bool
+pipe_can_extents (struct rw *rw)
+{
+ return false;
+}
+
+static bool
+pipe_can_multi_conn (struct rw *rw)
+{
+ return false;
+}
+
+static void
+pipe_start_multi_conn (struct rw *rw)
+{
+ /* Should never be called. */
+ abort ();
+}
+
static size_t
pipe_synch_read (struct rw *rw,
void *data, size_t len, uint64_t offset)
{
+ struct rw_pipe *rwp = (struct rw_pipe *) rw;
ssize_t r;
- r = read (rw->u.local.fd, data, len);
+ r = read (rwp->fd, data, len);
if (r == -1) {
perror (rw->name);
exit (EXIT_FAILURE);
@@ -61,10 +110,11 @@ static void
pipe_synch_write (struct rw *rw,
const void *data, size_t len, uint64_t offset)
{
+ struct rw_pipe *rwp = (struct rw_pipe *) rw;
ssize_t r;
while (len > 0) {
- r = write (rw->u.local.fd, data, len);
+ r = write (rwp->fd, data, len);
if (r == -1) {
perror (rw->name);
exit (EXIT_FAILURE);
@@ -109,10 +159,16 @@ pipe_in_flight (struct rw *rw, uintptr_t index)
return 0;
}
-struct rw_ops pipe_ops = {
+static struct rw_ops pipe_ops = {
.ops_name = "pipe_ops",
.close = pipe_close,
+
+ .is_read_only = pipe_is_read_only,
+ .can_extents = pipe_can_extents,
+ .can_multi_conn = pipe_can_multi_conn,
+ .start_multi_conn = pipe_start_multi_conn,
+
.flush = pipe_flush,
.synch_read = pipe_synch_read,
diff --git a/copy/synch-copying.c b/copy/synch-copying.c
index 2712c10..985f005 100644
--- a/copy/synch-copying.c
+++ b/copy/synch-copying.c
@@ -38,13 +38,13 @@ synch_copying (void)
/* If the source size is unknown then we copy data and cannot use
* extent information.
*/
- if (src.size == -1) {
+ if (src->size == -1) {
size_t r;
- while ((r = src.ops->synch_read (&src, buf, sizeof buf, offset)) > 0) {
- dst.ops->synch_write (&dst, buf, r, offset);
+ while ((r = src->ops->synch_read (src, buf, sizeof buf, offset)) > 0) {
+ dst->ops->synch_write (dst, buf, r, offset);
offset += r;
- progress_bar (offset, src.size);
+ progress_bar (offset, src->size);
}
}
@@ -52,47 +52,47 @@ synch_copying (void)
* blocks and use extent information to optimize the case.
*/
else {
- while (offset < src.size) {
+ while (offset < src->size) {
extent_list exts = empty_vector;
- uint64_t count = src.size - offset;
+ uint64_t count = src->size - offset;
size_t i, r;
if (count > sizeof buf)
count = sizeof buf;
if (extents)
- src.ops->get_extents (&src, 0, offset, count, &exts);
+ src->ops->get_extents (src, 0, offset, count, &exts);
else
- default_get_extents (&src, 0, offset, count, &exts);
+ default_get_extents (src, 0, offset, count, &exts);
for (i = 0; i < exts.size; ++i) {
assert (exts.ptr[i].length <= count);
if (exts.ptr[i].zero) {
- if (!dst.ops->synch_trim (&dst, offset, exts.ptr[i].length) &&
- !dst.ops->synch_zero (&dst, offset, exts.ptr[i].length)) {
+ if (!dst->ops->synch_trim (dst, offset, exts.ptr[i].length) &&
+ !dst->ops->synch_zero (dst, offset, exts.ptr[i].length)) {
/* If neither trimming nor efficient zeroing are possible,
* write zeroes the hard way.
*/
memset (buf, 0, exts.ptr[i].length);
- dst.ops->synch_write (&dst, buf, exts.ptr[i].length, offset);
+ dst->ops->synch_write (dst, buf, exts.ptr[i].length, offset);
}
offset += exts.ptr[i].length;
}
else /* data */ {
- r = src.ops->synch_read (&src, buf, exts.ptr[i].length, offset);
+ r = src->ops->synch_read (src, buf, exts.ptr[i].length, offset);
/* These cases should never happen unless the file is
* truncated underneath us.
*/
if (r == 0 || r < exts.ptr[i].length) {
- fprintf (stderr, "%s: unexpected end of file\n", src.name);
+ fprintf (stderr, "%s: unexpected end of file\n", src->name);
exit (EXIT_FAILURE);
}
- dst.ops->synch_write (&dst, buf, r, offset);
+ dst->ops->synch_write (dst, buf, r, offset);
offset += r;
- progress_bar (offset, src.size);
+ progress_bar (offset, src->size);
}
}
--
2.29.0.rc2