fix: Treat empty ingester info differently than not having ingester info

When pre-warming the catalog cache before the ingester responses have
returned, we don't have any ingester parquet file counts. This is
different from asking the ingesters for the parquet file counts and not
getting any. So keep the Option to be able to treat "not present"
differently from "present but empty".
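
To make the distinction concrete, here is a minimal, self-contained sketch of the intended semantics (std-only; `u32` stands in for the real code's `Uuid`, and `needs_refresh` is a hypothetical helper that collapses the commit's cache predicate and its `different` function into one place — not the literal code in this diff):

    use std::collections::HashMap;

    type IngesterId = u32; // stand-in for the Uuid used in the real code

    /// Hypothetical helper mirroring the commit's intent: `None` means
    /// "we never got ingester info", `Some` means "the ingesters responded"
    /// (possibly with nothing persisted yet).
    fn needs_refresh(
        stored: Option<&HashMap<IngesterId, u64>>,
        incoming: Option<&HashMap<IngesterId, u64>>,
    ) -> bool {
        match (stored, incoming) {
            // Pre-warm path: no ingester info was supplied, so never refresh.
            (_, None) => false,
            // We have stored info; refresh only if the new counts differ.
            (Some(stored), Some(incoming)) => stored != incoming,
            // Nothing stored yet: an empty response is still "no information",
            // but a non-empty one is genuinely new.
            (None, Some(incoming)) => !incoming.is_empty(),
        }
    }

    fn main() {
        let counts = HashMap::from([(1, 3u64)]);
        assert!(!needs_refresh(None, None)); // pre-warm: keep the cache
        assert!(!needs_refresh(None, Some(&HashMap::new()))); // empty: keep it too
        assert!(needs_refresh(None, Some(&counts))); // real counts: refresh
    }
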
Branch: pull/24376/head
Author: Carol (Nichols || Goulding), 2023-04-12 14:32:50 -04:00
Commit: f0f74bae02 (parent: acf857816e)
2 changed files with 113 additions and 42 deletions

File 1 of 2 (parquet file cache):

@@ -32,7 +32,7 @@ pub enum Error {
     },
 }

-type IngesterCounts = Arc<Vec<(Uuid, u64)>>;
+type IngesterCounts = Option<Arc<Vec<(Uuid, u64)>>>;

 /// Holds catalog information about a parquet file
 #[derive(Debug)]
@@ -76,17 +76,19 @@ impl CachedParquetFiles {
         // Note size_of_val is the size of the Arc
         // https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ae8fee8b4f7f5f013dc01ea1fda165da
-        // size of the Arc+(HashMap) itself
+        // size of the Arc+(Option+HashMap) itself
         mem::size_of_val(self) +
         // Vec overhead
         mem::size_of_val(self.files.as_ref()) +
         // size of the underlying parquet files
         self.files.iter().map(|f| f.size()).sum::<usize>() +
         // hashmap data
-        std::mem::size_of_val(self.persisted_file_counts_from_ingesters.as_ref()) +
         self.persisted_file_counts_from_ingesters
             .as_ref()
-            .capacity() * mem::size_of::<(Uuid, u64)>()
+            .map(|map| {
+                std::mem::size_of_val(map.as_ref()) +
+                    map.capacity() * mem::size_of::<(Uuid, u64)>()
+            }).unwrap_or_default()
     }
 }
@@ -127,7 +129,7 @@ impl ParquetFileCache {
             async move {
                 Backoff::new(&backoff_config)
                     .retry_all_errors("get parquet_files", || {
-                        let extra = Arc::clone(&extra);
+                        let extra = extra.clone();
                         async {
                             // TODO refreshing all parquet files for the
                             // entire table is likely to be quite wasteful
@@ -207,30 +209,38 @@ impl ParquetFileCache {
     pub async fn get(
         &self,
         table_id: TableId,
-        persisted_file_counts_by_ingester_uuid: HashMap<Uuid, u64>,
+        persisted_file_counts_by_ingester_uuid: Option<HashMap<Uuid, u64>>,
         span: Option<Span>,
     ) -> Arc<CachedParquetFiles> {
-        let mut entries = persisted_file_counts_by_ingester_uuid
-            .into_iter()
-            .collect::<Vec<_>>();
-        entries.sort();
-        entries.shrink_to_fit();
-
-        let persisted_file_counts_by_ingester_uuid = Arc::new(entries);
+        let persisted_file_counts_by_ingester_uuid =
+            persisted_file_counts_by_ingester_uuid.map(|map| {
+                let mut entries = map.into_iter().collect::<Vec<_>>();
+                entries.sort();
+                entries.shrink_to_fit();
+                Arc::new(entries)
+            });
         let persisted_file_counts_by_ingester_uuid_captured =
-            Arc::clone(&persisted_file_counts_by_ingester_uuid);
+            persisted_file_counts_by_ingester_uuid.clone();

         self.remove_if_handle
             .remove_if_and_get(
                 &self.cache,
                 table_id,
                 |cached_file| {
-                    // If there's new or different information about the ingesters or the
-                    // number of files they've persisted, we need to refresh.
-                    different(
-                        cached_file.persisted_file_counts_from_ingesters.as_ref(),
-                        &persisted_file_counts_by_ingester_uuid_captured,
-                    )
+                    if let Some(ingester_counts) = &persisted_file_counts_by_ingester_uuid_captured
+                    {
+                        // If there's new or different information about the ingesters or the
+                        // number of files they've persisted, we need to refresh.
+                        different(
+                            cached_file
+                                .persisted_file_counts_from_ingesters
+                                .as_ref()
+                                .map(|x| x.as_ref().as_ref()),
+                            ingester_counts,
+                        )
+                    } else {
+                        false
+                    }
                 },
                 (persisted_file_counts_by_ingester_uuid, span),
             )
@@ -244,10 +254,10 @@ impl ParquetFileCache {
     }
 }

-fn different(stored_counts: &[(Uuid, u64)], ingester_counts: &[(Uuid, u64)]) -> bool {
+fn different(stored_counts: Option<&[(Uuid, u64)]>, ingester_counts: &[(Uuid, u64)]) -> bool {
     // If we have some information stored for this table,
-    if !stored_counts.is_empty() {
-        ingester_counts != stored_counts
+    if let Some(stored) = stored_counts {
+        ingester_counts != stored
     } else {
         // Otherwise, we've never seen ingester file counts for this table.
         // If the hashmap we got is empty, then we still haven't gotten any information, so we
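
The hunk is cut off above. Based on the surrounding context, the else branch plausibly completes as follows (a hedged sketch, not verbatim from the commit):

    fn different(stored_counts: Option<&[(Uuid, u64)]>, ingester_counts: &[(Uuid, u64)]) -> bool {
        // If we have some information stored for this table, refresh on any change.
        if let Some(stored) = stored_counts {
            ingester_counts != stored
        } else {
            // We've never seen ingester file counts for this table. An empty map
            // still means "no information" (no refresh needed), but a non-empty
            // one is genuinely new information (refresh).
            !ingester_counts.is_empty()
        }
    }
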
@@ -278,7 +288,7 @@ mod tests {
         let tfile = partition.create_parquet_file(builder).await;

         let cache = make_cache(&catalog);
-        let cached_files = cache.get(table.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table.table.id, None, None).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = &tfile.parquet_file;
@@ -286,7 +296,7 @@ mod tests {
         // validate a second request doesn't result in a catalog request
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
-        cache.get(table.table.id, HashMap::new(), None).await;
+        cache.get(table.table.id, None, None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
     }
@@ -305,12 +315,12 @@ mod tests {
         let cache = make_cache(&catalog);

-        let cached_files = cache.get(table1.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table1.table.id, None, None).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = &tfile1.parquet_file;
         assert_eq!(cached_files[0].as_ref(), expected_parquet_file);

-        let cached_files = cache.get(table2.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table2.table.id, None, None).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = &tfile2.parquet_file;
         assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
@@ -326,7 +336,7 @@ mod tests {
         let different_catalog = TestCatalog::new();
         let cache = make_cache(&different_catalog);

-        let cached_files = cache.get(table.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table.table.id, None, None).await.vec();
         assert!(cached_files.is_empty());
     }
@@ -337,19 +347,19 @@ mod tests {
         partition.create_parquet_file(builder).await;
         let table_id = table.table.id;

-        let single_file_size = 256;
-        let two_file_size = 448;
+        let single_file_size = 232;
+        let two_file_size = 424;
         assert!(single_file_size < two_file_size);

         let cache = make_cache(&catalog);
-        let cached_files = cache.get(table_id, HashMap::new(), None).await;
+        let cached_files = cache.get(table_id, None, None).await;
         assert_eq!(cached_files.size(), single_file_size);

         // add a second file, and force the cache to find it
         let builder = TestParquetFileBuilder::default().with_line_protocol(TABLE1_LINE_PROTOCOL);
         partition.create_parquet_file(builder).await;
         cache.expire(table_id);
-        let cached_files = cache.get(table_id, HashMap::new(), None).await;
+        let cached_files = cache.get(table_id, None, None).await;
         assert_eq!(cached_files.size(), two_file_size);
     }
@@ -361,35 +371,41 @@ mod tests {
         let cache = make_cache(&catalog);

         // No metadata: make one request that should be cached
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, None, None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, None, None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);

         // Empty metadata: make one request, should still be cached
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, Some(HashMap::new()), None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);

         // See a new UUID: refresh the cache
-        cache.get(table_id, HashMap::from([(uuid, 3)]), None).await;
+        cache
+            .get(table_id, Some(HashMap::from([(uuid, 3)])), None)
+            .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);

         // See the same UUID with the same count: should still be cached
-        cache.get(table_id, HashMap::from([(uuid, 3)]), None).await;
+        cache
+            .get(table_id, Some(HashMap::from([(uuid, 3)])), None)
+            .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);

         // See the same UUID with a different count: refresh the cache
-        cache.get(table_id, HashMap::from([(uuid, 4)]), None).await;
+        cache
+            .get(table_id, Some(HashMap::from([(uuid, 4)])), None)
+            .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 3);

         // Empty metadata again, differing from the stored counts: refresh the cache
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, Some(HashMap::new()), None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 4);

         // See a new UUID and not the old one: refresh the cache
         let new_uuid = Uuid::new_v4();
         cache
-            .get(table_id, HashMap::from([(new_uuid, 1)]), None)
+            .get(table_id, Some(HashMap::from([(new_uuid, 1)])), None)
             .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 5);
     }
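
Putting the test's expectations together, the refresh decision now depends on both the stored and incoming counts:

    incoming argument    stored counts      refresh?
    None (pre-warm)      anything           no
    Some(empty map)      nothing stored     no (still no information)
    Some(empty map)      counts stored      yes (counts changed)
    Some(counts)         equal to stored    no
    Some(counts)         differing or none  yes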

File 2 of 2 (querier table):

@@ -219,7 +219,7 @@ impl QuerierTable {
             ),
             catalog_cache.parquet_file().get(
                 self.id(),
-                HashMap::new(),
+                None,
                 span_recorder.child_span("cache GET parquet_file (pre-warm")
             ),
             catalog_cache.tombstone().get(
@@ -260,7 +260,7 @@ impl QuerierTable {
         let (parquet_files, tombstones) = join!(
             catalog_cache.parquet_file().get(
                 self.id(),
-                persisted_file_counts_by_ingester_uuid,
+                Some(persisted_file_counts_by_ingester_uuid),
                 span_recorder.child_span("cache GET parquet_file"),
             ),
             catalog_cache.tombstone().get(
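
Together these two call sites encode the distinction from the commit message: the pre-warm path runs before any ingester responses exist, so it passes None, while the post-ingester path always has a response to report, even an empty one, so it wraps it in Some.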
@@ -822,6 +822,61 @@ mod tests {
         assert_batches_eq!(&expected, &batches);
     }

+    #[tokio::test]
+    async fn test_parquet_cache_refresh() {
+        maybe_start_logging();
+        let catalog = TestCatalog::new();
+        let ns = catalog.create_namespace_1hr_retention("ns").await;
+        let table = ns.create_table("table1").await;
+        let shard = ns.create_shard(1).await;
+        let partition = table.with_shard(&shard).create_partition("k").await;
+        let schema = make_schema(&table).await;
+        let builder =
+            IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]);
+
+        // Parquet file with max sequence number 2
+        let pf_builder = TestParquetFileBuilder::default()
+            .with_line_protocol("table1 foo=1 11")
+            .with_max_seq(2);
+        partition.create_parquet_file(pf_builder).await;
+
+        let ingester_partition =
+            builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(2)));
+
+        let querier_table = TestQuerierTable::new(&catalog, &table)
+            .await
+            .with_ingester_partition(ingester_partition);
+
+        // Expect 2 chunks: one from the ingester, and one from the parquet file
+        let chunks = querier_table.chunks().await.unwrap();
+        assert_eq!(chunks.len(), 2);
+
+        // Now make a second parquet file with max sequence number 3
+        let pf_builder = TestParquetFileBuilder::default()
+            .with_line_protocol("table1 foo=1 22")
+            .with_max_seq(3);
+        partition.create_parquet_file(pf_builder).await;
+
+        // With the same ingester response, still expect 2 chunks: one
+        // from the ingester, and one from the parquet file
+        let chunks = querier_table.chunks().await.unwrap();
+        assert_eq!(chunks.len(), 2);
+
+        // Update the ingester response to return a new max parquet
+        // sequence number that includes the new file (3)
+        let ingester_partition =
+            builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(3)));
+        let querier_table = querier_table
+            .clear_ingester_partitions()
+            .with_ingester_partition(ingester_partition);
+
+        // Expect the second file to be found, resulting in three chunks
+        let chunks = querier_table.chunks().await.unwrap();
+        assert_eq!(chunks.len(), 3);
+    }
+
     #[tokio::test]
     async fn test_tombstone_cache_refresh() {
         maybe_start_logging();