test(bench): ingester query partition pruning
Adds benchmarks that exercise partition pruning during query execution within the ingester, for varying partition counts within a table, and varying row counts within each partition.pull/24376/head
@ -1,8 +1,17 @@
use arrow::datatypes::DataType;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use data_types::{NamespaceId, PartitionKey, TableId};
use data_types::{
partition_template::NamespacePartitionTemplateOverride, NamespaceId, PartitionKey, TableId,
use datafusion::{
prelude::{col, lit},
use influxdb_iox_client::table::generated_types::{PartitionTemplate, TemplatePart};
use ingester::IngesterRpcInterface;
use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest;
use ingester_test_ctx::{TestContext, TestContextBuilder};
use predicate::Predicate;
use std::{fmt::Write, sync::Arc, time::Instant};
use tokio::sync::Barrier;
@ -37,7 +46,7 @@ async fn init(
// Ensure the namespace exists in the catalog.
let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await;
let ns = ctx.ensure_namespace(TEST_NAMESPACE, None, None).await;
// Write the test data
@ -197,5 +206,124 @@ fn run_concurrent_bench(
criterion_group!(benches, bench_query, bench_query_concurrent);
/// Benchmark executing a query against a table that is configured to allow
/// pruning based on tag values, varying the number of rows in the partitions,
/// and the number of partitions pruned (always selecting 1 partition).
fn bench_partition_pruning(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.expect("failed to initialise tokio runtime for benchmark");
static ROW_COUNTS: &[usize] = &[10, 10_000, 100_000];
static PRUNED_PARTITIONS: &[usize] = &[0, 100, 200, 500];
for rows in ROW_COUNTS {
for partition_count in PRUNED_PARTITIONS {
// plus one, so we're pruning the specified number, and returning 1
// partition.
let partition_count = partition_count + 1;
run_partition_prune_bench(*rows, partition_count, &runtime, c);
fn run_partition_prune_bench(
rows: usize,
partition_count: usize,
runtime: &tokio::runtime::Runtime,
c: &mut Criterion,
) {
// Initialise the ingester configured with this partition template.
let (ctx, namespace_id, table_id) = runtime.block_on(partition_init(rows, partition_count));
let predicate = Some(
Box::new(ScalarValue::from("0")), // Always query for partition with platanos=0
let mut group = c.benchmark_group("partition_prune");
group.throughput(Throughput::Elements(1)); // Queries per second
format!("rows_{rows}/prune_{v}_partitions", v = partition_count - 1),
|b| {
let ctx = &ctx;
let predicate = &predicate;
b.to_async(runtime).iter(|| async move {
ctx.query(IngesterQueryRequest {
namespace_id: namespace_id.get(),
table_id: table_id.get(),
columns: vec![],
predicate: predicate.clone(),
.expect("query request failed");
/// A specialised init function that creates many partitions.
async fn partition_init(
rows: usize,
partition_count: usize,
) -> (TestContext<impl IngesterRpcInterface>, NamespaceId, TableId) {
// Partition based on the tag value "platanos"
let partition_template = NamespacePartitionTemplateOverride::try_from(PartitionTemplate {
parts: vec![TemplatePart {
part: Some(influxdb_iox_client::table::generated_types::Part::TagValue(
.expect("valid template");
let mut ctx = TestContextBuilder::default()
// Don't stop ingest during benchmarks
// Ensure the namespace exists in the catalog.
let ns = ctx
.ensure_namespace(TEST_NAMESPACE, None, Some(partition_template))
// Generate LP that contains a tag "platanos" and a integer value - always
// query for partition 0.
let mut lp = String::new();
for p in 0..partition_count {
for i in 0..rows {
writeln!(lp, "bananas,platanos={p} v={i} 42").unwrap();
let table_id = ctx.table_id(TEST_NAMESPACE, "bananas").await;
(ctx, ns.id, table_id)
@ -33,7 +33,7 @@ async fn init(lp: impl AsRef<str>) -> (TestContext<impl IngesterRpcInterface>, D
// Ensure the namespace exists in the catalog.
let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await;
let ns = ctx.ensure_namespace(TEST_NAMESPACE, None, None).await;
// Perform a write to drive table / schema population in the catalog.
@ -9,7 +9,7 @@ use metric::{DurationHistogram, U64Histogram};
async fn write_query() {
let namespace_name = "write_query_test_namespace";
let mut ctx = TestContextBuilder::default().build().await;
let ns = ctx.ensure_namespace(namespace_name, None).await;
let ns = ctx.ensure_namespace(namespace_name, None, None).await;
// Initial write
let partition_key = PartitionKey::from("1970-01-01");
@ -94,7 +94,7 @@ async fn write_query() {
async fn write_query_projection() {
let namespace_name = "write_query_test_namespace";
let mut ctx = TestContextBuilder::default().build().await;
let ns = ctx.ensure_namespace(namespace_name, None).await;
let ns = ctx.ensure_namespace(namespace_name, None, None).await;
// Initial write
let partition_key = PartitionKey::from("1970-01-01");
@ -23,7 +23,7 @@ use trace::{ctx::SpanContext, RingBufferTraceCollector};
async fn write_persist() {
let namespace_name = "write_query_test_namespace";
let mut ctx = TestContextBuilder::default().build().await;
let ns = ctx.ensure_namespace(namespace_name, None).await;
let ns = ctx.ensure_namespace(namespace_name, None, None).await;
let partition_key = PartitionKey::from("1970-01-01");
@ -191,7 +191,7 @@ async fn wal_replay() {
let ns = ctx.ensure_namespace(namespace_name, None).await;
let ns = ctx.ensure_namespace(namespace_name, None, None).await;
// Initial write
let partition_key = PartitionKey::from("1970-01-01");
@ -285,7 +285,7 @@ async fn graceful_shutdown() {
let ns = ctx.ensure_namespace(namespace_name, None).await;
let ns = ctx.ensure_namespace(namespace_name, None, None).await;
let namespace_id = ns.id;
// Initial write
@ -394,7 +394,7 @@ async fn wal_reference_dropping() {
let ns = ctx.ensure_namespace(TEST_NAMESPACE_NAME, None).await;
let ns = ctx.ensure_namespace(TEST_NAMESPACE_NAME, None, None).await;
// Initial write
let partition_key = PartitionKey::from("1970-01-01");
@ -497,7 +497,7 @@ fn get_file_names_in_dir(dir: &Path) -> Result<Vec<OsString>, std::io::Error> {
async fn write_tracing() {
let namespace_name = "write_tracing_test_namespace";
let mut ctx = TestContextBuilder::default().build().await;
let ns = ctx.ensure_namespace(namespace_name, None).await;
let ns = ctx.ensure_namespace(namespace_name, None, None).await;
let trace_collector = Arc::new(RingBufferTraceCollector::new(5));
let span_ctx = SpanContext::new(Arc::new(Arc::clone(&trace_collector)));
@ -22,8 +22,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use arrow::record_batch::RecordBatch;
use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket};
use data_types::{
partition_template::TablePartitionTemplateOverride, Namespace, NamespaceId, NamespaceSchema,
ParquetFile, PartitionKey, SequenceNumber, TableId,
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, SequenceNumber, TableId,
use dml::{DmlMeta, DmlWrite};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
@ -222,6 +222,7 @@ where
&mut self,
name: &str,
retention_period_ns: Option<i64>,
partition_template: Option<NamespacePartitionTemplateOverride>,
) -> Namespace {
let mut repos = self.catalog.repositories().await;
let ns = arbitrary_namespace(&mut *repos, name).await;
@ -236,7 +237,7 @@ where
max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE as usize,
max_tables: iox_catalog::DEFAULT_MAX_TABLES as usize,
partition_template: Default::default(),
partition_template: partition_template.unwrap_or_default(),
Reference in New Issue