When locking the http pool, we wait until all connections are idle, and
take them from the pool. But since we used pool.qsize(), which is the
number of items currently in the queue, the loop bound shrank as we took
connections out and never counted connections that were in use, so we
did not wait for all connections.
This leads to the following issues:
- We send flush request only for some connections, which does not ensure
that all uploaded data is flushed to storage.
- We close only some of the connections in cleanup(). This should not
matter since the connections are closed when the plugin process
terminates.
An example import that sent only one FLUSH request instead of 4:
https://bugzilla.redhat.com/2032324#c8
Fixed by creating a bounded queue and using pool.maxsize to get all the
connections from the pool.
---
output/rhv-upload-plugin.py | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
index 1cb837dd..bad0e8a3 100644
--- a/output/rhv-upload-plugin.py
+++ b/output/rhv-upload-plugin.py
@@ -307,30 +307,30 @@ class UnixHTTPConnection(HTTPConnection):
def connect(self):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if self.timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
self.sock.settimeout(timeout)
self.sock.connect(self.path)
# Connection pool.
def create_http_pool(url, options):
- pool = queue.Queue()
-
count = min(options["max_readers"],
options["max_writers"],
MAX_CONNECTIONS)
nbdkit.debug("creating http pool connections=%d" % count)
unix_socket = options["unix_socket"] if is_ovirt_host else None
+ pool = queue.Queue(count)
+
for i in range(count):
http = create_http(url, unix_socket=unix_socket)
pool.put(http)
return pool
@contextmanager
def http_context(pool):
"""
@@ -347,22 +347,22 @@ def http_context(pool):
def iter_http_pool(pool):
"""
Wait until all inflight requests are done, and iterate on imageio
connections.
The pool is empty during iteration. New requests issued during iteration
will block until iteration is done.
"""
locked = []
- # Lock the pool by taking the connection out.
- while len(locked) < pool.qsize():
+ # Lock the pool by taking all connections out.
+ while len(locked) < pool.maxsize:
locked.append(pool.get())
try:
for http in locked:
yield http
finally:
# Unlock the pool by puting the connection back.
for http in locked:
pool.put(http)
@@ -371,21 +371,21 @@ def close_http_pool(pool):
"""
Wait until all inflight requests are done, close all connections and remove
them from the pool.
No request can be served by the pool after this call.
"""
nbdkit.debug("closing http pool")
locked = []
- while len(locked) < pool.qsize():
+ while len(locked) < pool.maxsize:
locked.append(pool.get())
for http in locked:
http.close()
def create_http(url, unix_socket=None):
"""
Create http connection for transfer url.