Merge pull request #2556 from influxdata/crepererum/preserved_catalog_api_split
refactor: split preserved catalog `api` into `core` and `interface`
commit ca1973fca7
@@ -1,7 +1,6 @@
 //! Methods to cleanup the object store.
 use std::{collections::HashSet, sync::Arc};

-use crate::catalog::api::{CatalogParquetInfo, CatalogState, PreservedCatalog};
 use futures::TryStreamExt;
 use iox_object_store::{IoxObjectStore, ParquetFilePath};
 use object_store::{ObjectStore, ObjectStoreApi};
@@ -9,6 +8,11 @@ use observability_deps::tracing::info;
 use parking_lot::Mutex;
 use snafu::{ResultExt, Snafu};

+use crate::catalog::{
+    core::PreservedCatalog,
+    interface::{CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError},
+};
+
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Error from read operation while cleaning object store: {}", source))]
@@ -22,7 +26,7 @@ pub enum Error {
     },

     #[snafu(display("Error from catalog loading while cleaning object store: {}", source))]
-    CatalogLoadError { source: crate::catalog::api::Error },
+    CatalogLoadError { source: crate::catalog::core::Error },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -124,12 +128,12 @@ impl CatalogState for TracerCatalogState {
         &mut self,
         _iox_object_store: Arc<IoxObjectStore>,
         info: CatalogParquetInfo,
-    ) -> crate::catalog::api::Result<()> {
+    ) -> Result<(), CatalogStateAddError> {
         self.files.lock().insert(info.path);
         Ok(())
     }

-    fn remove(&mut self, _path: &ParquetFilePath) -> crate::catalog::api::Result<()> {
+    fn remove(&mut self, _path: &ParquetFilePath) -> Result<(), CatalogStateRemoveError> {
         // Do NOT remove the file since we still need it for time travel
         Ok(())
     }
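This cleanup-module hunk shows the PR's central pattern: `CatalogState` implementors stop returning the catch-all `catalog::api::Result` and instead return one narrow error enum per operation. A self-contained sketch of that pattern (the `FilePath` type and the method signatures here are simplified stand-ins, not the real IOx types):

```rust
use snafu::Snafu;

// Stand-in for the real `ParquetFilePath`.
#[derive(Debug, Clone)]
pub struct FilePath(pub String);

// Failures that only `add` can produce.
#[derive(Debug, Snafu)]
pub enum CatalogStateAddError {
    #[snafu(display("Parquet already exists in catalog: {:?}", path))]
    ParquetFileAlreadyExists { path: FilePath },
}

// Failures that only `remove` can produce.
#[derive(Debug, Snafu)]
pub enum CatalogStateRemoveError {
    #[snafu(display("Parquet does not exist in catalog: {:?}", path))]
    ParquetFileDoesNotExist { path: FilePath },
}

pub trait CatalogState {
    /// Add a parquet file; the signature advertises add-specific failures only.
    fn add(&mut self, info: FilePath) -> Result<(), CatalogStateAddError>;

    /// Remove a parquet file; the signature advertises remove-specific failures only.
    fn remove(&mut self, path: &FilePath) -> Result<(), CatalogStateRemoveError>;
}
```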
@@ -1,10 +1,13 @@
 //! Catalog preservation and transaction handling.
 use crate::{
-    catalog::internals::{
-        proto_io::{load_transaction_proto, store_transaction_proto},
-        proto_parse,
-        types::{FileType, TransactionKey},
+    catalog::{
+        interface::{CatalogParquetInfo, CatalogState, CheckpointData},
+        internals::{
+            proto_io::{load_transaction_proto, store_transaction_proto},
+            proto_parse,
+            types::{FileType, TransactionKey},
+        },
     },
     metadata::IoxParquetMetaData,
 };
@@ -113,55 +116,9 @@ pub enum Error {
     #[snafu(display("Upgrade path not implemented/supported: {}", format))]
     UnsupportedUpgrade { format: String },

-    #[snafu(display("Parquet already exists in catalog: {:?}", path))]
-    ParquetFileAlreadyExists { path: ParquetFilePath },
-
-    #[snafu(display("Parquet does not exist in catalog: {:?}", path))]
-    ParquetFileDoesNotExist { path: ParquetFilePath },
-
-    #[snafu(display("Cannot decode parquet metadata: {}", source))]
-    MetadataDecodingFailed { source: crate::metadata::Error },
-
-    #[snafu(
-        display("Cannot extract metadata from {:?}: {}", path, source),
-        visibility(pub)
-    )]
-    MetadataExtractFailed {
-        source: crate::metadata::Error,
-        path: ParquetFilePath,
-    },
-
-    #[snafu(
-        display("Schema for {:?} does not work with existing schema: {}", path, source),
-        visibility(pub)
-    )]
-    SchemaError {
-        source: Box<dyn std::error::Error + Send + Sync>,
-        path: ParquetFilePath,
-    },
-
-    #[snafu(
-        display(
-            "Internal error: Using checkpoints from {:?} leads to broken replay plan: {}, catalog likely broken",
-            path,
-            source
-        ),
-        visibility(pub)
-    )]
-    ReplayPlanError {
-        source: Box<dyn std::error::Error + Send + Sync>,
-        path: ParquetFilePath,
-    },
-
-    #[snafu(
-        display("Cannot create parquet chunk from {:?}: {}", path, source),
-        visibility(pub)
-    )]
-    ChunkCreationFailed {
-        source: crate::chunk::Error,
-        path: ParquetFilePath,
-    },
-
     #[snafu(display("Catalog already exists"))]
     AlreadyExists {},
@@ -177,44 +134,20 @@ pub enum Error {

     #[snafu(display("Cannot commit transaction: {}", source))]
     CommitError { source: Box<Error> },
+
+    #[snafu(display("Cannot add parquet file during load: {}", source))]
+    AddError {
+        source: crate::catalog::interface::CatalogStateAddError,
+    },
+
+    #[snafu(display("Cannot remove parquet file during load: {}", source))]
+    RemoveError {
+        source: crate::catalog::interface::CatalogStateRemoveError,
+    },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;

-/// Struct containing all information that a catalog received for a new parquet file.
-#[derive(Debug, Clone)]
-pub struct CatalogParquetInfo {
-    /// Path within this database.
-    pub path: ParquetFilePath,
-
-    /// Size of the parquet file, in bytes
-    pub file_size_bytes: usize,
-
-    /// Associated parquet metadata.
-    pub metadata: Arc<IoxParquetMetaData>,
-}
-
-/// Abstraction over how the in-memory state of the catalog works.
-pub trait CatalogState {
-    /// Input to create a new empty instance.
-    ///
-    /// See [`new_empty`](Self::new_empty) for details.
-    type EmptyInput: Send;
-
-    /// Create empty state w/o any known files.
-    fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self;
-
-    /// Add parquet file to state.
-    fn add(
-        &mut self,
-        iox_object_store: Arc<IoxObjectStore>,
-        info: CatalogParquetInfo,
-    ) -> Result<()>;
-
-    /// Remove parquet file from state.
-    fn remove(&mut self, path: &ParquetFilePath) -> Result<()>;
-}
-
 /// In-memory view of the preserved catalog.
 pub struct PreservedCatalog {
     // We need an RWLock AND a semaphore, so that readers are NOT blocked during an open
@@ -605,20 +538,22 @@ impl OpenTransaction {
                 let metadata = Arc::new(metadata);

-                state.add(
-                    Arc::clone(iox_object_store),
-                    CatalogParquetInfo {
-                        path,
-                        file_size_bytes,
-                        metadata,
-                    },
-                )?;
+                state
+                    .add(
+                        Arc::clone(iox_object_store),
+                        CatalogParquetInfo {
+                            path,
+                            file_size_bytes,
+                            metadata,
+                        },
+                    )
+                    .context(AddError)?;
             }
             proto::transaction::action::Action::RemoveParquet(a) => {
                 let path =
                     proto_parse::parse_dirs_and_filename(a.path.as_ref().context(PathRequired)?)
                         .context(ProtobufParseError)?;
-                state.remove(&path)?;
+                state.remove(&path).context(RemoveError)?;
             }
         };
         Ok(())
@@ -737,20 +672,6 @@ impl OpenTransaction {
     }
 }

-/// Structure that holds all information required to create a checkpoint.
-///
-/// Note that while checkpoints are addressed using the same schema as we use for transactions
-/// (revision counter, UUID), they contain the changes at the end of (aka including) the transaction
-/// they refer to.
-#[derive(Debug)]
-pub struct CheckpointData {
-    /// List of all Parquet files that are currently (i.e. by the current version) tracked by the
-    /// catalog.
-    ///
-    /// If a file was once added but later removed it MUST NOT appear in the result.
-    pub files: HashMap<ParquetFilePath, CatalogParquetInfo>,
-}
-
 /// Handle for an open uncommitted transaction.
 ///
 /// Dropping this object w/o calling [`commit`](Self::commit) will issue a warning.
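Because `CatalogState::add`/`remove` no longer return `core`'s own `Result`, the transaction-replay code above now wraps the narrow interface errors into the new `AddError`/`RemoveError` variants via snafu's `context`. A minimal sketch of that wrapping, using the 0.6-style context selectors this codebase uses (both error types here are stand-ins):

```rust
use snafu::{ResultExt, Snafu};

// Stand-in for `interface::CatalogStateAddError`.
#[derive(Debug, Snafu)]
pub enum CatalogStateAddError {
    #[snafu(display("Parquet already exists in catalog: {:?}", path))]
    ParquetFileAlreadyExists { path: String },
}

// Stand-in for `core::Error`, which now wraps the interface errors.
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Cannot add parquet file during load: {}", source))]
    AddError { source: CatalogStateAddError },
}

fn replay_add(outcome: Result<(), CatalogStateAddError>) -> Result<(), Error> {
    // `.context(AddError)` lifts the narrow interface error into the broader
    // Error, mirroring `state.add(...).context(AddError)?` in the diff.
    outcome.context(AddError)
}
```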
@@ -223,8 +223,7 @@ mod tests {

     use crate::{
         catalog::{
-            api::{CatalogParquetInfo, PreservedCatalog},
-            test_helpers::TestCatalogState,
+            core::PreservedCatalog, interface::CatalogParquetInfo, test_helpers::TestCatalogState,
         },
         test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize},
     };
@@ -0,0 +1,99 @@
+//! Abstract interfaces to make different users work with the preserved catalog.
+use std::{collections::HashMap, sync::Arc};
+
+use iox_object_store::{IoxObjectStore, ParquetFilePath};
+use snafu::Snafu;
+
+use crate::metadata::IoxParquetMetaData;
+
+/// Struct containing all information that a catalog received for a new parquet file.
+#[derive(Debug, Clone)]
+pub struct CatalogParquetInfo {
+    /// Path within this database.
+    pub path: ParquetFilePath,
+
+    /// Size of the parquet file, in bytes
+    pub file_size_bytes: usize,
+
+    /// Associated parquet metadata.
+    pub metadata: Arc<IoxParquetMetaData>,
+}
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum CatalogStateAddError {
+    #[snafu(display("Cannot extract metadata from {:?}: {}", path, source))]
+    MetadataExtractFailed {
+        source: crate::metadata::Error,
+        path: ParquetFilePath,
+    },
+
+    #[snafu(display("Schema for {:?} does not work with existing schema: {}", path, source))]
+    SchemaError {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        path: ParquetFilePath,
+    },
+
+    #[snafu(
+        display(
+            "Internal error: Using checkpoints from {:?} leads to broken replay plan: {}, catalog likely broken",
+            path,
+            source
+        ),
+    )]
+    ReplayPlanError {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        path: ParquetFilePath,
+    },
+
+    #[snafu(display("Cannot create parquet chunk from {:?}: {}", path, source))]
+    ChunkCreationFailed {
+        source: crate::chunk::Error,
+        path: ParquetFilePath,
+    },
+
+    #[snafu(display("Parquet already exists in catalog: {:?}", path))]
+    ParquetFileAlreadyExists { path: ParquetFilePath },
+}
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum CatalogStateRemoveError {
+    #[snafu(display("Parquet does not exist in catalog: {:?}", path))]
+    ParquetFileDoesNotExist { path: ParquetFilePath },
+}
+
+/// Abstraction over how the in-memory state of the catalog works.
+pub trait CatalogState {
+    /// Input to create a new empty instance.
+    ///
+    /// See [`new_empty`](Self::new_empty) for details.
+    type EmptyInput: Send;
+
+    /// Create empty state w/o any known files.
+    fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self;
+
+    /// Add parquet file to state.
+    fn add(
+        &mut self,
+        iox_object_store: Arc<IoxObjectStore>,
+        info: CatalogParquetInfo,
+    ) -> Result<(), CatalogStateAddError>;
+
+    /// Remove parquet file from state.
+    fn remove(&mut self, path: &ParquetFilePath) -> Result<(), CatalogStateRemoveError>;
+}
+
+/// Structure that holds all information required to create a checkpoint.
+///
+/// Note that while checkpoints are addressed using the same schema as we use for transactions
+/// (revision counter, UUID), they contain the changes at the end of (aka including) the transaction
+/// they refer to.
+#[derive(Debug)]
+pub struct CheckpointData {
+    /// List of all Parquet files that are currently (i.e. by the current version) tracked by the
+    /// catalog.
+    ///
+    /// If a file was once added but later removed it MUST NOT appear in the result.
+    pub files: HashMap<ParquetFilePath, CatalogParquetInfo>,
+}
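A payoff of the narrow enums this new module defines: a caller handling a `remove` failure can match exhaustively, with no unreachable arms for add-only failures such as `MetadataExtractFailed` or `SchemaError`. A self-contained sketch (with a `String` stand-in for `ParquetFilePath`):

```rust
use snafu::Snafu;

// Stand-in for `interface::CatalogStateRemoveError`.
#[derive(Debug, Snafu)]
pub enum CatalogStateRemoveError {
    #[snafu(display("Parquet does not exist in catalog: {:?}", path))]
    ParquetFileDoesNotExist { path: String },
}

fn explain(err: &CatalogStateRemoveError) -> String {
    // Exhaustive: the compiler proves this is the only remove failure;
    // add-only variants cannot reach this code path.
    match err {
        CatalogStateRemoveError::ParquetFileDoesNotExist { path } => {
            format!("file {:?} is not tracked by the catalog", path)
        }
    }
}
```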
@@ -1,6 +1,7 @@
-pub mod api;
 pub mod cleanup;
+pub mod core;
 pub mod dump;
+pub mod interface;
 mod internals;
 pub mod prune;
 pub mod rebuild;
@@ -8,7 +8,7 @@ use object_store::{ObjectStore, ObjectStoreApi};
 use snafu::{ResultExt, Snafu};

 use crate::catalog::{
-    api::{ProtoIOError, ProtoParseError},
+    core::{ProtoIOError, ProtoParseError},
     internals::{proto_io::load_transaction_proto, proto_parse::parse_timestamp},
 };
@@ -33,7 +33,7 @@ pub enum Error {

 pub type Result<T, E = Error> = std::result::Result<T, E>;

-/// Prune history of [`PreservedCatalog`](crate::catalog::api::PreservedCatalog).
+/// Prune history of [`PreservedCatalog`](crate::catalog::core::PreservedCatalog).
 ///
 /// This deletes all transactions and checkpoints that were started prior to `before`. Note that this only deletes data
 /// that is safe to delete when time travel to `before` is allowed. For example imagine the following transactions:
@@ -133,8 +133,7 @@ fn is_checkpoint_or_zero(path: &TransactionFilePath) -> bool {
 mod tests {
     use crate::{
         catalog::{
-            api::{CheckpointData, PreservedCatalog},
-            test_helpers::TestCatalogState,
+            core::PreservedCatalog, interface::CheckpointData, test_helpers::TestCatalogState,
         },
         test_utils::make_iox_object_store,
     };
@@ -7,13 +7,17 @@ use observability_deps::tracing::error;
 use snafu::{ResultExt, Snafu};

 use crate::{
-    catalog::api::{CatalogParquetInfo, CatalogState, PreservedCatalog},
+    catalog::{
+        core::PreservedCatalog,
+        interface::{CatalogParquetInfo, CatalogState},
+    },
     metadata::IoxParquetMetaData,
 };

 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Cannot create new empty catalog: {}", source))]
-    NewEmptyFailure { source: crate::catalog::api::Error },
+    NewEmptyFailure { source: crate::catalog::core::Error },

     #[snafu(display("Cannot read store: {}", source))]
     ReadFailure { source: object_store::Error },
@@ -25,13 +29,15 @@ pub enum Error {
     },

     #[snafu(display("Cannot add file to transaction: {}", source))]
-    FileRecordFailure { source: crate::catalog::api::Error },
+    FileRecordFailure {
+        source: crate::catalog::interface::CatalogStateAddError,
+    },

     #[snafu(display("Cannot commit transaction: {}", source))]
-    CommitFailure { source: crate::catalog::api::Error },
+    CommitFailure { source: crate::catalog::core::Error },

     #[snafu(display("Cannot create checkpoint: {}", source))]
-    CheckpointFailure { source: crate::catalog::api::Error },
+    CheckpointFailure { source: crate::catalog::core::Error },
 }
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -164,7 +170,7 @@ async fn read_parquet(
 mod tests {
     use super::*;
     use crate::{
-        catalog::{api::PreservedCatalog, test_helpers::TestCatalogState},
+        catalog::{core::PreservedCatalog, test_helpers::TestCatalogState},
         metadata::IoxMetadata,
         storage::{MemWriter, Storage},
         test_utils::{
@@ -12,7 +12,11 @@ use snafu::ResultExt;

 use crate::{
     catalog::{
-        api::{CatalogParquetInfo, CatalogState, CheckpointData, PreservedCatalog},
+        core::PreservedCatalog,
+        interface::{
+            CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
+            CheckpointData,
+        },
         internals::{
             proto_io::{load_transaction_proto, store_transaction_proto},
             types::TransactionKey,
@@ -61,8 +65,8 @@ impl TestCatalogState {
     }

     /// Inserts a file into this catalog state
-    pub fn insert(&mut self, info: CatalogParquetInfo) -> crate::catalog::api::Result<()> {
-        use crate::catalog::api::{Error, MetadataExtractFailed};
+    pub fn insert(&mut self, info: CatalogParquetInfo) -> Result<(), CatalogStateAddError> {
+        use crate::catalog::interface::MetadataExtractFailed;

         let iox_md = info
             .metadata
@@ -80,7 +84,7 @@ impl TestCatalogState {

         match partition.chunks.entry(iox_md.chunk_id) {
             Occupied(o) => {
-                return Err(Error::ParquetFileAlreadyExists {
+                return Err(CatalogStateAddError::ParquetFileAlreadyExists {
                     path: o.get().path.clone(),
                 });
             }
@@ -104,13 +108,11 @@ impl CatalogState for TestCatalogState {
         &mut self,
         _iox_object_store: Arc<IoxObjectStore>,
         info: CatalogParquetInfo,
-    ) -> crate::catalog::api::Result<()> {
+    ) -> Result<(), CatalogStateAddError> {
         self.insert(info)
     }

-    fn remove(&mut self, path: &ParquetFilePath) -> crate::catalog::api::Result<()> {
-        use crate::catalog::api::Error;
-
+    fn remove(&mut self, path: &ParquetFilePath) -> Result<(), CatalogStateRemoveError> {
         let partitions = self
             .tables
             .values_mut()
@@ -136,7 +138,7 @@ impl CatalogState for TestCatalogState {
         }

         match removed {
-            0 => Err(Error::ParquetFileDoesNotExist { path: path.clone() }),
+            0 => Err(CatalogStateRemoveError::ParquetFileDoesNotExist { path: path.clone() }),
             _ => Ok(()),
         }
     }
@@ -173,8 +175,6 @@ where
     S: CatalogState + Debug + Send + Sync,
     F: Fn(&S) -> CheckpointData + Send,
 {
-    use crate::catalog::api::Error;
-
     // empty state
     let iox_object_store = make_iox_object_store().await;
     let (_catalog, mut state) =
@@ -317,11 +317,17 @@ where
             },
         )
         .unwrap_err();
-        assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
+        assert!(matches!(
+            err,
+            CatalogStateAddError::ParquetFileAlreadyExists { .. }
+        ));

         // does not exist as has a different UUID
         let err = state.remove(&path).unwrap_err();
-        assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
+        assert!(matches!(
+            err,
+            CatalogStateRemoveError::ParquetFileDoesNotExist { .. }
+        ));
     }
     assert_checkpoint(&state, &f, &expected);
@@ -340,7 +346,10 @@ where
             },
         )
         .unwrap_err();
-        assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
+        assert!(matches!(
+            err,
+            CatalogStateAddError::ParquetFileAlreadyExists { .. }
+        ));

         // this transaction will still work
         let (path, metadata) =
@@ -369,12 +378,18 @@ where
             },
         )
         .unwrap_err();
-        assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
+        assert!(matches!(
+            err,
+            CatalogStateAddError::ParquetFileAlreadyExists { .. }
+        ));

         // does not exist - as different UUID
         let path = ParquetFilePath::new(&chunk_addr(7));
         let err = state.remove(&path).unwrap_err();
-        assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
+        assert!(matches!(
+            err,
+            CatalogStateRemoveError::ParquetFileDoesNotExist { .. }
+        ));

         // this still works
         let (path, _) = expected.remove(&7).unwrap();
@@ -382,7 +397,10 @@ where

         // recently removed
         let err = state.remove(&path).unwrap_err();
-        assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
+        assert!(matches!(
+            err,
+            CatalogStateRemoveError::ParquetFileDoesNotExist { .. }
+        ));
     }
     assert_checkpoint(&state, &f, &expected);
 }
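The test changes above are mechanical: each `Error::...` pattern becomes the corresponding variant of whichever narrow enum the call actually returns, and rustfmt then splits the longer `matches!` invocations across lines. A stand-alone analogue of the updated assertion style (stand-in enum and hypothetical path value):

```rust
#[derive(Debug)]
enum CatalogStateRemoveError {
    ParquetFileDoesNotExist { path: String },
}

fn remove_stub() -> Result<(), CatalogStateRemoveError> {
    // Always fails, so the test below can inspect the error variant.
    Err(CatalogStateRemoveError::ParquetFileDoesNotExist {
        path: "db/table/00.parquet".to_string(),
    })
}

#[test]
fn remove_missing_file_errors() {
    let err = remove_stub().unwrap_err();
    // The longer variant name pushes `matches!` onto multiple lines.
    assert!(matches!(
        err,
        CatalogStateRemoveError::ParquetFileDoesNotExist { .. }
    ));
}
```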
@@ -120,7 +120,7 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputPro
 /// For breaking changes, this will change.
 ///
 /// **Important: When changing this structure, consider bumping the
-/// [catalog transaction version](crate::catalog::api::TRANSACTION_VERSION)!**
+/// [catalog transaction version](crate::catalog::core::TRANSACTION_VERSION)!**
 pub const METADATA_VERSION: u32 = 6;

 /// File-level metadata key to store the IOx-specific data.
@@ -18,7 +18,7 @@ use internal_types::freezable::Freezable;
 use iox_object_store::IoxObjectStore;
 use observability_deps::tracing::{error, info, warn};
 use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
-use parquet_file::catalog::api::PreservedCatalog;
+use parquet_file::catalog::core::PreservedCatalog;
 use persistence_windows::checkpoint::ReplayPlan;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{future::Future, sync::Arc, time::Duration};
@@ -60,7 +60,7 @@ pub enum Error {
     ))]
     WipePreservedCatalog {
         db_name: String,
-        source: Box<parquet_file::catalog::api::Error>,
+        source: Box<parquet_file::catalog::core::Error>,
     },

     #[snafu(display("failed to skip replay for database ({}): {}", db_name, source))]
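Note that `WipePreservedCatalog` keeps its source boxed: boxing a large source type such as the catalog error keeps the outer enum small. A minimal sketch of producing such a variant with the 0.6-style selectors used here (`CoreError` and the display strings are illustrative stand-ins, not the real types):

```rust
use snafu::{ResultExt, Snafu};

// Stand-in for the large `parquet_file::catalog::core::Error`.
#[derive(Debug, Snafu)]
enum CoreError {
    #[snafu(display("catalog is broken"))]
    Broken,
}

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("error wiping preserved catalog ({}): {}", db_name, source))]
    WipePreservedCatalog {
        db_name: String,
        source: Box<CoreError>,
    },
}

fn wipe_preserved_catalog(db_name: &str) -> Result<(), Error> {
    // Box the source first, then attach the remaining context fields.
    Err(CoreError::Broken)
        .map_err(Box::new)
        .context(WipePreservedCatalog { db_name })
}
```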
@@ -32,8 +32,9 @@ use iox_object_store::IoxObjectStore;
 use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
 use observability_deps::tracing::{debug, error, info};
 use parquet_file::catalog::{
-    api::{CatalogParquetInfo, CheckpointData, PreservedCatalog},
     cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
+    core::PreservedCatalog,
+    interface::{CatalogParquetInfo, CheckpointData},
     prune::prune_history as prune_catalog_transaction_history,
 };
 use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
@@ -49,7 +49,7 @@ pub enum Error {

     #[snafu(display("Error while committing transaction on preserved catalog: {}", source))]
     CommitError {
-        source: parquet_file::catalog::api::Error,
+        source: parquet_file::catalog::core::Error,
     },

     #[snafu(display("Cannot write chunk: {}", addr))]
@@ -17,7 +17,7 @@ use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job};
 use internal_types::selection::Selection;
 use observability_deps::tracing::{debug, warn};
 use parquet_file::{
-    catalog::api::CatalogParquetInfo,
+    catalog::interface::CatalogParquetInfo,
     chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
     metadata::IoxMetadata,
     storage::Storage,
@@ -1,11 +1,17 @@
 //! Functionality to load a [`Catalog`](crate::db::catalog::Catalog) and other information from a
-//! [`PreservedCatalog`](parquet_file::catalog::api::PreservedCatalog).
+//! [`PreservedCatalog`](parquet_file::catalog::core::PreservedCatalog).

 use super::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle, Catalog};
 use iox_object_store::{IoxObjectStore, ParquetFilePath};
 use observability_deps::tracing::{error, info};
 use parquet_file::{
-    catalog::api::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
+    catalog::{
+        core::PreservedCatalog,
+        interface::{
+            CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
+            ChunkCreationFailed,
+        },
+    },
     chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
 };
 use persistence_windows::checkpoint::{ReplayPlan, ReplayPlanner};
@@ -22,17 +28,17 @@ pub enum Error {

     #[snafu(display("Cannot create new empty preserved catalog: {}", source))]
     CannotCreateCatalog {
-        source: parquet_file::catalog::api::Error,
+        source: parquet_file::catalog::core::Error,
     },

     #[snafu(display("Cannot load preserved catalog: {}", source))]
     CannotLoadCatalog {
-        source: parquet_file::catalog::api::Error,
+        source: parquet_file::catalog::core::Error,
     },

     #[snafu(display("Cannot wipe preserved catalog: {}", source))]
     CannotWipeCatalog {
-        source: parquet_file::catalog::api::Error,
+        source: parquet_file::catalog::core::Error,
     },
 }
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -166,8 +172,10 @@ impl CatalogState for Loader {
         &mut self,
         iox_object_store: Arc<IoxObjectStore>,
         info: CatalogParquetInfo,
-    ) -> parquet_file::catalog::api::Result<()> {
-        use parquet_file::catalog::api::{MetadataExtractFailed, ReplayPlanError, SchemaError};
+    ) -> Result<(), CatalogStateAddError> {
+        use parquet_file::catalog::interface::{
+            MetadataExtractFailed, ReplayPlanError, SchemaError,
+        };

         // extract relevant bits from parquet file metadata
         let iox_md = info
@@ -212,9 +220,7 @@ impl CatalogState for Loader {
             .get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
         let mut partition = partition.write();
         if partition.chunk(iox_md.chunk_id).is_some() {
-            return Err(
-                parquet_file::catalog::api::Error::ParquetFileAlreadyExists { path: info.path },
-            );
+            return Err(CatalogStateAddError::ParquetFileAlreadyExists { path: info.path });
         }
         let schema_handle = TableSchemaUpsertHandle::new(&table_schema, &parquet_chunk.schema())
             .map_err(|e| Box::new(e) as _)
@@ -234,7 +240,7 @@ impl CatalogState for Loader {
         Ok(())
     }

-    fn remove(&mut self, path: &ParquetFilePath) -> parquet_file::catalog::api::Result<()> {
+    fn remove(&mut self, path: &ParquetFilePath) -> Result<(), CatalogStateRemoveError> {
         let mut removed_any = false;

         for partition in self.catalog.partitions() {
@@ -261,7 +267,7 @@ impl CatalogState for Loader {
         if removed_any {
             Ok(())
         } else {
-            Err(parquet_file::catalog::api::Error::ParquetFileDoesNotExist { path: path.clone() })
+            Err(CatalogStateRemoveError::ParquetFileDoesNotExist { path: path.clone() })
         }
     }
 }
@@ -273,7 +279,7 @@ mod tests {
     use data_types::{server_id::ServerId, DatabaseName};
     use object_store::ObjectStore;
     use parquet_file::catalog::{
-        api::CheckpointData,
+        interface::CheckpointData,
         test_helpers::{assert_catalog_state_implementation, TestCatalogState},
     };
     use std::convert::TryFrom;
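The `SchemaError` and `ReplayPlanError` variants keep their `Box<dyn std::error::Error + Send + Sync>` sources, so the loader still boxes foreign errors before attaching context, as in the `map_err(|e| Box::new(e) as _)` line above. A self-contained sketch of that idiom (`UpsertError` and the display text are stand-ins):

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum CatalogStateAddError {
    #[snafu(display("Schema for {:?} does not work with existing schema: {}", path, source))]
    SchemaError {
        source: Box<dyn std::error::Error + Send + Sync>,
        path: String,
    },
}

// Stand-in for a foreign error type from another crate.
#[derive(Debug, Snafu)]
enum UpsertError {
    #[snafu(display("column type mismatch"))]
    TypeMismatch,
}

fn upsert_schema() -> Result<(), UpsertError> {
    TypeMismatch.fail()
}

fn add(path: &str) -> Result<(), CatalogStateAddError> {
    // Box the foreign error into the trait object the variant expects,
    // then attach the path via the context selector.
    upsert_schema()
        .map_err(|e| Box::new(e) as _)
        .context(SchemaError { path })?;
    Ok(())
}
```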
@@ -1154,7 +1154,7 @@ mod tests {
         path::{parsed::DirsAndFileName, ObjectStorePath},
         ObjectStore, ObjectStoreApi,
     };
-    use parquet_file::catalog::{api::PreservedCatalog, test_helpers::TestCatalogState};
+    use parquet_file::catalog::{core::PreservedCatalog, test_helpers::TestCatalogState};
     use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner, QueryDatabase};
     use std::{
         convert::{TryFrom, TryInto},