From 2a52fd90d9379c055de8e6050c032bb91bf93446 Mon Sep 17 00:00:00 2001
From: Marco Neumann <marco@crepererum.net>
Date: Thu, 7 Oct 2021 10:14:42 +0200
Subject: [PATCH 1/2] fix: transaction pruning logic for "nothing to do"

---
 parquet_file/src/catalog/prune.rs | 36 ++++++++++++++++++++++---------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/parquet_file/src/catalog/prune.rs b/parquet_file/src/catalog/prune.rs
index 2b8a173d8f..31b8228074 100644
--- a/parquet_file/src/catalog/prune.rs
+++ b/parquet_file/src/catalog/prune.rs
@@ -63,9 +63,6 @@ pub async fn prune_history(
     // remember latest checkpoint is <= before
     let mut latest_in_prune_range: Option<u64> = None;
 
-    // remember known checkpoint
-    let mut latest_known: Option<u64> = None;
-
     let mut stream = iox_object_store
         .catalog_transaction_files()
         .await
@@ -87,12 +84,6 @@ pub async fn prune_history(
                         }),
                     );
                 }
-
-                latest_known = Some(
-                    latest_known.map_or(transaction_file_path.revision_counter, |x| {
-                        x.max(transaction_file_path.revision_counter)
-                    }),
-                );
             }
 
             files
@@ -102,7 +93,7 @@ pub async fn prune_history(
         }
     }
 
-    if let Some(earliest_keep) = latest_in_prune_range.or(latest_known) {
+    if let Some(earliest_keep) = latest_in_prune_range {
         for (revision, files) in files {
             if revision > earliest_keep {
                 break;
@@ -131,6 +122,8 @@ fn is_checkpoint_or_zero(path: &TransactionFilePath) -> bool {
 
 #[cfg(test)]
 mod tests {
+    use chrono::Duration;
+
     use crate::{
         catalog::{
             core::PreservedCatalog, interface::CheckpointData, test_helpers::TestCatalogState,
@@ -221,6 +214,29 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_keep_all() {
+        let iox_object_store = make_iox_object_store().await;
+
+        let (catalog, _state) =
+            PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
+                .await
+                .unwrap();
+        create_transaction(&catalog).await;
+        create_transaction_and_checkpoint(&catalog).await;
+        create_transaction(&catalog).await;
+
+        let before = Utc::now() - Duration::seconds(1_000);
+        prune_history(Arc::clone(&iox_object_store), before)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            known_revisions(&iox_object_store).await,
+            vec![(0, false), (1, false), (2, false), (2, true), (3, false)],
+        );
+    }
+
     async fn create_transaction(catalog: &PreservedCatalog) {
         let t = catalog.open_transaction().await;
         t.commit().await.unwrap();

From 0374ba2284d30affb3355194de16592f8ae0b505 Mon Sep 17 00:00:00 2001
From: Marco Neumann <marco@crepererum.net>
Date: Thu, 7 Oct 2021 10:15:37 +0200
Subject: [PATCH 2/2] fix: re-enable no longer flaky part of
 `delete_predicate_preservation`

Fix #2748.
---
 server/src/db.rs | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/server/src/db.rs b/server/src/db.rs
index 8ad9692d67..c782445bb0 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -3639,6 +3639,8 @@ mod tests {
             .db_name(db_name)
             .lifecycle_rules(LifecycleRules {
                 catalog_transactions_until_checkpoint: NonZeroU64::try_from(1).unwrap(),
+                // do not prune transactions files because this tests relies on them
+                catalog_transaction_prune_age: Duration::from_secs(1_000),
                 late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
                 ..Default::default()
             })
@@ -3832,8 +3834,8 @@ mod tests {
             "| d    | 40  | 0        | 1970-01-01T00:00:00.000000040Z |",
             "+------+-----+----------+--------------------------------+",
         ];
-        // let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
-        // assert_batches_sorted_eq!(&expected, &batches);
+        let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
+        assert_batches_sorted_eq!(&expected, &batches);
     }
 
     #[tokio::test]