On Tue, Jan 4, 2022 at 6:02 PM Richard W.M. Jones <rjones(a)redhat.com> wrote:
On Sat, Dec 18, 2021 at 10:36:33PM +0200, Nir Soffer wrote:
> When importing from vddk, nbdcopy may be blocked for a few minutes(!)
> trying to get extents. While nbdcopy is blocked, imageio server closes
> the idle connections. When we finally get a request from nbdcopy, we
> fail to detect that the connection was closed.
>
> Detecting a closed connection is hard and racy. In the good case, we get
> a BrokenPipe error. In the bad case, imageio closed the socket right
> after we sent a request, and we get an invalid status line. When using
> imageio proxy, we may get http error (e.g. 500) if the proxy connection
> to imageio server on the host was closed.
>
> Even worse, when we find that the connection was closed, it is not safe
> to reopen the connection, since qemu-nbd does not ensure yet that data
> written to the previous connection will be flushed when we flush the new
> connection.
>
> Fix the issue by keeping the connections alive. A pool keeper thread
> sends a flush request on idle connection every ~30 seconds. This also
> improves data integrity and efficiency, using idle time to flush written
> data.
>
> Fixes: https://bugzilla.redhat.com/2032324
Ideally imageio would not just time out after such a short time when a
client has connections open. (Do we actually hold the TCP-level
connection open during this time?)
Is there a no-op ping-like request that we can send?
We don't have a no-op request, but OPTIONS can be used
for that. It has a tiny json response that can be dropped when
you use it as a "ping".
Using OPTIONS would also simplify the change, since we don't have to
report errors from OPTIONS; they are not critical.
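To illustrate the idea, here is a minimal sketch of using OPTIONS as an
application-level ping. The handler, the /images/ticket-id path, and the
response body below are stand-ins for the real imageio server, not its
actual API:

```python
# Hypothetical sketch: an OPTIONS request as a keepalive "ping".
# The server here is a stand-in for imageio; only the pattern of
# sending OPTIONS and dropping the tiny JSON response is the point.
import json
import threading
from http.client import HTTPConnection
from http.server import BaseHTTPRequestHandler, HTTPServer


class Handler(BaseHTTPRequestHandler):
    protocol_version = "HTTP/1.1"  # keep the connection alive

    def do_OPTIONS(self):
        # Tiny JSON response, similar in spirit to imageio's OPTIONS.
        body = json.dumps({"features": ["flush", "zero"]}).encode()
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo quiet


def ping(http, path):
    # Send OPTIONS and drop the response body; we only care that the
    # connection is still usable.
    http.request("OPTIONS", path)
    r = http.getresponse()
    r.read()
    return r.status == 200


server = HTTPServer(("localhost", 0), Handler)
threading.Thread(target=server.serve_forever, daemon=True).start()

http = HTTPConnection("localhost", server.server_port)
print(ping(http, "/images/ticket-id"))  # True while the server is up
```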
If the TCP-level connection is open, can we enable TCP keepalives?
We can, but I think the timeouts are too long, and it is not the right way
to keep a connection open. You want to do this at the application level;
that way you verify the entire stack on each ping.
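For reference, enabling TCP keepalives would look roughly like the sketch
below (not part of the patch). The Linux defaults are indeed long (about
2 hours of idleness before the first probe), so the Linux-specific socket
options shorten the timers; the values are illustrative only:

```python
# Sketch of enabling TCP-level keepalives on a socket, with shorter
# timers than the kernel defaults. TCP_KEEPIDLE/KEEPINTVL/KEEPCNT are
# Linux-specific; values here are illustrative, not recommendations.
import socket


def enable_keepalive(sock, idle=30, interval=10, count=3):
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    if hasattr(socket, "TCP_KEEPIDLE"):  # Linux-only fine tuning
        # Seconds of idleness before the first keepalive probe.
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
        # Seconds between subsequent probes.
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
        # Unanswered probes before the connection is dropped.
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
enable_keepalive(s)
print(s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE))
s.close()
```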
Rich.
> output/rhv-upload-plugin.py | 71 +++++++++++++++++++++++++++++++++++++
> 1 file changed, 71 insertions(+)
>
> diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
> index 8d088c4e..172da358 100644
> --- a/output/rhv-upload-plugin.py
> +++ b/output/rhv-upload-plugin.py
> @@ -13,50 +13,60 @@
> # GNU General Public License for more details.
> #
> # You should have received a copy of the GNU General Public License along
> # with this program; if not, write to the Free Software Foundation, Inc.,
> # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
>
> import json
> import queue
> import socket
> import ssl
> +import threading
> import time
>
> from contextlib import contextmanager
> from http.client import HTTPSConnection, HTTPConnection
> from urllib.parse import urlparse
>
> import nbdkit
>
> # Using version 2 supporting the buffer protocol for better performance.
> API_VERSION = 2
>
> # Maximum number of connection to imageio server. Based on testing with imageio
> # client, this give best performance.
> MAX_CONNECTIONS = 4
>
> +# Maximum idle time allowed for imageio connections.
> +IDLE_TIMEOUT = 30
> +
> # Required parameters.
> size = None
> url = None
>
> # Optional parameters.
> cafile = None
> insecure = False
> is_ovirt_host = False
>
> # List of options read from imageio server.
> options = None
>
> # Pool of HTTP connections.
> pool = None
>
> +# Set when plugin is cleaning up.
> +done = threading.Event()
> +
> +# Set when periodic flush request fails.
> +pool_error = None
> +
>
> # Parse parameters.
> def config(key, value):
> global cafile, url, is_ovirt_host, insecure, size
>
> if key == "cafile":
> cafile = value
> elif key == "insecure":
> insecure = value.lower() in ['true', '1']
> elif key == "is_ovirt_host":
> @@ -84,25 +94,31 @@ def after_fork():
> options = get_options(http, url)
> http.close()
>
> nbdkit.debug("imageio features: flush=%(can_flush)r "
> "zero=%(can_zero)r unix_socket=%(unix_socket)r "
> "max_readers=%(max_readers)r max_writers=%(max_writers)r"
> % options)
>
> pool = create_http_pool(url, options)
>
> + t = threading.Thread(target=pool_keeper, name="poolkeeper")
> + t.daemon = True
> + t.start()
> +
>
> # This function is not actually defined before nbdkit 1.28, but it
> # doesn't particularly matter if we don't close the pool because
> # clients should call flush().
> def cleanup():
> + nbdkit.debug("cleaning up")
> + done.set()
> close_http_pool(pool)
>
>
> def thread_model():
> """
> Using parallel model to speed up transfer with multiple connections to
> imageio server.
> """
> return nbdkit.THREAD_MODEL_PARALLEL
>
> @@ -272,20 +288,23 @@ def emulate_zero(h, count, offset, flags):
> r = http.getresponse()
> if r.status != 200:
> request_failed(r,
> "could not write zeroes offset %d size %d" %
> (offset, count))
>
> r.read()
>
>
> def flush(h, flags):
> + if pool_error:
> + raise pool_error
> +
> # Wait until all inflight requests are completed, and send a flush
> # request for all imageio connections.
> locked = []
>
> # Lock the pool by taking all connections out.
> while len(locked) < pool.maxsize:
> locked.append(pool.get())
>
> try:
> for item in locked:
> @@ -348,26 +367,78 @@ def create_http_pool(url, options):
>
> pool = queue.Queue(count)
>
> for i in range(count):
> http = create_http(url, unix_socket=unix_socket)
> pool.put(PoolItem(http))
>
> return pool
>
>
> +def pool_keeper():
> + """
> + Thread flushing idle connections, keeping them alive.
> +
> + If a connection does not send any request for 60 seconds, the
> + imageio server closes the connection. Recovering from a closed
> + connection is hard and unsafe, so this thread ensures that
> + connections never become idle, by sending a flush request on any
> + connection that has been idle for too long.
> +
> + In normal conditions, all connections are busy most of the time, so
> + the keeper will find no idle connections. If there are short delays
> + in nbdcopy, the keeper will find some idle connections, but will
> + quickly return them to the pool. In the pathological case when
> + nbdcopy is blocked for 3 minutes on vddk input, the keeper will send
> + a flush request on all connections every ~30 seconds, until nbdcopy
> + starts communicating again.
> + """
> + global pool_error
> +
> + nbdkit.debug("pool keeper started")
> +
> + while not done.wait(IDLE_TIMEOUT / 2):
> + idle = []
> +
> + while True:
> + try:
> + idle.append(pool.get_nowait())
> + except queue.Empty:
> + break
> +
> + if idle:
> + now = time.monotonic()
> + for item in idle:
> + if item.last_used and now - item.last_used > IDLE_TIMEOUT:
> + nbdkit.debug("Flushing idle connection")
> + try:
> + send_flush(item.http)
> + item.last_used = now
> + except Exception as e:
> + # We will report this error on the next request.
> + pool_error = e
> + item.last_used = None
> +
> + pool.put(item)
> +
> + nbdkit.debug("pool keeper stopped")
> +
> +
> @contextmanager
> def http_context(pool):
> """
> Context manager yielding an imageio http connection from the pool. Blocks
> until a connection is available.
> """
> + if pool_error:
> + raise pool_error
> +
> item = pool.get()
> try:
> yield item.http
> finally:
> item.last_used = time.monotonic()
> pool.put(item)
>
>
> def close_http_pool(pool):
> """
> --
> 2.33.1
--
Richard Jones, Virtualization Group, Red Hat
http://people.redhat.com/~rjones
Read my programming and virtualization blog:
http://rwmj.wordpress.com
virt-df lists disk usage of guests without needing to install any
software inside the virtual machine. Supports Linux and Windows.
http://people.redhat.com/~rjones/virt-df/