diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 750b717e8b..1e105eaab8 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -1148,24 +1148,69 @@ impl<'c> Debug for CheckpointHandle<'c> {
 
 pub mod test_helpers {
     use super::*;
-    use crate::test_utils::{
-        chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path,
-    };
+    use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata};
+
+    #[derive(Clone, Debug, Default)]
+    pub struct Table {
+        pub partitions: HashMap<Arc<str>, Partition>,
+    }
+
+    #[derive(Clone, Debug, Default)]
+    pub struct Partition {
+        pub chunks: HashMap<u32, CatalogParquetInfo>,
+    }
 
     /// In-memory catalog state, for testing.
-    #[derive(Clone, Debug)]
+    #[derive(Clone, Debug, Default)]
     pub struct TestCatalogState {
         /// Map of all parquet files that are currently registered.
-        pub parquet_files: HashMap<ParquetFilePath, CatalogParquetInfo>,
+        pub tables: HashMap<Arc<str>, Table>,
     }
 
     impl TestCatalogState {
         /// Simple way to create [`CheckpointData`].
        pub fn checkpoint_data(&self) -> CheckpointData {
             CheckpointData {
-                files: self.parquet_files.clone(),
+                files: self
+                    .files()
+                    .map(|info| (info.path.clone(), info.clone()))
+                    .collect(),
             }
         }
+
+        /// Returns an iterator over the files in this catalog state
+        pub fn files(&self) -> impl Iterator<Item = &CatalogParquetInfo> {
+            self.tables.values().flat_map(|table| {
+                table
+                    .partitions
+                    .values()
+                    .flat_map(|partition| partition.chunks.values())
+            })
+        }
+
+        /// Inserts a file into this catalog state
+        pub fn insert(&mut self, info: CatalogParquetInfo) -> Result<()> {
+            let iox_md = info
+                .metadata
+                .read_iox_metadata()
+                .context(MetadataExtractFailed {
+                    path: info.path.clone(),
+                })?;
+
+            let table = self.tables.entry(iox_md.table_name).or_default();
+            let partition = table.partitions.entry(iox_md.partition_key).or_default();
+
+            match partition.chunks.entry(iox_md.chunk_id) {
+                Occupied(o) => {
+                    return Err(Error::ParquetFileAlreadyExists {
+                        path: o.get().path.clone(),
+                    });
+                }
+                Vacant(v) => v.insert(info),
+            };
+
+            Ok(())
+        }
     }
 
     impl CatalogState for TestCatalogState {
@@ -1173,7 +1218,7 @@ pub mod test_helpers {
 
         fn new_empty(_db_name: &str, _data: Self::EmptyInput) -> Self {
             Self {
-                parquet_files: HashMap::new(),
+                tables: HashMap::new(),
             }
         }
 
@@ -1182,31 +1227,38 @@ pub mod test_helpers {
             _iox_object_store: Arc<IoxObjectStore>,
             info: CatalogParquetInfo,
         ) -> Result<()> {
-            match self.parquet_files.entry(info.path.clone()) {
-                Occupied(o) => {
-                    return Err(Error::ParquetFileAlreadyExists {
-                        path: o.key().clone(),
-                    });
-                }
-                Vacant(v) => {
-                    v.insert(info);
-                }
-            }
-
-            Ok(())
+            self.insert(info)
         }
 
         fn remove(&mut self, path: &ParquetFilePath) -> Result<()> {
-            match self.parquet_files.entry(path.clone()) {
-                Occupied(o) => {
-                    o.remove();
-                }
-                Vacant(v) => {
-                    return Err(Error::ParquetFileDoesNotExist { path: v.into_key() });
+            let partitions = self
+                .tables
+                .values_mut()
+                .flat_map(|table| table.partitions.values_mut());
+            let mut removed = 0;
+
+            for partition in partitions {
+                let to_remove: Vec<_> = partition
+                    .chunks
+                    .iter()
+                    .filter_map(|(id, chunk)| {
+                        if &chunk.path == path {
+                            return Some(*id);
+                        }
+                        None
+                    })
+                    .collect();
+
+                for id in to_remove {
+                    removed += 1;
+                    partition.chunks.remove(&id).unwrap();
                 }
             }
 
-            Ok(())
+            match removed {
+                0 => Err(Error::ParquetFileDoesNotExist { path: path.clone() }),
+                _ => Ok(()),
+            }
         }
     }
 
@@ -1243,13 +1295,14 @@ pub mod test_helpers {
             PreservedCatalog::new_empty::<S>(Arc::clone(&iox_object_store), state_data)
                 .await
                 .unwrap();
-        let mut expected: HashMap<ParquetFilePath, Arc<IoxParquetMetaData>> = HashMap::new();
+
+        // The expected state of the catalog
+        let mut expected: HashMap<u32, (ParquetFilePath, Arc<IoxParquetMetaData>)> = HashMap::new();
         assert_checkpoint(&state, &f, &expected);
 
         // add files
-        let mut chunk_id_watermark = 5;
         {
-            for chunk_id in 0..chunk_id_watermark {
+            for chunk_id in 0..5 {
                 let (path, metadata) =
                     make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id)).await;
                 state
@@ -1262,23 +1315,21 @@ pub mod test_helpers {
                         },
                     )
                    .unwrap();
-                expected.insert(path, Arc::new(metadata));
+                expected.insert(chunk_id, (path, Arc::new(metadata)));
             }
         }
         assert_checkpoint(&state, &f, &expected);
 
         // remove files
         {
-            let path = expected.keys().next().unwrap().clone();
+            let (path, _) = expected.remove(&1).unwrap();
             state.remove(&path).unwrap();
-            expected.remove(&path);
         }
         assert_checkpoint(&state, &f, &expected);
 
         // add and remove in the same transaction
         {
-            let (path, metadata) =
-                make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
+            let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(5)).await;
             state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1290,15 +1341,13 @@ pub mod test_helpers {
                 )
                 .unwrap();
             state.remove(&path).unwrap();
-            chunk_id_watermark += 1;
         }
         assert_checkpoint(&state, &f, &expected);
 
         // remove and add in the same transaction
         {
-            let path = expected.keys().next().unwrap().clone();
-            let metadata = expected.get(&path).unwrap();
-            state.remove(&path).unwrap();
+            let (path, metadata) = expected.get(&3).unwrap();
+            state.remove(path).unwrap();
             state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1314,8 +1363,7 @@ pub mod test_helpers {
 
         // add, remove, add in the same transaction
         {
-            let (path, metadata) =
-                make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
+            let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(6)).await;
             state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1337,15 +1385,13 @@ pub mod test_helpers {
                     },
                 )
                 .unwrap();
-            expected.insert(path, Arc::new(metadata));
-            chunk_id_watermark += 1;
+            expected.insert(6, (path, Arc::new(metadata)));
         }
         assert_checkpoint(&state, &f, &expected);
 
         // remove, add, remove in same transaction
         {
-            let path = expected.keys().next().unwrap().clone();
-            let metadata = expected.get(&path).unwrap();
+            let (path, metadata) = expected.remove(&4).unwrap();
             state.remove(&path).unwrap();
             state
                 .add(
@@ -1353,20 +1399,20 @@ pub mod test_helpers {
                     CatalogParquetInfo {
                         path: path.clone(),
                         file_size_bytes: 33,
-                        metadata: Arc::clone(metadata),
+                        metadata: Arc::clone(&metadata),
                     },
                 )
                 .unwrap();
             state.remove(&path).unwrap();
-            expected.remove(&path);
         }
         assert_checkpoint(&state, &f, &expected);
 
         // error handling, no real opt
         {
-            // already exists (should also not change the metadata)
-            let path = expected.keys().next().unwrap();
-            let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
+            // TODO: Error handling should disambiguate between chunk collision and filename collision
+
+            // chunk with same ID already exists (should also not change the metadata)
+            let (path, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
             let err = state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1379,46 +1425,43 @@ pub mod test_helpers {
                 .unwrap_err();
             assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
 
-            // does not exist
-            let path = make_parquet_file_path();
+            // does not exist, as it has a different UUID
             let err = state.remove(&path).unwrap_err();
             assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
-            chunk_id_watermark += 1;
         }
         assert_checkpoint(&state, &f, &expected);
 
         // error handling, still something works
         {
             // already exists (should also not change the metadata)
-            let path = expected.keys().next().unwrap();
-            let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
+            let (_, metadata) = expected.get(&0).unwrap();
             let err = state
                 .add(
                     Arc::clone(&iox_object_store),
                     CatalogParquetInfo {
-                        path: path.clone(),
+                        // Intentionally "incorrect" path
+                        path: ParquetFilePath::new(&chunk_addr(10)),
                         file_size_bytes: 33,
-                        metadata: Arc::new(metadata.clone()),
+                        metadata: Arc::clone(metadata),
                     },
                 )
                 .unwrap_err();
             assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
 
             // this transaction will still work
-            let (path, metadata) =
-                make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
+            let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(7)).await;
+            let metadata = Arc::new(metadata);
             state
                 .add(
                     Arc::clone(&iox_object_store),
                     CatalogParquetInfo {
                         path: path.clone(),
                         file_size_bytes: 33,
-                        metadata: Arc::new(metadata.clone()),
+                        metadata: Arc::clone(&metadata),
                     },
                 )
                 .unwrap();
-            expected.insert(path.clone(), Arc::new(metadata.clone()));
-            chunk_id_watermark += 1;
+            expected.insert(7, (path.clone(), Arc::clone(&metadata)));
 
             // recently added
             let err = state
@@ -1427,50 +1470,44 @@ pub mod test_helpers {
                     CatalogParquetInfo {
                         path,
                         file_size_bytes: 33,
-                        metadata: Arc::new(metadata),
+                        metadata: Arc::clone(&metadata),
                     },
                 )
                 .unwrap_err();
             assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
 
-            // does not exist
-            let path = make_parquet_file_path();
+            // does not exist - as it has a different UUID
+            let path = ParquetFilePath::new(&chunk_addr(7));
             let err = state.remove(&path).unwrap_err();
             assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
-            chunk_id_watermark += 1;
 
             // this still works
-            let path = expected.keys().next().unwrap().clone();
+            let (path, _) = expected.remove(&7).unwrap();
             state.remove(&path).unwrap();
-            expected.remove(&path);
 
             // recently removed
             let err = state.remove(&path).unwrap_err();
             assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
         }
         assert_checkpoint(&state, &f, &expected);
-
-        // consume variable so that we can easily add tests w/o re-adding the final modification
-        println!("{}", chunk_id_watermark);
     }
 
     /// Assert that tracked files and their linked metadata are equal.
     fn assert_checkpoint<S, F>(
         state: &S,
         f: &F,
-        expected_files: &HashMap<ParquetFilePath, Arc<IoxParquetMetaData>>,
+        expected_files: &HashMap<u32, (ParquetFilePath, Arc<IoxParquetMetaData>)>,
     ) where
         F: Fn(&S) -> CheckpointData,
     {
-        let actual_files = f(state).files;
+        let actual_files: HashMap<ParquetFilePath, CatalogParquetInfo> = f(state).files;
 
-        let sorted_keys_actual = get_sorted_keys(&actual_files);
-        let sorted_keys_expected = get_sorted_keys(expected_files);
+        let sorted_keys_actual = get_sorted_keys(actual_files.keys());
+        let sorted_keys_expected = get_sorted_keys(expected_files.values().map(|(path, _)| path));
         assert_eq!(sorted_keys_actual, sorted_keys_expected);
 
-        for k in sorted_keys_actual {
-            let md_actual = &actual_files[&k].metadata;
-            let md_expected = &expected_files[&k];
+        for (path, md_expected) in expected_files.values() {
+            let md_actual = &actual_files[path].metadata;
 
             let iox_md_actual = md_actual.read_iox_metadata().unwrap();
             let iox_md_expected = md_expected.read_iox_metadata().unwrap();
@@ -1486,12 +1523,11 @@ pub mod test_helpers {
         }
     }
 
-    /// Get a sorted list of keys from `HashMap`.
-    fn get_sorted_keys<K, V>(map: &HashMap<K, V>) -> Vec<K>
-    where
-        K: Clone + Ord,
-    {
-        let mut keys: Vec<K> = map.keys().cloned().collect();
+    /// Get a sorted list of keys from an iterator.
+    fn get_sorted_keys<'a>(
+        keys: impl Iterator<Item = &'a ParquetFilePath>,
+    ) -> Vec<&'a ParquetFilePath> {
+        let mut keys: Vec<_> = keys.collect();
         keys.sort();
         keys
     }
@@ -1505,9 +1541,7 @@ mod tests {
         },
         *,
     };
-    use crate::test_utils::{
-        chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path,
-    };
+    use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata};
 
     #[tokio::test]
     async fn test_create_empty() {
@@ -2086,9 +2120,6 @@ mod tests {
     #[tokio::test]
     async fn test_checkpoint() {
         let iox_object_store = make_iox_object_store();
-        let addr = chunk_addr(1337);
-        let (_, metadata) = make_metadata(&iox_object_store, "foo", addr.clone()).await;
-        let metadata = Arc::new(metadata);
 
         // use common test as baseline
         let mut trace = assert_single_catalog_inmem_works(&iox_object_store).await;
@@ -2113,14 +2144,16 @@ mod tests {
 
         // create another transaction on-top that adds a file (this transaction will be required to load the full state)
         {
+            let addr = chunk_addr(1337);
+            let (path, metadata) = make_metadata(&iox_object_store, "foo", addr.clone()).await;
+
             let mut transaction = catalog.open_transaction().await;
-            let path = make_parquet_file_path();
             let info = CatalogParquetInfo {
                 path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata),
+                metadata: Arc::new(metadata),
             };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             transaction.add_parquet(&info).unwrap();
             let ckpt_handle = transaction.commit().await.unwrap();
             ckpt_handle
@@ -2163,8 +2196,7 @@ mod tests {
         state: &TestCatalogState,
     ) -> Vec<(ParquetFilePath, IoxParquetMetaData)> {
         let mut files: Vec<(ParquetFilePath, IoxParquetMetaData)> = state
-            .parquet_files
-            .values()
+            .files()
             .map(|info| (info.path.clone(), info.metadata.as_ref().clone()))
             .collect();
         files.sort_by_key(|(path, _)| path.clone());
@@ -2255,12 +2287,6 @@ mod tests {
             .await
             .unwrap();
 
-        // get some test metadata
-        let (_, metadata1) = make_metadata(iox_object_store, "foo", chunk_addr(1)).await;
-        let metadata1 = Arc::new(metadata1);
-        let (_, metadata2) = make_metadata(iox_object_store, "bar", chunk_addr(1)).await;
-        let metadata2 = Arc::new(metadata2);
-
         // track all the intermediate results
         let mut trace = TestTrace::new();
 
@@ -2269,116 +2295,102 @@ mod tests {
         assert_catalog_parquet_files(&state, &[]);
         trace.record(&catalog, &state, false);
 
+        let mut expected = vec![];
+
         // fill catalog with examples
-        let test1 = make_parquet_file_path();
-        let sub1_test1 = make_parquet_file_path();
-        let sub1_test2 = make_parquet_file_path();
-        let sub2_test1 = make_parquet_file_path();
         {
             let mut t = catalog.open_transaction().await;
 
+            let (path, metadata) = make_metadata(iox_object_store, "foo", chunk_addr(0)).await;
+            expected.push((path.clone(), metadata.clone()));
             let info = CatalogParquetInfo {
-                path: test1.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata1),
+                metadata: Arc::new(metadata),
            };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
            t.add_parquet(&info).unwrap();
 
+            let (path, metadata) = make_metadata(iox_object_store, "bar", chunk_addr(1)).await;
+            expected.push((path.clone(), metadata.clone()));
             let info = CatalogParquetInfo {
-                path: sub1_test1.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata2),
+                metadata: Arc::new(metadata),
             };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             t.add_parquet(&info).unwrap();
 
+            let (path, metadata) = make_metadata(iox_object_store, "bar", chunk_addr(2)).await;
+            expected.push((path.clone(), metadata.clone()));
             let info = CatalogParquetInfo {
-                path: sub1_test2.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata2),
+                metadata: Arc::new(metadata),
            };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             t.add_parquet(&info).unwrap();
 
+            let (path, metadata) = make_metadata(iox_object_store, "foo", chunk_addr(3)).await;
+            expected.push((path.clone(), metadata.clone()));
             let info = CatalogParquetInfo {
-                path: sub2_test1.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata1),
+                metadata: Arc::new(metadata),
             };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             t.add_parquet(&info).unwrap();
 
             t.commit().await.unwrap();
         }
         assert_eq!(catalog.revision_counter(), 1);
-        assert_catalog_parquet_files(
-            &state,
-            &[
-                (sub1_test1.clone(), metadata2.as_ref().clone()),
-                (sub1_test2.clone(), metadata2.as_ref().clone()),
-                (sub2_test1.clone(), metadata1.as_ref().clone()),
-                (test1.clone(), metadata1.as_ref().clone()),
-            ],
-        );
+        assert_catalog_parquet_files(&state, &expected);
         trace.record(&catalog, &state, false);
 
         // modify catalog with examples
-        let test4 = make_parquet_file_path();
         {
+            let (path, metadata) = make_metadata(iox_object_store, "foo", chunk_addr(4)).await;
+            expected.push((path.clone(), metadata.clone()));
+
             let mut t = catalog.open_transaction().await;
 
             // "real" modifications
             let info = CatalogParquetInfo {
-                path: test4.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata1),
+                metadata: Arc::new(metadata),
            };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             t.add_parquet(&info).unwrap();
 
-            state.parquet_files.remove(&test1);
-            t.remove_parquet(&test1);
+            let (path, _) = expected.remove(0);
+            state.remove(&path).unwrap();
+            t.remove_parquet(&path);
 
             t.commit().await.unwrap();
         }
         assert_eq!(catalog.revision_counter(), 2);
-        assert_catalog_parquet_files(
-            &state,
-            &[
-                (sub1_test1.clone(), metadata2.as_ref().clone()),
-                (sub1_test2.clone(), metadata2.as_ref().clone()),
-                (sub2_test1.clone(), metadata1.as_ref().clone()),
-                (test4.clone(), metadata1.as_ref().clone()),
-            ],
-        );
+        assert_catalog_parquet_files(&state, &expected);
         trace.record(&catalog, &state, false);
 
         // uncommitted modifications have no effect
         {
             let mut t = catalog.open_transaction().await;
 
+            let (path, metadata) = make_metadata(iox_object_store, "foo", chunk_addr(1)).await;
             let info = CatalogParquetInfo {
-                path: make_parquet_file_path(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata1),
+                metadata: Arc::new(metadata),
            };
             t.add_parquet(&info).unwrap();
-            t.remove_parquet(&sub1_test2);
+            t.remove_parquet(&expected[0].0);
 
             // NO commit here!
         }
         assert_eq!(catalog.revision_counter(), 2);
-        assert_catalog_parquet_files(
-            &state,
-            &[
-                (sub1_test1.clone(), metadata2.as_ref().clone()),
-                (sub1_test2.clone(), metadata2.as_ref().clone()),
-                (sub2_test1.clone(), metadata1.as_ref().clone()),
-                (test4.clone(), metadata1.as_ref().clone()),
-            ],
-        );
+        assert_catalog_parquet_files(&state, &expected);
         trace.record(&catalog, &state, true);
 
         trace
diff --git a/parquet_file/src/rebuild.rs b/parquet_file/src/rebuild.rs
index 105bba9096..43c66d0aeb 100644
--- a/parquet_file/src/rebuild.rs
+++ b/parquet_file/src/rebuild.rs
@@ -189,11 +189,11 @@ mod tests {
             let mut transaction = catalog.open_transaction().await;
 
             let info = create_parquet_file(&iox_object_store, 0).await;
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             transaction.add_parquet(&info).unwrap();
 
             let info = create_parquet_file(&iox_object_store, 1).await;
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             transaction.add_parquet(&info).unwrap();
 
             transaction.commit().await.unwrap();
@@ -207,7 +207,7 @@ mod tests {
             let mut transaction = catalog.open_transaction().await;
 
             let info = create_parquet_file(&iox_object_store, 2).await;
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             transaction.add_parquet(&info).unwrap();
 
             transaction.commit().await.unwrap();
@@ -215,7 +215,7 @@ mod tests {
 
         // store catalog state
         let paths_expected = {
-            let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
+            let mut tmp: Vec<_> = state.files().map(|info| info.path.clone()).collect();
             tmp.sort();
             tmp
         };
@@ -231,7 +231,7 @@ mod tests {
 
         // check match
         let paths_actual = {
-            let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
+            let mut tmp: Vec<_> = state.files().map(|info| info.path.clone()).collect();
             tmp.sort();
             tmp
        };
@@ -259,7 +259,7 @@ mod tests {
             .unwrap();
 
         // check match
-        assert!(state.parquet_files.is_empty());
+        assert!(state.files().next().is_none());
         assert_eq!(catalog.revision_counter(), 0);
     }
 
@@ -290,7 +290,7 @@ mod tests {
         let (catalog, state) = rebuild_catalog::<TestCatalogState>(iox_object_store, (), true)
             .await
             .unwrap();
-        assert!(state.parquet_files.is_empty());
+        assert!(state.files().next().is_none());
         assert_eq!(catalog.revision_counter(), 0);
     }
 
diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs
index 2fc120ff11..d2b61c2341 100644
--- a/parquet_file/src/test_utils.rs
+++ b/parquet_file/src/test_utils.rs
@@ -764,12 +764,6 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) ->
     record_batches
 }
 
-/// Create an arbitrary ParquetFilePath
-pub fn make_parquet_file_path() -> ParquetFilePath {
-    let chunk_addr = chunk_addr(3);
-    ParquetFilePath::new(&chunk_addr)
-}
-
 /// Create test metadata by creating a parquet file and reading it back into memory.
 ///
 /// See [`make_chunk`] for details.
diff --git a/server/src/db.rs b/server/src/db.rs
index a96c6c82f0..68ded4cfa9 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -3921,7 +3921,7 @@ mod tests {
             .unwrap()
             .unwrap();
         let paths_actual = {
-            let mut tmp: Vec<_> = catalog.parquet_files.keys().cloned().collect();
+            let mut tmp: Vec<_> = catalog.files().map(|info| info.path.clone()).collect();
             tmp.sort();
             tmp
         };
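For reference, here is a minimal, self-contained sketch of the nested tables → partitions → chunks bookkeeping that the patch introduces in `TestCatalogState`. The names used below (`FileInfo`, plain `String`/`u32` keys, a `String` error) are simplified stand-ins rather than the actual IOx types, so treat it as an illustration of the pattern, not the real API:

```rust
use std::collections::HashMap;

// Stand-in for CatalogParquetInfo: just the bits needed for the illustration.
#[derive(Clone, Debug)]
struct FileInfo {
    path: String,
    file_size_bytes: usize,
}

#[derive(Default)]
struct Partition {
    // Keyed by chunk id, so a duplicate chunk is a plain map lookup.
    chunks: HashMap<u32, FileInfo>,
}

#[derive(Default)]
struct Table {
    partitions: HashMap<String, Partition>,
}

#[derive(Default)]
struct CatalogState {
    tables: HashMap<String, Table>,
}

impl CatalogState {
    /// Insert a file under (table, partition, chunk id); reject duplicate chunk ids.
    fn insert(&mut self, table: &str, partition: &str, chunk_id: u32, info: FileInfo) -> Result<(), String> {
        let partition = self
            .tables
            .entry(table.to_string())
            .or_default()
            .partitions
            .entry(partition.to_string())
            .or_default();
        if partition.chunks.contains_key(&chunk_id) {
            return Err(format!("chunk {} already exists", chunk_id));
        }
        partition.chunks.insert(chunk_id, info);
        Ok(())
    }

    /// Flat view over all tracked files, analogous to `TestCatalogState::files()` in the patch.
    fn files(&self) -> impl Iterator<Item = &FileInfo> {
        self.tables.values().flat_map(|table| {
            table
                .partitions
                .values()
                .flat_map(|partition| partition.chunks.values())
        })
    }
}

fn main() {
    let mut state = CatalogState::default();
    state
        .insert("foo", "part-1", 0, FileInfo { path: "foo/part-1/0.parquet".into(), file_size_bytes: 33 })
        .unwrap();
    // Re-inserting the same chunk id in the same partition is rejected.
    assert!(state
        .insert("foo", "part-1", 0, FileInfo { path: "other.parquet".into(), file_size_bytes: 1 })
        .is_err());
    assert_eq!(state.files().count(), 1);
}
```

The nested layout makes duplicate detection a chunk-id lookup within a single partition, while the flat `files()` iterator keeps the old "one big list of parquet files" view available for assertions and checkpointing.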