Extract a finalize_transfer() function that hides the complexity of
finalizing and waiting for a transfer, and separates plugin logic, such as
writing the disk_id file, from oVirt logic.
Since canceling a transfer is not part of finalizing but a policy of the
current plugin, move the cancellation after a finalize failure to close().
When waiting for the disk status to change, handle the ILLEGAL status so
we can fail quickly if oVirt failed to finalize the transfer and paused it,
instead of waiting for the timeout.
---
v2v/rhv-upload-plugin.py | 106 +++++++++++++++++++++++++--------------
1 file changed, 67 insertions(+), 39 deletions(-)
diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py
index b9b9e967..85dec7a6 100644
--- a/v2v/rhv-upload-plugin.py
+++ b/v2v/rhv-upload-plugin.py
@@ -306,10 +306,7 @@ def close(h):
http = h['http']
connection = h['connection']
transfer = h['transfer']
-
- transfer_service = (connection.system_service()
- .image_transfers_service()
- .image_transfer_service(transfer.id))
+ disk_id = h['disk_id']
# This is sometimes necessary because python doesn't set up
# sys.stderr to be line buffered and so debug, errors or
@@ -323,49 +320,22 @@ def close(h):
# the transfer will delete the disk.
if h['failed']:
try:
- transfer_service.cancel()
+ cancel_transfer(connection, transfer)
finally:
connection.close()
return
+ # Try to finalize the transfer. On errors the transfer may be paused by the
+ # system, and we need to cancel the transfer to remove the disk.
try:
- disk_id = h['disk_id']
-
- transfer_service.finalize()
-
- # Wait until the transfer disk job is completed since
- # only then we can be sure the disk is unlocked. As this
- # code is not very clear, what's happening is that we are
- # waiting for the transfer object to cease to exist, which
- # falls through to the exception case and then we can
- # continue.
- disk_service = (
- connection.system_service().disks_service().disk_service(disk_id))
- start = time.time()
- try:
- while True:
- time.sleep(1)
- disk = disk_service.get()
- if disk.status == types.DiskStatus.LOCKED:
- if time.time() > start + timeout:
- raise RuntimeError("timed out waiting for transfer "
- "to finalize")
- continue
- if disk.status == types.DiskStatus.OK:
- debug("finalized after %s seconds" % (time.time() -
start))
- break
- except sdk.NotFoundError:
- raise RuntimeError("transfer failed: disk %s not found" % disk_id)
-
+ finalize_transfer(connection, transfer, disk_id)
+ except:
+ cancel_transfer(connection, transfer)
+ raise
+ else:
# Write the disk ID file. Only do this on successful completion.
with builtins.open(params['diskid_file'], 'w') as fp:
fp.write(disk_id)
-
- except:
- # If oVirt engine fails to finalize the transfer, it will pause the
- # transfer and keep the disk.
- transfer_service.cancel()
- raise
finally:
connection.close()
@@ -541,6 +511,64 @@ def cancel_transfer(connection, transfer):
.image_transfer_service(transfer.id))
transfer_service.cancel()
+def finalize_transfer(connection, transfer, disk_id):
+    """
+    Finalize a transfer, making the transfer disk available.
+
+    If finalizing succeeds, transfer's phase will change to FINISHED_SUCCESS
+    and the transfer's disk status will change to OK. On errors, the transfer's
+    phase will change to FINISHED_FAILURE and the disk status will change to
+    ILLEGAL and it will be removed. In both cases the transfer entity will be
+    removed shortly after.
+
+    If oVirt fails to finalize the transfer, transfer's phase will change to
+    PAUSED_SYSTEM. In this case the disk's status will change to ILLEGAL and it
+    will not be removed.
+
+    For simplicity, we track only the disk's status changes.
+
+    For more info see:
+    - http://ovirt.github.io/ovirt-engine-api-model/4.4/#services/image_transfer
+    - http://ovirt.github.io/ovirt-engine-sdk/master/types.m.html#ovirtsdk4.types.ImageTransfer
+    """
+    debug("finalizing transfer %s" % transfer.id)
+    transfer_service = (connection.system_service()
+                        .image_transfers_service()
+                        .image_transfer_service(transfer.id))
+
+    start = time.time()
+
+    transfer_service.finalize()
+
+    disk_service = (connection.system_service()
+                    .disks_service()
+                    .disk_service(disk_id))
+
+    while True:
+        time.sleep(1)
+        try:
+            disk = disk_service.get()
+        except sdk.NotFoundError:
+            # Disk verification failed and the system removed the disk.
+            raise RuntimeError(
+                "transfer %s failed: disk %s was removed"
+                % (transfer.id, disk_id))
+
+        if disk.status == types.DiskStatus.ILLEGAL:
+            # Disk verification failed or transfer was paused by the system.
+            raise RuntimeError(
+                "transfer %s failed: disk is ILLEGAL" % transfer.id)
+
+        if disk.status == types.DiskStatus.OK:
+            debug("transfer %s finalized in %.3f seconds"
+                  % (transfer.id, time.time() - start))
+            break
+
+        if time.time() > start + timeout:
+            raise RuntimeError(
+                "timed out waiting for transfer %s to finalize"
+                % transfer.id)
+
# oVirt imageio operations
def parse_transfer_url(transfer):
--
2.21.0