Handling extents is complicated, in particular when using async block
status. But I could not get this working using sync block status; it
seems that libnbd eats the events on the source, and then we hang
forever waiting for inflight reads.
Since getting extents is asynchronous, and starting a new request depends
on being able to get the next extent, requests now have a new waiting
state. When a request is started while extents are not available, it is
marked as waiting. When the extents request completes, we start all
waiting requests.
Here is an example log showing what's going on while copying a Fedora 32
image:
The first request detects that we don't have extents yet, so it starts
an async block status request.
copy-libev: r0: start extents offset=0 count=134217728
copy-libev: r0: received 14 extents for base:allocation
copy-libev: r0: extents completed time=0.001098
When the extents request completes, we start all waiting requests. The first
request (r0) looked into the first extent (e0) and consumed all of it:
copy-libev: r0: start request offset=0
copy-libev: e0: offset=0 len=65536 zero=0
copy-libev: r0: extent offset=0 len=65536 zero=0
copy-libev: r0: start read offset=0 len=65536
The second request (r15) looks into the second extent (e1) and
consumes all of it, starting a zero request:
copy-libev: r15: start request offset=65536
copy-libev: e1: offset=65536 len=983040 zero=1
copy-libev: r15: extent offset=65536 len=983040 zero=1
copy-libev: r15: start zero offset=65536 len=983040
...
Request (r12) looked into the fifth extent (e4), but since this extent
was large (10747904 bytes), it consumed only 1 MiB of it:
copy-libev: r12: start request offset=2097152
copy-libev: e4: offset=2097152 len=10747904 zero=0
copy-libev: r12: extent offset=2097152 len=1048576 zero=0
copy-libev: r12: start read offset=2097152 len=1048576
The next request consumed the next 1 MiB of the same extent (e4):
copy-libev: r11: start request offset=3145728
copy-libev: e4: offset=3145728 len=9699328 zero=0
copy-libev: r11: extent offset=3145728 len=1048576 zero=0
copy-libev: r11: start read offset=3145728 len=1048576
copy-libev: r10: start request offset=4194304
..
The last part of extent e4 was consumed, and we switched to extent e5:
copy-libev: r2: start request offset=12582912
copy-libev: e4: offset=12582912 len=262144 zero=0
copy-libev: r2: extent offset=12582912 len=262144 zero=0
copy-libev: r2: start read offset=12582912 len=262144
copy-libev: r1: start request offset=12845056
copy-libev: e5: offset=12845056 len=131072 zero=1
copy-libev: r1: extent offset=12845056 len=131072 zero=1
copy-libev: r1: start zero offset=12845056 len=131072
...
Request (r11) consumed the last extent (e13), starting a zero request.
This frees the extents array:
copy-libev: r11: start request offset=133955584
copy-libev: e13: offset=133955584 len=262144 zero=1
copy-libev: r11: extent offset=133955584 len=262144 zero=1
copy-libev: r11: consumed all extents offset=134217728
copy-libev: r11: start zero offset=133955584 len=262144
...
Request (r12) started after the extents array was cleared, so it started a
new block status request:
copy-libev: r12: start extents offset=134217728 count=134217728
...
copy-libev: r12: received 3 extents for base:allocation
copy-libev: r12: extents completed time=0.003027
...
The rest of the flow is the same as before. When all requests are done,
we shut down the event loop and flush:
copy-libev: r4: request completed offset=6438256640 len=1048576 time=0.000132
copy-libev: r1: read completed offset=6442385408 len=65536
copy-libev: r1: start write offset=6442385408 len=65536
copy-libev: r14: request completed offset=6439305216 len=1048576 time=0.000126
copy-libev: r8: request completed offset=6440353792 len=1048576 time=0.000151
copy-libev: r2: request completed offset=6441402368 len=983040 time=0.000143
copy-libev: r1: request completed offset=6442385408 len=65536 time=0.000142
copy-libev: flush
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
examples/copy-libev.c | 232 ++++++++++++++++++++++++++++++++++++++++--
1 file changed, 224 insertions(+), 8 deletions(-)
diff --git a/examples/copy-libev.c b/examples/copy-libev.c
index 84d5c03..3030955 100644
--- a/examples/copy-libev.c
+++ b/examples/copy-libev.c
@@ -41,6 +41,7 @@
*/
#define MAX_REQUESTS 16
#define REQUEST_SIZE (1024 * 1024)
+/* Maximum number of bytes queried by a single block status request. */
+#define EXTENTS_SIZE (128 * 1024 * 1024)
#define MIN(a,b) (a) < (b) ? (a) : (b)
@@ -62,14 +63,18 @@ struct connection {
ev_io watcher;
struct nbd_handle *nbd;
bool can_zero;
+ bool can_extents;
};
struct request {
+ /* Must remain the first member: start_request_cb() casts the ev_timer
+  * pointer it receives back to struct request.
+  */
+ ev_timer watcher; /* For starting on next loop iteration. */
int64_t offset;
size_t length;
+ bool zero; /* Set by next_extent() from LIBNBD_STATE_ZERO; start_request() zeroes instead of reading. */
unsigned char *data;
size_t index;
ev_tstamp started;
+ bool waiting; /* Waiting for extents completion. */
};
static struct ev_loop *loop;
@@ -77,11 +82,29 @@ static ev_prepare prepare;
static struct connection src;
static struct connection dst;
static struct request requests[MAX_REQUESTS];
+
+/* List of extents received from source server. Using the same format returned
+ * by libnbd, array of uint32_t pairs. The first item is the length of the
+ * extent, and the second is the extent flags.
+ *
+ * extents_len is the number of uint32_t items in the array, so the number of
+ * extents is extents_len / 2. extents_pos is the array index of the length
+ * item of the current extent (the extent number is extents_pos / 2).
+ * next_extent() shrinks the current extent's length in place as data is
+ * consumed, and frees the array once the last extent has been consumed.
+ *
+ * extents_in_progress flag is set when we start asynchronous block status
+ * request, and cleared when that request completes.
+ */
+static uint32_t *extents;
+static size_t extents_len;
+static size_t extents_pos;
+static bool extents_in_progress;
+
static int64_t size;
static int64_t offset;
static int64_t written;
static bool debug;
+static inline void start_request_soon (struct request *r);
+static void start_request_cb (struct ev_loop *loop, ev_timer *w, int revents);
static void start_request(struct request *r);
static void start_read(struct request *r);
static void start_write(struct request *r);
@@ -133,23 +156,206 @@ get_events(struct connection *c)
default:
return 0;
}
+}
+
+/* Extent callback for nbd_aio_block_status().
+ *
+ * Stores a private copy of the base:allocation extents received from the
+ * source server in the global extents array. Extents for other meta
+ * contexts are ignored. An empty reply leaves extents NULL, so
+ * extents_completed() will disable extents.
+ *
+ * Always returns 1.
+ */
+static int
+extent_callback (void *user_data, const char *metacontext, uint64_t offset,
+                 uint32_t *entries, size_t nr_entries, int *error)
+{
+  struct request *r = user_data;
+
+  if (strcmp (metacontext, LIBNBD_CONTEXT_BASE_ALLOCATION) != 0) {
+    DEBUG ("Unexpected meta context: %s", metacontext);
+    return 1;
+  }
+
+  /* malloc (0) may legitimately return NULL, which would be mistaken for
+   * an allocation failure below.
+   */
+  if (nr_entries == 0)
+    return 1;
+
+  /* Do not leak a previous array if the callback is invoked again. */
+  free (extents);
+
+  extents = malloc (nr_entries * sizeof *extents);
+  if (extents == NULL)
+    FAIL ("Cannot allocate extents: %s", strerror (errno));
+
+  memcpy (extents, entries, nr_entries * sizeof *extents);
+  extents_len = nr_entries;
+  extents_pos = 0;
+
+  DEBUG ("r%zu: received %zu extents for %s",
+         r->index, nr_entries / 2, metacontext);
+
+  return 1;
+}
+
+/* Completion callback for the block status request started by
+ * start_extents().
+ *
+ * Clears extents_in_progress, disables extents if no extents were
+ * received (including when the request failed), and schedules every
+ * request waiting for extents to start on the next loop iteration.
+ *
+ * Always returns 1.
+ */
+static int
+extents_completed (void *user_data, int *error)
+{
+  struct request *r = (struct request *)user_data;
+  size_t i;
+
+  DEBUG ("r%zu: extents completed time=%.6f",
+         r->index, ev_now (loop) - r->started);
+
+  extents_in_progress = false;
+
+  if (*error)
+    DEBUG ("r%zu: block status failed: %s", r->index, strerror (*error));
+
+  if (extents == NULL) {
+    DEBUG ("r%zu: received no extents, disabling extents", r->index);
+    src.can_extents = false;
+  }
+
+  /* Start requests waiting for extents completion on the next loop
+   * iteration, to avoid deadlock if we need to start a read.
+   */
+  for (i = 0; i < MAX_REQUESTS; i++) {
+    struct request *w = &requests[i];
+
+    if (w->waiting) {
+      w->waiting = false;
+      start_request_soon (w);
+    }
+  }
+
+  return 1;
+}
+
+/* Ensure a block status request for the next EXTENTS_SIZE bytes (or the
+ * rest of the image) is in flight, and mark request r as waiting for it.
+ *
+ * Returns true if r is now waiting; extents_completed() will restart it.
+ * Returns false if block status could not be started. In that case
+ * extents are disabled and the caller should copy without them.
+ */
+static bool
+start_extents (struct request *r)
+{
+  size_t count = MIN (EXTENTS_SIZE, size - offset);
+  int64_t cookie;
+
+  /* Only one block status request at a time; others just wait for it. */
+  if (!extents_in_progress) {
+    DEBUG ("r%zu: start extents offset=%ld count=%zu",
+           r->index, offset, count);
+
+    cookie = nbd_aio_block_status (
+      src.nbd, count, offset,
+      (nbd_extent_callback) { .callback=extent_callback,
+                              .user_data=r },
+      (nbd_completion_callback) { .callback=extents_completed,
+                                  .user_data=r },
+      0);
+    if (cookie == -1) {
+      DEBUG ("Cannot get extents: %s", nbd_get_error ());
+      src.can_extents = false;
+      return false;
+    }
+
+    extents_in_progress = true;
+  }
+
+  r->waiting = true;
+  return true;
+}
+
+/* Fill in r->offset, r->length and r->zero from the extents array, starting
+ * at the global offset, and advance the global offset past that range.
+ *
+ * Consecutive extents with the same zero flag are merged, up to REQUEST_SIZE
+ * bytes (or the rest of the image). If the current extent is larger than the
+ * remaining room, only part of it is consumed and its length is reduced in
+ * place. When the last extent has been fully consumed the array is freed,
+ * so the next request fetches more extents.
+ *
+ * Must be called only when extents != NULL.
+ */
+static void
+next_extent (struct request *r)
+{
+  uint32_t limit = MIN (REQUEST_SIZE, size - offset);
+  uint32_t length = 0;
+  bool is_zero;
+
+  assert (extents);
+
+  is_zero = extents[extents_pos + 1] & LIBNBD_STATE_ZERO;
+
+  while (length < limit) {
+    bool next_is_zero;
+
+    DEBUG ("e%zu: offset=%ld len=%u zero=%d",
+           extents_pos / 2, offset + length,
+           (unsigned) extents[extents_pos], is_zero);
+
+    /* If this extent is too large, steal some data from it to complete
+     * the request. Compare against the remaining room instead of
+     * computing length + extent, which can wrap around in uint32_t for
+     * extents close to 4 GiB.
+     */
+    if (extents[extents_pos] > limit - length) {
+      uint32_t stolen = limit - length;
+
+      extents[extents_pos] -= stolen;
+      length += stolen;
+      break;
+    }
+
+    /* Consume the entire extent and start looking at the next one. */
+    length += extents[extents_pos];
+    extents[extents_pos] = 0;
+
+    if (extents_pos + 2 == extents_len)
+      break;
+
+    extents_pos += 2;
+
+    /* If next extent is different, we are done. Normalize the flag to
+     * bool first: LIBNBD_STATE_ZERO is not 1, so comparing the masked
+     * value with a bool would treat two zero extents as different.
+     */
+    next_is_zero = extents[extents_pos + 1] & LIBNBD_STATE_ZERO;
+    if (next_is_zero != is_zero)
+      break;
+  }
+
+  assert (length > 0 && length <= limit);
+
+  r->offset = offset;
+  r->length = length;
+  r->zero = is_zero;
+
+  DEBUG ("r%zu: extent offset=%ld len=%zu zero=%d",
+         r->index, r->offset, r->length, r->zero);
+
+  offset += length;
+
+  if (extents_pos + 2 == extents_len && extents[extents_pos] == 0) {
+    /* Processed all extents, clear extents. */
+    DEBUG ("r%zu: consumed all extents offset=%ld", r->index, offset);
+    free (extents);
+    extents = NULL;
+    extents_pos = 0;
+    extents_len = 0;
+  }
+}
+
+/* Schedule request r to start on the next event loop iteration, by arming
+ * the zero-timeout timer embedded in the request.
+ */
+static inline void
+start_request_soon (struct request *r)
+{
+  ev_timer *timer = &r->watcher;
+
+  ev_timer_init (timer, start_request_cb, 0, 0);
+  ev_timer_start (loop, timer);
+}
+
+/* ev_timer callback armed by start_request_soon(); starts the request that
+ * owns the timer. The loop and revents arguments are not used (loop here
+ * shadows the global of the same name).
+ *
+ * The cast below is valid only while watcher is the first member of
+ * struct request.
+ */
+static void
+start_request_cb (struct ev_loop *loop, ev_timer *w, int revents)
+{
+ struct request *r = (struct request *)w;
+ start_request (r);
}
/* Start async copy or zero request. */
static void
start_request(struct request *r)
{
- assert (offset < size);
+  /* Cancel the request if we are done. Requests scheduled with
+   * start_request_soon() may run after other requests have already
+   * claimed the rest of the image.
+   */
+  if (offset >= size)
+    return;
r->started = ev_now (loop);
- r->length = MIN (REQUEST_SIZE, size - offset);
- r->offset = offset;
- start_read (r);
-
- offset += r->length;
+  /* If needed, get more extents from server. When start_extents()
+   * returns true the request is now waiting and extents_completed() will
+   * restart it. When it returns false, extents were disabled and we fall
+   * through to copying this range without them.
+   */
+  if (src.can_extents && extents == NULL && start_extents (r))
+    return;
+
+  DEBUG ("r%zu: start request offset=%ld", r->index, offset);
+
+  if (src.can_extents) {
+    /* Handle the next extent: zero ranges are zeroed on the destination,
+     * or written from a zeroed buffer if the destination cannot zero;
+     * data ranges are read from the source.
+     */
+    next_extent (r);
+    if (r->zero) {
+      if (dst.can_zero) {
+        start_zero (r);
+      } else {
+        memset (r->data, 0, r->length);
+        start_write (r);
+      }
+    } else {
+      start_read (r);
+    }
+  } else {
+    /* Extents not available, copy the next REQUEST_SIZE bytes. */
+    r->length = MIN (REQUEST_SIZE, size - offset);
+    r->offset = offset;
+    start_read (r);
+    offset += r->length;
+  }
}
static void
@@ -240,9 +446,11 @@ request_completed (void *user_data, int *error)
ev_break (loop, EVBREAK_ALL);
}
- /* If we have data to read, start a new read. */
+ /* If we have more work, start a new request on the next loop
+ * iteration, to avoid deadlock if we need to start a zero or write.
+ */
if (offset < size)
- start_request(r);
+ start_request_soon(r);
return 1;
}
@@ -304,11 +512,19 @@ main (int argc, char *argv[])
debug = getenv ("COPY_LIBEV_DEBUG") != NULL;
+ /* Configure source to report extents. */
+
+ if (nbd_add_meta_context (src.nbd, LIBNBD_CONTEXT_BASE_ALLOCATION))
+ FAIL ("Cannot add base:allocation: %s", nbd_get_error ());
+
/* Connecting is fast, so use the syncronous API. */
if (nbd_connect_uri (src.nbd, argv[1]))
FAIL ("Cannot connect to source: %s", nbd_get_error ());
+ src.can_extents = nbd_can_meta_context (
+ src.nbd, LIBNBD_CONTEXT_BASE_ALLOCATION) > 0;
+
if (nbd_connect_uri (dst.nbd, argv[2]))
FAIL ("Cannot connect to destination: %s", nbd_get_error ());
--
2.26.2