The original plan was to have a background thread doing the reclaim.
However that cannot work given the design of filters, because a
background thread cannot access the next_ops struct which is only
available during requests.
Therefore we spread the work over the request threads.  Each blk_*
function checks whether there is work to do and, if there is,
reclaims up to two blocks from the cache (thus ensuring that we always
make forward progress, since each blk_* function can add at most
one block to the cache).
---
 filters/cache/nbdkit-cache-filter.pod |  50 ++++++++
 filters/cache/cache.h                 |  13 ++
 filters/cache/blk.c                   | 166 ++++++++++++++++++++++++++
 filters/cache/cache.c                 |  65 ++++++++++
 filters/cache/lru.c                   |  14 ++-
 TODO                                  |  11 +-
 tests/Makefile.am                     |   4 +-
 tests/test-cache-max-size.sh          |  96 +++++++++++++++
 8 files changed, 404 insertions(+), 15 deletions(-)
diff --git a/filters/cache/nbdkit-cache-filter.pod b/filters/cache/nbdkit-cache-filter.pod
index 09dc39f..ae2fb42 100644
--- a/filters/cache/nbdkit-cache-filter.pod
+++ b/filters/cache/nbdkit-cache-filter.pod
@@ -5,6 +5,9 @@ nbdkit-cache-filter - nbdkit caching filter
 =head1 SYNOPSIS
 
  nbdkit --filter=cache plugin [cache=writeback|writethrough|unsafe]
+                              [cache-max-size=SIZE]
+                              [cache-high-threshold=N]
+                              [cache-low-threshold=N]
                               [cache-on-read=true|false]
                               [plugin-args...]
 
@@ -51,6 +54,14 @@ This is dangerous and can cause data loss, but this may be acceptable
 if you only use it for testing or with data that you don't care about
 or can cheaply reconstruct.
 
+=item B<cache-max-size=SIZE>
+
+=item B<cache-high-threshold=N>
+
+=item B<cache-low-threshold=N>
+
+Limit the size of the cache to C<SIZE>.  See L</CACHE MAXIMUM SIZE> below.
+
 =item B<cache-on-read=true>
 
 Cache read requests as well as write requests.  Any time a block is
@@ -69,6 +80,45 @@ Do not cache read requests (this is the default).
 
 =back
 
+=head1 CACHE MAXIMUM SIZE
+
+By default the cache can grow to any size (although not larger than
+the virtual size of the underlying plugin) and you have to ensure
+there is sufficient space in C<$TMPDIR> for it.
+
+Using the parameters C<cache-max-size>, C<cache-high-threshold> and
+C<cache-low-threshold> you can limit the maximum size of the cache.
+
+This requires kernel and filesystem support (for L<fallocate(2)>
+C<FALLOC_FL_PUNCH_HOLE>), so it may not work on all platforms.
+
+Some examples:
+
+=over 4
+
+=item C<cache-max-size=1G>
+
+The cache is limited to around 1 gigabyte.
+
+=item C<cache-max-size=1G cache-high-threshold=95 cache-low-threshold=80>
+
+Once the cache size reaches 972M (95% of 1G), blocks are reclaimed
+from the cache until the size is reduced to 819M (80% of 1G).
+
+=back
+
+The way this works is once the size of the cache exceeds
+S<C<SIZE> ✕ the high threshold>, the filter works to reduce the size
+of the cache until it is less than S<C<SIZE> ✕ the low threshold>.
+Once the size is below the low threshold, no more reclaim work is done
+until the size exceeds the high threshold again.
+
+The default thresholds are high 95% and low 80%.  You must set
+S<0 E<lt> low E<lt> high>.  The thresholds are expressed as integer
+percentages of C<cache-max-size>.
+
+Least recently used blocks are discarded first.
+
 =head1 ENVIRONMENT VARIABLES
 
 =over 4
diff --git a/filters/cache/cache.h b/filters/cache/cache.h
index 50416b1..233a4fa 100644
--- a/filters/cache/cache.h
+++ b/filters/cache/cache.h
@@ -34,6 +34,8 @@
 #ifndef NBDKIT_CACHE_H
 #define NBDKIT_CACHE_H
 
