diff --git a/ingester/benches/query.rs b/ingester/benches/query.rs index 2c9fb5e6f6..f699460f54 100644 --- a/ingester/benches/query.rs +++ b/ingester/benches/query.rs @@ -1,8 +1,17 @@ +use arrow::datatypes::DataType; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use data_types::{NamespaceId, PartitionKey, TableId}; +use data_types::{ + partition_template::NamespacePartitionTemplateOverride, NamespaceId, PartitionKey, TableId, +}; +use datafusion::{ + prelude::{col, lit}, + scalar::ScalarValue, +}; +use influxdb_iox_client::table::generated_types::{PartitionTemplate, TemplatePart}; use ingester::IngesterRpcInterface; use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest; use ingester_test_ctx::{TestContext, TestContextBuilder}; +use predicate::Predicate; use std::{fmt::Write, sync::Arc, time::Instant}; use tokio::sync::Barrier; @@ -37,7 +46,7 @@ async fn init( .await; // Ensure the namespace exists in the catalog. - let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await; + let ns = ctx.ensure_namespace(TEST_NAMESPACE, None, None).await; // Write the test data ctx.write_lp( @@ -197,5 +206,124 @@ fn run_concurrent_bench( ); } -criterion_group!(benches, bench_query, bench_query_concurrent); +/// Benchmark executing a query against a table that is configured to allow +/// pruning based on tag values, varying the number of rows in the partitions, +/// and the number of partitions pruned (always selecting 1 partition). +fn bench_partition_pruning(c: &mut Criterion) { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("failed to initialise tokio runtime for benchmark"); + + static ROW_COUNTS: &[usize] = &[10, 10_000, 100_000]; + static PRUNED_PARTITIONS: &[usize] = &[0, 100, 200, 500]; + + for rows in ROW_COUNTS { + for partition_count in PRUNED_PARTITIONS { + // plus one, so we're pruning the specified number, and returning 1 + // partition. 
+ let partition_count = partition_count + 1; + + run_partition_prune_bench(*rows, partition_count, &runtime, c); + } + } +} + +fn run_partition_prune_bench( + rows: usize, + partition_count: usize, + runtime: &tokio::runtime::Runtime, + c: &mut Criterion, +) { + // Initialise the ingester configured with this partition template. + let (ctx, namespace_id, table_id) = runtime.block_on(partition_init(rows, partition_count)); + + let predicate = Some( + Predicate::new() + .with_expr(col("platanos").eq(lit(ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::from("0")), // Always query for partition with platanos=0 + )))) + .try_into() + .unwrap(), + ); + + let mut group = c.benchmark_group("partition_prune"); + group.throughput(Throughput::Elements(1)); // Queries per second + group.bench_function( + format!("rows_{rows}/prune_{v}_partitions", v = partition_count - 1), + |b| { + let ctx = &ctx; + let predicate = &predicate; + b.to_async(runtime).iter(|| async move { + ctx.query(IngesterQueryRequest { + namespace_id: namespace_id.get(), + table_id: table_id.get(), + columns: vec![], + predicate: predicate.clone(), + }) + .await + .expect("query request failed"); + }); + }, + ); +} + +/// A specialised init function that creates many partitions. +async fn partition_init( + rows: usize, + partition_count: usize, +) -> (TestContext, NamespaceId, TableId) { + // Partition based on the tag value "platanos" + let partition_template = NamespacePartitionTemplateOverride::try_from(PartitionTemplate { + parts: vec![TemplatePart { + part: Some(influxdb_iox_client::table::generated_types::Part::TagValue( + "platanos".to_string(), + )), + }], + }) + .expect("valid template"); + + let mut ctx = TestContextBuilder::default() + // Don't stop ingest during benchmarks + .with_max_persist_queue_depth(10_000_000) + .with_persist_hot_partition_cost(10_000_000_000) + .build() + .await; + + // Ensure the namespace exists in the catalog. 
+ let ns = ctx + .ensure_namespace(TEST_NAMESPACE, None, Some(partition_template)) + .await; + + // Generate LP that contains a tag "platanos" and an integer value - always + // query for partition 0. + let mut lp = String::new(); + for p in 0..partition_count { + lp.clear(); + for i in 0..rows { + writeln!(lp, "bananas,platanos={p} v={i} 42").unwrap(); + } + + ctx.write_lp( + TEST_NAMESPACE, + &lp, + PartitionKey::from(p.to_string()), + 42, + None, + ) + .await; + } + + let table_id = ctx.table_id(TEST_NAMESPACE, "bananas").await; + + (ctx, ns.id, table_id) +} + +criterion_group!( + benches, + bench_query, + bench_query_concurrent, + bench_partition_pruning +); criterion_main!(benches); diff --git a/ingester/benches/write.rs b/ingester/benches/write.rs index ac310d58c5..ac2ba77d88 100644 --- a/ingester/benches/write.rs +++ b/ingester/benches/write.rs @@ -33,7 +33,7 @@ async fn init(lp: impl AsRef) -> (TestContext, D .await; // Ensure the namespace exists in the catalog. - let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await; + let ns = ctx.ensure_namespace(TEST_NAMESPACE, None, None).await; // Perform a write to drive table / schema population in the catalog. 
ctx.write_lp( diff --git a/ingester/tests/query.rs b/ingester/tests/query.rs index dcf3dc3faf..74035f8283 100644 --- a/ingester/tests/query.rs +++ b/ingester/tests/query.rs @@ -9,7 +9,7 @@ use metric::{DurationHistogram, U64Histogram}; async fn write_query() { let namespace_name = "write_query_test_namespace"; let mut ctx = TestContextBuilder::default().build().await; - let ns = ctx.ensure_namespace(namespace_name, None).await; + let ns = ctx.ensure_namespace(namespace_name, None, None).await; // Initial write let partition_key = PartitionKey::from("1970-01-01"); @@ -94,7 +94,7 @@ async fn write_query() { async fn write_query_projection() { let namespace_name = "write_query_test_namespace"; let mut ctx = TestContextBuilder::default().build().await; - let ns = ctx.ensure_namespace(namespace_name, None).await; + let ns = ctx.ensure_namespace(namespace_name, None, None).await; // Initial write let partition_key = PartitionKey::from("1970-01-01"); diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs index 35135f50ac..380b5dbf7c 100644 --- a/ingester/tests/write.rs +++ b/ingester/tests/write.rs @@ -23,7 +23,7 @@ use trace::{ctx::SpanContext, RingBufferTraceCollector}; async fn write_persist() { let namespace_name = "write_query_test_namespace"; let mut ctx = TestContextBuilder::default().build().await; - let ns = ctx.ensure_namespace(namespace_name, None).await; + let ns = ctx.ensure_namespace(namespace_name, None, None).await; let partition_key = PartitionKey::from("1970-01-01"); ctx.write_lp( @@ -191,7 +191,7 @@ async fn wal_replay() { .build() .await; - let ns = ctx.ensure_namespace(namespace_name, None).await; + let ns = ctx.ensure_namespace(namespace_name, None, None).await; // Initial write let partition_key = PartitionKey::from("1970-01-01"); @@ -285,7 +285,7 @@ async fn graceful_shutdown() { .build() .await; - let ns = ctx.ensure_namespace(namespace_name, None).await; + let ns = ctx.ensure_namespace(namespace_name, None, None).await; let namespace_id = 
ns.id; // Initial write @@ -394,7 +394,7 @@ async fn wal_reference_dropping() { .build() .await; - let ns = ctx.ensure_namespace(TEST_NAMESPACE_NAME, None).await; + let ns = ctx.ensure_namespace(TEST_NAMESPACE_NAME, None, None).await; // Initial write let partition_key = PartitionKey::from("1970-01-01"); @@ -497,7 +497,7 @@ fn get_file_names_in_dir(dir: &Path) -> Result, std::io::Error> { async fn write_tracing() { let namespace_name = "write_tracing_test_namespace"; let mut ctx = TestContextBuilder::default().build().await; - let ns = ctx.ensure_namespace(namespace_name, None).await; + let ns = ctx.ensure_namespace(namespace_name, None, None).await; let trace_collector = Arc::new(RingBufferTraceCollector::new(5)); let span_ctx = SpanContext::new(Arc::new(Arc::clone(&trace_collector))); diff --git a/ingester_test_ctx/src/lib.rs b/ingester_test_ctx/src/lib.rs index 24a8f0382c..392a7a827e 100644 --- a/ingester_test_ctx/src/lib.rs +++ b/ingester_test_ctx/src/lib.rs @@ -22,8 +22,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use arrow::record_batch::RecordBatch; use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket}; use data_types::{ - partition_template::TablePartitionTemplateOverride, Namespace, NamespaceId, NamespaceSchema, - ParquetFile, PartitionKey, SequenceNumber, TableId, + partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride}, + Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, SequenceNumber, TableId, }; use dml::{DmlMeta, DmlWrite}; use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt}; @@ -222,6 +222,7 @@ where &mut self, name: &str, retention_period_ns: Option, + partition_template: Option, ) -> Namespace { let mut repos = self.catalog.repositories().await; let ns = arbitrary_namespace(&mut *repos, name).await; @@ -236,7 +237,7 @@ where max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE as usize, 
max_tables: iox_catalog::DEFAULT_MAX_TABLES as usize, retention_period_ns, - partition_template: Default::default(), + partition_template: partition_template.unwrap_or_default(), }, ) .is_none(),