fix: expire empty parquet_files cache and empty tombstones cache (#4701)

* fix: expire empty parquet_files cache

* fix: expire empty tombstones cache

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-05-26 07:08:15 -04:00 committed by GitHub
parent 6cc767efcc
commit 2d5a327bf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 110 additions and 2 deletions

View File

@ -58,6 +58,16 @@ impl CachedParquetFiles {
self.files.as_ref().clone()
}
/// Return the number of parquet files held by this cache entry.
pub fn len(&self) -> usize {
self.files.len()
}
/// Returns `true` if this cache entry holds no parquet files.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Estimate the memory consumption of this object and its contents
fn size(&self) -> usize {
// simplify accounting by ensuring len and capacity of vector are the same
@ -206,7 +216,9 @@ impl ParquetFileCache {
if let Some(max_cached) = max_cached {
max_cached < max_parquet_sequence_number
} else {
false
// a max sequence was provided but there were no
// files in the cache. Means we need to refresh
true
}
})
} else {
@ -388,6 +400,47 @@ mod tests {
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
#[tokio::test]
async fn test_expire_empty() {
let (catalog, table, partition) = make_catalog().await;
let cache = make_cache(&catalog);
let table_id = table.table.id;
// no parquet files yet: the first get should return an empty result
// and record exactly one catalog fetch
assert!(cache.get(table_id).await.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// second request should be served from the cache (no new catalog fetch)
assert!(cache.get(table_id).await.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Calls to expire with no known persisted file should leave the
// (empty) entry cached — fetch count stays at 1
cache.expire_on_newly_persisted_files(table_id, None);
assert!(cache.get(table_id).await.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// make a new parquet file
let sequence_number_1 = SequenceNumber::new(1);
let tfile = partition
.create_parquet_file_with_min_max(
"table1 foo=1 11",
sequence_number_1.get(),
sequence_number_1.get(),
0,
100,
)
.await;
// the cache has not been told about the new file yet, so it is stale
// but is still served without a fetch
assert!(cache.get(table_id).await.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Now call to expire with knowledge of the new file: this invalidates
// the empty entry and the next get refreshes from the catalog (count 2)
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_1));
assert_eq!(cache.get(table_id).await.ids(), ids(&[&tfile]));
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
/// Collect the catalog ids of the given test parquet files into a set.
fn ids(files: &[&TestParquetFile]) -> HashSet<ParquetFileId> {
let mut out = HashSet::new();
for file in files {
out.insert(file.parquet_file.id);
}
out
}

View File

@ -57,6 +57,16 @@ impl CachedTombstones {
self.tombstones.iter().map(|t| t.size()).sum::<usize>()
}
/// Return the number of tombstones held by this cache entry.
pub fn len(&self) -> usize {
self.tombstones.len()
}
/// Returns `true` if this cache entry holds no tombstones.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// return the underlying tombstones
pub fn to_vec(&self) -> Vec<Arc<Tombstone>> {
self.tombstones.iter().map(Arc::clone).collect()
@ -175,7 +185,9 @@ impl TombstoneCache {
if let Some(max_cached) = max_cached {
max_cached < max_tombstone_sequence_number
} else {
false
// a max sequence was provided but there were no
// tombstones in the cache. Means we need to refresh
true
}
})
} else {
@ -366,6 +378,49 @@ mod tests {
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
#[tokio::test]
// NOTE(review): fn name has a typo ("expore" -> "expire"); kept as-is to
// avoid renaming in a doc-only pass
async fn test_expore_empty() {
let catalog = TestCatalog::new();
let sequence_number_1 = SequenceNumber::new(1);
let ns = catalog.create_namespace("ns").await;
let table1 = ns.create_table("table1").await;
let sequencer1 = ns.create_sequencer(1).await;
let table_and_sequencer = table1.with_sequencer(&sequencer1);
let table_id = table1.table.id;
let cache = make_cache(&catalog);
// no tombstones for the table yet: the first get returns empty and
// records exactly one catalog fetch
assert!(cache.get(table_id).await.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// second request should be served from the cache (no new fetch)
assert!(cache.get(table_id).await.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// calls to expire with no newly known tombstones should leave the
// (empty) entry cached — fetch count stays at 1
cache.expire_on_newly_persisted_files(table_id, None);
assert!(cache.get(table_id).await.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Create a tombstone
let tombstone1 = table_and_sequencer
.create_tombstone(sequence_number_1.get(), 1, 100, "foo=1")
.await
.tombstone
.id;
// the cache has not been told about the new tombstone yet, so it is
// stale but is still served without a fetch
assert!(cache.get(table_id).await.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Now call to expire with knowledge of the new tombstone: this
// invalidates the empty entry and the next get refreshes (count 2)
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_1));
assert_ids(&cache.get(table_id).await, &[tombstone1]);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
fn make_cache(catalog: &Arc<TestCatalog>) -> TombstoneCache {
TombstoneCache::new(
catalog.catalog(),