Merge branch 'main' into cn/bump
commit 3ca0d5d42f
@@ -1148,24 +1148,69 @@ impl<'c> Debug for CheckpointHandle<'c> {

 pub mod test_helpers {
     use super::*;
-    use crate::test_utils::{
-        chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path,
-    };
+    use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata};
+
+    #[derive(Clone, Debug, Default)]
+    pub struct Table {
+        pub partitions: HashMap<Arc<str>, Partition>,
+    }
+
+    #[derive(Clone, Debug, Default)]
+    pub struct Partition {
+        pub chunks: HashMap<u32, CatalogParquetInfo>,
+    }

     /// In-memory catalog state, for testing.
-    #[derive(Clone, Debug)]
+    #[derive(Clone, Debug, Default)]
     pub struct TestCatalogState {
         /// Map of all parquet files that are currently registered.
-        pub parquet_files: HashMap<ParquetFilePath, CatalogParquetInfo>,
+        pub tables: HashMap<Arc<str>, Table>,
     }

     impl TestCatalogState {
         /// Simple way to create [`CheckpointData`].
         pub fn checkpoint_data(&self) -> CheckpointData {
             CheckpointData {
-                files: self.parquet_files.clone(),
+                files: self
+                    .files()
+                    .map(|info| (info.path.clone(), info.clone()))
+                    .collect(),
             }
         }
+
+        /// Returns an iterator over the files in this catalog state
+        pub fn files(&self) -> impl Iterator<Item = &CatalogParquetInfo> {
+            self.tables.values().flat_map(|table| {
+                table
+                    .partitions
+                    .values()
+                    .flat_map(|partition| partition.chunks.values())
+            })
+        }
+
+        /// Inserts a file into this catalog state
+        pub fn insert(&mut self, info: CatalogParquetInfo) -> Result<()> {
+            let iox_md = info
+                .metadata
+                .read_iox_metadata()
+                .context(MetadataExtractFailed {
+                    path: info.path.clone(),
+                })?;
+
+            let table = self.tables.entry(iox_md.table_name).or_default();
+            let partition = table.partitions.entry(iox_md.partition_key).or_default();
+
+            match partition.chunks.entry(iox_md.chunk_id) {
+                Occupied(o) => {
+                    return Err(Error::ParquetFileAlreadyExists {
+                        path: o.get().path.clone(),
+                    });
+                }
+                Vacant(v) => v.insert(info),
+            };
+
+            Ok(())
+        }
     }

     impl CatalogState for TestCatalogState {
@@ -1173,7 +1218,7 @@ pub mod test_helpers {

         fn new_empty(_db_name: &str, _data: Self::EmptyInput) -> Self {
             Self {
-                parquet_files: HashMap::new(),
+                tables: HashMap::new(),
             }
         }

@@ -1182,31 +1227,38 @@ pub mod test_helpers {
             _iox_object_store: Arc<IoxObjectStore>,
             info: CatalogParquetInfo,
         ) -> Result<()> {
-            match self.parquet_files.entry(info.path.clone()) {
-                Occupied(o) => {
-                    return Err(Error::ParquetFileAlreadyExists {
-                        path: o.key().clone(),
-                    });
-                }
-                Vacant(v) => {
-                    v.insert(info);
-                }
-            }
-
-            Ok(())
+            self.insert(info)
         }

         fn remove(&mut self, path: &ParquetFilePath) -> Result<()> {
-            match self.parquet_files.entry(path.clone()) {
-                Occupied(o) => {
-                    o.remove();
-                }
-                Vacant(v) => {
-                    return Err(Error::ParquetFileDoesNotExist { path: v.into_key() });
+            let partitions = self
+                .tables
+                .values_mut()
+                .flat_map(|table| table.partitions.values_mut());
+            let mut removed = 0;
+
+            for partition in partitions {
+                let to_remove: Vec<_> = partition
+                    .chunks
+                    .iter()
+                    .filter_map(|(id, chunk)| {
+                        if &chunk.path == path {
+                            return Some(*id);
+                        }
+                        None
+                    })
+                    .collect();
+
+                for id in to_remove {
+                    removed += 1;
+                    partition.chunks.remove(&id).unwrap();
                 }
             }

-            Ok(())
+            match removed {
+                0 => Err(Error::ParquetFileDoesNotExist { path: path.clone() }),
+                _ => Ok(()),
+            }
         }
     }

@@ -1243,13 +1295,14 @@ pub mod test_helpers {
             PreservedCatalog::new_empty::<S>(Arc::clone(&iox_object_store), state_data)
                 .await
                 .unwrap();
-        let mut expected: HashMap<ParquetFilePath, _> = HashMap::new();
+
+        // The expected state of the catalog
+        let mut expected: HashMap<u32, (ParquetFilePath, Arc<IoxParquetMetaData>)> = HashMap::new();
         assert_checkpoint(&state, &f, &expected);

         // add files
-        let mut chunk_id_watermark = 5;
         {
-            for chunk_id in 0..chunk_id_watermark {
+            for chunk_id in 0..5 {
                 let (path, metadata) =
                     make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id)).await;
                 state
@@ -1262,23 +1315,21 @@ pub mod test_helpers {
                         },
                     )
                     .unwrap();
-                expected.insert(path, Arc::new(metadata));
+                expected.insert(chunk_id, (path, Arc::new(metadata)));
             }
         }
         assert_checkpoint(&state, &f, &expected);

         // remove files
         {
-            let path = expected.keys().next().unwrap().clone();
+            let (path, _) = expected.remove(&1).unwrap();
             state.remove(&path).unwrap();
-            expected.remove(&path);
         }
         assert_checkpoint(&state, &f, &expected);

         // add and remove in the same transaction
         {
-            let (path, metadata) =
-                make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
+            let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(5)).await;
             state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1290,15 +1341,13 @@ pub mod test_helpers {
                 )
                 .unwrap();
             state.remove(&path).unwrap();
-            chunk_id_watermark += 1;
         }
         assert_checkpoint(&state, &f, &expected);

         // remove and add in the same transaction
         {
-            let path = expected.keys().next().unwrap().clone();
-            let metadata = expected.get(&path).unwrap();
-            state.remove(&path).unwrap();
+            let (path, metadata) = expected.get(&3).unwrap();
+            state.remove(path).unwrap();
             state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1314,8 +1363,7 @@ pub mod test_helpers {

         // add, remove, add in the same transaction
         {
-            let (path, metadata) =
-                make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
+            let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(6)).await;
             state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1337,15 +1385,13 @@ pub mod test_helpers {
                     },
                 )
                 .unwrap();
-            expected.insert(path, Arc::new(metadata));
-            chunk_id_watermark += 1;
+            expected.insert(6, (path, Arc::new(metadata)));
         }
         assert_checkpoint(&state, &f, &expected);

         // remove, add, remove in same transaction
         {
-            let path = expected.keys().next().unwrap().clone();
-            let metadata = expected.get(&path).unwrap();
+            let (path, metadata) = expected.remove(&4).unwrap();
             state.remove(&path).unwrap();
             state
                 .add(
@@ -1353,20 +1399,20 @@ pub mod test_helpers {
                     CatalogParquetInfo {
                         path: path.clone(),
                         file_size_bytes: 33,
-                        metadata: Arc::clone(metadata),
+                        metadata: Arc::clone(&metadata),
                     },
                 )
                 .unwrap();
             state.remove(&path).unwrap();
-            expected.remove(&path);
         }
         assert_checkpoint(&state, &f, &expected);

         // error handling, no real opt
         {
-            // already exists (should also not change the metadata)
-            let path = expected.keys().next().unwrap();
-            let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
+            // TODO: Error handling should disambiguate between chunk collision and filename collision
+
+            // chunk with same ID already exists (should also not change the metadata)
+            let (path, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
             let err = state
                 .add(
                     Arc::clone(&iox_object_store),
@@ -1379,46 +1425,43 @@ pub mod test_helpers {
                 .unwrap_err();
             assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));

-            // does not exist
-            let path = make_parquet_file_path();
+            // does not exist as has a different UUID
             let err = state.remove(&path).unwrap_err();
             assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
-            chunk_id_watermark += 1;
         }
         assert_checkpoint(&state, &f, &expected);

         // error handling, still something works
         {
             // already exists (should also not change the metadata)
-            let path = expected.keys().next().unwrap();
-            let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
+            let (_, metadata) = expected.get(&0).unwrap();
             let err = state
                 .add(
                     Arc::clone(&iox_object_store),
                     CatalogParquetInfo {
-                        path: path.clone(),
+                        // Intentionally "incorrect" path
+                        path: ParquetFilePath::new(&chunk_addr(10)),
                         file_size_bytes: 33,
-                        metadata: Arc::new(metadata.clone()),
+                        metadata: Arc::clone(metadata),
                     },
                 )
                 .unwrap_err();
             assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));

             // this transaction will still work
-            let (path, metadata) =
-                make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
+            let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(7)).await;
+            let metadata = Arc::new(metadata);
             state
                 .add(
                     Arc::clone(&iox_object_store),
                     CatalogParquetInfo {
                         path: path.clone(),
                         file_size_bytes: 33,
-                        metadata: Arc::new(metadata.clone()),
+                        metadata: Arc::clone(&metadata),
                     },
                 )
                 .unwrap();
-            expected.insert(path.clone(), Arc::new(metadata.clone()));
-            chunk_id_watermark += 1;
+            expected.insert(7, (path.clone(), Arc::clone(&metadata)));

             // recently added
             let err = state
@@ -1427,50 +1470,44 @@ pub mod test_helpers {
                     CatalogParquetInfo {
                         path,
                         file_size_bytes: 33,
-                        metadata: Arc::new(metadata),
+                        metadata: Arc::clone(&metadata),
                     },
                 )
                 .unwrap_err();
             assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));

