chore: new `parquet_catalog` crate

pull/24376/head
Marco Neumann 2021-10-14 14:34:59 +02:00
parent e191210c5b
commit 28195b9c0c
26 changed files with 201 additions and 127 deletions
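This commit is a mechanical move: the `catalog` module tree of `parquet_file` becomes the standalone `parquet_catalog` crate. Two import patterns account for nearly every hunk below; a minimal before/after sketch (the paths are the ones shown in the diff, the surrounding grouping is illustrative):

    // Before: downstream crates reached the preserved catalog through parquet_file.
    use parquet_file::catalog::core::PreservedCatalog;

    // After: the same type lives in the dedicated crate.
    use parquet_catalog::core::PreservedCatalog;

    // Inside the moved modules themselves, the `catalog::` segment disappears:
    // old: crate::catalog::core::Error
    // new: crate::core::Error

Types that stay behind in `parquet_file` (metadata, chunk, storage, test utilities) are now reached from the new crate via fully qualified `parquet_file::...` paths.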

Cargo.lock (generated)

@ -1678,6 +1678,7 @@ dependencies = [
"panic_logging",
"parking_lot",
"parquet",
+ "parquet_catalog",
"parquet_file",
"pprof",
"predicate",
@ -2687,6 +2688,42 @@ dependencies = [
"thrift",
]
+
+ [[package]]
+ name = "parquet_catalog"
+ version = "0.1.0"
+ dependencies = [
+ "arrow",
+ "base64 0.13.0",
+ "bytes",
+ "chrono",
+ "data_types",
+ "datafusion 0.1.0",
+ "datafusion_util",
+ "futures",
+ "generated_types",
+ "iox_object_store",
+ "metric",
+ "object_store",
+ "observability_deps",
+ "parking_lot",
+ "parquet",
+ "parquet-format",
+ "parquet_file",
+ "pbjson-types",
+ "persistence_windows",
+ "predicate",
+ "prost",
+ "schema",
+ "snafu",
+ "tempfile",
+ "thrift",
+ "time 0.1.0",
+ "tokio",
+ "tokio-stream",
+ "uuid",
+ "zstd",
+ ]
[[package]]
name = "parquet_file"
version = "0.1.0"
@ -3851,6 +3888,7 @@ dependencies = [
"observability_deps",
"once_cell",
"parking_lot",
+ "parquet_catalog",
"parquet_file",
"persistence_windows",
"predicate",


@ -56,6 +56,8 @@ members = [
"observability_deps",
"packers",
"panic_logging",
+ "parquet_catalog",
"parquet_file",
"persistence_windows",
"predicate",
"query",
@ -101,6 +103,7 @@ num_cpus = "1.13.0"
object_store = { path = "object_store" }
observability_deps = { path = "observability_deps" }
panic_logging = { path = "panic_logging" }
+ parquet_catalog = { path = "parquet_catalog" }
parquet_file = { path = "parquet_file" }
predicate = { path = "predicate" }
query = { path = "query" }


@ -0,0 +1,36 @@
[package]
name = "parquet_catalog"
version = "0.1.0"
edition = "2018"

[dependencies]
arrow = { version = "5.0", features = ["prettyprint"] }
base64 = "0.13"
bytes = "1.0"
chrono = "0.4"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3.7"
generated_types = { path = "../generated_types" }
iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
parquet = "5.0"
parquet_file = { path = "../parquet_file" }
parquet-format = "2.6"
parking_lot = "0.11.1"
pbjson-types = "0.1"
persistence_windows = { path = "../persistence_windows" }
predicate = { path = "../predicate" }
prost = "0.8"
snafu = "0.6"
schema = { path = "../schema" }
tempfile = "3.1.0"
thrift = "0.13"
time = { path = "../time" }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio-stream = "0.1"
uuid = { version = "0.8", features = ["serde", "v4"] }
zstd = "0.9"

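The dependency list above is essentially what the catalog code already pulled in while it lived inside `parquet_file`, plus `parquet_file` itself: as the hunks below show, the moved modules keep using its `metadata`, `chunk`, `storage`, and `test_utils` types through fully qualified paths.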

@ -9,7 +9,7 @@ use parking_lot::Mutex;
use predicate::delete_predicate::DeletePredicate;
use snafu::{ResultExt, Snafu};
- use crate::catalog::{
+ use crate::{
core::PreservedCatalog,
interface::{
CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
@ -30,7 +30,7 @@ pub enum Error {
},
#[snafu(display("Error from catalog loading while cleaning object store: {}", source))]
- CatalogLoadError { source: crate::catalog::core::Error },
+ CatalogLoadError { source: crate::core::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -154,10 +154,8 @@ impl CatalogState for TracerCatalogState {
#[cfg(test)]
mod tests {
use super::*;
- use crate::{
- catalog::test_helpers::{make_config, new_empty},
- test_utils::{chunk_addr, make_metadata, TestSize},
- };
+ use crate::test_helpers::{make_config, new_empty};
+ use parquet_file::test_utils::{chunk_addr, make_metadata, TestSize};
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;


@ -1,16 +1,5 @@
//! Catalog preservation and transaction handling.
- use crate::{
- catalog::{
- interface::{CatalogParquetInfo, CatalogState, CheckpointData, ChunkAddrWithoutDatabase},
- internals::{
- proto_io::{load_transaction_proto, store_transaction_proto},
- proto_parse,
- types::{FileType, TransactionKey},
- },
- },
- metadata::IoxParquetMetaData,
- };
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use generated_types::influxdata::iox::catalog::v1 as proto;
@ -18,6 +7,7 @@ use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath};
use object_store::{ObjectStore, ObjectStoreApi};
use observability_deps::tracing::{info, warn};
use parking_lot::RwLock;
+ use parquet_file::metadata::IoxParquetMetaData;
use predicate::delete_predicate::DeletePredicate;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
@ -33,8 +23,15 @@ use time::{Time, TimeProvider};
use tokio::sync::{Semaphore, SemaphorePermit};
use uuid::Uuid;
- pub use crate::catalog::internals::proto_io::Error as ProtoIOError;
- pub use crate::catalog::internals::proto_parse::Error as ProtoParseError;
+ use crate::{
+ interface::{CatalogParquetInfo, CatalogState, CheckpointData, ChunkAddrWithoutDatabase},
+ internals::{
+ proto_io::{load_transaction_proto, store_transaction_proto},
+ proto_parse,
+ types::{FileType, TransactionKey},
+ },
+ ProtoIOError, ProtoParseError,
+ };
/// Current version for serialized transactions.
///
@ -119,7 +116,9 @@ pub enum Error {
UnsupportedUpgrade { format: String },
#[snafu(display("Cannot decode parquet metadata: {}", source))]
- MetadataDecodingFailed { source: crate::metadata::Error },
+ MetadataDecodingFailed {
+ source: parquet_file::metadata::Error,
+ },
#[snafu(display("Catalog already exists"))]
AlreadyExists {},
@ -139,12 +138,12 @@ pub enum Error {
#[snafu(display("Cannot add parquet file during load: {}", source))]
AddError {
- source: crate::catalog::interface::CatalogStateAddError,
+ source: crate::interface::CatalogStateAddError,
},
#[snafu(display("Cannot remove parquet file during load: {}", source))]
RemoveError {
- source: crate::catalog::interface::CatalogStateRemoveError,
+ source: crate::interface::CatalogStateRemoveError,
},
#[snafu(display("Delete predicate missing"))]
@ -286,7 +285,7 @@ impl PreservedCatalog {
}
Err(e) => warn!(%e, ?transaction_file_path, "Cannot parse timestamp"),
},
- Err(e @ crate::catalog::internals::proto_io::Error::Read { .. }) => {
+ Err(e @ crate::internals::proto_io::Error::Read { .. }) => {
// bubble up IO error
return Err(Error::ProtobufIOError { source: e });
}
@ -1066,13 +1065,13 @@ mod tests {
use std::vec;
use bytes::Bytes;
+ use parquet_file::test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize};
use super::*;
- use crate::catalog::test_helpers::{
+ use crate::test_helpers::{
break_catalog_with_weird_version, create_delete_predicate, exists, load_err, load_ok,
make_config, new_empty, TestCatalogState,
};
- use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize};
#[tokio::test]
async fn test_create_empty() {


@ -6,10 +6,9 @@ use futures::TryStreamExt;
use generated_types::influxdata::iox::catalog::v1 as proto;
use iox_object_store::{IoxObjectStore, TransactionFilePath};
use object_store::{ObjectStore, ObjectStoreApi};
+ use parquet_file::metadata::{DecodedIoxParquetMetaData, IoxParquetMetaData};
use snafu::{ResultExt, Snafu};
- use crate::metadata::{DecodedIoxParquetMetaData, IoxParquetMetaData};
use super::internals::proto_io::load_transaction_proto;
#[derive(Debug, Snafu)]
@ -33,7 +32,7 @@ pub struct DumpOptions {
/// recommended.
pub show_parquet_metadata: bool,
- /// Show debug output of [`IoxMetadata`](crate::metadata::IoxMetadata) if decoding succeeds, show the decoding
+ /// Show debug output of [`IoxMetadata`](parquet_file::metadata::IoxMetadata) if decoding succeeds, show the decoding
/// error otherwise.
pub show_iox_metadata: bool,
@ -95,8 +94,8 @@ where
/// Wrapper around [`proto::Transaction`] with additional debug output (e.g. to show nested data).
struct File {
path: TransactionFilePath,
- proto: Result<proto::Transaction, crate::catalog::internals::proto_io::Error>,
- md: Option<Vec<Result<Metadata, crate::metadata::Error>>>,
+ proto: Result<proto::Transaction, crate::internals::proto_io::Error>,
+ md: Option<Vec<Result<Metadata, parquet_file::metadata::Error>>>,
}
impl File {
@ -179,7 +178,10 @@ struct Metadata {
impl Metadata {
/// Read metadata (in form of [`IoxParquetMetaData`]) from bytes, encoded as Apache Thrift.
- fn read(data: &Bytes, options: Arc<DumpOptions>) -> Result<Self, crate::metadata::Error> {
+ fn read(
+ data: &Bytes,
+ options: Arc<DumpOptions>,
+ ) -> Result<Self, parquet_file::metadata::Error> {
let iox_md = IoxParquetMetaData::from_thrift_bytes(data.as_ref().to_vec());
let md = iox_md.decode()?;
Ok(Self { md, options })
@ -219,12 +221,8 @@ impl Debug for Metadata {
#[cfg(test)]
mod tests {
use super::*;
- use crate::{
- catalog::{
- core::PreservedCatalog, interface::CatalogParquetInfo, test_helpers::make_config,
- },
- test_utils::{chunk_addr, make_metadata, TestSize},
- };
+ use crate::{core::PreservedCatalog, interface::CatalogParquetInfo, test_helpers::make_config};
+ use parquet_file::test_utils::{chunk_addr, make_metadata, TestSize};
use time::Time;
use uuid::Uuid;


@ -9,7 +9,7 @@ use iox_object_store::{IoxObjectStore, ParquetFilePath};
use predicate::delete_predicate::DeletePredicate;
use snafu::Snafu;
- use crate::metadata::IoxParquetMetaData;
+ use parquet_file::metadata::IoxParquetMetaData;
/// Struct containing all information that a catalog received for a new parquet file.
#[derive(Debug, Clone)]
@ -57,7 +57,7 @@ impl std::fmt::Display for ChunkAddrWithoutDatabase {
pub enum CatalogStateAddError {
#[snafu(display("Cannot extract metadata from {:?}: {}", path, source))]
MetadataExtractFailed {
- source: crate::metadata::Error,
+ source: parquet_file::metadata::Error,
path: ParquetFilePath,
},
@ -81,7 +81,7 @@ pub enum CatalogStateAddError {
#[snafu(display("Cannot create parquet chunk from {:?}: {}", path, source))]
ChunkCreationFailed {
- source: crate::chunk::Error,
+ source: parquet_file::chunk::Error,
path: ParquetFilePath,
},


@ -0,0 +1,21 @@
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]

pub mod cleanup;
pub mod core;
pub mod dump;
pub mod interface;
mod internals;
pub mod prune;
pub mod rebuild;
pub mod test_helpers;

pub use crate::internals::proto_io::Error as ProtoIOError;
pub use crate::internals::proto_parse::Error as ProtoParseError;

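`internals` stays private in the new crate, so the two error types it defines are surfaced as crate-root re-exports; `core.rs` and `prune.rs` above consume them as `crate::{ProtoIOError, ProtoParseError}`. A minimal sketch of the external view (the helper functions are hypothetical; the `Display` output comes from the snafu derives):

    use parquet_catalog::{ProtoIOError, ProtoParseError};

    // Both error types are nameable without access to the private `internals` module.
    fn describe_io(err: &ProtoIOError) -> String {
        format!("transaction protobuf I/O failed: {}", err)
    }

    fn describe_parse(err: &ProtoParseError) -> String {
        format!("transaction protobuf did not parse: {}", err)
    }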

@ -7,9 +7,9 @@ use object_store::{ObjectStore, ObjectStoreApi};
use snafu::{ResultExt, Snafu};
use time::Time;
- use crate::catalog::{
- core::{ProtoIOError, ProtoParseError},
+ use crate::{
internals::{proto_io::load_transaction_proto, proto_parse::parse_timestamp},
+ ProtoIOError, ProtoParseError,
};
#[derive(Debug, Snafu)]
@ -33,7 +33,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
- /// Prune history of [`PreservedCatalog`](crate::catalog::core::PreservedCatalog).
+ /// Prune history of [`PreservedCatalog`](crate::core::PreservedCatalog).
///
/// This deletes all transactions and checkpoints that were started prior to `before`. Note that this only deletes data
/// that is safe to delete when time travel to `before` is allowed. For example imagine the following transactions:
@ -121,13 +121,12 @@ fn is_checkpoint_or_zero(path: &TransactionFilePath) -> bool {
mod tests {
use std::time::Duration;
+ use parquet_file::test_utils::make_iox_object_store;
use crate::{
- catalog::{
- core::PreservedCatalog,
- interface::CheckpointData,
- test_helpers::{load_ok, make_config, new_empty},
- },
- test_utils::make_iox_object_store,
+ core::PreservedCatalog,
+ interface::CheckpointData,
+ test_helpers::{load_ok, make_config, new_empty},
};
use super::*;


@ -4,41 +4,38 @@ use std::{fmt::Debug, sync::Arc};
use futures::TryStreamExt;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use observability_deps::tracing::error;
+ use parquet_file::metadata::IoxParquetMetaData;
use snafu::{ResultExt, Snafu};
- use crate::catalog::core::PreservedCatalogConfig;
use crate::{
- catalog::{
- core::PreservedCatalog,
- interface::{CatalogParquetInfo, CatalogState},
- },
- metadata::IoxParquetMetaData,
+ core::{PreservedCatalog, PreservedCatalogConfig},
+ interface::{CatalogParquetInfo, CatalogState},
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Cannot create new empty catalog: {}", source))]
- NewEmptyFailure { source: crate::catalog::core::Error },
+ NewEmptyFailure { source: crate::core::Error },
#[snafu(display("Cannot read store: {}", source))]
ReadFailure { source: object_store::Error },
#[snafu(display("Cannot read IOx metadata from parquet file ({:?}): {}", path, source))]
MetadataReadFailure {
- source: crate::metadata::Error,
+ source: parquet_file::metadata::Error,
path: ParquetFilePath,
},
#[snafu(display("Cannot add file to transaction: {}", source))]
FileRecordFailure {
- source: crate::catalog::interface::CatalogStateAddError,
+ source: crate::interface::CatalogStateAddError,
},
#[snafu(display("Cannot commit transaction: {}", source))]
- CommitFailure { source: crate::catalog::core::Error },
+ CommitFailure { source: crate::core::Error },
#[snafu(display("Cannot create checkpoint: {}", source))]
- CheckpointFailure { source: crate::catalog::core::Error },
+ CheckpointFailure { source: crate::core::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -170,18 +167,18 @@ async fn read_parquet(
mod tests {
use super::*;
use crate::{
- catalog::{
- core::PreservedCatalog,
- test_helpers::{exists, make_config, new_empty, TestCatalogState},
- },
- metadata::IoxMetadata,
- storage::{MemWriter, Storage},
- test_utils::{create_partition_and_database_checkpoint, make_record_batch, TestSize},
+ core::PreservedCatalog,
+ test_helpers::{exists, make_config, new_empty, TestCatalogState},
};
use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
use parquet::arrow::ArrowWriter;
+ use parquet_file::{
+ metadata::IoxMetadata,
+ storage::{MemWriter, Storage},
+ test_utils::{create_partition_and_database_checkpoint, make_record_batch, TestSize},
+ };
use time::Time;
use tokio_stream::StreamExt;


@ -1,20 +1,20 @@
use crate::{
- catalog::{
- core::{PreservedCatalog, PreservedCatalogConfig},
- interface::{
- CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
- CheckpointData, ChunkAddrWithoutDatabase,
- },
- internals::{
- proto_io::{load_transaction_proto, store_transaction_proto},
- types::TransactionKey,
- },
- },
+ core::{PreservedCatalog, PreservedCatalogConfig},
+ interface::{
+ CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
+ CheckpointData, ChunkAddrWithoutDatabase,
+ },
+ internals::{
+ proto_io::{load_transaction_proto, store_transaction_proto},
+ types::TransactionKey,
+ },
- metadata::IoxParquetMetaData,
- test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize},
};
use data_types::{chunk_metadata::ChunkId, timestamp::TimestampRange};
use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath};
+ use parquet_file::{
+ metadata::IoxParquetMetaData,
+ test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize},
+ };
use predicate::{
delete_expr::{DeleteExpr, Op, Scalar},
delete_predicate::DeletePredicate,
@ -112,7 +112,7 @@ impl TestCatalogState {
/// Inserts a file into this catalog state
pub fn insert(&mut self, info: CatalogParquetInfo) -> Result<(), CatalogStateAddError> {
- use crate::catalog::interface::MetadataExtractFailed;
+ use crate::interface::MetadataExtractFailed;
let iox_md = info
.metadata
@ -219,7 +219,7 @@ pub async fn load_ok(
}
/// Load a `PreservedCatalog` and unwrap the error, expecting the operation to fail
- pub async fn load_err(config: PreservedCatalogConfig) -> crate::catalog::core::Error {
+ pub async fn load_err(config: PreservedCatalogConfig) -> crate::core::Error {
PreservedCatalog::load(config, TestCatalogState::default())
.await
.unwrap_err()


@ -1,8 +0,0 @@
pub mod cleanup;
pub mod core;
pub mod dump;
pub mod interface;
mod internals;
pub mod prune;
pub mod rebuild;
pub mod test_helpers;


@ -8,7 +8,6 @@
clippy::clone_on_ref_ptr
)]
- pub mod catalog;
pub mod chunk;
pub mod metadata;
pub mod storage;


@ -119,8 +119,8 @@ use time::Time;
///
/// For breaking changes, this will change.
///
- /// **Important: When changing this structure, consider bumping the
- /// [catalog transaction version](crate::catalog::core::TRANSACTION_VERSION)!**
+ /// **Important: When changing this structure, consider bumping the catalog transaction version (`TRANSACTION_VERSION`
+ /// in the `parquet_catalog` crate)!**
pub const METADATA_VERSION: u32 = 10;
/// File-level metadata key to store the IOx-specific data.

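The intra-doc link above becomes plain text because the dependency edge now points the other way: `parquet_catalog` depends on `parquet_file` (see its Cargo.toml above), so rustdoc in `parquet_file` can no longer resolve a link into the new crate.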

@ -36,6 +36,7 @@ object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.4.0", features = ["race"] }
parking_lot = "0.11.2"
+ parquet_catalog = { path = "../parquet_catalog" }
parquet_file = { path = "../parquet_file" }
persistence_windows = { path = "../persistence_windows" }
predicate = { path = "../predicate" }


@ -20,7 +20,7 @@ use internal_types::freezable::Freezable;
use iox_object_store::IoxObjectStore;
use observability_deps::tracing::{error, info, warn};
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
- use parquet_file::catalog::core::PreservedCatalog;
+ use parquet_catalog::core::PreservedCatalog;
use persistence_windows::checkpoint::ReplayPlan;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{future::Future, sync::Arc, time::Duration};
@ -64,7 +64,7 @@ pub enum Error {
))]
WipePreservedCatalog {
db_name: String,
- source: Box<parquet_file::catalog::core::Error>,
+ source: Box<parquet_catalog::core::Error>,
},
#[snafu(display("failed to skip replay for database ({}): {}", db_name, source))]


@ -32,7 +32,7 @@ use entry::{Entry, SequencedEntry, TableBatch};
use iox_object_store::IoxObjectStore;
use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use observability_deps::tracing::{debug, error, info, warn};
- use parquet_file::catalog::{
+ use parquet_catalog::{
cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
core::PreservedCatalog,
interface::{CatalogParquetInfo, CheckpointData, ChunkAddrWithoutDatabase},
@ -170,7 +170,7 @@ pub enum Error {
source
))]
CommitDeletePredicateError {
- source: parquet_file::catalog::core::Error,
+ source: parquet_catalog::core::Error,
},
}
@ -917,7 +917,7 @@ impl Db {
async fn cleanup_unreferenced_parquet_files(
self: &Arc<Self>,
- ) -> std::result::Result<(), parquet_file::catalog::cleanup::Error> {
+ ) -> std::result::Result<(), parquet_catalog::cleanup::Error> {
let guard = self.cleanup_lock.write().await;
let files = get_unreferenced_parquet_files(&self.preserved_catalog, 1_000).await?;
drop(guard);
@ -928,7 +928,7 @@ impl Db {
async fn preserve_delete_predicates(
self: &Arc<Self>,
predicates: &[(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)],
- ) -> Result<(), parquet_file::catalog::core::Error> {
+ ) -> Result<(), parquet_catalog::core::Error> {
let mut transaction = self.preserved_catalog.open_transaction().await;
for (predicate, chunks) in predicates {
transaction.delete_predicate(predicate, chunks);
@ -1413,8 +1413,8 @@ mod tests {
use iox_object_store::ParquetFilePath;
use metric::{Attributes, CumulativeGauge, Metric, Observation};
use object_store::ObjectStore;
+ use parquet_catalog::test_helpers::load_ok;
use parquet_file::{
- catalog::test_helpers::load_ok,
metadata::IoxParquetMetaData,
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
};


@ -49,7 +49,7 @@ pub enum Error {
#[snafu(display("Error while committing transaction on preserved catalog: {}", source))]
CommitError {
- source: parquet_file::catalog::core::Error,
+ source: parquet_catalog::core::Error,
},
#[snafu(display("Cannot write chunk: {}", addr))]


@ -14,8 +14,8 @@ use ::lifecycle::LifecycleWriteGuard;
use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job};
use observability_deps::tracing::{debug, warn};
+ use parquet_catalog::interface::CatalogParquetInfo;
use parquet_file::{
- catalog::interface::CatalogParquetInfo,
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
metadata::IoxMetadata,
storage::Storage,


@ -1,19 +1,17 @@
//! Functionality to load a [`Catalog`](crate::db::catalog::Catalog) and other information from a
- //! [`PreservedCatalog`](parquet_file::catalog::core::PreservedCatalog).
+ //! [`PreservedCatalog`](parquet_catalog::core::PreservedCatalog).
use super::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle, Catalog};
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use observability_deps::tracing::{error, info};
- use parquet_file::{
- catalog::{
- core::{PreservedCatalog, PreservedCatalogConfig},
- interface::{
- CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
- ChunkAddrWithoutDatabase, ChunkCreationFailed,
- },
- },
+ use parquet_catalog::{
+ core::{PreservedCatalog, PreservedCatalogConfig},
+ interface::{
+ CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
+ ChunkAddrWithoutDatabase, ChunkCreationFailed,
+ },
- chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
};
+ use parquet_file::chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk};
use persistence_windows::checkpoint::{ReplayPlan, ReplayPlanner};
use predicate::delete_predicate::DeletePredicate;
use snafu::{ResultExt, Snafu};
@ -29,17 +27,17 @@ pub enum Error {
#[snafu(display("Cannot create new empty preserved catalog: {}", source))]
CannotCreateCatalog {
- source: parquet_file::catalog::core::Error,
+ source: parquet_catalog::core::Error,
},
#[snafu(display("Cannot load preserved catalog: {}", source))]
CannotLoadCatalog {
- source: parquet_file::catalog::core::Error,
+ source: parquet_catalog::core::Error,
},
#[snafu(display("Cannot wipe preserved catalog: {}", source))]
CannotWipeCatalog {
- source: parquet_file::catalog::core::Error,
+ source: parquet_catalog::core::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -188,9 +186,7 @@ impl CatalogState for Loader {
iox_object_store: Arc<IoxObjectStore>,
info: CatalogParquetInfo,
) -> Result<(), CatalogStateAddError> {
- use parquet_file::catalog::interface::{
- MetadataExtractFailed, ReplayPlanError, SchemaError,
- };
+ use parquet_catalog::interface::{MetadataExtractFailed, ReplayPlanError, SchemaError};
// extract relevant bits from parquet file metadata
let iox_md = info
@ -314,7 +310,7 @@ mod tests {
use crate::db::checkpoint_data_from_catalog;
use data_types::{server_id::ServerId, DatabaseName};
use object_store::ObjectStore;
- use parquet_file::catalog::{
+ use parquet_catalog::{
interface::CheckpointData,
test_helpers::{assert_catalog_state_implementation, new_empty},
};
@ -338,8 +334,7 @@ mod tests {
);
let preserved_catalog = new_empty(config).await;
- parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
- .await;
+ parquet_catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog).await;
load_or_create_preserved_catalog(
&db_name,


@ -1287,9 +1287,8 @@ mod tests {
path::{parsed::DirsAndFileName, ObjectStorePath},
ObjectStore, ObjectStoreApi,
};
- use parquet_file::catalog::core::PreservedCatalogConfig;
- use parquet_file::catalog::{
- core::PreservedCatalog,
+ use parquet_catalog::{
+ core::{PreservedCatalog, PreservedCatalogConfig},
test_helpers::{load_ok, new_empty},
};
use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner, QueryDatabase};
@ -2236,8 +2235,7 @@ mod tests {
let (preserved_catalog, _catalog) = load_ok(config).await.unwrap();
- parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
- .await;
+ parquet_catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog).await;
drop(preserved_catalog);
rules_broken


@ -32,7 +32,7 @@ pub enum Error {
#[snafu(display("Cannot dump catalog: {}", source))]
DumpCatalogFailure {
- source: parquet_file::catalog::dump::Error,
+ source: parquet_catalog::dump::Error,
},
}
@ -103,7 +103,7 @@ struct DumpOptions {
show_unparsed_metadata: bool,
}
- impl From<DumpOptions> for parquet_file::catalog::dump::DumpOptions {
+ impl From<DumpOptions> for parquet_catalog::dump::DumpOptions {
fn from(options: DumpOptions) -> Self {
Self {
show_parquet_metadata: options.show_parquet_metadata,
@ -134,7 +134,7 @@ pub async fn command(config: Config) -> Result<()> {
let mut writer = std::io::stdout();
let options = dump_catalog.dump_options.into();
- parquet_file::catalog::dump::dump(&iox_object_store, &mut writer, options)
+ parquet_catalog::dump::dump(&iox_object_store, &mut writer, options)
.await
.context(DumpCatalogFailure)?;
}