fix: Remove parquet_catalog

pull/24376/head
Carol (Nichols || Goulding) 2022-05-04 09:03:07 -04:00
parent 2d8656e2e1
commit cae32209da
17 changed files with 0 additions and 4972 deletions

Cargo.lock generated

@@ -2210,7 +2210,6 @@ dependencies = [
"panic_logging",
"parking_lot 0.12.0",
"parquet",
"parquet_catalog",
"parquet_file",
"pin-project",
"predicate",
@@ -3826,42 +3825,6 @@ dependencies = [
"thrift",
]
[[package]]
name = "parquet_catalog"
version = "0.1.0"
dependencies = [
"arrow",
"base64 0.13.0",
"bytes",
"data_types",
"datafusion 0.1.0",
"datafusion_util",
"futures",
"generated_types",
"iox_object_store",
"iox_time",
"metric",
"object_store",
"observability_deps",
"parking_lot 0.12.0",
"parquet",
"parquet-format",
"parquet_file",
"pbjson-types",
"persistence_windows",
"predicate",
"prost",
"schema",
"snafu",
"tempfile",
"thrift",
"tokio",
"tokio-stream",
"uuid 0.8.2",
"workspace-hack",
"zstd",
]
[[package]]
name = "parquet_file"
version = "0.1.0"
@@ -5314,7 +5277,6 @@ dependencies = [
"object_store",
"observability_deps",
"parking_lot 0.12.0",
"parquet_catalog",
"persistence_windows",
"query",
"rand",


@@ -49,7 +49,6 @@ members = [
"observability_deps",
"packers",
"panic_logging",
"parquet_catalog",
"parquet_file",
"persistence_windows",
"predicate",


@@ -37,7 +37,6 @@ mutable_batch_pb = { path = "../mutable_batch_pb" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
panic_logging = { path = "../panic_logging" }
parquet_catalog = { path = "../parquet_catalog" }
parquet_file = { path = "../parquet_file" }
predicate = { path = "../predicate" }
querier = { path = "../querier" }


@@ -1,36 +0,0 @@
[package]
name = "parquet_catalog"
version = "0.1.0"
edition = "2021"
[dependencies]
arrow = { version = "13", features = ["prettyprint"] }
base64 = "0.13"
bytes = "1.0"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3"
generated_types = { path = "../generated_types" }
iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
parquet = "13"
parquet_file = { path = "../parquet_file" }
parquet-format = "4.0"
parking_lot = "0.12"
pbjson-types = "0.3"
persistence_windows = { path = "../persistence_windows" }
predicate = { path = "../predicate" }
prost = "0.10"
snafu = "0.7"
schema = { path = "../schema" }
tempfile = "3.1.0"
thrift = "0.13"
iox_time = { path = "../iox_time" }
tokio = { version = "1.18", features = ["macros", "parking_lot", "rt", "rt-multi-thread", "sync"] }
tokio-stream = "0.1"
uuid = { version = "0.8", features = ["v4"] }
zstd = "0.11"
workspace-hack = { path = "../workspace-hack"}


@@ -1,323 +0,0 @@
//! Methods to cleanup the object store.
use std::{collections::HashSet, sync::Arc};
use data_types::delete_predicate::DeletePredicate;
use futures::TryStreamExt;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::{ObjectStoreApi, ObjectStoreImpl};
use observability_deps::tracing::info;
use parking_lot::Mutex;
use snafu::{ResultExt, Snafu};
use crate::{
core::PreservedCatalog,
interface::{
CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
ChunkAddrWithoutDatabase,
},
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error from read operation while cleaning object store: {}", source))]
ReadError {
source: <ObjectStoreImpl as ObjectStoreApi>::Error,
},
#[snafu(display("Error from write operation while cleaning object store: {}", source))]
WriteError {
source: <ObjectStoreImpl as ObjectStoreApi>::Error,
},
#[snafu(display("Error from catalog loading while cleaning object store: {}", source))]
CatalogLoadError { source: crate::core::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Get unreferenced parquet files.
///
/// The resulting vector is in no particular order. It may be passed to [`delete_files`].
///
/// # Locking / Concurrent Actions
///
/// While this method is running you MUST NOT create any new parquet files or modify the preserved
/// catalog in any other way. Hence this method needs exclusive access to the preserved catalog and
/// the parquet files. Otherwise this method may report files for deletion that you are about to
/// write to the catalog!
///
/// **This method does NOT acquire the transaction lock!**
///
/// To limit the time the exclusive access is required use `max_files` which will limit the number
/// of files to be detected in this cleanup round.
///
/// The exclusive access can be dropped after this method returned and before calling
/// [`delete_files`].
pub async fn get_unreferenced_parquet_files(
catalog: &PreservedCatalog,
max_files: usize,
) -> Result<Vec<ParquetFilePath>> {
let iox_object_store = catalog.iox_object_store();
let all_known = {
// replay catalog transactions to track ALL (even dropped) files that are referenced
let (_catalog, state) = PreservedCatalog::load::<TracerCatalogState>(
catalog.config(),
TracerCatalogState::default(),
)
.await
.context(CatalogLoadSnafu)?
.expect("catalog gone while reading it?");
state.files.into_inner()
};
// gather a list of "files to remove" eagerly so we do not block transactions on the catalog
// for too long
let mut to_remove = vec![];
let mut stream = iox_object_store.parquet_files().await.context(ReadSnafu)?;
'outer: while let Some(paths) = stream.try_next().await.context(ReadSnafu)? {
for path in paths {
if to_remove.len() >= max_files {
info!(%max_files, "reached limit of number of files to cleanup in one go");
break 'outer;
}
// only delete if file is not tracked by the catalog
if !all_known.contains(&path) {
to_remove.push(path);
}
}
}
if !to_remove.is_empty() {
info!(n_files = to_remove.len(), "Found files to delete");
}
Ok(to_remove)
}
/// Delete all `files` from the store linked to the preserved catalog.
///
/// A file might already be deleted (or entirely absent) when this method is called. This will NOT
/// result in an error.
///
/// # Locking / Concurrent Actions
/// File creation and catalog modifications can be done while calling this method. Even
/// [`get_unreferenced_parquet_files`] can be called while this method is in progress.
pub async fn delete_files(catalog: &PreservedCatalog, files: &[ParquetFilePath]) -> Result<()> {
let store = catalog.iox_object_store();
for path in files {
info!(?path, "Delete file");
store.delete_parquet_file(path).await.context(WriteSnafu)?;
}
if !files.is_empty() {
info!(n_files = files.len(), "Finished deletion, removed files.");
}
Ok(())
}
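Taken together, the two functions above are meant to be used in the pattern the doc comments describe and that `test_cleanup_with_parallel_transaction` below exercises: hold exclusive access only while collecting deletion candidates, then delete outside the critical section. A minimal sketch, assuming the caller already has a `PreservedCatalog` and an exclusive lock (here a `tokio::sync::RwLock<()>`, as in the tests) that also guards parquet file creation; the helper name is illustrative:
async fn cleanup_unreferenced(
    catalog: &PreservedCatalog,
    cleanup_lock: &tokio::sync::RwLock<()>,
) -> Result<()> {
    // exclusive access: nobody may create parquet files or modify the catalog right now
    let guard = cleanup_lock.write().await;
    let files = get_unreferenced_parquet_files(catalog, 1_000).await?;
    // the exclusive access may be dropped before the actual deletion
    drop(guard);
    delete_files(catalog, &files).await
}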
/// Catalog state that traces all used parquet files.
#[derive(Debug, Default)]
struct TracerCatalogState {
files: Mutex<HashSet<ParquetFilePath>>,
}
impl CatalogState for TracerCatalogState {
fn add(
&mut self,
_iox_object_store: Arc<IoxObjectStore>,
info: CatalogParquetInfo,
) -> Result<(), CatalogStateAddError> {
self.files.lock().insert(info.path);
Ok(())
}
fn remove(&mut self, _path: &ParquetFilePath) -> Result<(), CatalogStateRemoveError> {
// Do NOT remove the file since we still need it for time travel
Ok(())
}
fn delete_predicate(
&mut self,
_predicate: Arc<DeletePredicate>,
_chunks: Vec<ChunkAddrWithoutDatabase>,
) {
// No need to track delete predicates, because the cleanup's job is to remove unreferenced parquet files. Delete
// predicates however are stored directly within the preserved catalog and therefore don't need pruning.
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::{make_config, new_empty};
use parquet_file::test_utils::generator::ChunkGenerator;
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;
#[tokio::test]
async fn test_cleanup_empty() {
let config = make_config().await;
let catalog = new_empty(config).await;
// run clean-up
let files = get_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
delete_files(&catalog, &files).await.unwrap();
}
#[tokio::test]
async fn test_cleanup_rules() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
let catalog = new_empty(config.clone()).await;
// create some data
let mut paths_keep = vec![];
let mut paths_delete = vec![];
{
let mut transaction = catalog.open_transaction().await;
// an ordinary tracked parquet file => keep
let (chunk, _) = generator.generate().await.unwrap();
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
paths_keep.push(chunk.path().clone());
// another ordinary tracked parquet file that was added and removed => keep (for time
// travel)
let (chunk, _) = generator.generate().await.unwrap();
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
transaction.remove_parquet(chunk.path());
paths_keep.push(chunk.path().clone());
// an untracked parquet file => delete
let (chunk, _) = generator.generate().await.unwrap();
paths_delete.push(chunk.path().clone());
transaction.commit().await.unwrap();
}
// run clean-up
let files = get_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
delete_files(&catalog, &files).await.unwrap();
// deleting a second time should just work
delete_files(&catalog, &files).await.unwrap();
// list all files
let all_files = list_all_files(iox_object_store).await;
for p in paths_keep {
assert!(dbg!(&all_files).contains(dbg!(&p)));
}
for p in paths_delete {
assert!(!dbg!(&all_files).contains(dbg!(&p)));
}
}
#[tokio::test]
async fn test_cleanup_with_parallel_transaction() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
let lock: RwLock<()> = Default::default();
let catalog = new_empty(config.clone()).await;
// try multiple times to provoke a conflict
for i in 1..100 {
// Every so often try to create a file with the same ChunkAddr beforehand. This should
// not trick the cleanup logic into removing the actual file because file paths contain a
// UUIDv4 part.
if i % 2 == 0 {
generator.generate_id(i).await.unwrap();
}
let (chunk, _) = tokio::join!(
async {
let guard = lock.read().await;
let (chunk, _) = generator.generate_id(i).await.unwrap();
let mut transaction = catalog.open_transaction().await;
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
transaction.commit().await.unwrap();
drop(guard);
chunk
},
async {
let guard = lock.write().await;
let files = get_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
drop(guard);
delete_files(&catalog, &files).await.unwrap();
},
);
let all_files = list_all_files(iox_object_store).await;
assert!(dbg!(all_files).contains(dbg!(chunk.path())));
}
}
#[tokio::test]
async fn test_cleanup_max_files() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
let catalog = new_empty(config.clone()).await;
// create some files
let mut to_remove = HashSet::default();
for _ in 0..3 {
let (chunk, _) = generator.generate().await.unwrap();
to_remove.insert(chunk.path().clone());
}
// run clean-up
let files = get_unreferenced_parquet_files(&catalog, 2).await.unwrap();
assert_eq!(files.len(), 2);
delete_files(&catalog, &files).await.unwrap();
// should only delete 2
let all_files = list_all_files(iox_object_store).await;
let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
assert_eq!(leftover.len(), 1);
// run clean-up again
let files = get_unreferenced_parquet_files(&catalog, 2).await.unwrap();
assert_eq!(files.len(), 1);
delete_files(&catalog, &files).await.unwrap();
// should delete remaining file
let all_files = list_all_files(iox_object_store).await;
let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
assert_eq!(leftover.len(), 0);
}
async fn list_all_files(iox_object_store: &IoxObjectStore) -> HashSet<ParquetFilePath> {
iox_object_store
.parquet_files()
.await
.unwrap()
.try_concat()
.await
.unwrap()
.into_iter()
.collect()
}
}

File diff suppressed because it is too large


@@ -1,630 +0,0 @@
//! Tooling to dump the catalog content for debugging.
use std::{fmt::Debug, sync::Arc};
use bytes::Bytes;
use futures::TryStreamExt;
use generated_types::influxdata::iox::preserved_catalog::v1 as proto;
use iox_object_store::{IoxObjectStore, TransactionFilePath};
use object_store::{ObjectStoreApi, ObjectStoreImpl};
use parquet_file::metadata::{DecodedIoxParquetMetaData, IoxParquetMetaData};
use snafu::{ResultExt, Snafu};
use super::internals::proto_io::load_transaction_proto;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error while listing files from object store: {}", source))]
ListFiles {
source: <ObjectStoreImpl as ObjectStoreApi>::Error,
},
#[snafu(display("Error while writing result to output: {}", source))]
WriteOutput { source: std::io::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Options to control the output of [`dump`].
#[derive(Debug, Default, Clone, Copy)]
pub struct DumpOptions {
/// Show debug output of [`DecodedIoxParquetMetaData`] if decoding succeeds, show the decoding error otherwise.
///
/// Since this contains the entire Apache Parquet metadata object this is quite verbose and is usually not
/// recommended.
pub show_parquet_metadata: bool,
/// Show debug output of [`IoxMetadata`](parquet_file::metadata::IoxMetadata) if decoding succeeds, show the decoding
/// error otherwise.
pub show_iox_metadata: bool,
/// Show debug output of [`Schema`](schema::Schema) if decoding succeeds, show the decoding
/// error otherwise.
pub show_schema: bool,
/// Show debug output of [`ColumnSummary`](data_types::partition_metadata::ColumnSummary) if decoding succeeds,
/// show the decoding error otherwise.
pub show_statistics: bool,
/// Show unparsed [`IoxParquetMetaData`] -- which are Apache Thrift bytes -- as part of the transaction actions.
///
/// Since this binary data is usually quite hard to read, it is recommended to set this to `false` which will
/// replace the actual bytes with `b"metadata omitted"`. Use the other toggles to instead show the content of the
/// Apache Thrift message.
pub show_unparsed_metadata: bool,
}
/// Dump catalog content in text form to `writer`.
///
/// This is mostly useful for debugging. The output of this might change at any point in time and should not be parsed
/// (except for one-time debugging).
///
/// Errors that happen while listing the object store or while writing the output to `writer` will be bubbled up. All
/// other errors (e.g. reading a transaction file from object store, deserializing protobuf data) will be shown as part
/// of the output and will NOT result in a failure. Also note that NO validation of the catalog content (e.g. version
/// checks, fork detection) will be performed.
pub async fn dump<W>(
iox_object_store: &IoxObjectStore,
writer: &mut W,
options: DumpOptions,
) -> Result<()>
where
W: std::io::Write + Send,
{
let mut files = iox_object_store
.catalog_transaction_files()
.await
.context(ListFilesSnafu)?
.try_concat()
.await
.context(ListFilesSnafu)?;
files.sort_by_key(|f| (f.revision_counter, f.uuid, !f.is_checkpoint()));
let options = Arc::new(options);
for file in files {
writeln!(
writer,
"{:#?}",
File::read(iox_object_store, &file, Arc::clone(&options)).await
)
.context(WriteOutputSnafu)?;
}
Ok(())
}
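A minimal usage sketch for `dump`: write the catalog content to stdout with the decoded IOx metadata enabled. The helper name is illustrative; it assumes an `IoxObjectStore` for the database in question is already at hand:
async fn dump_to_stdout(iox_object_store: &IoxObjectStore) -> Result<()> {
    let options = DumpOptions {
        show_iox_metadata: true,
        ..Default::default()
    };
    // `Stdout` implements `std::io::Write + Send`, satisfying the `W` bound
    let mut writer = std::io::stdout();
    dump(iox_object_store, &mut writer, options).await
}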
/// Wrapper around [`proto::Transaction`] with additional debug output (e.g. to show nested data).
struct File {
path: TransactionFilePath,
proto: Result<proto::Transaction, crate::internals::proto_io::Error>,
md: Option<Vec<Result<Metadata, parquet_file::metadata::Error>>>,
}
impl File {
/// Read transaction data (in form of [`proto::Transaction`]) from object store.
async fn read(
iox_object_store: &IoxObjectStore,
path: &TransactionFilePath,
options: Arc<DumpOptions>,
) -> Self {
let (proto, md) = match load_transaction_proto(iox_object_store, path).await {
Ok(transaction) => {
let mut md = vec![];
// Rebuild transaction object and:
// 1. Scan for contained `IoxParquetMetaData`.
// 2. Replace encoded metadata (in byte form) w/ placeholder (if requested via flags).
let transaction = proto::Transaction {
actions: transaction
.actions
.into_iter()
.map(|action| proto::transaction::Action {
action: action.action.map(|action| match action {
proto::transaction::action::Action::AddParquet(add_parquet) => {
let iox_md =
Metadata::read(&add_parquet.metadata, Arc::clone(&options));
md.push(iox_md);
proto::transaction::action::Action::AddParquet(
proto::AddParquet {
metadata: if options.show_unparsed_metadata {
add_parquet.metadata
} else {
Bytes::from(format!(
"metadata omitted ({} bytes)",
add_parquet.metadata.len()
))
},
..add_parquet
},
)
}
other => other,
}),
})
.collect(),
..transaction
};
(Ok(transaction), Some(md))
}
Err(e) => (Err(e), None),
};
Self {
path: *path,
proto,
md,
}
}
}
impl Debug for File {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("File")
.field("path", &self.path.relative_dirs_and_file_name().to_string())
.field("revision_counter", &self.path.revision_counter)
.field("uuid", &self.path.uuid)
.field("is_checkpoint", &self.path.is_checkpoint())
.field("proto", &self.proto)
.field("metadata", &self.md)
.finish()
}
}
/// Wrapper around [`IoxParquetMetaData`] with additional debug output (e.g. to also show nested data).
struct Metadata {
md: DecodedIoxParquetMetaData,
options: Arc<DumpOptions>,
}
impl Metadata {
/// Read metadata (in form of [`IoxParquetMetaData`]) from bytes, encoded as Apache Thrift.
fn read(
data: &Bytes,
options: Arc<DumpOptions>,
) -> Result<Self, parquet_file::metadata::Error> {
let iox_md = IoxParquetMetaData::from_thrift_bytes(data.as_ref().to_vec());
let md = iox_md.decode()?;
Ok(Self { md, options })
}
}
impl Debug for Metadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let schema = self.md.read_schema();
let statistics = schema
.as_ref()
.ok()
.map(|schema| self.md.read_statistics(schema));
let mut dbg = f.debug_struct("Metadata");
if self.options.show_parquet_metadata {
dbg.field("parquet_metadata", &self.md);
}
if self.options.show_iox_metadata {
dbg.field("iox_metadata", &self.md.read_iox_metadata_old());
}
if self.options.show_schema {
dbg.field("schema", &schema);
}
if self.options.show_statistics {
dbg.field("statistics", &statistics);
}
dbg.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{core::PreservedCatalog, interface::CatalogParquetInfo, test_helpers::make_config};
use iox_time::Time;
use parquet_file::test_utils::generator::{ChunkGenerator, GeneratorConfig};
use uuid::Uuid;
#[tokio::test]
async fn test_dump_default_options() {
let time_provider = Arc::new(iox_time::MockProvider::new(Time::from_timestamp(10, 20)));
let config = make_config()
.await
.with_fixed_uuid(Uuid::nil())
.with_time_provider(time_provider);
let iox_object_store = &config.iox_object_store;
let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
generator.set_config(GeneratorConfig::Simple);
// build catalog with some data
let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap();
{
let (chunk, _) = generator.generate().await.unwrap();
let mut transaction = catalog.open_transaction().await;
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
transaction.commit().await.unwrap();
}
let mut buf = std::io::Cursor::new(Vec::new());
let options = DumpOptions::default();
dump(iox_object_store, &mut buf, options).await.unwrap();
let actual = String::from_utf8(buf.into_inner()).unwrap();
let actual = actual.trim();
let expected = r#"
File {
path: "00000000000000000000/00000000-0000-0000-0000-000000000000.txn",
revision_counter: 0,
uuid: 00000000-0000-0000-0000-000000000000,
is_checkpoint: false,
proto: Ok(
Transaction {
version: 19,
actions: [],
revision_counter: 0,
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
previous_uuid: b"",
start_timestamp: Some(
Timestamp {
seconds: 10,
nanos: 20,
},
),
encoding: Delta,
},
),
metadata: Some(
[],
),
}
File {
path: "00000000000000000001/00000000-0000-0000-0000-000000000000.txn",
revision_counter: 1,
uuid: 00000000-0000-0000-0000-000000000000,
is_checkpoint: false,
proto: Ok(
Transaction {
version: 19,
actions: [
Action {
action: Some(
AddParquet(
AddParquet {
path: Some(
Path {
directories: [
"table1",
"part1",
],
file_name: "00000000-0000-0000-0000-000000000001.parquet",
},
),
file_size_bytes: 3661,
metadata: b"metadata omitted (953 bytes)",
},
),
),
},
],
revision_counter: 1,
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
previous_uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
start_timestamp: Some(
Timestamp {
seconds: 10,
nanos: 20,
},
),
encoding: Delta,
},
),
metadata: Some(
[
Ok(
Metadata,
),
],
),
}
"#
.trim();
assert_eq!(
actual, expected,
"\n\nactual:\n{}\n\nexpected:\n{}",
actual, expected
);
}
#[tokio::test]
async fn test_dump_show_parsed_data() {
let time_provider = Arc::new(iox_time::MockProvider::new(Time::from_timestamp(10, 20)));
let config = make_config()
.await
.with_fixed_uuid(Uuid::nil())
.with_time_provider(time_provider);
let iox_object_store = &config.iox_object_store;
let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
generator.set_config(GeneratorConfig::Simple);
// build catalog with some data
let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap();
{
let (chunk, _) = generator.generate().await.unwrap();
let mut transaction = catalog.open_transaction().await;
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
transaction.commit().await.unwrap();
}
let mut buf = std::io::Cursor::new(Vec::new());
let options = DumpOptions {
show_iox_metadata: true,
show_schema: true,
show_statistics: true,
..Default::default()
};
dump(iox_object_store, &mut buf, options).await.unwrap();
let actual = String::from_utf8(buf.into_inner()).unwrap();
let actual = actual.trim();
let expected = r#"
File {
path: "00000000000000000000/00000000-0000-0000-0000-000000000000.txn",
revision_counter: 0,
uuid: 00000000-0000-0000-0000-000000000000,
is_checkpoint: false,
proto: Ok(
Transaction {
version: 19,
actions: [],
revision_counter: 0,
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
previous_uuid: b"",
start_timestamp: Some(
Timestamp {
seconds: 10,
nanos: 20,
},
),
encoding: Delta,
},
),
metadata: Some(
[],
),
}
File {
path: "00000000000000000001/00000000-0000-0000-0000-000000000000.txn",
revision_counter: 1,
uuid: 00000000-0000-0000-0000-000000000000,
is_checkpoint: false,
proto: Ok(
Transaction {
version: 19,
actions: [
Action {
action: Some(
AddParquet(
AddParquet {
path: Some(
Path {
directories: [
"table1",
"part1",
],
file_name: "00000000-0000-0000-0000-000000000001.parquet",
},
),
file_size_bytes: 3661,
metadata: b"metadata omitted (953 bytes)",
},
),
),
},
],
revision_counter: 1,
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
previous_uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
start_timestamp: Some(
Timestamp {
seconds: 10,
nanos: 20,
},
),
encoding: Delta,
},
),
metadata: Some(
[
Ok(
Metadata {
iox_metadata: Ok(
IoxMetadataOld {
creation_timestamp: 1970-01-01T00:00:10.000000020+00:00,
time_of_first_write: 1970-01-01T00:00:30.000000040+00:00,
time_of_last_write: 1970-01-01T00:00:50.000000060+00:00,
table_name: "table1",
partition_key: "part1",
chunk_id: ChunkId(
1,
),
partition_checkpoint: PartitionCheckpoint {
table_name: "table1",
partition_key: "part1",
sequencer_numbers: {
1: OptionalMinMaxSequence {
min: None,
max: 18,
},
2: OptionalMinMaxSequence {
min: Some(
25,
),
max: 28,
},
},
flush_timestamp: 1970-01-01T00:00:10.000000020+00:00,
},
database_checkpoint: DatabaseCheckpoint {
sequencer_numbers: {
1: OptionalMinMaxSequence {
min: None,
max: 18,
},
2: OptionalMinMaxSequence {
min: Some(
24,
),
max: 29,
},
3: OptionalMinMaxSequence {
min: Some(
35,
),
max: 38,
},
},
},
chunk_order: ChunkOrder(
1,
),
sort_key: None,
},
),
schema: Ok(
Schema {
inner: Schema {
fields: [
Field {
name: "foo_tag_normal",
data_type: Dictionary(
Int32,
Utf8,
),
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: Some(
{
"iox::column::type": "iox::column_type::tag",
},
),
},
Field {
name: "foo_field_i64_normal",
data_type: Int64,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: Some(
{
"iox::column::type": "iox::column_type::field::integer",
},
),
},
Field {
name: "time",
data_type: Timestamp(
Nanosecond,
None,
),
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: Some(
{
"iox::column::type": "iox::column_type::timestamp",
},
),
},
],
metadata: {},
},
},
),
statistics: Some(
Ok(
[
ColumnSummary {
name: "foo_tag_normal",
influxdb_type: Some(
Tag,
),
stats: String(
StatValues {
min: Some(
"bar",
),
max: Some(
"foo",
),
total_count: 4,
null_count: Some(
0,
),
distinct_count: None,
},
),
},
ColumnSummary {
name: "foo_field_i64_normal",
influxdb_type: Some(
Field,
),
stats: I64(
StatValues {
min: Some(
-1,
),
max: Some(
4,
),
total_count: 4,
null_count: Some(
0,
),
distinct_count: None,
},
),
},
ColumnSummary {
name: "time",
influxdb_type: Some(
Timestamp,
),
stats: I64(
StatValues {
min: Some(
1000,
),
max: Some(
4000,
),
total_count: 4,
null_count: Some(
0,
),
distinct_count: None,
},
),
},
],
),
),
},
),
],
),
}
"#
.trim();
assert_eq!(
actual, expected,
"\n\nactual:\n{}\n\nexpected:\n{}",
actual, expected
);
}
}


@@ -1,151 +0,0 @@
//! Abstract interfaces to make different users work with the preserved catalog.
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use data_types::chunk_metadata::{ChunkAddr, ChunkId};
use data_types::delete_predicate::DeletePredicate;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use parquet_file::chunk::ParquetChunk;
use snafu::Snafu;
use parquet_file::metadata::IoxParquetMetaData;
/// Struct containing all information that a catalog received for a new parquet file.
#[derive(Debug, Clone)]
pub struct CatalogParquetInfo {
/// Path within this database.
pub path: ParquetFilePath,
/// Size of the parquet file, in bytes
pub file_size_bytes: usize,
/// Associated parquet metadata.
pub metadata: Arc<IoxParquetMetaData>,
}
impl CatalogParquetInfo {
/// Creates a [`CatalogParquetInfo`] from a [`ParquetChunk`]
pub fn from_chunk(chunk: &ParquetChunk) -> Self {
Self {
path: chunk.path().clone(),
file_size_bytes: chunk.file_size_bytes(),
metadata: chunk.parquet_metadata(),
}
}
}
/// Same as [ChunkAddr] but w/o the database part.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChunkAddrWithoutDatabase {
pub table_name: Arc<str>,
pub partition_key: Arc<str>,
pub chunk_id: ChunkId,
}
impl From<ChunkAddr> for ChunkAddrWithoutDatabase {
fn from(addr: ChunkAddr) -> Self {
Self {
table_name: addr.table_name,
partition_key: addr.partition_key,
chunk_id: addr.chunk_id,
}
}
}
impl std::fmt::Display for ChunkAddrWithoutDatabase {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Chunk('{}':'{}':{})",
self.table_name, self.partition_key, self.chunk_id
)
}
}
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum CatalogStateAddError {
#[snafu(display("Cannot extract metadata from {:?}: {}", path, source))]
MetadataExtractFailed {
source: parquet_file::metadata::Error,
path: ParquetFilePath,
},
#[snafu(display("Schema for {:?} does not work with existing schema: {}", path, source))]
SchemaError {
source: Box<dyn std::error::Error + Send + Sync>,
path: ParquetFilePath,
},
#[snafu(
display(
"Internal error: Using checkpoints from {:?} leads to broken replay plan: {}, catalog likely broken",
path,
source
),
)]
ReplayPlanError {
source: Box<dyn std::error::Error + Send + Sync>,
path: ParquetFilePath,
},
#[snafu(display("Cannot create parquet chunk from {:?}: {}", path, source))]
ChunkCreationFailed {
source: parquet_file::chunk::Error,
path: ParquetFilePath,
},
#[snafu(display("Parquet already exists in catalog: {:?}", path))]
ParquetFileAlreadyExists { path: ParquetFilePath },
}
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum CatalogStateRemoveError {
#[snafu(display("Parquet does not exist in catalog: {:?}", path))]
ParquetFileDoesNotExist { path: ParquetFilePath },
}
/// Abstraction over how the in-memory state of the catalog works.
pub trait CatalogState {
/// Add parquet file to state.
fn add(
&mut self,
iox_object_store: Arc<IoxObjectStore>,
info: CatalogParquetInfo,
) -> Result<(), CatalogStateAddError>;
/// Remove parquet file from state.
fn remove(&mut self, path: &ParquetFilePath) -> Result<(), CatalogStateRemoveError>;
/// Register new predicate to delete data.
///
/// The delete predicate will only be applied to the given chunks (by table name, partition key, and chunk ID).
fn delete_predicate(
&mut self,
predicate: Arc<DeletePredicate>,
chunks: Vec<ChunkAddrWithoutDatabase>,
);
}
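To make the contract concrete, here is a minimal sketch of an implementer that merely tracks which parquet files are currently referenced; the type name is made up, and the cleanup module of this crate contains a very similar `TracerCatalogState`:
#[derive(Debug, Default)]
struct PathTracker {
    files: HashSet<ParquetFilePath>,
}
impl CatalogState for PathTracker {
    fn add(
        &mut self,
        _iox_object_store: Arc<IoxObjectStore>,
        info: CatalogParquetInfo,
    ) -> Result<(), CatalogStateAddError> {
        // remember the referenced file
        self.files.insert(info.path);
        Ok(())
    }
    fn remove(&mut self, path: &ParquetFilePath) -> Result<(), CatalogStateRemoveError> {
        // forget it again once it is removed from the catalog
        self.files.remove(path);
        Ok(())
    }
    fn delete_predicate(
        &mut self,
        _predicate: Arc<DeletePredicate>,
        _chunks: Vec<ChunkAddrWithoutDatabase>,
    ) {
        // delete predicates do not change which files exist, so nothing to track here
    }
}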
/// Structure that holds all information required to create a checkpoint.
///
/// Note that while checkpoints are addressed using the same schema as we use for transactions
/// (revision counter, UUID), they contain the changes up to and including the transaction
/// they refer to.
#[derive(Debug)]
pub struct CheckpointData {
/// Maps all Parquet file paths that are currently (i.e. by the current version) tracked by the
/// catalog to the associated metadata.
///
/// If a file was once added but later removed it MUST NOT appear in the result.
pub files: HashMap<ParquetFilePath, CatalogParquetInfo>,
/// Maps active delete predicates to their chunks (by table name, partition key, and chunk ID).
///
/// This must only contain chunks that are still present in the catalog. Predicates that do not have any chunks
/// attached should be left out.
pub delete_predicates: HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>>,
}


@@ -1,3 +0,0 @@
pub mod proto_io;
pub mod proto_parse;
pub mod types;


@@ -1,61 +0,0 @@
use bytes::Bytes;
use generated_types::influxdata::iox::preserved_catalog::v1 as proto;
use iox_object_store::{IoxObjectStore, TransactionFilePath};
use object_store::{ObjectStoreApi, ObjectStoreImpl};
use prost::Message;
use snafu::{ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error during protobuf serialization: {}", source))]
Serialization { source: prost::EncodeError },
#[snafu(display("Error during protobuf deserialization: {}", source))]
Deserialization { source: prost::DecodeError },
#[snafu(display("Error during store write operation: {}", source))]
Write {
source: <ObjectStoreImpl as ObjectStoreApi>::Error,
},
#[snafu(display("Error during store read operation: {}", source))]
Read {
source: <ObjectStoreImpl as ObjectStoreApi>::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Serialize and store protobuf-encoded transaction.
pub async fn store_transaction_proto(
iox_object_store: &IoxObjectStore,
path: &TransactionFilePath,
proto: &proto::Transaction,
) -> Result<()> {
let mut data = Vec::new();
proto.encode(&mut data).context(SerializationSnafu {})?;
let data = Bytes::from(data);
iox_object_store
.put_catalog_transaction_file(path, data)
.await
.context(WriteSnafu {})?;
Ok(())
}
/// Load and deserialize protobuf-encoded transaction from store.
pub async fn load_transaction_proto(
iox_object_store: &IoxObjectStore,
path: &TransactionFilePath,
) -> Result<proto::Transaction> {
let data = iox_object_store
.get_catalog_transaction_file(path)
.await
.context(ReadSnafu {})?
.bytes()
.await
.context(ReadSnafu {})?;
let proto = proto::Transaction::decode(&data[..]).context(DeserializationSnafu {})?;
Ok(proto)
}
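A round-trip sketch for the two helpers above, assuming an `IoxObjectStore` and a `TransactionFilePath` are already available; the transaction content is a placeholder built via `Default`:
async fn roundtrip(
    iox_object_store: &IoxObjectStore,
    path: &TransactionFilePath,
) -> Result<()> {
    // prost-generated messages implement `Default`, so only the fields of interest are set
    let transaction = proto::Transaction {
        version: 19,
        ..Default::default()
    };
    store_transaction_proto(iox_object_store, path, &transaction).await?;
    let loaded = load_transaction_proto(iox_object_store, path).await?;
    assert_eq!(loaded, transaction);
    Ok(())
}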


@@ -1,100 +0,0 @@
use std::{convert::TryInto, num::TryFromIntError};
use generated_types::influxdata::iox::preserved_catalog::v1 as proto;
use iox_object_store::{ParquetFilePath, ParquetFilePathParseError};
use iox_time::Time;
use object_store::path::{parsed::DirsAndFileName, parts::PathPart};
use snafu::{OptionExt, ResultExt, Snafu};
use uuid::Uuid;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Cannot parse UUID: {}", source))]
UuidParse { source: uuid::Error },
#[snafu(display("UUID required but not provided"))]
UuidRequired {},
#[snafu(display("Invalid parquet file path: {}", source))]
InvalidParquetFilePath { source: ParquetFilePathParseError },
#[snafu(display("Datetime required but missing in serialized catalog"))]
DateTimeRequired {},
#[snafu(display("Cannot parse datetime in serialized catalog: {}", source))]
DateTimeParseError { source: TryFromIntError },
#[snafu(display(
"Cannot parse encoding in serialized catalog: {} is not a valid, specified variant",
data
))]
EncodingParseError { data: i32 },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Parse big-endian UUID from protobuf.
pub fn parse_uuid(bytes: &[u8]) -> Result<Option<Uuid>> {
if bytes.is_empty() {
Ok(None)
} else {
let uuid = Uuid::from_slice(bytes).context(UuidParseSnafu {})?;
Ok(Some(uuid))
}
}
/// Parse big-endian UUID from protobuf and fail if protobuf did not provide data.
pub fn parse_uuid_required(bytes: &[u8]) -> Result<Uuid> {
parse_uuid(bytes)?.context(UuidRequiredSnafu {})
}
/// Parse [`ParquetFilePath`](iox_object_store::ParquetFilePath) from protobuf.
pub fn parse_dirs_and_filename(proto: &proto::Path) -> Result<ParquetFilePath> {
let dirs_and_file_name = DirsAndFileName {
directories: proto
.directories
.iter()
.map(|s| PathPart::from(&s[..]))
.collect(),
file_name: Some(PathPart::from(&proto.file_name[..])),
};
ParquetFilePath::from_relative_dirs_and_file_name(&dirs_and_file_name)
.context(InvalidParquetFilePathSnafu)
}
/// Store [`ParquetFilePath`](iox_object_store::ParquetFilePath) as protobuf.
pub fn unparse_dirs_and_filename(path: &ParquetFilePath) -> proto::Path {
let path = path.relative_dirs_and_file_name();
proto::Path {
directories: path
.directories
.iter()
.map(|part| part.encoded().to_string())
.collect(),
file_name: path
.file_name
.as_ref()
.map(|part| part.encoded().to_string())
.unwrap_or_default(),
}
}
/// Parse timestamp from protobuf.
pub fn parse_timestamp(ts: &Option<generated_types::google::protobuf::Timestamp>) -> Result<Time> {
let ts: generated_types::google::protobuf::Timestamp =
ts.as_ref().context(DateTimeRequiredSnafu)?.clone();
let ts = ts.try_into().context(DateTimeParseSnafu)?;
Ok(Time::from_date_time(ts))
}
/// Parse encoding from protobuf.
pub fn parse_encoding(encoding: i32) -> Result<proto::transaction::Encoding> {
let parsed = proto::transaction::Encoding::from_i32(encoding)
.context(EncodingParseSnafu { data: encoding })?;
if parsed == proto::transaction::Encoding::Unspecified {
Err(Error::EncodingParseError { data: encoding })
} else {
Ok(parsed)
}
}
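A small sketch of how the parsing helpers above behave; the wrapper function exists only for illustration:
fn parse_examples() -> Result<()> {
    // UUIDs: empty bytes mean "not provided", non-empty bytes round-trip
    let uuid = Uuid::new_v4();
    assert_eq!(parse_uuid(uuid.as_bytes())?, Some(uuid));
    assert_eq!(parse_uuid(&[])?, None);
    assert!(parse_uuid_required(&[]).is_err());
    // encodings: the unspecified variant is rejected, specified variants pass through
    assert!(parse_encoding(0).is_err());
    assert_eq!(
        parse_encoding(proto::transaction::Encoding::Delta as i32)?,
        proto::transaction::Encoding::Delta
    );
    Ok(())
}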


@@ -1,37 +0,0 @@
use std::fmt::Display;
use generated_types::influxdata::iox::preserved_catalog::v1 as proto;
use uuid::Uuid;
/// Type of catalog file.
#[derive(Debug, Clone, Copy)]
pub enum FileType {
/// Ordinary transaction with delta encoding.
Transaction,
/// Checkpoints with full state.
Checkpoint,
}
impl FileType {
/// Get encoding that should be used for this file.
pub fn encoding(&self) -> proto::transaction::Encoding {
match self {
Self::Transaction => proto::transaction::Encoding::Delta,
Self::Checkpoint => proto::transaction::Encoding::Full,
}
}
}
/// Key to address transactions.
#[derive(Clone, Debug, Copy)]
pub struct TransactionKey {
pub revision_counter: u64,
pub uuid: Uuid,
}
impl Display for TransactionKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.revision_counter, self.uuid)
}
}
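A tiny sketch of how these two types are used elsewhere in the crate; the values are arbitrary:
fn types_example() {
    let key = TransactionKey {
        revision_counter: 42,
        uuid: Uuid::new_v4(),
    };
    // transactions are addressed as "<revision counter>.<uuid>"
    println!("transaction {}", key);
    // checkpoints carry the full state, ordinary transactions only a delta
    assert_eq!(FileType::Checkpoint.encoding(), proto::transaction::Encoding::Full);
    assert_eq!(FileType::Transaction.encoding(), proto::transaction::Encoding::Delta);
}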


@@ -1,21 +0,0 @@
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
pub mod cleanup;
pub mod core;
pub mod dump;
pub mod interface;
mod internals;
pub mod prune;
pub mod rebuild;
pub mod test_helpers;
pub use crate::internals::proto_io::Error as ProtoIOError;
pub use crate::internals::proto_parse::Error as ProtoParseError;


@@ -1,283 +0,0 @@
//! Tooling to remove parts of the preserved catalog that are no longer needed.
use std::{collections::BTreeMap, sync::Arc};
use futures::TryStreamExt;
use iox_object_store::{IoxObjectStore, TransactionFilePath};
use iox_time::Time;
use object_store::{ObjectStoreApi, ObjectStoreImpl};
use snafu::{ResultExt, Snafu};
use crate::{
internals::{proto_io::load_transaction_proto, proto_parse::parse_timestamp},
ProtoIOError, ProtoParseError,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error during store read operation: {}", source))]
Read {
source: <ObjectStoreImpl as ObjectStoreApi>::Error,
},
#[snafu(display("Error during store delete operation: {}", source))]
Delete {
source: <ObjectStoreImpl as ObjectStoreApi>::Error,
},
#[snafu(display("Error during protobuf IO: {}", source))]
ProtobufIOError { source: ProtoIOError },
#[snafu(display("Internal: Error while parsing protobuf: {}", source))]
ProtobufParseError { source: ProtoParseError },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Prune history of [`PreservedCatalog`](crate::core::PreservedCatalog).
///
/// This deletes all transactions and checkpoints that were started prior to `before`. Note that this only deletes data
/// that is safe to delete while still allowing time travel back to `before`. For example, imagine the following transactions:
///
/// | Timestamp | What |
/// | --------- | ------------------------- |
/// | C1 | Checkpoint |
/// | T2 | Transaction |
/// | T3, C3 | Transaction + Checkpoint |
/// | T4 | Transaction |
/// | `before` | |
/// | T5 | Transaction |
/// | T6, C6 | Transaction + Checkpoint |
/// | T7 | Transaction |
/// | T9, C9 | Transaction + Checkpoint |
///
/// This will delete the following content: C1, T2, and T3. C3 and T4 cannot be deleted because they are required to
/// recover T5, which is AFTER `before`.
pub async fn prune_history(iox_object_store: Arc<IoxObjectStore>, before: Time) -> Result<()> {
// collect all files so we can quickly filter them later for deletion
// Use a btree-map so we can iterate from oldest to newest revision.
let mut files: BTreeMap<u64, Vec<TransactionFilePath>> = Default::default();
// remember the latest checkpoint (or revision zero) that is <= `before`
let mut latest_in_prune_range: Option<u64> = None;
let mut stream = iox_object_store
.catalog_transaction_files()
.await
.context(ReadSnafu)?;
while let Some(transaction_file_list) = stream.try_next().await.context(ReadSnafu)? {
for transaction_file_path in transaction_file_list {
if is_checkpoint_or_zero(&transaction_file_path) {
let proto = load_transaction_proto(&iox_object_store, &transaction_file_path)
.await
.context(ProtobufIOSnafu)?;
let start_timestamp =
parse_timestamp(&proto.start_timestamp).context(ProtobufParseSnafu)?;
if start_timestamp <= before {
latest_in_prune_range = Some(
latest_in_prune_range.map_or(transaction_file_path.revision_counter, |x| {
x.max(transaction_file_path.revision_counter)
}),
);
}
}
files
.entry(transaction_file_path.revision_counter)
.and_modify(|known| known.push(transaction_file_path))
.or_insert_with(|| vec![transaction_file_path]);
}
}
if let Some(earliest_keep) = latest_in_prune_range {
for (revision, files) in files {
if revision > earliest_keep {
break;
}
for file in files {
if (file.revision_counter < earliest_keep) || !is_checkpoint_or_zero(&file) {
iox_object_store
.delete_catalog_transaction_file(&file)
.await
.context(DeleteSnafu)?;
}
}
}
}
Ok(())
}
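A usage sketch: keep one day of history available for time travel and prune everything older. The helper name and the retention window are illustrative; the caller is assumed to pass in the current time:
async fn prune_old_history(iox_object_store: Arc<IoxObjectStore>, now: Time) -> Result<()> {
    // everything that is only needed to time-travel further back than 24h may be removed
    let before = now - std::time::Duration::from_secs(24 * 60 * 60);
    prune_history(iox_object_store, before).await
}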
/// Check if the given path represents a checkpoint or is revision zero.
///
/// In either case the file can be used to start reading a catalog.
fn is_checkpoint_or_zero(path: &TransactionFilePath) -> bool {
path.is_checkpoint() || (path.revision_counter == 0)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use parquet_file::test_utils::make_iox_object_store;
use crate::{
core::PreservedCatalog,
interface::CheckpointData,
test_helpers::{load_ok, make_config, new_empty},
};
use super::*;
#[tokio::test]
async fn test_empty_store() {
let iox_object_store = make_iox_object_store().await;
prune_history(iox_object_store, Time::from_timestamp_nanos(0))
.await
.unwrap();
}
#[tokio::test]
async fn test_do_delete_wipe_last_checkpoint() {
let config = make_config().await;
new_empty(config.clone()).await;
prune_history(
Arc::clone(&config.iox_object_store),
Time::from_timestamp_nanos(0),
)
.await
.unwrap();
load_ok(config).await.unwrap();
}
#[tokio::test]
async fn test_complex_1() {
let time = Arc::new(iox_time::MockProvider::new(Time::from_timestamp(0, 32)));
let config = make_config()
.await
.with_time_provider(Arc::<iox_time::MockProvider>::clone(&time));
let iox_object_store = &config.iox_object_store;
let catalog = new_empty(config.clone()).await;
create_transaction(&catalog).await;
create_transaction_and_checkpoint(&catalog).await;
let before = time.inc(Duration::from_secs(21));
time.inc(Duration::from_secs(1));
create_transaction(&catalog).await;
prune_history(Arc::clone(iox_object_store), before)
.await
.unwrap();
assert_eq!(
known_revisions(iox_object_store).await,
vec![(2, true), (3, false)],
);
}
#[tokio::test]
async fn test_complex_2() {
let time = Arc::new(iox_time::MockProvider::new(Time::from_timestamp(0, 32)));
let config = make_config()
.await
.with_time_provider(Arc::<iox_time::MockProvider>::clone(&time));
let iox_object_store = &config.iox_object_store;
let catalog = new_empty(config.clone()).await;
create_transaction(&catalog).await;
create_transaction_and_checkpoint(&catalog).await;
create_transaction(&catalog).await;
let before = time.inc(Duration::from_secs(25));
time.inc(Duration::from_secs(1));
create_transaction(&catalog).await;
create_transaction_and_checkpoint(&catalog).await;
create_transaction(&catalog).await;
prune_history(Arc::clone(iox_object_store), before)
.await
.unwrap();
assert_eq!(
known_revisions(iox_object_store).await,
vec![
(2, true),
(3, false),
(4, false),
(5, false),
(5, true),
(6, false)
],
);
}
#[tokio::test]
async fn test_keep_all() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let catalog = new_empty(config.clone()).await;
create_transaction(&catalog).await;
create_transaction_and_checkpoint(&catalog).await;
create_transaction(&catalog).await;
let before = config.time_provider.now() - Duration::from_secs(1_000);
prune_history(Arc::clone(iox_object_store), before)
.await
.unwrap();
assert_eq!(
known_revisions(iox_object_store).await,
vec![(0, false), (1, false), (2, false), (2, true), (3, false)],
);
}
async fn create_transaction(catalog: &PreservedCatalog) {
let t = catalog.open_transaction().await;
t.commit().await.unwrap();
}
async fn create_transaction_and_checkpoint(catalog: &PreservedCatalog) {
let t = catalog.open_transaction().await;
let ckpt_handle = t.commit().await.unwrap();
ckpt_handle
.create_checkpoint(CheckpointData {
files: Default::default(),
delete_predicates: Default::default(),
})
.await
.unwrap();
}
async fn known_revisions(iox_object_store: &IoxObjectStore) -> Vec<(u64, bool)> {
let mut revisions = iox_object_store
.catalog_transaction_files()
.await
.unwrap()
.map_ok(|files| {
files
.into_iter()
.map(|f| (f.revision_counter, f.is_checkpoint()))
.collect::<Vec<(u64, bool)>>()
})
.try_concat()
.await
.unwrap();
revisions.sort_unstable();
revisions
}
}


@@ -1,444 +0,0 @@
//! Contains code to rebuild a catalog from files.
use std::{fmt::Debug, sync::Arc};
use futures::TryStreamExt;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use observability_deps::tracing::error;
use parquet_file::metadata::IoxParquetMetaData;
use snafu::{ResultExt, Snafu};
use crate::{
core::{PreservedCatalog, PreservedCatalogConfig},
interface::CatalogParquetInfo,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Cannot create new empty catalog: {}", source))]
NewEmptyFailure { source: crate::core::Error },
#[snafu(display("Cannot read store: {}", source))]
ReadFailure { source: object_store::Error },
#[snafu(display("Cannot read IOx metadata from parquet file ({:?}): {}", path, source))]
MetadataReadFailure {
source: parquet_file::metadata::Error,
path: ParquetFilePath,
},
#[snafu(display("No row groups from parquet file ({:?})", path))]
NoRowGroups { path: ParquetFilePath },
#[snafu(display("Cannot add file to transaction: {}", source))]
FileRecordFailure {
source: crate::interface::CatalogStateAddError,
},
#[snafu(display("Cannot commit transaction: {}", source))]
CommitFailure { source: crate::core::Error },
#[snafu(display("Cannot create checkpoint: {}", source))]
CheckpointFailure { source: crate::core::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Creates a new catalog from parquet files.
///
/// Users are required to [wipe](PreservedCatalog::wipe) the existing catalog before running this
/// procedure (**after creating a backup!**).
///
/// # Limitations
/// Compared to an intact catalog, wiping a catalog and rebuilding it from Parquet files has the
/// following drawbacks:
///
/// - **Garbage Susceptibility:** The rebuild process will stumble over garbage parquet files (i.e.
files that are present in the object store but were not part of the catalog). For files
/// that were not written by IOx it will likely report [`Error::MetadataReadFailure`].
/// - **No Removals:** The rebuild system cannot recover the fact that files were removed from the
/// catalog during some transaction. This might not always be an issue due to "deduplicate while
/// read"-logic in the query engine, but also might have unwanted side effects (e.g. performance
/// issues).
/// - **Single Transaction:** All files are added in a single transaction. Hence time-traveling is
/// NOT possible using the resulting catalog.
/// - **Fork Detection:** The rebuild procedure does NOT detect forks (i.e. files written for the
/// same server ID by multiple IOx instances).
///
/// # Error Handling
/// This routine will fail if:
///
/// - **Metadata Read Failure:** There is a parquet file with metadata that cannot be read. Set
/// `ignore_metadata_read_failure` to `true` to ignore these cases.
pub async fn rebuild_catalog(
config: PreservedCatalogConfig,
ignore_metadata_read_failure: bool,
) -> Result<PreservedCatalog> {
// collect all revisions from parquet files
let files = collect_files(&config.iox_object_store, ignore_metadata_read_failure).await?;
// create new empty catalog
let catalog = PreservedCatalog::new_empty(config.clone())
.await
.context(NewEmptyFailureSnafu)?;
// create single transaction with all files
if !files.is_empty() {
let mut transaction = catalog.open_transaction().await;
for info in files {
transaction.add_parquet(&info);
}
transaction.commit().await.context(CheckpointFailureSnafu)?;
}
Ok(catalog)
}
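The intended recovery flow is therefore: back up the object store, wipe the broken catalog, then rebuild from the parquet files. A minimal sketch, with the helper name being illustrative and `ignore_metadata_read_failure` set as the caller sees fit:
async fn wipe_and_rebuild(config: PreservedCatalogConfig) -> Result<PreservedCatalog> {
    // remove whatever is left of the old catalog ...
    PreservedCatalog::wipe(&config.iox_object_store)
        .await
        .expect("wiping the preserved catalog failed");
    // ... and recreate it from the parquet files, tolerating files with unreadable metadata
    rebuild_catalog(config, true).await
}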
/// Collect all files for the database managed by the given IoxObjectStore.
///
/// Returns a vector of [`CatalogParquetInfo`] entries (path, file size, and metadata).
///
/// The file listing is recursive.
async fn collect_files(
iox_object_store: &IoxObjectStore,
ignore_metadata_read_failure: bool,
) -> Result<Vec<CatalogParquetInfo>> {
let mut stream = iox_object_store
.parquet_files()
.await
.context(ReadFailureSnafu)?;
let mut files = vec![];
while let Some(paths) = stream.try_next().await.context(ReadFailureSnafu)? {
for path in paths {
match read_parquet(iox_object_store, &path).await {
Ok((file_size_bytes, metadata)) => {
files.push(CatalogParquetInfo {
path,
file_size_bytes,
metadata,
});
}
Err(e @ Error::MetadataReadFailure { .. }) if ignore_metadata_read_failure => {
error!("error while reading metadata from parquet, ignoring: {}", e);
continue;
}
Err(e) => return Err(e),
};
}
}
Ok(files)
}
/// Read Parquet and IOx metadata from given path.
async fn read_parquet(
iox_object_store: &IoxObjectStore,
path: &ParquetFilePath,
) -> Result<(usize, Arc<IoxParquetMetaData>)> {
let data = iox_object_store
.get_parquet_file(path)
.await
.context(ReadFailureSnafu)?
.bytes()
.await
.context(ReadFailureSnafu)?;
let file_size_bytes = data.len();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(Arc::new(data))
.context(MetadataReadFailureSnafu { path: path.clone() })?; // Error reading metadata
if parquet_metadata.is_none() {
return NoRowGroupsSnafu { path: path.clone() }.fail();
} // No data and hence no metadata
let parquet_metadata = parquet_metadata.unwrap();
// validate IOxMetadata
parquet_metadata
.decode()
.context(MetadataReadFailureSnafu { path: path.clone() })?
.read_iox_metadata_old()
.context(MetadataReadFailureSnafu { path: path.clone() })?;
Ok((file_size_bytes, Arc::new(parquet_metadata)))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
core::PreservedCatalog,
test_helpers::{exists, make_config, new_empty, TestCatalogState},
};
use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
use iox_time::Time;
use parquet::arrow::ArrowWriter;
use parquet_file::{
metadata::IoxMetadataOld,
storage::{MemWriter, Storage},
test_utils::{create_partition_and_database_checkpoint, make_record_batch, TestSize},
};
use tokio_stream::StreamExt;
#[tokio::test]
async fn test_rebuild_successfull() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let db_name = Arc::from("db1");
// build catalog with some data
let catalog = new_empty(config.clone()).await;
let mut state = TestCatalogState::default();
{
let mut transaction = catalog.open_transaction().await;
let info = create_parquet_file(&db_name, iox_object_store, ChunkId::new_test(0)).await;
state.insert(info.clone()).unwrap();
transaction.add_parquet(&info);
let info = create_parquet_file(&db_name, iox_object_store, ChunkId::new_test(1)).await;
state.insert(info.clone()).unwrap();
transaction.add_parquet(&info);
transaction.commit().await.unwrap();
}
{
// empty transaction
let transaction = catalog.open_transaction().await;
transaction.commit().await.unwrap();
}
{
let mut transaction = catalog.open_transaction().await;
let info = create_parquet_file(&db_name, iox_object_store, ChunkId::new_test(2)).await;
state.insert(info.clone()).unwrap();
transaction.add_parquet(&info);
transaction.commit().await.unwrap();
}
// store catalog state
let paths_expected = {
let mut tmp: Vec<_> = state.files().map(|info| info.path.clone()).collect();
tmp.sort();
tmp
};
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(iox_object_store).await.unwrap();
// rebuild
rebuild_catalog(config.clone(), false).await.unwrap();
let (catalog, state) = PreservedCatalog::load(config, TestCatalogState::default())
.await
.unwrap()
.unwrap();
// check match
let paths_actual = {
let mut tmp: Vec<_> = state.files().map(|info| info.path.clone()).collect();
tmp.sort();
tmp
};
assert_eq!(paths_actual, paths_expected);
assert_eq!(catalog.revision_counter(), 1);
}
#[tokio::test]
async fn test_rebuild_empty() {
let config = make_config().await;
// build empty catalog
let catalog = new_empty(config.clone()).await;
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(&config.iox_object_store)
.await
.unwrap();
// rebuild
rebuild_catalog(config.clone(), false).await.unwrap();
let (catalog, state) = PreservedCatalog::load(config, TestCatalogState::default())
.await
.unwrap()
.unwrap();
// check match
assert!(state.files().next().is_none());
assert_eq!(catalog.revision_counter(), 0);
}
#[tokio::test]
async fn test_rebuild_no_metadata() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let db_name = Arc::from("db1");
// build catalog with some data
let catalog = new_empty(config.clone()).await;
// file w/o metadata
create_parquet_file_without_metadata(&db_name, iox_object_store, ChunkId::new_test(0))
.await;
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(iox_object_store).await.unwrap();
// rebuild (do not ignore errors)
let res = rebuild_catalog(config.clone(), false).await;
assert!(dbg!(res.unwrap_err().to_string())
.starts_with("Cannot read IOx metadata from parquet file"));
// rebuild (ignore errors)
let catalog = rebuild_catalog(config.clone(), true).await.unwrap();
let (catalog, state) =
PreservedCatalog::load(catalog.config(), TestCatalogState::default())
.await
.unwrap()
.unwrap();
assert!(state.files().next().is_none());
assert_eq!(catalog.revision_counter(), 0);
}
#[tokio::test]
async fn test_rebuild_creates_no_checkpoint() {
// the rebuild method will create a catalog with the following transactions:
// 1. an empty one (done by `PreservedCatalog::new_empty`)
// 2. an "add all the files"
//
// There is no real need to create a checkpoint in this case. So here we delete all
// transaction files and then check that the rebuilt catalog is gone afterwards. Note the
// difference to the `test_rebuild_empty` case where we can indeed prove the existence of a
// catalog (even though it is empty, i.e. has no files).
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
// build an empty catalog (just the initial empty transaction)
let catalog = new_empty(config.clone()).await;
assert_eq!(catalog.revision_counter(), 0);
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(iox_object_store).await.unwrap();
// rebuild
rebuild_catalog(config.clone(), false).await.unwrap();
// delete transaction files
let paths = iox_object_store
.catalog_transaction_files()
.await
.unwrap()
.try_concat()
.await
.unwrap();
let mut deleted = false;
for path in paths {
if !path.is_checkpoint() {
iox_object_store
.delete_catalog_transaction_file(&path)
.await
.unwrap();
deleted = true;
}
}
assert!(deleted);
// the catalog should be gone because there should have been no checkpoint files remaining
assert!(!exists(iox_object_store).await);
}
pub async fn create_parquet_file(
db_name: &Arc<str>,
iox_object_store: &Arc<IoxObjectStore>,
chunk_id: ChunkId,
) -> CatalogParquetInfo {
let table_name = Arc::from("table1");
let partition_key = Arc::from("part1");
let (record_batches, _schema, _column_summaries, _num_rows) =
make_record_batch("foo", TestSize::Full);
let storage = Storage::new(Arc::clone(iox_object_store));
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&table_name),
Arc::clone(&partition_key),
);
let metadata = IoxMetadataOld {
creation_timestamp: Time::from_timestamp_nanos(0),
table_name: Arc::clone(&table_name),
partition_key: Arc::clone(&partition_key),
chunk_id,
partition_checkpoint,
database_checkpoint,
time_of_first_write: Time::from_timestamp_nanos(0),
time_of_last_write: Time::from_timestamp_nanos(0),
chunk_order: ChunkOrder::new(5).unwrap(),
sort_key: None,
};
let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
let (path, file_size_bytes, metadata) = storage
.write_to_object_store(
ChunkAddr {
db_name: Arc::clone(db_name),
table_name,
partition_key,
chunk_id,
},
stream,
metadata,
)
.await
.unwrap()
.unwrap();
CatalogParquetInfo {
path,
file_size_bytes,
metadata: Arc::new(metadata),
}
}
pub async fn create_parquet_file_without_metadata(
db_name: &Arc<str>,
iox_object_store: &Arc<IoxObjectStore>,
chunk_id: ChunkId,
) -> (ParquetFilePath, IoxParquetMetaData) {
let (record_batches, schema, _column_summaries, _num_rows) =
make_record_batch("foo", TestSize::Full);
let mut stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
let mem_writer = MemWriter::default();
{
let mut writer =
ArrowWriter::try_new(mem_writer.clone(), Arc::clone(schema.inner()), None).unwrap();
while let Some(batch) = stream.next().await {
let batch = batch.unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();
} // drop the reference to the MemWriter that the SerializedFileWriter has
let data = mem_writer.into_inner().unwrap();
let md = IoxParquetMetaData::from_file_bytes(Arc::new(data.clone()))
.unwrap()
.unwrap();
let storage = Storage::new(Arc::clone(iox_object_store));
let chunk_addr = ChunkAddr {
db_name: Arc::clone(db_name),
table_name: Arc::from("table1"),
partition_key: Arc::from("part1"),
chunk_id,
};
let path = ParquetFilePath::new_old_gen(&chunk_addr);
storage.to_object_store(data, &path).await.unwrap();
(path, md)
}
}


@@ -1,574 +0,0 @@
use crate::{
core::{PreservedCatalog, PreservedCatalogConfig},
interface::{
CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
CheckpointData, ChunkAddrWithoutDatabase,
},
internals::{
proto_io::{load_transaction_proto, store_transaction_proto},
types::TransactionKey,
},
};
use data_types::delete_predicate::{DeleteExpr, DeletePredicate, Op, Scalar};
use data_types::{
chunk_metadata::{ChunkAddr, ChunkId},
timestamp::TimestampRange,
};
use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath};
use parquet_file::{
chunk::ParquetChunk,
test_utils::{generator::ChunkGenerator, make_iox_object_store},
};
use snafu::ResultExt;
use std::{
collections::{
hash_map::Entry::{Occupied, Vacant},
HashMap, HashSet,
},
fmt::Debug,
sync::Arc,
};
/// Metrics need a database name, but the actual name doesn't matter for what's tested
/// in this crate. This is an arbitrary name that can be used wherever a database name is needed.
pub const DB_NAME: &str = "db1";
#[derive(Clone, Debug, Default)]
pub struct Table {
pub partitions: HashMap<Arc<str>, Partition>,
}
#[derive(Clone, Debug, Default)]
pub struct Partition {
pub chunks: HashMap<ChunkId, Chunk>,
}
#[derive(Clone, Debug)]
pub struct Chunk {
pub parquet_info: CatalogParquetInfo,
pub delete_predicates: Vec<Arc<DeletePredicate>>,
}
/// In-memory catalog state, for testing.
#[derive(Clone, Debug, Default)]
pub struct TestCatalogState {
/// Map of all parquet files that are currently registered.
pub tables: HashMap<Arc<str>, Table>,
}
impl TestCatalogState {
/// Simple way to create [`CheckpointData`].
pub fn checkpoint_data(&self) -> CheckpointData {
CheckpointData {
files: self
.files()
.map(|info| (info.path.clone(), info.clone()))
.collect(),
delete_predicates: self.delete_predicates(),
}
}
/// Returns an iterator over the files in this catalog state
pub fn files(&self) -> impl Iterator<Item = &CatalogParquetInfo> {
self.tables.values().flat_map(|table| {
table
.partitions
.values()
.flat_map(|partition| partition.chunks.values().map(|chunk| &chunk.parquet_info))
})
}
/// Return an iterator over all predicates in this catalog.
pub fn delete_predicates(
&self,
) -> HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>> {
let mut predicates: HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>> =
Default::default();
for (table_name, table) in &self.tables {
for (partition_key, partition) in &table.partitions {
for (chunk_id, chunk) in &partition.chunks {
for predicate in &chunk.delete_predicates {
let pred_chunk_closure = || ChunkAddrWithoutDatabase {
table_name: Arc::clone(table_name),
partition_key: Arc::clone(partition_key),
chunk_id: *chunk_id,
};
predicates
.entry(Arc::clone(predicate))
.and_modify(|chunks| {
chunks.insert(pred_chunk_closure());
})
.or_insert_with(|| {
IntoIterator::into_iter([pred_chunk_closure()]).collect()
});
}
}
}
}
predicates
}
/// Inserts a file into this catalog state
pub fn insert(&mut self, info: CatalogParquetInfo) -> Result<(), CatalogStateAddError> {
use crate::interface::MetadataExtractFailedSnafu;
let iox_md = info
.metadata
.decode()
.context(MetadataExtractFailedSnafu {
path: info.path.clone(),
})?
.read_iox_metadata_old()
.context(MetadataExtractFailedSnafu {
path: info.path.clone(),
})?;
let table = self.tables.entry(iox_md.table_name).or_default();
let partition = table.partitions.entry(iox_md.partition_key).or_default();
match partition.chunks.entry(iox_md.chunk_id) {
Occupied(o) => {
return Err(CatalogStateAddError::ParquetFileAlreadyExists {
path: o.get().parquet_info.path.clone(),
});
}
Vacant(v) => v.insert(Chunk {
parquet_info: info,
delete_predicates: vec![],
}),
};
Ok(())
}
}
impl CatalogState for TestCatalogState {
fn add(
&mut self,
_iox_object_store: Arc<IoxObjectStore>,
info: CatalogParquetInfo,
) -> Result<(), CatalogStateAddError> {
self.insert(info)
}
fn remove(&mut self, path: &ParquetFilePath) -> Result<(), CatalogStateRemoveError> {
let partitions = self
.tables
.values_mut()
.flat_map(|table| table.partitions.values_mut());
let mut removed = 0;
for partition in partitions {
let to_remove: Vec<_> = partition
.chunks
.iter()
.filter_map(|(id, chunk)| {
if &chunk.parquet_info.path == path {
return Some(*id);
}
None
})
.collect();
for id in to_remove {
removed += 1;
partition.chunks.remove(&id).unwrap();
}
}
match removed {
0 => Err(CatalogStateRemoveError::ParquetFileDoesNotExist { path: path.clone() }),
_ => Ok(()),
}
}
fn delete_predicate(
&mut self,
predicate: Arc<DeletePredicate>,
chunks: Vec<ChunkAddrWithoutDatabase>,
) {
for addr in chunks {
if let Some(chunk) = self
.tables
.get_mut(&addr.table_name)
.and_then(|table| table.partitions.get_mut(&addr.partition_key))
.and_then(|partition| partition.chunks.get_mut(&addr.chunk_id))
{
chunk.delete_predicates.push(Arc::clone(&predicate));
}
}
}
}
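// A small behavioural sketch (illustration only, not taken from the original file):
// predicates registered for chunk addresses that were never added are silently dropped,
// mirroring the "persisting" race that the torture test below exercises.
pub fn demo_predicate_for_unknown_chunk_is_ignored() {
    let mut state = TestCatalogState::default();
    let predicate = create_delete_predicate(42);
    let unknown_chunk = ChunkAddrWithoutDatabase {
        table_name: Arc::from("some_table"),
        partition_key: Arc::from("part"),
        chunk_id: ChunkId::new_test(1000),
    };
    state.delete_predicate(predicate, vec![unknown_chunk]);
    // nothing was recorded because the chunk does not exist in the state
    assert!(state.delete_predicates().is_empty());
}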
/// Test whether the catalog exists or not, expecting the operation to succeed
pub async fn exists(iox_object_store: &Arc<IoxObjectStore>) -> bool {
PreservedCatalog::exists(iox_object_store).await.unwrap()
}
/// Load a `PreservedCatalog` and unwrap, expecting the operation to succeed
pub async fn load_ok(
config: PreservedCatalogConfig,
) -> Option<(PreservedCatalog, TestCatalogState)> {
PreservedCatalog::load(config, TestCatalogState::default())
.await
.unwrap()
}
/// Load a `PreservedCatalog` and unwrap the error, expecting the operation to fail
pub async fn load_err(config: PreservedCatalogConfig) -> crate::core::Error {
PreservedCatalog::load(config, TestCatalogState::default())
.await
.unwrap_err()
}
/// Create a new empty catalog with the TestCatalogState, expecting the operation to succeed
pub async fn new_empty(config: PreservedCatalogConfig) -> PreservedCatalog {
PreservedCatalog::new_empty(config).await.unwrap()
}
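// A minimal lifecycle sketch combining the helpers above (an illustration only; the
// real coverage lives in the crate's core tests).
#[tokio::test]
async fn demo_create_then_load() {
    let config = make_config().await;
    assert!(!exists(&config.iox_object_store).await);
    // creating an empty catalog writes the initial transaction
    new_empty(config.clone()).await;
    assert!(exists(&config.iox_object_store).await);
    // loading it back yields an empty in-memory test state
    let (_catalog, state) = load_ok(config).await.unwrap();
    assert!(state.files().next().is_none());
}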
/// Break preserved catalog by moving one of the transaction files into a weird unknown version.
pub async fn break_catalog_with_weird_version(catalog: &PreservedCatalog) {
let tkey = get_tkey(catalog);
let path = TransactionFilePath::new_transaction(tkey.revision_counter, tkey.uuid);
let mut proto = load_transaction_proto(&catalog.iox_object_store(), &path)
.await
.unwrap();
proto.version = 42;
store_transaction_proto(&catalog.iox_object_store(), &path, &proto)
.await
.unwrap();
}
/// Helper function to ensure that guards don't leak into the future state machine.
fn get_tkey(catalog: &PreservedCatalog) -> TransactionKey {
let revision_counter = catalog.revision_counter();
let uuid = catalog.revision_uuid();
TransactionKey {
revision_counter,
uuid,
}
}
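// How the helpers above are typically combined (a sketch; the exact error variant for
// an unknown transaction version is asserted in the real catalog tests).
#[tokio::test]
async fn demo_weird_version_is_rejected() {
    let config = make_config().await;
    let catalog = new_empty(config.clone()).await;
    // corrupt the only transaction file, then drop the in-memory handle
    break_catalog_with_weird_version(&catalog).await;
    drop(catalog);
    // reloading must fail because transaction version 42 is unknown
    let err = load_err(config).await;
    println!("load rejected as expected: {}", err);
}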
/// Torture-test implementations for [`CatalogState`].
///
/// A function to extract [`CheckpointData`] from the [`CatalogState`] must be provided.
pub async fn assert_catalog_state_implementation<S, F>(mut state: S, f: F)
where
S: CatalogState + Debug + Send + Sync,
F: Fn(&S) -> CheckpointData + Send,
{
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
// The expected state of the catalog
let mut expected_chunks: HashMap<u32, ParquetChunk> = HashMap::new();
let mut expected_predicates: HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>> =
HashMap::new();
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// add files
{
for chunk_id in 1..5 {
let (chunk, _) = generator.generate_id(chunk_id).await.unwrap();
state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
expected_chunks.insert(chunk_id, chunk);
}
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// remove files
{
let chunk = expected_chunks.remove(&1).unwrap();
state.remove(chunk.path()).unwrap();
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// add and remove in the same transaction
{
let (chunk, _) = generator.generate_id(5).await.unwrap();
state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
state.remove(chunk.path()).unwrap();
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// remove and add in the same transaction
{
let chunk = expected_chunks.get(&3).unwrap();
state.remove(chunk.path()).unwrap();
state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(chunk),
)
.unwrap();
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// add, remove, add in the same transaction
{
let (chunk, _) = generator.generate_id(6).await.unwrap();
state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
state.remove(chunk.path()).unwrap();
state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
expected_chunks.insert(6, chunk);
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// remove, add, remove in same transaction
{
let chunk = expected_chunks.remove(&4).unwrap();
state.remove(chunk.path()).unwrap();
state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
state.remove(chunk.path()).unwrap();
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// error handling, no real operation happens
{
// TODO: Error handling should disambiguate between chunk collision and filename collision
// chunk with same ID already exists (should also not change the metadata)
let (chunk, _) = generator.generate_id(2).await.unwrap();
let err = state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap_err();
assert!(matches!(
err,
CatalogStateAddError::ParquetFileAlreadyExists { .. }
));
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// error handling, something still works
{
// already exists (should also not change the metadata)
let (chunk, _) = generator.generate_id(2).await.unwrap();
let err = state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap_err();
assert!(matches!(
err,
CatalogStateAddError::ParquetFileAlreadyExists { .. }
));
// this transaction will still work
let (chunk, _) = generator.generate_id(7).await.unwrap();
let info = CatalogParquetInfo::from_chunk(&chunk);
state
.add(Arc::clone(iox_object_store), info.clone())
.unwrap();
expected_chunks.insert(7, chunk);
// recently added
let err = state.add(Arc::clone(iox_object_store), info).unwrap_err();
assert!(matches!(
err,
CatalogStateAddError::ParquetFileAlreadyExists { .. }
));
// this still works
let chunk = expected_chunks.remove(&7).unwrap();
state.remove(chunk.path()).unwrap();
// recently removed
let err = state.remove(chunk.path()).unwrap_err();
assert!(matches!(
err,
CatalogStateRemoveError::ParquetFileDoesNotExist { .. }
));
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// add predicates
{
// create two chunks that we can use for delete predicate
let (chunk, metadata) = generator.generate_id(8).await.unwrap();
let chunk_addr_1 = ChunkAddr::new(generator.partition(), metadata.chunk_id);
state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
expected_chunks.insert(8, chunk);
let (chunk, metadata) = generator.generate_id(9).await.unwrap();
let chunk_addr_2 = ChunkAddr::new(generator.partition(), metadata.chunk_id);
state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
expected_chunks.insert(9, chunk);
// first predicate uses only a single chunk
let predicate_1 = create_delete_predicate(1);
let chunks_1 = vec![chunk_addr_1.clone().into()];
state.delete_predicate(Arc::clone(&predicate_1), chunks_1.clone());
expected_predicates.insert(predicate_1, chunks_1.into_iter().collect());
// second predicate uses both chunks (but not the older chunks)
let predicate_2 = create_delete_predicate(2);
let chunks_2 = vec![chunk_addr_1.into(), chunk_addr_2.into()];
state.delete_predicate(Arc::clone(&predicate_2), chunks_2.clone());
expected_predicates.insert(predicate_2, chunks_2.into_iter().collect());
// chunks created afterwards are unaffected
let (chunk, _) = generator.generate_id(10).await.unwrap();
state
.add(
Arc::clone(iox_object_store),
CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
expected_chunks.insert(10, chunk);
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// removing a chunk will also remove its predicates
{
let chunk = expected_chunks.remove(&8).unwrap();
state.remove(chunk.path()).unwrap();
expected_predicates = expected_predicates
.into_iter()
.filter_map(|(predicate, chunks)| {
let chunks: HashSet<_> = chunks
.into_iter()
.filter(|addr| addr.chunk_id != ChunkId::new_test(8))
.collect();
(!chunks.is_empty()).then(|| (predicate, chunks))
})
.collect();
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// Registering predicates for unknown chunks is just ignored because chunks might have been in the
// "persisting" intermediate state while the predicate was reported.
{
let predicate = create_delete_predicate(1);
let chunks = vec![ChunkAddrWithoutDatabase {
table_name: Arc::from("some_table"),
partition_key: Arc::from("part"),
chunk_id: ChunkId::new_test(1000),
}];
state.delete_predicate(Arc::clone(&predicate), chunks);
}
assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
}
/// Assert that tracked files and their linked metadata are equal.
fn assert_checkpoint<S, F>(
state: &S,
f: &F,
expected_chunks: &HashMap<u32, ParquetChunk>,
expected_predicates: &HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>>,
) where
F: Fn(&S) -> CheckpointData,
{
let data: CheckpointData = f(state);
let actual_files = data.files;
let sorted_keys_actual = get_sorted_keys(actual_files.keys());
let sorted_keys_expected = get_sorted_keys(expected_chunks.values().map(|chunk| chunk.path()));
assert_eq!(sorted_keys_actual, sorted_keys_expected);
for chunk in expected_chunks.values() {
let md_actual = &actual_files[chunk.path()].metadata;
let md_actual = md_actual.decode().unwrap();
let md_expected = chunk.parquet_metadata().decode().unwrap();
let iox_md_actual = md_actual.read_iox_metadata_old().unwrap();
let iox_md_expected = md_expected.read_iox_metadata_old().unwrap();
assert_eq!(iox_md_actual, iox_md_expected);
let schema_actual = md_actual.read_schema().unwrap();
let schema_expected = md_expected.read_schema().unwrap();
assert_eq!(schema_actual, schema_expected);
let stats_actual = md_actual.read_statistics(&schema_actual).unwrap();
let stats_expected = md_expected.read_statistics(&schema_expected).unwrap();
assert_eq!(stats_actual, stats_expected);
}
assert_eq!(&data.delete_predicates, expected_predicates);
}
/// Get a sorted list of keys from an iterator.
fn get_sorted_keys<'a>(
keys: impl Iterator<Item = &'a ParquetFilePath>,
) -> Vec<&'a ParquetFilePath> {
let mut keys: Vec<_> = keys.collect();
keys.sort();
keys
}
/// Helper to create a simple delete predicate.
pub fn create_delete_predicate(value: i64) -> Arc<DeletePredicate> {
Arc::new(DeletePredicate {
range: TimestampRange::new(11, 22),
exprs: vec![DeleteExpr::new(
"foo".to_string(),
Op::Eq,
Scalar::I64(value),
)],
})
}
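// A quick sanity sketch (illustration only) of why these predicates can serve as keys
// of the `delete_predicates()` map: equal inputs compare equal, different inputs don't.
pub fn demo_predicate_equality() {
    assert_eq!(create_delete_predicate(1), create_delete_predicate(1));
    assert_ne!(create_delete_predicate(1), create_delete_predicate(2));
}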
/// Creates a new [`PreservedCatalogConfig`] with an in-memory object store
pub async fn make_config() -> PreservedCatalogConfig {
let iox_object_store = make_iox_object_store().await;
let time_provider = Arc::new(iox_time::SystemProvider::new());
PreservedCatalogConfig::new(iox_object_store, DB_NAME.to_string(), time_provider)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_catalog_state() {
assert_catalog_state_implementation(
TestCatalogState::default(),
TestCatalogState::checkpoint_data,
)
.await;
}
}

View File

@ -23,7 +23,6 @@ num_cpus = "1.13.0"
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
parquet_catalog = { path = "../parquet_catalog" }
persistence_windows = { path = "../persistence_windows" }
query = { path = "../query" }
rand = "0.8.3"