refactor: make PreservedCatalog easier to integrate

pull/24376/head
Marco Neumann 2021-05-17 11:23:54 +02:00
parent f4d7154746
commit 4fb800c7a6
1 changed files with 404 additions and 129 deletions

View File

@ -19,9 +19,11 @@ use object_store::{
ObjectStore, ObjectStoreApi,
};
use observability_deps::tracing::{info, warn};
use parking_lot::RwLock;
use parquet::file::metadata::ParquetMetaData;
use prost::{DecodeError, EncodeError, Message};
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::sync::{Semaphore, SemaphorePermit};
use uuid::Uuid;
/// Current version for serialized transactions.
@ -126,27 +128,124 @@ pub enum Error {
#[snafu(display("Cannot decode parquet metadata: {}", source))]
MetadataDecodingFailed { source: crate::metadata::Error },
#[snafu(display("Cannot parse path {:?}: {}", path, source), visibility(pub))]
PathParseFailed {
source: crate::storage::Error,
path: DirsAndFileName,
},
#[snafu(
display("Cannot read schema from {:?}: {}", path, source),
visibility(pub)
)]
SchemaReadFailed {
source: crate::storage::Error,
path: DirsAndFileName,
},
#[snafu(
display("Cannot read statistics from {:?}: {}", path, source),
visibility(pub)
)]
StatisticsReadFailed {
source: crate::metadata::Error,
path: DirsAndFileName,
},
#[snafu(
display("Catalog state failure when processing {:?}: {}", path, source),
visibility(pub)
)]
CatalogStateFailure {
source: Box<dyn std::error::Error + Send + Sync>,
path: DirsAndFileName,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// In-memory view of the preserved catalog.
/// Struct containing all information that a catalog received for a new parquet file.
#[derive(Debug)]
pub struct CatalogParquetInfo {
/// Full path.
pub path: DirsAndFileName,
/// Associated parquet metadata.
pub metadata: ParquetMetaData,
}
/// Abstraction over how the in-memory state of the catalog works.
pub trait CatalogState {
/// Input to create a new empty instance.
///
/// **NOTE: This is a temporary structure until this module is wired up to the real in-memory catalog!**
pub struct PreservedCatalog {
/// See [`new_empty`](Self::new_empty) for details.
type EmptyInput;
/// Create empty state w/o any known files.
fn new_empty(data: Self::EmptyInput) -> Self;
/// Opens a new state.
///
/// Depending if the state implements full copy-on-write semantics, do one of the following:
///
/// - clone state
/// - return a pointer to self (e.g. using an [`Arc`](std::sync::Arc))
fn clone_or_keep(origin: &Arc<Self>) -> Arc<Self>;
/// Add parquet file to state.
fn add(
&self,
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: &str,
info: CatalogParquetInfo,
) -> Result<()>;
/// Remove parquet file from state.
fn remove(&self, path: DirsAndFileName) -> Result<()>;
}
/// Inner mutable part of the preserved catalog.
struct PreservedCatalogInner<S>
where
S: CatalogState,
{
previous_tkey: Option<TransactionKey>,
state: CatalogState,
state: Arc<S>,
}
/// In-memory view of the preserved catalog.
pub struct PreservedCatalog<S>
where
S: CatalogState,
{
inner: RwLock<PreservedCatalogInner<S>>,
transaction_semaphore: Semaphore,
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: String,
}
impl PreservedCatalog {
impl<S> PreservedCatalog<S>
where
S: CatalogState,
{
/// Create new catalog w/o any data.
pub fn new_empty(object_store: Arc<ObjectStore>, server_id: ServerId, db_name: String) -> Self {
Self {
pub fn new_empty(
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: String,
state_data: S::EmptyInput,
) -> Self {
let inner = PreservedCatalogInner {
previous_tkey: None,
state: CatalogState::new_empty(),
state: Arc::new(S::new_empty(state_data)),
};
Self {
inner: RwLock::new(inner),
transaction_semaphore: Semaphore::new(1),
object_store,
server_id,
db_name,
@ -158,6 +257,7 @@ impl PreservedCatalog {
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: String,
state_data: S::EmptyInput,
) -> Result<Option<Self>> {
// parse all paths into revisions
let list_path = transactions_path(&object_store, server_id, &db_name);
@ -210,7 +310,7 @@ impl PreservedCatalog {
// read and replay revisions
let max_revision = max_revision.expect("transactions list is not empty here");
let mut state = CatalogState::new_empty();
let mut state = Arc::new(CatalogState::new_empty(state_data));
let mut last_tkey = None;
for rev in 0..=max_revision {
let uuid = transactions.get(&rev).context(MissingTransaction {
@ -220,12 +320,13 @@ impl PreservedCatalog {
revision_counter: rev,
uuid: *uuid,
};
let tmp_state = S::clone_or_keep(&state);
let transaction = OpenTransaction::load_and_apply(
&object_store,
server_id,
&db_name,
&tkey,
state.clone(),
tmp_state,
&last_tkey,
)
.await?;
@ -233,9 +334,14 @@ impl PreservedCatalog {
state = transaction.next_state;
}
Ok(Some(Self {
let inner = PreservedCatalogInner {
previous_tkey: last_tkey,
state,
};
Ok(Some(Self {
inner: RwLock::new(inner),
transaction_semaphore: Semaphore::new(1),
object_store,
server_id,
db_name,
@ -243,24 +349,31 @@ impl PreservedCatalog {
}
/// Open a new transaction.
pub fn open_transaction(&mut self) -> TransactionHandle<'_> {
TransactionHandle::new(self)
pub async fn open_transaction(&self) -> TransactionHandle<'_, S> {
TransactionHandle::new(self).await
}
/// Return current state.
pub fn state(&self) -> &CatalogState {
&self.state
pub fn state(&self) -> Arc<S> {
Arc::clone(&self.inner.read().state)
}
/// Get latest revision counter.
///
/// This can be `None` for a newly created catalog.
pub fn revision_counter(&self) -> Option<u64> {
self.previous_tkey.clone().map(|tkey| tkey.revision_counter)
self.inner
.read()
.previous_tkey
.clone()
.map(|tkey| tkey.revision_counter)
}
}
impl Debug for PreservedCatalog {
impl<S> Debug for PreservedCatalog<S>
where
S: CatalogState,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PreservedCatalog{{..}}")
}
@ -431,36 +544,27 @@ impl Display for TransactionKey {
}
}
/// In-memory catalog state, for testing.
#[derive(Clone, Debug)]
pub struct CatalogState {
pub parquet_files: HashMap<DirsAndFileName, ParquetMetaData>,
}
impl CatalogState {
/// Create new empty state w/o any tracked files.
fn new_empty() -> Self {
Self {
parquet_files: HashMap::new(),
}
}
}
/// Tracker for an open, uncommitted transaction.
struct OpenTransaction {
next_state: CatalogState,
struct OpenTransaction<S>
where
S: CatalogState,
{
next_state: Arc<S>,
proto: proto::Transaction,
}
impl OpenTransaction {
fn new(catalog: &PreservedCatalog) -> Self {
let (revision_counter, previous_uuid) = match &catalog.previous_tkey {
impl<S> OpenTransaction<S>
where
S: CatalogState,
{
fn new(catalog_inner: &PreservedCatalogInner<S>) -> Self {
let (revision_counter, previous_uuid) = match &catalog_inner.previous_tkey {
Some(tkey) => (tkey.revision_counter + 1, tkey.uuid.to_string()),
None => (0, String::new()),
};
Self {
next_state: catalog.state.clone(),
next_state: S::clone_or_keep(&catalog_inner.state),
proto: proto::Transaction {
actions: vec![],
version: TRANSACTION_VERSION,
@ -479,8 +583,11 @@ impl OpenTransaction {
}
fn handle_action(
state: &mut CatalogState,
state: &S,
action: &proto::transaction::action::Action,
object_store: &Arc<ObjectStore>,
server_id: ServerId,
db_name: &str,
) -> Result<()> {
match action {
proto::transaction::action::Action::Upgrade(u) => {
@ -491,29 +598,18 @@ impl OpenTransaction {
}
proto::transaction::action::Action::AddParquet(a) => {
let path = parse_dirs_and_filename(&a.path)?;
match state.parquet_files.entry(path) {
Occupied(o) => {
return Err(Error::ParquetFileAlreadyExists {
path: o.key().clone(),
});
}
Vacant(v) => {
let metadata = thrift_to_parquet_metadata(&a.metadata)
.context(MetadataDecodingFailed)?;
v.insert(metadata);
}
}
let metadata =
thrift_to_parquet_metadata(&a.metadata).context(MetadataDecodingFailed)?;
state.add(
Arc::clone(object_store),
server_id,
db_name,
CatalogParquetInfo { path, metadata },
)?;
}
proto::transaction::action::Action::RemoveParquet(a) => {
let path = parse_dirs_and_filename(&a.path)?;
match state.parquet_files.entry(path) {
Occupied(o) => {
o.remove();
}
Vacant(v) => {
return Err(Error::ParquetFileDoesNotExist { path: v.into_key() });
}
}
state.remove(path)?;
}
};
Ok(())
@ -522,18 +618,21 @@ impl OpenTransaction {
fn handle_action_and_record(
&mut self,
action: proto::transaction::action::Action,
object_store: &Arc<ObjectStore>,
server_id: ServerId,
db_name: &str,
) -> Result<()> {
Self::handle_action(&mut self.next_state, &action)?;
Self::handle_action(&self.next_state, &action, object_store, server_id, db_name)?;
self.proto.actions.push(proto::transaction::Action {
action: Some(action),
});
Ok(())
}
fn commit(mut self, catalog: &mut PreservedCatalog) {
fn commit(mut self, catalog_inner: &mut PreservedCatalogInner<S>) {
let tkey = self.tkey();
std::mem::swap(&mut catalog.state, &mut self.next_state);
catalog.previous_tkey = Some(tkey);
std::mem::swap(&mut catalog_inner.state, &mut self.next_state);
catalog_inner.previous_tkey = Some(tkey);
}
async fn store(
@ -548,11 +647,11 @@ impl OpenTransaction {
}
async fn load_and_apply(
object_store: &ObjectStore,
object_store: &Arc<ObjectStore>,
server_id: ServerId,
db_name: &str,
tkey: &TransactionKey,
mut state: CatalogState,
state: Arc<S>,
last_tkey: &Option<TransactionKey>,
) -> Result<Self> {
// recover state from store
@ -599,7 +698,7 @@ impl OpenTransaction {
// apply
for action in &proto.actions {
if let Some(action) = action.action.as_ref() {
Self::handle_action(&mut state, action)?;
Self::handle_action(&state, action, object_store, server_id, db_name)?;
}
}
@ -613,20 +712,45 @@ impl OpenTransaction {
/// Handle for an open uncommitted transaction.
///
/// Dropping this object w/o calling [`commit`](Self::commit) will issue a warning.
pub struct TransactionHandle<'c> {
catalog: &'c mut PreservedCatalog,
transaction: Option<OpenTransaction>,
pub struct TransactionHandle<'c, S>
where
S: CatalogState,
{
catalog: &'c PreservedCatalog<S>,
// NOTE: The permit is technically used since we use it to reference the semaphore. It implements `drop` which we
// rely on.
#[allow(dead_code)]
permit: SemaphorePermit<'c>,
transaction: Option<OpenTransaction<S>>,
}
impl<'c> TransactionHandle<'c> {
fn new(catalog: &'c mut PreservedCatalog) -> Self {
let transaction = OpenTransaction::new(catalog);
impl<'c, S> TransactionHandle<'c, S>
where
S: CatalogState,
{
async fn new(catalog: &'c PreservedCatalog<S>) -> TransactionHandle<'c, S> {
// first acquire semaphore (which is only being used for transactions), then get state lock
let permit = catalog
.transaction_semaphore
.acquire()
.await
.expect("semaphore should not be closed");
let inner_guard = catalog.inner.write();
let transaction = OpenTransaction::new(&inner_guard);
// free state for readers again
drop(inner_guard);
let tkey = transaction.tkey();
info!(?tkey, "transaction started");
Self {
catalog,
transaction: Some(transaction),
permit,
}
}
@ -647,7 +771,9 @@ impl<'c> TransactionHandle<'c> {
let t = std::mem::take(&mut self.transaction)
.expect("calling .commit on a closed transaction?!");
let tkey = t.tkey();
t.commit(self.catalog);
let mut inner_guard = self.catalog.inner.write();
t.commit(&mut inner_guard);
drop(inner_guard);
info!(?tkey, "transaction committed");
Ok(())
@ -664,13 +790,16 @@ impl<'c> TransactionHandle<'c> {
self.transaction
.as_mut()
.expect("transaction handle w/o transaction?!")
.handle_action_and_record(proto::transaction::action::Action::AddParquet(
proto::AddParquet {
.handle_action_and_record(
proto::transaction::action::Action::AddParquet(proto::AddParquet {
path: Some(unparse_dirs_and_filename(path)),
metadata: parquet_metadata_to_thrift(metadata)
.context(MetadataEncodingFailed)?,
},
))
}),
&self.catalog.object_store,
self.catalog.server_id,
&self.catalog.db_name,
)
}
/// Remove a parquet file from the catalog.
@ -680,15 +809,21 @@ impl<'c> TransactionHandle<'c> {
self.transaction
.as_mut()
.expect("transaction handle w/o transaction?!")
.handle_action_and_record(proto::transaction::action::Action::RemoveParquet(
proto::RemoveParquet {
.handle_action_and_record(
proto::transaction::action::Action::RemoveParquet(proto::RemoveParquet {
path: Some(unparse_dirs_and_filename(path)),
},
))
}),
&self.catalog.object_store,
self.catalog.server_id,
&self.catalog.db_name,
)
}
}
impl<'c> Debug for TransactionHandle<'c> {
impl<'c, S> Debug for TransactionHandle<'c, S>
where
S: CatalogState,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.transaction {
Some(t) => write!(f, "TransactionHandle(open, {})", t.tkey()),
@ -697,7 +832,10 @@ impl<'c> Debug for TransactionHandle<'c> {
}
}
impl<'c> Drop for TransactionHandle<'c> {
impl<'c, S> Drop for TransactionHandle<'c, S>
where
S: CatalogState,
{
fn drop(&mut self) {
if self.transaction.is_some() {
warn!(?self, "dropped uncommitted transaction");
@ -707,7 +845,7 @@ impl<'c> Drop for TransactionHandle<'c> {
#[cfg(test)]
mod tests {
use std::num::NonZeroU32;
use std::{cell::RefCell, num::NonZeroU32, ops::Deref};
use crate::{
metadata::{read_parquet_metadata_from_file, read_statistics_from_parquet_metadata},
@ -718,6 +856,67 @@ mod tests {
use super::*;
#[derive(Clone, Debug)]
struct TestCatalogStateInner {
pub parquet_files: HashMap<DirsAndFileName, ParquetMetaData>,
}
/// In-memory catalog state, for testing.
#[derive(Clone, Debug)]
struct TestCatalogState {
pub inner: RefCell<TestCatalogStateInner>,
}
impl CatalogState for TestCatalogState {
type EmptyInput = ();
fn new_empty(_data: Self::EmptyInput) -> Self {
Self {
inner: RefCell::new(TestCatalogStateInner {
parquet_files: HashMap::new(),
}),
}
}
fn clone_or_keep(origin: &Arc<Self>) -> Arc<Self> {
Arc::new(origin.deref().clone())
}
fn add(
&self,
_object_store: Arc<ObjectStore>,
_server_id: ServerId,
_db_name: &str,
info: CatalogParquetInfo,
) -> Result<()> {
match self.inner.borrow_mut().parquet_files.entry(info.path) {
Occupied(o) => {
return Err(Error::ParquetFileAlreadyExists {
path: o.key().clone(),
});
}
Vacant(v) => {
v.insert(info.metadata);
}
}
Ok(())
}
fn remove(&self, path: DirsAndFileName) -> Result<()> {
match self.inner.borrow_mut().parquet_files.entry(path) {
Occupied(o) => {
o.remove();
}
Vacant(v) => {
return Err(Error::ParquetFileDoesNotExist { path: v.into_key() });
}
}
Ok(())
}
}
#[tokio::test]
async fn test_inmem_commit_semantics() {
let object_store = make_object_store();
@ -733,7 +932,12 @@ mod tests {
#[tokio::test]
async fn test_load_from_empty_store() {
let object_store = make_object_store();
let option = PreservedCatalog::load(object_store, make_server_id(), "db1".to_string())
let option = PreservedCatalog::<TestCatalogState>::load(
object_store,
make_server_id(),
"db1".to_string(),
(),
)
.await
.unwrap();
assert!(option.is_none());
@ -782,7 +986,12 @@ mod tests {
create_empty_file(&object_store, &path).await;
// no data present
let option = PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.clone())
let option = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.clone(),
(),
)
.await
.unwrap();
assert!(option.is_none());
@ -805,8 +1014,13 @@ mod tests {
checked_delete(&object_store, &path).await;
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert!(matches!(
res,
Err(Error::MissingTransaction {
@ -833,8 +1047,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
"Format version of transaction file for revision 0 is 42 but only [1] are supported"
@ -859,8 +1078,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
"Wrong revision counter in transaction file: expected 0 but found 42"
@ -887,8 +1111,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
format!(
@ -916,8 +1145,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
"UUID required but not provided"
@ -942,8 +1176,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
"Cannot parse UUID: invalid length: expected one of [36, 32], found 3"
@ -968,8 +1207,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(res.unwrap_err().to_string(), "Wrong link to previous UUID in revision 0: expected None but found Some(00000000-0000-0000-0000-000000000000)");
}
@ -991,8 +1235,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(res.unwrap_err().to_string(), format!("Wrong link to previous UUID in revision 1: expected Some({}) but found Some(00000000-0000-0000-0000-000000000000)", trace.tkeys[0].uuid));
}
@ -1014,8 +1263,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
"Cannot parse UUID: invalid length: expected one of [36, 32], found 3"
@ -1025,9 +1279,13 @@ mod tests {
#[tokio::test]
async fn test_transaction_handle_debug() {
let object_store = make_object_store();
let mut catalog =
PreservedCatalog::new_empty(object_store, make_server_id(), "db1".to_string());
let mut t = catalog.open_transaction();
let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
object_store,
make_server_id(),
"db1".to_string(),
(),
);
let mut t = catalog.open_transaction().await;
// open transaction
t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string();
@ -1064,8 +1322,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
let (uuid1, uuid2) = if old_uuid < new_uuid {
(old_uuid, new_uuid)
} else {
@ -1098,8 +1361,13 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res =
PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await;
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
"Upgrade path not implemented/supported: foo",
@ -1107,8 +1375,10 @@ mod tests {
}
/// Get sorted list of catalog files from state
fn get_catalog_parquet_files(state: &CatalogState) -> Vec<(String, ParquetMetaData)> {
fn get_catalog_parquet_files(state: &TestCatalogState) -> Vec<(String, ParquetMetaData)> {
let mut files: Vec<(String, ParquetMetaData)> = state
.inner
.borrow()
.parquet_files
.iter()
.map(|(path, md)| (path.display(), md.clone()))
@ -1119,10 +1389,10 @@ mod tests {
/// Assert that set of parquet files tracked by a catalog are identical to the given sorted list.
fn assert_catalog_parquet_files(
catalog: &PreservedCatalog,
catalog: &PreservedCatalog<TestCatalogState>,
expected: &[(String, ParquetMetaData)],
) {
let actual = get_catalog_parquet_files(catalog.state());
let actual = get_catalog_parquet_files(&catalog.state());
for ((actual_path, actual_md), (expected_path, expected_md)) in
actual.iter().zip(expected.iter())
{
@ -1180,7 +1450,7 @@ mod tests {
/// Result of [`assert_single_catalog_inmem_works`].
struct TestTrace {
tkeys: Vec<TransactionKey>,
states: Vec<CatalogState>,
states: Vec<TestCatalogState>,
}
impl TestTrace {
@ -1191,9 +1461,10 @@ mod tests {
}
}
fn record(&mut self, catalog: &PreservedCatalog) {
self.tkeys.push(catalog.previous_tkey.clone().unwrap());
self.states.push(catalog.state.clone());
fn record(&mut self, catalog: &PreservedCatalog<TestCatalogState>) {
self.tkeys
.push(catalog.inner.read().previous_tkey.clone().unwrap());
self.states.push(catalog.state().deref().clone());
}
}
@ -1202,8 +1473,12 @@ mod tests {
server_id: ServerId,
db_name: &str,
) -> TestTrace {
let mut catalog =
PreservedCatalog::new_empty(Arc::clone(&object_store), server_id, db_name.to_string());
let catalog = PreservedCatalog::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
);
// get some test metadata
let metadata1 = make_metadata(object_store, "foo").await;
@ -1218,7 +1493,7 @@ mod tests {
// fill catalog with examples
{
let mut t = catalog.open_transaction();
let mut t = catalog.open_transaction().await;
t.add_parquet(&parsed_path!("test1"), &metadata1).unwrap();
t.add_parquet(&parsed_path!(["sub1"], "test1"), &metadata2)
@ -1244,7 +1519,7 @@ mod tests {
// modify catalog with examples
{
let mut t = catalog.open_transaction();
let mut t = catalog.open_transaction().await;
// "real" modifications
t.add_parquet(&parsed_path!("test4"), &metadata1).unwrap();
@ -1274,7 +1549,7 @@ mod tests {
// uncommitted modifications have no effect
{
let mut t = catalog.open_transaction();
let mut t = catalog.open_transaction().await;
t.add_parquet(&parsed_path!("test5"), &metadata1).unwrap();
t.remove_parquet(&parsed_path!(["sub1"], "test2")).unwrap();
@ -1306,7 +1581,7 @@ mod tests {
// load catalog from store and check replayed state
let catalog =
PreservedCatalog::load(Arc::clone(object_store), server_id, db_name.to_string())
PreservedCatalog::load(Arc::clone(object_store), server_id, db_name.to_string(), ())
.await
.unwrap()
.unwrap();