diff --git a/parquet_file/src/catalog/cleanup.rs b/parquet_file/src/catalog/cleanup.rs
index 35f95857d6..a43eb00513 100644
--- a/parquet_file/src/catalog/cleanup.rs
+++ b/parquet_file/src/catalog/cleanup.rs
@@ -9,7 +9,6 @@ use parking_lot::Mutex;
 use predicate::delete_predicate::DeletePredicate;
 use snafu::{ResultExt, Snafu};
 
-use crate::catalog::core::PreservedCatalogConfig;
 use crate::catalog::{
     core::PreservedCatalog,
     interface::{
@@ -62,14 +61,11 @@ pub async fn get_unreferenced_parquet_files(
     let iox_object_store = catalog.iox_object_store();
     let all_known = {
         // replay catalog transactions to track ALL (even dropped) files that are referenced
-        let (_catalog, state) = PreservedCatalog::load::<TracerCatalogState>(
-            db_name,
-            PreservedCatalogConfig::new(Arc::clone(&iox_object_store)),
-            (),
-        )
-        .await
-        .context(CatalogLoadError)?
-        .expect("catalog gone while reading it?");
+        let (_catalog, state) =
+            PreservedCatalog::load::<TracerCatalogState>(db_name, catalog.config(), ())
+                .await
+                .context(CatalogLoadError)?
+                .expect("catalog gone while reading it?");
         state.files.into_inner()
     };
 
diff --git a/parquet_file/src/catalog/core.rs b/parquet_file/src/catalog/core.rs
index b375482bc9..ccb1159488 100644
--- a/parquet_file/src/catalog/core.rs
+++ b/parquet_file/src/catalog/core.rs
@@ -12,7 +12,6 @@ use crate::{
     metadata::IoxParquetMetaData,
 };
 use bytes::Bytes;
-use chrono::{DateTime, Utc};
 use futures::{StreamExt, TryStreamExt};
 use generated_types::influxdata::iox::catalog::v1 as proto;
 use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath};
@@ -30,6 +29,7 @@ use std::{
     fmt::Debug,
     sync::Arc,
 };
+use time::{Time, TimeProvider};
 use tokio::sync::{Semaphore, SemaphorePermit};
 use uuid::Uuid;
 
@@ -172,16 +172,19 @@ pub struct PreservedCatalogConfig {
     /// Fixed UUID for testing
     pub(crate) fixed_uuid: Option<Uuid>,
 
-    /// Fixed timestamp for testing
-    pub(crate) fixed_timestamp: Option<DateTime<Utc>>,
+    /// Time provider to use instead of [`time::SystemProvider`]
+    pub(crate) time_provider: Arc<dyn TimeProvider>,
 }
 
 impl PreservedCatalogConfig {
-    pub fn new(iox_object_store: Arc<IoxObjectStore>) -> Self {
+    pub fn new(
+        iox_object_store: Arc<IoxObjectStore>,
+        time_provider: Arc<dyn TimeProvider>,
+    ) -> Self {
         Self {
             iox_object_store,
-            fixed_timestamp: None,
             fixed_uuid: None,
+            time_provider,
         }
     }
 
@@ -193,12 +196,10 @@ impl PreservedCatalogConfig {
         }
     }
 
-    /// Fixed timestamp to use for all transactions instead of "now"
-    ///
-    /// TODO: Replace with TimeProvider (#2722)
-    pub fn with_fixed_timestamp(self, timestamp: DateTime<Utc>) -> Self {
+    /// Override the time provider
+    pub fn with_time_provider(self, time_provider: Arc<dyn TimeProvider>) -> Self {
         Self {
-            fixed_timestamp: Some(timestamp),
+            time_provider,
             ..self
         }
     }
@@ -235,10 +236,8 @@ pub struct PreservedCatalog {
     /// This can be useful for testing to achieve deterministic outputs.
     fixed_uuid: Option<Uuid>,
 
-    /// If set, this start time will be used for all transaction instead of "now".
-    ///
-    /// This can be useful for testing to achieve deterministic outputs.
-    fixed_timestamp: Option<DateTime<Utc>>,
+    /// Time provider
+    time_provider: Arc<dyn TimeProvider>,
 }
 
 impl PreservedCatalog {
@@ -262,7 +261,7 @@ impl PreservedCatalog {
     /// most broken catalogs.
     pub async fn find_last_transaction_timestamp(
         iox_object_store: &IoxObjectStore,
-    ) -> Result<Option<DateTime<Utc>>> {
+    ) -> Result<Option<Time>> {
         let mut res = None;
 
         let mut stream = iox_object_store
@@ -275,7 +274,7 @@ impl PreservedCatalog {
                 match load_transaction_proto(iox_object_store, transaction_file_path).await {
                     Ok(proto) => match proto_parse::parse_timestamp(&proto.start_timestamp) {
                         Ok(ts) => {
-                            res = Some(res.map_or(ts, |res: DateTime<Utc>| res.max(ts)));
+                            res = Some(res.map_or(ts, |res: Time| res.max(ts)));
                         }
                         Err(e) => warn!(%e, ?transaction_file_path, "Cannot parse timestamp"),
                     },
@@ -301,11 +300,6 @@ impl PreservedCatalog {
         Ok(iox_object_store.wipe_catalog().await.context(Write)?)
     }
 
-    /// Deletes the catalog described by the provided config
-    pub async fn wipe_with_config(config: &PreservedCatalogConfig) -> Result<()> {
-        Self::wipe(&config.iox_object_store).await
-    }
-
     /// Create new catalog w/o any data.
     ///
     /// An empty transaction will be used to mark the catalog start so that concurrent open but
@@ -328,7 +322,7 @@
             transaction_semaphore: Semaphore::new(1),
             iox_object_store: config.iox_object_store,
             fixed_uuid: config.fixed_uuid,
-            fixed_timestamp: config.fixed_timestamp,
+            time_provider: config.time_provider,
         };
 
         // add empty transaction
@@ -455,7 +449,7 @@
                 transaction_semaphore: Semaphore::new(1),
                 iox_object_store: config.iox_object_store,
                 fixed_uuid: config.fixed_uuid,
-                fixed_timestamp: config.fixed_timestamp,
+                time_provider: config.time_provider,
             },
             state,
         )))
@@ -469,8 +463,7 @@
     /// transactions are given out in the order they were requested.
     pub async fn open_transaction(&self) -> TransactionHandle<'_> {
         let uuid = self.fixed_uuid.unwrap_or_else(Uuid::new_v4);
-        let start_timestamp = self.fixed_timestamp.unwrap_or_else(Utc::now);
-        TransactionHandle::new(self, uuid, start_timestamp).await
+        TransactionHandle::new(self, uuid, self.time_provider.now()).await
     }
 
     /// Get latest revision counter.
@@ -489,6 +482,15 @@
             .expect("catalog should have at least an empty transaction")
     }
 
+    /// Return the config for this `PreservedCatalog`
+    pub fn config(&self) -> PreservedCatalogConfig {
+        PreservedCatalogConfig {
+            iox_object_store: Arc::clone(&self.iox_object_store),
+            fixed_uuid: self.fixed_uuid,
+            time_provider: Arc::clone(&self.time_provider),
+        }
+    }
+
     /// Object store used by this catalog.
     pub fn iox_object_store(&self) -> Arc<IoxObjectStore> {
         Arc::clone(&self.iox_object_store)
     }
@@ -509,11 +511,7 @@ struct OpenTransaction {
 impl OpenTransaction {
     /// Private API to create new transaction, users should always use
     /// [`PreservedCatalog::open_transaction`].
-    fn new(
-        previous_tkey: &Option<TransactionKey>,
-        uuid: Uuid,
-        start_timestamp: DateTime<Utc>,
-    ) -> Self {
+    fn new(previous_tkey: &Option<TransactionKey>, uuid: Uuid, start_timestamp: Time) -> Self {
         let (revision_counter, previous_uuid) = match previous_tkey {
             Some(tkey) => (
                 tkey.revision_counter + 1,
@@ -529,7 +527,7 @@ impl OpenTransaction {
                 uuid: uuid.as_bytes().to_vec().into(),
                 revision_counter,
                 previous_uuid,
-                start_timestamp: Some(start_timestamp.into()),
+                start_timestamp: Some(start_timestamp.date_time().into()),
                 encoding: proto::transaction::Encoding::Delta.into(),
             },
         }
@@ -744,7 +742,7 @@ impl<'c> TransactionHandle<'c> {
     async fn new(
         catalog: &'c PreservedCatalog,
         uuid: Uuid,
-        start_timestamp: DateTime<Utc>,
+        start_timestamp: Time,
     ) -> TransactionHandle<'c> {
         // first acquire semaphore (which is only being used for transactions), then get state lock
         let permit = catalog
@@ -967,7 +965,7 @@ impl<'c> CheckpointHandle<'c> {
             previous_uuid: self
                 .previous_tkey
                 .map_or_else(Bytes::new, |tkey| tkey.uuid.as_bytes().to_vec().into()),
-            start_timestamp: Some(Utc::now().into()),
+            start_timestamp: Some(self.catalog.time_provider.now().date_time().into()),
             encoding: proto::transaction::Encoding::Full.into(),
         };
         let path = TransactionFilePath::new_checkpoint(self.tkey.revision_counter, self.tkey.uuid);
@@ -1855,7 +1853,7 @@ mod tests {
         states: Vec<TestCatalogState>,
 
         /// Traces timestamp after every (committed and aborted) transaction.
-        post_timestamps: Vec<DateTime<Utc>>,
+        post_timestamps: Vec<Time>,
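
Usage sketch (commentary, not part of the diff): the snippet below shows how a caller would wire in the new time provider, replacing the removed `with_fixed_timestamp` hook. Only `PreservedCatalogConfig::new`, `with_time_provider`, and `PreservedCatalog::config` come from this change; `SystemProvider::new()`, `MockProvider`, and `Time::from_timestamp` are assumptions about the `time` crate's API and may need adjusting.

use std::sync::Arc;

use iox_object_store::IoxObjectStore;
use time::{MockProvider, SystemProvider, Time, TimeProvider};

use crate::catalog::core::PreservedCatalogConfig;

/// Production path: real wall-clock timestamps for transaction metadata.
fn production_config(iox_object_store: Arc<IoxObjectStore>) -> PreservedCatalogConfig {
    // `SystemProvider::new()` is an assumption; the diff only references the type.
    PreservedCatalogConfig::new(iox_object_store, Arc::new(SystemProvider::new()))
}

/// Test path: a deterministic clock, replacing the removed `with_fixed_timestamp`.
fn deterministic_config(iox_object_store: Arc<IoxObjectStore>) -> PreservedCatalogConfig {
    // `MockProvider` / `Time::from_timestamp` are assumed test helpers of the `time` crate.
    let fixed: Arc<dyn TimeProvider> = Arc::new(MockProvider::new(Time::from_timestamp(10, 20)));
    production_config(iox_object_store).with_time_provider(fixed)
}

Because `PreservedCatalog::config()` round-trips the same provider, callers such as `cleanup.rs` can reload the catalog without rebuilding the config by hand, which is what removes the `PreservedCatalogConfig` import there.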