feat: catalog parquet file cache TTL (#7975)

Prevent the querier from accessing files that were flagged for deletion
a long time ago. This can happen if all of the following conditions
hold:

- we have very long-running querier pods (e.g. over holidays)
- the table (or the partition, if we ever change the cache
  granularity) doesn't receive any writes, so the querier is never
  informed that its state is out-of-date
- a compactor runs a cold compaction, and by doing so flags a file for
  deletion
- the GC finally wants to delete it

This is mostly a safety measure: the weird internal server errors it
prevents should nearly never happen, but I do not want to hunt
Heisenbugs either.
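
To make the guarantee concrete: the TTL bounds how stale the querier's
cached file list can get, so as long as the GC waits longer than the
TTL before actually removing flagged files, the querier refreshes its
view first. A minimal sketch of that invariant (the 12h value matches
the `TTL` constant added below; the GC grace period is a hypothetical
example value, not taken from this commit):

use std::time::Duration;

fn main() {
    // The cached catalog view is refreshed at least every `TTL`
    // (value of the constant added in this commit).
    let cache_ttl = Duration::from_secs(12 * 60 * 60);

    // Hypothetical GC grace period: files flagged for deletion are
    // retained at least this long before being removed for real.
    let gc_grace = Duration::from_secs(24 * 60 * 60);

    // Safety invariant: the querier drops its stale view before the
    // GC can remove a file that this view still references.
    assert!(cache_ttl < gc_grace);
}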
Marco Neumann 2023-06-12 16:02:47 +02:00 committed by GitHub
parent 792b991778
commit 453a361d3c
2 changed files with 120 additions and 2 deletions


@@ -55,6 +55,51 @@ impl<K, V> TtlProvider for NeverTtlProvider<K, V> {
}
}
/// [`TtlProvider`] that returns a constant value.
pub struct ConstantValueTtlProvider<K, V>
where
K: 'static,
V: 'static,
{
// phantom data that is Send and Sync, see https://stackoverflow.com/a/50201389
_k: PhantomData<fn() -> K>,
_v: PhantomData<fn() -> V>,
ttl: Option<Duration>,
}
impl<K, V> ConstantValueTtlProvider<K, V>
where
K: 'static,
V: 'static,
{
/// Create new provider with the given TTL value.
pub fn new(ttl: Option<Duration>) -> Self {
Self {
_k: PhantomData::default(),
_v: PhantomData::default(),
ttl,
}
}
}
impl<K, V> std::fmt::Debug for ConstantValueTtlProvider<K, V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConstantValueTtlProvider")
.field("ttl", &self.ttl)
.finish_non_exhaustive()
}
}
impl<K, V> TtlProvider for ConstantValueTtlProvider<K, V> {
type K = K;
type V = V;
fn expires_in(&self, _k: &Self::K, _v: &Self::V) -> Option<Duration> {
self.ttl
}
}
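// For reference, a minimal usage sketch (mirroring the
// `test_constant_value_ttl_provider` unit test added in this commit):
//
//     let provider = ConstantValueTtlProvider::<u8, i8>::new(Some(Duration::from_secs(1)));
//     assert_eq!(provider.expires_in(&1, &2), Some(Duration::from_secs(1)));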
/// [`TtlProvider`] that returns different values for `None`/`Some(...)` values.
pub struct OptionalValueTtlProvider<K, V>
where
@@ -312,6 +357,13 @@ mod tests {
assert_eq!(provider.expires_in(&1, &2), None);
}
#[test]
fn test_constant_value_ttl_provider() {
let ttl = Some(Duration::from_secs(1));
let provider = ConstantValueTtlProvider::<u8, i8>::new(ttl);
assert_eq!(provider.expires_in(&1, &2), ttl);
}
#[test]
fn test_optional_value_ttl_provider() {
let ttl_none = Some(Duration::from_secs(1));


@@ -5,6 +5,7 @@ use cache_system::{
backend::policy::{
lru::{LruPolicy, ResourcePool},
remove_if::{RemoveIfHandle, RemoveIfPolicy},
ttl::{ConstantValueTtlProvider, TtlPolicy},
PolicyBackend,
},
cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
@@ -15,12 +16,17 @@ use data_types::{ParquetFile, TableId};
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use snafu::{ResultExt, Snafu};
-use std::{collections::HashMap, mem, sync::Arc};
+use std::{collections::HashMap, mem, sync::Arc, time::Duration};
use trace::span::Span;
use uuid::Uuid;
use super::ram::RamSize;
/// Duration to keep cached view.
///
/// This is currently `12h`.
pub const TTL: Duration = Duration::from_secs(12 * 60 * 60);
const CACHE_ID: &str = "parquet_file";
#[derive(Debug, Snafu)]
@@ -180,6 +186,11 @@ impl ParquetFileCache {
},
)),
));
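// Expire every cached entry after the constant `TTL` so that a
// long-running querier eventually refreshes its catalog view and
// drops files that were flagged for deletion in the meantime.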
backend.add_policy(TtlPolicy::new(
Arc::new(ConstantValueTtlProvider::new(Some(TTL))),
CACHE_ID,
metric_registry,
));
let cache = CacheDriver::new(loader, backend);
let cache = Box::new(CacheWithMetrics::new(
@@ -269,7 +280,7 @@ fn different(stored_counts: Option<&[(Uuid, u64)]>, ingester_counts: &[(Uuid, u6
#[cfg(test)]
mod tests {
-use std::collections::HashSet;
+use std::{collections::HashSet, time::Duration};
use super::*;
use data_types::{ColumnType, ParquetFileId};
@@ -279,6 +290,8 @@ mod tests {
const METRIC_NAME: &str = "parquet_list_by_table_not_to_delete";
const TABLE1_LINE_PROTOCOL: &str = "table1 foo=1 11";
const TABLE1_LINE_PROTOCOL2: &str = "table1 foo=2 22";
const TABLE1_LINE_PROTOCOL3: &str = "table1 foo=3 33";
const TABLE2_LINE_PROTOCOL: &str = "table2 foo=1 11";
#[tokio::test]
@@ -410,6 +423,59 @@ mod tests {
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 5);
}
#[tokio::test]
async fn test_ttl() {
let (catalog, table, partition) = make_catalog().await;
let builder = TestParquetFileBuilder::default()
.with_line_protocol(TABLE1_LINE_PROTOCOL)
.with_creation_time(catalog.time_provider.now());
let tfile1 = partition.create_parquet_file(builder).await;
let builder = TestParquetFileBuilder::default()
.with_line_protocol(TABLE1_LINE_PROTOCOL2)
.with_creation_time(catalog.time_provider.now());
let tfile2 = partition.create_parquet_file(builder).await;
let cache = make_cache(&catalog);
let mut cached_files = cache.get(table.table.id, None, None).await.vec();
cached_files.sort_by_key(|f| f.id);
assert_eq!(cached_files.len(), 2);
assert_eq!(cached_files[0].as_ref(), &tfile1.parquet_file);
assert_eq!(cached_files[1].as_ref(), &tfile2.parquet_file);
// update state
// replace first file and add a new one
catalog.mock_time_provider().inc(Duration::from_secs(60));
let builder = TestParquetFileBuilder::default()
.with_line_protocol(TABLE1_LINE_PROTOCOL)
.with_creation_time(catalog.time_provider.now());
let tfile3 = partition.create_parquet_file(builder).await;
let builder = TestParquetFileBuilder::default()
.with_line_protocol(TABLE1_LINE_PROTOCOL3)
.with_creation_time(catalog.time_provider.now());
let tfile4 = partition.create_parquet_file(builder).await;
tfile1.flag_for_delete().await;
// validate a second request doesn't result in a catalog request
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
let cached_files = cache.get(table.table.id, None, None).await.vec();
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// still the old data
assert_eq!(cached_files.len(), 2);
assert_eq!(cached_files[0].as_ref(), &tfile1.parquet_file);
assert_eq!(cached_files[1].as_ref(), &tfile2.parquet_file);
// trigger TTL
catalog.mock_time_provider().inc(TTL);
let mut cached_files = cache.get(table.table.id, None, None).await.vec();
cached_files.sort_by_key(|f| f.id);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
assert_eq!(cached_files.len(), 3);
assert_eq!(cached_files[0].as_ref(), &tfile2.parquet_file);
assert_eq!(cached_files[1].as_ref(), &tfile3.parquet_file);
assert_eq!(cached_files[2].as_ref(), &tfile4.parquet_file);
}
/// Extracts parquet ids from various objects
trait ParquetIds {
fn ids(&self) -> HashSet<ParquetFileId>;