diff --git a/Cargo.lock b/Cargo.lock index 9f7c35caff..c7278dabde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1174,7 +1174,6 @@ version = "0.1.0" dependencies = [ "data_types", "influxdb_line_protocol", - "predicate", "schema", "sqlx", "uuid 0.8.2", @@ -4054,6 +4053,7 @@ dependencies = [ "arrow", "chrono", "data_types", + "data_types2", "datafusion 0.1.0", "datafusion_util", "itertools", diff --git a/compactor/src/query.rs b/compactor/src/query.rs index 7d882f346e..fe179c17d4 100644 --- a/compactor/src/query.rs +++ b/compactor/src/query.rs @@ -4,13 +4,13 @@ use std::sync::Arc; use data_types::timestamp::TimestampMinMax; use data_types2::{ - tombstones_to_delete_predicates, ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, - SequenceNumber, TableSummary, Timestamp, Tombstone, + ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, + Timestamp, Tombstone, }; use datafusion::physical_plan::SendableRecordBatchStream; use observability_deps::tracing::trace; use parquet_file::chunk::ParquetChunk; -use predicate::{Predicate, PredicateMatch}; +use predicate::{delete_predicate::tombstones_to_delete_predicates, Predicate, PredicateMatch}; use query::{ exec::{stringset::StringSet, IOxSessionContext}, QueryChunk, QueryChunkError, QueryChunkMeta, diff --git a/data_types2/Cargo.toml b/data_types2/Cargo.toml index 4f03457c81..5e0b6c55c1 100644 --- a/data_types2/Cargo.toml +++ b/data_types2/Cargo.toml @@ -7,7 +7,6 @@ description = "Shared data types in the Iox NG architecture" [dependencies] data_types = { path = "../data_types" } influxdb_line_protocol = { path = "../influxdb_line_protocol" } -predicate = { path = "../predicate" } schema = { path = "../schema" } sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "postgres", "uuid"] } uuid = { version = "0.8", features = ["v4"] } diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index 5e594251d7..763d3c8df8 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -11,7 +11,6 @@ )] use influxdb_line_protocol::FieldValue; -use predicate::delete_predicate::parse_delete_predicate; use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema}; use std::{ collections::BTreeMap, @@ -699,27 +698,6 @@ pub struct Tombstone { pub serialized_predicate: String, } -/// Convert tombstones to delete predicates -pub fn tombstones_to_delete_predicates(tombstones: &[Tombstone]) -> Vec> { - tombstones_to_delete_predicates_iter(tombstones).collect() -} - -/// Return Iterator of delete predicates -pub fn tombstones_to_delete_predicates_iter( - tombstones: &[Tombstone], -) -> impl Iterator> + '_ { - tombstones.iter().map(|tombstone| { - Arc::new( - parse_delete_predicate( - &tombstone.min_time.get().to_string(), - &tombstone.max_time.get().to_string(), - &tombstone.serialized_predicate, - ) - .expect("Error building delete predicate"), - ) - }) -} - /// Data for a parquet file reference that has been inserted in the catalog. #[derive(Debug, Clone, Copy, PartialEq, sqlx::FromRow)] pub struct ParquetFile { diff --git a/ingester/src/query.rs b/ingester/src/query.rs index aff10d9b28..6d3a3b5ff4 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -5,8 +5,8 @@ use arrow::record_batch::RecordBatch; use arrow_util::util::merge_record_batches; use data_types::timestamp::TimestampMinMax; use data_types2::{ - tombstones_to_delete_predicates, tombstones_to_delete_predicates_iter, ChunkAddr, ChunkId, - ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, Tombstone, + ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, + Tombstone, }; use datafusion::{ logical_plan::ExprRewritable, @@ -18,7 +18,10 @@ use datafusion::{ }; use datafusion_util::batch_filter; use observability_deps::tracing::{debug, trace}; -use predicate::{Predicate, PredicateMatch}; +use predicate::{ + delete_predicate::{tombstones_to_delete_predicates, tombstones_to_delete_predicates_iter}, + Predicate, PredicateMatch, +}; use query::{ exec::{stringset::StringSet, IOxSessionContext}, util::{df_physical_expr_from_schema_and_expr, MissingColumnsToNull}, diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index 763e383be1..098ec60c7d 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" arrow = { version = "13", features = ["prettyprint"] } chrono = { version = "0.4", default-features = false } data_types = { path = "../data_types" } +data_types2 = { path = "../data_types2" } datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } itertools = "0.10" diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs index 024de46ca0..a97d9ba5b5 100644 --- a/predicate/src/delete_predicate.rs +++ b/predicate/src/delete_predicate.rs @@ -1,18 +1,14 @@ +use crate::delete_expr::{df_to_expr, expr_to_df}; use chrono::DateTime; +use data_types2::{DeleteExpr, DeletePredicate, TimestampRange, Tombstone}; +use datafusion::logical_plan::{lit, Column, Expr, Operator}; use snafu::{ResultExt, Snafu}; use sqlparser::{ ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value}, dialect::GenericDialect, parser::Parser, }; - -use data_types::{ - delete_predicate::{DeleteExpr, DeletePredicate}, - timestamp::TimestampRange, -}; -use datafusion::logical_plan::{lit, Column, Expr, Operator}; - -use crate::delete_expr::{df_to_expr, expr_to_df}; +use std::sync::Arc; const FLUX_TABLE: &str = "_measurement"; @@ -80,6 +76,27 @@ impl From for crate::Predicate { } } +/// Convert tombstones to delete predicates +pub fn tombstones_to_delete_predicates(tombstones: &[Tombstone]) -> Vec> { + tombstones_to_delete_predicates_iter(tombstones).collect() +} + +/// Return Iterator of delete predicates +pub fn tombstones_to_delete_predicates_iter( + tombstones: &[Tombstone], +) -> impl Iterator> + '_ { + tombstones.iter().map(|tombstone| { + Arc::new( + parse_delete_predicate( + &tombstone.min_time.get().to_string(), + &tombstone.max_time.get().to_string(), + &tombstone.serialized_predicate, + ) + .expect("Error building delete predicate"), + ) + }) +} + /// Parse and convert the delete grpc API into ParseDeletePredicate to send to server pub fn parse_delete_predicate( start_time: &str,