It is necessary to kick the state machine any time another thread
blocks due to write() encountering a full buffer, to get the reader
thread to learn that it must now poll on POLLOUT rather than just
POLLIN. POSIX only provides two ways to do this: either poll() on a
second fd (our pipe-to-self trick), or interrupt the polling with
EINTR due to delivery of a signal. So, I experimented with the latter
approach: the reader thread runs with SIGUSR1 blocked except during
the poll, and other threads change their kick action from writing into
the pipe-to-self to calling pthread_kill() on the reader thread to
force EINTR.
Unfortunately, using only poll() for this is racy: a signal delivered
after SIGUSR1 is unblocked but before poll() is entered is handled
without interrupting the poll, so the kick is lost. To fix the race, we
either have to resort to POSIX pselect() (which is horribly clunky and
does not scale well to large fd values if we have lots of parallel
clients) or use the non-POSIX ppoll() (these days, BSD and Linux
systems all have it; if someone complains about failure to compile, we
can #ifdef the code and fall back to pipe-to-self on the systems
lacking ppoll()).
Also unfortunately, the timing does not favor this approach. Repeating
the setup for commit e897ed70, I see the following results:
Pre-patch, the runs averaged 17.232s, 9.76E+09 bits/s
Post-patch, the runs averaged 17.993s, 9.34E+09 bits/s
That is, the signal-based kick is roughly 4% slower than pipe-to-self.
I'm posting this in case something changes in the future (for example,
I envision that libnbd might add a callback function for performing a
kick such that we signal the reader thread ONLY when we actually
blocked on write(), rather than after every single request sent to the
server), but will not be applying it for now.
Signed-off-by: Eric Blake <eblake(a)redhat.com>
---
plugins/nbd/nbd.c | 86 +++++++++++++++++++++++++++++------------------
1 file changed, 53 insertions(+), 33 deletions(-)
diff --git a/plugins/nbd/nbd.c b/plugins/nbd/nbd.c
index 941e4cb7..02b8119a 100644
--- a/plugins/nbd/nbd.c
+++ b/plugins/nbd/nbd.c
@@ -45,6 +45,7 @@
#include <pthread.h>
#include <semaphore.h>
#include <poll.h>
+#include <signal.h>
#include <libnbd.h>
@@ -67,7 +68,6 @@ struct handle {
/* These fields are read-only once initialized */
struct nbd_handle *nbd;
int fd; /* Cache of nbd_aio_get_fd */
- int fds[2]; /* Pipe for kicking the reader thread */
bool readonly;
pthread_t reader;
@@ -105,6 +105,15 @@ static char *tls_psk;
static struct handle *nbdplug_open_handle (int readonly);
static void nbdplug_close_handle (struct handle *h);
+/* Original signal mask, with SIGUSR1 unblocked */
+static sigset_t origmask;
+
+/* No-op signal handler for interrupting ppoll on SIGUSR1 */
+static void
+nbdplug_sigusr1 (int sig)
+{
+}
+
static void
nbdplug_unload (void)
{
@@ -195,10 +204,30 @@ nbdplug_config (const char *key, const char *value)
return 0;
}
-/* Check the user passed exactly one socket description. */
+/* Finalize the configuration. */
static int
nbdplug_config_complete (void)
{
+ struct sigaction act = {
+ .sa_handler = nbdplug_sigusr1,
+ .sa_flags = SA_RESTART,
+ };
+
+ /* Register a no-op SIGUSR1 handler for use with ppoll */
+ if ((errno = pthread_sigmask (SIG_SETMASK, NULL, &origmask))) {
+ nbdkit_error ("pthread_sigmask: %m");
+ return -1;
+ }
+ if (sigismember (&origmask, SIGUSR1)) {
+ nbdkit_error ("SIGUSR1 should not be blocked");
+ return -1;
+ }
+ if (sigaction (SIGUSR1, &act, NULL) == -1) {
+ nbdkit_error ("sigaction: %m");
+ return -1;
+ }
+
+ /* Check the user passed exactly one socket description. */
if (sockname) {
struct sockaddr_un sock;
@@ -306,37 +335,27 @@ nbdplug_reader (void *handle)
int r;
while (!nbd_aio_is_dead (h->nbd) && !nbd_aio_is_closed (h->nbd)) {
- struct pollfd fds[2] = {
- [0].fd = h->fd,
- [1].fd = h->fds[0],
- [1].events = POLLIN,
- };
+ struct pollfd fd;
struct transaction *trans, **prev;
int dir;
- char c;
+ fd.fd = h->fd;
dir = nbd_aio_get_direction (h->nbd);
nbdkit_debug ("polling, dir=%d", dir);
if (dir & LIBNBD_AIO_DIRECTION_READ)
- fds[0].events |= POLLIN;
+ fd.events |= POLLIN;
if (dir & LIBNBD_AIO_DIRECTION_WRITE)
- fds[0].events |= POLLOUT;
- if (poll (fds, 2, -1) == -1) {
- nbdkit_error ("poll: %m");
+ fd.events |= POLLOUT;
+ if (ppoll (&fd, 1, NULL, &origmask) == -1 && errno != EINTR) {
+ nbdkit_error ("ppoll: %m");
break;
}
- if (dir & LIBNBD_AIO_DIRECTION_READ && fds[0].revents & POLLIN)
+ if (dir & LIBNBD_AIO_DIRECTION_READ && fd.revents & POLLIN)
nbd_aio_notify_read (h->nbd);
- else if (dir & LIBNBD_AIO_DIRECTION_WRITE && fds[0].revents & POLLOUT)
+ else if (dir & LIBNBD_AIO_DIRECTION_WRITE && fd.revents & POLLOUT)
nbd_aio_notify_write (h->nbd);
- /* Check if we were kicked because a command was started */
- if (fds[1].revents & POLLIN && read (h->fds[0], &c, 1) != 1) {
- nbdkit_error ("failed to read pipe: %m");
- break;
- }
-
ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
trans = h->trans;
prev = &h->trans;
@@ -400,7 +419,6 @@ static struct transaction *
nbdplug_register (struct handle *h, int64_t cookie)
{
struct transaction *trans;
- char c = 0;
if (cookie == -1) {
nbdkit_error ("command failed: %s", nbd_get_error ());
@@ -417,8 +435,8 @@ nbdplug_register (struct handle *h, int64_t cookie)
/* While locked, kick the reader thread and add our transaction */
ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
- if (write (h->fds[1], &c, 1) != 1) {
- nbdkit_error ("write to pipe: %m");
+ if ((errno = pthread_kill (h->reader, SIGUSR1))) {
+ nbdkit_error ("pthread_kill: %m");
free (trans);
return NULL;
}
@@ -466,17 +484,13 @@ nbdplug_open_handle (int readonly)
struct handle *h;
int r;
unsigned long retries = retry;
+ sigset_t blocked;
h = calloc (1, sizeof *h);
if (h == NULL) {
nbdkit_error ("malloc: %m");
return NULL;
}
- if (pipe (h->fds)) {
- nbdkit_error ("pipe: %m");
- free (h);
- return NULL;
- }
retry:
h->fd = -1;
@@ -520,22 +534,30 @@ nbdplug_open_handle (int readonly)
if (readonly)
h->readonly = true;
- /* Spawn a dedicated reader thread */
+ /* Spawn a dedicated reader thread with SIGUSR1 blocked */
if ((errno = pthread_mutex_init (&h->trans_lock, NULL))) {
nbdkit_error ("failed to initialize transaction mutex: %m");
goto err;
}
+ if (sigemptyset (&blocked) == -1 ||
+ sigaddset (&blocked, SIGUSR1) == -1)
+ assert (false);
+ if ((errno = pthread_sigmask (SIG_BLOCK, &blocked, NULL))) {
+ nbdkit_error ("failed to set signal mask: %m");
+ pthread_mutex_destroy (&h->trans_lock);
+ goto err;
+ }
if ((errno = pthread_create (&h->reader, NULL, nbdplug_reader, h))) {
nbdkit_error ("failed to initialize reader thread: %m");
pthread_mutex_destroy (&h->trans_lock);
goto err;
}
+ if (pthread_sigmask (SIG_SETMASK, &origmask, NULL))
+ assert (false);
return h;
err:
- close (h->fds[0]);
- close (h->fds[1]);
nbdkit_error ("failure while creating nbd handle: %s", nbd_get_error ());
if (h->nbd)
nbd_close (h->nbd);
@@ -560,8 +582,6 @@ nbdplug_close_handle (struct handle *h)
nbdkit_debug ("failed to clean up handle: %s", nbd_get_error ());
if ((errno = pthread_join (h->reader, NULL)))
nbdkit_debug ("failed to join reader thread: %m");
- close (h->fds[0]);
- close (h->fds[1]);
nbd_close (h->nbd);
pthread_mutex_destroy (&h->trans_lock);
free (h);
--
2.20.1