From 4b5cf6a70ed410b65427d773f1ccc1265b5eeaa8 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 15 Mar 2022 10:28:08 +0000
Subject: [PATCH] feat: cache processed tombstones (#4030)

* test: allow to mock time in `iox_test`

* feat: cache processed tombstones

For #3974.

* refactor: introduce `TTL_NOT_PROCESSED`
---
 iox_tests/src/util.rs                     |  13 +-
 querier/src/cache/mod.rs                  |  22 ++-
 querier/src/cache/processed_tombstones.rs | 162 ++++++++++++++++++++++
 querier/src/chunk.rs                      |   5 +-
 querier/src/database.rs                   |   5 +-
 querier/src/namespace/mod.rs              |  26 ++--
 querier/src/namespace/test_util.rs        |   5 +-
 7 files changed, 217 insertions(+), 21 deletions(-)
 create mode 100644 querier/src/cache/processed_tombstones.rs

diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index abb7f78470..4c2faa9a91 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -24,7 +24,7 @@ pub struct TestCatalog {
     pub catalog: Arc<dyn Catalog>,
     pub metric_registry: Arc<metric::Registry>,
     pub object_store: Arc<ObjectStore>,
-    pub time_provider: Arc<dyn TimeProvider>,
+    pub time_provider: Arc<MockProvider>,
     pub exec: Arc<Executor>,
 }
 
@@ -61,9 +61,18 @@
         Arc::clone(&self.object_store)
     }
 
+    /// Return the mockable version of the catalog's time provider.
+    ///
+    /// If you need a generic time provider, use [`time_provider`](Self::time_provider) instead.
+    pub fn mock_time_provider(&self) -> &MockProvider {
+        self.time_provider.as_ref()
+    }
+
     /// Return the catalog's time provider
+    ///
+    /// If you need to mock the time, use [`mock_time_provider`](Self::mock_time_provider) instead.
     pub fn time_provider(&self) -> Arc<dyn TimeProvider> {
-        Arc::clone(&self.time_provider)
+        Arc::clone(&self.time_provider) as _
     }
 
     /// Return the catalog's executor
diff --git a/querier/src/cache/mod.rs b/querier/src/cache/mod.rs
index 2b045e34ab..1ab3cca280 100644
--- a/querier/src/cache/mod.rs
+++ b/querier/src/cache/mod.rs
@@ -1,11 +1,16 @@
 use backoff::BackoffConfig;
 use iox_catalog::interface::Catalog;
 use std::sync::Arc;
+use time::TimeProvider;
 
-use self::{namespace::NamespaceCache, partition::PartitionCache, table::TableCache};
+use self::{
+    namespace::NamespaceCache, partition::PartitionCache,
+    processed_tombstones::ProcessedTombstonesCache, table::TableCache,
+};
 
 pub mod namespace;
 pub mod partition;
+pub mod processed_tombstones;
 pub mod table;
 
 #[cfg(test)]
@@ -25,22 +30,28 @@ pub struct CatalogCache {
 
     /// Namespace cache.
     namespace_cache: NamespaceCache,
+
+    /// Processed tombstone cache.
+    processed_tombstones: ProcessedTombstonesCache,
 }
 
 impl CatalogCache {
     /// Create empty cache.
-    pub fn new(catalog: Arc<dyn Catalog>) -> Self {
+    pub fn new(catalog: Arc<dyn Catalog>, time_provider: Arc<dyn TimeProvider>) -> Self {
         let backoff_config = BackoffConfig::default();
 
         let namespace_cache = NamespaceCache::new(Arc::clone(&catalog), backoff_config.clone());
         let table_cache = TableCache::new(Arc::clone(&catalog), backoff_config.clone());
-        let partition_cache = PartitionCache::new(Arc::clone(&catalog), backoff_config);
+        let partition_cache = PartitionCache::new(Arc::clone(&catalog), backoff_config.clone());
+        let processed_tombstones =
+            ProcessedTombstonesCache::new(Arc::clone(&catalog), backoff_config, time_provider);
 
         Self {
             catalog,
             partition_cache,
             table_cache,
             namespace_cache,
+            processed_tombstones,
         }
     }
 
@@ -63,4 +74,9 @@
     pub fn partition(&self) -> &PartitionCache {
         &self.partition_cache
     }
+
+    /// Processed tombstone cache.
+    pub fn processed_tombstones(&self) -> &ProcessedTombstonesCache {
+        &self.processed_tombstones
+    }
 }
diff --git a/querier/src/cache/processed_tombstones.rs b/querier/src/cache/processed_tombstones.rs
new file mode 100644
index 0000000000..15a1864d04
--- /dev/null
+++ b/querier/src/cache/processed_tombstones.rs
@@ -0,0 +1,162 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use backoff::{Backoff, BackoffConfig};
+use data_types2::{ParquetFileId, TombstoneId};
+use iox_catalog::interface::Catalog;
+use time::TimeProvider;
+
+use crate::cache_system::{
+    backend::ttl::{TtlBackend, TtlProvider},
+    driver::Cache,
+    loader::FunctionLoader,
+};
+
+/// Duration to keep "tombstone is NOT processed yet".
+///
+/// Marking tombstones as processed is a mere optimization, so we can keep this cache entry for a while.
+pub const TTL_NOT_PROCESSED: Duration = Duration::from_secs(100);
+
+/// Cache for processed tombstones.
+#[derive(Debug)]
+pub struct ProcessedTombstonesCache {
+    cache: Cache<(ParquetFileId, TombstoneId), bool>,
+}
+
+impl ProcessedTombstonesCache {
+    /// Create new empty cache.
+    pub fn new(
+        catalog: Arc<dyn Catalog>,
+        backoff_config: BackoffConfig,
+        time_provider: Arc<dyn TimeProvider>,
+    ) -> Self {
+        let loader = Arc::new(FunctionLoader::new(
+            move |(parquet_file_id, tombstone_id)| {
+                let catalog = Arc::clone(&catalog);
+                let backoff_config = backoff_config.clone();
+
+                async move {
+                    Backoff::new(&backoff_config)
+                        .retry_all_errors("processed tombstone exists", || async {
+                            catalog
+                                .repositories()
+                                .await
+                                .processed_tombstones()
+                                .exist(parquet_file_id, tombstone_id)
+                                .await
+                        })
+                        .await
+                        .expect("retry forever")
+                }
+            },
+        ));
+
+        let backend = Box::new(HashMap::new());
+        let backend = Box::new(TtlBackend::new(
+            backend,
+            Arc::new(KeepExistsForever {}),
+            time_provider,
+        ));
+
+        Self {
+            cache: Cache::new(loader, backend),
+        }
+    }
+
+    /// Check if the specified tombstone is marked as "processed" for the given parquet file.
+    pub async fn exists(&self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> bool {
+        self.cache.get((parquet_file_id, tombstone_id)).await
+    }
+}
+
+#[derive(Debug)]
+struct KeepExistsForever;
+
+impl TtlProvider for KeepExistsForever {
+    type K = (ParquetFileId, TombstoneId);
+    type V = bool;
+
+    fn expires_in(&self, _k: &Self::K, v: &Self::V) -> Option<Duration> {
+        if *v {
+            // keep forever
+            None
+        } else {
+            // marking tombstones as processed is a mere optimization, so we can keep this cache entry for a while
+            Some(TTL_NOT_PROCESSED)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use iox_tests::util::TestCatalog;
+    use time::{MockProvider, Time};
+
+    use crate::cache::test_util::assert_histogram_metric_count;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test() {
+        let catalog = TestCatalog::new();
+
+        let ns = catalog.create_namespace("ns").await;
+        let table = ns.create_table("table").await;
+        let sequencer = ns.create_sequencer(1).await;
+        let partition = table.with_sequencer(&sequencer).create_partition("k").await;
+
+        let file1 = partition.create_parquet_file("table foo=1 11").await;
+        let file2 = partition.create_parquet_file("table foo=1 11").await;
+        let ts1 = table
+            .with_sequencer(&sequencer)
+            .create_tombstone(1, 1, 10, "foo=1")
+            .await;
+        let ts2 = table
+            .with_sequencer(&sequencer)
+            .create_tombstone(2, 1, 10, "foo=1")
+            .await;
+
+        ts1.mark_processed(&file1).await;
+
+        let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
+        let cache = ProcessedTombstonesCache::new(
+            catalog.catalog(),
+            BackoffConfig::default(),
+            Arc::clone(&time_provider) as _,
+        );
+
+        assert!(cache.exists(file1.parquet_file.id, ts1.tombstone.id).await);
+        assert!(!cache.exists(file1.parquet_file.id, ts2.tombstone.id).await);
+        assert!(!cache.exists(file2.parquet_file.id, ts1.tombstone.id).await);
+        assert!(!cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+
+        assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 4);
+
+        ts2.mark_processed(&file2).await;
+
+        // values are cached for a while
+        assert!(TTL_NOT_PROCESSED > Duration::from_millis(1));
+        time_provider.inc(TTL_NOT_PROCESSED - Duration::from_millis(1));
+        assert!(!cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+        assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 4);
+
+        time_provider.inc(Duration::from_millis(1));
+        assert!(cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+        assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 5);
+
+        // "true" results are cached forever
+        assert!(cache.exists(file1.parquet_file.id, ts1.tombstone.id).await);
+        assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 5);
+
+        // cache key has two dimensions
+        assert!(cache.exists(file1.parquet_file.id, ts1.tombstone.id).await);
+        assert!(!cache.exists(file1.parquet_file.id, ts2.tombstone.id).await);
+        assert!(!cache.exists(file2.parquet_file.id, ts1.tombstone.id).await);
+        assert!(cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+        ts1.mark_processed(&file2).await;
+        time_provider.inc(TTL_NOT_PROCESSED);
+        assert!(cache.exists(file1.parquet_file.id, ts1.tombstone.id).await);
+        assert!(!cache.exists(file1.parquet_file.id, ts2.tombstone.id).await);
+        assert!(cache.exists(file2.parquet_file.id, ts1.tombstone.id).await);
+        assert!(cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+    }
+}
diff --git a/querier/src/chunk.rs b/querier/src/chunk.rs
index 9fe8c8bde9..997822d26e 100644
--- a/querier/src/chunk.rs
+++ b/querier/src/chunk.rs
@@ -165,7 +165,10 @@ mod tests {
         let catalog = TestCatalog::new();
 
         let adapter = ParquetChunkAdapter::new(
-            Arc::new(CatalogCache::new(catalog.catalog())),
+            Arc::new(CatalogCache::new(
+                catalog.catalog(),
+                catalog.time_provider(),
+            )),
             catalog.object_store(),
             catalog.metric_registry(),
             catalog.time_provider(),
diff --git a/querier/src/database.rs b/querier/src/database.rs
index a0a115dde7..99bc17d773 100644
--- a/querier/src/database.rs
+++ b/querier/src/database.rs
@@ -63,7 +63,10 @@ impl QuerierDatabase {
         time_provider: Arc<dyn TimeProvider>,
         exec: Arc<Executor>,
     ) -> Self {
-        let catalog_cache = Arc::new(CatalogCache::new(Arc::clone(&catalog)));
+        let catalog_cache = Arc::new(CatalogCache::new(
+            Arc::clone(&catalog),
+            Arc::clone(&time_provider),
+        ));
 
         Self {
             backoff_config: BackoffConfig::default(),
diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs
index 8c7ff69e32..89e37130cf 100644
--- a/querier/src/namespace/mod.rs
+++ b/querier/src/namespace/mod.rs
@@ -575,17 +575,11 @@
         let mut predicates_filtered = vec![];
 
         for (tombstone_id, predicate) in predicates {
-            let is_processed = Backoff::new(&self.backoff_config)
-                .retry_all_errors("processed tombstone exists", || async {
-                    self.catalog
-                        .repositories()
-                        .await
-                        .processed_tombstones()
-                        .exist(parquet_file_id, *tombstone_id)
-                        .await
-                })
-                .await
-                .expect("retry forever");
+            let is_processed = self
+                .catalog_cache
+                .processed_tombstones()
+                .exists(parquet_file_id, *tombstone_id)
+                .await;
 
             if !is_processed {
                 predicates_filtered.push(Arc::clone(predicate));
@@ -606,7 +600,9 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::namespace::test_util::querier_namespace;
+    use crate::{
+        cache::processed_tombstones::TTL_NOT_PROCESSED, namespace::test_util::querier_namespace,
+    };
     use data_types2::{ChunkAddr, ChunkId, ColumnType, PartitionAddr};
     use iox_tests::util::{TestCatalog, TestParquetFile};
     use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
@@ -617,7 +613,10 @@
         let catalog = TestCatalog::new();
 
         let querier_namespace = QuerierNamespace::new(
-            Arc::new(CatalogCache::new(catalog.catalog())),
+            Arc::new(CatalogCache::new(
+                catalog.catalog(),
+                catalog.time_provider(),
+            )),
             "ns".into(),
             NamespaceId::new(1),
             catalog.metric_registry(),
@@ -935,6 +934,7 @@
             .with_sequencer(&sequencer1)
             .create_tombstone(4, 1, 10, "foo=4")
             .await;
+        catalog.mock_time_provider().inc(TTL_NOT_PROCESSED); // cache timeout for processed tombstones
         querier_namespace.sync().await;
         assert_eq!(
             delete_predicates(&querier_namespace),
diff --git a/querier/src/namespace/test_util.rs b/querier/src/namespace/test_util.rs
index 18d28dc95b..fc9350fe37 100644
--- a/querier/src/namespace/test_util.rs
+++ b/querier/src/namespace/test_util.rs
@@ -7,7 +7,10 @@ use super::QuerierNamespace;
 
 pub fn querier_namespace(catalog: &Arc<TestCatalog>, ns: &Arc<TestNamespace>) -> QuerierNamespace {
     QuerierNamespace::new(
-        Arc::new(CatalogCache::new(catalog.catalog())),
+        Arc::new(CatalogCache::new(
+            catalog.catalog(),
+            catalog.time_provider(),
+        )),
         ns.namespace.name.clone().into(),
         ns.namespace.id,
         catalog.metric_registry(),
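
Illustrative sketch (not part of the patch): the caching policy above — keep "processed" results forever, re-check "not processed" results after TTL_NOT_PROCESSED — can be modeled with std types alone. The PolicyCache type, the i64 key components standing in for ParquetFileId/TombstoneId, and the use of Instant::now() in place of the patch's injectable TimeProvider are all assumptions of this sketch, not the cache_system API:

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Mirrors `TTL_NOT_PROCESSED` from the patch: negative results expire.
const TTL_NOT_PROCESSED: Duration = Duration::from_secs(100);

/// Simplified, hypothetical stand-in for the TTL cache backend; keys model
/// `(ParquetFileId, TombstoneId)` as plain `i64`s.
struct PolicyCache {
    entries: HashMap<(i64, i64), (bool, Option<Instant>)>,
}

impl PolicyCache {
    fn new() -> Self {
        Self {
            entries: HashMap::new(),
        }
    }

    /// Same policy as `KeepExistsForever::expires_in`: a `true` ("processed")
    /// result never expires, a `false` result is kept only for `TTL_NOT_PROCESSED`.
    fn insert(&mut self, key: (i64, i64), processed: bool) {
        let expires_at = if processed {
            None // keep forever
        } else {
            Some(Instant::now() + TTL_NOT_PROCESSED)
        };
        self.entries.insert(key, (processed, expires_at));
    }

    /// Returns a cached value only while it is still valid; `None` means the
    /// caller must query the catalog again (the patch's `FunctionLoader`
    /// plays that role).
    fn get(&self, key: &(i64, i64)) -> Option<bool> {
        match self.entries.get(key) {
            Some((v, None)) => Some(*v),
            Some((v, Some(deadline))) if Instant::now() < *deadline => Some(*v),
            _ => None,
        }
    }
}

fn main() {
    let mut cache = PolicyCache::new();
    cache.insert((1, 1), true); // processed: cached forever
    cache.insert((1, 2), false); // not processed: re-checked after the TTL
    assert_eq!(cache.get(&(1, 1)), Some(true));
    assert_eq!(cache.get(&(1, 2)), Some(false));
}

The actual patch delegates the expiry decision to TtlBackend via the KeepExistsForever TtlProvider and takes an Arc<dyn TimeProvider> instead of reading the wall clock directly, which is what lets the tests advance time with MockProvider::inc rather than sleeping.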