+#include <fcntl.h>
+
 /* Size of a block in the cache.  A 4K block size means that we need
  * 64 MB of memory to store the bitmaps for a 1 TB underlying image.
  * It is also smaller than the usual hole size for sparse files, which
@@ -48,7 +50,18 @@ extern enum cache_mode {
   CACHE_MODE_UNSAFE,
 } cache_mode;
 
+/* Maximum size of the cache and high/low thresholds. */
+extern int64_t max_size;
+extern int hi_thresh, lo_thresh;
+
 /* Cache read requests. */
 extern bool cache_on_read;
 
+/* Do we support reclaiming cache blocks? */
+#ifdef FALLOC_FL_PUNCH_HOLE
+#define HAVE_CACHE_RECLAIM 1
+#else
+#undef HAVE_CACHE_RECLAIM
+#endif
+
 #endif /* NBDKIT_CACHE_H */
diff --git a/filters/cache/blk.c b/filters/cache/blk.c
index b256446..294f0cb 100644
--- a/filters/cache/blk.c
+++ b/filters/cache/blk.c
@@ -46,6 +46,8 @@
 #include <unistd.h>
 #include <fcntl.h>
 #include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
 
 #include <nbdkit-filter.h>
 
@@ -74,6 +76,34 @@ enum bm_entry {
   BLOCK_DIRTY = 3,
 };
 