-            // does not exist
-            let path = make_parquet_file_path();
+            // does not exist - as different UUID
+            let path = ParquetFilePath::new(&chunk_addr(7));
             let err = state.remove(&path).unwrap_err();
             assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
-            chunk_id_watermark += 1;

             // this still works
-            let path = expected.keys().next().unwrap().clone();
+            let (path, _) = expected.remove(&7).unwrap();
             state.remove(&path).unwrap();
-            expected.remove(&path);

             // recently removed
             let err = state.remove(&path).unwrap_err();
             assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
         }
         assert_checkpoint(&state, &f, &expected);
-
-        // consume variable so that we can easily add tests w/o re-adding the final modification
-        println!("{}", chunk_id_watermark);
     }

     /// Assert that tracked files and their linked metadata are equal.
     fn assert_checkpoint<S, F>(
         state: &S,
         f: &F,
-        expected_files: &HashMap<ParquetFilePath, Arc<IoxParquetMetaData>>,
+        expected_files: &HashMap<u32, (ParquetFilePath, Arc<IoxParquetMetaData>)>,
     ) where
         F: Fn(&S) -> CheckpointData,
     {
-        let actual_files = f(state).files;
+        let actual_files: HashMap<ParquetFilePath, CatalogParquetInfo> = f(state).files;

-        let sorted_keys_actual = get_sorted_keys(&actual_files);
-        let sorted_keys_expected = get_sorted_keys(expected_files);
+        let sorted_keys_actual = get_sorted_keys(actual_files.keys());
+        let sorted_keys_expected = get_sorted_keys(expected_files.values().map(|(path, _)| path));
         assert_eq!(sorted_keys_actual, sorted_keys_expected);

-        for k in sorted_keys_actual {
-            let md_actual = &actual_files[&k].metadata;
-            let md_expected = &expected_files[&k];
+        for (path, md_expected) in expected_files.values() {
+            let md_actual = &actual_files[path].metadata;

             let iox_md_actual = md_actual.read_iox_metadata().unwrap();
             let iox_md_expected = md_expected.read_iox_metadata().unwrap();
@@ -1486,12 +1523,11 @@ pub mod test_helpers {
         }
     }

-    /// Get a sorted list of keys from `HashMap`.
-    fn get_sorted_keys<K, V>(map: &HashMap<K, V>) -> Vec<K>
-    where
-        K: Clone + Ord,
-    {
-        let mut keys: Vec<K> = map.keys().cloned().collect();
+    /// Get a sorted list of keys from an iterator.
+    fn get_sorted_keys<'a>(
+        keys: impl Iterator<Item = &'a ParquetFilePath>,
+    ) -> Vec<&'a ParquetFilePath> {
+        let mut keys: Vec<_> = keys.collect();
         keys.sort();
         keys
     }
@@ -1505,9 +1541,7 @@ mod tests {
         },
         *,
     };
