Previously the inner loop would issue nbd.pread() requests
synchronously, meaning that we would issue a request for each data
block from the nbdkit server (which would in turn synchronously
request the data from VMware) and wait until nbdkit replies before
continuing.
This converts the inner loop so it issues as many pread requests
asychronously to nbdkit as the server can handle (any extra are queued
in nbd_handle). The server will answer these in parallel and possibly
out of order.
This results in somewhat better throughput (for me: 13 minutes down to
5 minutes for an "initial" run). Although unfortunately we are still
limited by VDDK's braindead threading model.
---
wrapper/disk_sync.py | 55 +++++++++++++++++++++++++++++++-------------
1 file changed, 39 insertions(+), 16 deletions(-)
diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py
index e655ead..e854009 100644
--- a/wrapper/disk_sync.py
+++ b/wrapper/disk_sync.py
@@ -409,6 +409,26 @@ def get_block_status(nbd_handle, extent):
return blocks
+# This is called back when nbd_aio_pread completes.
+def read_completed(fd, buf, offset, err):
+ logging.debug('Writing %d B to offset %d B' % (buf.size(), offset))
+ os.pwrite(fd, buf.to_bytearray(), offset)
+ # By returning 1 here we auto-retire the aio_pread command.
+ return 1
+
+
+# Process any AIO requests without blocking.
+def process_aio_requests(nbd_handle):
+ while nbd_handle.poll(0) == 1:
+ pass
+
+
+# Block until all AIO commands on the handle have finished.
+def wait_for_aio_commands_to_finish(nbd_handle):
+ while nbd_handle.aio_in_flight() > 0:
+ nbd_handle.poll(-1)
+
+
def sync_data():
state = State().instance
for key, disk in state['disks'].items():
@@ -491,25 +511,28 @@ def sync_data():
(block['length'], block['offset']))
# Optimize for memory usage, maybe?
os.pwrite(fd, [0] * block['length'], block['offset'])
- copied += block['length']
- disk['progress']['copied'] = copied
- state.write()
else:
- wrote = 0
- while wrote < block['length']:
- length = min(block['length'] - wrote, MAX_PREAD_LEN)
- offset = block['offset'] + wrote
+ count = 0
+ while count < block['length']:
+ length = min(block['length'] - count, MAX_PREAD_LEN)
+ offset = block['offset'] + count
+
logging.debug('Reading %d B from offset %d B' %
(length, offset))
- # Ideally use mmap() without any temporary buffer
- data = nbd_handle.pread(length, offset)
- logging.debug('Writing %d B to offset %d B' %
- (length, offset))
- os.pwrite(fd, data, offset)
- copied += length
- wrote += length
- disk['progress']['copied'] = copied
- state.write()
+ buf = nbd.Buffer(length)
+ nbd_handle.aio_pread(
+ buf, offset,
+ lambda err, fd=fd, buf=buf, offset=offset:
+ read_completed(fd, buf, offset, err))
+ count += length
+
+ process_aio_requests(nbd_handle)
+
+ copied += block['length']
+ disk['progress']['copied'] = copied
+ state.write()
+
+ wait_for_aio_commands_to_finish(nbd_handle)
if copied == 0:
logging.debug('Nothing to copy for disk: %s(key=%s)' %
--
2.22.0