From eb31b347b0b42ae039c0c6a353cbce7676048514 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)" <carol.nichols@gmail.com>
Date: Wed, 4 May 2022 15:56:17 -0400
Subject: [PATCH] refactor: Move tombstones_to_delete_predicates to the
 predicate crate

---
 Cargo.lock                        |  2 +-
 compactor/src/query.rs            |  6 +++---
 data_types2/Cargo.toml            |  1 -
 data_types2/src/lib.rs            | 22 ---------------------
 ingester/src/query.rs             |  9 ++++++---
 predicate/Cargo.toml              |  1 +
 predicate/src/delete_predicate.rs | 33 +++++++++++++++++++++++--------
 7 files changed, 36 insertions(+), 38 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9f7c35caff..c7278dabde 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1174,7 +1174,6 @@ version = "0.1.0"
 dependencies = [
  "data_types",
  "influxdb_line_protocol",
- "predicate",
  "schema",
  "sqlx",
  "uuid 0.8.2",
@@ -4054,6 +4053,7 @@ dependencies = [
  "arrow",
  "chrono",
  "data_types",
+ "data_types2",
  "datafusion 0.1.0",
  "datafusion_util",
  "itertools",
diff --git a/compactor/src/query.rs b/compactor/src/query.rs
index 7d882f346e..fe179c17d4 100644
--- a/compactor/src/query.rs
+++ b/compactor/src/query.rs
@@ -4,13 +4,13 @@ use std::sync::Arc;
 
 use data_types::timestamp::TimestampMinMax;
 use data_types2::{
-    tombstones_to_delete_predicates, ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId,
-    SequenceNumber, TableSummary, Timestamp, Tombstone,
+    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
+    Timestamp, Tombstone,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
 use observability_deps::tracing::trace;
 use parquet_file::chunk::ParquetChunk;
-use predicate::{Predicate, PredicateMatch};
+use predicate::{delete_predicate::tombstones_to_delete_predicates, Predicate, PredicateMatch};
 use query::{
     exec::{stringset::StringSet, IOxSessionContext},
     QueryChunk, QueryChunkError, QueryChunkMeta,
diff --git a/data_types2/Cargo.toml b/data_types2/Cargo.toml
index 4f03457c81..5e0b6c55c1 100644
--- a/data_types2/Cargo.toml
+++ b/data_types2/Cargo.toml
@@ -7,7 +7,6 @@ description = "Shared data types in the Iox NG architecture"
 [dependencies]
 data_types = { path = "../data_types" }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
-predicate = { path = "../predicate" }
 schema = { path = "../schema" }
 sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "postgres", "uuid"] }
 uuid = { version = "0.8", features = ["v4"] }
diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs
index 5e594251d7..763d3c8df8 100644
--- a/data_types2/src/lib.rs
+++ b/data_types2/src/lib.rs
@@ -11,7 +11,6 @@
 )]
 
 use influxdb_line_protocol::FieldValue;
