diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index a8d7481cb3..4aa5682071 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -1148,9 +1148,7 @@ impl<'c> Debug for CheckpointHandle<'c> {
 
 pub mod test_helpers {
     use super::*;
-    use crate::test_utils::{
-        chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path,
-    };
+    use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata};
 
     /// In-memory catalog state, for testing.
     #[derive(Clone, Debug)]
@@ -1243,13 +1241,14 @@ pub mod test_helpers {
             PreservedCatalog::new_empty::<S>(Arc::clone(&iox_object_store), state_data)
                 .await
                 .unwrap();
-        let mut expected: HashMap<ParquetFilePath, Arc<IoxParquetMetaData>> = HashMap::new();
+
+        // The expected state of the catalog
+        let mut expected: HashMap<u32, (ParquetFilePath, Arc<IoxParquetMetaData>)> = HashMap::new();
         assert_checkpoint(&state, &f, &expected);
 
         // add files
-        let mut chunk_id_watermark = 5;
         {
-            for chunk_id in 0..chunk_id_watermark {
+            for chunk_id in 0..5 {
                 let (path, metadata) =
                     make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id)).await;
                 state
@@ -1262,23 +1261,21 @@ pub mod test_helpers {
                         },
                     )
                     .unwrap();
-                expected.insert(path, Arc::new(metadata));
+                expected.insert(chunk_id, (path, Arc::new(metadata)));
             }
         }
         assert_checkpoint(&state, &f, &expected);
 
         // remove files
         {
-            let path = expected.keys().next().unwrap().clone();
+            let (path, _) = expected.remove(&1).unwrap();
             state.remove(&path).unwrap();
-            expected.remove(&path);
         }
         assert_checkpoint(&state, &f, &expected);
 
         // add and remove in the same transaction
         {
-            let (path, metadata) =
-                make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
+            let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(5)).await;
             state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1290,14 +1287,12 @@ pub mod test_helpers {
                 )
                 .unwrap();
             state.remove(&path).unwrap();
