Merge pull request #2757 from influxdata/crepererum/issue2748
fix: flaky `delete_predicate_preservation`pull/24376/head
commit
be60b6c473
|
@ -63,9 +63,6 @@ pub async fn prune_history(
|
||||||
// remember latest checkpoint is <= before
|
// remember latest checkpoint is <= before
|
||||||
let mut latest_in_prune_range: Option<u64> = None;
|
let mut latest_in_prune_range: Option<u64> = None;
|
||||||
|
|
||||||
// remember known checkpoint
|
|
||||||
let mut latest_known: Option<u64> = None;
|
|
||||||
|
|
||||||
let mut stream = iox_object_store
|
let mut stream = iox_object_store
|
||||||
.catalog_transaction_files()
|
.catalog_transaction_files()
|
||||||
.await
|
.await
|
||||||
|
@ -87,12 +84,6 @@ pub async fn prune_history(
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
latest_known = Some(
|
|
||||||
latest_known.map_or(transaction_file_path.revision_counter, |x| {
|
|
||||||
x.max(transaction_file_path.revision_counter)
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
files
|
files
|
||||||
|
@ -102,7 +93,7 @@ pub async fn prune_history(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(earliest_keep) = latest_in_prune_range.or(latest_known) {
|
if let Some(earliest_keep) = latest_in_prune_range {
|
||||||
for (revision, files) in files {
|
for (revision, files) in files {
|
||||||
if revision > earliest_keep {
|
if revision > earliest_keep {
|
||||||
break;
|
break;
|
||||||
|
@ -131,6 +122,8 @@ fn is_checkpoint_or_zero(path: &TransactionFilePath) -> bool {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use chrono::Duration;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
catalog::{
|
catalog::{
|
||||||
core::PreservedCatalog, interface::CheckpointData, test_helpers::TestCatalogState,
|
core::PreservedCatalog, interface::CheckpointData, test_helpers::TestCatalogState,
|
||||||
|
@ -221,6 +214,29 @@ mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_keep_all() {
|
||||||
|
let iox_object_store = make_iox_object_store().await;
|
||||||
|
|
||||||
|
let (catalog, _state) =
|
||||||
|
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
create_transaction(&catalog).await;
|
||||||
|
create_transaction_and_checkpoint(&catalog).await;
|
||||||
|
create_transaction(&catalog).await;
|
||||||
|
|
||||||
|
let before = Utc::now() - Duration::seconds(1_000);
|
||||||
|
prune_history(Arc::clone(&iox_object_store), before)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
known_revisions(&iox_object_store).await,
|
||||||
|
vec![(0, false), (1, false), (2, false), (2, true), (3, false)],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
async fn create_transaction(catalog: &PreservedCatalog) {
|
async fn create_transaction(catalog: &PreservedCatalog) {
|
||||||
let t = catalog.open_transaction().await;
|
let t = catalog.open_transaction().await;
|
||||||
t.commit().await.unwrap();
|
t.commit().await.unwrap();
|
||||||
|
|
|
@ -3642,6 +3642,8 @@ mod tests {
|
||||||
.db_name(db_name)
|
.db_name(db_name)
|
||||||
.lifecycle_rules(LifecycleRules {
|
.lifecycle_rules(LifecycleRules {
|
||||||
catalog_transactions_until_checkpoint: NonZeroU64::try_from(1).unwrap(),
|
catalog_transactions_until_checkpoint: NonZeroU64::try_from(1).unwrap(),
|
||||||
|
// do not prune transactions files because this tests relies on them
|
||||||
|
catalog_transaction_prune_age: Duration::from_secs(1_000),
|
||||||
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
|
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
|
@ -3835,8 +3837,8 @@ mod tests {
|
||||||
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
|
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
|
||||||
"+------+-----+----------+--------------------------------+",
|
"+------+-----+----------+--------------------------------+",
|
||||||
];
|
];
|
||||||
// let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
|
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
|
||||||
// assert_batches_sorted_eq!(&expected, &batches);
|
assert_batches_sorted_eq!(&expected, &batches);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
Loading…
Reference in New Issue