-use predicate::delete_predicate::parse_delete_predicate;
 use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema};
 use std::{
     collections::BTreeMap,
@@ -699,27 +698,6 @@ pub struct Tombstone {
     pub serialized_predicate: String,
 }
 
-/// Convert tombstones to delete predicates
-pub fn tombstones_to_delete_predicates(tombstones: &[Tombstone]) -> Vec<Arc<DeletePredicate>> {
-    tombstones_to_delete_predicates_iter(tombstones).collect()
-}
-
-/// Return Iterator of delete predicates
-pub fn tombstones_to_delete_predicates_iter(
-    tombstones: &[Tombstone],
-) -> impl Iterator<Item = Arc<DeletePredicate>> + '_ {
-    tombstones.iter().map(|tombstone| {
-        Arc::new(
-            parse_delete_predicate(
-                &tombstone.min_time.get().to_string(),
-                &tombstone.max_time.get().to_string(),
-                &tombstone.serialized_predicate,
-            )
-            .expect("Error building delete predicate"),
-        )
-    })
-}
-
 /// Data for a parquet file reference that has been inserted in the catalog.
 #[derive(Debug, Clone, Copy, PartialEq, sqlx::FromRow)]
 pub struct ParquetFile {
diff --git a/ingester/src/query.rs b/ingester/src/query.rs
index aff10d9b28..6d3a3b5ff4 100644
--- a/ingester/src/query.rs
+++ b/ingester/src/query.rs
@@ -5,8 +5,8 @@ use arrow::record_batch::RecordBatch;
 use arrow_util::util::merge_record_batches;
 use data_types::timestamp::TimestampMinMax;
 use data_types2::{
-    tombstones_to_delete_predicates, tombstones_to_delete_predicates_iter, ChunkAddr, ChunkId,
-    ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, Tombstone,
+    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
+    Tombstone,
 };
 use datafusion::{
     logical_plan::ExprRewritable,
@@ -18,7 +18,10 @@ use datafusion::{
 };
 use datafusion_util::batch_filter;
 use observability_deps::tracing::{debug, trace};
-use predicate::{Predicate, PredicateMatch};
+use predicate::{
+    delete_predicate::{tombstones_to_delete_predicates, tombstones_to_delete_predicates_iter},
+    Predicate, PredicateMatch,
+};
 use query::{
     exec::{stringset::StringSet, IOxSessionContext},
     util::{df_physical_expr_from_schema_and_expr, MissingColumnsToNull},
diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml
index 763e383be1..098ec60c7d 100644
--- a/predicate/Cargo.toml
+++ b/predicate/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2021"
 arrow = { version = "13", features = ["prettyprint"] }
 chrono = { version = "0.4", default-features = false }
 data_types = { path = "../data_types" }
+data_types2 = { path = "../data_types2" }
 datafusion = { path = "../datafusion" }
 datafusion_util = { path = "../datafusion_util" }
 itertools = "0.10"
diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs
index 024de46ca0..a97d9ba5b5 100644
--- a/predicate/src/delete_predicate.rs
+++ b/predicate/src/delete_predicate.rs
@@ -1,18 +1,14 @@
+use crate::delete_expr::{df_to_expr, expr_to_df};
 use chrono::DateTime;
+use data_types2::{DeleteExpr, DeletePredicate, TimestampRange, Tombstone};
+use datafusion::logical_plan::{lit, Column, Expr, Operator};
 use snafu::{ResultExt, Snafu};
 use sqlparser::{
     ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value},
     dialect::GenericDialect,
     parser::Parser,
 };
-
-use data_types::{
-    delete_predicate::{DeleteExpr, DeletePredicate},
-    timestamp::TimestampRange,
-};
-use datafusion::logical_plan::{lit, Column, Expr, Operator};
-
-use crate::delete_expr::{df_to_expr, expr_to_df};
+use std::sync::Arc;
 
 const FLUX_TABLE: &str = "_measurement";
 
@@ -80,6 +76,27 @@ impl From<DeletePredicate> for crate::Predicate {
     }
 }
 
+/// Convert tombstones to delete predicates
+pub fn tombstones_to_delete_predicates(tombstones: &[Tombstone]) -> Vec<Arc<DeletePredicate>> {
+    tombstones_to_delete_predicates_iter(tombstones).collect()
+}
+
+/// Return Iterator of delete predicates
+pub fn tombstones_to_delete_predicates_iter(
+    tombstones: &[Tombstone],
+) -> impl Iterator<Item = Arc<DeletePredicate>> + '_ {
+    tombstones.iter().map(|tombstone| {
+        Arc::new(
+            parse_delete_predicate(
+                &tombstone.min_time.get().to_string(),
+                &tombstone.max_time.get().to_string(),
+                &tombstone.serialized_predicate,
+            )
+            .expect("Error building delete predicate"),
+        )
+    })
+}
+
 /// Parse and convert the delete grpc API into ParseDeletePredicate to send to server
 pub fn parse_delete_predicate(
     start_time: &str,