From 3ab0788a947bf5ae92ab4b1c9598803ad5e8e5ee Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 4 May 2022 16:41:09 -0400 Subject: [PATCH] fix: Move DeletePredicate types to data_types2 --- Cargo.lock | 3 + data_types/src/delete_predicate.rs | 357 -------------------------- data_types/src/lib.rs | 1 - data_types2/Cargo.toml | 3 +- data_types2/src/lib.rs | 358 ++++++++++++++++++++++++++- dml/Cargo.toml | 1 + dml/src/lib.rs | 23 +- query_tests/src/scenarios/delete.rs | 70 ++---- query_tests/src/scenarios/library.rs | 29 +-- query_tests/src/scenarios/util.rs | 27 +- write_buffer/Cargo.toml | 1 + write_buffer/src/codec.rs | 29 +-- write_buffer/src/kafka/mod.rs | 52 ++-- 13 files changed, 458 insertions(+), 496 deletions(-) delete mode 100644 data_types/src/delete_predicate.rs diff --git a/Cargo.lock b/Cargo.lock index 65c96318b8..1a8f7d86b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1174,6 +1174,7 @@ version = "0.1.0" dependencies = [ "data_types", "influxdb_line_protocol", + "ordered-float 3.0.0", "schema", "sqlx", "uuid 0.8.2", @@ -1409,6 +1410,7 @@ version = "0.1.0" dependencies = [ "arrow_util", "data_types", + "data_types2", "hashbrown 0.12.0", "iox_time", "mutable_batch", @@ -6784,6 +6786,7 @@ version = "0.1.0" dependencies = [ "async-trait", "data_types", + "data_types2", "dml", "dotenv", "futures", diff --git a/data_types/src/delete_predicate.rs b/data_types/src/delete_predicate.rs deleted file mode 100644 index e10dab7112..0000000000 --- a/data_types/src/delete_predicate.rs +++ /dev/null @@ -1,357 +0,0 @@ -use crate::timestamp::TimestampRange; -use std::{fmt::Write, num::FpCategory}; - -/// Represents a parsed delete predicate for evaluation by the InfluxDB IOx -/// query engine. -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct DeletePredicate { - /// Only rows within this range are included in - /// results. Other rows are excluded. - pub range: TimestampRange, - - /// Optional arbitrary predicates, represented as list of - /// expressions applied a logical conjunction (aka they - /// are 'AND'ed together). Only rows that evaluate to TRUE for all - /// these expressions should be returned. Other rows are excluded - /// from the results. - pub exprs: Vec, -} - -impl DeletePredicate { - /// Format expr to SQL string. - pub fn expr_sql_string(&self) -> String { - let mut out = String::new(); - for expr in &self.exprs { - if !out.is_empty() { - write!(&mut out, " AND ").expect("writing to a string shouldn't fail"); - } - write!(&mut out, "{}", expr).expect("writing to a string shouldn't fail"); - } - out - } - - /// Return the approximate memory size of the predicate, in bytes. - /// - /// This includes `Self`. - pub fn size(&self) -> usize { - std::mem::size_of::() + self.exprs.iter().map(|expr| expr.size()).sum::() - } -} - -/// Single expression to be used as parts of a predicate. -/// -/// Only very simple expression of the type ` ` are supported. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct DeleteExpr { - /// Column (w/o table name). - pub column: String, - - /// Operator. - pub op: Op, - - /// Scalar value. - pub scalar: Scalar, -} - -impl DeleteExpr { - /// Create a new [`DeleteExpr`] - pub fn new(column: String, op: Op, scalar: Scalar) -> Self { - Self { column, op, scalar } - } - - /// Column (w/o table name). - pub fn column(&self) -> &str { - &self.column - } - - /// Operator. - pub fn op(&self) -> Op { - self.op - } - - /// Scalar value. - pub fn scalar(&self) -> &Scalar { - &self.scalar - } - - /// Return the approximate memory size of the expression, in bytes. - /// - /// This includes `Self`. - pub fn size(&self) -> usize { - std::mem::size_of::() + self.column.capacity() + self.scalar.size() - } -} - -impl std::fmt::Display for DeleteExpr { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - r#""{}"{}{}"#, - self.column().replace('\\', r#"\\"#).replace('"', r#"\""#), - self.op(), - self.scalar(), - ) - } -} - -/// Binary operator that can be evaluated on a column and a scalar value. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum Op { - /// Strict equality (`=`). - Eq, - - /// Inequality (`!=`). - Ne, -} - -impl std::fmt::Display for Op { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Eq => write!(f, "="), - Self::Ne => write!(f, "!="), - } - } -} - -/// Scalar value of a certain type. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[allow(missing_docs)] -pub enum Scalar { - Bool(bool), - I64(i64), - F64(ordered_float::OrderedFloat), - String(String), -} - -impl Scalar { - /// Return the approximate memory size of the scalar, in bytes. - /// - /// This includes `Self`. - pub fn size(&self) -> usize { - std::mem::size_of::() - + match &self { - Self::Bool(_) | Self::I64(_) | Self::F64(_) => 0, - Self::String(s) => s.capacity(), - } - } -} - -impl std::fmt::Display for Scalar { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Scalar::Bool(value) => value.fmt(f), - Scalar::I64(value) => value.fmt(f), - Scalar::F64(value) => match value.classify() { - FpCategory::Nan => write!(f, "'NaN'"), - FpCategory::Infinite if *value.as_ref() < 0.0 => write!(f, "'-Infinity'"), - FpCategory::Infinite => write!(f, "'Infinity'"), - _ => write!(f, "{:?}", value.as_ref()), - }, - Scalar::String(value) => { - write!( - f, - "'{}'", - value.replace('\\', r#"\\"#).replace('\'', r#"\'"#), - ) - } - } - } -} - -#[cfg(test)] -mod tests { - use ordered_float::OrderedFloat; - - use super::*; - - #[test] - fn test_expr_to_sql_no_expressions() { - let pred = DeletePredicate { - range: TimestampRange::new(1, 2), - exprs: vec![], - }; - assert_eq!(&pred.expr_sql_string(), ""); - } - - #[test] - fn test_expr_to_sql_operators() { - let pred = DeletePredicate { - range: TimestampRange::new(1, 2), - exprs: vec![ - DeleteExpr { - column: String::from("col1"), - op: Op::Eq, - scalar: Scalar::I64(1), - }, - DeleteExpr { - column: String::from("col2"), - op: Op::Ne, - scalar: Scalar::I64(2), - }, - ], - }; - assert_eq!(&pred.expr_sql_string(), r#""col1"=1 AND "col2"!=2"#); - } - - #[test] - fn test_expr_to_sql_column_escape() { - let pred = DeletePredicate { - range: TimestampRange::new(1, 2), - exprs: vec![ - DeleteExpr { - column: String::from("col 1"), - op: Op::Eq, - scalar: Scalar::I64(1), - }, - DeleteExpr { - column: String::from(r#"col\2"#), - op: Op::Eq, - scalar: Scalar::I64(2), - }, - DeleteExpr { - column: String::from(r#"col"3"#), - op: Op::Eq, - scalar: Scalar::I64(3), - }, - ], - }; - assert_eq!( - &pred.expr_sql_string(), - r#""col 1"=1 AND "col\\2"=2 AND "col\"3"=3"# - ); - } - - #[test] - fn test_expr_to_sql_bool() { - let pred = DeletePredicate { - range: TimestampRange::new(1, 2), - exprs: vec![ - DeleteExpr { - column: String::from("col1"), - op: Op::Eq, - scalar: Scalar::Bool(false), - }, - DeleteExpr { - column: String::from("col2"), - op: Op::Eq, - scalar: Scalar::Bool(true), - }, - ], - }; - assert_eq!(&pred.expr_sql_string(), r#""col1"=false AND "col2"=true"#); - } - - #[test] - fn test_expr_to_sql_i64() { - let pred = DeletePredicate { - range: TimestampRange::new(1, 2), - exprs: vec![ - DeleteExpr { - column: String::from("col1"), - op: Op::Eq, - scalar: Scalar::I64(0), - }, - DeleteExpr { - column: String::from("col2"), - op: Op::Eq, - scalar: Scalar::I64(-1), - }, - DeleteExpr { - column: String::from("col3"), - op: Op::Eq, - scalar: Scalar::I64(1), - }, - DeleteExpr { - column: String::from("col4"), - op: Op::Eq, - scalar: Scalar::I64(i64::MIN), - }, - DeleteExpr { - column: String::from("col5"), - op: Op::Eq, - scalar: Scalar::I64(i64::MAX), - }, - ], - }; - assert_eq!( - &pred.expr_sql_string(), - r#""col1"=0 AND "col2"=-1 AND "col3"=1 AND "col4"=-9223372036854775808 AND "col5"=9223372036854775807"# - ); - } - - #[test] - fn test_expr_to_sql_f64() { - let pred = DeletePredicate { - range: TimestampRange::new(1, 2), - exprs: vec![ - DeleteExpr { - column: String::from("col1"), - op: Op::Eq, - scalar: Scalar::F64(OrderedFloat::from(0.0)), - }, - DeleteExpr { - column: String::from("col2"), - op: Op::Eq, - scalar: Scalar::F64(OrderedFloat::from(-0.0)), - }, - DeleteExpr { - column: String::from("col3"), - op: Op::Eq, - scalar: Scalar::F64(OrderedFloat::from(1.0)), - }, - DeleteExpr { - column: String::from("col4"), - op: Op::Eq, - scalar: Scalar::F64(OrderedFloat::from(f64::INFINITY)), - }, - DeleteExpr { - column: String::from("col5"), - op: Op::Eq, - scalar: Scalar::F64(OrderedFloat::from(f64::NEG_INFINITY)), - }, - DeleteExpr { - column: String::from("col6"), - op: Op::Eq, - scalar: Scalar::F64(OrderedFloat::from(f64::NAN)), - }, - ], - }; - assert_eq!( - &pred.expr_sql_string(), - r#""col1"=0.0 AND "col2"=-0.0 AND "col3"=1.0 AND "col4"='Infinity' AND "col5"='-Infinity' AND "col6"='NaN'"# - ); - } - - #[test] - fn test_expr_to_sql_string() { - let pred = DeletePredicate { - range: TimestampRange::new(1, 2), - exprs: vec![ - DeleteExpr { - column: String::from("col1"), - op: Op::Eq, - scalar: Scalar::String(String::from("")), - }, - DeleteExpr { - column: String::from("col2"), - op: Op::Eq, - scalar: Scalar::String(String::from("foo")), - }, - DeleteExpr { - column: String::from("col3"), - op: Op::Eq, - scalar: Scalar::String(String::from(r#"fo\o"#)), - }, - DeleteExpr { - column: String::from("col4"), - op: Op::Eq, - scalar: Scalar::String(String::from(r#"fo'o"#)), - }, - ], - }; - assert_eq!( - &pred.expr_sql_string(), - r#""col1"='' AND "col2"='foo' AND "col3"='fo\\o' AND "col4"='fo\'o'"# - ); - } -} diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 0ee1d52f33..622fad2327 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -15,7 +15,6 @@ pub mod chunk_metadata; pub mod consistent_hasher; mod database_name; pub mod database_rules; -pub mod delete_predicate; pub mod error; pub mod job; pub mod names; diff --git a/data_types2/Cargo.toml b/data_types2/Cargo.toml index 5e0b6c55c1..5ce2b0a42b 100644 --- a/data_types2/Cargo.toml +++ b/data_types2/Cargo.toml @@ -2,11 +2,12 @@ name = "data_types2" version = "0.1.0" edition = "2021" -description = "Shared data types in the Iox NG architecture" +description = "Shared data types in the IOx NG architecture" [dependencies] data_types = { path = "../data_types" } influxdb_line_protocol = { path = "../influxdb_line_protocol" } +ordered-float = "3" schema = { path = "../schema" } sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "postgres", "uuid"] } uuid = { version = "0.8", features = ["v4"] } diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index a0fc2f610b..978c7811ac 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -1,4 +1,4 @@ -//! Shared data types in the Iox NG architecture +//! Shared data types in the IOx NG architecture #![warn( missing_copy_implementations, @@ -15,15 +15,14 @@ use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFiel use std::{ collections::BTreeMap, convert::TryFrom, - fmt::{Debug, Formatter}, - num::NonZeroU32, + fmt::Write, + num::{FpCategory, NonZeroU32}, ops::{Add, Sub}, sync::Arc, }; use uuid::Uuid; pub use data_types::{ - delete_predicate::{DeleteExpr, DeletePredicate, Op, Scalar}, names::{org_and_bucket_to_database, OrgBucketMappingError}, non_empty::NonEmptyString, partition_metadata::{ @@ -535,7 +534,7 @@ impl ColumnType { } impl std::fmt::Display for ColumnType { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let s = self.as_str(); write!(f, "{}", s) @@ -1063,9 +1062,167 @@ pub struct StrftimeColumn { pub format: String, } +/// Represents a parsed delete predicate for evaluation by the InfluxDB IOx +/// query engine. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct DeletePredicate { + /// Only rows within this range are included in + /// results. Other rows are excluded. + pub range: TimestampRange, + + /// Optional arbitrary predicates, represented as list of + /// expressions applied a logical conjunction (aka they + /// are 'AND'ed together). Only rows that evaluate to TRUE for all + /// these expressions should be returned. Other rows are excluded + /// from the results. + pub exprs: Vec, +} + +impl DeletePredicate { + /// Format expr to SQL string. + pub fn expr_sql_string(&self) -> String { + let mut out = String::new(); + for expr in &self.exprs { + if !out.is_empty() { + write!(&mut out, " AND ").expect("writing to a string shouldn't fail"); + } + write!(&mut out, "{}", expr).expect("writing to a string shouldn't fail"); + } + out + } + + /// Return the approximate memory size of the predicate, in bytes. + /// + /// This includes `Self`. + pub fn size(&self) -> usize { + std::mem::size_of::() + self.exprs.iter().map(|expr| expr.size()).sum::() + } +} + +/// Single expression to be used as parts of a predicate. +/// +/// Only very simple expression of the type ` ` are supported. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct DeleteExpr { + /// Column (w/o table name). + pub column: String, + + /// Operator. + pub op: Op, + + /// Scalar value. + pub scalar: Scalar, +} + +impl DeleteExpr { + /// Create a new [`DeleteExpr`] + pub fn new(column: String, op: Op, scalar: Scalar) -> Self { + Self { column, op, scalar } + } + + /// Column (w/o table name). + pub fn column(&self) -> &str { + &self.column + } + + /// Operator. + pub fn op(&self) -> Op { + self.op + } + + /// Scalar value. + pub fn scalar(&self) -> &Scalar { + &self.scalar + } + + /// Return the approximate memory size of the expression, in bytes. + /// + /// This includes `Self`. + pub fn size(&self) -> usize { + std::mem::size_of::() + self.column.capacity() + self.scalar.size() + } +} + +impl std::fmt::Display for DeleteExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + r#""{}"{}{}"#, + self.column().replace('\\', r#"\\"#).replace('"', r#"\""#), + self.op(), + self.scalar(), + ) + } +} + +/// Binary operator that can be evaluated on a column and a scalar value. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum Op { + /// Strict equality (`=`). + Eq, + + /// Inequality (`!=`). + Ne, +} + +impl std::fmt::Display for Op { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Eq => write!(f, "="), + Self::Ne => write!(f, "!="), + } + } +} + +/// Scalar value of a certain type. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[allow(missing_docs)] +pub enum Scalar { + Bool(bool), + I64(i64), + F64(ordered_float::OrderedFloat), + String(String), +} + +impl Scalar { + /// Return the approximate memory size of the scalar, in bytes. + /// + /// This includes `Self`. + pub fn size(&self) -> usize { + std::mem::size_of::() + + match &self { + Self::Bool(_) | Self::I64(_) | Self::F64(_) => 0, + Self::String(s) => s.capacity(), + } + } +} + +impl std::fmt::Display for Scalar { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Scalar::Bool(value) => value.fmt(f), + Scalar::I64(value) => value.fmt(f), + Scalar::F64(value) => match value.classify() { + FpCategory::Nan => write!(f, "'NaN'"), + FpCategory::Infinite if *value.as_ref() < 0.0 => write!(f, "'-Infinity'"), + FpCategory::Infinite => write!(f, "'Infinity'"), + _ => write!(f, "{:?}", value.as_ref()), + }, + Scalar::String(value) => { + write!( + f, + "'{}'", + value.replace('\\', r#"\\"#).replace('\'', r#"\'"#), + ) + } + } + } +} + #[cfg(test)] mod tests { use super::*; + use ordered_float::OrderedFloat; #[test] fn test_chunk_id_new() { @@ -1093,4 +1250,195 @@ mod tests { assert_eq!(format!("{:?}", id_test), "ChunkId(42)"); assert_eq!(format!("{}", id_test), "ChunkId(42)"); } + + #[test] + fn test_expr_to_sql_no_expressions() { + let pred = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![], + }; + assert_eq!(&pred.expr_sql_string(), ""); + } + + #[test] + fn test_expr_to_sql_operators() { + let pred = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![ + DeleteExpr { + column: String::from("col1"), + op: Op::Eq, + scalar: Scalar::I64(1), + }, + DeleteExpr { + column: String::from("col2"), + op: Op::Ne, + scalar: Scalar::I64(2), + }, + ], + }; + assert_eq!(&pred.expr_sql_string(), r#""col1"=1 AND "col2"!=2"#); + } + + #[test] + fn test_expr_to_sql_column_escape() { + let pred = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![ + DeleteExpr { + column: String::from("col 1"), + op: Op::Eq, + scalar: Scalar::I64(1), + }, + DeleteExpr { + column: String::from(r#"col\2"#), + op: Op::Eq, + scalar: Scalar::I64(2), + }, + DeleteExpr { + column: String::from(r#"col"3"#), + op: Op::Eq, + scalar: Scalar::I64(3), + }, + ], + }; + assert_eq!( + &pred.expr_sql_string(), + r#""col 1"=1 AND "col\\2"=2 AND "col\"3"=3"# + ); + } + + #[test] + fn test_expr_to_sql_bool() { + let pred = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![ + DeleteExpr { + column: String::from("col1"), + op: Op::Eq, + scalar: Scalar::Bool(false), + }, + DeleteExpr { + column: String::from("col2"), + op: Op::Eq, + scalar: Scalar::Bool(true), + }, + ], + }; + assert_eq!(&pred.expr_sql_string(), r#""col1"=false AND "col2"=true"#); + } + + #[test] + fn test_expr_to_sql_i64() { + let pred = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![ + DeleteExpr { + column: String::from("col1"), + op: Op::Eq, + scalar: Scalar::I64(0), + }, + DeleteExpr { + column: String::from("col2"), + op: Op::Eq, + scalar: Scalar::I64(-1), + }, + DeleteExpr { + column: String::from("col3"), + op: Op::Eq, + scalar: Scalar::I64(1), + }, + DeleteExpr { + column: String::from("col4"), + op: Op::Eq, + scalar: Scalar::I64(i64::MIN), + }, + DeleteExpr { + column: String::from("col5"), + op: Op::Eq, + scalar: Scalar::I64(i64::MAX), + }, + ], + }; + assert_eq!( + &pred.expr_sql_string(), + r#""col1"=0 AND "col2"=-1 AND "col3"=1 AND "col4"=-9223372036854775808 AND "col5"=9223372036854775807"# + ); + } + + #[test] + fn test_expr_to_sql_f64() { + let pred = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![ + DeleteExpr { + column: String::from("col1"), + op: Op::Eq, + scalar: Scalar::F64(OrderedFloat::from(0.0)), + }, + DeleteExpr { + column: String::from("col2"), + op: Op::Eq, + scalar: Scalar::F64(OrderedFloat::from(-0.0)), + }, + DeleteExpr { + column: String::from("col3"), + op: Op::Eq, + scalar: Scalar::F64(OrderedFloat::from(1.0)), + }, + DeleteExpr { + column: String::from("col4"), + op: Op::Eq, + scalar: Scalar::F64(OrderedFloat::from(f64::INFINITY)), + }, + DeleteExpr { + column: String::from("col5"), + op: Op::Eq, + scalar: Scalar::F64(OrderedFloat::from(f64::NEG_INFINITY)), + }, + DeleteExpr { + column: String::from("col6"), + op: Op::Eq, + scalar: Scalar::F64(OrderedFloat::from(f64::NAN)), + }, + ], + }; + assert_eq!( + &pred.expr_sql_string(), + r#""col1"=0.0 AND "col2"=-0.0 AND "col3"=1.0 AND "col4"='Infinity' AND "col5"='-Infinity' AND "col6"='NaN'"# + ); + } + + #[test] + fn test_expr_to_sql_string() { + let pred = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![ + DeleteExpr { + column: String::from("col1"), + op: Op::Eq, + scalar: Scalar::String(String::from("")), + }, + DeleteExpr { + column: String::from("col2"), + op: Op::Eq, + scalar: Scalar::String(String::from("foo")), + }, + DeleteExpr { + column: String::from("col3"), + op: Op::Eq, + scalar: Scalar::String(String::from(r#"fo\o"#)), + }, + DeleteExpr { + column: String::from("col4"), + op: Op::Eq, + scalar: Scalar::String(String::from(r#"fo'o"#)), + }, + ], + }; + assert_eq!( + &pred.expr_sql_string(), + r#""col1"='' AND "col2"='foo' AND "col3"='fo\\o' AND "col4"='fo\'o'"# + ); + } } diff --git a/dml/Cargo.toml b/dml/Cargo.toml index 25a62fb2cd..4606ad7ab2 100644 --- a/dml/Cargo.toml +++ b/dml/Cargo.toml @@ -7,6 +7,7 @@ description = "DML types" [dependencies] arrow_util = { path = "../arrow_util" } data_types = { path = "../data_types" } +data_types2 = { path = "../data_types2" } hashbrown = "0.12" mutable_batch = { path = "../mutable_batch" } ordered-float = "3" diff --git a/dml/src/lib.rs b/dml/src/lib.rs index c52db75bf1..29a6b9e10f 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -11,17 +11,17 @@ clippy::clone_on_ref_ptr )] -use std::collections::{BTreeMap, HashSet}; - -use data_types::router::{ShardConfig, ShardId}; +use data_types::{ + non_empty::NonEmptyString, + partition_metadata::{StatValues, Statistics}, + router::{ShardConfig, ShardId}, + sequence::Sequence, +}; +use data_types2::DeletePredicate; use hashbrown::HashMap; - -use data_types::delete_predicate::DeletePredicate; -use data_types::non_empty::NonEmptyString; -use data_types::partition_metadata::{StatValues, Statistics}; -use data_types::sequence::Sequence; use iox_time::Time; use mutable_batch::MutableBatch; +use std::collections::{BTreeMap, HashSet}; use trace::ctx::SpanContext; /// Metadata information about a DML operation @@ -487,9 +487,10 @@ pub mod test_util { #[cfg(test)] mod tests { + use super::*; + use crate::test_util::assert_writes_eq; use data_types::{ consistent_hasher::ConsistentHasher, - delete_predicate::DeletePredicate, non_empty::NonEmptyString, router::{HashRing, Matcher, MatcherToShard}, timestamp::TimestampRange, @@ -497,10 +498,6 @@ mod tests { use mutable_batch_lp::lines_to_batches; use regex::Regex; - use crate::test_util::assert_writes_eq; - - use super::*; - #[test] fn test_write_sharding() { let config = ShardConfig { diff --git a/query_tests/src/scenarios/delete.rs b/query_tests/src/scenarios/delete.rs index a6b480224f..05d80c6433 100644 --- a/query_tests/src/scenarios/delete.rs +++ b/query_tests/src/scenarios/delete.rs @@ -1,16 +1,16 @@ //! This module contains testing scenarios for Delete -use data_types::delete_predicate::{DeleteExpr, DeletePredicate}; -use data_types::timestamp::TimestampRange; - -use async_trait::async_trait; - -use super::util::{make_n_chunks_scenario_new, ChunkDataNew, DeleteTimeNew, PredNew}; -use super::{DbScenario, DbSetup}; +use super::{ + util::{make_n_chunks_scenario_new, ChunkDataNew, DeleteTimeNew, PredNew}, + DbScenario, DbSetup, +}; use crate::scenarios::util::all_scenarios_for_one_chunk; +use async_trait::async_trait; +use data_types2::{DeleteExpr, DeletePredicate, Op, Scalar, TimestampRange}; -// ========================================================================================================================= -// DELETE TEST SETUPS: chunk lp data, how many chunks, their types, how many delete predicates and when they happen +// ================================================================================================ +// DELETE TEST SETUPS: chunk lp data, how many chunks, their types, how many delete predicates and +// when they happen #[derive(Debug)] /// Setup for delete query test with one table and one chunk moved from MUB to RUB to OS @@ -53,8 +53,8 @@ impl DbSetup for OneDeleteSimpleExprOneChunk { range: TimestampRange::new(0, 15), exprs: vec![DeleteExpr::new( "bar".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::F64((1.0).into()), + Op::Eq, + Scalar::F64((1.0).into()), )], }; @@ -104,16 +104,8 @@ impl DbSetup for OneDeleteMultiExprsOneChunk { let pred = DeletePredicate { range: TimestampRange::new(0, 30), exprs: vec![ - DeleteExpr::new( - "bar".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::F64((1.0).into()), - ), - DeleteExpr::new( - "foo".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String("me".to_string()), - ), + DeleteExpr::new("bar".to_string(), Op::Eq, Scalar::F64((1.0).into())), + DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::String("me".to_string())), ], }; @@ -147,16 +139,8 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk { let pred1 = DeletePredicate { range: TimestampRange::new(0, 32), exprs: vec![ - DeleteExpr::new( - "bar".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::F64((1.0).into()), - ), - DeleteExpr::new( - "foo".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String("me".to_string()), - ), + DeleteExpr::new("bar".to_string(), Op::Eq, Scalar::F64((1.0).into())), + DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::String("me".to_string())), ], }; @@ -165,8 +149,8 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk { range: TimestampRange::new(10, 40), exprs: vec![DeleteExpr::new( "bar".to_string(), - data_types::delete_predicate::Op::Ne, - data_types::delete_predicate::Scalar::F64((1.0).into()), + Op::Ne, + Scalar::F64((1.0).into()), )], }; @@ -204,16 +188,8 @@ impl DbSetup for ThreeDeleteThreeChunks { let pred1 = DeletePredicate { range: TimestampRange::new(0, 30), exprs: vec![ - DeleteExpr::new( - "bar".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::F64((1.0).into()), - ), - DeleteExpr::new( - "foo".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String("me".to_string()), - ), + DeleteExpr::new("bar".to_string(), Op::Eq, Scalar::F64((1.0).into())), + DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::String("me".to_string())), ], }; @@ -229,8 +205,8 @@ impl DbSetup for ThreeDeleteThreeChunks { range: TimestampRange::new(20, 45), exprs: vec![DeleteExpr::new( "foo".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String("you".to_string()), + Op::Eq, + Scalar::String("you".to_string()), )], }; @@ -246,8 +222,8 @@ impl DbSetup for ThreeDeleteThreeChunks { range: TimestampRange::new(75, 95), exprs: vec![DeleteExpr::new( "bar".to_string(), - data_types::delete_predicate::Op::Ne, - data_types::delete_predicate::Scalar::F64((7.0).into()), + Op::Ne, + Scalar::F64((7.0).into()), )], }; diff --git a/query_tests/src/scenarios/library.rs b/query_tests/src/scenarios/library.rs index ddbe85f38a..fc62201eba 100644 --- a/query_tests/src/scenarios/library.rs +++ b/query_tests/src/scenarios/library.rs @@ -5,10 +5,7 @@ use super::{ }; use crate::scenarios::util::{make_n_chunks_scenario_new, ChunkDataNew}; use async_trait::async_trait; -use data_types::{ - delete_predicate::{DeleteExpr, DeletePredicate}, - timestamp::TimestampRange, -}; +use data_types2::{DeleteExpr, DeletePredicate, Op, Scalar, TimestampRange}; use query::frontend::sql::SqlQueryPlanner; #[derive(Debug)] @@ -112,8 +109,8 @@ impl DbSetup for OneMeasurementManyNullTagsWithDelete { range: TimestampRange::new(400, 602), exprs: vec![DeleteExpr::new( "state".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String(("NY").to_string()), + Op::Eq, + Scalar::String(("NY").to_string()), )], }; @@ -205,8 +202,8 @@ impl DbSetup for TwoMeasurementsWithDelete { range: TimestampRange::new(120, 160), exprs: vec![DeleteExpr::new( "region".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String("west".to_string()), + Op::Eq, + Scalar::String("west".to_string()), )], }; @@ -237,8 +234,8 @@ impl DbSetup for TwoMeasurementsWithDeleteAll { range: TimestampRange::new(120, 160), exprs: vec![DeleteExpr::new( "region".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String("west".to_string()), + Op::Eq, + Scalar::String("west".to_string()), )], }; @@ -661,8 +658,8 @@ impl DbSetup for EndToEndTestWithDelete { range: TimestampRange::new(6000, 6000), exprs: vec![DeleteExpr::new( "name".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String(("disk0").to_string()), + Op::Eq, + Scalar::String(("disk0").to_string()), )], }; @@ -835,8 +832,8 @@ impl DbSetup for MeasurementsSortableTagsWithDelete { range: TimestampRange::new(120, 350), exprs: vec![DeleteExpr::new( "state".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String(("CA").to_string()), + Op::Eq, + Scalar::String(("CA").to_string()), )], }; @@ -896,8 +893,8 @@ impl DbSetup for OneMeasurementNoTagsWithDelete { range: TimestampRange::new(1, 1), exprs: vec![DeleteExpr::new( "foo".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::F64((1.0).into()), + Op::Eq, + Scalar::F64((1.0).into()), )], }; diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 480a47a6fe..22df97bac9 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -3,9 +3,9 @@ use super::DbScenario; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use backoff::BackoffConfig; -use data_types::delete_predicate::DeletePredicate; use data_types2::{ - NonEmptyString, PartitionId, Sequence, SequenceNumber, SequencerId, TombstoneId, + DeletePredicate, NonEmptyString, PartitionId, Sequence, SequenceNumber, SequencerId, + TombstoneId, }; use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite}; use futures::StreamExt; @@ -14,10 +14,12 @@ use generated_types::{ ingester::IngesterQueryRequest, }; use influxdb_iox_client::flight::Error as FlightError; -use ingester::data::{IngesterData, IngesterQueryResponse, Persister, SequencerData}; -use ingester::lifecycle::LifecycleHandle; -use ingester::partioning::{Partitioner, PartitionerError}; -use ingester::querier_handler::prepare_data_to_querier; +use ingester::{ + data::{IngesterData, IngesterQueryResponse, Persister, SequencerData}, + lifecycle::LifecycleHandle, + partioning::{Partitioner, PartitionerError}, + querier_handler::prepare_data_to_querier, +}; use iox_catalog::interface::get_schema_by_name; use iox_tests::util::{TestCatalog, TestNamespace, TestSequencer}; use itertools::Itertools; @@ -28,11 +30,14 @@ use querier::{ IngesterFlightClientQueryData, QuerierCatalogCache, QuerierNamespace, }; use schema::selection::Selection; -use std::cmp::Ordering; -use std::collections::{BTreeMap, HashMap}; -use std::fmt::Write; -use std::sync::Mutex; -use std::{fmt::Display, sync::Arc}; +use std::{ + cmp::Ordering, + collections::{BTreeMap, HashMap}, + fmt::Display, + fmt::Write, + sync::Arc, + sync::Mutex, +}; // Structs, enums, and functions used to exhaust all test scenarios of chunk life cycle // & when delete predicates are applied diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index c35ffe981b..61e2a20a43 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] async-trait = "0.1" data_types = { path = "../data_types" } +data_types2 = { path = "../data_types2" } dml = { path = "../dml" } dotenv = "0.15.0" futures = "0.3" diff --git a/write_buffer/src/codec.rs b/write_buffer/src/codec.rs index f5bcb900c3..2e318d49a1 100644 --- a/write_buffer/src/codec.rs +++ b/write_buffer/src/codec.rs @@ -1,26 +1,23 @@ //! Encode/Decode for messages -use std::borrow::Cow; -use std::sync::Arc; - -use data_types::non_empty::NonEmptyString; -use http::{HeaderMap, HeaderValue}; -use prost::Message; - -use data_types::sequence::Sequence; +use crate::core::WriteBufferError; +use data_types::{non_empty::NonEmptyString, sequence::Sequence}; use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite}; -use generated_types::google::FromOptionalField; -use generated_types::influxdata::iox::delete::v1::DeletePayload; -use generated_types::influxdata::iox::write_buffer::v1::write_buffer_payload::Payload; -use generated_types::influxdata::iox::write_buffer::v1::WriteBufferPayload; +use generated_types::{ + google::FromOptionalField, + influxdata::iox::{ + delete::v1::DeletePayload, + write_buffer::v1::{write_buffer_payload::Payload, WriteBufferPayload}, + }, +}; +use http::{HeaderMap, HeaderValue}; use iox_time::Time; use mutable_batch_pb::decode::decode_database_batch; -use trace::ctx::SpanContext; -use trace::TraceCollector; +use prost::Message; +use std::{borrow::Cow, sync::Arc}; +use trace::{ctx::SpanContext, TraceCollector}; use trace_http::ctx::{format_jaeger_trace_context, TraceHeaderParser}; -use crate::core::WriteBufferError; - /// Pbdata based content type pub const CONTENT_TYPE_PROTOBUF: &str = r#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#; diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index ec6c7cde2f..8387b0911f 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -1,11 +1,14 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, +use self::{ + aggregator::DmlAggregator, + config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig}, +}; +use crate::{ + codec::IoxHeaders, + core::{ + WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler, + WriteBufferWriting, }, }; - use async_trait::async_trait; use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig}; use dml::{DmlMeta, DmlOperation}; @@ -19,20 +22,14 @@ use rskafka::client::{ producer::{BatchProducer, BatchProducerBuilder}, ClientBuilder, }; -use trace::TraceCollector; - -use crate::{ - codec::IoxHeaders, - core::{ - WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler, - WriteBufferWriting, +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, }, }; - -use self::{ - aggregator::DmlAggregator, - config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig}, -}; +use trace::TraceCollector; mod aggregator; mod config; @@ -361,15 +358,7 @@ async fn setup_topic( #[cfg(test)] mod tests { - use std::num::NonZeroU32; - - use data_types::{delete_predicate::DeletePredicate, timestamp::TimestampRange}; - use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite}; - use futures::{stream::FuturesUnordered, TryStreamExt}; - use rskafka::{client::partition::Compression, record::Record}; - use test_helpers::assert_contains; - use trace::{ctx::SpanContext, RingBufferTraceCollector}; - + use super::*; use crate::{ core::test_utils::{ assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name, @@ -377,8 +366,13 @@ mod tests { }, maybe_skip_kafka_integration, }; - - use super::*; + use data_types2::{DeletePredicate, TimestampRange}; + use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite}; + use futures::{stream::FuturesUnordered, TryStreamExt}; + use rskafka::{client::partition::Compression, record::Record}; + use std::num::NonZeroU32; + use test_helpers::assert_contains; + use trace::{ctx::SpanContext, RingBufferTraceCollector}; struct RSKafkaTestAdapter { conn: String,