* feat: migrate preserved catalog to TimeProvider (#2722) * fix: deterministic catalog prune tests * fix: failing test Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
c31bcbced5
commit
8414e6edbb
|
@ -9,7 +9,6 @@ use parking_lot::Mutex;
|
|||
use predicate::delete_predicate::DeletePredicate;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
use crate::catalog::core::PreservedCatalogConfig;
|
||||
use crate::catalog::{
|
||||
core::PreservedCatalog,
|
||||
interface::{
|
||||
|
@ -62,11 +61,8 @@ pub async fn get_unreferenced_parquet_files(
|
|||
let iox_object_store = catalog.iox_object_store();
|
||||
let all_known = {
|
||||
// replay catalog transactions to track ALL (even dropped) files that are referenced
|
||||
let (_catalog, state) = PreservedCatalog::load::<TracerCatalogState>(
|
||||
db_name,
|
||||
PreservedCatalogConfig::new(Arc::clone(&iox_object_store)),
|
||||
(),
|
||||
)
|
||||
let (_catalog, state) =
|
||||
PreservedCatalog::load::<TracerCatalogState>(db_name, catalog.config(), ())
|
||||
.await
|
||||
.context(CatalogLoadError)?
|
||||
.expect("catalog gone while reading it?");
|
||||
|
|
|
@ -12,7 +12,6 @@ use crate::{
|
|||
metadata::IoxParquetMetaData,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath};
|
||||
|
@ -30,6 +29,7 @@ use std::{
|
|||
fmt::Debug,
|
||||
sync::Arc,
|
||||
};
|
||||
use time::{Time, TimeProvider};
|
||||
use tokio::sync::{Semaphore, SemaphorePermit};
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -172,16 +172,19 @@ pub struct PreservedCatalogConfig {
|
|||
/// Fixed UUID for testing
|
||||
pub(crate) fixed_uuid: Option<Uuid>,
|
||||
|
||||
/// Fixed timestamp for testing
|
||||
pub(crate) fixed_timestamp: Option<DateTime<Utc>>,
|
||||
/// Time provider to use instead of [`time::SystemProvider`]
|
||||
pub(crate) time_provider: Arc<dyn TimeProvider>,
|
||||
}
|
||||
|
||||
impl PreservedCatalogConfig {
|
||||
pub fn new(iox_object_store: Arc<IoxObjectStore>) -> Self {
|
||||
pub fn new(
|
||||
iox_object_store: Arc<IoxObjectStore>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
) -> Self {
|
||||
Self {
|
||||
iox_object_store,
|
||||
fixed_timestamp: None,
|
||||
fixed_uuid: None,
|
||||
time_provider,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -193,12 +196,10 @@ impl PreservedCatalogConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/// Fixed timestamp to use for all transactions instead of "now"
|
||||
///
|
||||
/// TODO: Replace with TimeProvider (#2722)
|
||||
pub fn with_fixed_timestamp(self, timestamp: DateTime<Utc>) -> Self {
|
||||
/// Override the time provider
|
||||
pub fn with_time_provider(self, time_provider: Arc<dyn TimeProvider>) -> Self {
|
||||
Self {
|
||||
fixed_timestamp: Some(timestamp),
|
||||
time_provider,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
@ -235,10 +236,8 @@ pub struct PreservedCatalog {
|
|||
/// This can be useful for testing to achieve deterministic outputs.
|
||||
fixed_uuid: Option<Uuid>,
|
||||
|
||||
/// If set, this start time will be used for all transaction instead of "now".
|
||||
///
|
||||
/// This can be useful for testing to achieve deterministic outputs.
|
||||
fixed_timestamp: Option<DateTime<Utc>>,
|
||||
/// Time provider
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
}
|
||||
|
||||
impl PreservedCatalog {
|
||||
|
@ -262,7 +261,7 @@ impl PreservedCatalog {
|
|||
/// most broken catalogs.
|
||||
pub async fn find_last_transaction_timestamp(
|
||||
iox_object_store: &IoxObjectStore,
|
||||
) -> Result<Option<DateTime<Utc>>> {
|
||||
) -> Result<Option<Time>> {
|
||||
let mut res = None;
|
||||
|
||||
let mut stream = iox_object_store
|
||||
|
@ -275,7 +274,7 @@ impl PreservedCatalog {
|
|||
match load_transaction_proto(iox_object_store, transaction_file_path).await {
|
||||
Ok(proto) => match proto_parse::parse_timestamp(&proto.start_timestamp) {
|
||||
Ok(ts) => {
|
||||
res = Some(res.map_or(ts, |res: DateTime<Utc>| res.max(ts)));
|
||||
res = Some(res.map_or(ts, |res: Time| res.max(ts)));
|
||||
}
|
||||
Err(e) => warn!(%e, ?transaction_file_path, "Cannot parse timestamp"),
|
||||
},
|
||||
|
@ -301,11 +300,6 @@ impl PreservedCatalog {
|
|||
Ok(iox_object_store.wipe_catalog().await.context(Write)?)
|
||||
}
|
||||
|
||||
/// Deletes the catalog described by the provided config
|
||||
pub async fn wipe_with_config(config: &PreservedCatalogConfig) -> Result<()> {
|
||||
Self::wipe(&config.iox_object_store).await
|
||||
}
|
||||
|
||||
/// Create new catalog w/o any data.
|
||||
///
|
||||
/// An empty transaction will be used to mark the catalog start so that concurrent open but
|
||||
|
@ -328,7 +322,7 @@ impl PreservedCatalog {
|
|||
transaction_semaphore: Semaphore::new(1),
|
||||
iox_object_store: config.iox_object_store,
|
||||
fixed_uuid: config.fixed_uuid,
|
||||
fixed_timestamp: config.fixed_timestamp,
|
||||
time_provider: config.time_provider,
|
||||
};
|
||||
|
||||
// add empty transaction
|
||||
|
@ -455,7 +449,7 @@ impl PreservedCatalog {
|
|||
transaction_semaphore: Semaphore::new(1),
|
||||
iox_object_store: config.iox_object_store,
|
||||
fixed_uuid: config.fixed_uuid,
|
||||
fixed_timestamp: config.fixed_timestamp,
|
||||
time_provider: config.time_provider,
|
||||
},
|
||||
state,
|
||||
)))
|
||||
|
@ -469,8 +463,7 @@ impl PreservedCatalog {
|
|||
/// transactions are given out in the order they were requested.
|
||||
pub async fn open_transaction(&self) -> TransactionHandle<'_> {
|
||||
let uuid = self.fixed_uuid.unwrap_or_else(Uuid::new_v4);
|
||||
let start_timestamp = self.fixed_timestamp.unwrap_or_else(Utc::now);
|
||||
TransactionHandle::new(self, uuid, start_timestamp).await
|
||||
TransactionHandle::new(self, uuid, self.time_provider.now()).await
|
||||
}
|
||||
|
||||
/// Get latest revision counter.
|
||||
|
@ -489,6 +482,15 @@ impl PreservedCatalog {
|
|||
.expect("catalog should have at least an empty transaction")
|
||||
}
|
||||
|
||||
/// Return the config for this `PreservedCatalog`
|
||||
pub fn config(&self) -> PreservedCatalogConfig {
|
||||
PreservedCatalogConfig {
|
||||
iox_object_store: Arc::clone(&self.iox_object_store),
|
||||
fixed_uuid: self.fixed_uuid,
|
||||
time_provider: Arc::clone(&self.time_provider),
|
||||
}
|
||||
}
|
||||
|
||||
/// Object store used by this catalog.
|
||||
pub fn iox_object_store(&self) -> Arc<IoxObjectStore> {
|
||||
Arc::clone(&self.iox_object_store)
|
||||
|
@ -509,11 +511,7 @@ struct OpenTransaction {
|
|||
impl OpenTransaction {
|
||||
/// Private API to create new transaction, users should always use
|
||||
/// [`PreservedCatalog::open_transaction`].
|
||||
fn new(
|
||||
previous_tkey: &Option<TransactionKey>,
|
||||
uuid: Uuid,
|
||||
start_timestamp: DateTime<Utc>,
|
||||
) -> Self {
|
||||
fn new(previous_tkey: &Option<TransactionKey>, uuid: Uuid, start_timestamp: Time) -> Self {
|
||||
let (revision_counter, previous_uuid) = match previous_tkey {
|
||||
Some(tkey) => (
|
||||
tkey.revision_counter + 1,
|
||||
|
@ -529,7 +527,7 @@ impl OpenTransaction {
|
|||
uuid: uuid.as_bytes().to_vec().into(),
|
||||
revision_counter,
|
||||
previous_uuid,
|
||||
start_timestamp: Some(start_timestamp.into()),
|
||||
start_timestamp: Some(start_timestamp.date_time().into()),
|
||||
encoding: proto::transaction::Encoding::Delta.into(),
|
||||
},
|
||||
}
|
||||
|
@ -744,7 +742,7 @@ impl<'c> TransactionHandle<'c> {
|
|||
async fn new(
|
||||
catalog: &'c PreservedCatalog,
|
||||
uuid: Uuid,
|
||||
start_timestamp: DateTime<Utc>,
|
||||
start_timestamp: Time,
|
||||
) -> TransactionHandle<'c> {
|
||||
// first acquire semaphore (which is only being used for transactions), then get state lock
|
||||
let permit = catalog
|
||||
|
@ -967,7 +965,7 @@ impl<'c> CheckpointHandle<'c> {
|
|||
previous_uuid: self
|
||||
.previous_tkey
|
||||
.map_or_else(Bytes::new, |tkey| tkey.uuid.as_bytes().to_vec().into()),
|
||||
start_timestamp: Some(Utc::now().into()),
|
||||
start_timestamp: Some(self.catalog.time_provider.now().date_time().into()),
|
||||
encoding: proto::transaction::Encoding::Full.into(),
|
||||
};
|
||||
let path = TransactionFilePath::new_checkpoint(self.tkey.revision_counter, self.tkey.uuid);
|
||||
|
@ -1855,7 +1853,7 @@ mod tests {
|
|||
states: Vec<TestCatalogState>,
|
||||
|
||||
/// Traces timestamp after every (committed and aborted) transaction.
|
||||
post_timestamps: Vec<DateTime<Utc>>,
|
||||
post_timestamps: Vec<Time>,
|
||||
|
||||
/// Traces if an transaction was aborted.
|
||||
aborted: Vec<bool>,
|
||||
|
@ -1874,7 +1872,7 @@ mod tests {
|
|||
fn record(&mut self, catalog: &PreservedCatalog, state: &TestCatalogState, aborted: bool) {
|
||||
self.tkeys.push(catalog.previous_tkey.read().unwrap());
|
||||
self.states.push(state.clone());
|
||||
self.post_timestamps.push(Utc::now());
|
||||
self.post_timestamps.push(catalog.time_provider.now());
|
||||
self.aborted.push(aborted);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -227,15 +227,16 @@ mod tests {
|
|||
},
|
||||
test_utils::{chunk_addr, make_config, make_metadata, TestSize},
|
||||
};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use time::Time;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dump_default_options() {
|
||||
let time_provider = Arc::new(time::MockProvider::new(Time::from_timestamp(10, 20)));
|
||||
let config = make_config()
|
||||
.await
|
||||
.with_fixed_uuid(Uuid::nil())
|
||||
.with_fixed_timestamp(Utc.timestamp(10, 20));
|
||||
.with_time_provider(time_provider);
|
||||
|
||||
let iox_object_store = &config.iox_object_store;
|
||||
|
||||
|
@ -352,10 +353,11 @@ File {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_dump_show_parsed_data() {
|
||||
let time_provider = Arc::new(time::MockProvider::new(Time::from_timestamp(10, 20)));
|
||||
let config = make_config()
|
||||
.await
|
||||
.with_fixed_uuid(Uuid::nil())
|
||||
.with_fixed_timestamp(Utc.timestamp(10, 20));
|
||||
.with_time_provider(time_provider);
|
||||
let iox_object_store = &config.iox_object_store;
|
||||
|
||||
// build catalog with some data
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
use std::{convert::TryInto, num::TryFromIntError};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
use iox_object_store::{ParquetFilePath, ParquetFilePathParseError};
|
||||
use object_store::path::{parsed::DirsAndFileName, parts::PathPart};
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use time::Time;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
|
@ -81,13 +81,11 @@ pub fn unparse_dirs_and_filename(path: &ParquetFilePath) -> proto::Path {
|
|||
}
|
||||
|
||||
/// Parse timestamp from protobuf.
|
||||
pub fn parse_timestamp(
|
||||
ts: &Option<generated_types::google::protobuf::Timestamp>,
|
||||
) -> Result<DateTime<Utc>> {
|
||||
pub fn parse_timestamp(ts: &Option<generated_types::google::protobuf::Timestamp>) -> Result<Time> {
|
||||
let ts: generated_types::google::protobuf::Timestamp =
|
||||
ts.as_ref().context(DateTimeRequired)?.clone();
|
||||
let ts: DateTime<Utc> = ts.try_into().context(DateTimeParseError)?;
|
||||
Ok(ts)
|
||||
let ts = ts.try_into().context(DateTimeParseError)?;
|
||||
Ok(Time::from_date_time(ts))
|
||||
}
|
||||
|
||||
/// Parse encoding from protobuf.
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
//! Tooling to remove parts of the preserved catalog that are no longer needed.
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::TryStreamExt;
|
||||
use iox_object_store::{IoxObjectStore, TransactionFilePath};
|
||||
use object_store::{ObjectStore, ObjectStoreApi};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use time::Time;
|
||||
|
||||
use crate::catalog::{
|
||||
core::{ProtoIOError, ProtoParseError},
|
||||
|
@ -52,10 +52,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
///
|
||||
/// This will delete the following content: C1, T2, and T3. C3 and T4 cannot be deleted because it is required to
|
||||
/// recover T5 which is AFTER `before`.
|
||||
pub async fn prune_history(
|
||||
iox_object_store: Arc<IoxObjectStore>,
|
||||
before: DateTime<Utc>,
|
||||
) -> Result<()> {
|
||||
pub async fn prune_history(iox_object_store: Arc<IoxObjectStore>, before: Time) -> Result<()> {
|
||||
// collect all files so we can quickly filter them later for deletion
|
||||
// Use a btree-map so we can iterate from oldest to newest revision.
|
||||
let mut files: BTreeMap<u64, Vec<TransactionFilePath>> = Default::default();
|
||||
|
@ -122,7 +119,7 @@ fn is_checkpoint_or_zero(path: &TransactionFilePath) -> bool {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use chrono::Duration;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::{
|
||||
catalog::{
|
||||
|
@ -139,7 +136,9 @@ mod tests {
|
|||
async fn test_empty_store() {
|
||||
let iox_object_store = make_iox_object_store().await;
|
||||
|
||||
prune_history(iox_object_store, Utc::now()).await.unwrap();
|
||||
prune_history(iox_object_store, Time::from_timestamp_nanos(0))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -148,7 +147,10 @@ mod tests {
|
|||
|
||||
new_empty(config.clone()).await;
|
||||
|
||||
prune_history(Arc::clone(&config.iox_object_store), Utc::now())
|
||||
prune_history(
|
||||
Arc::clone(&config.iox_object_store),
|
||||
Time::from_timestamp_nanos(0),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -157,13 +159,20 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_complex_1() {
|
||||
let config = make_config().await;
|
||||
let time = Arc::new(time::MockProvider::new(Time::from_timestamp(0, 32)));
|
||||
let config = make_config()
|
||||
.await
|
||||
.with_time_provider(Arc::<time::MockProvider>::clone(&time));
|
||||
|
||||
let iox_object_store = &config.iox_object_store;
|
||||
|
||||
let (catalog, _state) = new_empty(config.clone()).await;
|
||||
create_transaction(&catalog).await;
|
||||
create_transaction_and_checkpoint(&catalog).await;
|
||||
let before = Utc::now();
|
||||
|
||||
let before = time.inc(Duration::from_secs(21));
|
||||
time.inc(Duration::from_secs(1));
|
||||
|
||||
create_transaction(&catalog).await;
|
||||
|
||||
prune_history(Arc::clone(iox_object_store), before)
|
||||
|
@ -178,14 +187,22 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_complex_2() {
|
||||
let config = make_config().await;
|
||||
let time = Arc::new(time::MockProvider::new(Time::from_timestamp(0, 32)));
|
||||
let config = make_config()
|
||||
.await
|
||||
.with_time_provider(Arc::<time::MockProvider>::clone(&time));
|
||||
|
||||
let iox_object_store = &config.iox_object_store;
|
||||
|
||||
let (catalog, _state) = new_empty(config.clone()).await;
|
||||
|
||||
create_transaction(&catalog).await;
|
||||
create_transaction_and_checkpoint(&catalog).await;
|
||||
create_transaction(&catalog).await;
|
||||
let before = Utc::now();
|
||||
|
||||
let before = time.inc(Duration::from_secs(25));
|
||||
time.inc(Duration::from_secs(1));
|
||||
|
||||
create_transaction(&catalog).await;
|
||||
create_transaction_and_checkpoint(&catalog).await;
|
||||
create_transaction(&catalog).await;
|
||||
|
@ -217,7 +234,7 @@ mod tests {
|
|||
create_transaction_and_checkpoint(&catalog).await;
|
||||
create_transaction(&catalog).await;
|
||||
|
||||
let before = Utc::now() - Duration::seconds(1_000);
|
||||
let before = config.time_provider.now() - Duration::from_secs(1_000);
|
||||
prune_history(Arc::clone(iox_object_store), before)
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
|
@ -871,7 +871,8 @@ pub async fn make_iox_object_store() -> Arc<IoxObjectStore> {
|
|||
/// Creates a new [`PreservedCatalogConfig`] with an in-memory object store
|
||||
pub async fn make_config() -> PreservedCatalogConfig {
|
||||
let iox_object_store = make_iox_object_store().await;
|
||||
PreservedCatalogConfig::new(iox_object_store)
|
||||
let time_provider = Arc::new(time::SystemProvider::new());
|
||||
PreservedCatalogConfig::new(iox_object_store, time_provider)
|
||||
}
|
||||
|
||||
pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) -> Vec<RecordBatch> {
|
||||
|
|
|
@ -20,7 +20,7 @@ use internal_types::freezable::Freezable;
|
|||
use iox_object_store::IoxObjectStore;
|
||||
use observability_deps::tracing::{error, info, warn};
|
||||
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
|
||||
use parquet_file::catalog::core::{PreservedCatalog, PreservedCatalogConfig};
|
||||
use parquet_file::catalog::core::PreservedCatalog;
|
||||
use persistence_windows::checkpoint::ReplayPlan;
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
use std::{future::Future, sync::Arc, time::Duration};
|
||||
|
@ -210,10 +210,9 @@ impl Database {
|
|||
.await
|
||||
.context(SavingRules)?;
|
||||
|
||||
let config = PreservedCatalogConfig::new(iox_object_store);
|
||||
create_preserved_catalog(
|
||||
db_name,
|
||||
config,
|
||||
iox_object_store,
|
||||
Arc::clone(application.metric_registry()),
|
||||
Arc::clone(application.time_provider()),
|
||||
true,
|
||||
|
@ -1050,12 +1049,9 @@ impl DatabaseStateDatabaseObjectStoreFound {
|
|||
.fail();
|
||||
}
|
||||
|
||||
let catalog_config = PreservedCatalogConfig::new(Arc::clone(&self.iox_object_store));
|
||||
|
||||
Ok(DatabaseStateRulesLoaded {
|
||||
provided_rules: Arc::new(rules),
|
||||
iox_object_store: Arc::clone(&self.iox_object_store),
|
||||
catalog_config,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -1064,7 +1060,6 @@ impl DatabaseStateDatabaseObjectStoreFound {
|
|||
struct DatabaseStateRulesLoaded {
|
||||
provided_rules: Arc<ProvidedDatabaseRules>,
|
||||
iox_object_store: Arc<IoxObjectStore>,
|
||||
catalog_config: PreservedCatalogConfig,
|
||||
}
|
||||
|
||||
impl DatabaseStateRulesLoaded {
|
||||
|
@ -1075,7 +1070,7 @@ impl DatabaseStateRulesLoaded {
|
|||
) -> Result<DatabaseStateCatalogLoaded, InitError> {
|
||||
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
|
||||
shared.config.name.as_str(),
|
||||
self.catalog_config.clone(),
|
||||
Arc::clone(&self.iox_object_store),
|
||||
Arc::clone(shared.application.metric_registry()),
|
||||
Arc::clone(shared.application.time_provider()),
|
||||
shared.config.wipe_catalog_on_error,
|
||||
|
|
|
@ -14,7 +14,6 @@ use std::{
|
|||
|
||||
use ::lifecycle::select_persistable_chunks;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use rand_distr::{Distribution, Poisson};
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
|
@ -826,7 +825,9 @@ impl Db {
|
|||
.as_mut()
|
||||
.expect("lifecycle policy should be initialized");
|
||||
|
||||
policy.check_for_work(self.utc_now()).await
|
||||
policy
|
||||
.check_for_work(self.time_provider.now().date_time())
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -854,21 +855,14 @@ impl Db {
|
|||
debug!(?duration, "cleanup worker sleeps");
|
||||
tokio::time::sleep(duration).await;
|
||||
|
||||
match chrono::Duration::from_std(catalog_transaction_prune_age) {
|
||||
Ok(catalog_transaction_prune_age) => {
|
||||
if let Err(e) = prune_catalog_transaction_history(
|
||||
self.iox_object_store(),
|
||||
Utc::now() - catalog_transaction_prune_age,
|
||||
self.time_provider.now() - catalog_transaction_prune_age,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!(%e, "error while pruning catalog transactions");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(%e, "cannot convert `catalog_transaction_prune_age`, skipping transaction pruning");
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = self.cleanup_unreferenced_parquet_files().await {
|
||||
error!(%e, "error while cleaning unreferenced parquet files");
|
||||
|
@ -916,13 +910,6 @@ impl Db {
|
|||
info!("finished db background worker");
|
||||
}
|
||||
|
||||
/// `Utc::now()` that is used by `Db`. Can be mocked for testing.
|
||||
///
|
||||
/// TODO: Remove (#2722)
|
||||
fn utc_now(&self) -> DateTime<Utc> {
|
||||
self.time_provider.now().date_time()
|
||||
}
|
||||
|
||||
async fn cleanup_unreferenced_parquet_files(
|
||||
self: &Arc<Self>,
|
||||
) -> std::result::Result<(), parquet_file::catalog::cleanup::Error> {
|
||||
|
@ -1422,7 +1409,6 @@ mod tests {
|
|||
use iox_object_store::ParquetFilePath;
|
||||
use metric::{Attributes, CumulativeGauge, Metric, Observation};
|
||||
use object_store::ObjectStore;
|
||||
use parquet_file::catalog::core::PreservedCatalogConfig;
|
||||
use parquet_file::{
|
||||
catalog::test_helpers::load_ok,
|
||||
metadata::IoxParquetMetaData,
|
||||
|
@ -3252,7 +3238,7 @@ mod tests {
|
|||
|
||||
// ==================== check: empty catalog created ====================
|
||||
// at this point, an empty preserved catalog exists
|
||||
let config = PreservedCatalogConfig::new(Arc::clone(&db.iox_object_store));
|
||||
let config = db.preserved_catalog.config();
|
||||
let maybe_preserved_catalog = load_ok(config.clone()).await;
|
||||
assert!(maybe_preserved_catalog.is_some());
|
||||
|
||||
|
|
|
@ -238,7 +238,6 @@ mod tests {
|
|||
};
|
||||
use lifecycle::{LockableChunk, LockablePartition};
|
||||
use object_store::ObjectStore;
|
||||
use parquet_file::catalog::core::PreservedCatalogConfig;
|
||||
use predicate::delete_expr::{DeleteExpr, Op, Scalar};
|
||||
use query::QueryDatabase;
|
||||
use std::{
|
||||
|
@ -560,10 +559,9 @@ mod tests {
|
|||
|
||||
// check object store delete predicates
|
||||
let metric_registry = Arc::new(metric::Registry::new());
|
||||
let config = PreservedCatalogConfig::new(Arc::clone(&db.iox_object_store));
|
||||
let (_preserved_catalog, catalog, _replay_plan) = load_or_create_preserved_catalog(
|
||||
db_name,
|
||||
config,
|
||||
Arc::clone(&db.iox_object_store),
|
||||
metric_registry,
|
||||
Arc::clone(&db.time_provider),
|
||||
false,
|
||||
|
|
|
@ -4,10 +4,9 @@
|
|||
use super::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle, Catalog};
|
||||
use iox_object_store::{IoxObjectStore, ParquetFilePath};
|
||||
use observability_deps::tracing::{error, info};
|
||||
use parquet_file::catalog::core::PreservedCatalogConfig;
|
||||
use parquet_file::{
|
||||
catalog::{
|
||||
core::PreservedCatalog,
|
||||
core::{PreservedCatalog, PreservedCatalogConfig},
|
||||
interface::{
|
||||
CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
|
||||
ChunkAddrWithoutDatabase, ChunkCreationFailed,
|
||||
|
@ -53,7 +52,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// <https://github.com/influxdata/influxdb_iox/issues/1522>
|
||||
pub async fn load_or_create_preserved_catalog(
|
||||
db_name: &str,
|
||||
config: PreservedCatalogConfig,
|
||||
iox_object_store: Arc<IoxObjectStore>,
|
||||
metric_registry: Arc<::metric::Registry>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
wipe_on_error: bool,
|
||||
|
@ -62,7 +61,7 @@ pub async fn load_or_create_preserved_catalog(
|
|||
// first try to load existing catalogs
|
||||
match PreservedCatalog::load(
|
||||
db_name,
|
||||
config.clone(),
|
||||
PreservedCatalogConfig::new(Arc::clone(&iox_object_store), Arc::clone(&time_provider)),
|
||||
LoaderEmptyInput::new(
|
||||
Arc::clone(&metric_registry),
|
||||
Arc::clone(&time_provider),
|
||||
|
@ -90,7 +89,13 @@ pub async fn load_or_create_preserved_catalog(
|
|||
db_name
|
||||
);
|
||||
|
||||
create_preserved_catalog(db_name, config, metric_registry, time_provider, skip_replay)
|
||||
create_preserved_catalog(
|
||||
db_name,
|
||||
iox_object_store,
|
||||
metric_registry,
|
||||
time_provider,
|
||||
skip_replay,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(e) => {
|
||||
|
@ -99,13 +104,13 @@ pub async fn load_or_create_preserved_catalog(
|
|||
// broken => wipe for now (at least during early iterations)
|
||||
error!("cannot load catalog, so wipe it: {}", e);
|
||||
|
||||
PreservedCatalog::wipe_with_config(&config)
|
||||
PreservedCatalog::wipe(&iox_object_store)
|
||||
.await
|
||||
.context(CannotWipeCatalog)?;
|
||||
|
||||
create_preserved_catalog(
|
||||
db_name,
|
||||
config,
|
||||
iox_object_store,
|
||||
metric_registry,
|
||||
time_provider,
|
||||
skip_replay,
|
||||
|
@ -123,11 +128,13 @@ pub async fn load_or_create_preserved_catalog(
|
|||
/// This will fail if a preserved catalog already exists.
|
||||
pub async fn create_preserved_catalog(
|
||||
db_name: &str,
|
||||
config: PreservedCatalogConfig,
|
||||
iox_object_store: Arc<IoxObjectStore>,
|
||||
metric_registry: Arc<metric::Registry>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
skip_replay: bool,
|
||||
) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
|
||||
let config = PreservedCatalogConfig::new(iox_object_store, Arc::clone(&time_provider));
|
||||
|
||||
let (preserved_catalog, loader) = PreservedCatalog::new_empty(
|
||||
db_name,
|
||||
config,
|
||||
|
@ -341,15 +348,16 @@ mod tests {
|
|||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
let config = PreservedCatalogConfig::new(iox_object_store);
|
||||
let config =
|
||||
PreservedCatalogConfig::new(Arc::clone(&iox_object_store), Arc::clone(&time_provider));
|
||||
|
||||
let (preserved_catalog, _catalog) = new_empty(config.clone()).await;
|
||||
let (preserved_catalog, _catalog) = new_empty(config).await;
|
||||
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
|
||||
.await;
|
||||
|
||||
load_or_create_preserved_catalog(
|
||||
&db_name,
|
||||
config,
|
||||
iox_object_store,
|
||||
Default::default(),
|
||||
time_provider,
|
||||
true,
|
||||
|
|
|
@ -2206,11 +2206,12 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
let (preserved_catalog, _catalog) = load_ok(PreservedCatalogConfig::new(
|
||||
let config = PreservedCatalogConfig::new(
|
||||
catalog_broken.iox_object_store().unwrap(),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
Arc::clone(application.time_provider()),
|
||||
);
|
||||
|
||||
let (preserved_catalog, _catalog) = load_ok(config).await.unwrap();
|
||||
|
||||
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
|
||||
.await;
|
||||
|
@ -2289,7 +2290,13 @@ mod tests {
|
|||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
new_empty(PreservedCatalogConfig::new(non_existing_iox_object_store)).await;
|
||||
|
||||
let config = PreservedCatalogConfig::new(
|
||||
non_existing_iox_object_store,
|
||||
Arc::clone(application.time_provider()),
|
||||
);
|
||||
new_empty(config).await;
|
||||
|
||||
assert_eq!(
|
||||
server
|
||||
.wipe_preserved_catalog(&db_name_non_existing)
|
||||
|
@ -2384,8 +2391,11 @@ mod tests {
|
|||
.unwrap(),
|
||||
);
|
||||
|
||||
let config =
|
||||
PreservedCatalogConfig::new(iox_object_store, Arc::clone(application.time_provider()));
|
||||
|
||||
// create catalog
|
||||
new_empty(PreservedCatalogConfig::new(iox_object_store)).await;
|
||||
new_empty(config).await;
|
||||
|
||||
// creating database will now result in an error
|
||||
let err = create_simple_database(&server, db_name).await.unwrap_err();
|
||||
|
|
|
@ -10,7 +10,6 @@ use data_types::{
|
|||
};
|
||||
use iox_object_store::IoxObjectStore;
|
||||
use object_store::ObjectStore;
|
||||
use parquet_file::catalog::core::PreservedCatalogConfig;
|
||||
use persistence_windows::checkpoint::ReplayPlan;
|
||||
use query::exec::ExecutorConfig;
|
||||
use query::{exec::Executor, QueryDatabase};
|
||||
|
@ -82,7 +81,6 @@ impl TestDbBuilder {
|
|||
};
|
||||
|
||||
let iox_object_store = Arc::new(iox_object_store);
|
||||
let config = PreservedCatalogConfig::new(Arc::clone(&iox_object_store));
|
||||
|
||||
// deterministic thread and concurrency count
|
||||
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
|
||||
|
@ -94,7 +92,7 @@ impl TestDbBuilder {
|
|||
|
||||
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
|
||||
db_name.as_str(),
|
||||
config,
|
||||
Arc::clone(&iox_object_store),
|
||||
Arc::clone(&metric_registry),
|
||||
Arc::clone(&time_provider),
|
||||
false,
|
||||
|
|
Loading…
Reference in New Issue