fix: ensure ingester state tracked in querier cache is always in-sync (#6512)

Fixes #6510.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-01-10 13:00:05 +01:00 committed by GitHub
parent d0f52c89ae
commit 2bb6db3f37
1 changed file with 61 additions and 68 deletions
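
The fix moves the ingester-reported persisted-file counts out of the cache-wide RwLock map and into each cache entry, and passes the freshly reported counts to the loader through the cache's GetExtra parameter, so every entry records exactly the ingester state it was built against. A minimal sketch of that shape, using hypothetical stand-in names (CachedParquetFilesSketch, load_entry) rather than the real iox cache machinery:

use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;

// Same shape as in the diff: per-ingester persisted Parquet file counts.
type IngesterCounts = Option<Arc<HashMap<Uuid, u64>>>;

// Hypothetical stand-in for CachedParquetFiles; `files` holds file names
// instead of Arc<ParquetFile> to keep the sketch self-contained.
#[derive(Debug)]
struct CachedParquetFilesSketch {
    files: Arc<Vec<Arc<String>>>,
    // Ingester state captured at load time; staleness checks compare the
    // latest reported counts against this field, not against a separate map.
    persisted_file_counts_from_ingesters: IngesterCounts,
}

// Hypothetical loader: receives the reported counts as its "extra" argument
// and stores them in the entry it builds, mirroring the FunctionLoader change.
fn load_entry(files: Vec<String>, extra: IngesterCounts) -> Arc<CachedParquetFilesSketch> {
    Arc::new(CachedParquetFilesSketch {
        files: Arc::new(files.into_iter().map(Arc::new).collect()),
        persisted_file_counts_from_ingesters: extra,
    })
}

fn main() {
    let counts: IngesterCounts = Some(Arc::new(HashMap::from([(Uuid::nil(), 2u64)])));
    let entry = load_entry(vec!["file-1.parquet".to_string()], counts.clone());

    // The entry itself now knows which ingester state it was loaded against.
    assert_eq!(entry.persisted_file_counts_from_ingesters, counts);
}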

@@ -15,7 +15,6 @@ use data_types::{ParquetFile, SequenceNumber, TableId};
 use iox_catalog::interface::Catalog;
 use iox_time::TimeProvider;
 use observability_deps::tracing::debug;
-use parking_lot::RwLock;
 use snafu::{ResultExt, Snafu};
 use std::{collections::HashMap, mem, sync::Arc};
 use trace::span::Span;
@@ -34,19 +33,31 @@ pub enum Error {
     },
 }
 
+type IngesterCounts = Option<Arc<HashMap<Uuid, u64>>>;
+
 /// Holds catalog information about a parquet file
 #[derive(Debug)]
 pub struct CachedParquetFiles {
     /// Parquet catalog information
     pub files: Arc<Vec<Arc<ParquetFile>>>,
+
+    /// Number of persisted Parquet files per table ID per ingester UUID that ingesters have told
+    /// us about. When a call to `get` includes a number of persisted Parquet files for this table
+    /// and a particular ingester UUID that doesn't match what we've previously seen, the cache
+    /// needs to be expired.
+    persisted_file_counts_from_ingesters: IngesterCounts,
 }
 
 impl CachedParquetFiles {
-    fn new(parquet_files: Vec<ParquetFile>) -> Self {
+    fn new(
+        parquet_files: Vec<ParquetFile>,
+        persisted_file_counts_from_ingesters: IngesterCounts,
+    ) -> Self {
         let files: Vec<_> = parquet_files.into_iter().map(Arc::new).collect();
 
         Self {
             files: Arc::new(files),
+            persisted_file_counts_from_ingesters,
         }
     }
@@ -64,12 +75,14 @@ impl CachedParquetFiles {
         // Note size_of_val is the size of the Arc
         // https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ae8fee8b4f7f5f013dc01ea1fda165da
 
-        // size of the Arc itself
+        // size of the Arc+(Option+HashMap) itself
         mem::size_of_val(self) +
         // Vec overhead
         mem::size_of_val(self.files.as_ref()) +
         // size of the underlying parquet files
-        self.files.iter().map(|f| f.size()).sum::<usize>()
+        self.files.iter().map(|f| f.size()).sum::<usize>() +
+        // hashmap data
+        self.persisted_file_counts_from_ingesters.as_ref().map(|map| std::mem::size_of_val(map.as_ref()) + map.capacity() * mem::size_of::<(Uuid, u64)>()).unwrap_or_default()
     }
 
     /// Returns the greatest parquet sequence number stored in this cache entry
@@ -82,7 +95,7 @@ type CacheT = Box<
     dyn Cache<
         K = TableId,
         V = Arc<CachedParquetFiles>,
-        GetExtra = ((), Option<Span>),
+        GetExtra = (IngesterCounts, Option<Span>),
         PeekExtra = ((), Option<Span>),
     >,
 >;
@@ -96,12 +109,6 @@ pub struct ParquetFileCache {
     /// Handle that allows clearing entries for existing cache entries
     remove_if_handle: RemoveIfHandle<TableId, Arc<CachedParquetFiles>>,
-
-    /// Number of persisted Parquet files per table ID per ingester UUID that ingesters have told
-    /// us about. When a call to `get` includes a number of persisted Parquet files for this table
-    /// and a particular ingester UUID that doesn't match what we've previously seen, the cache
-    /// needs to be expired.
-    persisted_file_counts_from_ingesters: RwLock<HashMap<TableId, HashMap<Uuid, u64>>>,
 }
 
 impl ParquetFileCache {
@@ -114,35 +121,38 @@ impl ParquetFileCache {
         ram_pool: Arc<ResourcePool<RamSize>>,
         testing: bool,
     ) -> Self {
-        let loader = FunctionLoader::new(move |table_id: TableId, _extra: ()| {
+        let loader = FunctionLoader::new(move |table_id: TableId, extra: IngesterCounts| {
             let catalog = Arc::clone(&catalog);
             let backoff_config = backoff_config.clone();
 
             async move {
                 Backoff::new(&backoff_config)
-                    .retry_all_errors("get parquet_files", || async {
-                        // TODO refreshing all parquet files for the
-                        // entire table is likely to be quite wasteful
-                        // for large tables.
-                        //
-                        // Some this code could be more efficeint:
-                        //
-                        // 1. incrementally fetch only NEW parquet
-                        // files that aren't already in the cache
-                        //
-                        // 2. track time ranges needed for queries and
-                        // limit files fetched to what is actually
-                        // needed
-                        let parquet_files: Vec<_> = catalog
-                            .repositories()
-                            .await
-                            .parquet_files()
-                            .list_by_table_not_to_delete(table_id)
-                            .await
-                            .context(CatalogSnafu)?;
-
-                        Ok(Arc::new(CachedParquetFiles::new(parquet_files)))
-                            as std::result::Result<_, Error>
+                    .retry_all_errors("get parquet_files", || {
+                        let extra = extra.clone();
+                        async {
+                            // TODO refreshing all parquet files for the
+                            // entire table is likely to be quite wasteful
+                            // for large tables.
+                            //
+                            // Some this code could be more efficeint:
+                            //
+                            // 1. incrementally fetch only NEW parquet
+                            // files that aren't already in the cache
+                            //
+                            // 2. track time ranges needed for queries and
+                            // limit files fetched to what is actually
+                            // needed
+                            let parquet_files: Vec<_> = catalog
+                                .repositories()
+                                .await
+                                .parquet_files()
+                                .list_by_table_not_to_delete(table_id)
+                                .await
+                                .context(CatalogSnafu)?;
+
+                            Ok(Arc::new(CachedParquetFiles::new(parquet_files, extra)))
+                                as std::result::Result<_, Error>
+                        }
                     })
                     .await
                     .expect("retry forever")
@@ -181,7 +191,6 @@ impl ParquetFileCache {
         Self {
             cache,
             remove_if_handle,
-            persisted_file_counts_from_ingesters: Default::default(),
         }
     }
@@ -221,22 +230,13 @@ impl ParquetFileCache {
         persisted_file_counts_by_ingester_uuid: Option<HashMap<Uuid, u64>>,
         span: Option<Span>,
     ) -> Arc<CachedParquetFiles> {
-        // Make a copy of the information we've stored about the ingesters and the number of files
-        // they've persisted, for the closure to use to decide whether to expire the cache.
-        let stored_counts = self
-            .persisted_file_counts_from_ingesters
-            .read()
-            .get(&table_id)
-            .cloned();
-
-        // Update the stored information with what we've just seen from the ingesters, if anything.
-        if let Some(ingester_counts) = &persisted_file_counts_by_ingester_uuid {
-            self.persisted_file_counts_from_ingesters
-                .write()
-                .entry(table_id)
-                .and_modify(|existing| existing.extend(ingester_counts))
-                .or_insert_with(|| ingester_counts.clone());
-        }
+        let persisted_file_counts_by_ingester_uuid =
+            persisted_file_counts_by_ingester_uuid.map(|mut map| {
+                map.shrink_to_fit();
+                Arc::new(map)
+            });
+        let persisted_file_counts_by_ingester_uuid_captured =
+            persisted_file_counts_by_ingester_uuid.clone();
 
         self.remove_if_handle
             .remove_if_and_get(
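
The incoming Option<HashMap<Uuid, u64>> is shrunk and wrapped in an Arc so that one handle can be captured by the remove-if predicate and another handed to the loader as its extra, without copying the map. A small illustrative snippet of that sharing pattern (assumed names, not the real call sites):

use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;

fn main() {
    // Counts as reported by an ingester for one table: UUID -> persisted file count.
    let reported: Option<HashMap<Uuid, u64>> = Some(HashMap::from([(Uuid::nil(), 3u64)]));

    // Shrink once, then share via Arc: clones bump a refcount instead of copying the map.
    let shared: Option<Arc<HashMap<Uuid, u64>>> = reported.map(|mut m| {
        m.shrink_to_fit();
        Arc::new(m)
    });
    let for_closure = shared.clone(); // would be captured by the staleness predicate
    let for_loader = shared; // would be handed to the loader as its `extra`

    assert_eq!(
        for_closure.as_deref().map(|m| m.len()),
        for_loader.as_deref().map(|m| m.len()),
    );
}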
@@ -263,15 +263,20 @@ impl ParquetFileCache {
                         );
                         expire
-                    } else if let Some(ingester_counts) = &persisted_file_counts_by_ingester_uuid {
+                    } else if let Some(ingester_counts) =
+                        &persisted_file_counts_by_ingester_uuid_captured
+                    {
                         // If there's new or different information about the ingesters or the
                         // number of files they've persisted, we need to refresh.
-                        new_or_different(stored_counts.as_ref(), ingester_counts)
+                        new_or_different(
+                            cached_file.persisted_file_counts_from_ingesters.as_deref(),
+                            ingester_counts,
+                        )
                     } else {
                         false
                     }
                 },
-                ((), span),
+                (persisted_file_counts_by_ingester_uuid, span),
             )
             .await
     }
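
new_or_different is defined elsewhere in this file and is not part of the diff; judging from its name and this call site, it plausibly requests a refresh when an ingester UUID is unknown to the cached entry or reports a changed count. A hedged sketch of that assumed behavior:

use std::collections::HashMap;
use uuid::Uuid;

// Assumed behavior (the real definition is not shown in this diff): stale if
// any reported ingester UUID is missing from the cached counts or differs.
fn new_or_different(
    cached: Option<&HashMap<Uuid, u64>>,
    reported: &HashMap<Uuid, u64>,
) -> bool {
    match cached {
        None => !reported.is_empty(),
        Some(cached) => reported
            .iter()
            .any(|(uuid, count)| cached.get(uuid) != Some(count)),
    }
}

fn main() {
    let uuid = Uuid::nil();
    let cached = HashMap::from([(uuid, 4u64)]);

    // Same counts as stored in the cache entry: keep using the cache.
    assert!(!new_or_different(Some(&cached), &HashMap::from([(uuid, 4)])));
    // Higher count from the same ingester: the entry is stale, refresh.
    assert!(new_or_different(Some(&cached), &HashMap::from([(uuid, 5)])));
}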
@@ -399,8 +404,8 @@ mod tests {
         partition.create_parquet_file(builder).await;
         let table_id = table.table.id;
 
-        let single_file_size = 216;
-        let two_file_size = 400;
+        let single_file_size = 224;
+        let two_file_size = 408;
         assert!(single_file_size < two_file_size);
 
         let cache = make_cache(&catalog);
@@ -534,18 +539,6 @@ mod tests {
             .get(table_id, None, Some(HashMap::from([(new_uuid, 1)])), None)
             .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 4);
-
-        // See the "new" UUID *and* the old UUID with the same counts as seen before: still use the
-        // cache
-        cache
-            .get(
-                table_id,
-                None,
-                Some(HashMap::from([(new_uuid, 1), (uuid, 4)])),
-                None,
-            )
-            .await;
-        assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 4);
     }
 
     #[tokio::test]