From b76fdab1a498ea6b9ea55af25a04cee201a65618 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 30 Jun 2023 14:17:35 -0400 Subject: [PATCH 01/14] refactor: Move querier::df_stats to iox_query::chunk_statistics so it can be shared with ingester --- .../df_stats.rs => iox_query/src/chunk_statistics.rs | 0 iox_query/src/lib.rs | 1 + querier/src/cache/partition.rs | 3 +-- querier/src/ingester/mod.rs | 11 ++++++----- querier/src/lib.rs | 1 - querier/src/parquet/mod.rs | 3 +-- querier/src/table/mod.rs | 3 +-- querier/src/table/test_util.rs | 5 +++-- 8 files changed, 13 insertions(+), 14 deletions(-) rename querier/src/df_stats.rs => iox_query/src/chunk_statistics.rs (100%) diff --git a/querier/src/df_stats.rs b/iox_query/src/chunk_statistics.rs similarity index 100% rename from querier/src/df_stats.rs rename to iox_query/src/chunk_statistics.rs diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index f63089957b..9592f4aa77 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -34,6 +34,7 @@ use schema::{ }; use std::{any::Any, fmt::Debug, sync::Arc}; +pub mod chunk_statistics; pub mod config; pub mod exec; pub mod frontend; diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 77a3aaf321..fba27593e7 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -17,6 +17,7 @@ use data_types::{ }; use datafusion::scalar::ScalarValue; use iox_catalog::interface::Catalog; +use iox_query::chunk_statistics::{ColumnRange, ColumnRanges}; use iox_time::TimeProvider; use observability_deps::tracing::debug; use schema::sort::SortKey; @@ -27,8 +28,6 @@ use std::{ }; use trace::span::Span; -use crate::df_stats::{ColumnRange, ColumnRanges}; - use super::{namespace::CachedTable, ram::RamSize}; const CACHE_ID: &str = "partition"; diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index c13b05f77f..d726e94162 100644 --- a/querier/src/ingester/mod.rs +++ 
b/querier/src/ingester/mod.rs @@ -6,10 +6,7 @@ use self::{ invalidate_on_error::InvalidateOnErrorFlightClient, test_util::MockIngesterConnection, }; -use crate::{ - cache::{namespace::CachedTable, CatalogCache}, - df_stats::{create_chunk_statistics, ColumnRanges}, -}; +use crate::cache::{namespace::CachedTable, CatalogCache}; use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch}; use arrow_flight::decode::DecodedPayload; use async_trait::async_trait; @@ -22,7 +19,11 @@ use ingester_query_grpc::{ encode_proto_predicate_as_base64, influxdata::iox::ingester::v1::IngesterQueryResponseMetadata, IngesterQueryRequest, }; -use iox_query::{util::compute_timenanosecond_min_max, QueryChunk, QueryChunkData}; +use iox_query::{ + chunk_statistics::{create_chunk_statistics, ColumnRanges}, + util::compute_timenanosecond_min_max, + QueryChunk, QueryChunkData, +}; use iox_time::{Time, TimeProvider}; use metric::{DurationHistogram, Metric}; use observability_deps::tracing::{debug, trace, warn}; diff --git a/querier/src/lib.rs b/querier/src/lib.rs index 9985f1d0e5..0a18796b2a 100644 --- a/querier/src/lib.rs +++ b/querier/src/lib.rs @@ -18,7 +18,6 @@ use workspace_hack as _; mod cache; mod database; -mod df_stats; mod ingester; mod namespace; mod parquet; diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs index 24ac694d78..da1a72ed83 100644 --- a/querier/src/parquet/mod.rs +++ b/querier/src/parquet/mod.rs @@ -2,6 +2,7 @@ use data_types::{ChunkId, ChunkOrder, PartitionId}; use datafusion::physical_plan::Statistics; +use iox_query::chunk_statistics::{create_chunk_statistics, ColumnRanges}; use parquet_file::chunk::ParquetChunk; use schema::sort::SortKey; use std::sync::Arc; @@ -11,8 +12,6 @@ mod query_access; pub use creation::ChunkAdapter; -use crate::df_stats::{create_chunk_statistics, ColumnRanges}; - /// Immutable metadata attached to a [`QuerierParquetChunk`]. 
#[derive(Debug)] pub struct QuerierParquetChunkMeta { diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 25bbb08b7a..fcdc3c55aa 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -492,7 +492,6 @@ mod tests { use super::*; use crate::{ cache::test_util::{assert_cache_access_metric_count, assert_catalog_access_metric_count}, - df_stats::ColumnRange, ingester::{test_util::MockIngesterConnection, IngesterPartition}, table::test_util::{querier_table, IngesterPartitionBuilder}, }; @@ -506,7 +505,7 @@ mod tests { use generated_types::influxdata::iox::partition_template::v1::{ template_part::Part, PartitionTemplate, TemplatePart, }; - use iox_query::exec::IOxSessionContext; + use iox_query::{chunk_statistics::ColumnRange, exec::IOxSessionContext}; use iox_tests::{TestCatalog, TestParquetFileBuilder, TestTable}; use predicate::Predicate; use schema::{builder::SchemaBuilder, InfluxFieldType, TIME_COLUMN_NAME}; diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs index 182a77b18a..842c4eb9cb 100644 --- a/querier/src/table/test_util.rs +++ b/querier/src/table/test_util.rs @@ -1,11 +1,12 @@ use super::{PruneMetrics, QuerierTable, QuerierTableArgs}; use crate::{ - cache::CatalogCache, create_ingester_connection_for_testing, df_stats::ColumnRanges, - parquet::ChunkAdapter, IngesterPartition, + cache::CatalogCache, create_ingester_connection_for_testing, parquet::ChunkAdapter, + IngesterPartition, }; use arrow::record_batch::RecordBatch; use data_types::ChunkId; use iox_catalog::interface::{get_schema_by_name, SoftDeletedRows}; +use iox_query::chunk_statistics::ColumnRanges; use iox_tests::{TestCatalog, TestPartition, TestTable}; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use schema::{Projection, Schema}; From 246c2b074904a9bc2d923f3f6710d00eb8ed9e03 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Fri, 30 Jun 2023 11:21:38 +0100 Subject: [PATCH 02/14] refactor(ingester): Accept a predicate as 
parameter to `query_exec` This will allow the ingester to apply a predicate when serving a query and only stream back data that satisfies the predicate. --- ingester/src/buffer_tree/namespace.rs | 4 +- ingester/src/buffer_tree/root.rs | 39 ++++++++++++++++---- ingester/src/buffer_tree/table.rs | 3 ++ ingester/src/query/exec_instrumentation.rs | 6 ++- ingester/src/query/mock_query_exec.rs | 2 + ingester/src/query/result_instrumentation.rs | 38 ++++++++++++++++--- ingester/src/query/tracing.rs | 12 +++++- ingester/src/query/trait.rs | 5 ++- ingester/src/server/grpc/query.rs | 30 +++++++++++---- 9 files changed, 114 insertions(+), 25 deletions(-) diff --git a/ingester/src/buffer_tree/namespace.rs b/ingester/src/buffer_tree/namespace.rs index aa599b3586..8346d9d43c 100644 --- a/ingester/src/buffer_tree/namespace.rs +++ b/ingester/src/buffer_tree/namespace.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use async_trait::async_trait; use data_types::{NamespaceId, TableId}; use metric::U64Counter; +use predicate::Predicate; use trace::span::Span; use super::{ @@ -189,6 +190,7 @@ where table_id: TableId, columns: Vec, span: Option, + predicate: Option, ) -> Result { assert_eq!( self.namespace_id, namespace_id, @@ -204,7 +206,7 @@ where // a tracing delegate to emit a child span. 
Ok(QueryResponse::new( QueryExecTracing::new(inner, "table") - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await?, )) } diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index 9576fc006c..e33181fd96 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use data_types::{NamespaceId, TableId}; use metric::U64Counter; use parking_lot::Mutex; +use predicate::Predicate; use trace::span::Span; use super::{ @@ -202,6 +203,7 @@ where table_id: TableId, columns: Vec, span: Option, + predicate: Option, ) -> Result { // Extract the namespace if it exists. let inner = self @@ -211,7 +213,7 @@ where // Delegate query execution to the namespace, wrapping the execution in // a tracing delegate to emit a child span. QueryExecTracing::new(inner, "namespace") - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await } } @@ -370,7 +372,7 @@ mod tests { // Execute the query against ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID let batches = buf - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None, None) .await .expect("query should succeed") .into_partition_stream() @@ -829,7 +831,13 @@ mod tests { // Query the empty tree let err = buf - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect_err("query should fail"); assert_matches!(err, QueryError::NamespaceNotFound(ns) => { @@ -854,7 +862,7 @@ mod tests { // Ensure an unknown table errors let err = buf - .query_exec(ARBITRARY_NAMESPACE_ID, TABLE2_ID, vec![], None) + .query_exec(ARBITRARY_NAMESPACE_ID, TABLE2_ID, vec![], None, None) .await .expect_err("query should fail"); 
assert_matches!(err, QueryError::TableNotFound(ns, t) => { @@ -863,9 +871,15 @@ mod tests { }); // Ensure a valid namespace / table does not error - buf.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) - .await - .expect("namespace / table should exist"); + buf.query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) + .await + .expect("namespace / table should exist"); } /// This test asserts the read consistency properties defined in the @@ -931,7 +945,13 @@ mod tests { // Execute a query of the buffer tree, generating the result stream, but // DO NOT consume it. let stream = buf - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should succeed") .into_partition_stream(); @@ -996,4 +1016,7 @@ mod tests { &batches ); } + + // TODO(savage): Consider what tests need to be added here for Predicate + // support? } diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index 93a83de978..17ee85ce13 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -8,6 +8,7 @@ use async_trait::async_trait; use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId}; use mutable_batch::MutableBatch; use parking_lot::Mutex; +use predicate::Predicate; use schema::Projection; use trace::span::{Span, SpanRecorder}; @@ -204,6 +205,7 @@ where table_id: TableId, columns: Vec, span: Option, + _predicate: Option, ) -> Result { assert_eq!(self.table_id, table_id, "buffer tree index inconsistency"); assert_eq!( @@ -228,6 +230,7 @@ where let ret = match data { Some(data) => { + // TODO(savage): Apply predicate here through the projection? 
assert_eq!(id, data.partition_id()); // Project the data if necessary diff --git a/ingester/src/query/exec_instrumentation.rs b/ingester/src/query/exec_instrumentation.rs index 4a05367d81..f6bc6f2737 100644 --- a/ingester/src/query/exec_instrumentation.rs +++ b/ingester/src/query/exec_instrumentation.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use data_types::{NamespaceId, TableId}; use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric}; +use predicate::Predicate; use trace::span::Span; use super::QueryExec; @@ -64,12 +65,13 @@ where table_id: TableId, columns: Vec, span: Option, + predicate: Option, ) -> Result { let t = self.time_provider.now(); let res = self .inner - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await; if let Some(delta) = self.time_provider.now().checked_duration_since(t) { @@ -113,7 +115,7 @@ mod tests { // Call the decorator and assert the return value let got = decorator - .query_exec(NamespaceId::new(42), TableId::new(24), vec![], None) + .query_exec(NamespaceId::new(42), TableId::new(24), vec![], None, None) .await; assert_matches!(got, $($want_ret)+); diff --git a/ingester/src/query/mock_query_exec.rs b/ingester/src/query/mock_query_exec.rs index 580e139f02..db032e992b 100644 --- a/ingester/src/query/mock_query_exec.rs +++ b/ingester/src/query/mock_query_exec.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use data_types::{NamespaceId, TableId}; use parking_lot::Mutex; +use predicate::Predicate; use trace::span::Span; use super::{response::QueryResponse, QueryError, QueryExec}; @@ -27,6 +28,7 @@ impl QueryExec for MockQueryExec { _table_id: TableId, _columns: Vec, _span: Option, + _predicate: Option, ) -> Result { self.response .lock() diff --git a/ingester/src/query/result_instrumentation.rs b/ingester/src/query/result_instrumentation.rs index a412059b82..53ba57a5f3 100644 --- a/ingester/src/query/result_instrumentation.rs +++ 
b/ingester/src/query/result_instrumentation.rs @@ -58,6 +58,7 @@ use iox_time::{SystemProvider, Time, TimeProvider}; use metric::{DurationHistogram, Metric, U64Histogram, U64HistogramOptions}; use observability_deps::tracing::debug; use pin_project::{pin_project, pinned_drop}; +use predicate::Predicate; use trace::span::Span; use crate::query::{ @@ -204,12 +205,15 @@ where table_id: TableId, columns: Vec, span: Option, + predicate: Option, ) -> Result { let started_at = self.time_provider.now(); + // TODO(savage): Would accepting a predicate here require additional + // metrics to be added? let stream = self .inner - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await?; let stream = QueryMetricContext::new( @@ -467,7 +471,13 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should succeed"); @@ -548,7 +558,13 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should succeed"); @@ -628,7 +644,13 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should succeed"); @@ -708,7 +730,13 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should 
succeed"); diff --git a/ingester/src/query/tracing.rs b/ingester/src/query/tracing.rs index 34b171e30d..86751e1413 100644 --- a/ingester/src/query/tracing.rs +++ b/ingester/src/query/tracing.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use async_trait::async_trait; use data_types::{NamespaceId, TableId}; +use predicate::Predicate; use trace::span::{Span, SpanRecorder}; use super::QueryExec; @@ -42,12 +43,19 @@ where table_id: TableId, columns: Vec, span: Option, + predicate: Option, ) -> Result { let mut recorder = SpanRecorder::new(span).child(self.name.clone()); match self .inner - .query_exec(namespace_id, table_id, columns, recorder.span().cloned()) + .query_exec( + namespace_id, + table_id, + columns, + recorder.span().cloned(), + predicate, + ) .await { Ok(v) => { @@ -111,6 +119,7 @@ mod tests { TableId::new(24), vec![], Some(span.child("root span")), + None, ) .await .expect("wrapper should not modify result"); @@ -134,6 +143,7 @@ mod tests { TableId::new(24), vec![], Some(span.child("root span")), + None, ) .await .expect_err("wrapper should not modify result"); diff --git a/ingester/src/query/trait.rs b/ingester/src/query/trait.rs index 30655eff66..c031dd5d2b 100644 --- a/ingester/src/query/trait.rs +++ b/ingester/src/query/trait.rs @@ -2,6 +2,7 @@ use std::{fmt::Debug, ops::Deref, sync::Arc}; use async_trait::async_trait; use data_types::{NamespaceId, TableId}; +use predicate::Predicate; use thiserror::Error; use trace::span::Span; @@ -25,6 +26,7 @@ pub(crate) trait QueryExec: Send + Sync + Debug { table_id: TableId, columns: Vec, span: Option, + predicate: Option, ) -> Result; } @@ -41,9 +43,10 @@ where table_id: TableId, columns: Vec, span: Option, + predicate: Option, ) -> Result { self.deref() - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await } } diff --git a/ingester/src/server/grpc/query.rs b/ingester/src/server/grpc/query.rs index 77d457f368..d736f1d4d9 100644 --- 
a/ingester/src/server/grpc/query.rs +++ b/ingester/src/server/grpc/query.rs @@ -12,6 +12,7 @@ use futures::{Stream, StreamExt, TryStreamExt}; use ingester_query_grpc::influxdata::iox::ingester::v1 as proto; use metric::{DurationHistogram, U64Counter}; use observability_deps::tracing::*; +use predicate::Predicate; use prost::Message; use thiserror::Error; use tokio::sync::{Semaphore, TryAcquireError}; @@ -48,6 +49,10 @@ enum Error { /// The number of simultaneous queries being executed has been reached. #[error("simultaneous query limit exceeded")] RequestLimit, + + /// The payload within the request has an invalid field value. + #[error("field violation: {0}")] + FieldViolation(#[from] ingester_query_grpc::FieldViolation), } /// Map a query-execution error into a [`tonic::Status`]. @@ -77,6 +82,10 @@ impl From for tonic::Status { warn!("simultaneous query limit exceeded"); Code::ResourceExhausted } + Error::FieldViolation(_) => { + debug!(error=%e, "request contains field violation"); + Code::InvalidArgument + } }; Self::new(code, e.to_string()) @@ -188,18 +197,25 @@ where let ticket = request.into_inner(); let request = proto::IngesterQueryRequest::decode(&*ticket.ticket).map_err(Error::from)?; - // Extract the namespace/table identifiers + // Extract the namespace/table identifiers and the query predicate let namespace_id = NamespaceId::new(request.namespace_id); let table_id = TableId::new(request.table_id); - - // Predicate pushdown is part of the API, but not implemented. - if let Some(p) = request.predicate { - debug!(predicate=?p, "ignoring query predicate (unsupported)"); - } + let predicate = if let Some(p) = request.predicate { + debug!(predicate=?p, "received query predicate"); + Some(Predicate::try_from(p).map_err(Error::from)?) 
+ } else { + None + }; let response = match self .query_handler - .query_exec(namespace_id, table_id, request.columns, span.clone()) + .query_exec( + namespace_id, + table_id, + request.columns, + span.clone(), + predicate, + ) .await { Ok(v) => v, From 5f759528d37a13de0ba0d167097c501d9ca1c9fd Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Fri, 30 Jun 2023 15:16:19 +0100 Subject: [PATCH 03/14] test(ingester): Add `BufferTree` test for predicate-filtered queries --- ingester/src/buffer_tree/root.rs | 136 +++++++++++++++++++++++++++---- 1 file changed, 121 insertions(+), 15 deletions(-) diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index e33181fd96..0ca9000249 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -229,6 +229,19 @@ where #[cfg(test)] mod tests { + use std::{sync::Arc, time::Duration}; + + use assert_matches::assert_matches; + use data_types::{PartitionId, PartitionKey}; + use datafusion::{ + assert_batches_eq, assert_batches_sorted_eq, + prelude::{col, lit}, + }; + use futures::StreamExt; + use lazy_static::lazy_static; + use metric::{Attributes, Metric}; + use predicate::Predicate; + use super::*; use crate::{ buffer_tree::{ @@ -245,13 +258,6 @@ mod tests { ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER, }, }; - use assert_matches::assert_matches; - use data_types::{PartitionId, PartitionKey}; - use datafusion::{assert_batches_eq, assert_batches_sorted_eq}; - use futures::StreamExt; - use lazy_static::lazy_static; - use metric::{Attributes, Metric}; - use std::{sync::Arc, time::Duration}; const PARTITION2_ID: PartitionId = PartitionId::new(2); const PARTITION3_ID: PartitionId = PartitionId::new(3); @@ -339,9 +345,12 @@ mod tests { macro_rules! 
test_write_query { ( $name:ident, - partitions = [$($partition:expr), +], // The set of PartitionData for the mock partition provider + partitions = [$($partition:expr), +], // The set of PartitionData for the mock + // partition provider writes = [$($write:expr), *], // The set of WriteOperation to apply() - want = $want:expr // The expected results of querying ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID + predicate = $predicate:expr, // An optional predicate to use for the query + want = $want:expr // The expected results of querying + // ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID ) => { paste::paste! { #[tokio::test] @@ -372,7 +381,13 @@ mod tests { // Execute the query against ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID let batches = buf - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None, None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + $predicate + ) .await .expect("query should succeed") .into_partition_stream() @@ -409,6 +424,7 @@ mod tests { ), None, )], + predicate = None, want = [ "+----------+------+-------------------------------+", "| region | temp | time |", @@ -458,6 +474,7 @@ mod tests { None, ) ], + predicate = None, want = [ "+----------+------+-------------------------------+", "| region | temp | time |", @@ -510,6 +527,7 @@ mod tests { None, ) ], + predicate = None, want = [ "+--------+------+-------------------------------+", "| region | temp | time |", @@ -522,7 +540,7 @@ mod tests { // A query that ensures the data across multiple tables (with the same table // name!) is correctly filtered to return only the queried table. 
test_write_query!( - filter_multiple_tabls, + filter_multiple_tables, partitions = [ PartitionDataBuilder::new() .with_partition_id(ARBITRARY_PARTITION_ID) @@ -560,6 +578,7 @@ mod tests { None, ) ], + predicate = None, want = [ "+--------+------+-------------------------------+", "| region | temp | time |", @@ -605,6 +624,7 @@ mod tests { None, ) ], + predicate = None, want = [ "+----------+------+-------------------------------+", "| region | temp | time |", @@ -615,6 +635,95 @@ mod tests { ] ); + // This test asserts that the results returned from a query to the + // [`BufferTree`] filters rows from the result as directed by the + // query's [`Predicate`]. + // + // It makes sure that for a [`BufferTree`] with a set of partitions split + // by some key a query with a predicate ` == ` + // returns partition data that has been filtered to contain only rows which + // contain the specified value in that partition key column. + test_write_query!( + filter_by_predicate_partition_key, + partitions = [ + PartitionDataBuilder::new() + .with_partition_id(ARBITRARY_PARTITION_ID) + .with_partition_key(ARBITRARY_PARTITION_KEY.clone()) + .build(), + PartitionDataBuilder::new() + .with_partition_id(PARTITION2_ID) + .with_partition_key(PARTITION2_KEY.clone()) + .build() + ], + writes = [ + make_write_op( + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 0, + &format!( + r#"{},region={} temp=35 4242424242"#, + &*ARBITRARY_TABLE_NAME, &*ARBITRARY_PARTITION_KEY + ), + None, + ), + make_write_op( + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 1, + &format!( + r#"{},region={} temp=12 4242424242"#, + &*ARBITRARY_TABLE_NAME, &*ARBITRARY_PARTITION_KEY + ), + None, + ), + make_write_op( + &PARTITION2_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 2, + &format!( + r#"{},region={} temp=17 7676767676"#, + &*ARBITRARY_TABLE_NAME, *PARTITION2_KEY + ), 
+ None, + ), + make_write_op( + &PARTITION2_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 3, + &format!( + r#"{},region={} temp=13 7676767676"#, + &*ARBITRARY_TABLE_NAME, *PARTITION2_KEY, + ), + None, + ) + ], + predicate = Some(Predicate::new().with_expr(col("region").eq(lit(PARTITION2_KEY.inner())))), + want = [ + "+----------+------+-------------------------------+", + "| region | temp | time |", + "+----------+------+-------------------------------+", + format!( + "| {} | 17.0 | 1970-01-01T00:00:07.676767676 |", + *PARTITION2_KEY + ) + .as_str(), + format!( + "| {} | 13.0 | 1970-01-01T00:00:07.676767676 |", + *PARTITION2_KEY + ) + .as_str(), + "+----------+------+-------------------------------+", + ] + ); + /// Assert that multiple writes to a single namespace/table results in a /// single namespace being created, and matching metrics. #[tokio::test] @@ -629,7 +738,7 @@ mod tests { ) .with_partition( PartitionDataBuilder::new() - .with_partition_id(ARBITRARY_PARTITION_ID) + .with_partition_id(PARTITION2_ID) .with_partition_key(PARTITION2_KEY.clone()) .build(), ), @@ -1016,7 +1125,4 @@ mod tests { &batches ); } - - // TODO(savage): Consider what tests need to be added here for Predicate - // support? 
} From da34eb7b35d228f7046199c3a04529c775d32d49 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Fri, 30 Jun 2023 18:08:05 +0100 Subject: [PATCH 04/14] feat: Load both table name and partition template in the ingester --- ingester/src/buffer_tree/namespace.rs | 20 +++--- ingester/src/buffer_tree/partition.rs | 24 +++---- .../buffer_tree/partition/resolver/cache.rs | 40 +++++------- .../buffer_tree/partition/resolver/catalog.rs | 22 +++++-- .../partition/resolver/coalesce.rs | 34 ++++------ .../buffer_tree/partition/resolver/mock.rs | 8 +-- .../buffer_tree/partition/resolver/trait.rs | 27 +++----- ingester/src/buffer_tree/root.rs | 38 ++++++----- ingester/src/buffer_tree/table.rs | 64 +++++++++++++++---- ...{name_resolver.rs => metadata_resolver.rs} | 57 +++++++++-------- ingester/src/init.rs | 8 +-- ingester/src/persist/context.rs | 16 ++--- ingester/src/persist/handle.rs | 4 +- ingester/src/persist/mod.rs | 4 +- ingester/src/persist/worker.rs | 22 +++---- ingester/src/test_util.rs | 28 ++++---- 16 files changed, 227 insertions(+), 189 deletions(-) rename ingester/src/buffer_tree/table/{name_resolver.rs => metadata_resolver.rs} (67%) diff --git a/ingester/src/buffer_tree/namespace.rs b/ingester/src/buffer_tree/namespace.rs index 8346d9d43c..058443141c 100644 --- a/ingester/src/buffer_tree/namespace.rs +++ b/ingester/src/buffer_tree/namespace.rs @@ -13,7 +13,7 @@ use trace::span::Span; use super::{ partition::resolver::PartitionProvider, post_write::PostWriteObserver, - table::{name_resolver::TableNameProvider, TableData}, + table::{metadata_resolver::TableProvider, TableData}, }; use crate::{ arcmap::ArcMap, @@ -61,12 +61,10 @@ pub(crate) struct NamespaceData { /// A set of tables this [`NamespaceData`] instance has processed /// [`IngestOp`]'s for. /// - /// The [`TableNameProvider`] acts as a [`DeferredLoad`] constructor to - /// resolve the [`TableName`] for new [`TableData`] out of the hot path. 
- /// - /// [`TableName`]: crate::buffer_tree::table::TableName + /// The [`TableProvider`] acts as a [`DeferredLoad`] constructor to + /// resolve the catalog [`Table`] for new [`TableData`] out of the hot path. tables: ArcMap>, - table_name_resolver: Arc, + catalog_table_resolver: Arc, /// The count of tables initialised in this Ingester so far, across all /// namespaces. table_count: U64Counter, @@ -84,7 +82,7 @@ impl NamespaceData { pub(super) fn new( namespace_id: NamespaceId, namespace_name: Arc>, - table_name_resolver: Arc, + catalog_table_resolver: Arc, partition_provider: Arc, post_write_observer: Arc, metrics: &metric::Registry, @@ -100,7 +98,7 @@ impl NamespaceData { namespace_id, namespace_name, tables: Default::default(), - table_name_resolver, + catalog_table_resolver, table_count, partition_provider, post_write_observer, @@ -152,7 +150,7 @@ where self.table_count.inc(1); Arc::new(TableData::new( table_id, - Arc::new(self.table_name_resolver.for_table(table_id)), + Arc::new(self.catalog_table_resolver.for_table(table_id)), self.namespace_id, Arc::clone(&self.namespace_name), Arc::clone(&self.partition_provider), @@ -228,7 +226,7 @@ mod tests { test_util::{ defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, - ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER, + ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER, }, }; @@ -245,7 +243,7 @@ mod tests { let ns = NamespaceData::new( ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_ms(), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), &metrics, diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs index f68550f3dd..98c3c816db 100644 --- a/ingester/src/buffer_tree/partition.rs +++ b/ingester/src/buffer_tree/partition.rs @@ -14,7 +14,7 @@ use self::{ 
buffer::{traits::Queryable, BufferState, DataBuffer, Persisting}, persisting::{BatchIdent, PersistingData}, }; -use super::{namespace::NamespaceName, table::TableName}; +use super::{namespace::NamespaceName, table::TableMetadata}; use crate::{deferred_load::DeferredLoad, query_adaptor::QueryAdaptor}; mod buffer; @@ -73,9 +73,9 @@ pub struct PartitionData { /// The catalog ID for the table this partition is part of. table_id: TableId, - /// The name of the table this partition is part of, potentially unresolved + /// The catalog metadata for the table this partition is part of, potentially unresolved /// / deferred. - table_name: Arc>, + table: Arc>, /// A [`DataBuffer`] for incoming writes. buffer: DataBuffer, @@ -108,7 +108,7 @@ impl PartitionData { namespace_id: NamespaceId, namespace_name: Arc>, table_id: TableId, - table_name: Arc>, + table: Arc>, sort_key: SortKeyState, ) -> Self { Self { @@ -119,7 +119,7 @@ impl PartitionData { namespace_id, namespace_name, table_id, - table_name, + table, buffer: DataBuffer::default(), persisting: VecDeque::with_capacity(1), started_persistence_count: BatchIdent::default(), @@ -139,7 +139,7 @@ impl PartitionData { trace!( namespace_id = %self.namespace_id, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, "buffered write" @@ -175,7 +175,7 @@ impl PartitionData { trace!( namespace_id = %self.namespace_id, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, n_batches = data.len(), @@ -221,7 +221,7 @@ impl PartitionData { debug!( namespace_id = %self.namespace_id, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, %batch_ident, @@ -271,7 +271,7 @@ impl PartitionData { persistence_count = %self.completed_persistence_count, 
namespace_id = %self.namespace_id, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, batch_ident = %batch.batch_ident(), @@ -302,10 +302,10 @@ impl PartitionData { self.completed_persistence_count } - /// Return the name of the table this [`PartitionData`] is buffering writes + /// Return the metadata of the table this [`PartitionData`] is buffering writes /// for. - pub(crate) fn table_name(&self) -> &Arc> { - &self.table_name + pub(crate) fn table(&self) -> &Arc> { + &self.table } /// Return the table ID for this partition. diff --git a/ingester/src/buffer_tree/partition/resolver/cache.rs b/ingester/src/buffer_tree/partition/resolver/cache.rs index 6ab23c1026..047d9176b1 100644 --- a/ingester/src/buffer_tree/partition/resolver/cache.rs +++ b/ingester/src/buffer_tree/partition/resolver/cache.rs @@ -14,7 +14,7 @@ use crate::{ buffer_tree::{ namespace::NamespaceName, partition::{resolver::SortKeyResolver, PartitionData, SortKeyState}, - table::TableName, + table::TableMetadata, }, deferred_load::DeferredLoad, }; @@ -173,7 +173,7 @@ where namespace_id: NamespaceId, namespace_name: Arc>, table_id: TableId, - table_name: Arc>, + table: Arc>, ) -> Arc> { // Use the cached PartitionKey instead of the caller's partition_key, // instead preferring to reuse the already-shared Arc in the cache. @@ -203,7 +203,7 @@ where namespace_id, namespace_name, table_id, - table_name, + table, SortKeyState::Deferred(Arc::new(sort_key_resolver)), ))); } @@ -212,13 +212,7 @@ where // Otherwise delegate to the catalog / inner impl. 
self.inner - .get_partition( - partition_key, - namespace_id, - namespace_name, - table_id, - table_name, - ) + .get_partition(partition_key, namespace_id, namespace_name, table_id, table) .await } } @@ -234,7 +228,7 @@ mod tests { use crate::{ buffer_tree::partition::resolver::mock::MockPartitionProvider, test_util::{ - defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder, + defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_PARTITION_KEY_STR, ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, @@ -270,15 +264,15 @@ mod tests { ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) .await; assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID); assert_eq!(got.lock().table_id(), ARBITRARY_TABLE_ID); assert_eq!( - &**got.lock().table_name().get().await, - &***ARBITRARY_TABLE_NAME + &**got.lock().table().get().await.name(), + &**ARBITRARY_TABLE_NAME ); assert_eq!( &**got.lock().namespace_name().get().await, @@ -309,15 +303,15 @@ mod tests { ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) .await; assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID); assert_eq!(got.lock().table_id(), ARBITRARY_TABLE_ID); assert_eq!( - &**got.lock().table_name().get().await, - &***ARBITRARY_TABLE_NAME + &**got.lock().table().get().await.name(), + &**ARBITRARY_TABLE_NAME ); assert_eq!( &**got.lock().namespace_name().get().await, @@ -366,15 +360,15 @@ mod tests { ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) .await; assert_eq!(got.lock().partition_id(), other_key_id); assert_eq!(got.lock().table_id(), ARBITRARY_TABLE_ID); assert_eq!( - &**got.lock().table_name().get().await, 
- &***ARBITRARY_TABLE_NAME + &**got.lock().table().get().await.name(), + &**ARBITRARY_TABLE_NAME ); } @@ -402,15 +396,15 @@ mod tests { ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), other_table, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) .await; assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID); assert_eq!(got.lock().table_id(), other_table); assert_eq!( - &**got.lock().table_name().get().await, - &***ARBITRARY_TABLE_NAME + &**got.lock().table().get().await.name(), + &**ARBITRARY_TABLE_NAME ); } } diff --git a/ingester/src/buffer_tree/partition/resolver/catalog.rs b/ingester/src/buffer_tree/partition/resolver/catalog.rs index 5eff133978..191899eb24 100644 --- a/ingester/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester/src/buffer_tree/partition/resolver/catalog.rs @@ -15,7 +15,7 @@ use crate::{ buffer_tree::{ namespace::NamespaceName, partition::{PartitionData, SortKeyState}, - table::TableName, + table::TableMetadata, }, deferred_load::DeferredLoad, }; @@ -61,12 +61,12 @@ impl PartitionProvider for CatalogPartitionResolver { namespace_id: NamespaceId, namespace_name: Arc>, table_id: TableId, - table_name: Arc>, + table: Arc>, ) -> Arc> { debug!( %partition_key, %table_id, - %table_name, + %table, "upserting partition in catalog" ); let p = Backoff::new(&self.backoff_config) @@ -86,7 +86,7 @@ impl PartitionProvider for CatalogPartitionResolver { namespace_id, namespace_name, table_id, - table_name, + table, SortKeyState::Provided(p.sort_key()), ))) } @@ -103,6 +103,7 @@ mod tests { use iox_catalog::test_helpers::{arbitrary_namespace, arbitrary_table}; use super::*; + use crate::buffer_tree::table::TableName; const TABLE_NAME: &str = "bananas"; const NAMESPACE_NAME: &str = "ns-bananas"; @@ -138,17 +139,24 @@ mod tests { table_id, Arc::new(DeferredLoad::new( Duration::from_secs(1), - async { TableName::from(TABLE_NAME) }, + async { + TableMetadata::with_default_partition_template_for_testing(TableName::from( + 
TABLE_NAME, + )) + }, &metrics, )), ) .await; // Ensure the table name is available. - let _ = got.lock().table_name().get().await; + let _ = got.lock().table().get().await.name(); assert_eq!(got.lock().namespace_id(), namespace_id); - assert_eq!(got.lock().table_name().to_string(), table_name.to_string()); + assert_eq!( + got.lock().table().get().await.name().to_string(), + table_name.to_string() + ); assert_matches!(got.lock().sort_key(), SortKeyState::Provided(None)); assert!(got.lock().partition_key.ptr_eq(&callers_partition_key)); diff --git a/ingester/src/buffer_tree/partition/resolver/coalesce.rs b/ingester/src/buffer_tree/partition/resolver/coalesce.rs index b89ca460e4..57b6673e08 100644 --- a/ingester/src/buffer_tree/partition/resolver/coalesce.rs +++ b/ingester/src/buffer_tree/partition/resolver/coalesce.rs @@ -14,7 +14,7 @@ use hashbrown::{hash_map::Entry, HashMap}; use parking_lot::Mutex; use crate::{ - buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName}, + buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableMetadata}, deferred_load::DeferredLoad, }; @@ -146,7 +146,7 @@ where namespace_id: NamespaceId, namespace_name: Arc>, table_id: TableId, - table_name: Arc>, + table: Arc>, ) -> Arc> { let key = Key { namespace_id, @@ -170,7 +170,7 @@ where namespace_id, namespace_name, table_id, - table_name, + table, )); // Make the future poll-able by many callers, all of which @@ -233,7 +233,7 @@ async fn do_fetch( namespace_id: NamespaceId, namespace_name: Arc>, table_id: TableId, - table_name: Arc>, + table: Arc>, ) -> Arc> where T: PartitionProvider + 'static, @@ -248,13 +248,7 @@ where // (which would cause the connection to be returned). 
tokio::spawn(async move { inner - .get_partition( - partition_key, - namespace_id, - namespace_name, - table_id, - table_name, - ) + .get_partition(partition_key, namespace_id, namespace_name, table_id, table) .await }) .await @@ -280,7 +274,7 @@ mod tests { use crate::{ buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState}, test_util::{ - defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder, + defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, }, }; @@ -308,7 +302,7 @@ mod tests { ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) }) .collect::>() @@ -342,7 +336,7 @@ mod tests { _namespace_id: NamespaceId, _namespace_name: Arc>, _table_id: TableId, - _table_name: Arc>, + _table: Arc>, ) -> core::pin::Pin< Box< dyn core::future::Future>> @@ -368,7 +362,7 @@ mod tests { let data = PartitionDataBuilder::new().build(); let namespace_loader = defer_namespace_name_1_sec(); - let table_name_loader = defer_table_name_1_sec(); + let table_loader = defer_table_metadata_1_sec(); // Add a single instance of the partition - if more than one call is // made to the mock, it will panic. 
@@ -384,14 +378,14 @@ mod tests { ARBITRARY_NAMESPACE_ID, Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, - Arc::clone(&table_name_loader), + Arc::clone(&table_loader), ); let pa_2 = layer.get_partition( ARBITRARY_PARTITION_KEY.clone(), ARBITRARY_NAMESPACE_ID, Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, - Arc::clone(&table_name_loader), + Arc::clone(&table_loader), ); let waker = futures::task::noop_waker(); @@ -411,7 +405,7 @@ mod tests { ARBITRARY_NAMESPACE_ID, namespace_loader, ARBITRARY_TABLE_ID, - table_name_loader, + table_loader, ) .with_timeout_panic(Duration::from_secs(5)) .await; @@ -441,7 +435,7 @@ mod tests { _namespace_id: NamespaceId, _namespace_name: Arc>, _table_id: TableId, - _table_name: Arc>, + _table: Arc>, ) -> Arc> { let waker = self.wait.notified(); let permit = self.sem.acquire().await.unwrap(); @@ -481,7 +475,7 @@ mod tests { ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ); let waker = futures::task::noop_waker(); diff --git a/ingester/src/buffer_tree/partition/resolver/mock.rs b/ingester/src/buffer_tree/partition/resolver/mock.rs index f5ca824bd5..c16554ca46 100644 --- a/ingester/src/buffer_tree/partition/resolver/mock.rs +++ b/ingester/src/buffer_tree/partition/resolver/mock.rs @@ -8,7 +8,7 @@ use parking_lot::Mutex; use super::r#trait::PartitionProvider; use crate::{ - buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName}, + buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableMetadata}, deferred_load::{self, DeferredLoad}, }; @@ -53,7 +53,7 @@ impl PartitionProvider for MockPartitionProvider { namespace_id: NamespaceId, namespace_name: Arc>, table_id: TableId, - table_name: Arc>, + table: Arc>, ) -> Arc> { let p = self .partitions @@ -75,8 +75,8 @@ impl PartitionProvider for MockPartitionProvider { deferred_load::UNRESOLVED_DISPLAY_STRING, ); - let actual_table_name = 
p.table_name().to_string(); - let expected_table_name = table_name.get().await.to_string(); + let actual_table_name = p.table().to_string(); + let expected_table_name = table.get().await.name().to_string(); assert!( (actual_table_name.as_str() == expected_table_name) || (actual_table_name == deferred_load::UNRESOLVED_DISPLAY_STRING), diff --git a/ingester/src/buffer_tree/partition/resolver/trait.rs b/ingester/src/buffer_tree/partition/resolver/trait.rs index 158d9bdda2..e525cd5a6f 100644 --- a/ingester/src/buffer_tree/partition/resolver/trait.rs +++ b/ingester/src/buffer_tree/partition/resolver/trait.rs @@ -5,7 +5,7 @@ use data_types::{NamespaceId, PartitionKey, TableId}; use parking_lot::Mutex; use crate::{ - buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName}, + buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableMetadata}, deferred_load::DeferredLoad, }; @@ -24,7 +24,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug { namespace_id: NamespaceId, namespace_name: Arc>, table_id: TableId, - table_name: Arc>, + table: Arc>, ) -> Arc>; } @@ -39,16 +39,10 @@ where namespace_id: NamespaceId, namespace_name: Arc>, table_id: TableId, - table_name: Arc>, + table: Arc>, ) -> Arc> { (**self) - .get_partition( - partition_key, - namespace_id, - namespace_name, - table_id, - table_name, - ) + .get_partition(partition_key, namespace_id, namespace_name, table_id, table) .await } } @@ -61,7 +55,7 @@ mod tests { use crate::{ buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState}, test_util::{ - defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder, + defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, }, @@ -70,10 +64,10 @@ mod tests { #[tokio::test] async fn test_arc_impl() { let namespace_loader = defer_namespace_name_1_sec(); - let table_name_loader = 
defer_table_name_1_sec(); + let table_loader = defer_table_metadata_1_sec(); let data = PartitionDataBuilder::new() - .with_table_name_loader(Arc::clone(&table_name_loader)) + .with_table_loader(Arc::clone(&table_loader)) .with_namespace_loader(Arc::clone(&namespace_loader)) .build(); @@ -85,7 +79,7 @@ mod tests { ARBITRARY_NAMESPACE_ID, Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, - Arc::clone(&table_name_loader), + Arc::clone(&table_loader), ) .await; assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID); @@ -94,9 +88,6 @@ mod tests { got.lock().namespace_name().to_string(), namespace_loader.to_string() ); - assert_eq!( - got.lock().table_name().to_string(), - table_name_loader.to_string() - ); + assert_eq!(got.lock().table().to_string(), table_loader.to_string()); } } diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index 0ca9000249..105b90d37f 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -11,7 +11,7 @@ use super::{ namespace::{name_resolver::NamespaceNameProvider, NamespaceData}, partition::{resolver::PartitionProvider, PartitionData}, post_write::PostWriteObserver, - table::name_resolver::TableNameProvider, + table::metadata_resolver::TableProvider, }; use crate::{ arcmap::ArcMap, @@ -93,12 +93,12 @@ pub(crate) struct BufferTree { /// [`NamespaceName`]: data_types::NamespaceName namespaces: ArcMap>, namespace_name_resolver: Arc, - /// The [`TableName`] provider used by [`NamespaceData`] to initialise a + /// The [`TableMetadata`] provider used by [`NamespaceData`] to initialise a /// [`TableData`]. /// - /// [`TableName`]: crate::buffer_tree::table::TableName + /// [`TableMetadata`]: crate::buffer_tree::table::TableMetadata /// [`TableData`]: crate::buffer_tree::table::TableData - table_name_resolver: Arc, + table_resolver: Arc, metrics: Arc, namespace_count: U64Counter, @@ -113,7 +113,7 @@ where /// Initialise a new [`BufferTree`] that emits metrics to `metrics`. 
pub(crate) fn new( namespace_name_resolver: Arc, - table_name_resolver: Arc, + table_resolver: Arc, partition_provider: Arc, post_write_observer: Arc, metrics: Arc, @@ -128,7 +128,7 @@ where Self { namespaces: Default::default(), namespace_name_resolver, - table_name_resolver, + table_resolver, metrics, partition_provider, post_write_observer, @@ -179,7 +179,7 @@ where Arc::new(NamespaceData::new( namespace_id, Arc::new(self.namespace_name_resolver.for_namespace(namespace_id)), - Arc::clone(&self.table_name_resolver), + Arc::clone(&self.table_resolver), Arc::clone(&self.partition_provider), Arc::clone(&self.post_write_observer), &self.metrics, @@ -248,14 +248,14 @@ mod tests { namespace::{name_resolver::mock::MockNamespaceNameProvider, NamespaceData}, partition::resolver::mock::MockPartitionProvider, post_write::mock::MockPostWriteObserver, - table::TableName, + table::TableMetadata, }, deferred_load::{self, DeferredLoad}, query::partition_response::PartitionResponse, test_util::{ defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, - ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER, + ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER, }, }; @@ -286,7 +286,7 @@ mod tests { let ns = NamespaceData::new( ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_ms(), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), &metrics, @@ -366,7 +366,7 @@ mod tests { // Init the buffer tree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), @@ -749,7 +749,7 @@ mod tests { // Init the buffer tree let buf = 
BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::clone(&metrics), @@ -833,9 +833,13 @@ mod tests { .with_partition_id(PARTITION3_ID) .with_partition_key(PARTITION3_KEY.clone()) .with_table_id(TABLE2_ID) - .with_table_name_loader(Arc::new(DeferredLoad::new( + .with_table_loader(Arc::new(DeferredLoad::new( Duration::from_secs(1), - async move { TableName::from(TABLE2_NAME) }, + async move { + TableMetadata::with_default_partition_template_for_testing( + TABLE2_NAME.into(), + ) + }, &metric::Registry::default(), ))) .build(), @@ -845,7 +849,7 @@ mod tests { // Init the buffer tree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::clone(&Arc::new(metric::Registry::default())), @@ -932,7 +936,7 @@ mod tests { // Init the BufferTree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), @@ -1029,7 +1033,7 @@ mod tests { // Init the buffer tree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index 17ee85ce13..1953a8dd44 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -1,11 +1,14 @@ //! 
Table level data buffer structures. -pub(crate) mod name_resolver; +pub(crate) mod metadata_resolver; use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId}; +use data_types::{ + partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionKey, SequenceNumber, + Table, TableId, +}; use mutable_batch::MutableBatch; use parking_lot::Mutex; use predicate::Predicate; @@ -25,6 +28,45 @@ use crate::{ }, }; +/// Metadata from the catalog for a table +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct TableMetadata { + name: TableName, + partition_template: TablePartitionTemplateOverride, +} + +impl TableMetadata { + pub fn with_default_partition_template_for_testing(name: TableName) -> Self { + Self { + name, + partition_template: Default::default(), + } + } + + pub(crate) fn name(&self) -> &TableName { + &self.name + } + + pub(crate) fn partition_template(&self) -> &TablePartitionTemplateOverride { + &self.partition_template + } +} + +impl From for TableMetadata { + fn from(t: Table) -> Self { + Self { + name: t.name.into(), + partition_template: t.partition_template, + } + } +} + +impl std::fmt::Display for TableMetadata { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.name, f) + } +} + /// The string name / identifier of a Table. /// /// A reference-counted, cheap clone-able string. @@ -70,7 +112,7 @@ impl PartialEq for TableName { #[derive(Debug)] pub(crate) struct TableData { table_id: TableId, - table_name: Arc>, + catalog_table: Arc>, /// The catalog ID of the namespace this table is being populated from. namespace_id: NamespaceId, @@ -94,7 +136,7 @@ impl TableData { /// for the first time. 
pub(super) fn new( table_id: TableId, - table_name: Arc>, + catalog_table: Arc>, namespace_id: NamespaceId, namespace_name: Arc>, partition_provider: Arc, @@ -102,7 +144,7 @@ impl TableData { ) -> Self { Self { table_id, - table_name, + catalog_table, namespace_id, namespace_name, partition_data: Default::default(), @@ -133,9 +175,9 @@ impl TableData { self.table_id } - /// Returns the name of this table. - pub(crate) fn table_name(&self) -> &Arc> { - &self.table_name + /// Returns the catalog data for this table. + pub(crate) fn catalog_table(&self) -> &Arc> { + &self.catalog_table } /// Return the [`NamespaceId`] this table is a part of. @@ -167,7 +209,7 @@ where self.namespace_id, Arc::clone(&self.namespace_name), self.table_id, - Arc::clone(&self.table_name), + Arc::clone(&self.catalog_table), ) .await; // Add the partition to the map. @@ -268,7 +310,7 @@ mod tests { post_write::mock::MockPostWriteObserver, }, test_util::{ - defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder, + defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, }, @@ -283,7 +325,7 @@ mod tests { let table = TableData::new( ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), partition_provider, diff --git a/ingester/src/buffer_tree/table/name_resolver.rs b/ingester/src/buffer_tree/table/metadata_resolver.rs similarity index 67% rename from ingester/src/buffer_tree/table/name_resolver.rs rename to ingester/src/buffer_tree/table/metadata_resolver.rs index 37bc1ce605..d7dc7a34db 100644 --- a/ingester/src/buffer_tree/table/name_resolver.rs +++ b/ingester/src/buffer_tree/table/metadata_resolver.rs @@ -4,24 +4,24 @@ use backoff::{Backoff, BackoffConfig}; use data_types::TableId; use iox_catalog::interface::Catalog; -use super::TableName; +use super::TableMetadata; use 
crate::deferred_load::DeferredLoad; /// An abstract provider of a [`DeferredLoad`] configured to fetch the -/// [`TableName`] of the specified [`TableId`]. -pub(crate) trait TableNameProvider: Send + Sync + std::fmt::Debug { - fn for_table(&self, id: TableId) -> DeferredLoad; +/// catalog [`TableMetadata`] of the specified [`TableId`]. +pub(crate) trait TableProvider: Send + Sync + std::fmt::Debug { + fn for_table(&self, id: TableId) -> DeferredLoad; } #[derive(Debug)] -pub(crate) struct TableNameResolver { +pub(crate) struct TableResolver { max_smear: Duration, catalog: Arc, backoff_config: BackoffConfig, metrics: Arc, } -impl TableNameResolver { +impl TableResolver { pub(crate) fn new( max_smear: Duration, catalog: Arc, @@ -36,16 +36,16 @@ impl TableNameResolver { } } - /// Fetch the [`TableName`] from the [`Catalog`] for specified + /// Fetch the [`TableMetadata`] from the [`Catalog`] for specified /// `table_id`, retrying endlessly when errors occur. pub(crate) async fn fetch( table_id: TableId, catalog: Arc, backoff_config: BackoffConfig, - ) -> TableName { + ) -> TableMetadata { Backoff::new(&backoff_config) - .retry_all_errors("fetch table name", || async { - let s = catalog + .retry_all_errors("fetch table", || async { + let table = catalog .repositories() .await .tables() @@ -54,18 +54,17 @@ impl TableNameResolver { .unwrap_or_else(|| { panic!("resolving table name for non-existent table id {table_id}") }) - .name .into(); - Result::<_, iox_catalog::interface::Error>::Ok(s) + Result::<_, iox_catalog::interface::Error>::Ok(table) }) .await .expect("retry forever") } } -impl TableNameProvider for TableNameResolver { - fn for_table(&self, id: TableId) -> DeferredLoad { +impl TableProvider for TableResolver { + fn for_table(&self, id: TableId) -> DeferredLoad { DeferredLoad::new( self.max_smear, Self::fetch(id, Arc::clone(&self.catalog), self.backoff_config.clone()), @@ -79,28 +78,32 @@ pub(crate) mod mock { use super::*; #[derive(Debug)] - pub(crate) struct 
MockTableNameProvider { - name: TableName, + pub(crate) struct MockTableProvider { + table: TableMetadata, } - impl MockTableNameProvider { - pub(crate) fn new(name: impl Into) -> Self { - Self { name: name.into() } + impl MockTableProvider { + pub(crate) fn new(table: impl Into) -> Self { + Self { + table: table.into(), + } } } - impl Default for MockTableNameProvider { + impl Default for MockTableProvider { fn default() -> Self { - Self::new("bananas") + Self::new(TableMetadata::with_default_partition_template_for_testing( + "bananas".into(), + )) } } - impl TableNameProvider for MockTableNameProvider { - fn for_table(&self, _id: TableId) -> DeferredLoad { - let name = self.name.clone(); + impl TableProvider for MockTableProvider { + fn for_table(&self, _id: TableId) -> DeferredLoad { + let table = self.table.clone(); DeferredLoad::new( Duration::from_secs(1), - async { name }, + async { table }, &metric::Registry::default(), ) } @@ -129,7 +132,7 @@ mod tests { // Populate the catalog with the namespace / table let (_ns_id, table_id) = populate_catalog(&*catalog, NAMESPACE_NAME, TABLE_NAME).await; - let fetcher = Arc::new(TableNameResolver::new( + let fetcher = Arc::new(TableResolver::new( Duration::from_secs(10), Arc::clone(&catalog), backoff_config.clone(), @@ -141,6 +144,6 @@ mod tests { .get() .with_timeout_panic(Duration::from_secs(5)) .await; - assert_eq!(&**got, TABLE_NAME); + assert_eq!(got.name(), TABLE_NAME); } } diff --git a/ingester/src/init.rs b/ingester/src/init.rs index ff1a7fd6bd..9f84a448ba 100644 --- a/ingester/src/init.rs +++ b/ingester/src/init.rs @@ -29,7 +29,7 @@ use crate::{ partition::resolver::{ CatalogPartitionResolver, CoalescePartitionResolver, PartitionCache, PartitionProvider, }, - table::name_resolver::{TableNameProvider, TableNameResolver}, + table::metadata_resolver::{TableProvider, TableResolver}, BufferTree, }, dml_sink::{instrumentation::DmlSinkInstrumentation, tracing::DmlSinkTracing}, @@ -246,8 +246,8 @@ where 
Arc::clone(&metrics), )); - // Initialise the deferred table name resolver. - let table_name_provider: Arc = Arc::new(TableNameResolver::new( + // Initialise the deferred table metadata resolver. + let table_provider: Arc = Arc::new(TableResolver::new( persist_background_fetch_time, Arc::clone(&catalog), BackoffConfig::default(), @@ -319,7 +319,7 @@ where let buffer = Arc::new(BufferTree::new( namespace_name_provider, - table_name_provider, + table_provider, partition_provider, Arc::new(hot_partition_persister), Arc::clone(&metrics), diff --git a/ingester/src/persist/context.rs b/ingester/src/persist/context.rs index 0e5fc55a67..c441d38a5e 100644 --- a/ingester/src/persist/context.rs +++ b/ingester/src/persist/context.rs @@ -18,7 +18,7 @@ use crate::{ buffer_tree::{ namespace::NamespaceName, partition::{persisting::PersistingData, PartitionData, SortKeyState}, - table::TableName, + table::TableMetadata, }, deferred_load::DeferredLoad, persist::completion_observer::CompletedPersist, @@ -94,14 +94,14 @@ pub(super) struct Context { // The partition key for this partition partition_key: PartitionKey, - /// Deferred strings needed for persistence. + /// Deferred data needed for persistence. /// /// These [`DeferredLoad`] are given a pre-fetch hint when this [`Context`] /// is constructed to load them in the background (if not already resolved) /// in order to avoid incurring the query latency when the values are /// needed. namespace_name: Arc>, - table_name: Arc>, + table: Arc>, /// The [`SortKey`] for the [`PartitionData`] at the time of [`Context`] /// construction. 
@@ -164,7 +164,7 @@ impl Context { partition_hash_id: guard.partition_hash_id().cloned(), partition_key: guard.partition_key().clone(), namespace_name: Arc::clone(guard.namespace_name()), - table_name: Arc::clone(guard.table_name()), + table: Arc::clone(guard.table()), // Technically the sort key isn't immutable, but MUST NOT be // changed by an external actor (by something other than code in @@ -182,7 +182,7 @@ impl Context { // Pre-fetch the deferred values in a background thread (if not already // resolved) s.namespace_name.prefetch_now(); - s.table_name.prefetch_now(); + s.table.prefetch_now(); if let SortKeyState::Deferred(ref d) = s.sort_key { d.prefetch_now(); } @@ -253,7 +253,7 @@ impl Context { namespace_id = %self.namespace_id, namespace_name = %self.namespace_name, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, total_persist_duration = ?now.duration_since(self.enqueued_at), @@ -315,7 +315,7 @@ impl Context { self.namespace_name.as_ref() } - pub(super) fn table_name(&self) -> &DeferredLoad { - self.table_name.as_ref() + pub(super) fn table(&self) -> &DeferredLoad { + self.table.as_ref() } } diff --git a/ingester/src/persist/handle.rs b/ingester/src/persist/handle.rs index e823767ef2..a6fdefcf94 100644 --- a/ingester/src/persist/handle.rs +++ b/ingester/src/persist/handle.rs @@ -501,7 +501,7 @@ mod tests { test_util::{ make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, - ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER, + ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER, }, }; @@ -510,7 +510,7 @@ mod tests { async fn new_partition(sort_key: SortKeyState) -> Arc> { let buffer_tree = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + 
Arc::clone(&*ARBITRARY_TABLE_PROVIDER), Arc::new( MockPartitionProvider::default().with_partition( PartitionDataBuilder::new() diff --git a/ingester/src/persist/mod.rs b/ingester/src/persist/mod.rs index 44311a2e9e..3dc23b8cd5 100644 --- a/ingester/src/persist/mod.rs +++ b/ingester/src/persist/mod.rs @@ -48,7 +48,7 @@ mod tests { test_util::{ make_write_op, populate_catalog, ARBITRARY_NAMESPACE_NAME, ARBITRARY_NAMESPACE_NAME_PROVIDER, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_NAME, - ARBITRARY_TABLE_NAME_PROVIDER, + ARBITRARY_TABLE_PROVIDER, }, }; @@ -67,7 +67,7 @@ mod tests { // Init the buffer tree let buf = BufferTree::new( Arc::clone(&*ARBITRARY_NAMESPACE_NAME_PROVIDER), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), Arc::new(CatalogPartitionResolver::new(Arc::clone(&catalog))), Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), diff --git a/ingester/src/persist/worker.rs b/ingester/src/persist/worker.rs index e9911297c2..857265e92f 100644 --- a/ingester/src/persist/worker.rs +++ b/ingester/src/persist/worker.rs @@ -202,7 +202,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), ?sort_key, @@ -218,7 +218,7 @@ where compact_persisting_batch( &worker_state.exec, sort_key, - ctx.table_name().get().await, + ctx.table().get().await.name().clone(), ctx.data().query_adaptor(), ) .await @@ -249,7 +249,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), %object_store_id, @@ -265,7 +265,7 @@ where namespace_id: ctx.namespace_id(), namespace_name: Arc::clone(&*ctx.namespace_name().get().await), table_id: 
ctx.table_id(), - table_name: Arc::clone(&*ctx.table_name().get().await), + table_name: Arc::clone(ctx.table().get().await.name()), partition_key: ctx.partition_key().clone(), compaction_level: CompactionLevel::Initial, sort_key: Some(data_sort_key), @@ -291,7 +291,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), %object_store_id, @@ -358,7 +358,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), ?new_sort_key, @@ -394,7 +394,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), expected=?old_sort_key, @@ -420,7 +420,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), expected=?old_sort_key, @@ -460,7 +460,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), ?old_sort_key, @@ -488,7 +488,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), %object_store_id, @@ -512,7 +512,7 @@ where namespace_id = %ctx.namespace_id(), 
namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), %object_store_id, diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 2fe3b5b1c4..a23cb1eacf 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -15,8 +15,8 @@ use crate::{ }, partition::{PartitionData, SortKeyState}, table::{ - name_resolver::{mock::MockTableNameProvider, TableNameProvider}, - TableName, + metadata_resolver::{mock::MockTableProvider, TableProvider}, + TableMetadata, TableName, }, }, deferred_load::DeferredLoad, @@ -44,10 +44,12 @@ pub(crate) fn defer_namespace_name_1_ms() -> Arc> { )) } -pub(crate) fn defer_table_name_1_sec() -> Arc> { +pub(crate) fn defer_table_metadata_1_sec() -> Arc> { Arc::new(DeferredLoad::new( Duration::from_secs(1), - async { ARBITRARY_TABLE_NAME.clone() }, + async { + TableMetadata::with_default_partition_template_for_testing(ARBITRARY_TABLE_NAME.clone()) + }, &metric::Registry::default(), )) } @@ -60,8 +62,11 @@ lazy_static! { pub(crate) static ref ARBITRARY_NAMESPACE_NAME_PROVIDER: Arc = Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)); pub(crate) static ref ARBITRARY_TABLE_NAME: TableName = TableName::from("bananas"); - pub(crate) static ref ARBITRARY_TABLE_NAME_PROVIDER: Arc = - Arc::new(MockTableNameProvider::new(&**ARBITRARY_TABLE_NAME)); + pub(crate) static ref ARBITRARY_TABLE_PROVIDER: Arc = Arc::new( + MockTableProvider::new(TableMetadata::with_default_partition_template_for_testing( + ARBITRARY_TABLE_NAME.clone() + )) + ); } /// Build a [`PartitionData`] with mostly arbitrary-yet-valid values for tests. 
@@ -71,7 +76,7 @@ pub(crate) struct PartitionDataBuilder { partition_key: Option, namespace_id: Option, table_id: Option, - table_name_loader: Option>>, + table_loader: Option>>, namespace_loader: Option>>, sort_key: Option, } @@ -101,11 +106,11 @@ impl PartitionDataBuilder { self } - pub(crate) fn with_table_name_loader( + pub(crate) fn with_table_loader( mut self, - table_name_loader: Arc>, + table_loader: Arc>, ) -> Self { - self.table_name_loader = Some(table_name_loader); + self.table_loader = Some(table_loader); self } @@ -134,8 +139,7 @@ impl PartitionDataBuilder { self.namespace_loader .unwrap_or_else(defer_namespace_name_1_sec), self.table_id.unwrap_or(ARBITRARY_TABLE_ID), - self.table_name_loader - .unwrap_or_else(defer_table_name_1_sec), + self.table_loader.unwrap_or_else(defer_table_metadata_1_sec), self.sort_key.unwrap_or(SortKeyState::Provided(None)), ) } From 8ebf390d9c6bccf9bd9b534250e621623780709e Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 30 Jun 2023 14:05:17 -0400 Subject: [PATCH 05/14] feat: Try to prune ingester partitions by partition key This is hacktastic. 
--- Cargo.lock | 1 + ingester/Cargo.toml | 1 + ingester/src/buffer_tree/table.rs | 110 ++++++++++++++++++++++++++++-- ingester/src/query_adaptor.rs | 18 +++-- iox_query/src/pruning.rs | 39 +++++++++++ 5 files changed, 159 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 922aac4734..cbd6afd06f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2628,6 +2628,7 @@ dependencies = [ "parquet_file", "paste", "pin-project", + "predicate", "prost", "rand", "schema", diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index e36ac29330..91c168deaf 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -32,6 +32,7 @@ once_cell = "1.18" parking_lot = "0.12.1" parquet_file = { version = "0.1.0", path = "../parquet_file" } pin-project = "1.1.2" +predicate = { version = "0.1.0", path = "../predicate" } prost = { version = "0.11.9", default-features = false, features = ["std"] } rand = "0.8.5" schema = { version = "0.1.0", path = "../schema" } diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index 1953a8dd44..b050a13449 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -2,12 +2,18 @@ pub(crate) mod metadata_resolver; -use std::{fmt::Debug, sync::Arc}; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; use async_trait::async_trait; use data_types::{ - partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionKey, SequenceNumber, - Table, TableId, + partition_template::{build_column_values, ColumnValue, TablePartitionTemplateOverride}, + NamespaceId, PartitionKey, SequenceNumber, Table, TableId, +}; +use datafusion::scalar::ScalarValue; +use iox_query::{ + chunk_statistics::{create_chunk_statistics, ColumnRange}, + pruning::keep_after_pruning, + QueryChunk, }; use mutable_batch::MutableBatch; use parking_lot::Mutex; @@ -247,7 +253,7 @@ where table_id: TableId, columns: Vec, span: Option, - _predicate: Option, + predicate: Option, ) -> Result { 
assert_eq!(self.table_id, table_id, "buffer tree index inconsistency"); assert_eq!( @@ -255,26 +261,118 @@ where "buffer tree index inconsistency" ); + let table_partition_template = self.catalog_table.get().await.partition_template; + // Gather the partition data from all of the partitions in this table. let span = SpanRecorder::new(span); let partitions = self.partitions().into_iter().map(move |p| { let mut span = span.child("partition read"); - let (id, hash_id, completed_persistence_count, data) = { + let (id, hash_id, completed_persistence_count, data, partition_key) = { let mut p = p.lock(); ( p.partition_id(), p.partition_hash_id().cloned(), p.completed_persistence_count(), p.get_query_data(), + p.partition_key().clone(), ) }; let ret = match data { Some(data) => { - // TODO(savage): Apply predicate here through the projection? assert_eq!(id, data.partition_id()); + let data = Arc::new(data); + if let Some(predicate) = &predicate { + // Filter using the partition key + let column_ranges = Arc::new( + build_column_values(&table_partition_template, partition_key.inner()) + .filter_map(|(col, val)| { + let range = match val { + ColumnValue::Identity(s) => { + let s = Arc::new(ScalarValue::from(s.as_ref())); + ColumnRange { + min_value: Arc::clone(&s), + max_value: s, + } + } + ColumnValue::Prefix(p) => { + if p.is_empty() { + // full range => value is useless + return None; + } + + // If the partition only has a prefix of the tag value (it was truncated) then form a conservative + // range: + // + // + // # Minimum + // Use the prefix itself. + // + // Note that the minimum is inclusive. + // + // All values in the partition are either: + // - identical to the prefix, in which case they are included by the inclusive minimum + // - have the form `""`, and it holds that `"" > ""` for all + // strings `""`. + // + // + // # Maximum + // Use `""`. + // + // Note that the maximum is inclusive. 
+ // + // All strings in this partition must be smaller than this constructed maximum, because + // string comparison is front-to-back and the `"" > ""`. + + let min_value = Arc::new(ScalarValue::from(p.as_ref())); + + let mut chars = p.as_ref().chars().collect::>(); + *chars + .last_mut() + .expect("checked that prefix is not empty") = + std::char::MAX; + let max_value = Arc::new(ScalarValue::from( + chars.into_iter().collect::().as_str(), + )); + + ColumnRange { + min_value, + max_value, + } + } + }; + + Some((Arc::from(col), range)) + }) + .collect::>(), + ); + + let chunk_statistics = Arc::new(create_chunk_statistics( + data.num_rows(), + data.schema(), + data.ts_min_max(), + &column_ranges, + )); + + if !keep_after_pruning( + data.schema(), + chunk_statistics, + data.schema().as_arrow(), + predicate, + ) + .expect("TODO FIX THIS") + { + return PartitionResponse::new( + vec![], + id, + hash_id, + completed_persistence_count, + ); + } + } + // Project the data if necessary let columns = columns.iter().map(String::as_str).collect::>(); let selection = if columns.is_empty() { diff --git a/ingester/src/query_adaptor.rs b/ingester/src/query_adaptor.rs index 591fca003c..2a4b7a91e1 100644 --- a/ingester/src/query_adaptor.rs +++ b/ingester/src/query_adaptor.rs @@ -5,7 +5,7 @@ use std::{any::Any, sync::Arc}; use arrow::record_batch::RecordBatch; use arrow_util::util::ensure_schema; -use data_types::{ChunkId, ChunkOrder, PartitionId}; +use data_types::{ChunkId, ChunkOrder, PartitionId, TimestampMinMax}; use datafusion::physical_plan::Statistics; use iox_query::{ util::{compute_timenanosecond_min_max, create_basic_summary}, @@ -105,16 +105,26 @@ impl QueryAdaptor { pub(crate) fn partition_id(&self) -> PartitionId { self.partition_id } + + /// Number of rows, useful for building stats + pub(crate) fn num_rows(&self) -> u64 { + self.data.iter().map(|b| b.num_rows()).sum::() as u64 + } + + /// Time range, useful for building stats + pub(crate) fn ts_min_max(&self) -> 
TimestampMinMax { + compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref())) + .expect("Should have time range") + } } impl QueryChunk for QueryAdaptor { fn stats(&self) -> Arc { Arc::clone(self.stats.get_or_init(|| { - let ts_min_max = compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref())) - .expect("Should have time range"); + let ts_min_max = self.ts_min_max(); Arc::new(create_basic_summary( - self.data.iter().map(|b| b.num_rows()).sum::() as u64, + self.num_rows(), self.schema(), ts_min_max, )) diff --git a/iox_query/src/pruning.rs b/iox_query/src/pruning.rs index d75195d0a0..6db84f385a 100644 --- a/iox_query/src/pruning.rs +++ b/iox_query/src/pruning.rs @@ -86,6 +86,45 @@ pub fn prune_chunks( prune_summaries(table_schema, &summaries, predicate) } +/// Check one set of statistics against the predicate +pub fn keep_after_pruning( + table_schema: &Schema, + chunk_statistics: Arc, + chunk_schema: SchemaRef, + predicate: &Predicate, +) -> Result { + let filter_expr = match predicate.filter_expr() { + Some(expr) => expr, + None => { + return Ok(true); + } + }; + trace!(%filter_expr, "Filter_expr of pruning chunk"); + + let props = ExecutionProps::new(); + let pruning_predicate = + match create_pruning_predicate(&props, &filter_expr, &table_schema.as_arrow()) { + Ok(p) => p, + Err(e) => { + warn!(%e, ?filter_expr, "Cannot create pruning predicate"); + return Err(NotPrunedReason::CanNotCreatePruningPredicate); + } + }; + + let statistics = ChunkPruningStatistics { + table_schema, + summaries: &[(chunk_statistics, chunk_schema)], + }; + + let result = match pruning_predicate.prune(&statistics) { + Ok(results) => results.into_iter().next().expect("TODO FIX THIS"), + Err(e) => { + warn!(%e, ?filter_expr, "DataFusion pruning failed"); + return Err(NotPrunedReason::DataFusionPruningFailed); + } + }; + Ok(result) +} /// Given a `Vec` of pruning summaries, return a `Vec` where `false` indicates that the /// predicate can be proven to evaluate to 
`false` for every single row. pub fn prune_summaries( From cd28bf03377b55d7a4e755f9c62b4fc072b218b9 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 29 Jun 2023 15:53:11 -0400 Subject: [PATCH 06/14] test: Query an ingester with a predicate that should prune partitions --- .../tests/end_to_end_cases/ingester.rs | 122 ++++++++++++++++++ test_helpers_end_to_end/src/mini_cluster.rs | 3 + 2 files changed, 125 insertions(+) diff --git a/influxdb_iox/tests/end_to_end_cases/ingester.rs b/influxdb_iox/tests/end_to_end_cases/ingester.rs index 437d233f78..7c05f35b6a 100644 --- a/influxdb_iox/tests/end_to_end_cases/ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/ingester.rs @@ -1,8 +1,10 @@ use arrow_flight::{error::FlightError, Ticket}; use arrow_util::assert_batches_sorted_eq; use data_types::{NamespaceId, TableId}; +use datafusion::prelude::{col, lit}; use futures::FutureExt; use http::StatusCode; +use influxdb_iox_client::table::generated_types::{Part, PartitionTemplate, TemplatePart}; use ingester_query_grpc::{influxdata::iox::ingester::v1 as proto, IngesterQueryRequest}; use prost::Message; use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState}; @@ -170,6 +172,126 @@ async fn ingester_flight_api() { assert_ne!(ingester_response.app_metadata.ingester_uuid, ingester_uuid); } +#[tokio::test] +async fn ingester_partition_pruning() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + // Set up cluster + let mut cluster = MiniCluster::create_shared_never_persist(database_url).await; + + let mut steps: Vec<_> = vec![Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let namespace_name = state.cluster().namespace(); + + let mut namespace_client = influxdb_iox_client::namespace::Client::new( + state.cluster().router().router_grpc_connection(), + ); + namespace_client + .create_namespace( + namespace_name, + None, + None, + Some(PartitionTemplate { 
+ parts: vec![ + TemplatePart { + part: Some(Part::TagValue("tag1".into())), + }, + TemplatePart { + part: Some(Part::TagValue("tag3".into())), + }, + ], + }), + ) + .await + .unwrap(); + + let mut table_client = influxdb_iox_client::table::Client::new( + state.cluster().router().router_grpc_connection(), + ); + + // table1: create implicitly by writing to it + + // table2: do not override partition template => use namespace template + table_client + .create_table(namespace_name, "table2", None) + .await + .unwrap(); + + // table3: overide namespace template + table_client + .create_table( + namespace_name, + "table3", + Some(PartitionTemplate { + parts: vec![TemplatePart { + part: Some(Part::TagValue("tag2".into())), + }], + }), + ) + .await + .unwrap(); + } + .boxed() + }))] + .into_iter() + .chain((1..=3).flat_map(|tid| { + [Step::WriteLineProtocol( + [ + format!("table{tid},tag1=v1a,tag2=v2a,tag3=v3a f=1 11"), + format!("table{tid},tag1=v1b,tag2=v2a,tag3=v3a f=1 11"), + format!("table{tid},tag1=v1a,tag2=v2b,tag3=v3a f=1 11"), + format!("table{tid},tag1=v1b,tag2=v2b,tag3=v3a f=1 11"), + format!("table{tid},tag1=v1a,tag2=v2a,tag3=v3b f=1 11"), + format!("table{tid},tag1=v1b,tag2=v2a,tag3=v3b f=1 11"), + format!("table{tid},tag1=v1a,tag2=v2b,tag3=v3b f=1 11"), + format!("table{tid},tag1=v1b,tag2=v2b,tag3=v3b f=1 11"), + ] + .join("\n"), + )] + .into_iter() + })) + .collect(); + + steps.push(Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let predicate = ::predicate::Predicate::new().with_expr(col("tag1").eq(lit("v1a"))); + + let query = IngesterQueryRequest::new( + state.cluster().namespace_id().await, + state.cluster().table_id("table1").await, + vec![], + Some(predicate), + ); + + let query: proto::IngesterQueryRequest = query.try_into().unwrap(); + let ingester_response = state + .cluster() + .query_ingester( + query.clone(), + state.cluster().ingester().ingester_grpc_connection(), + ) + .await + .unwrap(); + + let expected = [ + 
"+-----+------+------+------+--------------------------------+", + "| f | tag1 | tag2 | tag3 | time |", + "+-----+------+------+------+--------------------------------+", + "| 1.0 | v1a | v2a | v3a | 1970-01-01T00:00:00.000000011Z |", + "| 1.0 | v1a | v2a | v3b | 1970-01-01T00:00:00.000000011Z |", + "| 1.0 | v1a | v2b | v3a | 1970-01-01T00:00:00.000000011Z |", + "| 1.0 | v1a | v2b | v3b | 1970-01-01T00:00:00.000000011Z |", + "+-----+------+------+------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &ingester_response.record_batches); + } + .boxed() + }))); + + StepTest::new(&mut cluster, steps).run().await +} + #[tokio::test] async fn ingester_flight_api_namespace_not_found() { test_helpers::maybe_start_logging(); diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index dfa19f4c7a..14a8179ef2 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -490,6 +490,9 @@ impl MiniCluster { let mut record_batches = vec![]; while let Some((msg, _md)) = next_message(&mut performed_query).await { + if matches!(msg, DecodedPayload::None | DecodedPayload::Schema(_)) { + continue; + } let batch = unwrap_record_batch(msg); record_batches.push(batch); } From 0bcf85d48c05dab5b01e869c369bb657ad82284a Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 3 Jul 2023 11:38:14 +0200 Subject: [PATCH 07/14] refactor: de-dup code --- ingester/src/buffer_tree/table.rs | 13 +++++++---- iox_query/src/pruning.rs | 39 ------------------------------- 2 files changed, 8 insertions(+), 44 deletions(-) diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index b050a13449..9e3cdabd01 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -12,7 +12,7 @@ use data_types::{ use datafusion::scalar::ScalarValue; use iox_query::{ chunk_statistics::{create_chunk_statistics, ColumnRange}, - 
pruning::keep_after_pruning, + pruning::prune_summaries, QueryChunk, }; use mutable_batch::MutableBatch; @@ -356,14 +356,17 @@ where &column_ranges, )); - if !keep_after_pruning( + let keep_after_pruning = prune_summaries( data.schema(), - chunk_statistics, - data.schema().as_arrow(), + &[(chunk_statistics, data.schema().as_arrow())], predicate, ) .expect("TODO FIX THIS") - { + .into_iter() + .next() + .expect("one chunk in, one chunk out"); + + if !keep_after_pruning { return PartitionResponse::new( vec![], id, diff --git a/iox_query/src/pruning.rs b/iox_query/src/pruning.rs index 6db84f385a..d75195d0a0 100644 --- a/iox_query/src/pruning.rs +++ b/iox_query/src/pruning.rs @@ -86,45 +86,6 @@ pub fn prune_chunks( prune_summaries(table_schema, &summaries, predicate) } -/// Check one set of statistics against the predicate -pub fn keep_after_pruning( - table_schema: &Schema, - chunk_statistics: Arc, - chunk_schema: SchemaRef, - predicate: &Predicate, -) -> Result { - let filter_expr = match predicate.filter_expr() { - Some(expr) => expr, - None => { - return Ok(true); - } - }; - trace!(%filter_expr, "Filter_expr of pruning chunk"); - - let props = ExecutionProps::new(); - let pruning_predicate = - match create_pruning_predicate(&props, &filter_expr, &table_schema.as_arrow()) { - Ok(p) => p, - Err(e) => { - warn!(%e, ?filter_expr, "Cannot create pruning predicate"); - return Err(NotPrunedReason::CanNotCreatePruningPredicate); - } - }; - - let statistics = ChunkPruningStatistics { - table_schema, - summaries: &[(chunk_statistics, chunk_schema)], - }; - - let result = match pruning_predicate.prune(&statistics) { - Ok(results) => results.into_iter().next().expect("TODO FIX THIS"), - Err(e) => { - warn!(%e, ?filter_expr, "DataFusion pruning failed"); - return Err(NotPrunedReason::DataFusionPruningFailed); - } - }; - Ok(result) -} /// Given a `Vec` of pruning summaries, return a `Vec` where `false` indicates that the /// predicate can be proven to evaluate to `false` for 
every single row. pub fn prune_summaries( From e9b456df1f955fd5a735a27a68bf055f71af2907 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 3 Jul 2023 11:43:55 +0200 Subject: [PATCH 08/14] fix: do not panic for pruning errors --- ingester/src/buffer_tree/table.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index 9e3cdabd01..e70500503b 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -361,10 +361,16 @@ where &[(chunk_statistics, data.schema().as_arrow())], predicate, ) - .expect("TODO FIX THIS") - .into_iter() - .next() - .expect("one chunk in, one chunk out"); + // Errors are logged by `iox_query` and sometimes fine, e.g. for not implemented DataFusion + // features or upstream bugs. The querier uses the same strategy. Pruning is a mere + // optimization and should not lead to crashes. + .ok() + .map(|vals| { + vals.into_iter() + .next() + .expect("one chunk in, one chunk out") + }) + .unwrap_or(true); if !keep_after_pruning { return PartitionResponse::new( From b1a4e3955e20338e96fdb56c528843d39e3f897d Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 3 Jul 2023 11:49:04 +0200 Subject: [PATCH 09/14] test: `ingester_partition_pruning` must perform type coercion --- influxdb_iox/tests/end_to_end_cases/ingester.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_cases/ingester.rs b/influxdb_iox/tests/end_to_end_cases/ingester.rs index 7c05f35b6a..37d37f33e3 100644 --- a/influxdb_iox/tests/end_to_end_cases/ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/ingester.rs @@ -1,7 +1,11 @@ +use arrow::datatypes::DataType; use arrow_flight::{error::FlightError, Ticket}; use arrow_util::assert_batches_sorted_eq; use data_types::{NamespaceId, TableId}; -use datafusion::prelude::{col, lit}; +use datafusion::{ + prelude::{col, lit}, + scalar::ScalarValue, 
+}; use futures::FutureExt; use http::StatusCode; use influxdb_iox_client::table::generated_types::{Part, PartitionTemplate, TemplatePart}; @@ -255,7 +259,15 @@ async fn ingester_partition_pruning() { steps.push(Step::Custom(Box::new(move |state: &mut StepTestState| { async move { - let predicate = ::predicate::Predicate::new().with_expr(col("tag1").eq(lit("v1a"))); + // Note: The querier will perform correct type coercion. We must simulate this here, otherwise the ingester + // will NOT be able to prune the data because the predicate evaluation will fail with a type error + // and the predicate will be ignored. + let predicate = ::predicate::Predicate::new().with_expr(col("tag1").eq(lit( + ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::from("v1a")), + ), + ))); let query = IngesterQueryRequest::new( state.cluster().namespace_id().await, From 70b44f78eed52c647b6ffe43aefdce332f3726e3 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 3 Jul 2023 12:15:43 +0200 Subject: [PATCH 10/14] test: correctly decode ingester reponses in end2end tests --- influxdb_iox/tests/end_to_end_cases/cli.rs | 9 ++- .../tests/end_to_end_cases/ingester.rs | 57 ++++++++++++--- .../querier/multi_ingester.rs | 11 ++- test_helpers_end_to_end/src/mini_cluster.rs | 71 ++++++++++--------- 4 files changed, 102 insertions(+), 46 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index 732e4d4ffa..1b8efb84ef 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -1323,10 +1323,15 @@ async fn assert_ingester_contains_results( .await .unwrap(); - let ingester_uuid = ingester_response.app_metadata.ingester_uuid; + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("at least one ingester partition"); + let ingester_uuid = ingester_partition.app_metadata.ingester_uuid; assert!(!ingester_uuid.is_empty()); - 
assert_batches_sorted_eq!(expected, &ingester_response.record_batches); + assert_batches_sorted_eq!(expected, &ingester_partition.record_batches); } #[tokio::test] diff --git a/influxdb_iox/tests/end_to_end_cases/ingester.rs b/influxdb_iox/tests/end_to_end_cases/ingester.rs index 37d37f33e3..446495263d 100644 --- a/influxdb_iox/tests/end_to_end_cases/ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/ingester.rs @@ -45,7 +45,14 @@ async fn persist_on_demand() { .await .unwrap(); - let ingester_uuid = ingester_response.app_metadata.ingester_uuid; + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + + let ingester_uuid = ingester_partition.app_metadata.ingester_uuid; assert!(!ingester_uuid.is_empty()); let expected = [ @@ -55,7 +62,7 @@ async fn persist_on_demand() { "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", "+------+------+--------------------------------+-----+", ]; - assert_batches_sorted_eq!(&expected, &ingester_response.record_batches); + assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches); } .boxed() })), @@ -83,8 +90,15 @@ async fn persist_on_demand() { .await .unwrap(); + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + let num_files_persisted = - ingester_response.app_metadata.completed_persistence_count; + ingester_partition.app_metadata.completed_persistence_count; assert_eq!(num_files_persisted, 1); } .boxed() @@ -127,11 +141,17 @@ async fn ingester_flight_api() { .query_ingester(query.clone(), cluster.ingester().ingester_grpc_connection()) .await .unwrap(); + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); - let ingester_uuid = 
ingester_response.app_metadata.ingester_uuid.clone(); + let ingester_uuid = ingester_partition.app_metadata.ingester_uuid.clone(); assert!(!ingester_uuid.is_empty()); - let schema = ingester_response.schema.unwrap(); + let schema = ingester_partition.schema.unwrap(); let expected = [ "+------+------+--------------------------------+-----+", @@ -141,11 +161,11 @@ async fn ingester_flight_api() { "| B | A | 1970-01-01T00:00:00.001234567Z | 84 |", "+------+------+--------------------------------+-----+", ]; - assert_batches_sorted_eq!(&expected, &ingester_response.record_batches); + assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches); // Also ensure that the schema of the batches matches what is // reported by the performed_query. - ingester_response + ingester_partition .record_batches .iter() .enumerate() @@ -158,7 +178,13 @@ async fn ingester_flight_api() { .query_ingester(query.clone(), cluster.ingester().ingester_grpc_connection()) .await .unwrap(); - assert_eq!(ingester_response.app_metadata.ingester_uuid, ingester_uuid); + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + assert_eq!(ingester_partition.app_metadata.ingester_uuid, ingester_uuid); // Restart the ingesters cluster.restart_ingesters().await; @@ -173,7 +199,13 @@ async fn ingester_flight_api() { .query_ingester(query, cluster.ingester().ingester_grpc_connection()) .await .unwrap(); - assert_ne!(ingester_response.app_metadata.ingester_uuid, ingester_uuid); + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + assert_ne!(ingester_partition.app_metadata.ingester_uuid, ingester_uuid); } #[tokio::test] @@ -296,7 +328,12 @@ async fn ingester_partition_pruning() { "| 1.0 | v1a | v2b | v3b | 1970-01-01T00:00:00.000000011Z |", 
"+-----+------+------+------+--------------------------------+", ]; - assert_batches_sorted_eq!(&expected, &ingester_response.record_batches); + let record_batches = ingester_response + .partitions + .into_iter() + .flat_map(|p| p.record_batches) + .collect::>(); + assert_batches_sorted_eq!(&expected, &record_batches); } .boxed() }))); diff --git a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs b/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs index 0669cd22e7..e081f878f9 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs @@ -193,7 +193,14 @@ async fn write_replication() { .await .unwrap(); - let ingester_uuid = ingester_response.app_metadata.ingester_uuid; + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + + let ingester_uuid = ingester_partition.app_metadata.ingester_uuid; assert!(!ingester_uuid.is_empty()); let expected = [ @@ -212,7 +219,7 @@ async fn write_replication() { "| A | B | 1970-01-01T00:00:00.000000020Z | 20 |", "+------+------+--------------------------------+-----+", ]; - assert_batches_sorted_eq!(&expected, &ingester_response.record_batches); + assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches); } .boxed() }))); diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index 14a8179ef2..fa7ee4a7a1 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -481,27 +481,43 @@ impl MiniCluster { .await? 
.into_inner(); - let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap(); - assert!(matches!(msg, DecodedPayload::None), "{msg:?}"); - - let schema = next_message(&mut performed_query) - .await - .map(|(msg, _)| unwrap_schema(msg)); - - let mut record_batches = vec![]; - while let Some((msg, _md)) = next_message(&mut performed_query).await { - if matches!(msg, DecodedPayload::None | DecodedPayload::Schema(_)) { - continue; + let mut partitions = vec![]; + let mut current_partition = None; + while let Some((msg, app_metadata)) = next_message(&mut performed_query).await { + match msg { + DecodedPayload::None => { + if let Some(p) = std::mem::take(&mut current_partition) { + partitions.push(p); + } + current_partition = Some(IngesterResponsePartition { + app_metadata, + schema: None, + record_batches: vec![], + }); + } + DecodedPayload::Schema(schema) => { + let current_partition = + current_partition.as_mut().expect("schema w/o partition"); + assert!( + current_partition.schema.is_none(), + "got two schemas for a single partition" + ); + current_partition.schema = Some(schema); + } + DecodedPayload::RecordBatch(batch) => { + let current_partition = + current_partition.as_mut().expect("batch w/o partition"); + assert!(current_partition.schema.is_some(), "batch w/o schema"); + current_partition.record_batches.push(batch); + } } - let batch = unwrap_record_batch(msg); - record_batches.push(batch); } - Ok(IngesterResponse { - app_metadata, - schema, - record_batches, - }) + if let Some(p) = current_partition { + partitions.push(p); + } + + Ok(IngesterResponse { partitions }) } /// Ask all of the ingesters to persist their data. 
@@ -592,6 +608,11 @@ impl MiniCluster { /// Gathers data from ingester Flight queries #[derive(Debug)] pub struct IngesterResponse { + pub partitions: Vec, +} + +#[derive(Debug)] +pub struct IngesterResponsePartition { pub app_metadata: IngesterQueryResponseMetadata, pub schema: Option, pub record_batches: Vec, @@ -711,17 +732,3 @@ async fn next_message( Some((payload, app_metadata)) } - -fn unwrap_schema(msg: DecodedPayload) -> SchemaRef { - match msg { - DecodedPayload::Schema(s) => s, - _ => panic!("Unexpected message type: {msg:?}"), - } -} - -fn unwrap_record_batch(msg: DecodedPayload) -> RecordBatch { - match msg { - DecodedPayload::RecordBatch(b) => b, - _ => panic!("Unexpected message type: {msg:?}"), - } -} From 171b2a14c795f158bb258e29eeb712cbfc12aa13 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 3 Jul 2023 12:22:13 +0200 Subject: [PATCH 11/14] fix: doc link --- ingester/src/buffer_tree/namespace.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ingester/src/buffer_tree/namespace.rs b/ingester/src/buffer_tree/namespace.rs index 058443141c..e521e1a735 100644 --- a/ingester/src/buffer_tree/namespace.rs +++ b/ingester/src/buffer_tree/namespace.rs @@ -63,6 +63,9 @@ pub(crate) struct NamespaceData { /// /// The [`TableProvider`] acts as a [`DeferredLoad`] constructor to /// resolve the catalog [`Table`] for new [`TableData`] out of the hot path. 
+ /// + /// + /// [`Table`]: data_types::Table tables: ArcMap>, catalog_table_resolver: Arc, /// The count of tables initialised in this Ingester so far, across all From 36ed914689527ddcdc93fe5d20e372c637bb3b4f Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 3 Jul 2023 12:34:25 +0200 Subject: [PATCH 12/14] test: type coercion in ingester tests --- ingester/src/buffer_tree/root.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index 105b90d37f..5c44d7adea 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -231,16 +231,19 @@ where mod tests { use std::{sync::Arc, time::Duration}; + use arrow::datatypes::DataType; use assert_matches::assert_matches; use data_types::{PartitionId, PartitionKey}; use datafusion::{ assert_batches_eq, assert_batches_sorted_eq, prelude::{col, lit}, + scalar::ScalarValue, }; use futures::StreamExt; use lazy_static::lazy_static; use metric::{Attributes, Metric}; use predicate::Predicate; + use test_helpers::maybe_start_logging; use super::*; use crate::{ @@ -355,6 +358,8 @@ mod tests { paste::paste! { #[tokio::test] async fn []() { + maybe_start_logging(); + // Configure the mock partition provider with the provided // partitions. let partition_provider = Arc::new(MockPartitionProvider::default() @@ -705,7 +710,14 @@ mod tests { None, ) ], - predicate = Some(Predicate::new().with_expr(col("region").eq(lit(PARTITION2_KEY.inner())))), + // NOTE: The querier will coerce the type of the predicates correctly, so the ingester does NOT need to perform + // type coercion. This type should reflect that. 
+ predicate = Some(Predicate::new().with_expr(col("region").eq(lit( + ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::from(PARTITION2_KEY.inner())) + ) + )))), want = [ "+----------+------+-------------------------------+", "| region | temp | time |", From edf6686130cb24aeea381eec100396010ead55d8 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 3 Jul 2023 16:53:55 +0200 Subject: [PATCH 13/14] fix(test): custom partitioning template pruning Configure the partition pruning test to use a partition template that partitions on the "region" field. This will allow it to be used for pruning at query time. --- .../buffer_tree/partition/resolver/catalog.rs | 7 +-- ingester/src/buffer_tree/root.rs | 47 +++++++++++-------- ingester/src/buffer_tree/table.rs | 8 +++- .../buffer_tree/table/metadata_resolver.rs | 3 +- ingester/src/test_util.rs | 20 +++++--- 5 files changed, 52 insertions(+), 33 deletions(-) diff --git a/ingester/src/buffer_tree/partition/resolver/catalog.rs b/ingester/src/buffer_tree/partition/resolver/catalog.rs index 191899eb24..9b80f8dac8 100644 --- a/ingester/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester/src/buffer_tree/partition/resolver/catalog.rs @@ -140,9 +140,10 @@ mod tests { Arc::new(DeferredLoad::new( Duration::from_secs(1), async { - TableMetadata::with_default_partition_template_for_testing(TableName::from( - TABLE_NAME, - )) + TableMetadata::new_for_testing( + TableName::from(TABLE_NAME), + Default::default(), + ) }, &metrics, )), diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index 5c44d7adea..b66e7b1d4b 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -233,7 +233,10 @@ mod tests { use arrow::datatypes::DataType; use assert_matches::assert_matches; - use data_types::{PartitionId, PartitionKey}; + use data_types::{ + partition_template::{test_table_partition_override, TemplatePart}, + PartitionId, PartitionKey, + }; use 
datafusion::{ assert_batches_eq, assert_batches_sorted_eq, prelude::{col, lit}, @@ -251,7 +254,7 @@ mod tests { namespace::{name_resolver::mock::MockNamespaceNameProvider, NamespaceData}, partition::resolver::mock::MockPartitionProvider, post_write::mock::MockPostWriteObserver, - table::TableMetadata, + table::{metadata_resolver::mock::MockTableProvider, TableMetadata}, }, deferred_load::{self, DeferredLoad}, query::partition_response::PartitionResponse, @@ -348,6 +351,7 @@ mod tests { macro_rules! test_write_query { ( $name:ident, + $(table_provider = $table_provider:expr,)? // An optional table provider partitions = [$($partition:expr), +], // The set of PartitionData for the mock // partition provider writes = [$($write:expr), *], // The set of WriteOperation to apply() @@ -368,10 +372,16 @@ mod tests { )+ ); + #[allow(unused_variables)] + let table_provider = Arc::clone(&*ARBITRARY_TABLE_PROVIDER); + $( + let table_provider: Arc = $table_provider; + )? + // Init the buffer tree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_PROVIDER), + table_provider, partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), @@ -650,14 +660,18 @@ mod tests { // contain the specified value in that partition key column. 
test_write_query!( filter_by_predicate_partition_key, + table_provider = Arc::new(MockTableProvider::new(TableMetadata::new_for_testing( + ARBITRARY_TABLE_NAME.clone(), + test_table_partition_override(vec![TemplatePart::TagValue("region")]) + ))), partitions = [ PartitionDataBuilder::new() .with_partition_id(ARBITRARY_PARTITION_ID) - .with_partition_key(ARBITRARY_PARTITION_KEY.clone()) + .with_partition_key(ARBITRARY_PARTITION_KEY.clone()) // "platanos" .build(), PartitionDataBuilder::new() .with_partition_id(PARTITION2_ID) - .with_partition_key(PARTITION2_KEY.clone()) + .with_partition_key(PARTITION2_KEY.clone()) // "p2" .build() ], writes = [ @@ -719,20 +733,12 @@ mod tests { ) )))), want = [ - "+----------+------+-------------------------------+", - "| region | temp | time |", - "+----------+------+-------------------------------+", - format!( - "| {} | 17.0 | 1970-01-01T00:00:07.676767676 |", - *PARTITION2_KEY - ) - .as_str(), - format!( - "| {} | 13.0 | 1970-01-01T00:00:07.676767676 |", - *PARTITION2_KEY - ) - .as_str(), - "+----------+------+-------------------------------+", + "+--------+------+-------------------------------+", + "| region | temp | time |", + "+--------+------+-------------------------------+", + "| p2 | 13.0 | 1970-01-01T00:00:07.676767676 |", + "| p2 | 17.0 | 1970-01-01T00:00:07.676767676 |", + "+--------+------+-------------------------------+", ] ); @@ -848,8 +854,9 @@ mod tests { .with_table_loader(Arc::new(DeferredLoad::new( Duration::from_secs(1), async move { - TableMetadata::with_default_partition_template_for_testing( + TableMetadata::new_for_testing( TABLE2_NAME.into(), + Default::default(), ) }, &metric::Registry::default(), diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index e70500503b..db1f70e9ba 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -42,10 +42,14 @@ pub(crate) struct TableMetadata { } impl TableMetadata { - pub fn 
with_default_partition_template_for_testing(name: TableName) -> Self { + #[cfg(test)] + pub fn new_for_testing( + name: TableName, + partition_template: TablePartitionTemplateOverride, + ) -> Self { Self { name, - partition_template: Default::default(), + partition_template, } } diff --git a/ingester/src/buffer_tree/table/metadata_resolver.rs b/ingester/src/buffer_tree/table/metadata_resolver.rs index d7dc7a34db..bd489aad17 100644 --- a/ingester/src/buffer_tree/table/metadata_resolver.rs +++ b/ingester/src/buffer_tree/table/metadata_resolver.rs @@ -92,8 +92,9 @@ pub(crate) mod mock { impl Default for MockTableProvider { fn default() -> Self { - Self::new(TableMetadata::with_default_partition_template_for_testing( + Self::new(TableMetadata::new_for_testing( "bananas".into(), + Default::default(), )) } } diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index a23cb1eacf..c04726e8e0 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -1,6 +1,9 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId}; +use data_types::{ + partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionId, PartitionKey, + SequenceNumber, TableId, +}; use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace}; use lazy_static::lazy_static; use mutable_batch_lp::lines_to_batches; @@ -48,7 +51,10 @@ pub(crate) fn defer_table_metadata_1_sec() -> Arc> { Arc::new(DeferredLoad::new( Duration::from_secs(1), async { - TableMetadata::with_default_partition_template_for_testing(ARBITRARY_TABLE_NAME.clone()) + TableMetadata::new_for_testing( + ARBITRARY_TABLE_NAME.clone(), + TablePartitionTemplateOverride::default(), + ) }, &metric::Registry::default(), )) @@ -62,11 +68,11 @@ lazy_static! 
{ pub(crate) static ref ARBITRARY_NAMESPACE_NAME_PROVIDER: Arc = Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)); pub(crate) static ref ARBITRARY_TABLE_NAME: TableName = TableName::from("bananas"); - pub(crate) static ref ARBITRARY_TABLE_PROVIDER: Arc = Arc::new( - MockTableProvider::new(TableMetadata::with_default_partition_template_for_testing( - ARBITRARY_TABLE_NAME.clone() - )) - ); + pub(crate) static ref ARBITRARY_TABLE_PROVIDER: Arc = + Arc::new(MockTableProvider::new(TableMetadata::new_for_testing( + ARBITRARY_TABLE_NAME.clone(), + TablePartitionTemplateOverride::default() + ))); } /// Build a [`PartitionData`] with mostly arbitrary-yet-valid values for tests. From 0297fe36516455bab9e45f9203bf3f59d259b5d8 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 3 Jul 2023 17:23:59 +0200 Subject: [PATCH 14/14] refactor: less nesting in partition pruning logic Improve readability by pulling the partition pruning logic into its own function and clean up some minor bits.
--- ingester/src/buffer_tree/table.rs | 215 +++++++++++++++++------------- 1 file changed, 121 insertions(+), 94 deletions(-) diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index db1f70e9ba..6add3f7053 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -32,6 +32,7 @@ use crate::{ query::{ partition_response::PartitionResponse, response::PartitionStream, QueryError, QueryExec, }, + query_adaptor::QueryAdaptor, }; /// Metadata from the catalog for a table @@ -288,102 +289,28 @@ where assert_eq!(id, data.partition_id()); let data = Arc::new(data); - if let Some(predicate) = &predicate { - // Filter using the partition key - let column_ranges = Arc::new( - build_column_values(&table_partition_template, partition_key.inner()) - .filter_map(|(col, val)| { - let range = match val { - ColumnValue::Identity(s) => { - let s = Arc::new(ScalarValue::from(s.as_ref())); - ColumnRange { - min_value: Arc::clone(&s), - max_value: s, - } - } - ColumnValue::Prefix(p) => { - if p.is_empty() { - // full range => value is useless - return None; - } - // If the partition only has a prefix of the tag value (it was truncated) then form a conservative - // range: - // - // - // # Minimum - // Use the prefix itself. - // - // Note that the minimum is inclusive. - // - // All values in the partition are either: - // - identical to the prefix, in which case they are included by the inclusive minimum - // - have the form `""`, and it holds that `"" > ""` for all - // strings `""`. - // - // - // # Maximum - // Use `""`. - // - // Note that the maximum is inclusive. - // - // All strings in this partition must be smaller than this constructed maximum, because - // string comparison is front-to-back and the `"" > ""`. 
- - let min_value = Arc::new(ScalarValue::from(p.as_ref())); - - let mut chars = p.as_ref().chars().collect::>(); - *chars - .last_mut() - .expect("checked that prefix is not empty") = - std::char::MAX; - let max_value = Arc::new(ScalarValue::from( - chars.into_iter().collect::().as_str(), - )); - - ColumnRange { - min_value, - max_value, - } - } - }; - - Some((Arc::from(col), range)) - }) - .collect::>(), - ); - - let chunk_statistics = Arc::new(create_chunk_statistics( - data.num_rows(), - data.schema(), - data.ts_min_max(), - &column_ranges, - )); - - let keep_after_pruning = prune_summaries( - data.schema(), - &[(chunk_statistics, data.schema().as_arrow())], - predicate, - ) - // Errors are logged by `iox_query` and sometimes fine, e.g. for not implemented DataFusion - // features or upstream bugs. The querier uses the same strategy. Pruning is a mere - // optimization and should not lead to crashes. - .ok() - .map(|vals| { - vals.into_iter() - .next() - .expect("one chunk in, one chunk out") + // Potentially prune out this partition if the partition + // template & derived partition key can be used to match + // against the optional predicate. + if predicate + .as_ref() + .map(|p| { + !keep_after_pruning_partition_key( + &table_partition_template, + &partition_key, + p, + &data, + ) }) - .unwrap_or(true); - - if !keep_after_pruning { - return PartitionResponse::new( - vec![], - id, - hash_id, - completed_persistence_count, - ); - } + .unwrap_or_default() + { + return PartitionResponse::new( + vec![], + id, + hash_id, + completed_persistence_count, + ); } // Project the data if necessary @@ -408,6 +335,106 @@ where } } +/// Return true if `data` contains one or more rows matching `predicate`, +/// pruning based on the `partition_key` and `template`. +/// +/// Returns false iff it can be proven that all of data does not match the +/// predicate. 
+fn keep_after_pruning_partition_key( + table_partition_template: &TablePartitionTemplateOverride, + partition_key: &PartitionKey, + predicate: &Predicate, + data: &QueryAdaptor, +) -> bool { + // Construct a set of per-column min/max statistics based on the partition + // key values. + let column_ranges = Arc::new( + build_column_values(table_partition_template, partition_key.inner()) + .filter_map(|(col, val)| { + let range = match val { + ColumnValue::Identity(s) => { + let s = Arc::new(ScalarValue::from(s.as_ref())); + ColumnRange { + min_value: Arc::clone(&s), + max_value: s, + } + } + ColumnValue::Prefix(p) if p.is_empty() => return None, + ColumnValue::Prefix(p) => { + // If the partition only has a prefix of the tag value + // (it was truncated) then form a conservative range: + // + // # Minimum + // Use the prefix itself. + // + // Note that the minimum is inclusive. + // + // All values in the partition are either: + // + // - identical to the prefix, in which case they are + // included by the inclusive minimum + // + // - have the form `""`, and it holds that + // `"" > ""` for all strings + // `""`. + // + // # Maximum + // Use `""`. + // + // Note that the maximum is inclusive. + // + // All strings in this partition must be smaller than + // this constructed maximum, because string comparison + // is front-to-back and the + // `"" > + // ""`. 
+ + let min_value = Arc::new(ScalarValue::from(p.as_ref())); + + let mut chars = p.as_ref().chars().collect::>(); + *chars.last_mut().expect("checked that prefix is not empty") = + std::char::MAX; + let max_value = Arc::new(ScalarValue::from( + chars.into_iter().collect::().as_str(), + )); + + ColumnRange { + min_value, + max_value, + } + } + }; + + Some((Arc::from(col), range)) + }) + .collect::>(), + ); + + let chunk_statistics = Arc::new(create_chunk_statistics( + data.num_rows(), + data.schema(), + data.ts_min_max(), + &column_ranges, + )); + + prune_summaries( + data.schema(), + &[(chunk_statistics, data.schema().as_arrow())], + predicate, + ) + // Errors are logged by `iox_query` and sometimes fine, e.g. for not + // implemented DataFusion features or upstream bugs. The querier uses the + // same strategy. Pruning is a mere optimization and should not lead to + // crashes or unreadable data. + .ok() + .map(|vals| { + vals.into_iter() + .next() + .expect("one chunk in, one chunk out") + }) + .unwrap_or(true) +} + #[cfg(test)] mod tests { use std::sync::Arc;