test: add test suite for `CatalogState` impls
This makes it easier to check if `CatalogState` correctly implement all features, including transaction aborting.pull/24376/head
parent
a429de4784
commit
e064a6bbba
|
@ -1226,10 +1226,13 @@ where
|
|||
}
|
||||
|
||||
pub mod test_helpers {
|
||||
use object_store::parsed_path;
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use crate::test_utils::{make_metadata, make_object_store};
|
||||
|
||||
use super::*;
|
||||
use std::ops::Deref;
|
||||
use std::{convert::TryFrom, ops::Deref};
|
||||
|
||||
/// Part that actually holds the data of [`TestCatalogState`].
|
||||
#[derive(Clone, Debug)]
|
||||
|
@ -1341,6 +1344,255 @@ pub mod test_helpers {
|
|||
.expect("should have at least a single transaction")
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Torture-test implementations for [`CatalogState`].
|
||||
pub async fn assert_catalog_state_implementation<S>(state_data: S::EmptyInput, test_abort: bool)
|
||||
where
|
||||
S: CatalogState + Send + Sync,
|
||||
{
|
||||
// empty state
|
||||
let object_store = make_object_store();
|
||||
let catalog = PreservedCatalog::<S>::new_empty(
|
||||
Arc::clone(&object_store),
|
||||
ServerId::try_from(1).unwrap(),
|
||||
"db1".to_string(),
|
||||
state_data,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut expected = HashMap::new();
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
|
||||
// add files
|
||||
let mut chunk_id_watermark = 5;
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
for chunk_id in 0..chunk_id_watermark {
|
||||
let path = parsed_path!(format!("chunk_{}", chunk_id).as_ref());
|
||||
let (_, metadata) = make_metadata(&object_store, "ok", chunk_id).await;
|
||||
transaction.add_parquet(&path, &metadata).unwrap();
|
||||
expected.insert(path, Arc::new(metadata));
|
||||
}
|
||||
|
||||
transaction.commit(true).await.unwrap();
|
||||
}
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
|
||||
// remove files
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
let path = parsed_path!("chunk_1");
|
||||
transaction.remove_parquet(&path).unwrap();
|
||||
expected.remove(&path);
|
||||
|
||||
transaction.commit(true).await.unwrap();
|
||||
}
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
|
||||
// add and remove in the same transaction
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
|
||||
let (_, metadata) = make_metadata(&object_store, "ok", chunk_id_watermark).await;
|
||||
transaction.add_parquet(&path, &metadata).unwrap();
|
||||
transaction.remove_parquet(&path).unwrap();
|
||||
chunk_id_watermark += 1;
|
||||
|
||||
transaction.commit(true).await.unwrap();
|
||||
}
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
|
||||
// remove and add in the same transaction
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
let path = parsed_path!("chunk_2");
|
||||
let (_, metadata) = make_metadata(&object_store, "ok", 2).await;
|
||||
transaction.remove_parquet(&path).unwrap();
|
||||
transaction.add_parquet(&path, &metadata).unwrap();
|
||||
|
||||
transaction.commit(true).await.unwrap();
|
||||
}
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
|
||||
// add, remove, add in the same transaction
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
|
||||
let (_, metadata) = make_metadata(&object_store, "ok", chunk_id_watermark).await;
|
||||
transaction.add_parquet(&path, &metadata).unwrap();
|
||||
transaction.remove_parquet(&path).unwrap();
|
||||
transaction.add_parquet(&path, &metadata).unwrap();
|
||||
expected.insert(path, Arc::new(metadata));
|
||||
chunk_id_watermark += 1;
|
||||
|
||||
transaction.commit(true).await.unwrap();
|
||||
}
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
|
||||
// remove, add, remove in same transaction
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
let path = parsed_path!("chunk_2");
|
||||
let (_, metadata) = make_metadata(&object_store, "ok", 2).await;
|
||||
transaction.remove_parquet(&path).unwrap();
|
||||
transaction.add_parquet(&path, &metadata).unwrap();
|
||||
transaction.remove_parquet(&path).unwrap();
|
||||
expected.remove(&path);
|
||||
|
||||
transaction.commit(true).await.unwrap();
|
||||
}
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
|
||||
// error handling, no real opt
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
// already exists (should also not change the metadata)
|
||||
let path = parsed_path!("chunk_0");
|
||||
let (_, metadata) = make_metadata(&object_store, "fail", 0).await;
|
||||
let err = transaction.add_parquet(&path, &metadata).unwrap_err();
|
||||
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
|
||||
|
||||
// does not exist
|
||||
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
|
||||
let err = transaction.remove_parquet(&path).unwrap_err();
|
||||
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
|
||||
chunk_id_watermark += 1;
|
||||
|
||||
transaction.commit(true).await.unwrap();
|
||||
}
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
|
||||
// error handling, still something works
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
// already exists (should also not change the metadata)
|
||||
let path = parsed_path!("chunk_0");
|
||||
let (_, metadata) = make_metadata(&object_store, "fail", 0).await;
|
||||
let err = transaction.add_parquet(&path, &metadata).unwrap_err();
|
||||
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
|
||||
|
||||
// this transaction will still work
|
||||
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
|
||||
let (_, metadata) = make_metadata(&object_store, "ok", chunk_id_watermark).await;
|
||||
transaction.add_parquet(&path, &metadata).unwrap();
|
||||
expected.insert(path.clone(), Arc::new(metadata.clone()));
|
||||
chunk_id_watermark += 1;
|
||||
|
||||
// recently added
|
||||
let err = transaction.add_parquet(&path, &metadata).unwrap_err();
|
||||
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
|
||||
|
||||
// does not exist
|
||||
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
|
||||
let err = transaction.remove_parquet(&path).unwrap_err();
|
||||
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
|
||||
chunk_id_watermark += 1;
|
||||
|
||||
// this still works
|
||||
let path = parsed_path!("chunk_3");
|
||||
transaction.remove_parquet(&path).unwrap();
|
||||
expected.remove(&path);
|
||||
|
||||
// recently removed
|
||||
let err = transaction.remove_parquet(&path).unwrap_err();
|
||||
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
|
||||
|
||||
transaction.commit(true).await.unwrap();
|
||||
}
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
|
||||
// transaction aborting
|
||||
if test_abort {
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
// add
|
||||
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
|
||||
let (_, metadata) = make_metadata(&object_store, "ok", chunk_id_watermark).await;
|
||||
transaction.add_parquet(&path, &metadata).unwrap();
|
||||
chunk_id_watermark += 1;
|
||||
|
||||
// remove
|
||||
let path = parsed_path!("chunk_4");
|
||||
transaction.remove_parquet(&path).unwrap();
|
||||
|
||||
// add and remove
|
||||
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
|
||||
let (_, metadata) = make_metadata(&object_store, "ok", chunk_id_watermark).await;
|
||||
transaction.add_parquet(&path, &metadata).unwrap();
|
||||
transaction.remove_parquet(&path).unwrap();
|
||||
chunk_id_watermark += 1;
|
||||
}
|
||||
assert_files_eq(&catalog.state().files(), &expected);
|
||||
}
|
||||
|
||||
// transaction aborting w/ errors
|
||||
if test_abort {
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
// already exists (should also not change the metadata)
|
||||
let path = parsed_path!("chunk_0");
|
||||
let (_, metadata) = make_metadata(&object_store, "fail", 0).await;
|
||||
let err = transaction.add_parquet(&path, &metadata).unwrap_err();
|
||||
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
|
||||
|
||||
// does not exist
|
||||
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
|
||||
let err = transaction.remove_parquet(&path).unwrap_err();
|
||||
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
|
||||
chunk_id_watermark += 1;
|
||||
}
|
||||
|
||||
// consume variable so that we can easily add tests w/o re-adding the final modification
|
||||
println!("{}", chunk_id_watermark);
|
||||
}
|
||||
|
||||
/// Assert that tracked files and their linked metadata are equal.
|
||||
fn assert_files_eq(
|
||||
actual: &HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>,
|
||||
expected: &HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>,
|
||||
) {
|
||||
let sorted_keys_actual = get_sorted_keys(actual);
|
||||
let sorted_keys_expected = get_sorted_keys(expected);
|
||||
assert_eq!(sorted_keys_actual, sorted_keys_expected);
|
||||
|
||||
for k in sorted_keys_actual {
|
||||
let md_actual = &actual[&k];
|
||||
let md_expected = &expected[&k];
|
||||
|
||||
let iox_md_actual = md_actual.read_iox_metadata().unwrap();
|
||||
let iox_md_expected = md_expected.read_iox_metadata().unwrap();
|
||||
assert_eq!(iox_md_actual, iox_md_expected);
|
||||
|
||||
let schema_actual = md_actual.read_schema().unwrap();
|
||||
let schema_expected = md_expected.read_schema().unwrap();
|
||||
assert_eq!(schema_actual, schema_expected);
|
||||
|
||||
let stats_actual = md_actual.read_statistics(&schema_actual, "foo").unwrap();
|
||||
let stats_expected = md_expected
|
||||
.read_statistics(&schema_expected, "foo")
|
||||
.unwrap();
|
||||
assert_eq!(stats_actual, stats_expected);
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a sorted list of keys from `HashMap`.
|
||||
fn get_sorted_keys<K, V>(map: &HashMap<K, V>) -> Vec<K>
|
||||
where
|
||||
K: Clone + Ord,
|
||||
{
|
||||
let mut keys: Vec<K> = map.keys().cloned().collect();
|
||||
keys.sort();
|
||||
keys
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -1350,7 +1602,9 @@ mod tests {
|
|||
use crate::test_utils::{make_metadata, make_object_store};
|
||||
use object_store::parsed_path;
|
||||
|
||||
use super::test_helpers::{break_catalog_with_weird_version, TestCatalogState};
|
||||
use super::test_helpers::{
|
||||
assert_catalog_state_implementation, break_catalog_with_weird_version, TestCatalogState,
|
||||
};
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -3029,4 +3283,9 @@ mod tests {
|
|||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_catalog_state() {
|
||||
assert_catalog_state_implementation::<TestCatalogState>((), true).await;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue