diff --git a/Cargo.lock b/Cargo.lock index 922aac4734..cbd6afd06f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2628,6 +2628,7 @@ dependencies = [ "parquet_file", "paste", "pin-project", + "predicate", "prost", "rand", "schema", diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index 732e4d4ffa..1b8efb84ef 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -1323,10 +1323,15 @@ async fn assert_ingester_contains_results( .await .unwrap(); - let ingester_uuid = ingester_response.app_metadata.ingester_uuid; + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("at least one ingester partition"); + let ingester_uuid = ingester_partition.app_metadata.ingester_uuid; assert!(!ingester_uuid.is_empty()); - assert_batches_sorted_eq!(expected, &ingester_response.record_batches); + assert_batches_sorted_eq!(expected, &ingester_partition.record_batches); } #[tokio::test] diff --git a/influxdb_iox/tests/end_to_end_cases/ingester.rs b/influxdb_iox/tests/end_to_end_cases/ingester.rs index 437d233f78..446495263d 100644 --- a/influxdb_iox/tests/end_to_end_cases/ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/ingester.rs @@ -1,8 +1,14 @@ +use arrow::datatypes::DataType; use arrow_flight::{error::FlightError, Ticket}; use arrow_util::assert_batches_sorted_eq; use data_types::{NamespaceId, TableId}; +use datafusion::{ + prelude::{col, lit}, + scalar::ScalarValue, +}; use futures::FutureExt; use http::StatusCode; +use influxdb_iox_client::table::generated_types::{Part, PartitionTemplate, TemplatePart}; use ingester_query_grpc::{influxdata::iox::ingester::v1 as proto, IngesterQueryRequest}; use prost::Message; use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState}; @@ -39,7 +45,14 @@ async fn persist_on_demand() { .await .unwrap(); - let ingester_uuid = ingester_response.app_metadata.ingester_uuid; + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + + let ingester_uuid = ingester_partition.app_metadata.ingester_uuid; assert!(!ingester_uuid.is_empty()); let expected = [ @@ -49,7 +62,7 @@ async fn persist_on_demand() { "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", "+------+------+--------------------------------+-----+", ]; - assert_batches_sorted_eq!(&expected, &ingester_response.record_batches); + assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches); } .boxed() })), @@ -77,8 +90,15 @@ async fn persist_on_demand() { .await .unwrap(); + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + let num_files_persisted = - ingester_response.app_metadata.completed_persistence_count; + ingester_partition.app_metadata.completed_persistence_count; assert_eq!(num_files_persisted, 1); } .boxed() @@ -121,11 +141,17 @@ async fn ingester_flight_api() { .query_ingester(query.clone(), cluster.ingester().ingester_grpc_connection()) .await .unwrap(); + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); - let ingester_uuid = ingester_response.app_metadata.ingester_uuid.clone(); + let ingester_uuid = ingester_partition.app_metadata.ingester_uuid.clone(); 
assert!(!ingester_uuid.is_empty()); - let schema = ingester_response.schema.unwrap(); + let schema = ingester_partition.schema.unwrap(); let expected = [ "+------+------+--------------------------------+-----+", "| tag1 | tag2 | time | val |", "+------+------+--------------------------------+-----+", "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", "| B | A | 1970-01-01T00:00:00.001234567Z | 84 |", "+------+------+--------------------------------+-----+", ]; - assert_batches_sorted_eq!(&expected, &ingester_response.record_batches); + assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches); // Also ensure that the schema of the batches matches what is // reported by the performed_query. - ingester_response + ingester_partition .record_batches .iter() .enumerate() @@ -152,7 +178,13 @@ .query_ingester(query.clone(), cluster.ingester().ingester_grpc_connection()) .await .unwrap(); - assert_eq!(ingester_response.app_metadata.ingester_uuid, ingester_uuid); + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + assert_eq!(ingester_partition.app_metadata.ingester_uuid, ingester_uuid); // Restart the ingesters cluster.restart_ingesters().await; @@ -167,7 +199,146 @@ .query_ingester(query, cluster.ingester().ingester_grpc_connection()) .await .unwrap(); - assert_ne!(ingester_response.app_metadata.ingester_uuid, ingester_uuid); + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + assert_ne!(ingester_partition.app_metadata.ingester_uuid, ingester_uuid); +} + +#[tokio::test] +async fn ingester_partition_pruning() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + // Set up cluster + let mut cluster = MiniCluster::create_shared_never_persist(database_url).await; + + let mut steps: Vec<_> = vec![Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let namespace_name = state.cluster().namespace(); + + let mut namespace_client = influxdb_iox_client::namespace::Client::new( + state.cluster().router().router_grpc_connection(), + ); + namespace_client + .create_namespace( + namespace_name, + None, + None, + Some(PartitionTemplate { + parts: vec![ + TemplatePart { + part: Some(Part::TagValue("tag1".into())), + }, + TemplatePart { + part: Some(Part::TagValue("tag3".into())), + }, + ], + }), + ) + .await + .unwrap(); + + let mut table_client = influxdb_iox_client::table::Client::new( + state.cluster().router().router_grpc_connection(), + ); + + // table1: create implicitly by writing to it + + // table2: do not override partition template => use namespace template + table_client + .create_table(namespace_name, "table2", None) + .await + .unwrap(); + + // table3: override namespace template + table_client + .create_table( + namespace_name, + "table3", + Some(PartitionTemplate { + parts: vec![TemplatePart { + part: Some(Part::TagValue("tag2".into())), + }], + }), + ) + .await + .unwrap(); + } + .boxed() + }))] + .into_iter() + .chain((1..=3).flat_map(|tid| { + [Step::WriteLineProtocol( + [ + format!("table{tid},tag1=v1a,tag2=v2a,tag3=v3a f=1 11"), + format!("table{tid},tag1=v1b,tag2=v2a,tag3=v3a f=1 11"), + format!("table{tid},tag1=v1a,tag2=v2b,tag3=v3a f=1 11"), + format!("table{tid},tag1=v1b,tag2=v2b,tag3=v3a f=1 11"), + format!("table{tid},tag1=v1a,tag2=v2a,tag3=v3b f=1 11"), + format!("table{tid},tag1=v1b,tag2=v2a,tag3=v3b
f=1 11"), + format!("table{tid},tag1=v1a,tag2=v2b,tag3=v3b f=1 11"), + format!("table{tid},tag1=v1b,tag2=v2b,tag3=v3b f=1 11"), + ] + .join("\n"), + )] + .into_iter() + })) + .collect(); + + steps.push(Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + // Note: The querier will perform correct type coercion. We must simulate this here, otherwise the ingester + // will NOT be able to prune the data because the predicate evaluation will fail with a type error + // and the predicate will be ignored. + let predicate = ::predicate::Predicate::new().with_expr(col("tag1").eq(lit( + ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::from("v1a")), + ), + ))); + + let query = IngesterQueryRequest::new( + state.cluster().namespace_id().await, + state.cluster().table_id("table1").await, + vec![], + Some(predicate), + ); + + let query: proto::IngesterQueryRequest = query.try_into().unwrap(); + let ingester_response = state + .cluster() + .query_ingester( + query.clone(), + state.cluster().ingester().ingester_grpc_connection(), + ) + .await + .unwrap(); + + let expected = [ + "+-----+------+------+------+--------------------------------+", + "| f | tag1 | tag2 | tag3 | time |", + "+-----+------+------+------+--------------------------------+", + "| 1.0 | v1a | v2a | v3a | 1970-01-01T00:00:00.000000011Z |", + "| 1.0 | v1a | v2a | v3b | 1970-01-01T00:00:00.000000011Z |", + "| 1.0 | v1a | v2b | v3a | 1970-01-01T00:00:00.000000011Z |", + "| 1.0 | v1a | v2b | v3b | 1970-01-01T00:00:00.000000011Z |", + "+-----+------+------+------+--------------------------------+", + ]; + let record_batches = ingester_response + .partitions + .into_iter() + .flat_map(|p| p.record_batches) + .collect::>(); + assert_batches_sorted_eq!(&expected, &record_batches); + } + .boxed() + }))); + + StepTest::new(&mut cluster, steps).run().await } #[tokio::test] diff --git a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs b/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs index 0669cd22e7..e081f878f9 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs @@ -193,7 +193,14 @@ async fn write_replication() { .await .unwrap(); - let ingester_uuid = ingester_response.app_metadata.ingester_uuid; + assert_eq!(ingester_response.partitions.len(), 1); + let ingester_partition = ingester_response + .partitions + .into_iter() + .next() + .expect("just checked len"); + + let ingester_uuid = ingester_partition.app_metadata.ingester_uuid; assert!(!ingester_uuid.is_empty()); let expected = [ @@ -212,7 +219,7 @@ async fn write_replication() { "| A | B | 1970-01-01T00:00:00.000000020Z | 20 |", "+------+------+--------------------------------+-----+", ]; - assert_batches_sorted_eq!(&expected, &ingester_response.record_batches); + assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches); } .boxed() }))); diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index e36ac29330..91c168deaf 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -32,6 +32,7 @@ once_cell = "1.18" parking_lot = "0.12.1" parquet_file = { version = "0.1.0", path = "../parquet_file" } pin-project = "1.1.2" +predicate = { version = "0.1.0", path = "../predicate" } prost = { version = "0.11.9", default-features = false, features = ["std"] } rand = "0.8.5" schema = { version = "0.1.0", path = "../schema" } diff --git a/ingester/src/buffer_tree/namespace.rs b/ingester/src/buffer_tree/namespace.rs index 
aa599b3586..e521e1a735 100644 --- a/ingester/src/buffer_tree/namespace.rs +++ b/ingester/src/buffer_tree/namespace.rs @@ -7,12 +7,13 @@ use std::sync::Arc; use async_trait::async_trait; use data_types::{NamespaceId, TableId}; use metric::U64Counter; +use predicate::Predicate; use trace::span::Span; use super::{ partition::resolver::PartitionProvider, post_write::PostWriteObserver, - table::{name_resolver::TableNameProvider, TableData}, + table::{metadata_resolver::TableProvider, TableData}, }; use crate::{ arcmap::ArcMap, @@ -60,12 +61,13 @@ pub(crate) struct NamespaceData<O> { /// A set of tables this [`NamespaceData`] instance has processed /// [`IngestOp`]'s for. /// - /// The [`TableNameProvider`] acts as a [`DeferredLoad`] constructor to - /// resolve the [`TableName`] for new [`TableData`] out of the hot path. + /// The [`TableProvider`] acts as a [`DeferredLoad`] constructor to + /// resolve the catalog [`Table`] for new [`TableData`] out of the hot path. /// - /// [`TableName`]: crate::buffer_tree::table::TableName + /// + /// [`Table`]: data_types::Table tables: ArcMap<TableId, TableData<O>>, - table_name_resolver: Arc<dyn TableNameProvider>, + catalog_table_resolver: Arc<dyn TableProvider>, /// The count of tables initialised in this Ingester so far, across all /// namespaces. table_count: U64Counter, @@ -83,7 +85,7 @@ impl<O> NamespaceData<O> { pub(super) fn new( namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, - table_name_resolver: Arc<dyn TableNameProvider>, + catalog_table_resolver: Arc<dyn TableProvider>, partition_provider: Arc<dyn PartitionProvider>, post_write_observer: Arc<O>, metrics: &metric::Registry, @@ -99,7 +101,7 @@ namespace_id, namespace_name, tables: Default::default(), - table_name_resolver, + catalog_table_resolver, table_count, partition_provider, post_write_observer, @@ -151,7 +153,7 @@ where self.table_count.inc(1); Arc::new(TableData::new( table_id, - Arc::new(self.table_name_resolver.for_table(table_id)), + Arc::new(self.catalog_table_resolver.for_table(table_id)), self.namespace_id, Arc::clone(&self.namespace_name), Arc::clone(&self.partition_provider), @@ -189,6 +191,7 @@ where table_id: TableId, columns: Vec<String>, span: Option<Span>, + predicate: Option<Predicate>, ) -> Result<Self::Response, QueryError> { assert_eq!( self.namespace_id, namespace_id, @@ -204,7 +207,7 @@ where // a tracing delegate to emit a child span.
Ok(QueryResponse::new( QueryExecTracing::new(inner, "table") - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await?, )) } @@ -226,7 +229,7 @@ mod tests { test_util::{ defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, - ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER, + ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER, }, }; @@ -243,7 +246,7 @@ let ns = NamespaceData::new( ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_ms(), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), &metrics, diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs index f68550f3dd..98c3c816db 100644 --- a/ingester/src/buffer_tree/partition.rs +++ b/ingester/src/buffer_tree/partition.rs @@ -14,7 +14,7 @@ use self::{ buffer::{traits::Queryable, BufferState, DataBuffer, Persisting}, persisting::{BatchIdent, PersistingData}, }; -use super::{namespace::NamespaceName, table::TableName}; +use super::{namespace::NamespaceName, table::TableMetadata}; use crate::{deferred_load::DeferredLoad, query_adaptor::QueryAdaptor}; mod buffer; @@ -73,9 +73,9 @@ pub struct PartitionData { /// The catalog ID for the table this partition is part of. table_id: TableId, - /// The name of the table this partition is part of, potentially unresolved + /// The catalog metadata for the table this partition is part of, potentially unresolved /// / deferred. - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, /// A [`DataBuffer`] for incoming writes. buffer: DataBuffer, @@ -108,7 +108,7 @@ impl PartitionData { namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, sort_key: SortKeyState, ) -> Self { Self { @@ -119,7 +119,7 @@ namespace_id, namespace_name, table_id, - table_name, + table, buffer: DataBuffer::default(), persisting: VecDeque::with_capacity(1), started_persistence_count: BatchIdent::default(), @@ -139,7 +139,7 @@ trace!( namespace_id = %self.namespace_id, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, "buffered write" @@ -175,7 +175,7 @@ trace!( namespace_id = %self.namespace_id, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, n_batches = data.len(), @@ -221,7 +221,7 @@ debug!( namespace_id = %self.namespace_id, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, %batch_ident, @@ -271,7 +271,7 @@ persistence_count = %self.completed_persistence_count, namespace_id = %self.namespace_id, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, batch_ident = %batch.batch_ident(), @@ -302,10 +302,10 @@ self.completed_persistence_count } - /// Return the name of the table this [`PartitionData`] is buffering writes + /// Return the metadata of the table this [`PartitionData`] is buffering writes /// for.
- pub(crate) fn table_name(&self) -> &Arc<DeferredLoad<TableName>> { - &self.table_name + pub(crate) fn table(&self) -> &Arc<DeferredLoad<TableMetadata>> { + &self.table } /// Return the table ID for this partition. diff --git a/ingester/src/buffer_tree/partition/resolver/cache.rs b/ingester/src/buffer_tree/partition/resolver/cache.rs index 6ab23c1026..047d9176b1 100644 --- a/ingester/src/buffer_tree/partition/resolver/cache.rs +++ b/ingester/src/buffer_tree/partition/resolver/cache.rs @@ -14,7 +14,7 @@ use crate::{ buffer_tree::{ namespace::NamespaceName, partition::{resolver::SortKeyResolver, PartitionData, SortKeyState}, - table::TableName, + table::TableMetadata, }, deferred_load::DeferredLoad, }; @@ -173,7 +173,7 @@ where namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, ) -> Arc<Mutex<PartitionData>> { // Use the cached PartitionKey instead of the caller's partition_key, // instead preferring to reuse the already-shared Arc in the cache. @@ -203,7 +203,7 @@ namespace_id, namespace_name, table_id, - table_name, + table, SortKeyState::Deferred(Arc::new(sort_key_resolver)), ))); } @@ -212,13 +212,7 @@ // Otherwise delegate to the catalog / inner impl. self.inner - .get_partition( - partition_key, - namespace_id, - namespace_name, - table_id, - table_name, - ) + .get_partition(partition_key, namespace_id, namespace_name, table_id, table) .await } } @@ -234,7 +228,7 @@ mod tests { use crate::{ buffer_tree::partition::resolver::mock::MockPartitionProvider, test_util::{ - defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder, + defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_PARTITION_KEY_STR, ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, }, }; @@ -270,15 +264,15 @@ ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) .await; assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID); assert_eq!(got.lock().table_id(), ARBITRARY_TABLE_ID); assert_eq!( - &**got.lock().table_name().get().await, - &***ARBITRARY_TABLE_NAME + &**got.lock().table().get().await.name(), + &**ARBITRARY_TABLE_NAME ); assert_eq!( &**got.lock().namespace_name().get().await, @@ -309,15 +303,15 @@ ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) .await; assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID); assert_eq!(got.lock().table_id(), ARBITRARY_TABLE_ID); assert_eq!( - &**got.lock().table_name().get().await, - &***ARBITRARY_TABLE_NAME + &**got.lock().table().get().await.name(), + &**ARBITRARY_TABLE_NAME ); assert_eq!( &**got.lock().namespace_name().get().await, @@ -366,15 +360,15 @@ ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) .await; assert_eq!(got.lock().partition_id(), other_key_id); assert_eq!(got.lock().table_id(), ARBITRARY_TABLE_ID); assert_eq!( - &**got.lock().table_name().get().await, - &***ARBITRARY_TABLE_NAME + &**got.lock().table().get().await.name(), + &**ARBITRARY_TABLE_NAME ); } @@ -402,15 +396,15 @@ ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), other_table, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) .await; assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID); assert_eq!(got.lock().table_id(), other_table);
assert_eq!( - &**got.lock().table_name().get().await, - &***ARBITRARY_TABLE_NAME + &**got.lock().table().get().await.name(), + &**ARBITRARY_TABLE_NAME ); } } diff --git a/ingester/src/buffer_tree/partition/resolver/catalog.rs b/ingester/src/buffer_tree/partition/resolver/catalog.rs index 5eff133978..9b80f8dac8 100644 --- a/ingester/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester/src/buffer_tree/partition/resolver/catalog.rs @@ -15,7 +15,7 @@ use crate::{ buffer_tree::{ namespace::NamespaceName, partition::{PartitionData, SortKeyState}, - table::TableName, + table::TableMetadata, }, deferred_load::DeferredLoad, }; @@ -61,12 +61,12 @@ impl PartitionProvider for CatalogPartitionResolver { namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, ) -> Arc<Mutex<PartitionData>> { debug!( %partition_key, %table_id, - %table_name, + %table, "upserting partition in catalog" ); let p = Backoff::new(&self.backoff_config) @@ -86,7 +86,7 @@ namespace_id, namespace_name, table_id, - table_name, + table, SortKeyState::Provided(p.sort_key()), ))) } @@ -103,6 +103,7 @@ mod tests { use iox_catalog::test_helpers::{arbitrary_namespace, arbitrary_table}; use super::*; + use crate::buffer_tree::table::TableName; const TABLE_NAME: &str = "bananas"; const NAMESPACE_NAME: &str = "ns-bananas"; @@ -138,17 +139,25 @@ table_id, Arc::new(DeferredLoad::new( Duration::from_secs(1), - async { TableName::from(TABLE_NAME) }, + async { + TableMetadata::new_for_testing( + TableName::from(TABLE_NAME), + Default::default(), + ) + }, &metrics, )), ) .await; // Ensure the table name is available. - let _ = got.lock().table_name().get().await; + let _ = got.lock().table().get().await.name(); assert_eq!(got.lock().namespace_id(), namespace_id); - assert_eq!(got.lock().table_name().to_string(), table_name.to_string()); + assert_eq!( + got.lock().table().get().await.name().to_string(), + table_name.to_string() + ); assert_matches!(got.lock().sort_key(), SortKeyState::Provided(None)); assert!(got.lock().partition_key.ptr_eq(&callers_partition_key)); diff --git a/ingester/src/buffer_tree/partition/resolver/coalesce.rs b/ingester/src/buffer_tree/partition/resolver/coalesce.rs index b89ca460e4..57b6673e08 100644 --- a/ingester/src/buffer_tree/partition/resolver/coalesce.rs +++ b/ingester/src/buffer_tree/partition/resolver/coalesce.rs @@ -14,7 +14,7 @@ use hashbrown::{hash_map::Entry, HashMap}; use parking_lot::Mutex; use crate::{ - buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName}, + buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableMetadata}, deferred_load::DeferredLoad, }; @@ -146,7 +146,7 @@ where namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, ) -> Arc<Mutex<PartitionData>> { let key = Key { namespace_id, @@ -170,7 +170,7 @@ namespace_id, namespace_name, table_id, - table_name, + table, )); // Make the future poll-able by many callers, all of which @@ -233,7 +233,7 @@ async fn do_fetch<T>( namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, ) -> Arc<Mutex<PartitionData>> where T: PartitionProvider + 'static, { @@ -248,13 +248,7 @@ // (which would cause the connection to be returned).
tokio::spawn(async move { inner - .get_partition( - partition_key, - namespace_id, - namespace_name, - table_id, - table_name, - ) + .get_partition(partition_key, namespace_id, namespace_name, table_id, table) .await }) .await @@ -280,7 +274,7 @@ mod tests { use crate::{ buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState}, test_util::{ - defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder, + defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, }, }; @@ -308,7 +302,7 @@ mod tests { ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ) }) .collect::<Vec<_>>() @@ -342,7 +336,7 @@ mod tests { _namespace_id: NamespaceId, _namespace_name: Arc<DeferredLoad<NamespaceName>>, _table_id: TableId, - _table_name: Arc<DeferredLoad<TableName>>, + _table: Arc<DeferredLoad<TableMetadata>>, ) -> core::pin::Pin< Box< dyn core::future::Future<Output = Arc<Mutex<PartitionData>>> @@ -368,7 +362,7 @@ mod tests { let data = PartitionDataBuilder::new().build(); let namespace_loader = defer_namespace_name_1_sec(); - let table_name_loader = defer_table_name_1_sec(); + let table_loader = defer_table_metadata_1_sec(); // Add a single instance of the partition - if more than one call is // made to the mock, it will panic. @@ -384,14 +378,14 @@ ARBITRARY_NAMESPACE_ID, Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, - Arc::clone(&table_name_loader), + Arc::clone(&table_loader), ); let pa_2 = layer.get_partition( ARBITRARY_PARTITION_KEY.clone(), ARBITRARY_NAMESPACE_ID, Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, - Arc::clone(&table_name_loader), + Arc::clone(&table_loader), ); let waker = futures::task::noop_waker(); @@ -411,7 +405,7 @@ ARBITRARY_NAMESPACE_ID, namespace_loader, ARBITRARY_TABLE_ID, - table_name_loader, + table_loader, ) .with_timeout_panic(Duration::from_secs(5)) .await; @@ -441,7 +435,7 @@ _namespace_id: NamespaceId, _namespace_name: Arc<DeferredLoad<NamespaceName>>, _table_id: TableId, - _table_name: Arc<DeferredLoad<TableName>>, + _table: Arc<DeferredLoad<TableMetadata>>, ) -> Arc<Mutex<PartitionData>> { let waker = self.wait.notified(); let permit = self.sem.acquire().await.unwrap(); @@ -481,7 +475,7 @@ ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ); let waker = futures::task::noop_waker(); diff --git a/ingester/src/buffer_tree/partition/resolver/mock.rs b/ingester/src/buffer_tree/partition/resolver/mock.rs index f5ca824bd5..c16554ca46 100644 --- a/ingester/src/buffer_tree/partition/resolver/mock.rs +++ b/ingester/src/buffer_tree/partition/resolver/mock.rs @@ -8,7 +8,7 @@ use parking_lot::Mutex; use super::r#trait::PartitionProvider; use crate::{ - buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName}, + buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableMetadata}, deferred_load::{self, DeferredLoad}, }; @@ -53,7 +53,7 @@ impl PartitionProvider for MockPartitionProvider { namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, ) -> Arc<Mutex<PartitionData>> { let p = self .partitions @@ -75,8 +75,8 @@ deferred_load::UNRESOLVED_DISPLAY_STRING, ); - let actual_table_name = p.table_name().to_string(); - let expected_table_name = table_name.get().await.to_string(); + let actual_table_name = p.table().to_string(); + let expected_table_name = table.get().await.name().to_string(); assert!( (actual_table_name.as_str() ==
expected_table_name) || (actual_table_name == deferred_load::UNRESOLVED_DISPLAY_STRING), diff --git a/ingester/src/buffer_tree/partition/resolver/trait.rs b/ingester/src/buffer_tree/partition/resolver/trait.rs index 158d9bdda2..e525cd5a6f 100644 --- a/ingester/src/buffer_tree/partition/resolver/trait.rs +++ b/ingester/src/buffer_tree/partition/resolver/trait.rs @@ -5,7 +5,7 @@ use data_types::{NamespaceId, PartitionKey, TableId}; use parking_lot::Mutex; use crate::{ - buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName}, + buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableMetadata}, deferred_load::DeferredLoad, }; @@ -24,7 +24,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug { namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, ) -> Arc<Mutex<PartitionData>>; } @@ -39,16 +39,10 @@ where namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, ) -> Arc<Mutex<PartitionData>> { (**self) - .get_partition( - partition_key, - namespace_id, - namespace_name, - table_id, - table_name, - ) + .get_partition(partition_key, namespace_id, namespace_name, table_id, table) .await } } @@ -61,7 +55,7 @@ mod tests { use crate::{ buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState}, test_util::{ - defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder, + defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, }, }; @@ -70,10 +64,10 @@ mod tests { #[tokio::test] async fn test_arc_impl() { let namespace_loader = defer_namespace_name_1_sec(); - let table_name_loader = defer_table_name_1_sec(); + let table_loader = defer_table_metadata_1_sec(); let data = PartitionDataBuilder::new() - .with_table_name_loader(Arc::clone(&table_name_loader)) + .with_table_loader(Arc::clone(&table_loader)) .with_namespace_loader(Arc::clone(&namespace_loader)) .build(); @@ -85,7 +79,7 @@ ARBITRARY_NAMESPACE_ID, Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, - Arc::clone(&table_name_loader), + Arc::clone(&table_loader), ) .await; assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID); @@ -94,9 +88,6 @@ got.lock().namespace_name().to_string(), namespace_loader.to_string() ); - assert_eq!( - got.lock().table_name().to_string(), - table_name_loader.to_string() - ); + assert_eq!(got.lock().table().to_string(), table_loader.to_string()); } } diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index 9576fc006c..b66e7b1d4b 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -4,13 +4,14 @@ use async_trait::async_trait; use data_types::{NamespaceId, TableId}; use metric::U64Counter; use parking_lot::Mutex; +use predicate::Predicate; use trace::span::Span; use super::{ namespace::{name_resolver::NamespaceNameProvider, NamespaceData}, partition::{resolver::PartitionProvider, PartitionData}, post_write::PostWriteObserver, - table::name_resolver::TableNameProvider, + table::metadata_resolver::TableProvider, }; use crate::{ arcmap::ArcMap, @@ -92,12 +93,12 @@ pub(crate) struct BufferTree<O> { /// [`NamespaceName`]: data_types::NamespaceName namespaces: ArcMap<NamespaceId, NamespaceData<O>>, namespace_name_resolver: Arc<dyn NamespaceNameProvider>, - /// The [`TableName`] provider used by [`NamespaceData`] to initialise a + /// The [`TableMetadata`] provider used by [`NamespaceData`] to initialise a /// [`TableData`].
/// - /// [`TableName`]: crate::buffer_tree::table::TableName + /// [`TableMetadata`]: crate::buffer_tree::table::TableMetadata /// [`TableData`]: crate::buffer_tree::table::TableData - table_name_resolver: Arc<dyn TableNameProvider>, + table_resolver: Arc<dyn TableProvider>, metrics: Arc<metric::Registry>, namespace_count: U64Counter, @@ -112,7 +113,7 @@ where /// Initialise a new [`BufferTree`] that emits metrics to `metrics`. pub(crate) fn new( namespace_name_resolver: Arc<dyn NamespaceNameProvider>, - table_name_resolver: Arc<dyn TableNameProvider>, + table_resolver: Arc<dyn TableProvider>, partition_provider: Arc<dyn PartitionProvider>, post_write_observer: Arc<O>, metrics: Arc<metric::Registry>, @@ -127,7 +128,7 @@ Self { namespaces: Default::default(), namespace_name_resolver, - table_name_resolver, + table_resolver, metrics, partition_provider, post_write_observer, @@ -178,7 +179,7 @@ where Arc::new(NamespaceData::new( namespace_id, Arc::new(self.namespace_name_resolver.for_namespace(namespace_id)), - Arc::clone(&self.table_name_resolver), + Arc::clone(&self.table_resolver), Arc::clone(&self.partition_provider), Arc::clone(&self.post_write_observer), &self.metrics, @@ -202,6 +203,7 @@ where table_id: TableId, columns: Vec<String>, span: Option<Span>, + predicate: Option<Predicate>, ) -> Result<Self::Response, QueryError> { // Extract the namespace if it exists. let inner = self @@ -211,7 +213,7 @@ // Delegate query execution to the namespace, wrapping the execution in // a tracing delegate to emit a child span. QueryExecTracing::new(inner, "namespace") - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await } } @@ -227,29 +229,41 @@ where #[cfg(test)] mod tests { + use std::{sync::Arc, time::Duration}; + + use arrow::datatypes::DataType; + use assert_matches::assert_matches; + use data_types::{ + partition_template::{test_table_partition_override, TemplatePart}, + PartitionId, PartitionKey, + }; + use datafusion::{ + assert_batches_eq, assert_batches_sorted_eq, + prelude::{col, lit}, + scalar::ScalarValue, + }; + use futures::StreamExt; + use lazy_static::lazy_static; + use metric::{Attributes, Metric}; + use predicate::Predicate; + use test_helpers::maybe_start_logging; + use super::*; use crate::{ buffer_tree::{ namespace::{name_resolver::mock::MockNamespaceNameProvider, NamespaceData}, partition::resolver::mock::MockPartitionProvider, post_write::mock::MockPostWriteObserver, - table::TableName, + table::{metadata_resolver::mock::MockTableProvider, TableMetadata}, }, deferred_load::{self, DeferredLoad}, query::partition_response::PartitionResponse, test_util::{ defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, - ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER, + ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER, }, }; - use assert_matches::assert_matches; - use data_types::{PartitionId, PartitionKey}; - use datafusion::{assert_batches_eq, assert_batches_sorted_eq}; - use futures::StreamExt; - use lazy_static::lazy_static; - use metric::{Attributes, Metric}; - use std::{sync::Arc, time::Duration}; const PARTITION2_ID: PartitionId = PartitionId::new(2); const PARTITION3_ID: PartitionId = PartitionId::new(3); @@ -278,7 +292,7 @@ mod tests { let ns = NamespaceData::new( ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_ms(), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), &metrics, @@ -337,13 +351,19 @@ macro_rules!
test_write_query { ( $name:ident, - partitions = [$($partition:expr), +], // The set of PartitionData for the mock partition provider + $(table_provider = $table_provider:expr,)? // An optional table provider + partitions = [$($partition:expr), +], // The set of PartitionData for the mock + // partition provider writes = [$($write:expr), *], // The set of WriteOperation to apply() - want = $want:expr // The expected results of querying ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID + predicate = $predicate:expr, // An optional predicate to use for the query + want = $want:expr // The expected results of querying + // ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID ) => { paste::paste! { #[tokio::test] async fn []() { + maybe_start_logging(); + // Configure the mock partition provider with the provided // partitions. let partition_provider = Arc::new(MockPartitionProvider::default() @@ -352,10 +372,16 @@ mod tests { )+ ); + #[allow(unused_variables)] + let table_provider = Arc::clone(&*ARBITRARY_TABLE_PROVIDER); + $( + let table_provider: Arc = $table_provider; + )? + // Init the buffer tree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + table_provider, partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), @@ -370,7 +396,13 @@ mod tests { // Execute the query against ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID let batches = buf - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + $predicate + ) .await .expect("query should succeed") .into_partition_stream() @@ -407,6 +439,7 @@ mod tests { ), None, )], + predicate = None, want = [ "+----------+------+-------------------------------+", "| region | temp | time |", @@ -456,6 +489,7 @@ mod tests { None, ) ], + predicate = None, want = [ "+----------+------+-------------------------------+", "| region | temp | time |", @@ -508,6 +542,7 @@ mod tests { None, ) ], + predicate = None, want = [ "+--------+------+-------------------------------+", "| region | temp | time |", @@ -520,7 +555,7 @@ mod tests { // A query that ensures the data across multiple tables (with the same table // name!) is correctly filtered to return only the queried table. test_write_query!( - filter_multiple_tabls, + filter_multiple_tables, partitions = [ PartitionDataBuilder::new() .with_partition_id(ARBITRARY_PARTITION_ID) @@ -558,6 +593,7 @@ mod tests { None, ) ], + predicate = None, want = [ "+--------+------+-------------------------------+", "| region | temp | time |", @@ -603,6 +639,7 @@ mod tests { None, ) ], + predicate = None, want = [ "+----------+------+-------------------------------+", "| region | temp | time |", @@ -613,6 +650,98 @@ mod tests { ] ); + // This test asserts that the results returned from a query to the + // [`BufferTree`] filters rows from the result as directed by the + // query's [`Predicate`]. + // + // It makes sure that for a [`BufferTree`] with a set of partitions split + // by some key a query with a predicate ` == ` + // returns partition data that has been filtered to contain only rows which + // contain the specified value in that partition key column. 
+ test_write_query!( + filter_by_predicate_partition_key, + table_provider = Arc::new(MockTableProvider::new(TableMetadata::new_for_testing( + ARBITRARY_TABLE_NAME.clone(), + test_table_partition_override(vec![TemplatePart::TagValue("region")]) + ))), + partitions = [ + PartitionDataBuilder::new() + .with_partition_id(ARBITRARY_PARTITION_ID) + .with_partition_key(ARBITRARY_PARTITION_KEY.clone()) // "platanos" + .build(), + PartitionDataBuilder::new() + .with_partition_id(PARTITION2_ID) + .with_partition_key(PARTITION2_KEY.clone()) // "p2" + .build() + ], + writes = [ + make_write_op( + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 0, + &format!( + r#"{},region={} temp=35 4242424242"#, + &*ARBITRARY_TABLE_NAME, &*ARBITRARY_PARTITION_KEY + ), + None, + ), + make_write_op( + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 1, + &format!( + r#"{},region={} temp=12 4242424242"#, + &*ARBITRARY_TABLE_NAME, &*ARBITRARY_PARTITION_KEY + ), + None, + ), + make_write_op( + &PARTITION2_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 2, + &format!( + r#"{},region={} temp=17 7676767676"#, + &*ARBITRARY_TABLE_NAME, *PARTITION2_KEY + ), + None, + ), + make_write_op( + &PARTITION2_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 3, + &format!( + r#"{},region={} temp=13 7676767676"#, + &*ARBITRARY_TABLE_NAME, *PARTITION2_KEY, + ), + None, + ) + ], + // NOTE: The querier will coerce the type of the predicates correctly, so the ingester does NOT need to perform + // type coercion. This type should reflect that. + predicate = Some(Predicate::new().with_expr(col("region").eq(lit( + ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::from(PARTITION2_KEY.inner())) + ) + )))), + want = [ + "+--------+------+-------------------------------+", + "| region | temp | time |", + "+--------+------+-------------------------------+", + "| p2 | 13.0 | 1970-01-01T00:00:07.676767676 |", + "| p2 | 17.0 | 1970-01-01T00:00:07.676767676 |", + "+--------+------+-------------------------------+", + ] + ); + /// Assert that multiple writes to a single namespace/table results in a /// single namespace being created, and matching metrics. 
#[tokio::test] @@ -627,7 +756,7 @@ mod tests { ) .with_partition( PartitionDataBuilder::new() - .with_partition_id(ARBITRARY_PARTITION_ID) + .with_partition_id(PARTITION2_ID) .with_partition_key(PARTITION2_KEY.clone()) .build(), ), @@ -638,7 +767,7 @@ mod tests { // Init the buffer tree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::clone(&metrics), @@ -722,9 +851,14 @@ mod tests { .with_partition_id(PARTITION3_ID) .with_partition_key(PARTITION3_KEY.clone()) .with_table_id(TABLE2_ID) - .with_table_name_loader(Arc::new(DeferredLoad::new( + .with_table_loader(Arc::new(DeferredLoad::new( Duration::from_secs(1), - async move { TableName::from(TABLE2_NAME) }, + async move { + TableMetadata::new_for_testing( + TABLE2_NAME.into(), + Default::default(), + ) + }, &metric::Registry::default(), ))) .build(), @@ -734,7 +868,7 @@ mod tests { // Init the buffer tree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::clone(&Arc::new(metric::Registry::default())), @@ -821,7 +955,7 @@ mod tests { // Init the BufferTree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), @@ -829,7 +963,13 @@ mod tests { // Query the empty tree let err = buf - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect_err("query should fail"); assert_matches!(err, QueryError::NamespaceNotFound(ns) => { @@ -854,7 +994,7 @@ mod tests { // Ensure an unknown table errors let err = buf - .query_exec(ARBITRARY_NAMESPACE_ID, TABLE2_ID, vec![], None) + .query_exec(ARBITRARY_NAMESPACE_ID, TABLE2_ID, vec![], None, None) .await .expect_err("query should fail"); assert_matches!(err, QueryError::TableNotFound(ns, t) => { @@ -863,9 +1003,15 @@ mod tests { }); // Ensure a valid namespace / table does not error - buf.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) - .await - .expect("namespace / table should exist"); + buf.query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) + .await + .expect("namespace / table should exist"); } /// This test asserts the read consistency properties defined in the @@ -906,7 +1052,7 @@ mod tests { // Init the buffer tree let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), @@ -931,7 +1077,13 @@ mod tests { // Execute a query of the buffer tree, generating the result stream, but // DO NOT consume it. 
let stream = buf - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should succeed") .into_partition_stream(); diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index 93a83de978..6add3f7053 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -1,13 +1,23 @@ //! Table level data buffer structures. -pub(crate) mod name_resolver; +pub(crate) mod metadata_resolver; -use std::{fmt::Debug, sync::Arc}; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId}; +use data_types::{ + partition_template::{build_column_values, ColumnValue, TablePartitionTemplateOverride}, + NamespaceId, PartitionKey, SequenceNumber, Table, TableId, +}; +use datafusion::scalar::ScalarValue; +use iox_query::{ + chunk_statistics::{create_chunk_statistics, ColumnRange}, + pruning::prune_summaries, + QueryChunk, +}; use mutable_batch::MutableBatch; use parking_lot::Mutex; +use predicate::Predicate; use schema::Projection; use trace::span::{Span, SpanRecorder}; @@ -22,8 +32,52 @@ use crate::{ query::{ partition_response::PartitionResponse, response::PartitionStream, QueryError, QueryExec, }, + query_adaptor::QueryAdaptor, }; +/// Metadata from the catalog for a table +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct TableMetadata { + name: TableName, + partition_template: TablePartitionTemplateOverride, +} + +impl TableMetadata { + #[cfg(test)] + pub fn new_for_testing( + name: TableName, + partition_template: TablePartitionTemplateOverride, + ) -> Self { + Self { + name, + partition_template, + } + } + + pub(crate) fn name(&self) -> &TableName { + &self.name + } + + pub(crate) fn partition_template(&self) -> &TablePartitionTemplateOverride { + &self.partition_template + } +} + +impl From<Table> for TableMetadata { + fn from(t: Table) -> Self { + Self { + name: t.name.into(), + partition_template: t.partition_template, + } + } +} + +impl std::fmt::Display for TableMetadata { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.name, f) + } +} + /// The string name / identifier of a Table. /// /// A reference-counted, cheap clone-able string. @@ -69,7 +123,7 @@ impl PartialEq<str> for TableName { #[derive(Debug)] pub(crate) struct TableData<O> { table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + catalog_table: Arc<DeferredLoad<TableMetadata>>, /// The catalog ID of the namespace this table is being populated from. namespace_id: NamespaceId, @@ -93,7 +147,7 @@ impl<O> TableData<O> { /// for the first time. pub(super) fn new( table_id: TableId, - table_name: Arc<DeferredLoad<TableName>>, + catalog_table: Arc<DeferredLoad<TableMetadata>>, namespace_id: NamespaceId, namespace_name: Arc<DeferredLoad<NamespaceName>>, partition_provider: Arc<dyn PartitionProvider>, @@ -101,7 +155,7 @@ ) -> Self { Self { table_id, - table_name, + catalog_table, namespace_id, namespace_name, partition_data: Default::default(), @@ -132,9 +186,9 @@ self.table_id } - /// Returns the name of this table. - pub(crate) fn table_name(&self) -> &Arc<DeferredLoad<TableName>> { - &self.table_name + /// Returns the catalog data for this table. + pub(crate) fn catalog_table(&self) -> &Arc<DeferredLoad<TableMetadata>> { + &self.catalog_table } /// Return the [`NamespaceId`] this table is a part of.
@@ -166,7 +220,7 @@ where self.namespace_id, Arc::clone(&self.namespace_name), self.table_id, - Arc::clone(&self.table_name), + Arc::clone(&self.catalog_table), ) .await; // Add the partition to the map. @@ -204,6 +258,7 @@ where table_id: TableId, columns: Vec<String>, span: Option<Span>, + predicate: Option<Predicate>, ) -> Result<Self::Response, QueryError> { assert_eq!(self.table_id, table_id, "buffer tree index inconsistency"); assert_eq!( @@ -211,18 +266,21 @@ where "buffer tree index inconsistency" ); + let table_partition_template = self.catalog_table.get().await.partition_template; + // Gather the partition data from all of the partitions in this table. let span = SpanRecorder::new(span); let partitions = self.partitions().into_iter().map(move |p| { let mut span = span.child("partition read"); - let (id, hash_id, completed_persistence_count, data) = { + let (id, hash_id, completed_persistence_count, data, partition_key) = { let mut p = p.lock(); ( p.partition_id(), p.partition_hash_id().cloned(), p.completed_persistence_count(), p.get_query_data(), + p.partition_key().clone(), ) }; @@ -230,6 +288,31 @@ where Some(data) => { assert_eq!(id, data.partition_id()); + let data = Arc::new(data); + + // Potentially prune out this partition if the partition + // template & derived partition key can be used to match + // against the optional predicate. + if predicate + .as_ref() + .map(|p| { + !keep_after_pruning_partition_key( + &table_partition_template, + &partition_key, + p, + &data, + ) + }) + .unwrap_or_default() + { + return PartitionResponse::new( + vec![], + id, + hash_id, + completed_persistence_count, + ); + } + // Project the data if necessary let columns = columns.iter().map(String::as_str).collect::<Vec<_>>(); let selection = if columns.is_empty() { @@ -252,6 +335,106 @@ where } } +/// Return true if `data` contains one or more rows matching `predicate`, +/// pruning based on the `partition_key` and `template`. +/// +/// Returns false iff it can be proven that all of data does not match the +/// predicate. +fn keep_after_pruning_partition_key( + table_partition_template: &TablePartitionTemplateOverride, + partition_key: &PartitionKey, + predicate: &Predicate, + data: &QueryAdaptor, +) -> bool { + // Construct a set of per-column min/max statistics based on the partition + // key values. + let column_ranges = Arc::new( + build_column_values(table_partition_template, partition_key.inner()) + .filter_map(|(col, val)| { + let range = match val { + ColumnValue::Identity(s) => { + let s = Arc::new(ScalarValue::from(s.as_ref())); + ColumnRange { + min_value: Arc::clone(&s), + max_value: s, + } + } + ColumnValue::Prefix(p) if p.is_empty() => return None, + ColumnValue::Prefix(p) => { + // If the partition only has a prefix of the tag value + // (it was truncated) then form a conservative range: + // + // # Minimum + // Use the prefix itself. + // + // Note that the minimum is inclusive. + // + // All values in the partition are either: + // + // - identical to the prefix, in which case they are + // included by the inclusive minimum + // + // - have the form `"<prefix><s>"`, and it holds that + // `"<prefix><s>" > "<prefix>"` for all strings + // `"<s>"`. + // + // # Maximum + // Use `"<prefix_excluding_last_char><char::MAX>"`. + // + // Note that the maximum is inclusive. + // + // All strings in this partition must be smaller than + // this constructed maximum, because string comparison + // is front-to-back and the + // `"<prefix_excluding_last_char><char::MAX>" > + // "<prefix>"`. + + let min_value = Arc::new(ScalarValue::from(p.as_ref())); + + let mut chars = p.as_ref().chars().collect::<Vec<_>>(); + *chars.last_mut().expect("checked that prefix is not empty") = + std::char::MAX; + let max_value = Arc::new(ScalarValue::from( + chars.into_iter().collect::<String>().as_str(), + )); + + ColumnRange { + min_value, + max_value, + } + } + }; + + Some((Arc::from(col), range)) + }) + .collect::<HashMap<_, _>>(), + ); + + let chunk_statistics = Arc::new(create_chunk_statistics( + data.num_rows(), + data.schema(), + data.ts_min_max(), + &column_ranges, + )); + + prune_summaries( + data.schema(), + &[(chunk_statistics, data.schema().as_arrow())], + predicate, + ) + // Errors are logged by `iox_query` and sometimes fine, e.g. for not + // implemented DataFusion features or upstream bugs. The querier uses the + // same strategy. Pruning is a mere optimization and should not lead to + // crashes or unreadable data. + .ok() + .map(|vals| { + vals.into_iter() + .next() + .expect("one chunk in, one chunk out") + }) + .unwrap_or(true) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -265,7 +448,7 @@ mod tests { post_write::mock::MockPostWriteObserver, }, test_util::{ - defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder, + defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, }, @@ -280,7 +463,7 @@ let table = TableData::new( ARBITRARY_TABLE_ID, - defer_table_name_1_sec(), + defer_table_metadata_1_sec(), ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), partition_provider, diff --git a/ingester/src/buffer_tree/table/name_resolver.rs b/ingester/src/buffer_tree/table/metadata_resolver.rs similarity index 67% rename from ingester/src/buffer_tree/table/name_resolver.rs rename to ingester/src/buffer_tree/table/metadata_resolver.rs index 37bc1ce605..bd489aad17 100644 --- a/ingester/src/buffer_tree/table/name_resolver.rs +++ b/ingester/src/buffer_tree/table/metadata_resolver.rs @@ -4,24 +4,24 @@ use backoff::{Backoff, BackoffConfig}; use data_types::TableId; use iox_catalog::interface::Catalog; -use super::TableName; +use super::TableMetadata; use crate::deferred_load::DeferredLoad; /// An abstract provider of a [`DeferredLoad`] configured to fetch the -/// [`TableName`] of the specified [`TableId`]. -pub(crate) trait TableNameProvider: Send + Sync + std::fmt::Debug { - fn for_table(&self, id: TableId) -> DeferredLoad<TableName>; +/// catalog [`TableMetadata`] of the specified [`TableId`]. +pub(crate) trait TableProvider: Send + Sync + std::fmt::Debug { + fn for_table(&self, id: TableId) -> DeferredLoad<TableMetadata>; } #[derive(Debug)] -pub(crate) struct TableNameResolver { +pub(crate) struct TableResolver { max_smear: Duration, catalog: Arc<dyn Catalog>, backoff_config: BackoffConfig, metrics: Arc<metric::Registry>, } -impl TableNameResolver { +impl TableResolver { pub(crate) fn new( max_smear: Duration, catalog: Arc<dyn Catalog>, backoff_config: BackoffConfig, metrics: Arc<metric::Registry>, ) -> Self { Self { max_smear, catalog, backoff_config, metrics, } } - /// Fetch the [`TableName`] from the [`Catalog`] for specified + /// Fetch the [`TableMetadata`] from the [`Catalog`] for specified /// `table_id`, retrying endlessly when errors occur.
pub(crate) async fn fetch( table_id: TableId, catalog: Arc<dyn Catalog>, backoff_config: BackoffConfig, - ) -> TableName { + ) -> TableMetadata { Backoff::new(&backoff_config) - .retry_all_errors("fetch table name", || async { - let s = catalog + .retry_all_errors("fetch table", || async { + let table = catalog .repositories() .await .tables() .get_by_id(table_id) .unwrap_or_else(|| { panic!("resolving table name for non-existent table id {table_id}") }) - .name .into(); - Result::<_, iox_catalog::interface::Error>::Ok(s) + Result::<_, iox_catalog::interface::Error>::Ok(table) }) .await .expect("retry forever") } } -impl TableNameProvider for TableNameResolver { - fn for_table(&self, id: TableId) -> DeferredLoad<TableName> { +impl TableProvider for TableResolver { + fn for_table(&self, id: TableId) -> DeferredLoad<TableMetadata> { DeferredLoad::new( self.max_smear, Self::fetch(id, Arc::clone(&self.catalog), self.backoff_config.clone()), @@ -79,28 +78,33 @@ pub(crate) mod mock { use super::*; #[derive(Debug)] - pub(crate) struct MockTableNameProvider { - name: TableName, + pub(crate) struct MockTableProvider { + table: TableMetadata, } - impl MockTableNameProvider { - pub(crate) fn new(name: impl Into<TableName>) -> Self { - Self { name: name.into() } + impl MockTableProvider { + pub(crate) fn new(table: impl Into<TableMetadata>) -> Self { + Self { + table: table.into(), + } } } - impl Default for MockTableNameProvider { + impl Default for MockTableProvider { fn default() -> Self { - Self::new("bananas") + Self::new(TableMetadata::new_for_testing( + "bananas".into(), + Default::default(), + )) } } - impl TableNameProvider for MockTableNameProvider { - fn for_table(&self, _id: TableId) -> DeferredLoad<TableName> { - let name = self.name.clone(); + impl TableProvider for MockTableProvider { + fn for_table(&self, _id: TableId) -> DeferredLoad<TableMetadata> { + let table = self.table.clone(); DeferredLoad::new( Duration::from_secs(1), - async { name }, + async { table }, &metric::Registry::default(), ) } @@ -129,7 +133,7 @@ mod tests { // Populate the catalog with the namespace / table let (_ns_id, table_id) = populate_catalog(&*catalog, NAMESPACE_NAME, TABLE_NAME).await; - let fetcher = Arc::new(TableNameResolver::new( + let fetcher = Arc::new(TableResolver::new( Duration::from_secs(10), Arc::clone(&catalog), backoff_config.clone(), @@ -141,6 +145,6 @@ .get() .with_timeout_panic(Duration::from_secs(5)) .await; - assert_eq!(&**got, TABLE_NAME); + assert_eq!(got.name(), TABLE_NAME); } } diff --git a/ingester/src/init.rs b/ingester/src/init.rs index ff1a7fd6bd..9f84a448ba 100644 --- a/ingester/src/init.rs +++ b/ingester/src/init.rs @@ -29,7 +29,7 @@ use crate::{ partition::resolver::{ CatalogPartitionResolver, CoalescePartitionResolver, PartitionCache, PartitionProvider, }, - table::name_resolver::{TableNameProvider, TableNameResolver}, + table::metadata_resolver::{TableProvider, TableResolver}, BufferTree, }, dml_sink::{instrumentation::DmlSinkInstrumentation, tracing::DmlSinkTracing}, @@ -246,8 +246,8 @@ where Arc::clone(&metrics), )); - // Initialise the deferred table name resolver. - let table_name_provider: Arc<dyn TableNameProvider> = Arc::new(TableNameResolver::new( + // Initialise the deferred table metadata resolver.
+ let table_provider: Arc<dyn TableProvider> = Arc::new(TableResolver::new( persist_background_fetch_time, Arc::clone(&catalog), BackoffConfig::default(), @@ -319,7 +319,7 @@ where let buffer = Arc::new(BufferTree::new( namespace_name_provider, - table_name_provider, + table_provider, partition_provider, Arc::new(hot_partition_persister), Arc::clone(&metrics), diff --git a/ingester/src/persist/context.rs b/ingester/src/persist/context.rs index 0e5fc55a67..c441d38a5e 100644 --- a/ingester/src/persist/context.rs +++ b/ingester/src/persist/context.rs @@ -18,7 +18,7 @@ use crate::{ buffer_tree::{ namespace::NamespaceName, partition::{persisting::PersistingData, PartitionData, SortKeyState}, - table::TableName, + table::TableMetadata, }, deferred_load::DeferredLoad, persist::completion_observer::CompletedPersist, @@ -94,14 +94,14 @@ pub(super) struct Context { // The partition key for this partition partition_key: PartitionKey, - /// Deferred strings needed for persistence. + /// Deferred data needed for persistence. /// /// These [`DeferredLoad`] are given a pre-fetch hint when this [`Context`] /// is constructed to load them in the background (if not already resolved) /// in order to avoid incurring the query latency when the values are /// needed. namespace_name: Arc<DeferredLoad<NamespaceName>>, - table_name: Arc<DeferredLoad<TableName>>, + table: Arc<DeferredLoad<TableMetadata>>, /// The [`SortKey`] for the [`PartitionData`] at the time of [`Context`] /// construction. @@ -164,7 +164,7 @@ impl Context { partition_hash_id: guard.partition_hash_id().cloned(), partition_key: guard.partition_key().clone(), namespace_name: Arc::clone(guard.namespace_name()), - table_name: Arc::clone(guard.table_name()), + table: Arc::clone(guard.table()), // Technically the sort key isn't immutable, but MUST NOT be // changed by an external actor (by something other than code in @@ -182,7 +182,7 @@ impl Context { // Pre-fetch the deferred values in a background thread (if not already // resolved) s.namespace_name.prefetch_now(); - s.table_name.prefetch_now(); + s.table.prefetch_now(); if let SortKeyState::Deferred(ref d) = s.sort_key { d.prefetch_now(); } @@ -253,7 +253,7 @@ impl Context { namespace_id = %self.namespace_id, namespace_name = %self.namespace_name, table_id = %self.table_id, - table_name = %self.table_name, + table = %self.table, partition_id = %self.partition_id, partition_key = %self.partition_key, total_persist_duration = ?now.duration_since(self.enqueued_at), @@ -315,7 +315,7 @@ impl Context { self.namespace_name.as_ref() } - pub(super) fn table_name(&self) -> &DeferredLoad<TableName> { - self.table_name.as_ref() + pub(super) fn table(&self) -> &DeferredLoad<TableMetadata> { + self.table.as_ref() } } diff --git a/ingester/src/persist/handle.rs b/ingester/src/persist/handle.rs index e823767ef2..a6fdefcf94 100644 --- a/ingester/src/persist/handle.rs +++ b/ingester/src/persist/handle.rs @@ -501,7 +501,7 @@ mod tests { test_util::{ make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, - ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER, + ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER, }, }; @@ -510,7 +510,7 @@ mod tests { async fn new_partition(sort_key: SortKeyState) -> Arc<Mutex<PartitionData>> { let buffer_tree = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), Arc::new( MockPartitionProvider::default().with_partition( PartitionDataBuilder::new() diff --git a/ingester/src/persist/mod.rs
diff --git a/ingester/src/persist/handle.rs b/ingester/src/persist/handle.rs index e823767ef2..a6fdefcf94 100644 --- a/ingester/src/persist/handle.rs +++ b/ingester/src/persist/handle.rs @@ -501,7 +501,7 @@ mod tests { test_util::{ make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, - ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER, + ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER, }, }; @@ -510,7 +510,7 @@ mod tests { async fn new_partition(sort_key: SortKeyState) -> Arc<Mutex<PartitionData>> { let buffer_tree = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), Arc::new( MockPartitionProvider::default().with_partition( PartitionDataBuilder::new() diff --git a/ingester/src/persist/mod.rs b/ingester/src/persist/mod.rs index 44311a2e9e..3dc23b8cd5 100644 --- a/ingester/src/persist/mod.rs +++ b/ingester/src/persist/mod.rs @@ -48,7 +48,7 @@ mod tests { test_util::{ make_write_op, populate_catalog, ARBITRARY_NAMESPACE_NAME, ARBITRARY_NAMESPACE_NAME_PROVIDER, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_NAME, - ARBITRARY_TABLE_NAME_PROVIDER, + ARBITRARY_TABLE_PROVIDER, }, }; @@ -67,7 +67,7 @@ mod tests { // Init the buffer tree let buf = BufferTree::new( Arc::clone(&*ARBITRARY_NAMESPACE_NAME_PROVIDER), - Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER), + Arc::clone(&*ARBITRARY_TABLE_PROVIDER), Arc::new(CatalogPartitionResolver::new(Arc::clone(&catalog))), Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), diff --git a/ingester/src/persist/worker.rs b/ingester/src/persist/worker.rs index e9911297c2..857265e92f 100644 --- a/ingester/src/persist/worker.rs +++ b/ingester/src/persist/worker.rs @@ -202,7 +202,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), ?sort_key, @@ -218,7 +218,7 @@ where compact_persisting_batch( &worker_state.exec, sort_key, - ctx.table_name().get().await, + ctx.table().get().await.name().clone(), ctx.data().query_adaptor(), ) .await @@ -249,7 +249,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), %object_store_id, @@ -265,7 +265,7 @@ where namespace_id: ctx.namespace_id(), namespace_name: Arc::clone(&*ctx.namespace_name().get().await), table_id: ctx.table_id(), - table_name: Arc::clone(&*ctx.table_name().get().await), + table_name: Arc::clone(ctx.table().get().await.name()), partition_key: ctx.partition_key().clone(), compaction_level: CompactionLevel::Initial, sort_key: Some(data_sort_key), @@ -291,7 +291,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), %object_store_id, @@ -358,7 +358,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), ?new_sort_key, @@ -394,7 +394,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), expected=?old_sort_key, @@ -420,7 +420,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), expected=?old_sort_key, @@ -460,7 +460,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), ?old_sort_key, @@ -488,7 +488,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name =
%ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), %object_store_id, @@ -512,7 +512,7 @@ where namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), table_id = %ctx.table_id(), - table_name = %ctx.table_name(), + table = %ctx.table(), partition_id = %ctx.partition_id(), partition_key = %ctx.partition_key(), %object_store_id, diff --git a/ingester/src/query/exec_instrumentation.rs b/ingester/src/query/exec_instrumentation.rs index 4a05367d81..f6bc6f2737 100644 --- a/ingester/src/query/exec_instrumentation.rs +++ b/ingester/src/query/exec_instrumentation.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use data_types::{NamespaceId, TableId}; use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric}; +use predicate::Predicate; use trace::span::Span; use super::QueryExec; @@ -64,12 +65,13 @@ where table_id: TableId, columns: Vec<String>, span: Option<Span>, + predicate: Option<Predicate>, ) -> Result<Self::Response, QueryError> { let t = self.time_provider.now(); let res = self .inner - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await; if let Some(delta) = self.time_provider.now().checked_duration_since(t) { @@ -113,7 +115,7 @@ mod tests { // Call the decorator and assert the return value let got = decorator - .query_exec(NamespaceId::new(42), TableId::new(24), vec![], None) + .query_exec(NamespaceId::new(42), TableId::new(24), vec![], None, None) .await; assert_matches!(got, $($want_ret)+); diff --git a/ingester/src/query/mock_query_exec.rs b/ingester/src/query/mock_query_exec.rs index 580e139f02..db032e992b 100644 --- a/ingester/src/query/mock_query_exec.rs +++ b/ingester/src/query/mock_query_exec.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use data_types::{NamespaceId, TableId}; use parking_lot::Mutex; +use predicate::Predicate; use trace::span::Span; use super::{response::QueryResponse, QueryError, QueryExec}; @@ -27,6 +28,7 @@ impl QueryExec for MockQueryExec { _table_id: TableId, _columns: Vec<String>, _span: Option<Span>, + _predicate: Option<Predicate>, ) -> Result<Self::Response, QueryError> { self.response .lock() diff --git a/ingester/src/query/result_instrumentation.rs b/ingester/src/query/result_instrumentation.rs index a412059b82..53ba57a5f3 100644 --- a/ingester/src/query/result_instrumentation.rs +++ b/ingester/src/query/result_instrumentation.rs @@ -58,6 +58,7 @@ use iox_time::{SystemProvider, Time, TimeProvider}; use metric::{DurationHistogram, Metric, U64Histogram, U64HistogramOptions}; use observability_deps::tracing::debug; use pin_project::{pin_project, pinned_drop}; +use predicate::Predicate; use trace::span::Span; use crate::query::{ @@ -204,12 +205,15 @@ where table_id: TableId, columns: Vec<String>, span: Option<Span>, + predicate: Option<Predicate>, ) -> Result<Self::Response, QueryError> { let started_at = self.time_provider.now(); + // TODO(savage): Would accepting a predicate here require additional + // metrics to be added?
let stream = self .inner - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await?; let stream = QueryMetricContext::new( @@ -467,7 +471,13 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should succeed"); @@ -548,7 +558,13 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should succeed"); @@ -628,7 +644,13 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should succeed"); @@ -708,7 +730,13 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) + .query_exec( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_TABLE_ID, + vec![], + None, + None, + ) .await .expect("query should succeed"); diff --git a/ingester/src/query/tracing.rs b/ingester/src/query/tracing.rs index 34b171e30d..86751e1413 100644 --- a/ingester/src/query/tracing.rs +++ b/ingester/src/query/tracing.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use async_trait::async_trait; use data_types::{NamespaceId, TableId}; +use predicate::Predicate; use trace::span::{Span, SpanRecorder}; use super::QueryExec; @@ -42,12 +43,19 @@ where table_id: TableId, columns: Vec<String>, span: Option<Span>, + predicate: Option<Predicate>, ) -> Result<Self::Response, QueryError> { let mut recorder = SpanRecorder::new(span).child(self.name.clone()); match self .inner - .query_exec(namespace_id, table_id, columns, recorder.span().cloned()) + .query_exec( + namespace_id, + table_id, + columns, + recorder.span().cloned(), + predicate, + ) .await { Ok(v) => { @@ -111,6 +119,7 @@ mod tests { TableId::new(24), vec![], Some(span.child("root span")), + None, ) .await .expect("wrapper should not modify result"); @@ -134,6 +143,7 @@ mod tests { TableId::new(24), vec![], Some(span.child("root span")), + None, ) .await .expect_err("wrapper should not modify result"); diff --git a/ingester/src/query/trait.rs b/ingester/src/query/trait.rs index 30655eff66..c031dd5d2b 100644 --- a/ingester/src/query/trait.rs +++ b/ingester/src/query/trait.rs @@ -2,6 +2,7 @@ use std::{fmt::Debug, ops::Deref, sync::Arc}; use async_trait::async_trait; use data_types::{NamespaceId, TableId}; +use predicate::Predicate; use thiserror::Error; use trace::span::Span; @@ -25,6 +26,7 @@ pub(crate) trait QueryExec: Send + Sync + Debug { table_id: TableId, columns: Vec<String>, span: Option<Span>, + predicate: Option<Predicate>, ) -> Result<Self::Response, QueryError>; } @@ -41,9 +43,10 @@ where table_id: TableId, columns: Vec<String>, span: Option<Span>, + predicate: Option<Predicate>, ) -> Result<Self::Response, QueryError> { self.deref() - .query_exec(namespace_id, table_id, columns, span) + .query_exec(namespace_id, table_id, columns, span, predicate) .await } }
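Threading the new `predicate` argument through every `QueryExec` layer is mechanical: each decorator forwards it untouched to its inner executor. A reduced sketch of that pattern; the trait, the `Predicate` alias and the `Logging` wrapper below are simplified stand-ins for the crate-private originals:

    use async_trait::async_trait;

    /// Stand-in for `predicate::Predicate`.
    type Predicate = String;

    /// Reduced stand-in for the widened `QueryExec` trait; note the added
    /// trailing `predicate` parameter.
    #[async_trait]
    trait QueryExec: Send + Sync {
        type Response: Send;

        async fn query_exec(
            &self,
            namespace_id: i64,
            table_id: i64,
            columns: Vec<String>,
            predicate: Option<Predicate>,
        ) -> Result<Self::Response, String>;
    }

    /// A layered decorator forwards the new argument unchanged, exactly as
    /// the tracing/metrics wrappers in this diff do.
    struct Logging<T>(T);

    #[async_trait]
    impl<T> QueryExec for Logging<T>
    where
        T: QueryExec,
    {
        type Response = T::Response;

        async fn query_exec(
            &self,
            namespace_id: i64,
            table_id: i64,
            columns: Vec<String>,
            predicate: Option<Predicate>,
        ) -> Result<Self::Response, String> {
            // Added behaviour goes around the delegated call.
            println!("table {table_id}: predicate={predicate:?}");
            self.0
                .query_exec(namespace_id, table_id, columns, predicate)
                .await
        }
    }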
diff --git a/ingester/src/query_adaptor.rs b/ingester/src/query_adaptor.rs index 591fca003c..2a4b7a91e1 100644 --- a/ingester/src/query_adaptor.rs +++ b/ingester/src/query_adaptor.rs @@ -5,7 +5,7 @@ use std::{any::Any, sync::Arc}; use arrow::record_batch::RecordBatch; use arrow_util::util::ensure_schema; -use data_types::{ChunkId, ChunkOrder, PartitionId}; +use data_types::{ChunkId, ChunkOrder, PartitionId, TimestampMinMax}; use datafusion::physical_plan::Statistics; use iox_query::{ util::{compute_timenanosecond_min_max, create_basic_summary}, @@ -105,16 +105,26 @@ impl QueryAdaptor { pub(crate) fn partition_id(&self) -> PartitionId { self.partition_id } + + /// Number of rows, useful for building stats + pub(crate) fn num_rows(&self) -> u64 { + self.data.iter().map(|b| b.num_rows()).sum::<usize>() as u64 + } + + /// Time range, useful for building stats + pub(crate) fn ts_min_max(&self) -> TimestampMinMax { + compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref())) + .expect("Should have time range") + } } impl QueryChunk for QueryAdaptor { fn stats(&self) -> Arc<Statistics> { Arc::clone(self.stats.get_or_init(|| { - let ts_min_max = compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref())) - .expect("Should have time range"); + let ts_min_max = self.ts_min_max(); Arc::new(create_basic_summary( - self.data.iter().map(|b| b.num_rows()).sum::<usize>() as u64, + self.num_rows(), self.schema(), ts_min_max, ))
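`stats()` above computes the chunk summary at most once and caches it, while the new `num_rows()`/`ts_min_max()` helpers expose the raw ingredients for reuse. A self-contained sketch of the same lazy-init pattern using `std::sync::OnceLock` and stand-in types (plain timestamp batches instead of Arrow `RecordBatch`es):

    use std::sync::{Arc, OnceLock};

    #[derive(Debug)]
    struct Stats {
        num_rows: u64,
        ts_min: i64,
        ts_max: i64,
    }

    struct Adaptor {
        /// Stand-in for the buffered RecordBatches: each inner Vec is one
        /// batch of nanosecond timestamps.
        batches: Vec<Vec<i64>>,
        stats: OnceLock<Arc<Stats>>,
    }

    impl Adaptor {
        fn num_rows(&self) -> u64 {
            self.batches.iter().map(|b| b.len()).sum::<usize>() as u64
        }

        fn ts_min_max(&self) -> (i64, i64) {
            // Panics on empty data, mirroring `.expect("Should have time range")`.
            let min = self.batches.iter().flatten().min().copied().expect("Should have time range");
            let max = self.batches.iter().flatten().max().copied().expect("Should have time range");
            (min, max)
        }

        /// Compute the summary on first call only; later calls clone the Arc.
        fn stats(&self) -> Arc<Stats> {
            Arc::clone(self.stats.get_or_init(|| {
                let (ts_min, ts_max) = self.ts_min_max();
                Arc::new(Stats {
                    num_rows: self.num_rows(),
                    ts_min,
                    ts_max,
                })
            }))
        }
    }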
diff --git a/ingester/src/server/grpc/query.rs b/ingester/src/server/grpc/query.rs index 77d457f368..d736f1d4d9 100644 --- a/ingester/src/server/grpc/query.rs +++ b/ingester/src/server/grpc/query.rs @@ -12,6 +12,7 @@ use futures::{Stream, StreamExt, TryStreamExt}; use ingester_query_grpc::influxdata::iox::ingester::v1 as proto; use metric::{DurationHistogram, U64Counter}; use observability_deps::tracing::*; +use predicate::Predicate; use prost::Message; use thiserror::Error; use tokio::sync::{Semaphore, TryAcquireError}; @@ -48,6 +49,10 @@ enum Error { /// The number of simultaneous queries being executed has been reached. #[error("simultaneous query limit exceeded")] RequestLimit, + + /// The payload within the request has an invalid field value. + #[error("field violation: {0}")] + FieldViolation(#[from] ingester_query_grpc::FieldViolation), } /// Map a query-execution error into a [`tonic::Status`]. @@ -77,6 +82,10 @@ impl From<Error> for tonic::Status { warn!("simultaneous query limit exceeded"); Code::ResourceExhausted } + Error::FieldViolation(_) => { + debug!(error=%e, "request contains field violation"); + Code::InvalidArgument + } }; Self::new(code, e.to_string()) @@ -188,18 +197,25 @@ where let ticket = request.into_inner(); let request = proto::IngesterQueryRequest::decode(&*ticket.ticket).map_err(Error::from)?; - // Extract the namespace/table identifiers + // Extract the namespace/table identifiers and the query predicate let namespace_id = NamespaceId::new(request.namespace_id); let table_id = TableId::new(request.table_id); - - // Predicate pushdown is part of the API, but not implemented. - if let Some(p) = request.predicate { - debug!(predicate=?p, "ignoring query predicate (unsupported)"); - } + let predicate = if let Some(p) = request.predicate { + debug!(predicate=?p, "received query predicate"); + Some(Predicate::try_from(p).map_err(Error::from)?) + } else { + None + }; let response = match self .query_handler - .query_exec(namespace_id, table_id, request.columns, span.clone()) + .query_exec( + namespace_id, + table_id, + request.columns, + span.clone(), + predicate, + ) .await { Ok(v) => v,
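The handler change above turns a malformed predicate into an `InvalidArgument` response instead of silently ignoring pushdown. A sketch of that decode-and-validate shape; `ProtoPredicate`, the `FieldViolation` payload and the validation rule here are illustrative stand-ins, not the `ingester_query_grpc` API:

    /// Stand-in for the violation error the proto conversion can raise; the
    /// real handler maps it to `tonic::Code::InvalidArgument`.
    #[derive(Debug)]
    pub struct FieldViolation(pub String);

    /// Stand-in for `predicate::Predicate`.
    #[derive(Debug)]
    pub struct Predicate;

    /// Hypothetical wire form of the predicate carried in the ticket.
    #[derive(Debug)]
    pub struct ProtoPredicate {
        pub exprs: Vec<String>,
    }

    impl TryFrom<ProtoPredicate> for Predicate {
        type Error = FieldViolation;

        fn try_from(p: ProtoPredicate) -> Result<Self, Self::Error> {
            // Illustrative validation rule only.
            if p.exprs.iter().any(|e| e.is_empty()) {
                return Err(FieldViolation("empty expression".into()));
            }
            Ok(Predicate)
        }
    }

    /// Decode an optional predicate, surfacing any violation to the caller.
    pub fn decode_predicate(
        raw: Option<ProtoPredicate>,
    ) -> Result<Option<Predicate>, FieldViolation> {
        raw.map(Predicate::try_from).transpose()
    }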
diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 2fe3b5b1c4..c04726e8e0 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -1,6 +1,9 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId}; +use data_types::{ + partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionId, PartitionKey, + SequenceNumber, TableId, +}; use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace}; use lazy_static::lazy_static; use mutable_batch_lp::lines_to_batches; @@ -15,8 +18,8 @@ use crate::{ }, partition::{PartitionData, SortKeyState}, table::{ - name_resolver::{mock::MockTableNameProvider, TableNameProvider}, - TableName, + metadata_resolver::{mock::MockTableProvider, TableProvider}, + TableMetadata, TableName, }, }, deferred_load::DeferredLoad, @@ -44,10 +47,15 @@ pub(crate) fn defer_namespace_name_1_ms() -> Arc<DeferredLoad<NamespaceName>> { )) } -pub(crate) fn defer_table_name_1_sec() -> Arc<DeferredLoad<TableName>> { +pub(crate) fn defer_table_metadata_1_sec() -> Arc<DeferredLoad<TableMetadata>> { Arc::new(DeferredLoad::new( Duration::from_secs(1), - async { ARBITRARY_TABLE_NAME.clone() }, + async { + TableMetadata::new_for_testing( + ARBITRARY_TABLE_NAME.clone(), + TablePartitionTemplateOverride::default(), + ) + }, &metric::Registry::default(), )) } @@ -60,8 +68,11 @@ lazy_static! { pub(crate) static ref ARBITRARY_NAMESPACE_NAME_PROVIDER: Arc<dyn NamespaceNameProvider> = Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)); pub(crate) static ref ARBITRARY_TABLE_NAME: TableName = TableName::from("bananas"); - pub(crate) static ref ARBITRARY_TABLE_NAME_PROVIDER: Arc<dyn TableNameProvider> = - Arc::new(MockTableNameProvider::new(&**ARBITRARY_TABLE_NAME)); + pub(crate) static ref ARBITRARY_TABLE_PROVIDER: Arc<dyn TableProvider> = + Arc::new(MockTableProvider::new(TableMetadata::new_for_testing( + ARBITRARY_TABLE_NAME.clone(), + TablePartitionTemplateOverride::default() + ))); } /// Build a [`PartitionData`] with mostly arbitrary-yet-valid values for tests. @@ -71,7 +82,7 @@ pub(crate) struct PartitionDataBuilder { partition_key: Option<PartitionKey>, namespace_id: Option<NamespaceId>, table_id: Option<TableId>, - table_name_loader: Option<Arc<DeferredLoad<TableName>>>, + table_loader: Option<Arc<DeferredLoad<TableMetadata>>>, namespace_loader: Option<Arc<DeferredLoad<NamespaceName>>>, sort_key: Option<SortKeyState>, } @@ -101,11 +112,11 @@ impl PartitionDataBuilder { self } - pub(crate) fn with_table_name_loader( + pub(crate) fn with_table_loader( mut self, - table_name_loader: Arc<DeferredLoad<TableName>>, + table_loader: Arc<DeferredLoad<TableMetadata>>, ) -> Self { - self.table_name_loader = Some(table_name_loader); + self.table_loader = Some(table_loader); self } @@ -134,8 +145,7 @@ impl PartitionDataBuilder { self.namespace_loader .unwrap_or_else(defer_namespace_name_1_sec), self.table_id.unwrap_or(ARBITRARY_TABLE_ID), - self.table_name_loader - .unwrap_or_else(defer_table_name_1_sec), + self.table_loader.unwrap_or_else(defer_table_metadata_1_sec), self.sort_key.unwrap_or(SortKeyState::Provided(None)), ) } }
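`PartitionDataBuilder` above follows the test-builder convention of optional fields that fall back to arbitrary-yet-valid defaults at build time, so each test sets only what it exercises. A condensed, hypothetical version of the same convention (the field set and default values here are illustrative):

    #[derive(Default)]
    struct PartitionDataBuilder {
        table_id: Option<i64>,
        partition_key: Option<String>,
    }

    impl PartitionDataBuilder {
        fn with_table_id(mut self, id: i64) -> Self {
            self.table_id = Some(id);
            self
        }

        fn with_partition_key(mut self, key: impl Into<String>) -> Self {
            self.partition_key = Some(key.into());
            self
        }

        /// Anything not set falls back to an arbitrary-yet-valid default,
        /// analogous to the ARBITRARY_* constants above.
        fn build(self) -> (i64, String) {
            (
                self.table_id.unwrap_or(4242),
                self.partition_key
                    .unwrap_or_else(|| "platanos".to_string()),
            )
        }
    }

    // A test overrides only what it cares about:
    // let (table_id, key) = PartitionDataBuilder::default().with_table_id(1).build();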
diff --git a/querier/src/df_stats.rs b/iox_query/src/chunk_statistics.rs similarity index 100% rename from querier/src/df_stats.rs rename to iox_query/src/chunk_statistics.rs diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index f63089957b..9592f4aa77 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -34,6 +34,7 @@ use schema::{ }; use std::{any::Any, fmt::Debug, sync::Arc}; +pub mod chunk_statistics; pub mod config; pub mod exec; pub mod frontend; diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 77a3aaf321..fba27593e7 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -17,6 +17,7 @@ use data_types::{ }; use datafusion::scalar::ScalarValue; use iox_catalog::interface::Catalog; +use iox_query::chunk_statistics::{ColumnRange, ColumnRanges}; use iox_time::TimeProvider; use observability_deps::tracing::debug; use schema::sort::SortKey; @@ -27,8 +28,6 @@ use std::{ }; use trace::span::Span; -use crate::df_stats::{ColumnRange, ColumnRanges}; - use super::{namespace::CachedTable, ram::RamSize}; const CACHE_ID: &str = "partition"; diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index c13b05f77f..d726e94162 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -6,10 +6,7 @@ use self::{ invalidate_on_error::InvalidateOnErrorFlightClient, test_util::MockIngesterConnection, }; -use crate::{ - cache::{namespace::CachedTable, CatalogCache}, - df_stats::{create_chunk_statistics, ColumnRanges}, -}; +use crate::cache::{namespace::CachedTable, CatalogCache}; use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch}; use arrow_flight::decode::DecodedPayload; use async_trait::async_trait; @@ -22,7 +19,11 @@ use ingester_query_grpc::{ encode_proto_predicate_as_base64, influxdata::iox::ingester::v1::IngesterQueryResponseMetadata, IngesterQueryRequest, }; -use iox_query::{util::compute_timenanosecond_min_max, QueryChunk, QueryChunkData}; +use iox_query::{ + chunk_statistics::{create_chunk_statistics, ColumnRanges}, + util::compute_timenanosecond_min_max, + QueryChunk, QueryChunkData, +}; use iox_time::{Time, TimeProvider}; use metric::{DurationHistogram, Metric}; use observability_deps::tracing::{debug, trace, warn}; diff --git a/querier/src/lib.rs b/querier/src/lib.rs index 9985f1d0e5..0a18796b2a 100644 --- a/querier/src/lib.rs +++ b/querier/src/lib.rs @@ -18,7 +18,6 @@ use workspace_hack as _; mod cache; mod database; -mod df_stats; mod ingester; mod namespace; mod parquet; diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs index 24ac694d78..da1a72ed83 100644 --- a/querier/src/parquet/mod.rs +++ b/querier/src/parquet/mod.rs @@ -2,6 +2,7 @@ use data_types::{ChunkId, ChunkOrder, PartitionId}; use datafusion::physical_plan::Statistics; +use iox_query::chunk_statistics::{create_chunk_statistics, ColumnRanges}; use parquet_file::chunk::ParquetChunk; use schema::sort::SortKey; use std::sync::Arc; @@ -11,8 +12,6 @@ mod query_access; pub use creation::ChunkAdapter; -use crate::df_stats::{create_chunk_statistics, ColumnRanges}; - /// Immutable metadata attached to a [`QuerierParquetChunk`]. #[derive(Debug)] pub struct QuerierParquetChunkMeta { diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 25bbb08b7a..fcdc3c55aa 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -492,7 +492,6 @@ mod tests { use super::*; use crate::{ cache::test_util::{assert_cache_access_metric_count, assert_catalog_access_metric_count}, - df_stats::ColumnRange, ingester::{test_util::MockIngesterConnection, IngesterPartition}, table::test_util::{querier_table, IngesterPartitionBuilder}, }; @@ -506,7 +505,7 @@ mod tests { use generated_types::influxdata::iox::partition_template::v1::{ template_part::Part, PartitionTemplate, TemplatePart, }; - use iox_query::exec::IOxSessionContext; + use iox_query::{chunk_statistics::ColumnRange, exec::IOxSessionContext}; use iox_tests::{TestCatalog, TestParquetFileBuilder, TestTable}; use predicate::Predicate; use schema::{builder::SchemaBuilder, InfluxFieldType, TIME_COLUMN_NAME}; diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs index 182a77b18a..842c4eb9cb 100644 --- a/querier/src/table/test_util.rs +++ b/querier/src/table/test_util.rs @@ -1,11 +1,12 @@ use super::{PruneMetrics, QuerierTable, QuerierTableArgs}; use crate::{ - cache::CatalogCache, create_ingester_connection_for_testing, df_stats::ColumnRanges, - parquet::ChunkAdapter, IngesterPartition, + cache::CatalogCache, create_ingester_connection_for_testing, parquet::ChunkAdapter, + IngesterPartition, }; use arrow::record_batch::RecordBatch; use data_types::ChunkId; use iox_catalog::interface::{get_schema_by_name, SoftDeletedRows}; +use iox_query::chunk_statistics::ColumnRanges; use iox_tests::{TestCatalog, TestPartition, TestTable}; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use schema::{Projection, Schema}; diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index dfa19f4c7a..fa7ee4a7a1 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -481,24 +481,43 @@ impl MiniCluster { .await?
.into_inner(); - let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap(); - assert!(matches!(msg, DecodedPayload::None), "{msg:?}"); - - let schema = next_message(&mut performed_query) - .await - .map(|(msg, _)| unwrap_schema(msg)); - - let mut record_batches = vec![]; - while let Some((msg, _md)) = next_message(&mut performed_query).await { - let batch = unwrap_record_batch(msg); - record_batches.push(batch); + let mut partitions = vec![]; + let mut current_partition = None; + while let Some((msg, app_metadata)) = next_message(&mut performed_query).await { + match msg { + DecodedPayload::None => { + if let Some(p) = std::mem::take(&mut current_partition) { + partitions.push(p); + } + current_partition = Some(IngesterResponsePartition { + app_metadata, + schema: None, + record_batches: vec![], + }); + } + DecodedPayload::Schema(schema) => { + let current_partition = + current_partition.as_mut().expect("schema w/o partition"); + assert!( + current_partition.schema.is_none(), + "got two schemas for a single partition" + ); + current_partition.schema = Some(schema); + } + DecodedPayload::RecordBatch(batch) => { + let current_partition = + current_partition.as_mut().expect("batch w/o partition"); + assert!(current_partition.schema.is_some(), "batch w/o schema"); + current_partition.record_batches.push(batch); + } + } } - Ok(IngesterResponse { - app_metadata, - schema, - record_batches, - }) + if let Some(p) = current_partition { + partitions.push(p); + } + + Ok(IngesterResponse { partitions }) } /// Ask all of the ingesters to persist their data. @@ -589,6 +608,11 @@ impl MiniCluster { /// Gathers data from ingester Flight queries #[derive(Debug)] pub struct IngesterResponse { + pub partitions: Vec<IngesterResponsePartition>, +} + +#[derive(Debug)] +pub struct IngesterResponsePartition { pub app_metadata: IngesterQueryResponseMetadata, pub schema: Option<SchemaRef>, pub record_batches: Vec<RecordBatch>, } @@ -708,17 +732,3 @@ async fn next_message( Some((payload, app_metadata)) } - -fn unwrap_schema(msg: DecodedPayload) -> SchemaRef { - match msg { - DecodedPayload::Schema(s) => s, - _ => panic!("Unexpected message type: {msg:?}"), - } -} - -fn unwrap_record_batch(msg: DecodedPayload) -> RecordBatch { - match msg { - DecodedPayload::RecordBatch(b) => b, - _ => panic!("Unexpected message type: {msg:?}"), - } -}
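The reworked `query_ingester` above folds the flat Flight stream into per-partition groups: a metadata-only message opens a new partition, a schema must precede its batches, and the last open partition is flushed when the stream ends. A standalone sketch of that grouping state machine over stand-in payload types:

    /// Stand-ins for the decoded Flight payloads.
    #[derive(Debug)]
    enum Payload {
        None(u32),      // app metadata only: starts a new partition
        Schema(String), // schema for the current partition
        Batch(String),  // record batch for the current partition
    }

    #[derive(Debug, Default)]
    struct Partition {
        meta: u32,
        schema: Option<String>,
        batches: Vec<String>,
    }

    fn group_into_partitions(msgs: Vec<Payload>) -> Vec<Partition> {
        let mut partitions = vec![];
        let mut current: Option<Partition> = None;
        for msg in msgs {
            match msg {
                Payload::None(meta) => {
                    // Flush the previous partition before opening a new one.
                    if let Some(p) = current.take() {
                        partitions.push(p);
                    }
                    current = Some(Partition {
                        meta,
                        ..Default::default()
                    });
                }
                Payload::Schema(s) => {
                    let p = current.as_mut().expect("schema w/o partition");
                    assert!(p.schema.is_none(), "got two schemas for a single partition");
                    p.schema = Some(s);
                }
                Payload::Batch(b) => {
                    let p = current.as_mut().expect("batch w/o partition");
                    assert!(p.schema.is_some(), "batch w/o schema");
                    p.batches.push(b);
                }
            }
        }
        // The stream does not terminate the final partition explicitly.
        if let Some(p) = current {
            partitions.push(p);
        }
        partitions
    }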