diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 3070079171..5c7370ba24 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -748,7 +748,7 @@ impl Compactor {
                 Ok(count_pf) => count_pf,
                 _ => {
                     warn!(
-                        "Error getting parquet file count for table ID {}, sequencer ID {}, min time {:?}, max time {:?}. 
+                        "Error getting parquet file count for table ID {}, sequencer ID {}, min time {:?}, max time {:?}.
                          Won't be able to verify whether its tombstone is fully processed",
                         tombstone.table_id, tombstone.sequencer_id, tombstone.min_time, tombstone.max_time
                     );
@@ -765,7 +765,7 @@
                 Ok(count_pt) => count_pt,
                 _ => {
                     warn!(
-                        "Error getting processed tombstone count for tombstone ID {}. 
+                        "Error getting processed tombstone count for tombstone ID {}.
                          Won't be able to verify whether the tombstone is fully processed",
                         tombstone.id
                     );
@@ -867,9 +867,8 @@ mod tests {
     use super::*;
     use arrow_util::assert_batches_sorted_eq;
     use data_types2::{KafkaPartition, NamespaceId, ParquetFileParams, SequenceNumber};
-    use futures::{stream, StreamExt, TryStreamExt};
     use iox_tests::util::TestCatalog;
-    use object_store::path::Path;
+    use object_store::ObjectStoreTestConvenience;
     use querier::{
         cache::CatalogCache,
         chunk::{collect_read_filter, ParquetChunkAdapter},
@@ -1500,6 +1499,7 @@ mod tests {
             ParquetFile {
                 id: ParquetFileId::new(0),
                 sequencer_id: SequencerId::new(0),
+                namespace_id: NamespaceId::new(0),
                 table_id: TableId::new(0),
                 partition_id: PartitionId::new(0),
                 object_store_id: Uuid::new_v4(),
@@ -1692,6 +1692,7 @@
         let p1 = ParquetFileParams {
             sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
             table_id: table.id,
             partition_id: partition.id,
             object_store_id: Uuid::new_v4(),
@@ -1814,16 +1815,6 @@
         assert_eq!(actual_pf2_tombstones, &[t3.id, t4.id]);
     }
 
-    async fn list_all(object_store: &DynObjectStore) -> Result<Vec<Path>, object_store::Error> {
-        object_store
-            .list(None)
-            .await?
-            .map_ok(|v| stream::iter(v).map(Ok))
-            .try_flatten()
-            .try_collect()
-            .await
-    }
-
     #[tokio::test]
     async fn persist_adds_to_object_store() {
         let catalog = TestCatalog::new();
@@ -1897,7 +1888,7 @@
             .await
             .unwrap();
 
-        let object_store_files = list_all(&*compactor.object_store).await.unwrap();
+        let object_store_files = compactor.object_store.list_all().await.unwrap();
         assert_eq!(object_store_files.len(), 1);
     }
 
@@ -2004,6 +1995,7 @@
         // Prepare metadata in form of ParquetFileParams to get added with tombstone
         let parquet = ParquetFileParams {
             sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
             table_id: table.id,
             partition_id: partition.id,
             object_store_id: Uuid::new_v4(),
diff --git a/compactor/src/garbage_collector.rs b/compactor/src/garbage_collector.rs
new file mode 100644
index 0000000000..f7c950a798
--- /dev/null
+++ b/compactor/src/garbage_collector.rs
@@ -0,0 +1,398 @@
+//! Clean up parquet files from object storage and their associated entries in the catalog that are
+//! no longer needed because they've been compacted and they're old enough to no longer be used by
+//! any queriers.
+
+use data_types2::Timestamp;
+use iox_catalog::interface::Catalog;
+use iox_object_store::ParquetFilePath;
+use object_store::DynObjectStore;
+use snafu::{ResultExt, Snafu};
+use std::sync::Arc;
+use time::TimeProvider;
+
+#[derive(Debug, Snafu)]
+#[allow(missing_copy_implementations, missing_docs)]
+pub enum Error {
+    #[snafu(display("Error while deleting catalog records {}", source))]
+    DeletingCatalogRecords {
+        source: iox_catalog::interface::Error,
+    },
+
+    #[snafu(display("Error(s) while deleting object store files: {:#?}", sources))]
+    DeletingObjectStoreFiles { sources: Vec<object_store::Error> },
+}
+
+/// A specialized `Result` for garbage collection errors
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+/// Information needed to clean up old parquet files from object storage and their entries in the
+/// catalog
+pub struct GarbageCollector {
+    /// Object store where parquet files should be cleaned up
+    object_store: Arc<DynObjectStore>,
+
+    /// The global catalog for parquet files
+    catalog: Arc<dyn Catalog>,
+
+    /// Time provider for all activities in this garbage collector
+    pub time_provider: Arc<dyn TimeProvider>,
+}
+
+impl GarbageCollector {
+    /// Initialize the Garbage Collector
+    pub fn new(catalog: Arc<dyn Catalog>, object_store: Arc<DynObjectStore>) -> Self {
+        let time_provider = catalog.time_provider();
+
+        Self {
+            catalog,
+            object_store,
+            time_provider,
+        }
+    }
+
+    /// Perform a pass of garbage collection, querying the catalog for all files marked to be
+    /// deleted earlier than the specified time. Remove the catalog entries, then remove the
+    /// associated object store files.
+    /// Meant to be invoked in a background loop.
+    pub async fn cleanup(&self, older_than: Timestamp) -> Result<()> {
+        // Make a fake IOx object store to conform to the parquet file
+        // interface, but note this isn't actually used to find parquet
+        // paths to write to
+        use iox_object_store::IoxObjectStore;
+        let iox_object_store = Arc::new(IoxObjectStore::existing(
+            Arc::clone(&self.object_store),
+            IoxObjectStore::root_path_for(&*self.object_store, uuid::Uuid::new_v4()),
+        ));
+
+        let deleted_catalog_records = self
+            .catalog
+            .repositories()
+            .await
+            .parquet_files()
+            .delete_old(older_than)
+            .await
+            .context(DeletingCatalogRecordsSnafu)?;
+
+        let mut object_store_errors = Vec::with_capacity(deleted_catalog_records.len());
+
+        for catalog_record in deleted_catalog_records {
+            let path = ParquetFilePath::new_new_gen(
+                catalog_record.namespace_id,
+                catalog_record.table_id,
+                catalog_record.sequencer_id,
+                catalog_record.partition_id,
+                catalog_record.object_store_id,
+            );
+
+            if let Err(e) = iox_object_store.delete_parquet_file(&path).await {
+                object_store_errors.push(e);
+            }
+        }
+
+        if object_store_errors.is_empty() {
+            Ok(())
+        } else {
+            DeletingObjectStoreFilesSnafu {
+                sources: object_store_errors,
+            }
+            .fail()
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use data_types2::{KafkaPartition, ParquetFile, ParquetFileParams, SequenceNumber};
+    use iox_object_store::ParquetFilePath;
+    use iox_tests::util::TestCatalog;
+    use object_store::ObjectStoreTestConvenience;
+    use std::time::Duration;
+    use uuid::Uuid;
+
+    /// Test helper to put an empty object store file at the expected location for a parquet file
+    /// tracked by the catalog, but without having to process data because the garbage collector
+    /// isn't concerned with the contents of the file, only its existence or lack thereof
+    async fn put_object_store_file(
+        catalog_record: &ParquetFile,
+        object_store: Arc<DynObjectStore>,
+    ) {
+        let bytes = "arbitrary".into();
+
+        // Make a fake IOx object store to conform to the parquet file
+        // interface, but note this isn't actually used to find parquet
+        // paths to write to
+        use iox_object_store::IoxObjectStore;
+        let iox_object_store = Arc::new(IoxObjectStore::existing(
+            Arc::clone(&object_store),
+            IoxObjectStore::root_path_for(&*object_store, uuid::Uuid::new_v4()),
+        ));
+
+        let path = ParquetFilePath::new_new_gen(
+            catalog_record.namespace_id,
+            catalog_record.table_id,
+            catalog_record.sequencer_id,
+            catalog_record.partition_id,
+            catalog_record.object_store_id,
+        );
+
+        iox_object_store
+            .put_parquet_file(&path, bytes)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn nothing_to_delete_is_success() {
+        let catalog = TestCatalog::new();
+        let gc = GarbageCollector::new(
+            Arc::clone(&catalog.catalog),
+            Arc::clone(&catalog.object_store),
+        );
+        let older_than =
+            Timestamp::new((gc.time_provider.now() + Duration::from_secs(100)).timestamp_nanos());
+
+        gc.cleanup(older_than).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn leave_undeleted_files_alone() {
+        let catalog = TestCatalog::new();
+        let gc = GarbageCollector::new(
+            Arc::clone(&catalog.catalog),
+            Arc::clone(&catalog.object_store),
+        );
+        let older_than =
+            Timestamp::new((gc.time_provider.now() + Duration::from_secs(100)).timestamp_nanos());
+
+        let mut txn = catalog.catalog.start_transaction().await.unwrap();
+        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = txn
+            .namespaces()
+            .create("gc_leave_undeleted_files_alone", "inf", kafka.id, pool.id)
+            .await
+            .unwrap();
+        let table = txn
+            .tables()
+            .create_or_get("test_table", namespace.id)
+            .await
+            .unwrap();
+        let sequencer = txn
+            .sequencers()
+            .create_or_get(&kafka, KafkaPartition::new(1))
+            .await
+            .unwrap();
+        let partition = txn
+            .partitions()
+            .create_or_get("one", sequencer.id, table.id)
+            .await
+            .unwrap();
+
+        let min_time = Timestamp::new(1);
+        let max_time = Timestamp::new(10);
+
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
+            table_id: partition.table_id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(10),
+            max_sequence_number: SequenceNumber::new(140),
+            min_time,
+            max_time,
+            file_size_bytes: 1337,
+            parquet_metadata: b"md1".to_vec(),
+            row_count: 0,
+            created_at: Timestamp::new(1),
+        };
+        let parquet_file = txn
+            .parquet_files()
+            .create(parquet_file_params.clone())
+            .await
+            .unwrap();
+
+        put_object_store_file(&parquet_file, Arc::clone(&catalog.object_store)).await;
+
+        txn.commit().await.unwrap();
+
+        gc.cleanup(older_than).await.unwrap();
+
+        assert_eq!(
+            catalog
+                .catalog
+                .repositories()
+                .await
+                .parquet_files()
+                .count()
+                .await
+                .unwrap(),
+            1
+        );
+
+        assert_eq!(catalog.object_store.list_all().await.unwrap().len(), 1);
+    }
+
+    #[tokio::test]
+    async fn leave_too_new_files_alone() {
+        let catalog = TestCatalog::new();
+        let gc = GarbageCollector::new(
+            Arc::clone(&catalog.catalog),
+            Arc::clone(&catalog.object_store),
+        );
+        let older_than =
+            Timestamp::new((gc.time_provider.now() - Duration::from_secs(100)).timestamp_nanos());
+
+        let mut txn = catalog.catalog.start_transaction().await.unwrap();
+        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = txn
+            .namespaces()
+            .create("gc_leave_too_new_files_alone", "inf", kafka.id, pool.id)
+            .await
+            .unwrap();
+        let table = txn
+            .tables()
+            .create_or_get("test_table", namespace.id)
+            .await
+            .unwrap();
+        let sequencer = txn
+            .sequencers()
+            .create_or_get(&kafka, KafkaPartition::new(1))
+            .await
+            .unwrap();
+        let partition = txn
+            .partitions()
+            .create_or_get("one", sequencer.id, table.id)
+            .await
+            .unwrap();
+
+        let min_time = Timestamp::new(1);
+        let max_time = Timestamp::new(10);
+
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
+            table_id: partition.table_id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(10),
+            max_sequence_number: SequenceNumber::new(140),
+            min_time,
+            max_time,
+            file_size_bytes: 1337,
+            parquet_metadata: b"md1".to_vec(),
+            row_count: 0,
+            created_at: Timestamp::new(1),
+        };
+        let parquet_file = txn
+            .parquet_files()
+            .create(parquet_file_params.clone())
+            .await
+            .unwrap();
+        put_object_store_file(&parquet_file, Arc::clone(&catalog.object_store)).await;
+
+        txn.parquet_files()
+            .flag_for_delete(parquet_file.id)
+            .await
+            .unwrap();
+
+        txn.commit().await.unwrap();
+
+        gc.cleanup(older_than).await.unwrap();
+
+        assert_eq!(
+            catalog
+                .catalog
+                .repositories()
+                .await
+                .parquet_files()
+                .count()
+                .await
+                .unwrap(),
+            1
+        );
+        assert_eq!(catalog.object_store.list_all().await.unwrap().len(), 1);
+    }
+
+    #[tokio::test]
+    async fn remove_old_enough_files() {
+        let catalog = TestCatalog::new();
+        let gc = GarbageCollector::new(
+            Arc::clone(&catalog.catalog),
+            Arc::clone(&catalog.object_store),
+        );
+        let older_than =
+            Timestamp::new((gc.time_provider.now() + Duration::from_secs(100)).timestamp_nanos());
+
+        let mut txn = catalog.catalog.start_transaction().await.unwrap();
+        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = txn
+            .namespaces()
+            .create("gc_remove_old_enough_files", "inf", kafka.id, pool.id)
+            .await
+            .unwrap();
+        let table = txn
+            .tables()
+            .create_or_get("test_table", namespace.id)
+            .await
+            .unwrap();
+        let sequencer = txn
+            .sequencers()
+            .create_or_get(&kafka, KafkaPartition::new(1))
+            .await
+            .unwrap();
+        let partition = txn
+            .partitions()
+            .create_or_get("one", sequencer.id, table.id)
+            .await
+            .unwrap();
+
+        let min_time = Timestamp::new(1);
+        let max_time = Timestamp::new(10);
+
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
+            table_id: partition.table_id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(10),
+            max_sequence_number: SequenceNumber::new(140),
+            min_time,
+            max_time,
+            file_size_bytes: 1337,
+            parquet_metadata: b"md1".to_vec(),
+            row_count: 0,
+            created_at: Timestamp::new(1),
+        };
+        let parquet_file = txn
+            .parquet_files()
+            .create(parquet_file_params.clone())
+            .await
+            .unwrap();
+        put_object_store_file(&parquet_file, Arc::clone(&catalog.object_store)).await;
+
+        txn.parquet_files()
+            .flag_for_delete(parquet_file.id)
+            .await
+            .unwrap();
+
+        txn.commit().await.unwrap();
+
+        gc.cleanup(older_than).await.unwrap();
+
+        assert_eq!(
+            catalog
+                .catalog
+                .repositories()
+                .await
+                .parquet_files()
+                .count()
+                .await
+                .unwrap(),
+            0
+        );
+        assert!(catalog.object_store.list_all().await.unwrap().is_empty());
+    }
+}
diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs
index c4c345b981..058967618c 100644
--- a/compactor/src/lib.rs
+++ b/compactor/src/lib.rs
@@ -13,6 +13,7 @@
 #![allow(dead_code)]
 
 pub mod compact;
+pub mod garbage_collector;
 pub mod handler;
 pub mod query;
 pub mod server;
diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs
index dbf256ef37..753e1ac1a1 100644
--- a/data_types2/src/lib.rs
+++ b/data_types2/src/lib.rs
@@ -710,6 +710,8 @@ pub struct ParquetFile {
     pub id: ParquetFileId,
     /// the sequencer that sequenced writes that went into this file
     pub sequencer_id: SequencerId,
+    /// the namespace
+    pub namespace_id: NamespaceId,
     /// the table
     pub table_id: TableId,
     /// the partition
@@ -743,6 +745,8 @@ pub struct ParquetFileParams {
     /// the sequencer that sequenced writes that went into this file
     pub sequencer_id: SequencerId,
+    /// the namespace
+    pub namespace_id: NamespaceId,
     /// the table
     pub table_id: TableId,
     /// the partition
diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index cbb0b1a074..6a2ee77554 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -1897,6 +1897,7 @@ mod tests {
             .unwrap();
         let parquet_file_params = ParquetFileParams {
             sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
             table_id: table.id,
             partition_id: partition.id,
             object_store_id: Uuid::new_v4(),
diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs
index 244d8331cf..9e8989dc4a 100644
--- a/ingester/src/persist.rs
+++ b/ingester/src/persist.rs
@@ -87,8 +87,7 @@ pub async fn persist(
 mod tests {
     use super::*;
     use data_types2::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
-    use futures::{stream, StreamExt, TryStreamExt};
-    use object_store::{path::Path, ObjectStoreImpl};
+    use object_store::{ObjectStoreImpl, ObjectStoreTestConvenience};
     use query::test::{raw_data, TestChunk};
     use std::sync::Arc;
     use time::Time;
@@ -102,16 +101,6 @@
         Arc::new(ObjectStoreImpl::new_in_memory())
     }
 
-    async fn list_all(object_store: &DynObjectStore) -> Result<Vec<Path>, object_store::Error> {
-        object_store
-            .list(None)
-            .await?
-            .map_ok(|v| stream::iter(v).map(Ok))
-            .try_flatten()
-            .try_collect()
-            .await
-    }
-
     #[tokio::test]
     async fn empty_list_writes_nothing() {
         let metadata = IoxMetadata {
@@ -135,7 +124,7 @@
 
         persist(&metadata, vec![], &object_store).await.unwrap();
 
-        assert!(list_all(&*object_store).await.unwrap().is_empty());
+        assert!(object_store.list_all().await.unwrap().is_empty());
     }
 
     #[tokio::test]
@@ -173,7 +162,7 @@
 
         persist(&metadata, batches, &object_store).await.unwrap();
 
-        let obj_store_paths = list_all(&*object_store).await.unwrap();
+        let obj_store_paths = object_store.list_all().await.unwrap();
         assert_eq!(obj_store_paths.len(), 1);
     }
 }
diff --git a/iox_catalog/migrations/20220324152729_add_namespace_id_to_parquet_file.sql b/iox_catalog/migrations/20220324152729_add_namespace_id_to_parquet_file.sql
new file mode 100644
index 0000000000..a4962b8cb5
--- /dev/null
+++ b/iox_catalog/migrations/20220324152729_add_namespace_id_to_parquet_file.sql
@@ -0,0 +1,10 @@
+ALTER TABLE
+    IF EXISTS parquet_file
+    ADD COLUMN namespace_id INT NOT NULL;
+ALTER TABLE
+    IF EXISTS parquet_file
+    ADD FOREIGN KEY (namespace_id)
+    REFERENCES namespace (id) MATCH SIMPLE
+    ON UPDATE NO ACTION
+    ON DELETE NO ACTION
+    NOT VALID;
diff --git a/iox_catalog/migrations/20220328150925_add_index_to_to_delete.sql b/iox_catalog/migrations/20220328150925_add_index_to_to_delete.sql
new file mode 100644
index 0000000000..c6dbba92c4
--- /dev/null
+++ b/iox_catalog/migrations/20220328150925_add_index_to_to_delete.sql
@@ -0,0 +1,4 @@
+-- Add indexes to support querying for and deleting Parquet Files marked for deletion before
+-- a specified time.
+
+CREATE INDEX IF NOT EXISTS parquet_file_deleted_at_idx ON parquet_file (to_delete);
diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index 89cf6cfe4c..0721902eb1 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -10,6 +10,7 @@ use data_types2::{
 };
 use snafu::{OptionExt, Snafu};
 use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc};
+use time::TimeProvider;
 use uuid::Uuid;
 
 #[derive(Debug, Snafu)]
@@ -126,6 +127,9 @@ pub trait Catalog: Send + Sync + Debug {
 
     /// Get metric registry associated w/ this catalog.
     fn metrics(&self) -> Arc<metric::Registry>;
+
+    /// Get the time provider associated w/ this catalog.
+    fn time_provider(&self) -> Arc<dyn TimeProvider>;
 }
 
 /// Secret module for [sealed traits].
@@ -484,15 +488,21 @@ pub trait ParquetFileRepo: Send + Sync {
         sequence_number: SequenceNumber,
     ) -> Result<Vec<ParquetFile>>;
 
-    /// List all parquet files within a given namespace that are NOT marked as [`to_delete`](ParquetFile::to_delete).
+    /// List all parquet files within a given namespace that are NOT marked as
+    /// [`to_delete`](ParquetFile::to_delete).
     async fn list_by_namespace_not_to_delete(
         &mut self,
         namespace_id: NamespaceId,
     ) -> Result<Vec<ParquetFile>>;
 
-    /// List all parquet files within a given table that are NOT marked as [`to_delete`](ParquetFile::to_delete).
+    /// List all parquet files within a given table that are NOT marked as
+    /// [`to_delete`](ParquetFile::to_delete).
     async fn list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
 
+    /// Delete all parquet files that were marked to be deleted earlier than the specified time.
+    /// Returns the deleted records.
+    async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
+
     /// List parquet files for a given sequencer with compaction level 0 and other criteria that
     /// define a file as a candidate for compaction
     async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
@@ -892,6 +902,7 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();
         let parquet_file_params = ParquetFileParams {
+            namespace_id: namespace.id,
             sequencer_id: seq.id,
             table_id: t.id,
             partition_id: partition.id,
@@ -1493,6 +1504,7 @@
 
         let parquet_file_params = ParquetFileParams {
             sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
             table_id: partition.table_id,
             partition_id: partition.id,
             object_store_id: Uuid::new_v4(),
@@ -1702,6 +1714,7 @@
 
         let parquet_file_params = ParquetFileParams {
             sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
             table_id: partition.table_id,
             partition_id: partition.id,
             object_store_id: Uuid::new_v4(),
@@ -1760,8 +1773,16 @@
             .unwrap();
         assert_eq!(vec![other_file.clone()], files);
 
-        // verify that to_delete is initially set to null and that it can be updated to a timestamp
+        // verify that to_delete is initially set to null and the file does not get deleted
         assert!(parquet_file.to_delete.is_none());
+        let older_than = Timestamp::new(
+            (catalog.time_provider().now() + Duration::from_secs(100)).timestamp_nanos(),
+        );
+        let deleted_files = repos.parquet_files().delete_old(older_than).await.unwrap();
+        assert!(deleted_files.is_empty());
+        assert!(repos.parquet_files().exist(parquet_file.id).await.unwrap());
+
+        // verify to_delete can be updated to a timestamp
         repos
             .parquet_files()
             .flag_for_delete(parquet_file.id)
@@ -1772,7 +1793,26 @@
             .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
             .await
             .unwrap();
-        assert!(files.first().unwrap().to_delete.is_some());
+        let marked_deleted = files.first().unwrap();
+        assert!(marked_deleted.to_delete.is_some());
+
+        // File is not deleted if it was marked to be deleted after the specified time
+        let before_deleted = Timestamp::new(
+            (catalog.time_provider().now() - Duration::from_secs(100)).timestamp_nanos(),
+        );
+        let deleted_files = repos
+            .parquet_files()
+            .delete_old(before_deleted)
+            .await
+            .unwrap();
+        assert!(deleted_files.is_empty());
+        assert!(repos.parquet_files().exist(parquet_file.id).await.unwrap());
+
+        // File is deleted if it was marked to be deleted before the specified time
+        let deleted_files = repos.parquet_files().delete_old(older_than).await.unwrap();
+        assert_eq!(deleted_files.len(), 1);
+        assert_eq!(marked_deleted, &deleted_files[0]);
+        assert!(!repos.parquet_files().exist(parquet_file.id).await.unwrap());
 
         // test list_by_table_not_to_delete
         let files = repos
@@ -2001,6 +2041,7 @@
 
         let parquet_file_params = ParquetFileParams {
             sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
             table_id: partition.table_id,
             partition_id: partition.id,
             object_store_id: Uuid::new_v4(),
@@ -2129,6 +2170,7 @@
 
         // Create a file with times entirely within the window
         let parquet_file_params = ParquetFileParams {
            sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
             table_id: partition.table_id,
             partition_id: partition.id,
             object_store_id: Uuid::new_v4(),
@@ -2329,6 +2371,7 @@
 
         // Create a file with times entirely within the window
         let parquet_file_params = ParquetFileParams {
             sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
             table_id: partition.table_id,
             partition_id: partition.id,
             object_store_id: Uuid::new_v4(),
@@ -2443,6 +2486,7 @@
 
         // parquet files
         let parquet_file_params = ParquetFileParams {
+            namespace_id: namespace.id,
             sequencer_id: sequencer.id,
             table_id: partition.table_id,
             partition_id: partition.id,
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index 7d4ef957fa..8fb01d5c30 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -29,6 +29,7 @@ use tokio::sync::{Mutex, OwnedMutexGuard};
 pub struct MemCatalog {
     metrics: Arc<metric::Registry>,
     collections: Arc<Mutex<MemCollections>>,
+    time_provider: Arc<dyn TimeProvider>,
 }
 
 impl MemCatalog {
@@ -37,6 +38,7 @@
         Self {
             metrics,
             collections: Default::default(),
+            time_provider: Arc::new(SystemProvider::new()),
         }
     }
 }
@@ -47,7 +49,7 @@ impl std::fmt::Debug for MemCatalog {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Default, Debug, Clone)]
 struct MemCollections {
     kafka_topics: Vec<KafkaTopic>,
     query_pools: Vec<QueryPool>,
@@ -59,25 +61,6 @@ struct MemCollections {
     tombstones: Vec<Tombstone>,
     parquet_files: Vec<ParquetFile>,
     processed_tombstones: Vec<ProcessedTombstone>,
-    time_provider: Arc<dyn TimeProvider>,
-}
-
-impl Default for MemCollections {
-    fn default() -> Self {
-        Self {
-            kafka_topics: Default::default(),
-            query_pools: Default::default(),
-            namespaces: Default::default(),
-            tables: Default::default(),
-            columns: Default::default(),
-            sequencers: Default::default(),
-            partitions: Default::default(),
-            tombstones: Default::default(),
-            parquet_files: Default::default(),
-            processed_tombstones: Default::default(),
-            time_provider: Arc::new(SystemProvider::new()),
-        }
-    }
 }
 
 #[derive(Debug)]
@@ -97,6 +80,7 @@ enum MemTxnInner {
 #[derive(Debug)]
 pub struct MemTxn {
     inner: MemTxnInner,
+    time_provider: Arc<dyn TimeProvider>,
 }
 
 impl MemTxn {
@@ -136,6 +120,7 @@ impl Catalog for MemCatalog {
                     stage,
                     finalized: false,
                 },
+                time_provider: self.time_provider(),
             },
             Arc::clone(&self.metrics),
         )))
@@ -146,6 +131,7 @@
         Box::new(MetricDecorator::new(
             MemTxn {
                 inner: MemTxnInner::NoTxn { collections },
+                time_provider: self.time_provider(),
             },
             Arc::clone(&self.metrics),
         ))
@@ -154,6 +140,10 @@
     fn metrics(&self) -> Arc<metric::Registry> {
         Arc::clone(&self.metrics)
     }
+
+    fn time_provider(&self) -> Arc<dyn TimeProvider> {
+        Arc::clone(&self.time_provider)
+    }
 }
 
 #[async_trait]
@@ -912,6 +902,7 @@ impl ParquetFileRepo for MemTxn {
 
         let ParquetFileParams {
             sequencer_id,
+            namespace_id,
             table_id,
             partition_id,
             object_store_id,
@@ -936,6 +927,7 @@
         let parquet_file = ParquetFile {
             id: ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
             sequencer_id,
+            namespace_id,
             table_id,
             partition_id,
             object_store_id,
@@ -955,8 +947,8 @@
     }
 
     async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
+        let marked_at = Timestamp::new(self.time_provider.now().timestamp_nanos());
         let stage = self.stage();
-        let marked_at = Timestamp::new(stage.time_provider.now().timestamp_nanos());
 
         match stage.parquet_files.iter_mut().find(|p| p.id == id) {
             Some(f) => f.to_delete = Some(marked_at),
@@ -1014,6 +1006,18 @@
         Ok(parquet_files)
     }
 
+    async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
+        let stage = self.stage();
+
+        let (delete, keep): (Vec<_>, Vec<_>) = stage.parquet_files.iter().cloned().partition(
+            |f| matches!(f.to_delete, Some(marked_deleted) if marked_deleted < older_than),
+        );
+
+        stage.parquet_files = keep;
+
+        Ok(delete)
+    }
+
     async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>> {
         let stage = self.stage();
 
diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs
index 5d929c4cd2..7d95374119 100644
--- a/iox_catalog/src/metrics.rs
+++ b/iox_catalog/src/metrics.rs
@@ -268,6 +268,7 @@ decorate!(
         "parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
         "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
         "parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
+        "parquet_delete_old" = delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
         "parquet_level_0" = level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
         "parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<ParquetFile>>;
         "parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result<Vec<ParquetFileId>>;
diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs
index 976ac025c7..7de87a6e77 100644
--- a/iox_catalog/src/postgres.rs
+++ b/iox_catalog/src/postgres.rs
@@ -269,6 +269,10 @@ impl Catalog for PostgresCatalog {
     fn metrics(&self) -> Arc<metric::Registry> {
         Arc::clone(&self.metrics)
     }
+
+    fn time_provider(&self) -> Arc<dyn TimeProvider> {
+        Arc::clone(&self.time_provider)
+    }
 }
 
 /// Creates a new [`sqlx::Pool`] from a database config and an explicit DSN.
@@ -1373,6 +1377,7 @@ impl ParquetFileRepo for PostgresTxn {
     async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
         let ParquetFileParams {
             sequencer_id,
+            namespace_id,
             table_id,
             partition_id,
             object_store_id,
@@ -1391,8 +1396,8 @@
 INSERT INTO parquet_file (
     sequencer_id, table_id, partition_id, object_store_id, min_sequence_number,
     max_sequence_number, min_time, max_time, file_size_bytes, parquet_metadata,
-    row_count, compaction_level, created_at )
-VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 )
+    row_count, compaction_level, created_at, namespace_id )
+VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
 RETURNING *;
         "#,
     )
@@ -1409,6 +1414,7 @@
         .bind(row_count) // $11
         .bind(INITIAL_COMPACTION_LEVEL) // $12
         .bind(created_at) // $13
+        .bind(namespace_id) // $14
         .fetch_one(&mut self.inner)
         .await
         .map_err(|e| {
@@ -1491,6 +1497,20 @@
         .map_err(|e| Error::SqlxError { source: e })
     }
 
+    async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
+        sqlx::query_as::<_, ParquetFile>(
+            r#"
+DELETE FROM parquet_file
+WHERE to_delete < $1
+RETURNING *;
+        "#,
+        )
+        .bind(&older_than) // $1
+        .fetch_all(&mut self.inner)
+        .await
+        .map_err(|e| Error::SqlxError { source: e })
+    }
+
     async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>> {
         sqlx::query_as::<_, ParquetFile>(
             r#"
diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index 56f2459f8e..dd11ebd58d 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -438,6 +438,7 @@ impl TestPartition {
 
         let parquet_file_params = ParquetFileParams {
             sequencer_id: self.sequencer.sequencer.id,
+            namespace_id: self.namespace.namespace.id,
             table_id: self.table.table.id,
             partition_id: self.partition.id,
             object_store_id,
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 6ab42f6bed..d11b7aba6d 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -752,6 +752,28 @@ impl From for Error {
     }
 }
 
+/// Convenience functions for object stores that are only appropriate to use in tests. Not marked
+/// with cfg(test) to make this accessible to other crates.
+#[async_trait]
+pub trait ObjectStoreTestConvenience {
+    /// A convenience function for getting all results from a list operation without a prefix. Only
+    /// appropriate for tests because production code should handle the stream of potentially a
+    /// large number of returned paths.
+    async fn list_all(&self) -> Result<Vec<path::Path>>;
+}
+
+#[async_trait]
+impl ObjectStoreTestConvenience for dyn ObjectStoreApi<Path = path::Path, Error = Error> {
+    async fn list_all(&self) -> Result<Vec<path::Path>> {
+        self.list(None)
+            .await?
+            .map_ok(|v| futures::stream::iter(v).map(Ok))
+            .try_flatten()
+            .try_collect()
+            .await
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index 6fbf0378bd..5ae01d00d7 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -662,6 +662,7 @@ impl IoxMetadata {
     ) -> ParquetFileParams {
         ParquetFileParams {
             sequencer_id: self.sequencer_id,
+            namespace_id: self.namespace_id,
            table_id: self.table_id,
             partition_id: self.partition_id,
             object_store_id: self.object_store_id,
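
Reviewer note, not part of the patch: the new `GarbageCollector::cleanup` is documented as "meant to be invoked in a background loop", but the loop itself is outside this diff. Below is a minimal sketch of what such a driver could look like, assuming a hypothetical `run_gc_loop` helper, a hypothetical `CUTOFF` retention window, and `tokio` for the timer; the `Timestamp` / `time_provider` arithmetic mirrors what the patch's own tests do.

    use std::{sync::Arc, time::Duration};

    use compactor::garbage_collector::GarbageCollector;
    use data_types2::Timestamp;

    /// Hypothetical driver; the real service wiring is not shown in this diff.
    async fn run_gc_loop(gc: Arc<GarbageCollector>) {
        // Assumed retention window: remove files that were flagged `to_delete`
        // more than three hours before "now".
        const CUTOFF: Duration = Duration::from_secs(3 * 60 * 60);

        loop {
            // Compute the cutoff the same way the tests do: shift "now" by a
            // Duration, then convert to a catalog Timestamp in nanoseconds.
            let older_than =
                Timestamp::new((gc.time_provider.now() - CUTOFF).timestamp_nanos());

            // Log and keep looping on error; one failed pass should not stop
            // later passes from reclaiming space.
            if let Err(e) = gc.cleanup(older_than).await {
                eprintln!("garbage collection pass failed: {}", e);
            }

            tokio::time::sleep(Duration::from_secs(60)).await;
        }
    }

Because `delete_old` removes the catalog rows before the object store files are deleted, a file whose object store delete fails is no longer tracked anywhere; a periodic loop like the above only retries future cutoffs, so any such orphans would need separate handling.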