test(bench): ingester2 write benchmarks

Adds a single-line & batched write request benchmarks to ingester2.

When using the WAL, a floor of 10ms is added to all writes (the WAL
linger time, configurable). With the WAL disabled in code, benchmarked
writes complete in less than a millisecond:

    single row/write
        time:   [1.9138 µs 1.9168 µs 1.9197 µs]
        thrpt:  [520.92 Kelem/s 521.70 Kelem/s 522.51 Kelem/s]

    batched/write/1000
        time:   [129.15 µs 129.97 µs 131.28 µs]
        thrpt:  [7.6173 Melem/s 7.6941 Melem/s 7.7429 Melem/s]

Note these benchmarks exclude network I/O, and measure single-threaded,
synchronous write client performance.
pull/24376/head
Dom Dwyer 2023-04-12 12:38:05 +02:00
parent 02c7ec0727
commit dd92459c41
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
2 changed files with 145 additions and 0 deletions

View File

@ -75,3 +75,8 @@ name = "wal"
harness = false
# Require some internal types be made visible for benchmark code.
required-features = ["benches"]
[[bench]]
name = "write"
harness = false
# Require some internal types be made visible for benchmark code.

140
ingester2/benches/write.rs Normal file
View File

@ -0,0 +1,140 @@
use std::sync::Arc;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use data_types::{PartitionKey, Sequence, SequenceNumber, ShardIndex};
use dml::{DmlMeta, DmlWrite};
use futures::{stream::FuturesUnordered, StreamExt};
use generated_types::influxdata::{
iox::ingester::v1::write_service_server::WriteService, pbdata::v1::DatabaseBatch,
};
use influxdb_iox_client::ingester::generated_types::WriteRequest;
use ingester2::IngesterRpcInterface;
use ingester2_test_ctx::{TestContext, TestContextBuilder};
use iox_time::TimeProvider;
use mutable_batch_lp::lines_to_batches;
use mutable_batch_pb::encode::encode_write;
const TEST_NAMESPACE: &str = "bananas";
const PARTITION_KEY: &str = "platanos";
/// Return an initialised and pre-warmed ingester instance backed by a catalog
/// correctly populated to accept writes of `lp`.
async fn init(lp: impl AsRef<str>) -> (TestContext<impl IngesterRpcInterface>, DatabaseBatch) {
let lp = lp.as_ref();
let mut ctx = TestContextBuilder::default()
// Don't stop ingest during benchmarks
.with_max_persist_queue_depth(10_000_000)
.with_persist_hot_partition_cost(10_000_000_000)
.build()
.await;
// Ensure the namespace exists in the catalog.
let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await;
// Perform a write to drive table / schema population in the catalog.
ctx.write_lp(TEST_NAMESPACE, lp, PartitionKey::from(PARTITION_KEY), 42)
.await;
// Construct the write request once, and reuse it for each iteration.
let batches = lines_to_batches(lp, 0).unwrap();
// Build the TableId -> Batch map, resolving the tables IDs from the catalog
// in the process.
let batches_by_ids = batches
.into_iter()
.map(|(table_name, batch)| {
let catalog = Arc::clone(&ctx.catalog());
async move {
let id = catalog
.repositories()
.await
.tables()
.create_or_get(table_name.as_str(), ns.id)
.await
.expect("table should create OK")
.id;
(id, batch)
}
})
.collect::<FuturesUnordered<_>>()
.collect::<hashbrown::HashMap<_, _>>()
.await;
let op = DmlWrite::new(
ns.id,
batches_by_ids,
PartitionKey::from(PARTITION_KEY),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(42), SequenceNumber::new(42)),
iox_time::SystemProvider::new().now(),
None,
50,
),
);
(ctx, encode_write(ns.id.get(), &op))
}
/// Benchmark writes containing varying volumes of line protocol.
///
/// This is definitely a more "macro" benchmark than micro, as it covers the
/// entire ingester write process (RPC request handler, RPC message
/// deserialisation, WAL commit, buffering write, RPC response, etc) but does
/// not include transport overhead (measuring only the processing time, not
/// including the network read time).
///
/// Note that this benchmark covers the single threaded / uncontended case - as
/// the number of parallel writes increases, so does the lock contention on the
/// underlying buffer tree.
fn bench_write(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("failed to initialise tokio runtime for benchmark");
{
let (ctx, op) = runtime.block_on(init("bananas greatness=\"unbounded\" 42"));
let rpc = ctx.rpc();
let mut group = c.benchmark_group("single row");
group.throughput(Throughput::Elements(1));
group.bench_function("write", |b| {
b.to_async(&runtime).iter(|| {
let op = op.clone();
async move {
rpc.write_service()
.write(tonic::Request::new(WriteRequest { payload: Some(op) }))
.await
.unwrap();
}
});
});
}
{
let lp = std::fs::read_to_string("../test_fixtures/lineproto/metrics.lp").unwrap();
let line_count = lp.lines().count() as u64;
let (ctx, op) = runtime.block_on(init(lp));
let rpc = ctx.rpc();
let mut group = c.benchmark_group("batched");
group.throughput(Throughput::Elements(line_count));
group.bench_function(BenchmarkId::new("write", line_count), |b| {
b.to_async(&runtime).iter(|| {
let op = op.clone();
async move {
rpc.write_service()
.write(tonic::Request::new(WriteRequest { payload: Some(op) }))
.await
.unwrap();
}
});
});
}
}
criterion_group!(benches, bench_write);
criterion_main!(benches);