We can now use ImageioClient to communicate with the ovirt-imageio server
on the oVirt host.
Using the client greatly simplifies the plugin, and enables new features
like transparent proxy support. The client will use transfer_url if
possible, or fall back to proxy_url.
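For illustration, a rough sketch of how the plugin now creates the client
(the exact call is in the diff below); proxy_url is used only when the
direct transfer_url is not usable:

    from ovirt_imageio.client import ImageioClient

    # Sketch based on the plugin code below; transfer and params come
    # from the plugin's open() context. Prefer the direct transfer_url;
    # the client falls back to proxy_url.
    client = ImageioClient(
        transfer.transfer_url,
        cafile=params['rhv_cafile'],
        secure=not params['insecure'],
        proxy_url=transfer.proxy_url)
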
Since the client implements the buffer protocol, move to version 2 of
the API for more efficient pread().
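For example, with API version 2 nbdkit passes a preallocated, writable
buffer into pread(), so the client can fill it in place instead of
returning a new bytes object for every read (this is what the new pread()
in the diff below does):

    def pread(h, buf, offset, flags):
        # buf is the nbdkit-provided buffer; ImageioClient.read() fills
        # it in place, avoiding an extra allocation and copy per read.
        h['client'].read(offset, buf)
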
Another advantage is that the client is maintained by oVirt, so fixes are
delivered quickly in oVirt, without depending on the RHEL release schedule.
Not ready yet; we have several issues:
- The client does not support "http", so the tests will fail now. This
is good since we should test with a real imageio server. I will work on
better tests later.
- Need to require ovirt-imageio-client, which provides the client library.
- params['rhv_direct'] is ignored; we always try direct transfer now.
- ImageioClient is not released yet. The patches are here:
https://gerrit.ovirt.org/q/topic:client+is:open+project:ovirt-imageio
- Not tested yet.
---
v2v/rhv-upload-plugin.py | 359 +++++++--------------------------------
1 file changed, 61 insertions(+), 298 deletions(-)
diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py
index 8c11012b..172da602 100644
--- a/v2v/rhv-upload-plugin.py
+++ b/v2v/rhv-upload-plugin.py
@@ -26,12 +26,17 @@ import ssl
import sys
import time
+from ovirt_imageio.client import ImageioClient
+
from http.client import HTTPSConnection, HTTPConnection
from urllib.parse import urlparse
import ovirtsdk4 as sdk
import ovirtsdk4.types as types
+# Version 2 supports the buffer protocol, improving performance.
+API_VERSION = 2
+
# Timeout to wait for oVirt disks to change status, or the transfer
# object to finish initializing [seconds].
timeout = 5 * 60
@@ -114,67 +119,61 @@ def open(readonly):
transfer = create_transfer(connection, disk, host)
try:
- destination_url = parse_transfer_url(transfer)
- http = create_http(destination_url)
- options = get_options(http, destination_url)
- http = optimize_http(http, host, options)
+ # ImageioClient uses transfer_url if possible, and proxy_url if not.
+ # TODO: How to handle params['rhv_direct']?
+ client = ImageioClient(
+ transfer.transfer_url,
+ cafile=params['rhv_cafile'],
+ secure=not params['insecure'],
+ proxy_url=transfer.proxy_url)
except:
cancel_transfer(connection, transfer)
raise
- debug("imageio features: flush=%(can_flush)r trim=%(can_trim)r "
- "zero=%(can_zero)r unix_socket=%(unix_socket)r"
- % options)
-
# Save everything we need to make requests in the handle.
return {
- 'can_flush': options['can_flush'],
- 'can_trim': options['can_trim'],
- 'can_zero': options['can_zero'],
- 'needs_auth': options['needs_auth'],
+ 'client': client,
'connection': connection,
'disk_id': disk.id,
'transfer': transfer,
'failed': False,
- 'highestwrite': 0,
- 'http': http,
- 'path': destination_url.path,
}
-@failing
def can_trim(h):
- return h['can_trim']
+ # Imageio does not support trim. Image sparseness is controlled on the server
+ # side. If transfer ticket["sparse"] is True, zeroing deallocates space.
+ # Otherwise zeroing allocates space.
+ return False
-@failing
def can_flush(h):
- return h['can_flush']
+ # Imageio client can always flush.
+ return True
+
+
+def can_zero(h):
+ # Imageio client can always zero. If the server does not support zero, the client
+ # emulates it by streaming zeroes to the server.
+ return True
@failing
def get_size(h):
- return params['disk_size']
+ client = h['client']
+ try:
+ return client.size()
+ except Exception as e:
+ request_failed("cannot get size", e)
# Any unexpected HTTP response status from the server will end up calling this
# function which logs the full error, and raises a RuntimeError exception.
-def request_failed(r, msg):
- status = r.status
- reason = r.reason
- try:
- body = r.read()
- except EnvironmentError as e:
- body = "(Unable to read response body: %s)" % e
-
- # Log the full error if we're verbose.
+def request_failed(msg, reason):
+ error = "%s: %s" % (msg, reason)
debug("unexpected response from imageio server:")
- debug(msg)
- debug("%d: %s" % (status, reason))
- debug(body)
-
- # Only a short error is included in the exception.
- raise RuntimeError("%s: %d %s: %r" % (msg, status, reason, body[:200]))
+ debug(error)
+ raise RuntimeError(error)
# For documentation see:
@@ -184,168 +183,46 @@ def request_failed(r, msg):
@failing
-def pread(h, count, offset):
- http = h['http']
- transfer = h['transfer']
-
- headers = {"Range": "bytes=%d-%d" % (offset, offset + count -
1)}
- if h['needs_auth']:
- headers["Authorization"] = transfer.signed_ticket
-
- http.request("GET", h['path'], headers=headers)
-
- r = http.getresponse()
- # 206 = HTTP Partial Content.
- if r.status != 206:
- request_failed(r,
- "could not read sector offset %d size %d" %
- (offset, count))
-
- return r.read()
-
-
-@failing
-def pwrite(h, buf, offset):
- http = h['http']
- transfer = h['transfer']
-
- count = len(buf)
- h['highestwrite'] = max(h['highestwrite'], offset + count)
-
- http.putrequest("PUT", h['path'] + "?flush=n")
- if h['needs_auth']:
- http.putheader("Authorization", transfer.signed_ticket)
- # The oVirt server only uses the first part of the range, and the
- # content-length.
- http.putheader("Content-Range", "bytes %d-%d/*" % (offset, offset
+ count - 1))
- http.putheader("Content-Length", str(count))
- http.endheaders()
-
+def pread(h, buf, offset, flags):
+ client = h['client']
try:
- http.send(buf)
- except BrokenPipeError:
- pass
-
- r = http.getresponse()
- if r.status != 200:
- request_failed(r,
- "could not write sector offset %d size %d" %
- (offset, count))
-
- r.read()
+ client.read(offset, buf)
+ except Exception as e:
+ request_failed(
+ "could not read offset %d size %d" % (offset, len(buf)), e)
@failing
-def zero(h, count, offset, may_trim):
- http = h['http']
-
- # Unlike the trim and flush calls, there is no 'can_zero' method
- # so nbdkit could call this even if the server doesn't support
- # zeroing. If this is the case we must emulate.
- if not h['can_zero']:
- emulate_zero(h, count, offset)
- return
-
- # Construct the JSON request for zeroing.
- buf = json.dumps({'op': "zero",
- 'offset': offset,
- 'size': count,
- 'flush': False}).encode()
-
- headers = {"Content-Type": "application/json",
- "Content-Length": str(len(buf))}
-
- http.request("PATCH", h['path'], body=buf, headers=headers)
-
- r = http.getresponse()
- if r.status != 200:
- request_failed(r,
- "could not zero sector offset %d size %d" %
- (offset, count))
-
- r.read()
-
-
-def emulate_zero(h, count, offset):
- http = h['http']
- transfer = h['transfer']
-
- # qemu-img convert starts by trying to zero/trim the whole device.
- # Since we've just created a new disk it's safe to ignore these
- # requests as long as they are smaller than the highest write seen.
- # After that we must emulate them with writes.
- if offset + count < h['highestwrite']:
- http.putrequest("PUT", h['path'])
- if h['needs_auth']:
- http.putheader("Authorization", transfer.signed_ticket)
- http.putheader("Content-Range",
- "bytes %d-%d/*" % (offset, offset + count - 1))
- http.putheader("Content-Length", str(count))
- http.endheaders()
-
- try:
- buf = bytearray(128 * 1024)
- while count > len(buf):
- http.send(buf)
- count -= len(buf)
- http.send(memoryview(buf)[:count])
- except BrokenPipeError:
- pass
-
- r = http.getresponse()
- if r.status != 200:
- request_failed(r,
- "could not write zeroes offset %d size %d" %
- (offset, count))
-
- r.read()
+def pwrite(h, buf, offset, flags):
+ client = h['client']
+ try:
+ client.write(offset, buf)
+ except Exception as e:
+ request_failed(
+ "could not write offset %d size %d" % (offset, len(buf)), e)
@failing
-def trim(h, count, offset):
- http = h['http']
-
- # Construct the JSON request for trimming.
- buf = json.dumps({'op': "trim",
- 'offset': offset,
- 'size': count,
- 'flush': False}).encode()
-
- headers = {"Content-Type": "application/json",
- "Content-Length": str(len(buf))}
-
- http.request("PATCH", h['path'], body=buf, headers=headers)
-
- r = http.getresponse()
- if r.status != 200:
- request_failed(r,
- "could not trim sector offset %d size %d" %
- (offset, count))
-
- r.read()
+def zero(h, count, offset, flags):
+ client = h['client']
+ try:
+ client.zero(offset, count)
+ except Exception as e:
+ request_failed(
+ "could not zero offset %d size %d" % (offset, count), e)
@failing
def flush(h):
- http = h['http']
-
- # Construct the JSON request for flushing.
- buf = json.dumps({'op': "flush"}).encode()
-
- headers = {"Content-Type": "application/json",
- "Content-Length": str(len(buf))}
-
- http.request("PATCH", h['path'], body=buf, headers=headers)
-
- r = http.getresponse()
- if r.status != 200:
- request_failed(r, "could not flush")
-
- r.read()
+ client = h['client']
+ try:
+ client.flush()
+ except Exception as e:
+ request_failed("could not flush", e)
def close(h):
- http = h['http']
+ client = h['client']
connection = h['connection']
transfer = h['transfer']
disk_id = h['disk_id']
@@ -356,7 +233,7 @@ def close(h):
# plugin exits.
sys.stderr.flush()
- http.close()
+ client.close()
# If the connection failed earlier ensure we cancel the transfer. Canceling
# the transfer will delete the disk.
@@ -382,24 +259,6 @@ def close(h):
connection.close()
-# Modify http.client.HTTPConnection to work over a Unix domain socket.
-# Derived from uhttplib written by Erik van Zijst under an MIT license.
-# (https://pypi.org/project/uhttplib/)
-# Ported to Python 3 by Irit Goihman.
-
-
-class UnixHTTPConnection(HTTPConnection):
- def __init__(self, path, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
- self.path = path
- HTTPConnection.__init__(self, "localhost", timeout=timeout)
-
- def connect(self):
- self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- if self.timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
- self.sock.settimeout(timeout)
- self.sock.connect(self.path)
-
-
# oVirt SDK operations
@@ -659,99 +518,3 @@ def transfer_supports_format():
"""
sig = inspect.signature(types.ImageTransfer)
return "format" in sig.parameters
-
-
-# oVirt imageio operations
-
-
-def parse_transfer_url(transfer):
- """
- Returns a parsed transfer url, preferring direct transfer if possible.
- """
- if params['rhv_direct']:
- if transfer.transfer_url is None:
- raise RuntimeError("direct upload to host not supported, "
- "requires ovirt-engine >= 4.2 and only works
"
- "when virt-v2v is run within the oVirt/RHV "
- "environment, eg. on an oVirt node.")
- return urlparse(transfer.transfer_url)
- else:
- return urlparse(transfer.proxy_url)
-
-
-def create_http(url):
- """
- Create http connection for transfer url.
-
- Returns HTTPConnection.
- """
- if url.scheme == "https":
- context = \
- ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
- cafile=params['rhv_cafile'])
- if params['insecure']:
- context.check_hostname = False
- context.verify_mode = ssl.CERT_NONE
-
- return HTTPSConnection(url.hostname, url.port, context=context)
- elif url.scheme == "http":
- return HTTPConnection(url.hostname, url.port)
- else:
- raise RuntimeError("unknown URL scheme (%s)" % url.scheme)
-
-
-def get_options(http, url):
- """
- Send OPTIONS request to imageio server and return options dict.
- """
- http.request("OPTIONS", url.path)
- r = http.getresponse()
- data = r.read()
-
- if r.status == 200:
- j = json.loads(data)
- features = j["features"]
- return {
- # New imageio never used authentication.
- "needs_auth": False,
- "can_flush": "flush" in features,
- "can_trim": "trim" in features,
- "can_zero": "zero" in features,
- "unix_socket": j.get('unix_socket'),
- }
-
- elif r.status == 405 or r.status == 204:
- # Old imageio servers returned either 405 Method Not Allowed or
- # 204 No Content (with an empty body).
- return {
- # Authentication was required only when using old imageio proxy.
- # Can be removed when dropping support for oVirt < 4.2.
- "needs_auth": not params['rhv_direct'],
- "can_flush": False,
- "can_trim": False,
- "can_zero": False,
- "unix_socket": None,
- }
- else:
- raise RuntimeError("could not use OPTIONS request: %d: %s" %
- (r.status, r.reason))
-
-
-def optimize_http(http, host, options):
- """
- Return an optimized http connection using unix socket if we are connected
- to imageio server on the local host and it features a unix socket.
- """
- unix_socket = options['unix_socket']
-
- if host is not None and unix_socket is not None:
- try:
- http = UnixHTTPConnection(unix_socket)
- except Exception as e:
- # Very unlikely failure, but we can recover by using the https
- # connection.
- debug("cannot create unix socket connection, using https: %s" % e)
- else:
- debug("optimizing connection using unix socket %r" % unix_socket)
-
- return http
--
2.25.4