-            chunk_id_watermark += 1;
         }
         assert_checkpoint(&state, &f, &expected);
 
         // remove and add in the same transaction
         {
-            let path = expected.keys().next().unwrap().clone();
-            let metadata = expected.get(&path).unwrap();
+            let (path, metadata) = expected.get(&3).unwrap();
             state.remove(&path).unwrap();
             state
                 .add(
@@ -1314,8 +1309,7 @@ pub mod test_helpers {
 
         // add, remove, add in the same transaction
         {
-            let (path, metadata) =
-                make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
+            let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(6)).await;
             state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1337,15 +1331,13 @@ pub mod test_helpers {
                     },
                 )
                 .unwrap();
-            expected.insert(path, Arc::new(metadata));
-            chunk_id_watermark += 1;
+            expected.insert(6, (path, Arc::new(metadata)));
         }
         assert_checkpoint(&state, &f, &expected);
 
         // remove, add, remove in same transaction
        {
-            let path = expected.keys().next().unwrap().clone();
-            let metadata = expected.get(&path).unwrap();
+            let (path, metadata) = expected.remove(&4).unwrap();
             state.remove(&path).unwrap();
             state
                 .add(
@@ -1353,20 +1345,20 @@ pub mod test_helpers {
                     CatalogParquetInfo {
                         path: path.clone(),
                         file_size_bytes: 33,
-                        metadata: Arc::clone(metadata),
+                        metadata: Arc::clone(&metadata),
                     },
                 )
                 .unwrap();
             state.remove(&path).unwrap();
-            expected.remove(&path);
         }
         assert_checkpoint(&state, &f, &expected);
 
         // error handling, no real opt
         {
-            // already exists (should also not change the metadata)
-            let path = expected.keys().next().unwrap();
-            let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
+            // TODO: Error handling should disambiguate between chunk collision and filename collision
+
+            // chunk with same ID already exists (should also not change the metadata)
+            let (path, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
             let err = state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1379,46 +1371,43 @@ pub mod test_helpers {
                 .unwrap_err();
             assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
 
-            // does not exist
-            let path = make_parquet_file_path();
+            // does not exist as has a different UUID
             let err = state.remove(&path).unwrap_err();
             assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
-            chunk_id_watermark += 1;
         }
         assert_checkpoint(&state, &f, &expected);
 
         // error handling, still something works
         {
             // already exists (should also not change the metadata)
-            let path = expected.keys().next().unwrap();
-            let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
+            let (_, metadata) = expected.get(&0).unwrap();
             let err = state
                 .add(
                     Arc::clone(&iox_object_store),
                     CatalogParquetInfo {
-                        path: path.clone(),
+                        // Intentionally "incorrect" path
+                        path: ParquetFilePath::new(&chunk_addr(10)),
                         file_size_bytes: 33,
-                        metadata: Arc::new(metadata.clone()),
+                        metadata: Arc::clone(metadata),
                     },
                 )
                 .unwrap_err();
             assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
 
             // this transaction will still work
-            let (path, metadata) =
-                make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
+            let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(7)).await;
+            let metadata = Arc::new(metadata);
             state
                 .add(
                     Arc::clone(&iox_object_store),
                     CatalogParquetInfo {
                         path: path.clone(),
                         file_size_bytes: 33,
-                        metadata: Arc::new(metadata.clone()),
+                        metadata: Arc::clone(&metadata),
                     },
                 )
                 .unwrap();
-            expected.insert(path.clone(), Arc::new(metadata.clone()));
-            chunk_id_watermark += 1;
+            expected.insert(7, (path.clone(), Arc::clone(&metadata)));
 
             // recently added
             let err = state
@@ -1427,50 +1416,44 @@ pub mod test_helpers {
                     CatalogParquetInfo {
                         path,
                         file_size_bytes: 33,
-                        metadata: Arc::new(metadata),
+                        metadata: Arc::clone(&metadata),
                     },
                 )
                 .unwrap_err();
             assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
 
-            // does not exist
-            let path = make_parquet_file_path();
+            // does not exist - as different UUID
+            let path = ParquetFilePath::new(&chunk_addr(7));
             let err = state.remove(&path).unwrap_err();
             assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
-            chunk_id_watermark += 1;
 
             // this still works
-            let path = expected.keys().next().unwrap().clone();
+            let (path, _) = expected.remove(&7).unwrap();
             state.remove(&path).unwrap();
-            expected.remove(&path);
 
             // recently removed
             let err = state.remove(&path).unwrap_err();
             assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
         }
         assert_checkpoint(&state, &f, &expected);
-
-        // consume variable so that we can easily add tests w/o re-adding the final modification
-        println!("{}", chunk_id_watermark);
     }
 
     /// Assert that tracked files and their linked metadata are equal.
     fn assert_checkpoint<S, F>(
         state: &S,
         f: &F,
-        expected_files: &HashMap<ParquetFilePath, Arc<IoxParquetMetaData>>,
+        expected_files: &HashMap<u32, (ParquetFilePath, Arc<IoxParquetMetaData>)>,
     ) where
         F: Fn(&S) -> CheckpointData,
     {
-        let actual_files = f(state).files;
+        let actual_files: HashMap<ParquetFilePath, CatalogParquetInfo> = f(state).files;
 
-        let sorted_keys_actual = get_sorted_keys(&actual_files);
-        let sorted_keys_expected = get_sorted_keys(expected_files);
+        let sorted_keys_actual = get_sorted_keys(actual_files.keys());
+        let sorted_keys_expected = get_sorted_keys(expected_files.values().map(|(path, _)| path));
         assert_eq!(sorted_keys_actual, sorted_keys_expected);
 
-        for k in sorted_keys_actual {
-            let md_actual = &actual_files[&k].metadata;
-            let md_expected = &expected_files[&k];
+        for (path, md_expected) in expected_files.values() {
+            let md_actual = &actual_files[&path].metadata;
 
             let iox_md_actual = md_actual.read_iox_metadata().unwrap();
             let iox_md_expected = md_expected.read_iox_metadata().unwrap();
@@ -1486,12 +1469,11 @@ pub mod test_helpers {
         }
     }
 
-    /// Get a sorted list of keys from `HashMap`.
-    fn get_sorted_keys<K, V>(map: &HashMap<K, V>) -> Vec<K>
-    where
-        K: Clone + Ord,
-    {
-        let mut keys: Vec<K> = map.keys().cloned().collect();
+    /// Get a sorted list of keys from an iterator.
+    fn get_sorted_keys<'a>(
+        keys: impl Iterator<Item = &'a ParquetFilePath>,
+    ) -> Vec<&'a ParquetFilePath> {
+        let mut keys: Vec<_> = keys.collect();
         keys.sort();
         keys
     }