feat: make catalog state test deterministic (#2349)

pull/24376/head
Raphael Taylor-Davies 2021-08-19 13:56:25 +01:00
parent 98627944e7
commit 5a841600d9
1 changed files with 41 additions and 59 deletions

View File

@ -1148,9 +1148,7 @@ impl<'c> Debug for CheckpointHandle<'c> {
pub mod test_helpers {
use super::*;
use crate::test_utils::{
chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path,
};
use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata};
/// In-memory catalog state, for testing.
#[derive(Clone, Debug)]
@ -1243,13 +1241,14 @@ pub mod test_helpers {
PreservedCatalog::new_empty::<S>(Arc::clone(&iox_object_store), state_data)
.await
.unwrap();
let mut expected: HashMap<ParquetFilePath, _> = HashMap::new();
// The expected state of the catalog
let mut expected: HashMap<u32, (ParquetFilePath, Arc<IoxParquetMetaData>)> = HashMap::new();
assert_checkpoint(&state, &f, &expected);
// add files
let mut chunk_id_watermark = 5;
{
for chunk_id in 0..chunk_id_watermark {
for chunk_id in 0..5 {
let (path, metadata) =
make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id)).await;
state
@ -1262,23 +1261,21 @@ pub mod test_helpers {
},
)
.unwrap();
expected.insert(path, Arc::new(metadata));
expected.insert(chunk_id, (path, Arc::new(metadata)));
}
}
assert_checkpoint(&state, &f, &expected);
// remove files
{
let path = expected.keys().next().unwrap().clone();
let (path, _) = expected.remove(&1).unwrap();
state.remove(&path).unwrap();
expected.remove(&path);
}
assert_checkpoint(&state, &f, &expected);
// add and remove in the same transaction
{
let (path, metadata) =
make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(5)).await;
state
.add(
Arc::clone(&iox_object_store),
@ -1290,14 +1287,12 @@ pub mod test_helpers {
)
.unwrap();
state.remove(&path).unwrap();
chunk_id_watermark += 1;
}
assert_checkpoint(&state, &f, &expected);
// remove and add in the same transaction
{
let path = expected.keys().next().unwrap().clone();
let metadata = expected.get(&path).unwrap();
let (path, metadata) = expected.get(&3).unwrap();
state.remove(&path).unwrap();
state
.add(
@ -1314,8 +1309,7 @@ pub mod test_helpers {
// add, remove, add in the same transaction
{
let (path, metadata) =
make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(6)).await;
state
.add(
Arc::clone(&iox_object_store),
@ -1337,15 +1331,13 @@ pub mod test_helpers {
},
)
.unwrap();
expected.insert(path, Arc::new(metadata));
chunk_id_watermark += 1;
expected.insert(6, (path, Arc::new(metadata)));
}
assert_checkpoint(&state, &f, &expected);
// remove, add, remove in same transaction
{
let path = expected.keys().next().unwrap().clone();
let metadata = expected.get(&path).unwrap();
let (path, metadata) = expected.remove(&4).unwrap();
state.remove(&path).unwrap();
state
.add(
@ -1353,20 +1345,20 @@ pub mod test_helpers {
CatalogParquetInfo {
path: path.clone(),
file_size_bytes: 33,
metadata: Arc::clone(metadata),
metadata: Arc::clone(&metadata),
},
)
.unwrap();
state.remove(&path).unwrap();
expected.remove(&path);
}
assert_checkpoint(&state, &f, &expected);
// error handling, no real opt
{
// already exists (should also not change the metadata)
let path = expected.keys().next().unwrap();
let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
// TODO: Error handling should disambiguate between chunk collision and filename collision
// chunk with same ID already exists (should also not change the metadata)
let (path, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
let err = state
.add(
Arc::clone(&iox_object_store),
@ -1379,46 +1371,43 @@ pub mod test_helpers {
.unwrap_err();
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
// does not exist
let path = make_parquet_file_path();
// does not exist as has a different UUID
let err = state.remove(&path).unwrap_err();
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
chunk_id_watermark += 1;
}
assert_checkpoint(&state, &f, &expected);
// error handling, still something works
{
// already exists (should also not change the metadata)
let path = expected.keys().next().unwrap();
let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
let (_, metadata) = expected.get(&0).unwrap();
let err = state
.add(
Arc::clone(&iox_object_store),
CatalogParquetInfo {
path: path.clone(),
// Intentionally "incorrect" path
path: ParquetFilePath::new(&chunk_addr(10)),
file_size_bytes: 33,
metadata: Arc::new(metadata.clone()),
metadata: Arc::clone(metadata),
},
)
.unwrap_err();
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
// this transaction will still work
let (path, metadata) =
make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(7)).await;
let metadata = Arc::new(metadata);
state
.add(
Arc::clone(&iox_object_store),
CatalogParquetInfo {
path: path.clone(),
file_size_bytes: 33,
metadata: Arc::new(metadata.clone()),
metadata: Arc::clone(&metadata),
},
)
.unwrap();
expected.insert(path.clone(), Arc::new(metadata.clone()));
chunk_id_watermark += 1;
expected.insert(7, (path.clone(), Arc::clone(&metadata)));
// recently added
let err = state
@ -1427,50 +1416,44 @@ pub mod test_helpers {
CatalogParquetInfo {
path,
file_size_bytes: 33,
metadata: Arc::new(metadata),
metadata: Arc::clone(&metadata),
},
)
.unwrap_err();
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
// does not exist
let path = make_parquet_file_path();
// does not exist - as different UUID
let path = ParquetFilePath::new(&chunk_addr(7));
let err = state.remove(&path).unwrap_err();
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
chunk_id_watermark += 1;
// this still works
let path = expected.keys().next().unwrap().clone();
let (path, _) = expected.remove(&7).unwrap();
state.remove(&path).unwrap();
expected.remove(&path);
// recently removed
let err = state.remove(&path).unwrap_err();
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
}
assert_checkpoint(&state, &f, &expected);
// consume variable so that we can easily add tests w/o re-adding the final modification
println!("{}", chunk_id_watermark);
}
/// Assert that tracked files and their linked metadata are equal.
fn assert_checkpoint<S, F>(
state: &S,
f: &F,
expected_files: &HashMap<ParquetFilePath, Arc<IoxParquetMetaData>>,
expected_files: &HashMap<u32, (ParquetFilePath, Arc<IoxParquetMetaData>)>,
) where
F: Fn(&S) -> CheckpointData,
{
let actual_files = f(state).files;
let actual_files: HashMap<ParquetFilePath, CatalogParquetInfo> = f(state).files;
let sorted_keys_actual = get_sorted_keys(&actual_files);
let sorted_keys_expected = get_sorted_keys(expected_files);
let sorted_keys_actual = get_sorted_keys(actual_files.keys());
let sorted_keys_expected = get_sorted_keys(expected_files.values().map(|(path, _)| path));
assert_eq!(sorted_keys_actual, sorted_keys_expected);
for k in sorted_keys_actual {
let md_actual = &actual_files[&k].metadata;
let md_expected = &expected_files[&k];
for (path, md_expected) in expected_files.values() {
let md_actual = &actual_files[&path].metadata;
let iox_md_actual = md_actual.read_iox_metadata().unwrap();
let iox_md_expected = md_expected.read_iox_metadata().unwrap();
@ -1486,12 +1469,11 @@ pub mod test_helpers {
}
}
/// Get a sorted list of keys from `HashMap`.
fn get_sorted_keys<K, V>(map: &HashMap<K, V>) -> Vec<K>
where
K: Clone + Ord,
{
let mut keys: Vec<K> = map.keys().cloned().collect();
/// Get a sorted list of keys from an iterator.
fn get_sorted_keys<'a>(
keys: impl Iterator<Item = &'a ParquetFilePath>,
) -> Vec<&'a ParquetFilePath> {
let mut keys: Vec<_> = keys.collect();
keys.sort();
keys
}