On 8/15/2023 11:00 AM, Richard W.M. Jones wrote:
On Thu, Aug 10, 2023 at 11:24:36AM +0000, Tage Johansson wrote:
> This patch adds an example using the asynchronous Rust bindings.
> ---
> rust/Cargo.toml | 1 +
> rust/examples/concurrent-read-write.rs | 135 +++++++++++++++++++++++++
> rust/run-tests.sh.in | 2 +
> 3 files changed, 138 insertions(+)
> create mode 100644 rust/examples/concurrent-read-write.rs
>
> diff --git a/rust/Cargo.toml b/rust/Cargo.toml
> index d001248..4332783 100644
> --- a/rust/Cargo.toml
> +++ b/rust/Cargo.toml
> @@ -54,5 +54,6 @@ default = ["log", "tokio"]
> anyhow = "1.0.72"
> once_cell = "1.18.0"
> pretty-hex = "0.3.0"
> +rand = { version = "0.8.5", default-features = false, features =
["small_rng", "min_const_gen"] }
> tempfile = "3.6.0"
> tokio = { version = "1.29.1", default-features = false, features =
["rt-multi-thread", "macros"] }
> diff --git a/rust/examples/concurrent-read-write.rs
b/rust/examples/concurrent-read-write.rs
> new file mode 100644
> index 0000000..a1c3e8a
> --- /dev/null
> +++ b/rust/examples/concurrent-read-write.rs
> @@ -0,0 +1,135 @@
> +//! Example usage with nbdkit:
> +//!
> +//! nbdkit -U - memory 100M \
> +//! --run 'cargo run --example concurrent-read-write -- $unixsocket'
It would be nice to make the example use a URI (ie. nbd_connect_uri).
Is that possible?
Yes, of course.
> +//! This will read and write randomly over the first megabyte of
the
> +//! plugin using multi-conn, multiple threads and multiple requests in
> +//! flight on each thread.
> +
> +#![deny(warnings)]
> +use rand::prelude::*;
> +use std::env;
> +use std::path::PathBuf;
> +use std::sync::Arc;
> +use tokio::task::JoinSet;
> +
> +/// Number of simultaneous connections to the NBD server.
> +///
> +/// Note that some servers only support a limited number of
> +/// simultaneous connections, and/or have a configurable thread pool
> +/// internally, and if you exceed those limits then something will break.
> +const NR_MULTI_CONN: usize = 8;
> +
> +/// Number of commands that can be "in flight" at the same time on each
> +/// connection. (Therefore the total number of requests in flight may
> +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
> +const MAX_IN_FLIGHT: usize = 16;
> +
> +/// The size of large reads and writes, must be > 512.
> +const BUFFER_SIZE: usize = 1024;
> +
> +/// Number of commands we issue (per [task][tokio::task]).
> +const NR_CYCLES: usize = 32;
> +
> +/// Statistics gathered during the run.
> +#[derive(Debug, Default)]
> +struct Stats {
> + /// The total number of requests made.
> + requests: usize,
> +}
> +
> +#[tokio::main]
> +async fn main() -> anyhow::Result<()> {
> + let args = env::args_os().collect::<Vec<_>>();
> + if args.len() != 2 {
> + anyhow::bail!("Usage: {:?} socket", args[0]);
> + }
> + let socket = &args[1];
> +
> + // We begin by making a connection to the server to get the export size
> + // and ensure that it supports multiple connections and is writable.
> + let nbd = libnbd::Handle::new()?;
> + nbd.connect_unix(&socket)?;
ie. here ^^^
> + let export_size = nbd.get_size()?;
> + anyhow::ensure!(
> + (BUFFER_SIZE as u64) < export_size,
> + "export is {export_size}B, must be larger than {BUFFER_SIZE}B"
> + );
> + anyhow::ensure!(
> + !nbd.is_read_only()?,
> + "error: this NBD export is read-only"
> + );
> + anyhow::ensure!(
> + nbd.can_multi_conn()?,
> + "error: this NBD export does not support multi-conn"
> + );
> + drop(nbd); // Close the connection.
> +
> + // Start the worker tasks, one per connection.
> + let mut tasks = JoinSet::new();
> + for i in 0..NR_MULTI_CONN {
> + tasks.spawn(run_thread(i, socket.clone().into(), export_size));
> + }
> +
> + // Wait for the tasks to complete.
> + let mut stats = Stats::default();
> + while !tasks.is_empty() {
> + let this_stats = tasks.join_next().await.unwrap().unwrap()?;
> + stats.requests += this_stats.requests;
> + }
> +
> + // Make sure the number of requests that were required matches what
> + // we expect.
> + assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
> +
> + Ok(())
> +}
> +
> +async fn run_thread(
> + task_idx: usize,
> + socket: PathBuf,
> + export_size: u64,
> +) -> anyhow::Result<Stats> {
> + // Start a new connection to the server.
> + // We shall spawn many commands concurrently on different tasks and those
> + // futures must be `'static`, hence we wrap the handle in an [Arc].
> + let nbd = Arc::new(libnbd::AsyncHandle::new()?);
> + nbd.connect_unix(socket).await?;
> +
> + let mut rng = SmallRng::seed_from_u64(44 as u64);
> +
> + // Issue commands.
> + let mut stats = Stats::default();
> + let mut join_set = JoinSet::new();
> + //tokio::time::sleep(std::time::Duration::from_secs(1)).await;
> + while stats.requests < NR_CYCLES || !join_set.is_empty() {
> + while stats.requests < NR_CYCLES && join_set.len() <
MAX_IN_FLIGHT {
> + // If we want to issue another request, do so. Note that we reuse
> + // the same buffer for multiple in-flight requests. It doesn't
> + // matter here because we're just trying to write random stuff,
> + // but that would be Very Bad in a real application.
> + // Simulate a mix of large and small requests.
> + let size = if rng.gen() { BUFFER_SIZE } else { 512 };
> + let offset = rng.gen_range(0..export_size - size as u64);
> +
> + let mut buf = [0u8; BUFFER_SIZE];
> + let nbd = nbd.clone();
> + if rng.gen() {
> + join_set.spawn(async move {
> + nbd.pread(&mut buf, offset, None).await
> + });
> + } else {
> + // Fill the buf with random data.
> + rng.fill(&mut buf);
> + join_set
> + .spawn(async move { nbd.pwrite(&buf, offset, None).await
});
> + }
> + stats.requests += 1;
> + }
> + join_set.join_next().await.unwrap().unwrap()?;
> + }
> +
> + if task_idx == 0 {}
> + Ok(stats)
> +}
> diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in
> index 3ebf9a1..661c018 100755
> --- a/rust/run-tests.sh.in
> +++ b/rust/run-tests.sh.in
> @@ -32,6 +32,8 @@ if [ -z "$VG" ]; then
> --run '@CARGO@ run --example get-size -- $unixsocket'
> @NBDKIT@ -U - floppy . \
> --run '@CARGO@ run --example fetch-first-sector --
$unixsocket'
> + @NBDKIT@ -U - memory 10M \
> + --run '@CARGO@ run --example concurrent-read-write --
$unixsocket'
I didn't spot that the other examples could only use a unix socket,
rather than a URI, but it'd be nice to allow them to use URIs too.
I will add that in a separate patch in the next version of the patch series.
I guess that we still use nbdkit with a unix socket when testing the
examples in run-tests.sh, right?
Best regards,
Tage
For this commit:
Reviewed-by: Richard W.M. Jones <rjones(a)redhat.com>
Rich.
> else
> @CARGO@ test --config "target.'cfg(all())'.runner =
\"$VG\"" -- --nocapture
> fi
> --
> 2.41.0