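Track the last time each pooled connection was used. Connections in the
pool are now wrapped in a PoolItem that records a time.monotonic()
timestamp (last_used), updated whenever a connection is returned to the
pool and after a flush. This timestamp will be used to detect idle
connections.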
---
output/rhv-upload-plugin.py | 30 ++++++++++++++++++++----------
1 file changed, 20 insertions(+), 10 deletions(-)
diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
index f7e5950f..8d088c4e 100644
--- a/output/rhv-upload-plugin.py
+++ b/output/rhv-upload-plugin.py
@@ -13,20 +13,21 @@
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import json
import queue
import socket
import ssl
+import time
from contextlib import contextmanager
from http.client import HTTPSConnection, HTTPConnection
from urllib.parse import urlparse
import nbdkit
# Using version 2 supporting the buffer protocol for better performance.
API_VERSION = 2
@@ -280,26 +281,27 @@ def emulate_zero(h, count, offset, flags):
def flush(h, flags):
# Wait until all inflight requests are completed, and send a flush
# request for all imageio connections.
locked = []
# Lock the pool by taking all connections out.
while len(locked) < pool.maxsize:
locked.append(pool.get())
try:
- for http in locked:
- send_flush(http)
+ for item in locked:
+ send_flush(item.http)
+ item.last_used = time.monotonic()
finally:
# Unlock the pool by puting the connection back.
- for http in locked:
- pool.put(http)
+ for item in locked:
+ pool.put(item)
def send_flush(http):
# Construct the JSON request for flushing.
buf = json.dumps({'op': "flush"}).encode()
headers = {"Content-Type": "application/json",
"Content-Length": str(len(buf))}
http.request("PATCH", url.path, body=buf, headers=headers)
@@ -320,68 +322,76 @@ class UnixHTTPConnection(HTTPConnection):
self.path = path
HTTPConnection.__init__(self, "localhost", timeout=timeout)
def connect(self):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if self.timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
self.sock.settimeout(timeout)
self.sock.connect(self.path)
+class PoolItem:
+
+ def __init__(self, http):
+ self.http = http
+ self.last_used = None
+
+
# Connection pool.
def create_http_pool(url, options):
count = min(options["max_readers"],
options["max_writers"],
MAX_CONNECTIONS)
nbdkit.debug("creating http pool connections=%d" % count)
unix_socket = options["unix_socket"] if is_ovirt_host else None
pool = queue.Queue(count)
for i in range(count):
http = create_http(url, unix_socket=unix_socket)
- pool.put(http)
+ pool.put(PoolItem(http))
return pool
@contextmanager
def http_context(pool):
"""
Context manager yielding an imageio http connection from the pool. Blocks
until a connection is available.
"""
- http = pool.get()
+ item = pool.get()
try:
- yield http
+ yield item.http
finally:
- pool.put(http)
+ item.last_used = time.monotonic()
+ pool.put(item)
def close_http_pool(pool):
"""
Wait until all inflight requests are done, close all connections and remove
them from the pool.
No request can be served by the pool after this call.
"""
nbdkit.debug("closing http pool")
locked = []
while len(locked) < pool.maxsize:
locked.append(pool.get())
- for http in locked:
- http.close()
+ for item in locked:
+ item.http.close()
def create_http(url, unix_socket=None):
"""
Create http connection for transfer url.
Returns HTTPConnection.
"""
if unix_socket:
nbdkit.debug("creating unix http connection socket=%r" % unix_socket)
--
2.33.1