Merge pull request #4131 from influxdata/cn/delete-old

feat: Clean up old parquet files in the catalog and object storage
pull/24376/head
kodiakhq[bot] 2022-03-29 12:37:06 +00:00 committed by GitHub
commit 99b6be9c2b
15 changed files with 548 additions and 56 deletions

View File

@ -867,9 +867,8 @@ mod tests {
use super::*;
use arrow_util::assert_batches_sorted_eq;
use data_types2::{KafkaPartition, NamespaceId, ParquetFileParams, SequenceNumber};
use futures::{stream, StreamExt, TryStreamExt};
use iox_tests::util::TestCatalog;
use object_store::path::Path;
use object_store::ObjectStoreTestConvenience;
use querier::{
cache::CatalogCache,
chunk::{collect_read_filter, ParquetChunkAdapter},
@ -1500,6 +1499,7 @@ mod tests {
ParquetFile {
id: ParquetFileId::new(0),
sequencer_id: SequencerId::new(0),
namespace_id: NamespaceId::new(0),
table_id: TableId::new(0),
partition_id: PartitionId::new(0),
object_store_id: Uuid::new_v4(),
@ -1692,6 +1692,7 @@ mod tests {
let p1 = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: table.id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
@ -1814,16 +1815,6 @@ mod tests {
assert_eq!(actual_pf2_tombstones, &[t3.id, t4.id]);
}
async fn list_all(object_store: &DynObjectStore) -> Result<Vec<Path>, object_store::Error> {
object_store
.list(None)
.await?
.map_ok(|v| stream::iter(v).map(Ok))
.try_flatten()
.try_collect()
.await
}
#[tokio::test]
async fn persist_adds_to_object_store() {
let catalog = TestCatalog::new();
@ -1897,7 +1888,7 @@ mod tests {
.await
.unwrap();
let object_store_files = list_all(&*compactor.object_store).await.unwrap();
let object_store_files = compactor.object_store.list_all().await.unwrap();
assert_eq!(object_store_files.len(), 1);
}
@ -2004,6 +1995,7 @@ mod tests {
// Prepare metadata in form of ParquetFileParams to get added with tombstone
let parquet = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: table.id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),

View File

@ -0,0 +1,398 @@
//! Clean up parquet files from object storage and their associated entries in the catalog that are
//! no longer needed because they've been compacted and they're old enough to no longer be used by
//! any queriers.
use data_types2::Timestamp;
use iox_catalog::interface::Catalog;
use iox_object_store::ParquetFilePath;
use object_store::DynObjectStore;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use time::TimeProvider;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("Error while deleting catalog records {}", source))]
DeletingCatalogRecords {
source: iox_catalog::interface::Error,
},
#[snafu(display("Error(s) while deleting object store files: {:#?}", sources))]
DeletingObjectStoreFiles { sources: Vec<object_store::Error> },
}
/// A specialized `Result` for garbage collection errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Information needed to clean up old parquet files from object storage and their entries in the
/// catalog
pub struct GarbageCollector {
/// Object store where parquet files should be cleaned up
object_store: Arc<DynObjectStore>,
/// The global catalog for parquet files
catalog: Arc<dyn Catalog>,
/// Time provider for all activities in this garbage collector
pub time_provider: Arc<dyn TimeProvider>,
}
impl GarbageCollector {
/// Initialize the Garbage Collector
pub fn new(catalog: Arc<dyn Catalog>, object_store: Arc<DynObjectStore>) -> Self {
let time_provider = catalog.time_provider();
Self {
catalog,
object_store,
time_provider,
}
}
/// Perform a pass of garbage collection, querying the catalog for all files marked to be
/// deleted earlier than the specified time. Remove the catalog entries, then remove the
/// associated object store files.
/// Meant to be invoked in a background loop.
pub async fn cleanup(&self, older_than: Timestamp) -> Result<()> {
// Make a fake IOx object store to conform to the parquet file
// interface, but note this isn't actually used to find parquet
// paths to write to
use iox_object_store::IoxObjectStore;
let iox_object_store = Arc::new(IoxObjectStore::existing(
Arc::clone(&self.object_store),
IoxObjectStore::root_path_for(&*self.object_store, uuid::Uuid::new_v4()),
));
let deleted_catalog_records = self
.catalog
.repositories()
.await
.parquet_files()
.delete_old(older_than)
.await
.context(DeletingCatalogRecordsSnafu)?;
let mut object_store_errors = Vec::with_capacity(deleted_catalog_records.len());
for catalog_record in deleted_catalog_records {
let path = ParquetFilePath::new_new_gen(
catalog_record.namespace_id,
catalog_record.table_id,
catalog_record.sequencer_id,
catalog_record.partition_id,
catalog_record.object_store_id,
);
if let Err(e) = iox_object_store.delete_parquet_file(&path).await {
object_store_errors.push(e);
}
}
if object_store_errors.is_empty() {
Ok(())
} else {
DeletingObjectStoreFilesSnafu {
sources: object_store_errors,
}
.fail()
}
}
}
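// Editorial sketch, not part of this change: the `cleanup` doc above says it is meant to be
// invoked in a background loop. One way such a loop could look, assuming a hypothetical
// five-minute pass interval and a one-hour retention window for flagged files (neither value
// is defined in this diff), and reusing only the APIs shown above:
async fn gc_loop(gc: Arc<GarbageCollector>) {
    let mut ticker = tokio::time::interval(std::time::Duration::from_secs(300));
    loop {
        ticker.tick().await;
        // Delete everything that was flagged more than an hour before "now".
        let cutoff = gc.time_provider.now() - std::time::Duration::from_secs(3600);
        let older_than = Timestamp::new(cutoff.timestamp_nanos());
        if let Err(e) = gc.cleanup(older_than).await {
            // A failed pass is simply retried on the next tick.
            eprintln!("garbage collection pass failed: {}", e);
        }
    }
}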
#[cfg(test)]
mod tests {
use super::*;
use data_types2::{KafkaPartition, ParquetFile, ParquetFileParams, SequenceNumber};
use iox_object_store::ParquetFilePath;
use iox_tests::util::TestCatalog;
use object_store::ObjectStoreTestConvenience;
use std::time::Duration;
use uuid::Uuid;
/// Test helper to put a placeholder object store file at the expected location for a parquet
/// file tracked by the catalog, without generating real parquet data, because the garbage
/// collector only cares whether the file exists, not what it contains
async fn put_object_store_file(
catalog_record: &ParquetFile,
object_store: Arc<DynObjectStore>,
) {
let bytes = "arbitrary".into();
// Make a fake IOx object store to conform to the parquet file
// interface, but note this isn't actually used to find parquet
// paths to write to
use iox_object_store::IoxObjectStore;
let iox_object_store = Arc::new(IoxObjectStore::existing(
Arc::clone(&object_store),
IoxObjectStore::root_path_for(&*object_store, uuid::Uuid::new_v4()),
));
let path = ParquetFilePath::new_new_gen(
catalog_record.namespace_id,
catalog_record.table_id,
catalog_record.sequencer_id,
catalog_record.partition_id,
catalog_record.object_store_id,
);
iox_object_store
.put_parquet_file(&path, bytes)
.await
.unwrap();
}
#[tokio::test]
async fn nothing_to_delete_is_success() {
let catalog = TestCatalog::new();
let gc = GarbageCollector::new(
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
);
let older_than =
Timestamp::new((gc.time_provider.now() + Duration::from_secs(100)).timestamp_nanos());
gc.cleanup(older_than).await.unwrap();
}
#[tokio::test]
async fn leave_undeleted_files_alone() {
let catalog = TestCatalog::new();
let gc = GarbageCollector::new(
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
);
let older_than =
Timestamp::new((gc.time_provider.now() + Duration::from_secs(100)).timestamp_nanos());
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
let namespace = txn
.namespaces()
.create("gc_leave_undeleted_files_alone", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = txn
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let sequencer = txn
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let partition = txn
.partitions()
.create_or_get("one", sequencer.id, table.id)
.await
.unwrap();
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(140),
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
created_at: Timestamp::new(1),
};
let parquet_file = txn
.parquet_files()
.create(parquet_file_params.clone())
.await
.unwrap();
put_object_store_file(&parquet_file, Arc::clone(&catalog.object_store)).await;
txn.commit().await.unwrap();
gc.cleanup(older_than).await.unwrap();
assert_eq!(
catalog
.catalog
.repositories()
.await
.parquet_files()
.count()
.await
.unwrap(),
1
);
assert_eq!(catalog.object_store.list_all().await.unwrap().len(), 1);
}
#[tokio::test]
async fn leave_too_new_files_alone() {
let catalog = TestCatalog::new();
let gc = GarbageCollector::new(
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
);
let older_than =
Timestamp::new((gc.time_provider.now() - Duration::from_secs(100)).timestamp_nanos());
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
let namespace = txn
.namespaces()
.create("gc_leave_too_new_files_alone", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = txn
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let sequencer = txn
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let partition = txn
.partitions()
.create_or_get("one", sequencer.id, table.id)
.await
.unwrap();
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(140),
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
created_at: Timestamp::new(1),
};
let parquet_file = txn
.parquet_files()
.create(parquet_file_params.clone())
.await
.unwrap();
put_object_store_file(&parquet_file, Arc::clone(&catalog.object_store)).await;
txn.parquet_files()
.flag_for_delete(parquet_file.id)
.await
.unwrap();
txn.commit().await.unwrap();
gc.cleanup(older_than).await.unwrap();
assert_eq!(
catalog
.catalog
.repositories()
.await
.parquet_files()
.count()
.await
.unwrap(),
1
);
assert_eq!(catalog.object_store.list_all().await.unwrap().len(), 1);
}
#[tokio::test]
async fn remove_old_enough_files() {
let catalog = TestCatalog::new();
let gc = GarbageCollector::new(
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
);
let older_than =
Timestamp::new((gc.time_provider.now() + Duration::from_secs(100)).timestamp_nanos());
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
let namespace = txn
.namespaces()
.create("gc_remove_old_enough_files", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = txn
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let sequencer = txn
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let partition = txn
.partitions()
.create_or_get("one", sequencer.id, table.id)
.await
.unwrap();
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(140),
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
created_at: Timestamp::new(1),
};
let parquet_file = txn
.parquet_files()
.create(parquet_file_params.clone())
.await
.unwrap();
put_object_store_file(&parquet_file, Arc::clone(&catalog.object_store)).await;
txn.parquet_files()
.flag_for_delete(parquet_file.id)
.await
.unwrap();
txn.commit().await.unwrap();
gc.cleanup(older_than).await.unwrap();
assert_eq!(
catalog
.catalog
.repositories()
.await
.parquet_files()
.count()
.await
.unwrap(),
0
);
assert!(catalog.object_store.list_all().await.unwrap().is_empty());
}
}

View File

@ -13,6 +13,7 @@
#![allow(dead_code)]
pub mod compact;
pub mod garbage_collector;
pub mod handler;
pub mod query;
pub mod server;

View File

@ -710,6 +710,8 @@ pub struct ParquetFile {
pub id: ParquetFileId,
/// the sequencer that sequenced writes that went into this file
pub sequencer_id: SequencerId,
/// the namespace
pub namespace_id: NamespaceId,
/// the table
pub table_id: TableId,
/// the partition
@ -743,6 +745,8 @@ pub struct ParquetFile {
pub struct ParquetFileParams {
/// the sequencer that sequenced writes that went into this file
pub sequencer_id: SequencerId,
/// the namespace
pub namespace_id: NamespaceId,
/// the table
pub table_id: TableId,
/// the partition

View File

@ -1897,6 +1897,7 @@ mod tests {
.unwrap();
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: table.id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),

View File

@ -87,8 +87,7 @@ pub async fn persist(
mod tests {
use super::*;
use data_types2::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
use futures::{stream, StreamExt, TryStreamExt};
use object_store::{path::Path, ObjectStoreImpl};
use object_store::{ObjectStoreImpl, ObjectStoreTestConvenience};
use query::test::{raw_data, TestChunk};
use std::sync::Arc;
use time::Time;
@ -102,16 +101,6 @@ mod tests {
Arc::new(ObjectStoreImpl::new_in_memory())
}
async fn list_all(object_store: &DynObjectStore) -> Result<Vec<Path>, object_store::Error> {
object_store
.list(None)
.await?
.map_ok(|v| stream::iter(v).map(Ok))
.try_flatten()
.try_collect()
.await
}
#[tokio::test]
async fn empty_list_writes_nothing() {
let metadata = IoxMetadata {
@ -135,7 +124,7 @@ mod tests {
persist(&metadata, vec![], &object_store).await.unwrap();
assert!(list_all(&*object_store).await.unwrap().is_empty());
assert!(object_store.list_all().await.unwrap().is_empty());
}
#[tokio::test]
@ -173,7 +162,7 @@ mod tests {
persist(&metadata, batches, &object_store).await.unwrap();
let obj_store_paths = list_all(&*object_store).await.unwrap();
let obj_store_paths = object_store.list_all().await.unwrap();
assert_eq!(obj_store_paths.len(), 1);
}
}

View File

@ -0,0 +1,10 @@
ALTER TABLE
IF EXISTS parquet_file
ADD COLUMN namespace_id INT NOT NULL;
ALTER TABLE
IF EXISTS parquet_file
ADD FOREIGN KEY (namespace_id)
REFERENCES namespace (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;

View File

@ -0,0 +1,4 @@
-- Add indexes to support querying for and deleting Parquet Files marked for deletion before
-- a specified time.
CREATE INDEX IF NOT EXISTS parquet_file_deleted_at_idx ON parquet_file (to_delete);

View File

@ -10,6 +10,7 @@ use data_types2::{
};
use snafu::{OptionExt, Snafu};
use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc};
use time::TimeProvider;
use uuid::Uuid;
#[derive(Debug, Snafu)]
@ -126,6 +127,9 @@ pub trait Catalog: Send + Sync + Debug {
/// Get metric registry associated w/ this catalog.
fn metrics(&self) -> Arc<metric::Registry>;
/// Get the time provider associated w/ this catalog.
fn time_provider(&self) -> Arc<dyn TimeProvider>;
}
/// Secret module for [sealed traits].
@ -484,15 +488,21 @@ pub trait ParquetFileRepo: Send + Sync {
sequence_number: SequenceNumber,
) -> Result<Vec<ParquetFile>>;
/// List all parquet files within a given namespace that are NOT marked as [`to_delete`](ParquetFile::to_delete).
/// List all parquet files within a given namespace that are NOT marked as
/// [`to_delete`](ParquetFile::to_delete).
async fn list_by_namespace_not_to_delete(
&mut self,
namespace_id: NamespaceId,
) -> Result<Vec<ParquetFile>>;
/// List all parquet files within a given table that are NOT marked as [`to_delete`](ParquetFile::to_delete).
/// List all parquet files within a given table that are NOT marked as
/// [`to_delete`](ParquetFile::to_delete).
async fn list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
/// Delete all parquet files that were marked to be deleted earlier than the specified time.
/// Returns the deleted records.
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
/// List parquet files for a given sequencer with compaction level 0 and other criteria that
/// define a file as a candidate for compaction
async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
@ -892,6 +902,7 @@ pub(crate) mod test_helpers {
.await
.unwrap();
let parquet_file_params = ParquetFileParams {
namespace_id: namespace.id,
sequencer_id: seq.id,
table_id: t.id,
partition_id: partition.id,
@ -1493,6 +1504,7 @@ pub(crate) mod test_helpers {
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
@ -1702,6 +1714,7 @@ pub(crate) mod test_helpers {
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
@ -1760,8 +1773,16 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(vec![other_file.clone()], files);
// verify that to_delete is initially set to null and that it can be updated to a timestamp
// verify that to_delete is initially set to null and the file does not get deleted
assert!(parquet_file.to_delete.is_none());
let older_than = Timestamp::new(
(catalog.time_provider().now() + Duration::from_secs(100)).timestamp_nanos(),
);
let deleted_files = repos.parquet_files().delete_old(older_than).await.unwrap();
assert!(deleted_files.is_empty());
assert!(repos.parquet_files().exist(parquet_file.id).await.unwrap());
// verify to_delete can be updated to a timestamp
repos
.parquet_files()
.flag_for_delete(parquet_file.id)
@ -1772,7 +1793,26 @@ pub(crate) mod test_helpers {
.list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
.await
.unwrap();
assert!(files.first().unwrap().to_delete.is_some());
let marked_deleted = files.first().unwrap();
assert!(marked_deleted.to_delete.is_some());
// File is not deleted if it was marked to be deleted after the specified time
let before_deleted = Timestamp::new(
(catalog.time_provider().now() - Duration::from_secs(100)).timestamp_nanos(),
);
let deleted_files = repos
.parquet_files()
.delete_old(before_deleted)
.await
.unwrap();
assert!(deleted_files.is_empty());
assert!(repos.parquet_files().exist(parquet_file.id).await.unwrap());
// File is deleted if it was marked to be deleted before the specified time
let deleted_files = repos.parquet_files().delete_old(older_than).await.unwrap();
assert_eq!(deleted_files.len(), 1);
assert_eq!(marked_deleted, &deleted_files[0]);
assert!(!repos.parquet_files().exist(parquet_file.id).await.unwrap());
// test list_by_table_not_to_delete
let files = repos
@ -2001,6 +2041,7 @@ pub(crate) mod test_helpers {
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
@ -2129,6 +2170,7 @@ pub(crate) mod test_helpers {
// Create a file with times entirely within the window
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
@ -2329,6 +2371,7 @@ pub(crate) mod test_helpers {
// Create a file with times entirely within the window
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
@ -2443,6 +2486,7 @@ pub(crate) mod test_helpers {
// parquet files
let parquet_file_params = ParquetFileParams {
namespace_id: namespace.id,
sequencer_id: sequencer.id,
table_id: partition.table_id,
partition_id: partition.id,

View File

@ -29,6 +29,7 @@ use tokio::sync::{Mutex, OwnedMutexGuard};
pub struct MemCatalog {
metrics: Arc<metric::Registry>,
collections: Arc<Mutex<MemCollections>>,
time_provider: Arc<dyn TimeProvider>,
}
impl MemCatalog {
@ -37,6 +38,7 @@ impl MemCatalog {
Self {
metrics,
collections: Default::default(),
time_provider: Arc::new(SystemProvider::new()),
}
}
}
@ -47,7 +49,7 @@ impl std::fmt::Debug for MemCatalog {
}
}
#[derive(Debug, Clone)]
#[derive(Default, Debug, Clone)]
struct MemCollections {
kafka_topics: Vec<KafkaTopic>,
query_pools: Vec<QueryPool>,
@ -59,25 +61,6 @@ struct MemCollections {
tombstones: Vec<Tombstone>,
parquet_files: Vec<ParquetFile>,
processed_tombstones: Vec<ProcessedTombstone>,
time_provider: Arc<dyn TimeProvider>,
}
impl Default for MemCollections {
fn default() -> Self {
Self {
kafka_topics: Default::default(),
query_pools: Default::default(),
namespaces: Default::default(),
tables: Default::default(),
columns: Default::default(),
sequencers: Default::default(),
partitions: Default::default(),
tombstones: Default::default(),
parquet_files: Default::default(),
processed_tombstones: Default::default(),
time_provider: Arc::new(SystemProvider::new()),
}
}
}
#[derive(Debug)]
@ -97,6 +80,7 @@ enum MemTxnInner {
#[derive(Debug)]
pub struct MemTxn {
inner: MemTxnInner,
time_provider: Arc<dyn TimeProvider>,
}
impl MemTxn {
@ -136,6 +120,7 @@ impl Catalog for MemCatalog {
stage,
finalized: false,
},
time_provider: self.time_provider(),
},
Arc::clone(&self.metrics),
)))
@ -146,6 +131,7 @@ impl Catalog for MemCatalog {
Box::new(MetricDecorator::new(
MemTxn {
inner: MemTxnInner::NoTxn { collections },
time_provider: self.time_provider(),
},
Arc::clone(&self.metrics),
))
@ -154,6 +140,10 @@ impl Catalog for MemCatalog {
fn metrics(&self) -> Arc<metric::Registry> {
Arc::clone(&self.metrics)
}
fn time_provider(&self) -> Arc<dyn TimeProvider> {
Arc::clone(&self.time_provider)
}
}
#[async_trait]
@ -912,6 +902,7 @@ impl ParquetFileRepo for MemTxn {
let ParquetFileParams {
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
@ -936,6 +927,7 @@ impl ParquetFileRepo for MemTxn {
let parquet_file = ParquetFile {
id: ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
@ -955,8 +947,8 @@ impl ParquetFileRepo for MemTxn {
}
async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
let marked_at = Timestamp::new(self.time_provider.now().timestamp_nanos());
let stage = self.stage();
let marked_at = Timestamp::new(stage.time_provider.now().timestamp_nanos());
match stage.parquet_files.iter_mut().find(|p| p.id == id) {
Some(f) => f.to_delete = Some(marked_at),
@ -1014,6 +1006,18 @@ impl ParquetFileRepo for MemTxn {
Ok(parquet_files)
}
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
let stage = self.stage();
let (delete, keep): (Vec<_>, Vec<_>) = stage.parquet_files.iter().cloned().partition(
|f| matches!(f.to_delete, Some(marked_deleted) if marked_deleted < older_than),
);
stage.parquet_files = keep;
Ok(delete)
}
async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>> {
let stage = self.stage();

View File

@ -268,6 +268,7 @@ decorate!(
"parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
"parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
"parquet_delete_old" = delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
"parquet_level_0" = level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
"parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<ParquetFile>>;
"parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result<Vec<ParquetFileId>>;

View File

@ -269,6 +269,10 @@ impl Catalog for PostgresCatalog {
fn metrics(&self) -> Arc<metric::Registry> {
Arc::clone(&self.metrics)
}
fn time_provider(&self) -> Arc<dyn TimeProvider> {
Arc::clone(&self.time_provider)
}
}
/// Creates a new [`sqlx::Pool`] from a database config and an explicit DSN.
@ -1373,6 +1377,7 @@ impl ParquetFileRepo for PostgresTxn {
async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
let ParquetFileParams {
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
@ -1391,8 +1396,8 @@ impl ParquetFileRepo for PostgresTxn {
INSERT INTO parquet_file (
sequencer_id, table_id, partition_id, object_store_id, min_sequence_number,
max_sequence_number, min_time, max_time, file_size_bytes, parquet_metadata,
row_count, compaction_level, created_at )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 )
row_count, compaction_level, created_at, namespace_id )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
RETURNING *;
"#,
)
@ -1409,6 +1414,7 @@ RETURNING *;
.bind(row_count) // $11
.bind(INITIAL_COMPACTION_LEVEL) // $12
.bind(created_at) // $13
.bind(namespace_id) // $14
.fetch_one(&mut self.inner)
.await
.map_err(|e| {
@ -1491,6 +1497,20 @@ WHERE table_id = $1 AND to_delete IS NULL;
.map_err(|e| Error::SqlxError { source: e })
}
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>(
r#"
DELETE FROM parquet_file
WHERE to_delete < $1
RETURNING *;
"#,
)
.bind(&older_than) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>(
r#"

View File

@ -438,6 +438,7 @@ impl TestPartition {
let parquet_file_params = ParquetFileParams {
sequencer_id: self.sequencer.sequencer.id,
namespace_id: self.namespace.namespace.id,
table_id: self.table.table.id,
partition_id: self.partition.id,
object_store_id,

View File

@ -752,6 +752,28 @@ impl From<dummy::Error> for Error {
}
}
/// Convenience functions for object stores that are only appropriate to use in tests. Not marked
/// with cfg(test) to make this accessible to other crates.
#[async_trait]
pub trait ObjectStoreTestConvenience {
/// A convenience function for getting all results from a list operation without a prefix. Only
/// appropriate for tests, because production code should process the listing as a stream rather
/// than collect a potentially large number of returned paths into memory.
async fn list_all(&self) -> Result<Vec<Path>>;
}
#[async_trait]
impl ObjectStoreTestConvenience for dyn ObjectStoreApi<Path = path::Path, Error = Error> {
async fn list_all(&self) -> Result<Vec<Path>> {
self.list(None)
.await?
.map_ok(|v| futures::stream::iter(v).map(Ok))
.try_flatten()
.try_collect()
.await
}
}
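// Editorial sketch, not part of this change: the streaming pattern production code is expected
// to use instead of `list_all`, which is why the trait above is labelled test-only. The
// `for_each_listed_path` name and the `process` callback are hypothetical; `list` and the
// `futures` combinators are the same ones used by `list_all`.
async fn for_each_listed_path(
    object_store: &DynObjectStore,
    mut process: impl FnMut(Path),
) -> Result<()> {
    let stream = object_store.list(None).await?;
    futures::pin_mut!(stream);
    // Handle each chunk of paths as it arrives instead of buffering the whole listing.
    while let Some(paths) = stream.try_next().await? {
        for path in paths {
            process(path);
        }
    }
    Ok(())
}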
#[cfg(test)]
mod tests {
use super::*;

View File

@ -662,6 +662,7 @@ impl IoxMetadata {
) -> ParquetFileParams {
ParquetFileParams {
sequencer_id: self.sequencer_id,
namespace_id: self.namespace_id,
table_id: self.table_id,
partition_id: self.partition_id,
object_store_id: self.object_store_id,