From dd92459c41ce8090a0a2a3119342ededc4b78b48 Mon Sep 17 00:00:00 2001
From: Dom Dwyer <dom@itsallbroken.com>
Date: Wed, 12 Apr 2023 12:38:05 +0200
Subject: [PATCH] test(bench): ingester2 write benchmarks
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds single-line & batched write request benchmarks to ingester2.

When using the WAL, a floor of 10ms is added to all writes (the WAL
linger time, configurable). With the WAL disabled in code, benchmarked
writes complete in less than a millisecond:

    single row/write
                        time:   [1.9138 µs 1.9168 µs 1.9197 µs]
                        thrpt:  [520.92 Kelem/s 521.70 Kelem/s 522.51 Kelem/s]

    batched/write/1000
                        time:   [129.15 µs 129.97 µs 131.28 µs]
                        thrpt:  [7.6173 Melem/s 7.6941 Melem/s 7.7429 Melem/s]

Note these benchmarks exclude network I/O, and measure single-threaded,
synchronous write client performance.
---
 ingester2/Cargo.toml       |   6 ++
 ingester2/benches/write.rs | 140 +++++++++++++++++++++++++++++++++++++
 2 files changed, 146 insertions(+)
 create mode 100644 ingester2/benches/write.rs

diff --git a/ingester2/Cargo.toml b/ingester2/Cargo.toml
index 2feba2bba7..179d51aba3 100644
--- a/ingester2/Cargo.toml
+++ b/ingester2/Cargo.toml
@@ -75,3 +75,9 @@ name = "wal"
 harness = false
 # Require some internal types be made visible for benchmark code.
 required-features = ["benches"]
+
+[[bench]]
+name = "write"
+harness = false
+# Require some internal types be made visible for benchmark code.
+required-features = ["benches"]
diff --git a/ingester2/benches/write.rs b/ingester2/benches/write.rs new file mode 100644 index 0000000000..76e482d698 --- /dev/null +++ b/ingester2/benches/write.rs @@ -0,0 +1,140 @@ +use std::sync::Arc; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use data_types::{PartitionKey, Sequence, SequenceNumber, ShardIndex}; +use dml::{DmlMeta, DmlWrite}; +use futures::{stream::FuturesUnordered, StreamExt}; +use generated_types::influxdata::{ + iox::ingester::v1::write_service_server::WriteService, pbdata::v1::DatabaseBatch, +}; +use influxdb_iox_client::ingester::generated_types::WriteRequest; +use ingester2::IngesterRpcInterface; +use ingester2_test_ctx::{TestContext, TestContextBuilder}; +use iox_time::TimeProvider; +use mutable_batch_lp::lines_to_batches; +use mutable_batch_pb::encode::encode_write; + +const TEST_NAMESPACE: &str = "bananas"; +const PARTITION_KEY: &str = "platanos"; + +/// Return an initialised and pre-warmed ingester instance backed by a catalog +/// correctly populated to accept writes of `lp`. +async fn init(lp: impl AsRef) -> (TestContext, DatabaseBatch) { + let lp = lp.as_ref(); + + let mut ctx = TestContextBuilder::default() + // Don't stop ingest during benchmarks + .with_max_persist_queue_depth(10_000_000) + .with_persist_hot_partition_cost(10_000_000_000) + .build() + .await; + + // Ensure the namespace exists in the catalog. + let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await; + + // Perform a write to drive table / schema population in the catalog. + ctx.write_lp(TEST_NAMESPACE, lp, PartitionKey::from(PARTITION_KEY), 42) + .await; + + // Construct the write request once, and reuse it for each iteration. + let batches = lines_to_batches(lp, 0).unwrap(); + + // Build the TableId -> Batch map, resolving the tables IDs from the catalog + // in the process. 
+ let batches_by_ids = batches + .into_iter() + .map(|(table_name, batch)| { + let catalog = Arc::clone(&ctx.catalog()); + async move { + let id = catalog + .repositories() + .await + .tables() + .create_or_get(table_name.as_str(), ns.id) + .await + .expect("table should create OK") + .id; + + (id, batch) + } + }) + .collect::>() + .collect::>() + .await; + + let op = DmlWrite::new( + ns.id, + batches_by_ids, + PartitionKey::from(PARTITION_KEY), + DmlMeta::sequenced( + Sequence::new(ShardIndex::new(42), SequenceNumber::new(42)), + iox_time::SystemProvider::new().now(), + None, + 50, + ), + ); + + (ctx, encode_write(ns.id.get(), &op)) +} + +/// Benchmark writes containing varying volumes of line protocol. +/// +/// This is definitely a more "macro" benchmark than micro, as it covers the +/// entire ingester write process (RPC request handler, RPC message +/// deserialisation, WAL commit, buffering write, RPC response, etc) but does +/// not include transport overhead (measuring only the processing time, not +/// including the network read time). +/// +/// Note that this benchmark covers the single threaded / uncontended case - as +/// the number of parallel writes increases, so does the lock contention on the +/// underlying buffer tree. 
+fn bench_write(c: &mut Criterion) {
+    let runtime = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .expect("failed to initialise tokio runtime for benchmark");
+
+    // Case 1: a single-row write, measuring per-request overhead.
+    {
+        let (ctx, op) = runtime.block_on(init("bananas greatness=\"unbounded\" 42"));
+        let rpc = ctx.rpc();
+
+        let mut group = c.benchmark_group("single row");
+        group.throughput(Throughput::Elements(1));
+        group.bench_function("write", |b| {
+            b.to_async(&runtime).iter(|| {
+                // Clone the pre-encoded payload per iteration; the request
+                // consumes it.
+                let op = op.clone();
+                async move {
+                    rpc.write_service()
+                        .write(tonic::Request::new(WriteRequest { payload: Some(op) }))
+                        .await
+                        .unwrap();
+                }
+            });
+        });
+    }
+
+    // Case 2: a batched write sourced from the line-protocol test fixture,
+    // with throughput reported per line.
+    {
+        let lp = std::fs::read_to_string("../test_fixtures/lineproto/metrics.lp").unwrap();
+        let line_count = lp.lines().count() as u64;
+
+        let (ctx, op) = runtime.block_on(init(lp));
+        let rpc = ctx.rpc();
+
+        let mut group = c.benchmark_group("batched");
+        group.throughput(Throughput::Elements(line_count));
+        group.bench_function(BenchmarkId::new("write", line_count), |b| {
+            b.to_async(&runtime).iter(|| {
+                let op = op.clone();
+                async move {
+                    rpc.write_service()
+                        .write(tonic::Request::new(WriteRequest { payload: Some(op) }))
+                        .await
+                        .unwrap();
+                }
+            });
+        });
+    }
+}
+
+criterion_group!(benches, bench_write);
+criterion_main!(benches);