On Thu, Aug 22, 2019 at 03:39:35PM +0100, Richard W.M. Jones wrote:
Previously the inner loop would issue nbd.pread() requests
synchronously, meaning that we would issue a request for each data
block from the nbdkit server (which would in turn synchronously
request the data from VMware) and wait until nbdkit replies before
continuing.
This converts the inner loop so it issues as many pread requests
asynchronously to nbdkit as the server can handle (any extra are queued
in nbd_handle). The server will answer these in parallel and possibly
out of order.
This results in somewhat better throughput (for me: 13 minutes down to
5 minutes for an "initial" run), although unfortunately we are still
limited by VDDK's braindead threading model.
---
wrapper/disk_sync.py | 55 +++++++++++++++++++++++++++++++-------------
1 file changed, 39 insertions(+), 16 deletions(-)
diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py
index e655ead..e854009 100644
--- a/wrapper/disk_sync.py
+++ b/wrapper/disk_sync.py
@@ -409,6 +409,26 @@ def get_block_status(nbd_handle, extent):
return blocks
+# This is called back when nbd_aio_pread completes.
+def read_completed(fd, buf, offset, err):
+    logging.debug('Writing %d B to offset %d B' % (buf.size(), offset))
+    os.pwrite(fd, buf.to_bytearray(), offset)
+    # By returning 1 here we auto-retire the aio_pread command.
+    return 1
+
+
+# Process any AIO requests without blocking.
+def process_aio_requests(nbd_handle):
+    while nbd_handle.poll(0) == 1:
+        pass
+
+
+# Block until all AIO commands on the handle have finished.
+def wait_for_aio_commands_to_finish(nbd_handle):
+    while nbd_handle.aio_in_flight() > 0:
+        nbd_handle.poll(-1)
+
+
def sync_data():
state = State().instance
for key, disk in state['disks'].items():
@@ -491,25 +511,28 @@ def sync_data():
(block['length'], block['offset']))
# Optimize for memory usage, maybe?
os.pwrite(fd, [0] * block['length'], block['offset'])
- copied += block['length']
- disk['progress']['copied'] = copied
- state.write()
else:
- wrote = 0
- while wrote < block['length']:
- length = min(block['length'] - wrote, MAX_PREAD_LEN)
- offset = block['offset'] + wrote
+ count = 0
+ while count < block['length']:
+ length = min(block['length'] - count, MAX_PREAD_LEN)
+ offset = block['offset'] + count
+
logging.debug('Reading %d B from offset %d B' %
(length, offset))
- # Ideally use mmap() without any temporary buffer
- data = nbd_handle.pread(length, offset)
- logging.debug('Writing %d B to offset %d B' %
- (length, offset))
- os.pwrite(fd, data, offset)
- copied += length
- wrote += length
- disk['progress']['copied'] = copied
- state.write()
+ buf = nbd.Buffer(length)
+ nbd_handle.aio_pread(
+ buf, offset,
+ lambda err, fd=fd, buf=buf, offset=offset:
+ read_completed(fd, buf, offset, err))
If the order of parameters is changed, there is no need for the anonymous
function here, but that's just a small thing I noticed.
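To illustrate what I mean, here is a minimal sketch using functools.partial. It assumes the completion callback is invoked with the error code as its only positional argument, as the lambda above suggests. The read_completed below is a stand-in that only records its arguments, and the fd, buf and offset values are made up for the example. Since partial binds the leading positional arguments, keeping err as the last parameter is enough:

```python
import functools

written = []  # records what the stand-in callback received


def read_completed(fd, buf, offset, err):
    # Stand-in for the callback in the patch; the real one calls os.pwrite().
    written.append((fd, len(buf), offset, err))
    return 1


# Hypothetical values, only for this example.
fd, buf, offset = 3, bytearray(4096), 8192

# Same effect as:
#   lambda err, fd=fd, buf=buf, offset=offset:
#       read_completed(fd, buf, offset, err)
callback = functools.partial(read_completed, fd, buf, offset)

# The library would call the callback with the error code only.
result = callback(0)
```

Like the default-argument trick in the lambda, partial captures the current values of fd, buf and offset, so later loop iterations do not affect callbacks that are already queued.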
+ count += length
+
+ process_aio_requests(nbd_handle)
To allow fewer requests in flight, would it be enough to do something
like this here (similar to wait_for_aio_commands_to_finish)?
    while nbd_handle.aio_in_flight() > NUM_IN_FLIGHT:
        nbd_handle.poll(-1)
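Here is a rough, self-contained sketch of how I imagine that throttling behaving. FakeHandle is a hypothetical stand-in, not the libnbd API: it only mimics aio_pread, aio_in_flight and poll, and it completes exactly one command per poll call, which the real library does not promise. NUM_IN_FLIGHT and the block sizes are made-up values.

```python
NUM_IN_FLIGHT = 4  # hypothetical limit, not taken from the patch


class FakeHandle:
    """Hypothetical stand-in for an NBD handle, for illustration only."""

    def __init__(self):
        self.pending = []
        self.max_seen = 0

    def aio_pread(self, length, offset, callback):
        # Queue the request instead of talking to a server.
        self.pending.append((length, offset, callback))
        self.max_seen = max(self.max_seen, len(self.pending))

    def aio_in_flight(self):
        return len(self.pending)

    def poll(self, timeout):
        # Complete the oldest queued request, if there is one.
        if not self.pending:
            return 0
        _length, _offset, callback = self.pending.pop(0)
        callback(0)
        return 1


def copy_block(handle, block_offset, block_length, max_pread_len, completed):
    count = 0
    while count < block_length:
        length = min(block_length - count, max_pread_len)
        offset = block_offset + count
        handle.aio_pread(
            length, offset,
            lambda err, offset=offset, length=length:
                completed.append((offset, length)))
        count += length
        # Throttle: block until we are back at or under the limit.
        while handle.aio_in_flight() > NUM_IN_FLIGHT:
            handle.poll(-1)


def wait_for_aio_commands_to_finish(handle):
    while handle.aio_in_flight() > 0:
        handle.poll(-1)


handle = FakeHandle()
completed = []
copy_block(handle, 0, 10 * 1024, 1024, completed)
wait_for_aio_commands_to_finish(handle)
```

Because the check runs after each request is issued and uses a strict comparison, the number in flight can briefly reach NUM_IN_FLIGHT + 1 before the loop brings it back down.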
Also, I presume all of the locking is left to libnbd (as you can see, I
don't concern myself with locking anywhere in this file). If that were
to be improved, is there some Python part that would require it, for
example when cleaning up the code?
Thanks for the fix. I do not know why I was expecting something far more
complicated.
+
+ copied += block['length']
+ disk['progress']['copied'] = copied
+ state.write()
+
+ wait_for_aio_commands_to_finish(nbd_handle)
if copied == 0:
logging.debug('Nothing to copy for disk: %s(key=%s)' %
--
2.22.0