feat: cache processed tombstones (#4030)

* test: allow mocking time in `iox_test`

* feat: cache processed tombstones

For #3974.

* refactor: introduce `TTL_NOT_PROCESSED`
Marco Neumann 2022-03-15 10:28:08 +00:00 committed by GitHub
parent 8f1938a482
commit 4b5cf6a70e
7 changed files with 217 additions and 21 deletions
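
The gist: instead of asking the catalog on every query whether a tombstone was already applied to a parquet file, the querier now consults a cache whose positive entries live forever and whose negative entries expire after `TTL_NOT_PROCESSED`. A minimal usage sketch, with all names taken from the diff below:

    let cache = ProcessedTombstonesCache::new(catalog, BackoffConfig::default(), time_provider);
    if !cache.exists(parquet_file_id, tombstone_id).await {
        // tombstone still needs to be applied to this parquet file
    }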

@@ -24,7 +24,7 @@ pub struct TestCatalog {
     pub catalog: Arc<dyn Catalog>,
     pub metric_registry: Arc<metric::Registry>,
     pub object_store: Arc<ObjectStore>,
-    pub time_provider: Arc<dyn TimeProvider>,
+    pub time_provider: Arc<MockProvider>,
     pub exec: Arc<Executor>,
 }
@@ -61,9 +61,18 @@ impl TestCatalog {
         Arc::clone(&self.object_store)
     }
 
+    /// Return the mockable version of the catalog's time provider.
+    ///
+    /// If you need a generic time provider, use [`time_provider`](Self::time_provider) instead.
+    pub fn mock_time_provider(&self) -> &MockProvider {
+        self.time_provider.as_ref()
+    }
+
     /// Return the catalog's time provider
+    ///
+    /// If you need to mock the time, use [`mock_time_provider`](Self::mock_time_provider) instead.
     pub fn time_provider(&self) -> Arc<dyn TimeProvider> {
-        Arc::clone(&self.time_provider)
+        Arc::clone(&self.time_provider) as _
     }
 
     /// Return the catalog's executor
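
A sketch of how a test can drive the mocked clock through the new accessor (this mirrors what the tests in this commit do; `inc` is the `time` crate's `MockProvider` API):

    let catalog = TestCatalog::new();
    // advance mocked time; production code keeps using the generic `time_provider()`
    catalog.mock_time_provider().inc(Duration::from_secs(100));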

@@ -1,11 +1,16 @@
 use backoff::BackoffConfig;
 use iox_catalog::interface::Catalog;
 use std::sync::Arc;
+use time::TimeProvider;
 
-use self::{namespace::NamespaceCache, partition::PartitionCache, table::TableCache};
+use self::{
+    namespace::NamespaceCache, partition::PartitionCache,
+    processed_tombstones::ProcessedTombstonesCache, table::TableCache,
+};
 
 pub mod namespace;
 pub mod partition;
+pub mod processed_tombstones;
 pub mod table;
 
 #[cfg(test)]
@@ -25,22 +30,28 @@ pub struct CatalogCache {
     /// Namespace cache.
     namespace_cache: NamespaceCache,
 
+    /// Processed tombstone cache.
+    processed_tombstones: ProcessedTombstonesCache,
 }
 
 impl CatalogCache {
     /// Create empty cache.
-    pub fn new(catalog: Arc<dyn Catalog>) -> Self {
+    pub fn new(catalog: Arc<dyn Catalog>, time_provider: Arc<dyn TimeProvider>) -> Self {
         let backoff_config = BackoffConfig::default();
 
         let namespace_cache = NamespaceCache::new(Arc::clone(&catalog), backoff_config.clone());
         let table_cache = TableCache::new(Arc::clone(&catalog), backoff_config.clone());
-        let partition_cache = PartitionCache::new(Arc::clone(&catalog), backoff_config);
+        let partition_cache = PartitionCache::new(Arc::clone(&catalog), backoff_config.clone());
+        let processed_tombstones =
+            ProcessedTombstonesCache::new(Arc::clone(&catalog), backoff_config, time_provider);
 
         Self {
             catalog,
             partition_cache,
             table_cache,
             namespace_cache,
+            processed_tombstones,
         }
     }
@@ -63,4 +74,9 @@ impl CatalogCache {
     pub fn partition(&self) -> &PartitionCache {
         &self.partition_cache
     }
+
+    /// Processed tombstone cache.
+    pub fn processed_tombstones(&self) -> &ProcessedTombstonesCache {
+        &self.processed_tombstones
+    }
 }
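
With this change, call sites construct the cache with a time provider alongside the catalog, e.g. (as the updated tests later in this commit do):

    let cache = CatalogCache::new(catalog.catalog(), catalog.time_provider());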

@@ -0,0 +1,162 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use backoff::{Backoff, BackoffConfig};
+use data_types2::{ParquetFileId, TombstoneId};
+use iox_catalog::interface::Catalog;
+use time::TimeProvider;
+
+use crate::cache_system::{
+    backend::ttl::{TtlBackend, TtlProvider},
+    driver::Cache,
+    loader::FunctionLoader,
+};
+
+/// Duration to keep "tombstone is NOT processed yet".
+///
+/// Marking tombstones as processed is a mere optimization, so we can keep this cache entry for a while.
+pub const TTL_NOT_PROCESSED: Duration = Duration::from_secs(100);
+
+/// Cache for processed tombstones.
+#[derive(Debug)]
+pub struct ProcessedTombstonesCache {
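+    // key = (parquet file, tombstone), value = "has this tombstone been applied to this file?"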
+    cache: Cache<(ParquetFileId, TombstoneId), bool>,
+}
+
+impl ProcessedTombstonesCache {
+    /// Create new empty cache.
+    pub fn new(
+        catalog: Arc<dyn Catalog>,
+        backoff_config: BackoffConfig,
+        time_provider: Arc<dyn TimeProvider>,
+    ) -> Self {
+        let loader = Arc::new(FunctionLoader::new(
+            move |(parquet_file_id, tombstone_id)| {
+                let catalog = Arc::clone(&catalog);
+                let backoff_config = backoff_config.clone();
+
+                async move {
+                    Backoff::new(&backoff_config)
+                        .retry_all_errors("processed tombstone exists", || async {
+                            catalog
+                                .repositories()
+                                .await
+                                .processed_tombstones()
+                                .exist(parquet_file_id, tombstone_id)
+                                .await
+                        })
+                        .await
+                        .expect("retry forever")
+                }
+            },
+        ));
+
+        let backend = Box::new(HashMap::new());
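+        // Wrap the plain `HashMap` backend in a TTL layer: `KeepExistsForever` (below)
+        // keeps `true` entries forever and expires `false` ones after `TTL_NOT_PROCESSED`.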
+        let backend = Box::new(TtlBackend::new(
+            backend,
+            Arc::new(KeepExistsForever {}),
+            time_provider,
+        ));
+
+        Self {
+            cache: Cache::new(loader, backend),
+        }
+    }
+    /// Check if the specified tombstone is marked as "processed" for the given parquet file.
+    pub async fn exists(&self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> bool {
+        self.cache.get((parquet_file_id, tombstone_id)).await
+    }
+}
+
+#[derive(Debug)]
+struct KeepExistsForever;
+
+impl TtlProvider for KeepExistsForever {
+    type K = (ParquetFileId, TombstoneId);
+    type V = bool;
+
+    fn expires_in(&self, _k: &Self::K, v: &Self::V) -> Option<Duration> {
+        if *v {
+            // keep forever
+            None
+        } else {
+            // marking tombstones as processed is a mere optimization, so we can keep this cache entry for a while
+            Some(TTL_NOT_PROCESSED)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use iox_tests::util::TestCatalog;
+    use time::{MockProvider, Time};
+
+    use crate::cache::test_util::assert_histogram_metric_count;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test() {
+        let catalog = TestCatalog::new();
+
+        let ns = catalog.create_namespace("ns").await;
+        let table = ns.create_table("table").await;
+        let sequencer = ns.create_sequencer(1).await;
+        let partition = table.with_sequencer(&sequencer).create_partition("k").await;
+
+        let file1 = partition.create_parquet_file("table foo=1 11").await;
+        let file2 = partition.create_parquet_file("table foo=1 11").await;
+
+        let ts1 = table
+            .with_sequencer(&sequencer)
+            .create_tombstone(1, 1, 10, "foo=1")
+            .await;
+        let ts2 = table
+            .with_sequencer(&sequencer)
+            .create_tombstone(2, 1, 10, "foo=1")
+            .await;
+
+        ts1.mark_processed(&file1).await;
+
+        let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
+        let cache = ProcessedTombstonesCache::new(
+            catalog.catalog(),
+            BackoffConfig::default(),
+            Arc::clone(&time_provider) as _,
+        );
+
+        assert!(cache.exists(file1.parquet_file.id, ts1.tombstone.id).await);
+        assert!(!cache.exists(file1.parquet_file.id, ts2.tombstone.id).await);
+        assert!(!cache.exists(file2.parquet_file.id, ts1.tombstone.id).await);
+        assert!(!cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+
+        assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 4);
+
+        ts2.mark_processed(&file2).await;
+
+        // values are cached for a while
+        assert!(TTL_NOT_PROCESSED > Duration::from_millis(1));
+        time_provider.inc(TTL_NOT_PROCESSED - Duration::from_millis(1));
+        assert!(!cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+        assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 4);
+
+        time_provider.inc(Duration::from_millis(1));
+        assert!(cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+        assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 5);
+
+        // "true" results are cached forever
+        assert!(cache.exists(file1.parquet_file.id, ts1.tombstone.id).await);
+        assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 5);
+
+        // cache key has two dimensions
+        assert!(cache.exists(file1.parquet_file.id, ts1.tombstone.id).await);
+        assert!(!cache.exists(file1.parquet_file.id, ts2.tombstone.id).await);
+        assert!(!cache.exists(file2.parquet_file.id, ts1.tombstone.id).await);
+        assert!(cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+
+        ts1.mark_processed(&file2).await;
+        time_provider.inc(TTL_NOT_PROCESSED);
+
+        assert!(cache.exists(file1.parquet_file.id, ts1.tombstone.id).await);
+        assert!(!cache.exists(file1.parquet_file.id, ts2.tombstone.id).await);
+        assert!(cache.exists(file2.parquet_file.id, ts1.tombstone.id).await);
+        assert!(cache.exists(file2.parquet_file.id, ts2.tombstone.id).await);
+    }
+}

@@ -165,7 +165,10 @@ mod tests {
         let catalog = TestCatalog::new();
 
         let adapter = ParquetChunkAdapter::new(
-            Arc::new(CatalogCache::new(catalog.catalog())),
+            Arc::new(CatalogCache::new(
+                catalog.catalog(),
+                catalog.time_provider(),
+            )),
             catalog.object_store(),
             catalog.metric_registry(),
             catalog.time_provider(),

@@ -63,7 +63,10 @@ impl QuerierDatabase {
         time_provider: Arc<dyn TimeProvider>,
         exec: Arc<Executor>,
     ) -> Self {
-        let catalog_cache = Arc::new(CatalogCache::new(Arc::clone(&catalog)));
+        let catalog_cache = Arc::new(CatalogCache::new(
+            Arc::clone(&catalog),
+            Arc::clone(&time_provider),
+        ));
 
         Self {
             backoff_config: BackoffConfig::default(),

@@ -575,17 +575,11 @@ impl QuerierNamespace {
         let mut predicates_filtered = vec![];
 
         for (tombstone_id, predicate) in predicates {
-            let is_processed = Backoff::new(&self.backoff_config)
-                .retry_all_errors("processed tombstone exists", || async {
-                    self.catalog
-                        .repositories()
-                        .await
-                        .processed_tombstones()
-                        .exist(parquet_file_id, *tombstone_id)
-                        .await
-                })
-                .await
-                .expect("retry forever");
+            let is_processed = self
+                .catalog_cache
+                .processed_tombstones()
+                .exists(parquet_file_id, *tombstone_id)
+                .await;
 
             if !is_processed {
                 predicates_filtered.push(Arc::clone(predicate));
@@ -606,7 +600,9 @@ impl QuerierNamespace {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::namespace::test_util::querier_namespace;
+    use crate::{
+        cache::processed_tombstones::TTL_NOT_PROCESSED, namespace::test_util::querier_namespace,
+    };
     use data_types2::{ChunkAddr, ChunkId, ColumnType, PartitionAddr};
     use iox_tests::util::{TestCatalog, TestParquetFile};
     use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
@@ -617,7 +613,10 @@ mod tests {
         let catalog = TestCatalog::new();
 
         let querier_namespace = QuerierNamespace::new(
-            Arc::new(CatalogCache::new(catalog.catalog())),
+            Arc::new(CatalogCache::new(
+                catalog.catalog(),
+                catalog.time_provider(),
+            )),
             "ns".into(),
             NamespaceId::new(1),
             catalog.metric_registry(),
@@ -935,6 +934,7 @@ mod tests {
             .with_sequencer(&sequencer1)
             .create_tombstone(4, 1, 10, "foo=4")
             .await;
+        catalog.mock_time_provider().inc(TTL_NOT_PROCESSED); // cache timeout for processed tombstones
         querier_namespace.sync().await;
 
         assert_eq!(
             delete_predicates(&querier_namespace),

@@ -7,7 +7,10 @@ use super::QuerierNamespace;
 pub fn querier_namespace(catalog: &Arc<TestCatalog>, ns: &Arc<TestNamespace>) -> QuerierNamespace {
     QuerierNamespace::new(
-        Arc::new(CatalogCache::new(catalog.catalog())),
+        Arc::new(CatalogCache::new(
+            catalog.catalog(),
+            catalog.time_provider(),
+        )),
         ns.namespace.name.clone().into(),
         ns.namespace.id,
         catalog.metric_registry(),