refactor: Move tombstones_to_delete_predicates to the predicate crate

pull/24376/head
Carol (Nichols || Goulding) 2022-05-04 15:56:17 -04:00
parent 485d6edb8f
commit eb31b347b0
7 changed files with 36 additions and 38 deletions
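
In short: this commit moves tombstones_to_delete_predicates and tombstones_to_delete_predicates_iter out of data_types2 and into predicate::delete_predicate, reversing the dependency between the two crates: data_types2 drops its predicate dependency, and predicate gains one on data_types2. A minimal sketch of a call site after the move, assuming the IOx workspace crates at this commit (the wrapper function below is illustrative, not part of the commit; note that both helpers panic via expect if a serialized predicate fails to parse, as in the moved code):

use data_types2::Tombstone;
use predicate::delete_predicate::tombstones_to_delete_predicates;

// Illustrative: eagerly parse every tombstone's serialized predicate into a Vec.
fn count_delete_predicates(tombstones: &[Tombstone]) -> usize {
    tombstones_to_delete_predicates(tombstones).len()
}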

Cargo.lock (generated)

@@ -1174,7 +1174,6 @@ version = "0.1.0"
 dependencies = [
  "data_types",
  "influxdb_line_protocol",
- "predicate",
  "schema",
  "sqlx",
  "uuid 0.8.2",
@@ -4054,6 +4053,7 @@ dependencies = [
  "arrow",
  "chrono",
  "data_types",
+ "data_types2",
  "datafusion 0.1.0",
  "datafusion_util",
  "itertools",


@@ -4,13 +4,13 @@ use std::sync::Arc;
 use data_types::timestamp::TimestampMinMax;
 use data_types2::{
-    tombstones_to_delete_predicates, ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId,
-    SequenceNumber, TableSummary, Timestamp, Tombstone,
+    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
+    Timestamp, Tombstone,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
 use observability_deps::tracing::trace;
 use parquet_file::chunk::ParquetChunk;
-use predicate::{Predicate, PredicateMatch};
+use predicate::{delete_predicate::tombstones_to_delete_predicates, Predicate, PredicateMatch};
 use query::{
     exec::{stringset::StringSet, IOxSessionContext},
     QueryChunk, QueryChunkError, QueryChunkMeta,


@@ -7,7 +7,6 @@ description = "Shared data types in the Iox NG architecture"
 [dependencies]
 data_types = { path = "../data_types" }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
-predicate = { path = "../predicate" }
 schema = { path = "../schema" }
 sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "postgres", "uuid"] }
 uuid = { version = "0.8", features = ["v4"] }


@@ -11,7 +11,6 @@
 )]
 
 use influxdb_line_protocol::FieldValue;
-use predicate::delete_predicate::parse_delete_predicate;
 use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema};
 use std::{
     collections::BTreeMap,
@@ -699,27 +698,6 @@ pub struct Tombstone {
     pub serialized_predicate: String,
 }
 
-/// Convert tombstones to delete predicates
-pub fn tombstones_to_delete_predicates(tombstones: &[Tombstone]) -> Vec<Arc<DeletePredicate>> {
-    tombstones_to_delete_predicates_iter(tombstones).collect()
-}
-
-/// Return Iterator of delete predicates
-pub fn tombstones_to_delete_predicates_iter(
-    tombstones: &[Tombstone],
-) -> impl Iterator<Item = Arc<DeletePredicate>> + '_ {
-    tombstones.iter().map(|tombstone| {
-        Arc::new(
-            parse_delete_predicate(
-                &tombstone.min_time.get().to_string(),
-                &tombstone.max_time.get().to_string(),
-                &tombstone.serialized_predicate,
-            )
-            .expect("Error building delete predicate"),
-        )
-    })
-}
-
 /// Data for a parquet file reference that has been inserted in the catalog.
 #[derive(Debug, Clone, Copy, PartialEq, sqlx::FromRow)]
 pub struct ParquetFile {


@@ -5,8 +5,8 @@ use arrow::record_batch::RecordBatch;
 use arrow_util::util::merge_record_batches;
 use data_types::timestamp::TimestampMinMax;
 use data_types2::{
-    tombstones_to_delete_predicates, tombstones_to_delete_predicates_iter, ChunkAddr, ChunkId,
-    ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, Tombstone,
+    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
+    Tombstone,
 };
 use datafusion::{
     logical_plan::ExprRewritable,
@@ -18,7 +18,10 @@ use datafusion::{
 };
 use datafusion_util::batch_filter;
 use observability_deps::tracing::{debug, trace};
-use predicate::{Predicate, PredicateMatch};
+use predicate::{
+    delete_predicate::{tombstones_to_delete_predicates, tombstones_to_delete_predicates_iter},
+    Predicate, PredicateMatch,
+};
 use query::{
     exec::{stringset::StringSet, IOxSessionContext},
     util::{df_physical_expr_from_schema_and_expr, MissingColumnsToNull},


@@ -7,6 +7,7 @@ edition = "2021"
 arrow = { version = "13", features = ["prettyprint"] }
 chrono = { version = "0.4", default-features = false }
 data_types = { path = "../data_types" }
+data_types2 = { path = "../data_types2" }
 datafusion = { path = "../datafusion" }
 datafusion_util = { path = "../datafusion_util" }
 itertools = "0.10"


@@ -1,18 +1,14 @@
+use crate::delete_expr::{df_to_expr, expr_to_df};
 use chrono::DateTime;
+use data_types2::{DeleteExpr, DeletePredicate, TimestampRange, Tombstone};
+use datafusion::logical_plan::{lit, Column, Expr, Operator};
 use snafu::{ResultExt, Snafu};
 use sqlparser::{
     ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value},
     dialect::GenericDialect,
     parser::Parser,
 };
-
-use data_types::{
-    delete_predicate::{DeleteExpr, DeletePredicate},
-    timestamp::TimestampRange,
-};
-use datafusion::logical_plan::{lit, Column, Expr, Operator};
-
-use crate::delete_expr::{df_to_expr, expr_to_df};
+use std::sync::Arc;
 
 const FLUX_TABLE: &str = "_measurement";
@@ -80,6 +76,27 @@ impl From<DeletePredicate> for crate::Predicate {
     }
 }
 
+/// Convert tombstones to delete predicates
+pub fn tombstones_to_delete_predicates(tombstones: &[Tombstone]) -> Vec<Arc<DeletePredicate>> {
+    tombstones_to_delete_predicates_iter(tombstones).collect()
+}
+
+/// Return Iterator of delete predicates
+pub fn tombstones_to_delete_predicates_iter(
+    tombstones: &[Tombstone],
+) -> impl Iterator<Item = Arc<DeletePredicate>> + '_ {
+    tombstones.iter().map(|tombstone| {
+        Arc::new(
+            parse_delete_predicate(
+                &tombstone.min_time.get().to_string(),
+                &tombstone.max_time.get().to_string(),
+                &tombstone.serialized_predicate,
+            )
+            .expect("Error building delete predicate"),
+        )
+    })
+}
+
 /// Parse and convert the delete grpc API into ParseDeletePredicate to send to server
 pub fn parse_delete_predicate(
     start_time: &str,
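
For reference, the _iter variant returns a lazy iterator, so callers that consume predicates one at a time can avoid materializing the whole Vec. A sketch under the same assumptions as above (illustrative wrapper, not code from the commit):

use data_types2::Tombstone;
use predicate::delete_predicate::tombstones_to_delete_predicates_iter;

// Illustrative: each tombstone is parsed on demand as the iterator is driven.
fn log_predicate_count(tombstones: &[Tombstone]) {
    let n = tombstones_to_delete_predicates_iter(tombstones).count();
    println!("parsed {} delete predicates", n);
}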