I want to keep more info per worker, and a worker struct is the
natural way to do this. This also allows cleaning up the ops-*
interface, which accepted a uintptr_t index even though the index is
never a pointer. The uintptr_t type is most likely a leftover from
passing the index to the thread via pthread_create()'s void *
argument.
The worker struct is used only by the multi-thread-copying module for
now, but in a future patch I want to store the worker pointer in the
command, to allow commands to update worker state when they finish.
Signed-off-by: Nir Soffer <nsoffer@redhat.com>
---
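A minimal standalone sketch contrasting the old and new thread start
patterns, for reference while reviewing (the worker_thread_old and
worker_thread_new names are illustrative only, not from the patch;
compile with -pthread):

  #include <pthread.h>
  #include <stdint.h>
  #include <stdio.h>

  /* Old pattern: the index is smuggled through pthread_create()'s
   * void * argument, so every callee ends up taking a uintptr_t
   * "index" that is never really a pointer.
   */
  static void *
  worker_thread_old (void *indexp)
  {
    uintptr_t index = (uintptr_t) indexp;
    printf ("old: worker %zu\n", (size_t) index);
    return NULL;
  }

  /* New pattern: the thread receives a pointer to per-worker state,
   * the index becomes a plain size_t, and more fields can be added
   * later without touching the thread start code.
   */
  struct worker {
    pthread_t thread;
    size_t index;
  };

  static void *
  worker_thread_new (void *wp)
  {
    struct worker *w = wp;
    printf ("new: worker %zu\n", w->index);
    return NULL;
  }

  int
  main (void)
  {
    pthread_t told;
    struct worker w = { .index = 0 };

    pthread_create (&told, NULL, worker_thread_old,
                    (void *)(uintptr_t) 0);
    pthread_join (told, NULL);

    pthread_create (&w.thread, NULL, worker_thread_new, &w);
    pthread_join (w.thread, NULL);
    return 0;
  }
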
copy/file-ops.c | 4 +--
copy/main.c | 6 ++---
copy/multi-thread-copying.c | 49 +++++++++++++++++++------------------
copy/nbd-ops.c | 10 ++++----
copy/nbdcopy.h | 24 +++++++++++-------
copy/null-ops.c | 4 +--
copy/pipe-ops.c | 2 +-
7 files changed, 53 insertions(+), 46 deletions(-)
diff --git a/copy/file-ops.c b/copy/file-ops.c
index aaf04ade..ab378754 100644
--- a/copy/file-ops.c
+++ b/copy/file-ops.c
@@ -614,27 +614,27 @@ file_asynch_zero (struct rw *rw, struct command *command,
{
int dummy = 0;
if (!file_synch_zero (rw, command->offset, command->slice.len, allocate))
return false;
cb.callback (cb.user_data, &dummy);
return true;
}
static unsigned
-file_in_flight (struct rw *rw, uintptr_t index)
+file_in_flight (struct rw *rw, size_t index)
{
return 0;
}
static void
-file_get_extents (struct rw *rw, uintptr_t index,
+file_get_extents (struct rw *rw, size_t index,
uint64_t offset, uint64_t count,
extent_list *ret)
{
ret->len = 0;
#ifdef SEEK_HOLE
struct rw_file *rwf = (struct rw_file *)rw;
static pthread_mutex_t lseek_lock = PTHREAD_MUTEX_INITIALIZER;
if (rwf->seek_hole_supported) {
diff --git a/copy/main.c b/copy/main.c
index 67788b5d..390de1eb 100644
--- a/copy/main.c
+++ b/copy/main.c
@@ -513,44 +513,44 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp)
fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name,
rw->name);
fprintf (fp, "%s: size=%" PRIi64 " (%s)\n",
prefix, rw->size, human_size (buf, rw->size, NULL));
}
/* Default implementation of rw->ops->get_extents for backends which
* don't/can't support extents. Also used for the --no-extents case.
*/
void
-default_get_extents (struct rw *rw, uintptr_t index,
+default_get_extents (struct rw *rw, size_t index,
uint64_t offset, uint64_t count,
extent_list *ret)
{
struct extent e;
ret->len = 0;
e.offset = offset;
e.length = count;
e.zero = false;
if (extent_list_append (ret, e) == -1) {
perror ("realloc");
exit (EXIT_FAILURE);
}
}
/* Implementations of get_polling_fd and asynch_notify_* for backends
* which don't support polling.
*/
void
-get_polling_fd_not_supported (struct rw *rw, uintptr_t index,
+get_polling_fd_not_supported (struct rw *rw, size_t index,
int *fd_rtn, int *direction_rtn)
{
/* Not an error, this causes poll to ignore the fd. */
*fd_rtn = -1;
*direction_rtn = LIBNBD_AIO_DIRECTION_READ;
}
void
-asynch_notify_read_write_not_supported (struct rw *rw, uintptr_t index)
+asynch_notify_read_write_not_supported (struct rw *rw, size_t index)
{
/* nothing */
}
diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index aa6a9f41..a1a8d09c 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -70,184 +70,185 @@ get_next_offset (uint64_t *offset, uint64_t *count)
* the commands. We might move this into a callback, but those
* are called from threads and not necessarily in monotonic order
* so the progress bar would move erratically.
*/
progress_bar (*offset, src->size);
}
pthread_mutex_unlock (&lock);
return r;
}
-static void *worker_thread (void *ip);
+static void *worker_thread (void *wp);
void
multi_thread_copying (void)
{
- pthread_t *workers;
+ struct worker *workers;
size_t i;
int err;
/* Some invariants that should be true if the main program called us
* correctly.
*/
assert (threads > 0);
assert (threads == connections);
/*
if (src.ops == &nbd_ops)
assert (src.u.nbd.handles.size == connections);
if (dst.ops == &nbd_ops)
assert (dst.u.nbd.handles.size == connections);
*/
assert (src->size != -1);
- workers = malloc (sizeof (pthread_t) * threads);
+ workers = calloc (threads, sizeof *workers);
if (workers == NULL) {
- perror ("malloc");
+ perror ("calloc");
exit (EXIT_FAILURE);
}
/* Start the worker threads. */
for (i = 0; i < threads; ++i) {
- err = pthread_create (&workers[i], NULL, worker_thread,
- (void *)(uintptr_t)i);
+ workers[i].index = i;
+ err = pthread_create (&workers[i].thread, NULL, worker_thread,
+ &workers[i]);
if (err != 0) {
errno = err;
perror ("pthread_create");
exit (EXIT_FAILURE);
}
}
/* Wait until all worker threads exit. */
for (i = 0; i < threads; ++i) {
- err = pthread_join (workers[i], NULL);
+ err = pthread_join (workers[i].thread, NULL);
if (err != 0) {
errno = err;
perror ("pthread_join");
exit (EXIT_FAILURE);
}
}
free (workers);
}
-static void wait_for_request_slots (uintptr_t index);
-static unsigned in_flight (uintptr_t index);
-static void poll_both_ends (uintptr_t index);
+static void wait_for_request_slots (size_t index);
+static unsigned in_flight (size_t index);
+static void poll_both_ends (size_t index);
static int finished_read (void *vp, int *error);
static int finished_command (void *vp, int *error);
static void free_command (struct command *command);
static void fill_dst_range_with_zeroes (struct command *command);
static struct command *create_command (uint64_t offset, size_t len, bool zero,
- uintptr_t index);
+ size_t index);
/* There are 'threads' worker threads, each copying work ranges from
* src to dst until there are no more work ranges.
*/
static void *
-worker_thread (void *indexp)
+worker_thread (void *wp)
{
- uintptr_t index = (uintptr_t) indexp;
+ struct worker *w = wp;
uint64_t offset, count;
extent_list exts = empty_vector;
while (get_next_offset (&offset, &count)) {
size_t i;
assert (0 < count && count <= THREAD_WORK_SIZE);
if (extents)
- src->ops->get_extents (src, index, offset, count, &exts);
+ src->ops->get_extents (src, w->index, offset, count, &exts);
else
- default_get_extents (src, index, offset, count, &exts);
+ default_get_extents (src, w->index, offset, count, &exts);
for (i = 0; i < exts.len; ++i) {
struct command *command;
size_t len;
if (exts.ptr[i].zero) {
/* The source is zero so we can proceed directly to skipping,
* fast zeroing, or writing zeroes at the destination.
*/
command = create_command (exts.ptr[i].offset, exts.ptr[i].length,
- true, index);
+ true, w->index);
fill_dst_range_with_zeroes (command);
}
else /* data */ {
/* As the extent might be larger than permitted for a single
* command, we may have to split this into multiple read
* requests.
*/
while (exts.ptr[i].length > 0) {
len = exts.ptr[i].length;
if (len > request_size)
len = request_size;
command = create_command (exts.ptr[i].offset, len,
- false, index);
+ false, w->index);
- wait_for_request_slots (index);
+ wait_for_request_slots (w->index);
/* Begin the asynch read operation. */
src->ops->asynch_read (src, command,
(nbd_completion_callback) {
.callback = finished_read,
.user_data = command,
});
exts.ptr[i].offset += len;
exts.ptr[i].length -= len;
}
}
offset += count;
count = 0;
} /* for extents */
}
/* Wait for in flight NBD requests to finish. */
- while (in_flight (index) > 0)
- poll_both_ends (index);
+ while (in_flight (w->index) > 0)
+ poll_both_ends (w->index);
free (exts.ptr);
return NULL;
}
/* If the number of requests in flight exceeds the limit, poll
* waiting for at least one request to finish. This enforces
* the user --requests option.
*
* NB: Unfortunately it's not possible to call this from a callback,
* since it will deadlock trying to grab the libnbd handle lock. This
* means that although the worker thread calls this and enforces the
* limit, when we split up requests into subrequests (eg. doing
* sparseness detection) we will probably exceed the user request
* limit. XXX
*/
static void
-wait_for_request_slots (uintptr_t index)
+wait_for_request_slots (size_t index)
{
while (in_flight (index) >= max_requests)
poll_both_ends (index);
}
/* Count the number of asynchronous commands in flight. */
static unsigned
-in_flight (uintptr_t index)
+in_flight (size_t index)
{
return src->ops->in_flight (src, index) + dst->ops->in_flight (dst,
index);
}
/* Poll (optional) NBD src and NBD dst, moving the state machine(s)
* along. This is a lightly modified nbd_poll.
*/
static void
-poll_both_ends (uintptr_t index)
+poll_both_ends (size_t index)
{
struct pollfd fds[2];
int r, direction;
memset (fds, 0, sizeof fds);
/* Note: if polling is not supported, this function will
* set fd == -1 which poll ignores.
*/
src->ops->get_polling_fd (src, index, &fds[0].fd, &direction);
@@ -331,21 +332,21 @@ create_buffer (size_t len)
exit (EXIT_FAILURE);
}
buffer->refs = 1;
return buffer;
}
/* Create a new command for read or zero. */
static struct command *
-create_command (uint64_t offset, size_t len, bool zero, uintptr_t index)
+create_command (uint64_t offset, size_t len, bool zero, size_t index)
{
struct command *command;
command = calloc (1, sizeof *command);
if (command == NULL) {
perror ("calloc");
exit (EXIT_FAILURE);
}
command->offset = offset;
diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c
index 10551d3a..dca86e88 100644
--- a/copy/nbd-ops.c
+++ b/copy/nbd-ops.c
@@ -377,32 +377,32 @@ add_extent (void *vp, const char *metacontext,
exit (EXIT_FAILURE);
}
offset += entries[i];
}
return 0;
}
static unsigned
-nbd_ops_in_flight (struct rw *rw, uintptr_t index)
+nbd_ops_in_flight (struct rw *rw, size_t index)
{
struct rw_nbd *rwn = (struct rw_nbd *) rw;
/* Since the commands are auto-retired in the callbacks we don't
* need to count "done" commands.
*/
return nbd_aio_in_flight (rwn->handles.ptr[index]);
}
static void
-nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
+nbd_ops_get_polling_fd (struct rw *rw, size_t index,
int *fd, int *direction)
{
struct rw_nbd *rwn = (struct rw_nbd *) rw;
struct nbd_handle *nbd;
nbd = rwn->handles.ptr[index];
*fd = nbd_aio_get_fd (nbd);
if (*fd == -1)
goto error;
@@ -412,47 +412,47 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
goto error;
return;
error:
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
static void
-nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index)
+nbd_ops_asynch_notify_read (struct rw *rw, size_t index)
{
struct rw_nbd *rwn = (struct rw_nbd *) rw;
if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
}
static void
-nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index)
+nbd_ops_asynch_notify_write (struct rw *rw, size_t index)
{
struct rw_nbd *rwn = (struct rw_nbd *) rw;
if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) {
fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
exit (EXIT_FAILURE);
}
}
/* This is done synchronously, but that's fine because commands from
* the previous work range in flight continue to run, it's difficult
* to (sanely) start new work until we have the full list of extents,
* and in almost every case the remote NBD server can answer our
* request for extents in a single round trip.
*/
static void
-nbd_ops_get_extents (struct rw *rw, uintptr_t index,
+nbd_ops_get_extents (struct rw *rw, size_t index,
uint64_t offset, uint64_t count,
extent_list *ret)
{
struct rw_nbd *rwn = (struct rw_nbd *) rw;
extent_list exts = empty_vector;
struct nbd_handle *nbd;
nbd = rwn->handles.ptr[index];
ret->len = 0;
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index c070f8d7..4fe8bee6 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -71,36 +71,42 @@ struct buffer {
struct slice {
size_t len; /* Length of slice. */
size_t base; /* Start of slice relative to buffer. */
struct buffer *buffer; /* Underlying allocation (may be shared
* or NULL).
*/
};
#define slice_ptr(slice) ((slice).buffer->data + (slice).base)
+/* Worker state used by multi-threaded copying. */
+struct worker {
+ pthread_t thread;
+ size_t index;
+};
+
/* Commands for asynchronous operations in flight.
*
* We don't store the command type (read/write/zero/etc) because it is
* implicit in the function being called and because commands
* naturally change from read -> write/zero/etc as they progress.
*
* slice.buffer may be NULL for commands (like zero) that have no
* associated data.
*
* A separate set of commands, slices and buffers is maintained per
* thread so no locking is necessary.
*/
struct command {
uint64_t offset; /* Offset relative to start of disk. */
struct slice slice; /* Data slice. */
- uintptr_t index; /* Thread number. */
+ size_t index; /* Thread number. */
};
/* List of extents for rw->ops->get_extents. */
struct extent {
uint64_t offset;
uint64_t length;
bool zero;
};
DEFINE_VECTOR_TYPE(extent_list, struct extent);
@@ -173,51 +179,51 @@ struct rw_ops {
struct command *command,
nbd_completion_callback cb);
/* Asynchronously zero. command->slice.buffer is not used. If not possible,
* returns false. 'cb' must be called only if returning true.
*/
bool (*asynch_zero) (struct rw *rw, struct command *command,
nbd_completion_callback cb, bool allocate);
/* Number of asynchronous commands in flight for a particular thread. */
- unsigned (*in_flight) (struct rw *rw, uintptr_t index);
+ unsigned (*in_flight) (struct rw *rw, size_t index);
/* Get polling file descriptor and direction, and notify read/write.
* For sources which cannot be polled (such as files and pipes)
* get_polling_fd returns fd == -1 (NOT an error), and the
* asynch_notify_* functions are no-ops.
*/
- void (*get_polling_fd) (struct rw *rw, uintptr_t index,
+ void (*get_polling_fd) (struct rw *rw, size_t index,
int *fd_rtn, int *direction_rtn);
- void (*asynch_notify_read) (struct rw *rw, uintptr_t index);
- void (*asynch_notify_write) (struct rw *rw, uintptr_t index);
+ void (*asynch_notify_read) (struct rw *rw, size_t index);
+ void (*asynch_notify_write) (struct rw *rw, size_t index);
/* Read base:allocation extents metadata for a region of the source.
* For local files the same information is read from the kernel.
*
* Note that qemu-img fetches extents for the entire disk up front,
* and we want to avoid doing that because it had very negative
* behaviour for certain sources (ie. VDDK).
*/
- void (*get_extents) (struct rw *rw, uintptr_t index,
+ void (*get_extents) (struct rw *rw, size_t index,
uint64_t offset, uint64_t count,
extent_list *ret);
};
-extern void default_get_extents (struct rw *rw, uintptr_t index,
+extern void default_get_extents (struct rw *rw, size_t index,
uint64_t offset, uint64_t count,
extent_list *ret);
-extern void get_polling_fd_not_supported (struct rw *rw, uintptr_t index,
+extern void get_polling_fd_not_supported (struct rw *rw, size_t index,
int *fd_rtn, int *direction_rtn);
extern void asynch_notify_read_write_not_supported (struct rw *rw,
- uintptr_t index);
+ size_t index);
extern bool allocated;
extern unsigned connections;
extern bool destination_is_zero;
extern bool extents;
extern bool flush;
extern unsigned max_requests;
extern bool progress;
extern int progress_fd;
extern unsigned request_size;
diff --git a/copy/null-ops.c b/copy/null-ops.c
index 5f1fda50..1218a623 100644
--- a/copy/null-ops.c
+++ b/copy/null-ops.c
@@ -126,27 +126,27 @@ static bool
null_asynch_zero (struct rw *rw, struct command *command,
nbd_completion_callback cb, bool allocate)
{
int dummy = 0;
cb.callback (cb.user_data, &dummy);
return true;
}
static unsigned
-null_in_flight (struct rw *rw, uintptr_t index)
+null_in_flight (struct rw *rw, size_t index)
{
return 0;
}
static void
-null_get_extents (struct rw *rw, uintptr_t index,
+null_get_extents (struct rw *rw, size_t index,
uint64_t offset, uint64_t count,
extent_list *ret)
{
abort ();
}
static struct rw_ops null_ops = {
.ops_name = "null_ops",
.close = null_close,
.is_read_only = null_is_read_only,
diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c
index f9b8599a..3c8b6c2b 100644
--- a/copy/pipe-ops.c
+++ b/copy/pipe-ops.c
@@ -147,21 +147,21 @@ pipe_asynch_write (struct rw *rw,
}
static bool
pipe_asynch_zero (struct rw *rw, struct command *command,
nbd_completion_callback cb, bool allocate)
{
return false; /* not supported by pipes */
}
static unsigned
-pipe_in_flight (struct rw *rw, uintptr_t index)
+pipe_in_flight (struct rw *rw, size_t index)
{
return 0;
}
static struct rw_ops pipe_ops = {
.ops_name = "pipe_ops",
.close = pipe_close,
.is_read_only = pipe_is_read_only,
--
2.35.1