>From be039f70da0c3ece9075724bf5ff29a45038dce5 Mon Sep 17 00:00:00 2001 From: "Richard W.M. Jones" Date: Tue, 14 Oct 2014 16:38:50 +0200 Subject: [PATCH 1/2] streaming: Implement sliding window and add window=SIZE parameter. --- plugins/streaming/nbdkit-streaming-plugin.pod | 17 +- plugins/streaming/streaming.c | 290 +++++++++++++++++++++----- 2 files changed, 256 insertions(+), 51 deletions(-) diff --git a/plugins/streaming/nbdkit-streaming-plugin.pod b/plugins/streaming/nbdkit-streaming-plugin.pod index a21ed4f..635af69 100644 --- a/plugins/streaming/nbdkit-streaming-plugin.pod +++ b/plugins/streaming/nbdkit-streaming-plugin.pod @@ -6,7 +6,7 @@ nbdkit-streaming-plugin - nbdkit streaming plugin =head1 SYNOPSIS - nbdkit streaming pipe=FILENAME [size=SIZE] + nbdkit streaming pipe=FILENAME [size=SIZE] [window=SIZE] =head1 DESCRIPTION @@ -50,12 +50,23 @@ Whether you need to specify this parameter depends on the client. Some clients don't check the size and just write/stream, others do checks or calculations based on the apparent size. +=item B + +Specify a sliding window of data, allowing limited seeking backwards +and reads. You can use any size specifier permitted by +C, eg. C. + +Note that this is disabled (set to 0) by default, since enabling it +causes writes to be delayed until the client moves the window forward +or until nbdkit exits. + =back =head1 TO DO -This plugin would be much nicer if it supported the concept of a -"window" of data, allowing limited reverse seeks and reads. +Separate read and write windows would make more sense, allowing a +large read window and a small write window. The smaller (or zero) +write window would mean that writes are not delayed. =head1 SEE ALSO diff --git a/plugins/streaming/streaming.c b/plugins/streaming/streaming.c index f58fa46..2d08803 100644 --- a/plugins/streaming/streaming.c +++ b/plugins/streaming/streaming.c @@ -41,20 +41,28 @@ #include #include #include +#include #include +#define min(a,b) ((a)<(b)?(a):(b)) + static char *filename = NULL; static int fd = -1; /* In theory INT64_MAX, but it breaks qemu's NBD driver. */ static int64_t size = INT64_MAX/2; -/* Flag if we have entered the unrecoverable error state because of - * a seek backwards. +/* Flag if we have entered the unrecoverable error state because of a + * seek backwards beyond the window. */ static int errorstate = 0; +/* Window. */ +static int64_t window_max_size = 0; /* window= parameter */ +static int64_t window_size = 0; /* current size */ +static char *window = NULL; + /* Highest byte (+1) that has been written in the data stream. */ static uint64_t highestwrite = 0; @@ -73,6 +81,11 @@ streaming_config (const char *key, const char *value) if (size == -1) return -1; } + else if (strcmp (key, "window") == 0) { + window_max_size = nbdkit_parse_size (value); + if (window_max_size == -1) + return -1; + } else { nbdkit_error ("unknown parameter '%s'", key); return -1; @@ -110,18 +123,10 @@ streaming_config_complete (void) return 0; } -/* nbdkit is shutting down. */ -static void -streaming_unload (void) -{ - if (fd >= 0) - close (fd); - free (filename); -} - #define streaming_config_help \ "pipe= (required) The filename to serve.\n" \ - "size= (optional) Stream size." + "size= (optional) Stream size.\n" \ + "window= (optional) Window size." /* Create the per-connection handle. */ static void * @@ -160,13 +165,66 @@ streaming_get_size (void *handle) return size; } +static int +xwrite (int fd, const char *buf, size_t n) +{ + ssize_t r; + + while (n > 0) { + r = write (fd, buf, n); + if (r == -1) { + nbdkit_error ("write: %m"); + return -1; + } + buf += r; + n -= r; + } + return 0; +} + +static int +xwrite_zeroes (int fd, size_t n) +{ + ssize_t r; + char buf[4096]; + + memset (buf, 0, sizeof buf); + + while (n > 0) { + r = write (fd, buf, min (n, sizeof buf)); + if (r == -1) { + nbdkit_error ("write: %m"); + return -1; + } + n -= r; + } + return 0; +} + +/* +This diagram should help when trying to understand the pread and +pwrite calls below. + +Note that we recursively split read and write calls to make the cases +tractable. + + |<------- window_max_size ------->| + |<---- window_size ----->| + +------------------------+------------------------+--------+---------- + ^ ^ ^ ^ + 0 windowstart highestwrite maxwindow + + */ + /* Write data to the stream. */ static int streaming_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) { - size_t n; - ssize_t r; + int r; + uint64_t windowstart; + uint64_t maxwindow; + int64_t delta; if (errorstate) { nbdkit_error ("unrecoverable error state"); @@ -174,63 +232,199 @@ streaming_pwrite (void *handle, const void *buf, return -1; } - if (offset < highestwrite) { - nbdkit_error ("client tried to seek backwards and write: the streaming plugin does not currently support this"); + /* This just makes the recursive case easier to reason about. */ + if (count == 0) + return 0; + + windowstart = highestwrite - window_size; + + if (offset < windowstart) { + nbdkit_error ("client seeked backwards > window size: you must increase the window size"); errorstate = 1; errno = EIO; return -1; } - /* Need to write some zeroes. */ - if (offset > highestwrite) { - int64_t size = offset - highestwrite; - char buf[4096]; - - memset (buf, 0, sizeof buf); - - while (size > 0) { - n = size > sizeof buf ? sizeof buf : size; - r = write (fd, buf, n); - if (r == -1) { - nbdkit_error ("write: %m"); - errorstate = 1; - return -1; - } - highestwrite += r; - size -= r; - } + /* Split writes across highestwrite and maxwindow boundaries. + * Splitting here means we do not have to deal with writes across + * the boundary in the code below. + */ + if (offset < highestwrite && offset + count > highestwrite) { + uint64_t size = highestwrite - offset; + + r = streaming_pwrite (handle, buf, size, offset); + if (r == -1) + return -1; + buf += size; + offset += size; + count -= size; + return streaming_pwrite (handle, buf, count, offset); + } + + maxwindow = windowstart + window_max_size; + + if (offset < maxwindow && offset + count > maxwindow) { + uint64_t size = maxwindow - offset; + + r = streaming_pwrite (handle, buf, size, offset); + if (r == -1) + return -1; + buf += size; + offset += size; + count -= size; + return streaming_pwrite (handle, buf, count, offset); + } + + /* Handle a write entirely within the current window. */ + if (offset < highestwrite) { + uint64_t windowoffset = window_size - (highestwrite - offset); + memcpy (&window[windowoffset], buf, count); + return 0; } - /* Write the data. */ - while (count > 0) { - r = write (fd, buf, count); - if (r == -1) { - nbdkit_error ("write: %m"); - errorstate = 1; + /* A write after highestwrite but not larger than maxwindow causes + * the window to be extended but not moved. + */ + if (offset < maxwindow) { + uint64_t new_highestwrite = offset + count; + uint64_t new_size = new_highestwrite - windowstart; + char *new_window; + + new_window = realloc (window, new_size); + if (new_window == NULL) { + nbdkit_error ("realloc: %m"); return -1; } - buf += r; - highestwrite += r; - count -= r; + window = new_window; + /* Make sure the extended window is zeroes to start with. */ + memset (&window[window_size], 0, new_size - window_size); + highestwrite = new_highestwrite; + /* Copy the buffer to the new window. */ + memcpy (&window[offset - windowstart], buf, count); + return 0; + } + + /* Split writes after maxwindow at highestwrite + window_max_size. */ + if (offset < highestwrite + window_max_size && + offset + count > highestwrite + window_max_size) { + uint64_t size = highestwrite + window_max_size - offset; + + r = streaming_pwrite (handle, buf, size, offset); + if (r == -1) + return -1; + buf += size; + offset += size; + count -= size; + return streaming_pwrite (handle, buf, count, offset); + } + + /* Any write here is going to cause the window to move. Delta is + * the amount by which the window will move (NB: might be greater + * than the window size). + */ + delta = offset + count - highestwrite; + + if (delta <= window_size) { + /* Write out the oldest part of the window. */ + if (xwrite (fd, window, delta) == -1) + return -1; + + /* Move the data in the window down. */ + memmove (window, window + delta, window_size - delta); + + /* Copy the buffer to the new window. */ + memcpy (window + window_size - count, buf, count); + highestwrite += delta; + return 0; } - return 0; + /* The window will move by more than a single window size. Write out + * the whole of the old window, then write zeroes, then continue the + * write. + */ + if (xwrite (fd, window, window_size) == -1) + return -1; + memset (window, 0, window_size); + + if (xwrite_zeroes (fd, delta - window_size) == -1) + return -1; + + highestwrite += delta - window_size; + + return streaming_pwrite (handle, buf, count, offset); } /* Read data back from the stream. */ static int streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset) { + uint64_t windowstart; + int r; + if (errorstate) { nbdkit_error ("unrecoverable error state"); errno = EIO; return -1; } - nbdkit_error ("client tried to read: the streaming plugin does not currently support this"); - errorstate = 1; - errno = EIO; - return -1; + /* This just makes the recursive case easier to reason about. */ + if (count == 0) + return 0; + + windowstart = highestwrite - window_size; + + if (offset < windowstart) { + nbdkit_error ("client seeked backwards > window size: you must increase the window size"); + errorstate = 1; + errno = EIO; + return -1; + } + + /* Split reads across highestwrite boundary. Splitting here means + * we do not have to deal with reads across the boundary in the code + * below. + */ + if (offset < highestwrite && offset + count > highestwrite) { + uint64_t size = highestwrite - offset; + + r = streaming_pread (handle, buf, size, offset); + if (r == -1) + return -1; + buf += size; + offset += size; + count -= size; + return streaming_pread (handle, buf, count, offset); + } + + /* Handle a read entirely within the window by simply reading the + * window contents. + */ + if (offset < highestwrite) { + uint64_t windowoffset = window_size - (highestwrite - offset); + memcpy (buf, &window[windowoffset], count); + return 0; + } + + /* Else any read ahead of the current highest write is returned as + * all zeroes. + */ + memset (buf, 0, count); + return 0; +} + +/* nbdkit is shutting down - the rest of the window should be written out. */ +static void +streaming_unload (void) +{ + if (fd >= 0) { + /* XXX impossible to report an error to the client here */ + xwrite (fd, window, window_size); + + close (fd); + } + + free (window); + free (filename); } static struct nbdkit_plugin plugin = { -- 2.0.4