This patch adds an example using the asynchronous Rust bindings.
---
rust/Cargo.toml | 1 +
rust/examples/concurrent-read-write.rs | 149 +++++++++++++++++++++++++
rust/run-tests.sh.in | 2 +
3 files changed, 152 insertions(+)
create mode 100644 rust/examples/concurrent-read-write.rs
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index c49f9f2..0879b34 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -58,5 +58,6 @@ default = ["log", "tokio"]
anyhow = "1.0.72"
once_cell = "1.18.0"
pretty-hex = "0.3.0"
+rand = { version = "0.8.5", default-features = false, features = ["small_rng", "min_const_gen"] }
tempfile = "3.6.0"
 tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] }
diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs
new file mode 100644
index 0000000..4858f76
--- /dev/null
+++ b/rust/examples/concurrent-read-write.rs
@@ -0,0 +1,149 @@
+//! Example usage with nbdkit:
+//! nbdkit -U - memory 100M \
+//! --run 'cargo run --example concurrent-read-write -- $unixsocket'
+//! Or connect over a URI:
+//! nbdkit -U - memory 100M \
+//! --run 'cargo run --example concurrent-read-write -- $uri'
+//!
+//! This will read and write randomly over the first megabyte of the
+//! plugin using multi-conn, multiple threads and multiple requests in
+//! flight on each thread.
+
+#![deny(warnings)]
+use rand::prelude::*;
+use std::env;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+/// Number of simultaneous connections to the NBD server.
+///
+/// Note that some servers only support a limited number of
+/// simultaneous connections, and/or have a configurable thread pool
+/// internally, and if you exceed those limits then something will break.
+const NR_MULTI_CONN: usize = 8;
+
+/// Number of commands that can be "in flight" at the same time on each
+/// connection. (Therefore the total number of requests in flight may
+/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
+const MAX_IN_FLIGHT: usize = 16;
+
+/// The size of large reads and writes; must be > 512 (the small size).
+const BUFFER_SIZE: usize = 1024;
+
+/// Number of commands we issue (per [task][tokio::task]).
+const NR_CYCLES: usize = 32;
+
/// Statistics gathered over the lifetime of one worker task (and
/// summed across tasks at the end of the run).
#[derive(Default, Debug)]
struct Stats {
    /// Total number of NBD requests issued.
    requests: usize,
}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+ let args = env::args_os().collect::<Vec<_>>();
+ if args.len() != 2 {
+ anyhow::bail!("Usage: {:?} socket", args[0]);
+ }
+
+ // We begin by making a connection to the server to get the export size
+ // and ensure that it supports multiple connections and is writable.
+ let nbd = libnbd::Handle::new()?;
+
+ // Check if the user provided a URI or a unix socket.
+ let socket_or_uri = args[1].to_str().unwrap();
+ if socket_or_uri.contains("://") {
+ nbd.connect_uri(socket_or_uri)?;
+ } else {
+ nbd.connect_unix(socket_or_uri)?;
+ }
+
+ let export_size = nbd.get_size()?;
+ anyhow::ensure!(
+ (BUFFER_SIZE as u64) < export_size,
+ "export is {export_size}B, must be larger than {BUFFER_SIZE}B"
+ );
+ anyhow::ensure!(
+ !nbd.is_read_only()?,
+ "error: this NBD export is read-only"
+ );
+ anyhow::ensure!(
+ nbd.can_multi_conn()?,
+ "error: this NBD export does not support multi-conn"
+ );
+ drop(nbd); // Close the connection.
+
+ // Start the worker tasks, one per connection.
+ let mut tasks = JoinSet::new();
+ for i in 0..NR_MULTI_CONN {
+ tasks.spawn(run_thread(i, socket_or_uri.to_owned(), export_size));
+ }
+
+ // Wait for the tasks to complete.
+ let mut stats = Stats::default();
+ while !tasks.is_empty() {
+ let this_stats = tasks.join_next().await.unwrap().unwrap()?;
+ stats.requests += this_stats.requests;
+ }
+
+ // Make sure the number of requests that were required matches what
+ // we expect.
+ assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
+
+ Ok(())
+}
+
+async fn run_thread(
+ task_idx: usize,
+ socket_or_uri: String,
+ export_size: u64,
+) -> anyhow::Result<Stats> {
+ // Start a new connection to the server.
+ // We shall spawn many commands concurrently on different tasks and those
+ // futures must be `'static`, hence we wrap the handle in an [Arc].
+ let nbd = Arc::new(libnbd::AsyncHandle::new()?);
+
+ // Check if the user provided a URI or a unix socket.
+ if socket_or_uri.contains("://") {
+ nbd.connect_uri(socket_or_uri).await?;
+ } else {
+ nbd.connect_unix(socket_or_uri).await?;
+ }
+
+ let mut rng = SmallRng::seed_from_u64(44 as u64);
+
+ // Issue commands.
+ let mut stats = Stats::default();
+ let mut join_set = JoinSet::new();
+ //tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+ while stats.requests < NR_CYCLES || !join_set.is_empty() {
+ while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT
{
+ // If we want to issue another request, do so. Note that we reuse
+ // the same buffer for multiple in-flight requests. It doesn't
+ // matter here because we're just trying to write random stuff,
+ // but that would be Very Bad in a real application.
+ // Simulate a mix of large and small requests.
+ let size = if rng.gen() { BUFFER_SIZE } else { 512 };
+ let offset = rng.gen_range(0..export_size - size as u64);
+
+ let mut buf = [0u8; BUFFER_SIZE];
+ let nbd = nbd.clone();
+ if rng.gen() {
+ join_set.spawn(async move {
+ nbd.pread(&mut buf, offset, None).await
+ });
+ } else {
+ // Fill the buf with random data.
+ rng.fill(&mut buf);
+ join_set
+ .spawn(async move { nbd.pwrite(&buf, offset, None).await });
+ }
+ stats.requests += 1;
+ }
+ join_set.join_next().await.unwrap().unwrap()?;
+ }
+
+ if task_idx == 0 {}
+ Ok(stats)
+}
diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in
index 3ebf9a1..661c018 100755
--- a/rust/run-tests.sh.in
+++ b/rust/run-tests.sh.in
@@ -32,6 +32,8 @@ if [ -z "$VG" ]; then
--run '@CARGO@ run --example get-size -- $unixsocket'
@NBDKIT@ -U - floppy . \
--run '@CARGO@ run --example fetch-first-sector -- $unixsocket'
+ @NBDKIT@ -U - memory 10M \
+ --run '@CARGO@ run --example concurrent-read-write -- $unixsocket'
else
	@CARGO@ test --config "target.'cfg(all())'.runner = \"$VG\"" -- --nocapture
fi
--
2.42.0