-    use crate::test_utils::{
-        chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path,
-    };
+    use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata};

     #[tokio::test]
     async fn test_create_empty() {
@@ -2086,9 +2120,6 @@ mod tests {
     #[tokio::test]
     async fn test_checkpoint() {
         let iox_object_store = make_iox_object_store();
-        let addr = chunk_addr(1337);
-        let (_, metadata) = make_metadata(&iox_object_store, "foo", addr.clone()).await;
-        let metadata = Arc::new(metadata);

         // use common test as baseline
         let mut trace = assert_single_catalog_inmem_works(&iox_object_store).await;
@@ -2113,14 +2144,16 @@ mod tests {

         // create another transaction on-top that adds a file (this transaction will be required to load the full state)
         {
+            let addr = chunk_addr(1337);
+            let (path, metadata) = make_metadata(&iox_object_store, "foo", addr.clone()).await;
+
             let mut transaction = catalog.open_transaction().await;
-            let path = make_parquet_file_path();
             let info = CatalogParquetInfo {
                 path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata),
+                metadata: Arc::new(metadata),
             };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             transaction.add_parquet(&info).unwrap();
             let ckpt_handle = transaction.commit().await.unwrap();
             ckpt_handle
@@ -2163,8 +2196,7 @@ mod tests {
         state: &TestCatalogState,
     ) -> Vec<(ParquetFilePath, IoxParquetMetaData)> {
         let mut files: Vec<(ParquetFilePath, IoxParquetMetaData)> = state
-            .parquet_files
-            .values()
+            .files()
             .map(|info| (info.path.clone(), info.metadata.as_ref().clone()))
             .collect();
         files.sort_by_key(|(path, _)| path.clone());
@@ -2255,12 +2287,6 @@ mod tests {
             .await
             .unwrap();

-        // get some test metadata
-        let (_, metadata1) = make_metadata(iox_object_store, "foo", chunk_addr(1)).await;
-        let metadata1 = Arc::new(metadata1);
-        let (_, metadata2) = make_metadata(iox_object_store, "bar", chunk_addr(1)).await;
-        let metadata2 = Arc::new(metadata2);
-
         // track all the intermediate results
         let mut trace = TestTrace::new();

@@ -2269,116 +2295,102 @@ mod tests {
         assert_catalog_parquet_files(&state, &[]);
         trace.record(&catalog, &state, false);

+        let mut expected = vec![];
+
         // fill catalog with examples
-        let test1 = make_parquet_file_path();
-        let sub1_test1 = make_parquet_file_path();
-        let sub1_test2 = make_parquet_file_path();
-        let sub2_test1 = make_parquet_file_path();
         {
             let mut t = catalog.open_transaction().await;

+            let (path, metadata) = make_metadata(iox_object_store, "foo", chunk_addr(0)).await;
+            expected.push((path.clone(), metadata.clone()));
             let info = CatalogParquetInfo {
-                path: test1.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata1),
+                metadata: Arc::new(metadata),
             };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             t.add_parquet(&info).unwrap();

+            let (path, metadata) = make_metadata(iox_object_store, "bar", chunk_addr(1)).await;
+            expected.push((path.clone(), metadata.clone()));
             let info = CatalogParquetInfo {
-                path: sub1_test1.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata2),
+                metadata: Arc::new(metadata),
             };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             t.add_parquet(&info).unwrap();

+            let (path, metadata) = make_metadata(iox_object_store, "bar", chunk_addr(2)).await;
+            expected.push((path.clone(), metadata.clone()));
             let info = CatalogParquetInfo {
-                path: sub1_test2.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata2),
+                metadata: Arc::new(metadata),
             };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             t.add_parquet(&info).unwrap();

+            let (path, metadata) = make_metadata(iox_object_store, "foo", chunk_addr(3)).await;
+            expected.push((path.clone(), metadata.clone()));
             let info = CatalogParquetInfo {
-                path: sub2_test1.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata1),
+                metadata: Arc::new(metadata),
             };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             t.add_parquet(&info).unwrap();

             t.commit().await.unwrap();
         }
         assert_eq!(catalog.revision_counter(), 1);
-        assert_catalog_parquet_files(
-            &state,
-            &[
-                (sub1_test1.clone(), metadata2.as_ref().clone()),
-                (sub1_test2.clone(), metadata2.as_ref().clone()),
-                (sub2_test1.clone(), metadata1.as_ref().clone()),
-                (test1.clone(), metadata1.as_ref().clone()),
-            ],
-        );
+        assert_catalog_parquet_files(&state, &expected);
         trace.record(&catalog, &state, false);

         // modify catalog with examples
-        let test4 = make_parquet_file_path();
         {
+            let (path, metadata) = make_metadata(iox_object_store, "foo", chunk_addr(4)).await;
+            expected.push((path.clone(), metadata.clone()));
+
             let mut t = catalog.open_transaction().await;

             // "real" modifications
             let info = CatalogParquetInfo {
-                path: test4.clone(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata1),
+                metadata: Arc::new(metadata),
             };
-            state.parquet_files.insert(info.path.clone(), info.clone());
+            state.insert(info.clone()).unwrap();
             t.add_parquet(&info).unwrap();

-            state.parquet_files.remove(&test1);
-            t.remove_parquet(&test1);
+            let (path, _) = expected.remove(0);
+            state.remove(&path).unwrap();
+            t.remove_parquet(&path);

             t.commit().await.unwrap();
         }
         assert_eq!(catalog.revision_counter(), 2);
-        assert_catalog_parquet_files(
-            &state,
-            &[
-                (sub1_test1.clone(), metadata2.as_ref().clone()),
-                (sub1_test2.clone(), metadata2.as_ref().clone()),
-                (sub2_test1.clone(), metadata1.as_ref().clone()),
-                (test4.clone(), metadata1.as_ref().clone()),
-            ],
-        );
+        assert_catalog_parquet_files(&state, &expected);
         trace.record(&catalog, &state, false);

         // uncommitted modifications have no effect
         {
             let mut t = catalog.open_transaction().await;

+            let (path, metadata) = make_metadata(iox_object_store, "foo", chunk_addr(1)).await;
             let info = CatalogParquetInfo {
-                path: make_parquet_file_path(),
+                path,
                 file_size_bytes: 33,
-                metadata: Arc::clone(&metadata1),
+                metadata: Arc::new(metadata),
             };

             t.add_parquet(&info).unwrap();
-            t.remove_parquet(&sub1_test2);
+            t.remove_parquet(&expected[0].0);

             // NO commit here!
         }
         assert_eq!(catalog.revision_counter(), 2);
-        assert_catalog_parquet_files(
-            &state,
-            &[
-                (sub1_test1.clone(), metadata2.as_ref().clone()),
-                (sub1_test2.clone(), metadata2.as_ref().clone()),
-                (sub2_test1.clone(), metadata1.as_ref().clone()),
-                (test4.clone(), metadata1.as_ref().clone()),
-            ],
-        );
+        assert_catalog_parquet_files(&state, &expected);
         trace.record(&catalog, &state, true);

         trace
@@ -189,11 +189,11 @@ mod tests {
         let mut transaction = catalog.open_transaction().await;

         let info = create_parquet_file(&iox_object_store, 0).await;
-        state.parquet_files.insert(info.path.clone(), info.clone());
+        state.insert(info.clone()).unwrap();
         transaction.add_parquet(&info).unwrap();

         let info = create_parquet_file(&iox_object_store, 1).await;
-        state.parquet_files.insert(info.path.clone(), info.clone());
+        state.insert(info.clone()).unwrap();
         transaction.add_parquet(&info).unwrap();

         transaction.commit().await.unwrap();
@@ -207,7 +207,7 @@ mod tests {
         let mut transaction = catalog.open_transaction().await;

         let info = create_parquet_file(&iox_object_store, 2).await;
-        state.parquet_files.insert(info.path.clone(), info.clone());
+        state.insert(info.clone()).unwrap();
         transaction.add_parquet(&info).unwrap();

         transaction.commit().await.unwrap();
@@ -215,7 +215,7 @@ mod tests {

         // store catalog state
         let paths_expected = {
-            let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
+            let mut tmp: Vec<_> = state.files().map(|info| info.path.clone()).collect();
             tmp.sort();
             tmp
         };
@@ -231,7 +231,7 @@ mod tests {

         // check match
         let paths_actual = {
-            let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
+            let mut tmp: Vec<_> = state.files().map(|info| info.path.clone()).collect();
             tmp.sort();
             tmp
         };
@@ -259,7 +259,7 @@ mod tests {
             .unwrap();

         // check match
-        assert!(state.parquet_files.is_empty());
+        assert!(state.files().next().is_none());
         assert_eq!(catalog.revision_counter(), 0);
     }

@@ -290,7 +290,7 @@ mod tests {
         let (catalog, state) = rebuild_catalog::<TestCatalogState>(iox_object_store, (), true)
             .await
             .unwrap();
-        assert!(state.parquet_files.is_empty());
+        assert!(state.files().next().is_none());
         assert_eq!(catalog.revision_counter(), 0);
     }

@@ -764,12 +764,6 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) ->
     record_batches
 }

-/// Create an arbitrary ParquetFilePath
-pub fn make_parquet_file_path() -> ParquetFilePath {
-    let chunk_addr = chunk_addr(3);
-    ParquetFilePath::new(&chunk_addr)
-}
-
 /// Create test metadata by creating a parquet file and reading it back into memory.
 ///
 /// See [`make_chunk`] for details.
@@ -3921,7 +3921,7 @@ mod tests {
             .unwrap()
             .unwrap();
         let paths_actual = {
-            let mut tmp: Vec<_> = catalog.parquet_files.keys().cloned().collect();
+            let mut tmp: Vec<_> = catalog.files().map(|info| info.path.clone()).collect();
             tmp.sort();
             tmp
         };