diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index c013bd8d2f..5c2b731c26 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -877,27 +877,21 @@ where } } -#[cfg(test)] -mod tests { - use std::{cell::RefCell, num::NonZeroU32, ops::Deref}; - - use crate::{ - metadata::{read_parquet_metadata_from_file, read_statistics_from_parquet_metadata}, - storage::read_schema_from_parquet_metadata, - utils::{load_parquet_from_store, make_chunk, make_object_store}, - }; - use object_store::parsed_path; - +pub mod test_helpers { use super::*; + use std::{cell::RefCell, ops::Deref}; + /// Part that actually holds the data of [`TestCatalogState`]. #[derive(Clone, Debug)] - struct TestCatalogStateInner { + pub struct TestCatalogStateInner { + /// Map of all parquet files that are currently registered. pub parquet_files: HashMap, } /// In-memory catalog state, for testing. #[derive(Clone, Debug)] - struct TestCatalogState { + pub struct TestCatalogState { + /// Inner mutable state. pub inner: RefCell, } @@ -950,6 +944,21 @@ mod tests { Ok(()) } } +} + +#[cfg(test)] +pub mod tests { + use std::{num::NonZeroU32, ops::Deref}; + + use crate::{ + metadata::{read_parquet_metadata_from_file, read_statistics_from_parquet_metadata}, + storage::read_schema_from_parquet_metadata, + utils::{load_parquet_from_store, make_chunk, make_object_store}, + }; + use object_store::parsed_path; + + use super::test_helpers::TestCatalogState; + use super::*; #[tokio::test] async fn test_inmem_commit_semantics() { diff --git a/server/src/db.rs b/server/src/db.rs index 4201b33c7a..7cf3a618ef 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1188,7 +1188,11 @@ mod tests { }; use entry::test_helpers::lp_to_entry; use futures::{stream, StreamExt, TryStreamExt}; - use object_store::{memory::InMemory, path::Path, ObjectStore, ObjectStoreApi}; + use object_store::{ + memory::InMemory, + path::{ObjectStorePath, Path}, + ObjectStore, ObjectStoreApi, + }; use parquet_file::{ metadata::read_parquet_metadata_from_file, storage::read_schema_from_parquet_metadata, @@ -2686,4 +2690,85 @@ mod tests { .gt(0.07) .unwrap(); } + + #[tokio::test] + async fn write_one_chunk_to_preserved_catalog() { + // Test that parquet data is commited to preserved catalog + let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); + let server_id = ServerId::try_from(1).unwrap(); + let db_name = "preserved_catalog_test"; + + // Create a DB given a server id, an object store and a db name + let test_db = TestDb::builder() + .object_store(Arc::clone(&object_store)) + .server_id(server_id) + .db_name(db_name) + .build(); + let db = Arc::new(test_db.db); + + // Write some line protocols in Mutable buffer of the DB + write_lp(db.as_ref(), "cpu bar=1 10"); + + //Now mark the MB chunk close + let partition_key = "1970-01-01T00"; + let mb_chunk = db + .rollover_partition("1970-01-01T00", "cpu") + .await + .unwrap() + .unwrap(); + // Move that MB chunk to RB chunk and drop it from MB + db.load_chunk_to_read_buffer(partition_key, "cpu", mb_chunk.id()) + .await + .unwrap(); + + // at this point, no preserved catalog exists + let maybe_preserved_catalog = + PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await + .unwrap(); + assert!(maybe_preserved_catalog.is_none()); + + // Write the RB chunk to Object Store but keep it in RB + db.write_chunk_to_object_store(partition_key, "cpu", mb_chunk.id()) + .await + .unwrap(); + + // the preserved catalog should now register a single file + let chunk = { + let partition = db.catalog.state().valid_partition(&partition_key).unwrap(); + let partition = partition.read(); + + partition.chunk("cpu", mb_chunk.id()).unwrap() + }; + let chunk = chunk.read(); + if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() { + let path = chunk.table_path("cpu").unwrap().display(); + + let preserved_catalog = PreservedCatalog::< + parquet_file::catalog::test_helpers::TestCatalogState, + >::load( + object_store, server_id, db_name.to_string(), () + ) + .await + .unwrap() + .unwrap(); + let mut paths: Vec = preserved_catalog + .state() + .inner + .borrow() + .parquet_files + .keys() + .map(|p| p.display()) + .collect(); + paths.sort(); + assert_eq!(paths, vec![path]); + } else { + panic!("Wrong chunk state."); + } + } }