From a08a91c5ba5f50b32c8a896e0b77095bb0c73cbb Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 25 May 2022 12:44:42 +0200 Subject: [PATCH] fix: ensure querier cache is refreshed for partition sort key (#4660) * test: call `maybe_start_logging` in auto-generated cases * fix: ensure querier cache is refreshed for partition sort key Fixes #4631. * docs: explain querier sort key handling and test * test: test another version of issue 4631 * fix: correctly invalidate partition sort keys * fix: fix `table_not_found_on_ingester` --- .../tests/end_to_end_cases/querier.rs | 156 ++++++++++- iox_tests/src/util.rs | 21 ++ querier/src/cache/partition.rs | 114 +++++++- querier/src/chunk/mod.rs | 37 ++- querier/src/chunk/query_access.rs | 2 +- querier/src/ingester/mod.rs | 31 ++- querier/src/ingester/test_util.rs | 6 +- querier/src/table/mod.rs | 259 +++++++++--------- querier/src/table/state_reconciler.rs | 137 ++++++++- .../src/table/state_reconciler/interface.rs | 23 +- query_tests/generate/src/main.rs | 2 + query_tests/src/cases.rs | 28 ++ test_helpers_end_to_end/src/config.rs | 10 +- 13 files changed, 647 insertions(+), 179 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_cases/querier.rs b/influxdb_iox/tests/end_to_end_cases/querier.rs index 4ae75d3077..fe22c3eb10 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier.rs @@ -132,7 +132,8 @@ async fn table_not_found_on_ingester() { let table_name = "the_table"; // Set up the cluster ==================================== - let mut cluster = MiniCluster::create_shared(database_url).await; + // cannot use shared cluster because we're restarting the ingester + let mut cluster = MiniCluster::create_non_shared_standard(database_url).await; StepTest::new( &mut cluster, @@ -261,3 +262,156 @@ async fn ingester_panic() { .run() .await } + +#[tokio::test] +async fn issue_4631_a() { + // See https://github.com/influxdata/influxdb_iox/issues/4631 + // + // The symptom 
was that on rare occasion the querier would panic because the query engine was sure there must be a + // partition sort key but the querier did not provide any. For this to happen we need overlapping chunks and all + // these chunks must be sorted. This is only the case if all chunks are persisted (i.e. parquet-backed, so no + // ingester data). The reason why the querier did NOT provide a partition sort key was because it once got queried + // when the partition was fresh and only existed within the ingester, so no partition sort key was calculated yet. + // During that initial query the querier would cache the partition information (incl. the absence of a partition + // sort key) and when queried again (this time all chunks are persisted but overlapping) it would use this stale + // information, confusing the query engine. + test_helpers::maybe_start_logging(); + + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let router_config = TestConfig::new_router(&database_url); + let ingester_config = + TestConfig::new_ingester(&router_config).with_ingester_persist_memory_threshold(1000); + let querier_config = TestConfig::new_querier(&ingester_config); + let mut cluster = MiniCluster::new() + .with_router(router_config) + .await + .with_ingester(ingester_config) + .await + .with_querier(querier_config) + .await; + + // We need a trigger for persistence that is not time so the test is as stable as possible. We use a long string to + // cross the persistence memory threshold. + let mut super_long_string = String::new(); + for _ in 0..10_000 { + super_long_string.push('x'); + } + + StepTest::new( + &mut cluster, + vec![ + // create UNPERSISTED ingester data + // IMPORTANT: The data MUST NOT be persisted before the first query is executed, because persistence + // calculates the partition sort key. 
The original bug was that the first query on a completely + // unpersisted partition would cache the NULL/None sort key which would later lead to a panic. + Step::WriteLineProtocol(format!("{},tag=A val=\"foo\" 1", table_name)), + Step::WaitForReadable, + Step::AssertNotPersisted, + // cache partition in querier w/o any partition sort key (yet) + // This MUST happen after we have some ingester data but before ANYTHING was persisted. In the original bug + // the querier would now cache the partition w/o a sort key (and would never invalidate this information). + Step::Query { + sql: format!("select * from {}", table_name), + expected: vec![ + "+-----+--------------------------------+-----+", + "| tag | time | val |", + "+-----+--------------------------------+-----+", + "| A | 1970-01-01T00:00:00.000000001Z | foo |", + "+-----+--------------------------------+-----+", + ], + }, + // flush ingester data + // Here the ingester calculates the partition sort key. + Step::WriteLineProtocol(format!( + "{},tag=B val=\"{}\" 2\n", + table_name, super_long_string + )), + Step::WaitForPersisted, + // create overlapping 2nd parquet file + // This is important to trigger the bug within the query engine, because only if there are multiple chunks + // that need to be de-duplicated the bug will occur. + Step::WriteLineProtocol(format!( + "{},tag=A val=\"bar\" 1\n{},tag=B val=\"{}\" 2\n", + table_name, table_name, super_long_string + )), + Step::WaitForPersisted, + // query + // In the original bug the querier would still use NULL/None as a partition sort key but present two sorted + // but overlapping chunks to the query engine. 
+ Step::Query { + sql: format!("select * from {} where tag='A'", table_name), + expected: vec![ + "+-----+--------------------------------+-----+", + "| tag | time | val |", + "+-----+--------------------------------+-----+", + "| A | 1970-01-01T00:00:00.000000001Z | bar |", + "+-----+--------------------------------+-----+", + ], + }, + ], + ) + .run() + .await +} + +#[tokio::test] +async fn issue_4631_b() { + // This is similar to `issue_4631_a` but instead of updating the sort key from NULL/None to something we update it + // with a new tag. + test_helpers::maybe_start_logging(); + + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let mut cluster = MiniCluster::create_shared(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + // create persisted chunk with a single tag column + Step::WriteLineProtocol(format!("{},tag=A val=\"foo\" 1", table_name)), + Step::WaitForPersisted, + // query to prime the querier caches with partition sort key + Step::Query { + sql: format!("select * from {}", table_name), + expected: vec![ + "+-----+--------------------------------+-----+", + "| tag | time | val |", + "+-----+--------------------------------+-----+", + "| A | 1970-01-01T00:00:00.000000001Z | foo |", + "+-----+--------------------------------+-----+", + ], + }, + // create 2nd chunk with an additional tag column (which will be included in the partition sort key) + Step::WriteLineProtocol(format!("{},tag=A,tag2=B val=\"bar\" 1\n", table_name)), + Step::WaitForPersisted, + // in the original bug the querier would now panic with: + // + // Partition sort key tag, time, does not cover or is sorted on the same order of the chunk sort key tag, tag2, time, + // + // Note that we cannot query tag2 because the schema is cached for a while. 
+ Step::Query { + sql: format!( + "select tag, val from {} where tag='A' order by val", + table_name + ), + expected: vec![ + "+-----+-----+", + "| tag | val |", + "+-----+-----+", + "| A | bar |", + "| A | foo |", + "+-----+-----+", + ], + }, + ], + ) + .run() + .await +} diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index ad908171e6..ee23080997 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -428,6 +428,27 @@ pub struct TestPartition { } impl TestPartition { + /// Update sort key. + pub async fn update_sort_key(self: &Arc, sort_key: SortKey) -> Self { + let partition = self + .catalog + .catalog + .repositories() + .await + .partitions() + .update_sort_key(self.partition.id, &sort_key.to_columns()) + .await + .unwrap(); + + Self { + catalog: Arc::clone(&self.catalog), + namespace: Arc::clone(&self.namespace), + table: Arc::clone(&self.table), + sequencer: Arc::clone(&self.sequencer), + partition, + } + } + /// Create a parquet for the partition pub async fn create_parquet_file(self: &Arc, lp: &str) -> TestParquetFile { self.create_parquet_file_with_min_max( diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 0044f1ea73..4dca8f8eb4 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -5,6 +5,7 @@ use cache_system::{ backend::{ lru::{LruBackend, ResourcePool}, resource_consumption::FunctionEstimator, + shared::SharedBackend, }, driver::Cache, loader::{metrics::MetricsLoader, FunctionLoader}, @@ -13,7 +14,11 @@ use data_types::{PartitionId, SequencerId}; use iox_catalog::interface::Catalog; use iox_time::TimeProvider; use schema::sort::SortKey; -use std::{collections::HashMap, mem::size_of_val, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + mem::size_of_val, + sync::Arc, +}; use super::ram::RamSize; @@ -23,6 +28,7 @@ const CACHE_ID: &str = "partition"; #[derive(Debug)] pub struct PartitionCache { cache: Cache, + backend: SharedBackend, } impl PartitionCache { 
@@ -54,7 +60,7 @@ impl PartitionCache { CachedPartition { sequencer_id: partition.sequencer_id, - sort_key: partition.sort_key(), + sort_key: Arc::new(partition.sort_key()), } } })); @@ -74,9 +80,11 @@ impl PartitionCache { RamSize(size_of_val(k) + size_of_val(v) + v.size()) })), )); + let backend = SharedBackend::new(backend); Self { - cache: Cache::new(loader, backend), + cache: Cache::new(loader, Box::new(backend.clone())), + backend, } } @@ -86,21 +94,42 @@ impl PartitionCache { } /// Get sort key - pub async fn sort_key(&self, partition_id: PartitionId) -> Option { + pub async fn sort_key(&self, partition_id: PartitionId) -> Arc> { self.cache.get(partition_id).await.sort_key } + + /// Expire partition if the cached sort key does NOT cover the given set of columns. + pub fn expire_if_sort_key_does_not_cover( + &self, + partition_id: PartitionId, + columns: &HashSet, + ) { + self.backend.remove_if(&partition_id, |cached_partition| { + if let Some(sort_key) = cached_partition.sort_key.as_ref().as_ref() { + let covered: HashSet<_> = sort_key + .iter() + .map(|(col, _options)| Arc::clone(col)) + .collect(); + columns.iter().any(|col| !covered.contains(col.as_str())) + } else { + // no sort key at all => need to update + true + } + }); + } } #[derive(Debug, Clone)] struct CachedPartition { sequencer_id: SequencerId, - sort_key: Option, + sort_key: Arc>, } impl CachedPartition { /// RAM-bytes EXCLUDING `self`. 
fn size(&self) -> usize { self.sort_key + .as_ref() .as_ref() .map(|sk| sk.size() - size_of_val(sk)) .unwrap_or_default() @@ -185,15 +214,15 @@ mod tests { ); let sort_key1 = cache.sort_key(p1.id).await; - assert_eq!(sort_key1, p1.sort_key()); + assert_eq!(sort_key1.as_ref(), &p1.sort_key()); assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1); let sort_key2 = cache.sort_key(p2.id).await; - assert_eq!(sort_key2, p2.sort_key()); + assert_eq!(sort_key2.as_ref(), &p2.sort_key()); assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2); let sort_key1 = cache.sort_key(p1.id).await; - assert_eq!(sort_key1, p1.sort_key()); + assert_eq!(sort_key1.as_ref(), &p1.sort_key()); assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2); } @@ -244,4 +273,73 @@ mod tests { cache.sequencer_id(p2.id).await; assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); } + + #[tokio::test] + async fn test_expiration() { + let catalog = TestCatalog::new(); + + let ns = catalog.create_namespace("ns").await; + let t = ns.create_table("table").await; + let s = ns.create_sequencer(1).await; + let p = t.with_sequencer(&s).create_partition("k1").await; + let p_id = p.partition.id; + let p_sort_key = p.partition.sort_key(); + + let cache = PartitionCache::new( + catalog.catalog(), + BackoffConfig::default(), + catalog.time_provider(), + &catalog.metric_registry(), + test_ram_pool(), + ); + + let sort_key = cache.sort_key(p_id).await; + assert_eq!(sort_key.as_ref(), &p_sort_key); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1); + + // non-existing sort keys always expire + assert!(p_sort_key.is_none()); + cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::new()); + let sort_key = cache.sort_key(p_id).await; + assert_eq!(sort_key.as_ref(), &p_sort_key); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2); + + // set sort 
key + let p = p + .update_sort_key(SortKey::from_columns(["foo", "bar"])) + .await; + + // just changing the sort key doesn't expire, we need a signal + let sort_key = cache.sort_key(p_id).await; + assert_eq!(sort_key.as_ref(), &p_sort_key); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2); + + // expire + let p_sort_key = p.partition.sort_key(); + cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::from([String::from("foo")])); + let sort_key = cache.sort_key(p_id).await; + assert_eq!(sort_key.as_ref(), &p_sort_key); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); + + // subsets and the full key don't expire + cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::new()); + cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::from([String::from("foo")])); + cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::from([String::from("bar")])); + cache.expire_if_sort_key_does_not_cover( + p_id, + &HashSet::from([String::from("foo"), String::from("bar")]), + ); + let sort_key = cache.sort_key(p_id).await; + assert_eq!(sort_key.as_ref(), &p_sort_key); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); + + // unknown columns expire + cache.expire_if_sort_key_does_not_cover( + p_id, + &HashSet::from([String::from("foo"), String::from("x")]), + ); + let sort_key = cache.sort_key(p_id).await; + assert_eq!(sort_key.as_ref(), &p_sort_key); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); + } } diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index 010981fb45..b6f4ded819 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -35,9 +35,6 @@ pub struct ChunkMeta { /// Sort key. sort_key: Option, - /// Partition sort key - partition_sort_key: Option, - /// Sequencer that created the data within this chunk. 
sequencer_id: SequencerId, @@ -62,11 +59,6 @@ impl ChunkMeta { self.sort_key.as_ref() } - /// Partition sort key - pub fn partition_sort_key(&self) -> Option<&SortKey> { - self.partition_sort_key.as_ref() - } - /// Sequencer that created the data within this chunk. pub fn sequencer_id(&self) -> SequencerId { self.sequencer_id @@ -115,6 +107,9 @@ pub struct QuerierChunk { /// Delete predicates of this chunk delete_predicates: Vec>, + + /// Partition sort key + partition_sort_key: Arc>, } impl QuerierChunk { @@ -123,6 +118,7 @@ impl QuerierChunk { parquet_file_id: ParquetFileId, chunk: Arc, meta: Arc, + partition_sort_key: Arc>, ) -> Self { Self { storage: ChunkStorage::Parquet { @@ -131,15 +127,23 @@ impl QuerierChunk { }, meta, delete_predicates: Vec::new(), + partition_sort_key, } } /// Set delete predicates of the given chunk. pub fn with_delete_predicates(self, delete_predicates: Vec>) -> Self { Self { - storage: self.storage, - meta: self.meta, delete_predicates, + ..self + } + } + + /// Set partition sort key + pub fn with_partition_sort_key(self, partition_sort_key: Arc>) -> Self { + Self { + partition_sort_key, + ..self } } @@ -163,6 +167,11 @@ impl QuerierChunk { ChunkStorage::Parquet { chunk, .. } => chunk.timestamp_min_max(), } } + + /// Partition sort key + pub fn partition_sort_key(&self) -> Option<&SortKey> { + self.partition_sort_key.as_ref().as_ref() + } } /// Adapter that can create chunks. 
@@ -270,14 +279,18 @@ impl ParquetChunkAdapter { table_name, order, sort_key: iox_metadata.sort_key.clone(), - partition_sort_key, sequencer_id: iox_metadata.sequencer_id, partition_id: iox_metadata.partition_id, min_sequence_number: parquet_file.min_sequence_number, max_sequence_number: parquet_file.max_sequence_number, }); - Some(QuerierChunk::new_parquet(parquet_file.id, chunk, meta)) + Some(QuerierChunk::new_parquet( + parquet_file.id, + chunk, + meta, + partition_sort_key, + )) } } diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs index 01e31320b4..b36f45fbc6 100644 --- a/querier/src/chunk/query_access.rs +++ b/querier/src/chunk/query_access.rs @@ -32,7 +32,7 @@ impl QueryChunkMeta for QuerierChunk { } fn partition_sort_key(&self) -> Option<&SortKey> { - self.meta().partition_sort_key() + self.partition_sort_key() } fn partition_id(&self) -> Option { diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index fb4ed07fe5..9b5269ec02 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -147,7 +147,7 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static { columns: Vec, predicate: &Predicate, expected_schema: Arc, - ) -> Result>>; + ) -> Result>; /// Returns the most recent partition sstatus info across all ingester(s) for the specified /// write token. 
@@ -211,7 +211,7 @@ struct GetPartitionForIngester<'a> { } /// Fetches the partitions for a single ingester -async fn execute(request: GetPartitionForIngester<'_>) -> Result>> { +async fn execute(request: GetPartitionForIngester<'_>) -> Result> { let GetPartitionForIngester { flight_client, catalog_cache, @@ -322,6 +322,7 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result) -> Result, predicate: &Predicate, expected_schema: Arc, - ) -> Result>> { - let mut ingester_partitions: Vec> = self + ) -> Result> { + let mut ingester_partitions: Vec = self .ingester_addresses .iter() .map(|ingester_address| { @@ -464,6 +466,9 @@ pub struct IngesterPartition { /// persisted for this partition tombstone_max_sequence_number: Option, + /// Partition-wide sort key. + partition_sort_key: Arc>, + /// The raw table data batches: Vec, @@ -485,6 +490,7 @@ impl IngesterPartition { expected_schema: Arc, parquet_max_sequence_number: Option, tombstone_max_sequence_number: Option, + partition_sort_key: Arc>, batches: Vec, ) -> Result { // ensure that the schema of the batches matches the required @@ -510,11 +516,19 @@ impl IngesterPartition { schema: expected_schema, parquet_max_sequence_number, tombstone_max_sequence_number, + partition_sort_key, batches, summary, }) } + pub(crate) fn with_partition_sort_key(self, partition_sort_key: Arc>) -> Self { + Self { + partition_sort_key, + ..self + } + } + pub(crate) fn ingester(&self) -> &Arc { &self.ingester } @@ -551,7 +565,7 @@ impl QueryChunkMeta for IngesterPartition { } fn partition_sort_key(&self) -> Option<&SortKey> { - None // data comes from Ingester is not persisted yet and should not yet attached to any catalog partition + self.partition_sort_key.as_ref().as_ref() } fn partition_id(&self) -> Option { @@ -1136,7 +1150,7 @@ mod tests { async fn get_partitions( ingester_conn: &IngesterConnectionImpl, - ) -> Result>, Error> { + ) -> Result, Error> { let namespace = Arc::from("namespace"); let table = Arc::from("table"); 
let columns = vec![String::from("col")]; @@ -1274,6 +1288,7 @@ mod tests { Arc::clone(&expected_schema), parquet_max_sequence_number, tombstone_max_sequence_number, + Arc::new(None), vec![case], ) .unwrap(); @@ -1309,6 +1324,7 @@ mod tests { Arc::clone(&expected_schema), parquet_max_sequence_number, tombstone_max_sequence_number, + Arc::new(None), vec![batch], ) .unwrap_err(); @@ -1340,6 +1356,7 @@ mod tests { Arc::clone(&expected_schema), parquet_max_sequence_number, tombstone_max_sequence_number, + Arc::new(None), vec![batch], ) .unwrap(); diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs index 5bbb76013f..d6ed536e19 100644 --- a/querier/src/ingester/test_util.rs +++ b/querier/src/ingester/test_util.rs @@ -7,7 +7,7 @@ use std::{any::Any, sync::Arc}; /// IngesterConnection for testing #[derive(Debug, Default)] pub struct MockIngesterConnection { - next_response: Mutex>>>>, + next_response: Mutex>>>, } impl MockIngesterConnection { @@ -18,7 +18,7 @@ impl MockIngesterConnection { /// Set next response for this connection. #[allow(dead_code)] - pub fn next_response(&self, response: super::Result>>) { + pub fn next_response(&self, response: super::Result>) { *self.next_response.lock() = Some(response); } } @@ -32,7 +32,7 @@ impl IngesterConnection for MockIngesterConnection { _columns: Vec, _predicate: &predicate::Predicate, _expected_schema: Arc, - ) -> super::Result>> { + ) -> super::Result> { self.next_response .lock() .take() diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 3a7e358a46..f1ae0ad0df 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -181,10 +181,7 @@ impl QuerierTable { } /// Get partitions from ingesters. 
- async fn ingester_partitions( - &self, - predicate: &Predicate, - ) -> Result>> { + async fn ingester_partitions(&self, predicate: &Predicate) -> Result> { // For now, ask for *all* columns in the table from the ingester (need // at least all pk (time, tag) columns for // deduplication. @@ -453,21 +450,20 @@ mod tests { .as_any() .downcast_ref::() .unwrap() - .next_response(Ok(vec![Arc::new( - IngesterPartition::try_new( - Arc::from("ingester"), - ChunkId::new(), - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition.partition.id, - sequencer.sequencer.id, - Arc::new(SchemaBuilder::new().build().unwrap()), - Some(SequenceNumber::new(1)), - None, - vec![], - ) - .unwrap(), - )])); + .next_response(Ok(vec![IngesterPartition::try_new( + Arc::from("ingester"), + ChunkId::new(), + Arc::from(ns.namespace.name.clone()), + Arc::from(table.table.name.clone()), + partition.partition.id, + sequencer.sequencer.id, + Arc::new(SchemaBuilder::new().build().unwrap()), + Some(SequenceNumber::new(1)), + None, + Arc::new(None), + vec![], + ) + .unwrap()])); let err = querier_table.chunks(&pred).await.unwrap_err(); assert_matches!(err, Error::StateFusion { .. 
}); @@ -566,53 +562,51 @@ mod tests { .unwrap() .next_response(Ok(vec![ // this chunk is kept - Arc::new( - IngesterPartition::try_new( - Arc::from("ingester"), - ingester_chunk_id1, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition1.partition.id, - sequencer.sequencer.id, - Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Integer) - .timestamp() - .build() - .unwrap(), - ), - // parquet max persisted sequence number - Some(SequenceNumber::new(2)), - // tombstone max persisted sequence number - Some(SequenceNumber::new(10)), - vec![lp_to_record_batch("table foo=3i 33")], - ) - .unwrap(), - ), + IngesterPartition::try_new( + Arc::from("ingester"), + ingester_chunk_id1, + Arc::from(ns.namespace.name.clone()), + Arc::from(table.table.name.clone()), + partition1.partition.id, + sequencer.sequencer.id, + Arc::new( + SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Integer) + .timestamp() + .build() + .unwrap(), + ), + // parquet max persisted sequence number + Some(SequenceNumber::new(2)), + // tombstone max persisted sequence number + Some(SequenceNumber::new(10)), + Arc::new(None), + vec![lp_to_record_batch("table foo=3i 33")], + ) + .unwrap(), // this chunk is filtered out because it has no record batches but the reconciling still takes place - Arc::new( - IngesterPartition::try_new( - Arc::from("ingester"), - ingester_chunk_id2, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition2.partition.id, - sequencer.sequencer.id, - Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Integer) - .timestamp() - .build() - .unwrap(), - ), - // parquet max persisted sequence number - Some(SequenceNumber::new(3)), - // tombstone max persisted sequence number - Some(SequenceNumber::new(11)), - vec![], - ) - .unwrap(), - ), + IngesterPartition::try_new( + Arc::from("ingester"), + ingester_chunk_id2, + Arc::from(ns.namespace.name.clone()), + 
Arc::from(table.table.name.clone()), + partition2.partition.id, + sequencer.sequencer.id, + Arc::new( + SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Integer) + .timestamp() + .build() + .unwrap(), + ), + // parquet max persisted sequence number + Some(SequenceNumber::new(3)), + // tombstone max persisted sequence number + Some(SequenceNumber::new(11)), + Arc::new(None), + vec![], + ) + .unwrap(), ])); let mut chunks = querier_table.chunks(&pred).await.unwrap(); @@ -668,75 +662,72 @@ mod tests { .downcast_ref::() .unwrap() .next_response(Ok(vec![ - Arc::new( - IngesterPartition::try_new( - Arc::from("ingester1"), - ingester_chunk_id1, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition1.partition.id, - sequencer.sequencer.id, - Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Integer) - .timestamp() - .build() - .unwrap(), - ), - // parquet max persisted sequence number - None, - // tombstone max persisted sequence number - None, - vec![lp_to_record_batch("table foo=1i 1")], - ) - .unwrap(), - ), - Arc::new( - IngesterPartition::try_new( - Arc::from("ingester1"), - ingester_chunk_id2, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition2.partition.id, - sequencer.sequencer.id, - Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Integer) - .timestamp() - .build() - .unwrap(), - ), - // parquet max persisted sequence number - None, - // tombstone max persisted sequence number - None, - vec![lp_to_record_batch("table foo=2i 2")], - ) - .unwrap(), - ), - Arc::new( - IngesterPartition::try_new( - Arc::from("ingester2"), - ingester_chunk_id3, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition1.partition.id, - sequencer.sequencer.id, - Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Integer) - .timestamp() - .build() - .unwrap(), - ), - // parquet max persisted sequence 
number - None, - // tombstone max persisted sequence number - None, - vec![lp_to_record_batch("table foo=3i 3")], - ) - .unwrap(), - ), + IngesterPartition::try_new( + Arc::from("ingester1"), + ingester_chunk_id1, + Arc::from(ns.namespace.name.clone()), + Arc::from(table.table.name.clone()), + partition1.partition.id, + sequencer.sequencer.id, + Arc::new( + SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Integer) + .timestamp() + .build() + .unwrap(), + ), + // parquet max persisted sequence number + None, + // tombstone max persisted sequence number + None, + Arc::new(None), + vec![lp_to_record_batch("table foo=1i 1")], + ) + .unwrap(), + IngesterPartition::try_new( + Arc::from("ingester1"), + ingester_chunk_id2, + Arc::from(ns.namespace.name.clone()), + Arc::from(table.table.name.clone()), + partition2.partition.id, + sequencer.sequencer.id, + Arc::new( + SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Integer) + .timestamp() + .build() + .unwrap(), + ), + // parquet max persisted sequence number + None, + // tombstone max persisted sequence number + None, + Arc::new(None), + vec![lp_to_record_batch("table foo=2i 2")], + ) + .unwrap(), + IngesterPartition::try_new( + Arc::from("ingester2"), + ingester_chunk_id3, + Arc::from(ns.namespace.name.clone()), + Arc::from(table.table.name.clone()), + partition1.partition.id, + sequencer.sequencer.id, + Arc::new( + SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Integer) + .timestamp() + .build() + .unwrap(), + ), + // parquet max persisted sequence number + None, + // tombstone max persisted sequence number + None, + Arc::new(None), + vec![lp_to_record_batch("table foo=3i 3")], + ) + .unwrap(), ])); let err = querier_table.chunks(&pred).await.unwrap_err(); diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs index 0bd2efbe20..55def48b63 100644 --- a/querier/src/table/state_reconciler.rs +++ b/querier/src/table/state_reconciler.rs @@ -16,13 +16,18 
@@ mod interface; use data_types::{ParquetFileWithMetadata, PartitionId, SequencerId, Tombstone, TombstoneId}; use iox_query::QueryChunk; use observability_deps::tracing::debug; +use schema::{sort::SortKey, InfluxColumnType}; use snafu::Snafu; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use crate::{chunk::ParquetChunkAdapter, tombstone::QuerierTombstone, IngesterPartition}; +use crate::{ + chunk::{ParquetChunkAdapter, QuerierChunk}, + tombstone::QuerierTombstone, + IngesterPartition, +}; use self::interface::{IngesterPartitionInfo, ParquetFileInfo, TombstoneInfo}; @@ -58,11 +63,33 @@ impl Reconciler { /// chunks to query pub(crate) async fn reconcile( &self, - ingester_partitions: Vec>, + ingester_partitions: Vec, tombstones: Vec, parquet_files: Vec, ) -> Result>, ReconcileError> { - let tombstone_exclusion = tombstone_exclude_list(&ingester_partitions, &tombstones); + let mut chunks = self + .build_parquet_chunks(&ingester_partitions, tombstones, parquet_files) + .await?; + chunks.extend(self.build_ingester_chunks(ingester_partitions)); + debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation"); + + let chunks = self.sync_partition_sort_keys(chunks).await; + + let chunks: Vec> = chunks + .into_iter() + .map(|c| c.upcast_to_querier_chunk().into()) + .collect(); + + Ok(chunks) + } + + async fn build_parquet_chunks( + &self, + ingester_partitions: &[IngesterPartition], + tombstones: Vec, + parquet_files: Vec, + ) -> Result>, ReconcileError> { + let tombstone_exclusion = tombstone_exclude_list(ingester_partitions, &tombstones); let querier_tombstones: Vec<_> = tombstones.into_iter().map(QuerierTombstone::from).collect(); @@ -79,7 +106,7 @@ impl Reconciler { } // - let parquet_files = filter_parquet_files(&ingester_partitions, parquet_files)?; + let parquet_files = filter_parquet_files(ingester_partitions, parquet_files)?; debug!( ?parquet_files, @@ -101,7 +128,7 @@ impl Reconciler { } debug!(num_chunks=%parquet_chunks.len(), "Created 
parquet chunks"); - let mut chunks: Vec> = + let mut chunks: Vec> = Vec::with_capacity(parquet_chunks.len() + ingester_partitions.len()); for chunk in parquet_chunks.into_iter() { @@ -165,21 +192,74 @@ impl Reconciler { chunk }; - chunks.push(Arc::new(chunk) as Arc); + chunks.push(Box::new(chunk) as Box); } + Ok(chunks) + } + + fn build_ingester_chunks( + &self, + ingester_partitions: Vec, + ) -> impl Iterator> { // Add ingester chunks to the overall chunk list. // - filter out chunks that don't have any record batches // - tombstones don't need to be applied since they were already materialized by the ingester - let ingester_chunks = ingester_partitions + ingester_partitions .into_iter() .filter(|c| c.has_batches()) - .map(|c| c as Arc); + .map(|c| Box::new(c) as Box) + } - chunks.extend(ingester_chunks); - debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation"); + async fn sync_partition_sort_keys( + &self, + chunks: Vec>, + ) -> Vec> { + // collect columns + let mut all_columns: HashMap> = HashMap::new(); + for chunk in &chunks { + if let Some(partition_id) = chunk.partition_id() { + // columns for this partition MUST include the primary key of this chunk + all_columns.entry(partition_id).or_default().extend( + chunk + .schema() + .iter() + .filter(|(t, _field)| { + matches!(t, Some(InfluxColumnType::Tag | InfluxColumnType::Timestamp)) + }) + .map(|(_t, field)| field.name().clone()), + ); + } + } - Ok(chunks) + // report expiration signal to cache + let partition_cache = self.chunk_adapter.catalog_cache().partition(); + for (partition_id, columns) in &all_columns { + partition_cache.expire_if_sort_key_does_not_cover(*partition_id, columns); + } + + // get cached (or fresh) sort keys + let mut sort_keys: HashMap>> = + HashMap::with_capacity(all_columns.len()); + for partition_id in all_columns.into_keys() { + let sort_key = partition_cache.sort_key(partition_id).await; + sort_keys.insert(partition_id, sort_key); + } + + // write partition sort 
keys to chunks + chunks + .into_iter() + .map(|chunk| { + if let Some(partition_id) = chunk.partition_id() { + let sort_key = sort_keys + .get(&partition_id) + .expect("sort key for this partition should be fetched by now"); + chunk.update_partition_sort_key(Arc::clone(sort_key)) + } else { + chunk + } + }) + .collect() } #[must_use] @@ -193,6 +273,41 @@ impl Reconciler { } } +trait UpdatableQuerierChunk: QueryChunk { + fn update_partition_sort_key( + self: Box, + sort_key: Arc>, + ) -> Box; + + fn upcast_to_querier_chunk(self: Box) -> Box; +} + +impl UpdatableQuerierChunk for QuerierChunk { + fn update_partition_sort_key( + self: Box, + sort_key: Arc>, + ) -> Box { + Box::new(self.with_partition_sort_key(sort_key)) + } + + fn upcast_to_querier_chunk(self: Box) -> Box { + self as _ + } +} + +impl UpdatableQuerierChunk for IngesterPartition { + fn update_partition_sort_key( + self: Box, + sort_key: Arc>, + ) -> Box { + Box::new(self.with_partition_sort_key(sort_key)) + } + + fn upcast_to_querier_chunk(self: Box) -> Box { + self as _ + } +} + /// Filter out parquet files that contain "too new" data. /// /// The caller may only use the returned parquet files. 
diff --git a/querier/src/table/state_reconciler/interface.rs b/querier/src/table/state_reconciler/interface.rs index a1b596a96c..d5eefc13c3 100644 --- a/querier/src/table/state_reconciler/interface.rs +++ b/querier/src/table/state_reconciler/interface.rs @@ -16,7 +16,28 @@ pub trait IngesterPartitionInfo { fn tombstone_max_sequence_number(&self) -> Option; } -impl IngesterPartitionInfo for Arc { +impl IngesterPartitionInfo for IngesterPartition { + fn partition_id(&self) -> PartitionId { + self.deref().partition_id() + } + + fn sequencer_id(&self) -> SequencerId { + self.deref().sequencer_id() + } + + fn parquet_max_sequence_number(&self) -> Option { + self.deref().parquet_max_sequence_number() + } + + fn tombstone_max_sequence_number(&self) -> Option { + self.deref().tombstone_max_sequence_number() + } +} + +impl IngesterPartitionInfo for Arc +where + T: IngesterPartitionInfo, +{ fn partition_id(&self) -> PartitionId { self.deref().partition_id() } diff --git a/query_tests/generate/src/main.rs b/query_tests/generate/src/main.rs index 3f8701e8ea..de6f27393c 100644 --- a/query_tests/generate/src/main.rs +++ b/query_tests/generate/src/main.rs @@ -77,6 +77,8 @@ use crate::runner::Runner;"# #[tokio::test] // Tests from {:?}, async fn test_cases_{}() {{ + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("{}"); let mut runner = Runner::new(); runner diff --git a/query_tests/src/cases.rs b/query_tests/src/cases.rs index aee30e211e..2ee630da49 100644 --- a/query_tests/src/cases.rs +++ b/query_tests/src/cases.rs @@ -7,6 +7,8 @@ use crate::runner::Runner; #[tokio::test] // Tests from "basic.sql", async fn test_cases_basic_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("basic.sql"); let mut runner = Runner::new(); runner @@ -21,6 +23,8 @@ async fn test_cases_basic_sql() { #[tokio::test] // Tests from "delete_all.sql", async fn test_cases_delete_all_sql() { + 
test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("delete_all.sql"); let mut runner = Runner::new(); runner @@ -35,6 +39,8 @@ async fn test_cases_delete_all_sql() { #[tokio::test] // Tests from "delete_multi_expr_one_chunk.sql", async fn test_cases_delete_multi_expr_one_chunk_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("delete_multi_expr_one_chunk.sql"); let mut runner = Runner::new(); runner @@ -49,6 +55,8 @@ async fn test_cases_delete_multi_expr_one_chunk_sql() { #[tokio::test] // Tests from "delete_simple_pred_one_chunk.sql", async fn test_cases_delete_simple_pred_one_chunk_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("delete_simple_pred_one_chunk.sql"); let mut runner = Runner::new(); runner @@ -63,6 +71,8 @@ async fn test_cases_delete_simple_pred_one_chunk_sql() { #[tokio::test] // Tests from "delete_three_delete_three_chunks.sql", async fn test_cases_delete_three_delete_three_chunks_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("delete_three_delete_three_chunks.sql"); let mut runner = Runner::new(); runner @@ -77,6 +87,8 @@ async fn test_cases_delete_three_delete_three_chunks_sql() { #[tokio::test] // Tests from "delete_two_del_multi_expr_one_chunk.sql", async fn test_cases_delete_two_del_multi_expr_one_chunk_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("delete_two_del_multi_expr_one_chunk.sql"); let mut runner = Runner::new(); runner @@ -91,6 +103,8 @@ async fn test_cases_delete_two_del_multi_expr_one_chunk_sql() { #[tokio::test] // Tests from "duplicates.sql", async fn test_cases_duplicates_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("duplicates.sql"); let mut runner = Runner::new(); runner @@ -105,6 +119,8 @@ async fn 
test_cases_duplicates_sql() { #[tokio::test] // Tests from "new_sql_system_tables.sql", async fn test_cases_new_sql_system_tables_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("new_sql_system_tables.sql"); let mut runner = Runner::new(); runner @@ -119,6 +135,8 @@ async fn test_cases_new_sql_system_tables_sql() { #[tokio::test] // Tests from "pushdown.sql", async fn test_cases_pushdown_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("pushdown.sql"); let mut runner = Runner::new(); runner @@ -133,6 +151,8 @@ async fn test_cases_pushdown_sql() { #[tokio::test] // Tests from "several_chunks.sql", async fn test_cases_several_chunks_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("several_chunks.sql"); let mut runner = Runner::new(); runner @@ -147,6 +167,8 @@ async fn test_cases_several_chunks_sql() { #[tokio::test] // Tests from "sql_information_schema.sql", async fn test_cases_sql_information_schema_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("sql_information_schema.sql"); let mut runner = Runner::new(); runner @@ -161,6 +183,8 @@ async fn test_cases_sql_information_schema_sql() { #[tokio::test] // Tests from "timestamps.sql", async fn test_cases_timestamps_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("timestamps.sql"); let mut runner = Runner::new(); runner @@ -175,6 +199,8 @@ async fn test_cases_timestamps_sql() { #[tokio::test] // Tests from "two_chunks.sql", async fn test_cases_two_chunks_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("two_chunks.sql"); let mut runner = Runner::new(); runner @@ -189,6 +215,8 @@ async fn test_cases_two_chunks_sql() { #[tokio::test] // Tests from "two_chunks_missing_columns.sql", async fn 
test_cases_two_chunks_missing_columns_sql() { + test_helpers::maybe_start_logging(); + let input_path = Path::new("cases").join("in").join("two_chunks_missing_columns.sql"); let mut runner = Runner::new(); runner diff --git a/test_helpers_end_to_end/src/config.rs b/test_helpers_end_to_end/src/config.rs index debff38c7f..ce86fba1c4 100644 --- a/test_helpers_end_to_end/src/config.rs +++ b/test_helpers_end_to_end/src/config.rs @@ -146,10 +146,18 @@ impl TestConfig { /// Adds default ingester options fn with_default_ingester_options(self) -> Self { self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "2000000") - .with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "10") + .with_ingester_persist_memory_threshold(10) .with_kafka_partition(0) } + /// Sets memory threshold for ingester. + pub fn with_ingester_persist_memory_threshold(self, bytes: u64) -> Self { + self.with_env( + "INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", + bytes.to_string(), + ) + } + /// Adds an ingester that ingests from the specified kafka partition pub fn with_kafka_partition(self, kafka_partition_id: u64) -> Self { self.with_env(