test: test preserved catalog <-> Db write wiring
parent 4299371cf2
commit cdf0ada6a6
@@ -877,27 +877,21 @@ where
 }
 }
 
-#[cfg(test)]
-mod tests {
-    use std::{cell::RefCell, num::NonZeroU32, ops::Deref};
-
-    use crate::{
-        metadata::{read_parquet_metadata_from_file, read_statistics_from_parquet_metadata},
-        storage::read_schema_from_parquet_metadata,
-        utils::{load_parquet_from_store, make_chunk, make_object_store},
-    };
-    use object_store::parsed_path;
-
+pub mod test_helpers {
+    use super::*;
+    use std::{cell::RefCell, ops::Deref};
+
     /// Part that actually holds the data of [`TestCatalogState`].
     #[derive(Clone, Debug)]
-    struct TestCatalogStateInner {
+    pub struct TestCatalogStateInner {
         /// Map of all parquet files that are currently registered.
         pub parquet_files: HashMap<DirsAndFileName, ParquetMetaData>,
     }
 
     /// In-memory catalog state, for testing.
     #[derive(Clone, Debug)]
-    struct TestCatalogState {
+    pub struct TestCatalogState {
         /// Inner mutable state.
         pub inner: RefCell<TestCatalogStateInner>,
     }
@@ -950,6 +944,21 @@ mod tests {
         Ok(())
     }
 }
+
+#[cfg(test)]
+pub mod tests {
+    use std::{num::NonZeroU32, ops::Deref};
+
+    use crate::{
+        metadata::{read_parquet_metadata_from_file, read_statistics_from_parquet_metadata},
+        storage::read_schema_from_parquet_metadata,
+        utils::{load_parquet_from_store, make_chunk, make_object_store},
+    };
+    use object_store::parsed_path;
+
+    use super::test_helpers::TestCatalogState;
+    use super::*;
+
     #[tokio::test]
     async fn test_inmem_commit_semantics() {
@@ -1188,7 +1188,11 @@ mod tests {
     };
     use entry::test_helpers::lp_to_entry;
     use futures::{stream, StreamExt, TryStreamExt};
-    use object_store::{memory::InMemory, path::Path, ObjectStore, ObjectStoreApi};
+    use object_store::{
+        memory::InMemory,
+        path::{ObjectStorePath, Path},
+        ObjectStore, ObjectStoreApi,
+    };
     use parquet_file::{
         metadata::read_parquet_metadata_from_file,
         storage::read_schema_from_parquet_metadata,
@@ -2686,4 +2690,85 @@ mod tests {
             .gt(0.07)
             .unwrap();
     }
+
+    #[tokio::test]
+    async fn write_one_chunk_to_preserved_catalog() {
+        // Test that parquet data is committed to the preserved catalog
+        let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
+        let server_id = ServerId::try_from(1).unwrap();
+        let db_name = "preserved_catalog_test";
+
+        // Create a DB given a server id, an object store, and a db name
+        let test_db = TestDb::builder()
+            .object_store(Arc::clone(&object_store))
+            .server_id(server_id)
+            .db_name(db_name)
+            .build();
+        let db = Arc::new(test_db.db);
+
+        // Write some line protocol into the mutable buffer of the DB
+        write_lp(db.as_ref(), "cpu bar=1 10");
+
+        // Now mark the MB chunk closed
+        let partition_key = "1970-01-01T00";
+        let mb_chunk = db
+            .rollover_partition("1970-01-01T00", "cpu")
+            .await
+            .unwrap()
+            .unwrap();
+        // Move that MB chunk to an RB chunk and drop it from the MB
+        db.load_chunk_to_read_buffer(partition_key, "cpu", mb_chunk.id())
+            .await
+            .unwrap();
+
+        // At this point, no preserved catalog exists
+        let maybe_preserved_catalog =
+            PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::load(
+                Arc::clone(&object_store),
+                server_id,
+                db_name.to_string(),
+                (),
+            )
+            .await
+            .unwrap();
+        assert!(maybe_preserved_catalog.is_none());
+
+        // Write the RB chunk to the object store but keep it in the RB
+        db.write_chunk_to_object_store(partition_key, "cpu", mb_chunk.id())
+            .await
+            .unwrap();
+
+        // The preserved catalog should now register a single file
+        let chunk = {
+            let partition = db.catalog.state().valid_partition(&partition_key).unwrap();
+            let partition = partition.read();
+
+            partition.chunk("cpu", mb_chunk.id()).unwrap()
+        };
+        let chunk = chunk.read();
+        if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() {
+            let path = chunk.table_path("cpu").unwrap().display();
+
+            let preserved_catalog = PreservedCatalog::<
+                parquet_file::catalog::test_helpers::TestCatalogState,
+            >::load(
+                object_store, server_id, db_name.to_string(), ()
+            )
+            .await
+            .unwrap()
+            .unwrap();
+            let mut paths: Vec<String> = preserved_catalog
+                .state()
+                .inner
+                .borrow()
+                .parquet_files
+                .keys()
+                .map(|p| p.display())
+                .collect();
+            paths.sort();
+            assert_eq!(paths, vec![path]);
+        } else {
+            panic!("Wrong chunk state.");
+        }
+    }
 }
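
Note on the test helper: TestCatalogState keeps its file map behind a RefCell so the preserved catalog can record parquet files through a shared reference. Below is a minimal, self-contained sketch of that interior-mutability pattern; the names (State, record) are illustrative stand-ins, not part of this commit:

    use std::{cell::RefCell, collections::HashMap};

    // Illustrative stand-in for the test state: the registered-file map
    // lives behind a RefCell so it can be mutated through &self.
    #[derive(Debug, Default)]
    struct State {
        parquet_files: RefCell<HashMap<String, u64>>,
    }

    impl State {
        // Record a file through a shared reference, the way a catalog
        // driving this state would.
        fn record(&self, path: &str, size: u64) {
            self.parquet_files.borrow_mut().insert(path.to_string(), size);
        }
    }

    fn main() {
        let state = State::default();
        state.record("db/table/file.parquet", 1024);
        assert_eq!(state.parquet_files.borrow().len(), 1);
    }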
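The new test loads the preserved catalog twice and leans on a Result<Option<_>> shape: before write_chunk_to_object_store the load succeeds but yields None, and afterwards it yields Some(..). A sketch of that calling pattern, using a hypothetical load function in place of PreservedCatalog::load:

    // Hypothetical loader with the same Result<Option<T>, E> shape the
    // test relies on: Ok(None) means "no catalog exists yet".
    fn load(exists: bool) -> Result<Option<String>, String> {
        if exists {
            Ok(Some("catalog".to_string()))
        } else {
            Ok(None)
        }
    }

    fn main() {
        // Before the chunk is written: loading succeeds but finds nothing.
        assert!(load(false).unwrap().is_none());

        // After the write: both layers unwrap to a value.
        let catalog = load(true).unwrap().unwrap();
        assert_eq!(catalog, "catalog");
    }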
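The closing assertion collects the HashMap keys into a Vec and sorts them before comparing. HashMap iteration order is unspecified in Rust, so without the sort the assertion would become order-dependent as soon as more than one file is registered. A small sketch of the pattern:

    use std::collections::HashMap;

    fn main() {
        let mut parquet_files: HashMap<String, u64> = HashMap::new();
        parquet_files.insert("b.parquet".to_string(), 2);
        parquet_files.insert("a.parquet".to_string(), 1);

        // Key order out of a HashMap is unspecified; sort the collected
        // keys to get a deterministic value to assert against.
        let mut paths: Vec<String> = parquet_files.keys().cloned().collect();
        paths.sort();
        assert_eq!(paths, vec!["a.parquet".to_string(), "b.parquet".to_string()]);
    }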