+#ifdef HAVE_CACHE_RECLAIM
+/* Whether, and by which strategy, we are reclaiming cache blocks.
+ *
+ * The state machine starts in the NOT_RECLAIMING state.  When the
+ * allocated size of the cache reaches the high threshold, we move to
+ * RECLAIMING_LRU.  Once we have exhausted all LRU blocks, we move to
+ * RECLAIMING_ANY (reclaiming any blocks).
+ *
+ * If at any time the size of the cache goes below the low threshold
+ * we move back to the NOT_RECLAIMING state.
+ *
+ * A possible future enhancement is to add an extra state between LRU
+ * and ANY which reclaims blocks from bm[1].
+ *
+ * reclaim_blk is the last block that we looked at.
+ */
+enum reclaim_state {
+  NOT_RECLAIMING = 0,   /* must be 0: the state is tested as a boolean */
+  RECLAIMING_LRU = 1,   /* only blocks not recently accessed */
+  RECLAIMING_ANY = 2,   /* any block present in the cache */
+};
+
+static enum reclaim_state reclaiming = NOT_RECLAIMING;
+static uint64_t reclaim_blk;    /* searches resume at reclaim_blk+1 */
+#endif
+
+static void reclaim (void);
+
 int
 blk_init (void)
 {
@@ -146,6 +176,8 @@ blk_read (struct nbdkit_next_ops *next_ops, void *nxdata,
   off_t offset = blknum * BLKSIZE;
   enum bm_entry state = bitmap_get_blk (&bm, blknum, BLOCK_NOT_CACHED);
 
+  reclaim ();
+
   nbdkit_debug ("cache: blk_read block %" PRIu64 " (offset %" PRIu64 ") is %s",
                 blknum, (uint64_t) offset,
                 state == BLOCK_NOT_CACHED ? "not cached" :
@@ -193,6 +225,8 @@ blk_writethrough (struct nbdkit_next_ops *next_ops, void *nxdata,
 {
   off_t offset = blknum * BLKSIZE;
 
+  reclaim ();
+
   nbdkit_debug ("cache: writethrough block %" PRIu64 " (offset %" PRIu64 ")",
                 blknum, (uint64_t) offset);
 
@@ -224,6 +258,8 @@ blk_write (struct nbdkit_next_ops *next_ops, void *nxdata,
 
   offset = blknum * BLKSIZE;
 
+  reclaim ();
+
   nbdkit_debug ("cache: writeback block %" PRIu64 " (offset %" PRIu64 ")",
                 blknum, (uint64_t) offset);
 
@@ -254,3 +290,133 @@ for_each_dirty_block (block_callback f, void *vp)
 
   return 0;
 }
+
+#ifdef HAVE_CACHE_RECLAIM
+
+static void reclaim_one (void);
+static void reclaim_lru (void);
+static void reclaim_any (void);
+static void reclaim_block (void);
+
+/* Called by the blk_* functions: do a bounded amount of reclaim work. */
+static void
+reclaim (void)
+{
+  struct stat st;
+  uint64_t allocated, limit;
+
+  /* Without cache-max-size the cache is unlimited: nothing to do. */
+  if (max_size == -1)
+    return;
+
+  if (fstat (fd, &st) == -1) {
+    nbdkit_debug ("cache: fstat: %m");
+    return;
+  }
+  allocated = st.st_blocks * UINT64_C(512);
+
+  if (reclaiming != NOT_RECLAIMING) {
+    /* Already reclaiming: stop once below the low threshold. */
+    limit = max_size * lo_thresh / 100;
+    if (allocated < limit) {
+      nbdkit_debug ("cache: stop reclaiming");
+      reclaiming = NOT_RECLAIMING;
+      return;
+    }
+  }
+  else {
+    /* Idle: start only when the high threshold has been reached. */
+    limit = max_size * hi_thresh / 100;
+    if (allocated < limit)
+      return;
+    nbdkit_debug ("cache: start reclaiming");
+    reclaiming = RECLAIMING_LRU;
+  }
+  /* Reclaim up to 2 cache blocks. */
+  reclaim_one ();
+  reclaim_one ();
+}
+
+/* Reclaim one block using the strategy selected by the current state. */
+static void
+reclaim_one (void)
+{
+  assert (reclaiming);
+
+  switch (reclaiming) {
+  case RECLAIMING_LRU: reclaim_lru (); break;
+  default:             reclaim_any (); break;
+  }
+}
+
+static void
+reclaim_lru (void)
+{
+  uint64_t old_reclaim_blk;     /* where this scan started */
+
+  /* Find the next block in the cache (no wrap around is done here). */
+  reclaim_blk = bitmap_next (&bm, reclaim_blk+1);
+  old_reclaim_blk = reclaim_blk;
+
+  /* Reclaim the first block found that was not recently accessed. */
+  do {
+    if (! lru_has_been_recently_accessed (reclaim_blk)) {
+      reclaim_block ();
+      return;
+    }
+    /* NOTE(review): reclaim_blk is uint64_t, so ">= 0" is always true. */
+    reclaim_blk = bitmap_next (&bm, reclaim_blk+1);
+    if (reclaim_blk == -1)    /* wrap around */
+      reclaim_blk = bitmap_next (&bm, 0);
+  } while (reclaim_blk >= 0 && old_reclaim_blk != reclaim_blk);
+
+  if (old_reclaim_blk == reclaim_blk) {
+    /* Run out of LRU blocks, so start reclaiming any block in the cache. */
+    nbdkit_debug ("cache: reclaiming any blocks");
+    reclaiming = RECLAIMING_ANY;
+    reclaim_one ();
+    return;
+  }
+}
+
+static void
+reclaim_any (void)
+{
+  /* Advance past the last block examined, regardless of LRU status. */
+  reclaim_blk = bitmap_next (&bm, reclaim_blk+1);
+  if (reclaim_blk == (uint64_t) -1)
+    /* Nothing after that point, so restart the search at block 0. */
+    reclaim_blk = bitmap_next (&bm, 0);
+  reclaim_block ();
+}
+
+/* Punch a hole over reclaim_blk in the cache file and mark it not cached. */
+static void
+reclaim_block (void)
+{
+  if (reclaim_blk == -1) {
+    nbdkit_debug ("cache: run out of blocks to reclaim!");
+    return;
+  }
+  nbdkit_debug ("cache: reclaiming block %" PRIu64, reclaim_blk);
+
+#ifndef FALLOC_FL_PUNCH_HOLE
+#error "no implementation for punching holes"
+#endif
+  if (fallocate (fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
+                 reclaim_blk * BLKSIZE, BLKSIZE) == -1) {
+    nbdkit_error ("cache: reclaiming cache blocks: "
+                  "fallocate: FALLOC_FL_PUNCH_HOLE: %m");
+    return;
+  }
+
+  bitmap_set_blk (&bm, reclaim_blk, BLOCK_NOT_CACHED);
+}
+
+#else /* !HAVE_CACHE_RECLAIM */
+static void
+reclaim (void)
+{
+  /* No FALLOC_FL_PUNCH_HOLE on this platform: the cache is never trimmed. */
+}
+#endif /* !HAVE_CACHE_RECLAIM */
diff --git a/filters/cache/cache.c b/filters/cache/cache.c
index a14f789..0b8bcdf 100644
--- a/filters/cache/cache.c
+++ b/filters/cache/cache.c
@@ -65,6 +65,8 @@
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 
 enum cache_mode cache_mode = CACHE_MODE_WRITEBACK;
+int64_t max_size = -1;
+int hi_thresh = 95, lo_thresh = 80;
 bool cache_on_read = false;
 
 static int cache_flush (struct nbdkit_next_ops *next_ops, void *nxdata, void *handle, uint32_t flags, int *err);
@@ -105,6 +107,53 @@ cache_config (nbdkit_next_config *next, void *nxdata,
       return -1;
     }
   }
+#ifdef HAVE_CACHE_RECLAIM
+  else if (strcmp (key, "cache-max-size") == 0) {
+    int64_t r;
+
+    r = nbdkit_parse_size (value);
+    if (r == -1)
+      return -1;
+    /* We set a lower limit for the cache size just to keep out of
+     * trouble.
+     */
+    if (r < 1024*1024) {
+      nbdkit_error ("cache-max-size is too small");
+      return -1;
+    }
+    max_size = r;
+    return 0;
+  }
+  else if (strcmp (key, "cache-high-threshold") == 0) {
+    if (sscanf (value, "%d", &hi_thresh) != 1) {
+      nbdkit_error ("invalid cache-high-threshold parameter: %s", value);
+      return -1;
+    }
+    if (hi_thresh <= 0) {
+      nbdkit_error ("cache-high-threshold must be greater than zero");
+      return -1;
+    }
+    return 0;
+  }
+  else if (strcmp (key, "cache-low-threshold") == 0) {
+    if (sscanf (value, "%d", &lo_thresh) != 1) {
+      nbdkit_error ("invalid cache-low-threshold parameter: %s", value);
+      return -1;
+    }
+    if (lo_thresh <= 0) {
+      nbdkit_error ("cache-low-threshold must be greater than zero");
+      return -1;
+    }
+    return 0;
+  }
+#else /* !HAVE_CACHE_RECLAIM */
+  else if (strcmp (key, "cache-max-size") == 0 ||
+           strcmp (key, "cache-high-threshold") == 0 ||
+           strcmp (key, "cache-low-threshold") == 0) {
+    nbdkit_error ("this platform does not support cache reclaim");
+    return -1;
+  }
+#endif /* !HAVE_CACHE_RECLAIM */
   else if (strcmp (key, "cache-on-read") == 0) {
     int r;
 
@@ -119,6 +168,21 @@ cache_config (nbdkit_next_config *next, void *nxdata,
   }
 }
 
+static int
+cache_config_complete (nbdkit_next_config_complete *next, void *nxdata)
+{
+  /* The thresholds are only used when cache-max-size was given, so
+   * their relative order is only validated in that case.
+   */
+  if (max_size != -1 && lo_thresh >= hi_thresh) {
+    nbdkit_error ("cache-low-threshold must be "
+                  "less than cache-high-threshold");
+    return -1;
+  }
+
+  return next (nxdata);
+}
+
 static void *
 cache_open (nbdkit_next_open *next, void *nxdata, int readonly)
 {
@@ -416,6 +480,7 @@ static struct nbdkit_filter filter = {
   .load              = cache_load,
   .unload            = cache_unload,
   .config            = cache_config,
+  .config_complete   = cache_config_complete,
   .open              = cache_open,
   .prepare           = cache_prepare,
   .get_size          = cache_get_size,
diff --git a/filters/cache/lru.c b/filters/cache/lru.c
index 60cf379..2f379d1 100644
--- a/filters/cache/lru.c
+++ b/filters/cache/lru.c
@@ -69,9 +69,10 @@
  *
  * To do this we keep two bitmaps.
  *
- * When a block is accessed, we set the corresponding bit in bm[0] and
- * increment c0 (c0 counts the number of bits set in bm[0]).  If c0 ==
- * N/2 then we swap the two bitmaps, clear bm[0], and reset c0 to 0.
+ * When a new block is accessed, we set the corresponding bit in bm[0]
+ * and increment c0 (c0 counts the number of bits set in bm[0]).  If
+ * c0 == N/2 then we swap the two bitmaps, clear bm[0], and reset c0
+ * to 0.
  *
  * To check if a block has been accessed within the previous N
  * distinct accesses, we simply have to check both bitmaps.  If it is
@@ -109,8 +110,11 @@ lru_set_size (uint64_t new_size)
   if (bitmap_resize (&bm[1], new_size) == -1)
     return -1;
 
-  /* XXX Choose this better. */
-  N = MAX (new_size / BLKSIZE / 4, 100);
+  if (max_size != -1)
+    /* Make the threshold about 1/4 the maximum size of the cache. */
+    N = MAX (max_size / BLKSIZE / 4, 100);
+  else
+    N = MAX (new_size / BLKSIZE / 4, 100);
 
   return 0;
 }
diff --git a/TODO b/TODO
index 9d6e727..7107ea9 100644
--- a/TODO
+++ b/TODO
@@ -100,15 +100,8 @@ Suggestions for filters
 * masking plugin features for testing clients (see 'nozero' and 'fua'
   filters for examples)
 
-nbdkit-cache-filter needs considerable work:
-
-* allow the user to limit the total size of the cache
-
-* handle ENOSPC errors
-
-* implement some cache replacement policies
-
-* some sort of background task or thread to write out dirty blocks
+* nbdkit-cache-filter should handle ENOSPC errors automatically by
+  reclaiming blocks from the cache
 
 Composing nbdkit
 ----------------
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 03cd206..1e6049f 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -42,6 +42,7 @@ EXTRA_DIST = \
 	shebang.rb \
 	test-blocksize.sh \
 	test-cache.sh \
+	test-cache-max-size.sh \
 	test-cache-on-read.sh \
 	test-captive.sh \
 	test-cow.sh \
@@ -709,7 +710,8 @@ TESTS += test-blocksize.sh
 if HAVE_GUESTFISH
 TESTS += \
 	test-cache.sh \
-	test-cache-on-read.sh
+	test-cache-on-read.sh \
+	test-cache-max-size.sh
 endif HAVE_GUESTFISH
 
 # cow filter test.
diff --git a/tests/test-cache-max-size.sh b/tests/test-cache-max-size.sh
new file mode 100755
index 0000000..9f2fc88
--- /dev/null
+++ b/tests/test-cache-max-size.sh
@@ -0,0 +1,96 @@
+#!/usr/bin/env bash
+# nbdkit
+# Copyright (C) 2018 Red Hat Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of Red Hat nor the names of its contributors may be
+# used to endorse or promote products derived from this software without
+# specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+
+source ./functions.sh
+set -e
+set -x
+
+# The cache file is only reachable through /proc/$pid/fd, so require that.
+if ! test -d /proc/self/fd; then
+    echo "$0: not a Linux-like system supporting /proc/\$pid/fd"
+    exit 77
+fi
+
+# Test that qemu-io works; it is the client used to drive the cache.
+if ! qemu-io --help >/dev/null; then
+    echo "$0: missing or broken qemu-io"
+    exit 77
+fi
+
+# Need the stat command from coreutils to measure the cache file.
+if ! stat --version >/dev/null; then
+    echo "$0: missing or broken stat command"
+    exit 77
+fi
+
+d=cache-max-size.d
+rm -rf $d
+mkdir -p $d
+cleanup_fn rm -rf $d
+
+# Create a cache directory and make the filter use it via TMPDIR.
+mkdir $d/cache
+TMPDIR=$d/cache
+export TMPDIR
+
+# Create an empty 1G base image.
+truncate -s 1G $d/cache-max-size.img
+
+# Run nbdkit with the caching filter and a low size limit to ensure
+# that the reclaim code is exercised.
+start_nbdkit -P $d/cache-max-size.pid -U $d/cache-max-size.sock \
+             --filter=cache \
+             file $d/cache-max-size.img \
+             cache-max-size=10M cache-on-read=true
+
+# Write (and read, as reads are cached too) > 10M through the filter.
+qemu-io -f raw "nbd+unix://?socket=$d/cache-max-size.sock" \
+        -c "w -P 0 0 10M" \
+        -c "w -P 0 20M 10M" \
+        -c "r 20M 10M" \
+        -c "r 30M 10M" \
+        -c "w -P 0 40M 10M"
+
+# We can't use ‘du’ on the cache directory because the cache file is
+# deleted by the filter, and so is only accessible via /proc/$pid/fd.
+# Get the /proc link to the cache file, and the size of it in bytes.
+fddir="/proc/$( cat $d/cache-max-size.pid )/fd"
+ls -l $fddir
+fd="$fddir/$( ls -l $fddir | grep $TMPDIR | head -1 | awk '{print $9}' )"
+stat -L $fd
+size=$(( $(stat -L -c '%b * %B' $fd) ))
+
+if [ "$size" -gt $(( 11 * 1024 * 1024 )) ]; then
+    echo "$0: cache size is larger than 10M (actual size: $size bytes)"
+    exit 1
+fi
-- 
2.19.2