fix: Move DeletePredicate types to data_types2

pull/24376/head
Carol (Nichols || Goulding) 2022-05-04 16:41:09 -04:00
parent f39b093357
commit 3ab0788a94
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
13 changed files with 458 additions and 496 deletions

3
Cargo.lock generated
View File

@ -1174,6 +1174,7 @@ version = "0.1.0"
dependencies = [
"data_types",
"influxdb_line_protocol",
"ordered-float 3.0.0",
"schema",
"sqlx",
"uuid 0.8.2",
@ -1409,6 +1410,7 @@ version = "0.1.0"
dependencies = [
"arrow_util",
"data_types",
"data_types2",
"hashbrown 0.12.0",
"iox_time",
"mutable_batch",
@ -6784,6 +6786,7 @@ version = "0.1.0"
dependencies = [
"async-trait",
"data_types",
"data_types2",
"dml",
"dotenv",
"futures",

View File

@ -1,357 +0,0 @@
use crate::timestamp::TimestampRange;
use std::{fmt::Write, num::FpCategory};
/// Represents a parsed delete predicate for evaluation by the InfluxDB IOx
/// query engine.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DeletePredicate {
/// Only rows within this range are included in
/// results. Other rows are excluded.
pub range: TimestampRange,
/// Optional arbitrary predicates, represented as list of
/// expressions applied a logical conjunction (aka they
/// are 'AND'ed together). Only rows that evaluate to TRUE for all
/// these expressions should be returned. Other rows are excluded
/// from the results.
pub exprs: Vec<DeleteExpr>,
}
impl DeletePredicate {
/// Format expr to SQL string.
pub fn expr_sql_string(&self) -> String {
let mut out = String::new();
for expr in &self.exprs {
if !out.is_empty() {
write!(&mut out, " AND ").expect("writing to a string shouldn't fail");
}
write!(&mut out, "{}", expr).expect("writing to a string shouldn't fail");
}
out
}
/// Return the approximate memory size of the predicate, in bytes.
///
/// This includes `Self`.
pub fn size(&self) -> usize {
std::mem::size_of::<Self>() + self.exprs.iter().map(|expr| expr.size()).sum::<usize>()
}
}
/// Single expression to be used as parts of a predicate.
///
/// Only very simple expression of the type `<column> <op> <scalar>` are supported.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DeleteExpr {
/// Column (w/o table name).
pub column: String,
/// Operator.
pub op: Op,
/// Scalar value.
pub scalar: Scalar,
}
impl DeleteExpr {
/// Create a new [`DeleteExpr`]
pub fn new(column: String, op: Op, scalar: Scalar) -> Self {
Self { column, op, scalar }
}
/// Column (w/o table name).
pub fn column(&self) -> &str {
&self.column
}
/// Operator.
pub fn op(&self) -> Op {
self.op
}
/// Scalar value.
pub fn scalar(&self) -> &Scalar {
&self.scalar
}
/// Return the approximate memory size of the expression, in bytes.
///
/// This includes `Self`.
pub fn size(&self) -> usize {
std::mem::size_of::<Self>() + self.column.capacity() + self.scalar.size()
}
}
impl std::fmt::Display for DeleteExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
r#""{}"{}{}"#,
self.column().replace('\\', r#"\\"#).replace('"', r#"\""#),
self.op(),
self.scalar(),
)
}
}
/// Binary operator that can be evaluated on a column and a scalar value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Op {
/// Strict equality (`=`).
Eq,
/// Inequality (`!=`).
Ne,
}
impl std::fmt::Display for Op {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Eq => write!(f, "="),
Self::Ne => write!(f, "!="),
}
}
}
/// Scalar value of a certain type.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[allow(missing_docs)]
pub enum Scalar {
Bool(bool),
I64(i64),
F64(ordered_float::OrderedFloat<f64>),
String(String),
}
impl Scalar {
/// Return the approximate memory size of the scalar, in bytes.
///
/// This includes `Self`.
pub fn size(&self) -> usize {
std::mem::size_of::<Self>()
+ match &self {
Self::Bool(_) | Self::I64(_) | Self::F64(_) => 0,
Self::String(s) => s.capacity(),
}
}
}
impl std::fmt::Display for Scalar {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Scalar::Bool(value) => value.fmt(f),
Scalar::I64(value) => value.fmt(f),
Scalar::F64(value) => match value.classify() {
FpCategory::Nan => write!(f, "'NaN'"),
FpCategory::Infinite if *value.as_ref() < 0.0 => write!(f, "'-Infinity'"),
FpCategory::Infinite => write!(f, "'Infinity'"),
_ => write!(f, "{:?}", value.as_ref()),
},
Scalar::String(value) => {
write!(
f,
"'{}'",
value.replace('\\', r#"\\"#).replace('\'', r#"\'"#),
)
}
}
}
}
#[cfg(test)]
mod tests {
use ordered_float::OrderedFloat;
use super::*;
#[test]
fn test_expr_to_sql_no_expressions() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![],
};
assert_eq!(&pred.expr_sql_string(), "");
}
#[test]
fn test_expr_to_sql_operators() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::I64(1),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Ne,
scalar: Scalar::I64(2),
},
],
};
assert_eq!(&pred.expr_sql_string(), r#""col1"=1 AND "col2"!=2"#);
}
#[test]
fn test_expr_to_sql_column_escape() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col 1"),
op: Op::Eq,
scalar: Scalar::I64(1),
},
DeleteExpr {
column: String::from(r#"col\2"#),
op: Op::Eq,
scalar: Scalar::I64(2),
},
DeleteExpr {
column: String::from(r#"col"3"#),
op: Op::Eq,
scalar: Scalar::I64(3),
},
],
};
assert_eq!(
&pred.expr_sql_string(),
r#""col 1"=1 AND "col\\2"=2 AND "col\"3"=3"#
);
}
#[test]
fn test_expr_to_sql_bool() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::Bool(false),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Eq,
scalar: Scalar::Bool(true),
},
],
};
assert_eq!(&pred.expr_sql_string(), r#""col1"=false AND "col2"=true"#);
}
#[test]
fn test_expr_to_sql_i64() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::I64(0),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Eq,
scalar: Scalar::I64(-1),
},
DeleteExpr {
column: String::from("col3"),
op: Op::Eq,
scalar: Scalar::I64(1),
},
DeleteExpr {
column: String::from("col4"),
op: Op::Eq,
scalar: Scalar::I64(i64::MIN),
},
DeleteExpr {
column: String::from("col5"),
op: Op::Eq,
scalar: Scalar::I64(i64::MAX),
},
],
};
assert_eq!(
&pred.expr_sql_string(),
r#""col1"=0 AND "col2"=-1 AND "col3"=1 AND "col4"=-9223372036854775808 AND "col5"=9223372036854775807"#
);
}
#[test]
fn test_expr_to_sql_f64() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(0.0)),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(-0.0)),
},
DeleteExpr {
column: String::from("col3"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(1.0)),
},
DeleteExpr {
column: String::from("col4"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(f64::INFINITY)),
},
DeleteExpr {
column: String::from("col5"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(f64::NEG_INFINITY)),
},
DeleteExpr {
column: String::from("col6"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(f64::NAN)),
},
],
};
assert_eq!(
&pred.expr_sql_string(),
r#""col1"=0.0 AND "col2"=-0.0 AND "col3"=1.0 AND "col4"='Infinity' AND "col5"='-Infinity' AND "col6"='NaN'"#
);
}
#[test]
fn test_expr_to_sql_string() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::String(String::from("")),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Eq,
scalar: Scalar::String(String::from("foo")),
},
DeleteExpr {
column: String::from("col3"),
op: Op::Eq,
scalar: Scalar::String(String::from(r#"fo\o"#)),
},
DeleteExpr {
column: String::from("col4"),
op: Op::Eq,
scalar: Scalar::String(String::from(r#"fo'o"#)),
},
],
};
assert_eq!(
&pred.expr_sql_string(),
r#""col1"='' AND "col2"='foo' AND "col3"='fo\\o' AND "col4"='fo\'o'"#
);
}
}

View File

@ -15,7 +15,6 @@ pub mod chunk_metadata;
pub mod consistent_hasher;
mod database_name;
pub mod database_rules;
pub mod delete_predicate;
pub mod error;
pub mod job;
pub mod names;

View File

@ -2,11 +2,12 @@
name = "data_types2"
version = "0.1.0"
edition = "2021"
description = "Shared data types in the Iox NG architecture"
description = "Shared data types in the IOx NG architecture"
[dependencies]
data_types = { path = "../data_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ordered-float = "3"
schema = { path = "../schema" }
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "postgres", "uuid"] }
uuid = { version = "0.8", features = ["v4"] }

View File

@ -1,4 +1,4 @@
//! Shared data types in the Iox NG architecture
//! Shared data types in the IOx NG architecture
#![warn(
missing_copy_implementations,
@ -15,15 +15,14 @@ use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFiel
use std::{
collections::BTreeMap,
convert::TryFrom,
fmt::{Debug, Formatter},
num::NonZeroU32,
fmt::Write,
num::{FpCategory, NonZeroU32},
ops::{Add, Sub},
sync::Arc,
};
use uuid::Uuid;
pub use data_types::{
delete_predicate::{DeleteExpr, DeletePredicate, Op, Scalar},
names::{org_and_bucket_to_database, OrgBucketMappingError},
non_empty::NonEmptyString,
partition_metadata::{
@ -535,7 +534,7 @@ impl ColumnType {
}
impl std::fmt::Display for ColumnType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = self.as_str();
write!(f, "{}", s)
@ -1063,9 +1062,167 @@ pub struct StrftimeColumn {
pub format: String,
}
/// Represents a parsed delete predicate for evaluation by the InfluxDB IOx
/// query engine.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DeletePredicate {
    /// Only rows within this time range are included in
    /// results. Rows outside it are excluded.
    pub range: TimestampRange,

    /// Optional arbitrary predicates, represented as a list of
    /// expressions joined by logical conjunction (i.e. they
    /// are `AND`ed together). Only rows that evaluate to TRUE for all
    /// of these expressions should be returned. Other rows are excluded
    /// from the results.
    pub exprs: Vec<DeleteExpr>,
}
impl DeletePredicate {
/// Format expr to SQL string.
pub fn expr_sql_string(&self) -> String {
let mut out = String::new();
for expr in &self.exprs {
if !out.is_empty() {
write!(&mut out, " AND ").expect("writing to a string shouldn't fail");
}
write!(&mut out, "{}", expr).expect("writing to a string shouldn't fail");
}
out
}
/// Return the approximate memory size of the predicate, in bytes.
///
/// This includes `Self`.
pub fn size(&self) -> usize {
std::mem::size_of::<Self>() + self.exprs.iter().map(|expr| expr.size()).sum::<usize>()
}
}
/// Single expression to be used as part of a predicate.
///
/// Only very simple expressions of the form `<column> <op> <scalar>` are
/// supported.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DeleteExpr {
    /// Column name (w/o table name).
    pub column: String,

    /// Comparison operator.
    pub op: Op,

    /// Scalar value the column is compared against.
    pub scalar: Scalar,
}
impl DeleteExpr {
    /// Construct a [`DeleteExpr`] from its three parts.
    pub fn new(column: String, op: Op, scalar: Scalar) -> Self {
        Self { column, op, scalar }
    }

    /// Column name (w/o table name).
    pub fn column(&self) -> &str {
        self.column.as_str()
    }

    /// Comparison operator.
    pub fn op(&self) -> Op {
        self.op
    }

    /// Scalar value the column is compared against.
    pub fn scalar(&self) -> &Scalar {
        &self.scalar
    }

    /// Return the approximate memory size of the expression, in bytes.
    ///
    /// This includes `Self` plus any heap memory owned by the column name
    /// and the scalar.
    pub fn size(&self) -> usize {
        let owned = self.column.capacity() + self.scalar.size();
        std::mem::size_of::<Self>() + owned
    }
}
impl std::fmt::Display for DeleteExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Escape backslashes first, then double quotes, so the column name
        // round-trips inside the double-quoted identifier.
        let column = self.column.replace('\\', r#"\\"#).replace('"', r#"\""#);
        write!(f, r#""{}"{}{}"#, column, self.op, self.scalar)
    }
}
/// Binary operator that can be evaluated on a column and a scalar value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Op {
    /// Strict equality (`=`).
    Eq,

    /// Inequality (`!=`).
    Ne,
}
impl std::fmt::Display for Op {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map each operator to its SQL symbol, then emit it in one call.
        let symbol = match self {
            Self::Eq => "=",
            Self::Ne => "!=",
        };
        f.write_str(symbol)
    }
}
/// Scalar value of a certain type.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[allow(missing_docs)]
pub enum Scalar {
    /// Boolean value.
    Bool(bool),
    /// Signed 64-bit integer.
    I64(i64),
    /// 64-bit float, wrapped so the enum can derive `Eq`/`Ord`/`Hash`.
    F64(ordered_float::OrderedFloat<f64>),
    /// Owned string value.
    String(String),
}
impl Scalar {
    /// Return the approximate memory size of the scalar, in bytes.
    ///
    /// This includes `Self`; only the `String` variant owns additional
    /// heap memory.
    pub fn size(&self) -> usize {
        let heap = match self {
            Self::String(s) => s.capacity(),
            Self::Bool(_) | Self::I64(_) | Self::F64(_) => 0,
        };
        std::mem::size_of::<Self>() + heap
    }
}
impl std::fmt::Display for Scalar {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Scalar::Bool(value) => value.fmt(f),
Scalar::I64(value) => value.fmt(f),
Scalar::F64(value) => match value.classify() {
FpCategory::Nan => write!(f, "'NaN'"),
FpCategory::Infinite if *value.as_ref() < 0.0 => write!(f, "'-Infinity'"),
FpCategory::Infinite => write!(f, "'Infinity'"),
_ => write!(f, "{:?}", value.as_ref()),
},
Scalar::String(value) => {
write!(
f,
"'{}'",
value.replace('\\', r#"\\"#).replace('\'', r#"\'"#),
)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use ordered_float::OrderedFloat;
#[test]
fn test_chunk_id_new() {
@ -1093,4 +1250,195 @@ mod tests {
assert_eq!(format!("{:?}", id_test), "ChunkId(42)");
assert_eq!(format!("{}", id_test), "ChunkId(42)");
}
#[test]
fn test_expr_to_sql_no_expressions() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![],
};
assert_eq!(&pred.expr_sql_string(), "");
}
#[test]
fn test_expr_to_sql_operators() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::I64(1),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Ne,
scalar: Scalar::I64(2),
},
],
};
assert_eq!(&pred.expr_sql_string(), r#""col1"=1 AND "col2"!=2"#);
}
#[test]
fn test_expr_to_sql_column_escape() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col 1"),
op: Op::Eq,
scalar: Scalar::I64(1),
},
DeleteExpr {
column: String::from(r#"col\2"#),
op: Op::Eq,
scalar: Scalar::I64(2),
},
DeleteExpr {
column: String::from(r#"col"3"#),
op: Op::Eq,
scalar: Scalar::I64(3),
},
],
};
assert_eq!(
&pred.expr_sql_string(),
r#""col 1"=1 AND "col\\2"=2 AND "col\"3"=3"#
);
}
#[test]
fn test_expr_to_sql_bool() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::Bool(false),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Eq,
scalar: Scalar::Bool(true),
},
],
};
assert_eq!(&pred.expr_sql_string(), r#""col1"=false AND "col2"=true"#);
}
#[test]
fn test_expr_to_sql_i64() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::I64(0),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Eq,
scalar: Scalar::I64(-1),
},
DeleteExpr {
column: String::from("col3"),
op: Op::Eq,
scalar: Scalar::I64(1),
},
DeleteExpr {
column: String::from("col4"),
op: Op::Eq,
scalar: Scalar::I64(i64::MIN),
},
DeleteExpr {
column: String::from("col5"),
op: Op::Eq,
scalar: Scalar::I64(i64::MAX),
},
],
};
assert_eq!(
&pred.expr_sql_string(),
r#""col1"=0 AND "col2"=-1 AND "col3"=1 AND "col4"=-9223372036854775808 AND "col5"=9223372036854775807"#
);
}
#[test]
fn test_expr_to_sql_f64() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(0.0)),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(-0.0)),
},
DeleteExpr {
column: String::from("col3"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(1.0)),
},
DeleteExpr {
column: String::from("col4"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(f64::INFINITY)),
},
DeleteExpr {
column: String::from("col5"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(f64::NEG_INFINITY)),
},
DeleteExpr {
column: String::from("col6"),
op: Op::Eq,
scalar: Scalar::F64(OrderedFloat::from(f64::NAN)),
},
],
};
assert_eq!(
&pred.expr_sql_string(),
r#""col1"=0.0 AND "col2"=-0.0 AND "col3"=1.0 AND "col4"='Infinity' AND "col5"='-Infinity' AND "col6"='NaN'"#
);
}
#[test]
fn test_expr_to_sql_string() {
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
op: Op::Eq,
scalar: Scalar::String(String::from("")),
},
DeleteExpr {
column: String::from("col2"),
op: Op::Eq,
scalar: Scalar::String(String::from("foo")),
},
DeleteExpr {
column: String::from("col3"),
op: Op::Eq,
scalar: Scalar::String(String::from(r#"fo\o"#)),
},
DeleteExpr {
column: String::from("col4"),
op: Op::Eq,
scalar: Scalar::String(String::from(r#"fo'o"#)),
},
],
};
assert_eq!(
&pred.expr_sql_string(),
r#""col1"='' AND "col2"='foo' AND "col3"='fo\\o' AND "col4"='fo\'o'"#
);
}
}

View File

@ -7,6 +7,7 @@ description = "DML types"
[dependencies]
arrow_util = { path = "../arrow_util" }
data_types = { path = "../data_types" }
data_types2 = { path = "../data_types2" }
hashbrown = "0.12"
mutable_batch = { path = "../mutable_batch" }
ordered-float = "3"

View File

@ -11,17 +11,17 @@
clippy::clone_on_ref_ptr
)]
use std::collections::{BTreeMap, HashSet};
use data_types::router::{ShardConfig, ShardId};
use data_types::{
non_empty::NonEmptyString,
partition_metadata::{StatValues, Statistics},
router::{ShardConfig, ShardId},
sequence::Sequence,
};
use data_types2::DeletePredicate;
use hashbrown::HashMap;
use data_types::delete_predicate::DeletePredicate;
use data_types::non_empty::NonEmptyString;
use data_types::partition_metadata::{StatValues, Statistics};
use data_types::sequence::Sequence;
use iox_time::Time;
use mutable_batch::MutableBatch;
use std::collections::{BTreeMap, HashSet};
use trace::ctx::SpanContext;
/// Metadata information about a DML operation
@ -487,9 +487,10 @@ pub mod test_util {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::assert_writes_eq;
use data_types::{
consistent_hasher::ConsistentHasher,
delete_predicate::DeletePredicate,
non_empty::NonEmptyString,
router::{HashRing, Matcher, MatcherToShard},
timestamp::TimestampRange,
@ -497,10 +498,6 @@ mod tests {
use mutable_batch_lp::lines_to_batches;
use regex::Regex;
use crate::test_util::assert_writes_eq;
use super::*;
#[test]
fn test_write_sharding() {
let config = ShardConfig {

View File

@ -1,16 +1,16 @@
//! This module contains testing scenarios for Delete
use data_types::delete_predicate::{DeleteExpr, DeletePredicate};
use data_types::timestamp::TimestampRange;
use async_trait::async_trait;
use super::util::{make_n_chunks_scenario_new, ChunkDataNew, DeleteTimeNew, PredNew};
use super::{DbScenario, DbSetup};
use super::{
util::{make_n_chunks_scenario_new, ChunkDataNew, DeleteTimeNew, PredNew},
DbScenario, DbSetup,
};
use crate::scenarios::util::all_scenarios_for_one_chunk;
use async_trait::async_trait;
use data_types2::{DeleteExpr, DeletePredicate, Op, Scalar, TimestampRange};
// =========================================================================================================================
// DELETE TEST SETUPS: chunk lp data, how many chunks, their types, how many delete predicates and when they happen
// ================================================================================================
// DELETE TEST SETUPS: chunk lp data, how many chunks, their types, how many delete predicates and
// when they happen
#[derive(Debug)]
/// Setup for delete query test with one table and one chunk moved from MUB to RUB to OS
@ -53,8 +53,8 @@ impl DbSetup for OneDeleteSimpleExprOneChunk {
range: TimestampRange::new(0, 15),
exprs: vec![DeleteExpr::new(
"bar".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::F64((1.0).into()),
Op::Eq,
Scalar::F64((1.0).into()),
)],
};
@ -104,16 +104,8 @@ impl DbSetup for OneDeleteMultiExprsOneChunk {
let pred = DeletePredicate {
range: TimestampRange::new(0, 30),
exprs: vec![
DeleteExpr::new(
"bar".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::F64((1.0).into()),
),
DeleteExpr::new(
"foo".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::String("me".to_string()),
),
DeleteExpr::new("bar".to_string(), Op::Eq, Scalar::F64((1.0).into())),
DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::String("me".to_string())),
],
};
@ -147,16 +139,8 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
let pred1 = DeletePredicate {
range: TimestampRange::new(0, 32),
exprs: vec![
DeleteExpr::new(
"bar".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::F64((1.0).into()),
),
DeleteExpr::new(
"foo".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::String("me".to_string()),
),
DeleteExpr::new("bar".to_string(), Op::Eq, Scalar::F64((1.0).into())),
DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::String("me".to_string())),
],
};
@ -165,8 +149,8 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
range: TimestampRange::new(10, 40),
exprs: vec![DeleteExpr::new(
"bar".to_string(),
data_types::delete_predicate::Op::Ne,
data_types::delete_predicate::Scalar::F64((1.0).into()),
Op::Ne,
Scalar::F64((1.0).into()),
)],
};
@ -204,16 +188,8 @@ impl DbSetup for ThreeDeleteThreeChunks {
let pred1 = DeletePredicate {
range: TimestampRange::new(0, 30),
exprs: vec![
DeleteExpr::new(
"bar".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::F64((1.0).into()),
),
DeleteExpr::new(
"foo".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::String("me".to_string()),
),
DeleteExpr::new("bar".to_string(), Op::Eq, Scalar::F64((1.0).into())),
DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::String("me".to_string())),
],
};
@ -229,8 +205,8 @@ impl DbSetup for ThreeDeleteThreeChunks {
range: TimestampRange::new(20, 45),
exprs: vec![DeleteExpr::new(
"foo".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::String("you".to_string()),
Op::Eq,
Scalar::String("you".to_string()),
)],
};
@ -246,8 +222,8 @@ impl DbSetup for ThreeDeleteThreeChunks {
range: TimestampRange::new(75, 95),
exprs: vec![DeleteExpr::new(
"bar".to_string(),
data_types::delete_predicate::Op::Ne,
data_types::delete_predicate::Scalar::F64((7.0).into()),
Op::Ne,
Scalar::F64((7.0).into()),
)],
};

View File

@ -5,10 +5,7 @@ use super::{
};
use crate::scenarios::util::{make_n_chunks_scenario_new, ChunkDataNew};
use async_trait::async_trait;
use data_types::{
delete_predicate::{DeleteExpr, DeletePredicate},
timestamp::TimestampRange,
};
use data_types2::{DeleteExpr, DeletePredicate, Op, Scalar, TimestampRange};
use query::frontend::sql::SqlQueryPlanner;
#[derive(Debug)]
@ -112,8 +109,8 @@ impl DbSetup for OneMeasurementManyNullTagsWithDelete {
range: TimestampRange::new(400, 602),
exprs: vec![DeleteExpr::new(
"state".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::String(("NY").to_string()),
Op::Eq,
Scalar::String(("NY").to_string()),
)],
};
@ -205,8 +202,8 @@ impl DbSetup for TwoMeasurementsWithDelete {
range: TimestampRange::new(120, 160),
exprs: vec![DeleteExpr::new(
"region".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::String("west".to_string()),
Op::Eq,
Scalar::String("west".to_string()),
)],
};
@ -237,8 +234,8 @@ impl DbSetup for TwoMeasurementsWithDeleteAll {
range: TimestampRange::new(120, 160),
exprs: vec![DeleteExpr::new(
"region".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::String("west".to_string()),
Op::Eq,
Scalar::String("west".to_string()),
)],
};
@ -661,8 +658,8 @@ impl DbSetup for EndToEndTestWithDelete {
range: TimestampRange::new(6000, 6000),
exprs: vec![DeleteExpr::new(
"name".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::String(("disk0").to_string()),
Op::Eq,
Scalar::String(("disk0").to_string()),
)],
};
@ -835,8 +832,8 @@ impl DbSetup for MeasurementsSortableTagsWithDelete {
range: TimestampRange::new(120, 350),
exprs: vec![DeleteExpr::new(
"state".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::String(("CA").to_string()),
Op::Eq,
Scalar::String(("CA").to_string()),
)],
};
@ -896,8 +893,8 @@ impl DbSetup for OneMeasurementNoTagsWithDelete {
range: TimestampRange::new(1, 1),
exprs: vec![DeleteExpr::new(
"foo".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::F64((1.0).into()),
Op::Eq,
Scalar::F64((1.0).into()),
)],
};

View File

@ -3,9 +3,9 @@ use super::DbScenario;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::delete_predicate::DeletePredicate;
use data_types2::{
NonEmptyString, PartitionId, Sequence, SequenceNumber, SequencerId, TombstoneId,
DeletePredicate, NonEmptyString, PartitionId, Sequence, SequenceNumber, SequencerId,
TombstoneId,
};
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use futures::StreamExt;
@ -14,10 +14,12 @@ use generated_types::{
ingester::IngesterQueryRequest,
};
use influxdb_iox_client::flight::Error as FlightError;
use ingester::data::{IngesterData, IngesterQueryResponse, Persister, SequencerData};
use ingester::lifecycle::LifecycleHandle;
use ingester::partioning::{Partitioner, PartitionerError};
use ingester::querier_handler::prepare_data_to_querier;
use ingester::{
data::{IngesterData, IngesterQueryResponse, Persister, SequencerData},
lifecycle::LifecycleHandle,
partioning::{Partitioner, PartitionerError},
querier_handler::prepare_data_to_querier,
};
use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::{TestCatalog, TestNamespace, TestSequencer};
use itertools::Itertools;
@ -28,11 +30,14 @@ use querier::{
IngesterFlightClientQueryData, QuerierCatalogCache, QuerierNamespace,
};
use schema::selection::Selection;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::sync::Mutex;
use std::{fmt::Display, sync::Arc};
use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
fmt::Display,
fmt::Write,
sync::Arc,
sync::Mutex,
};
// Structs, enums, and functions used to exhaust all test scenarios of chunk life cycle
// & when delete predicates are applied

View File

@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
data_types = { path = "../data_types" }
data_types2 = { path = "../data_types2" }
dml = { path = "../dml" }
dotenv = "0.15.0"
futures = "0.3"

View File

@ -1,26 +1,23 @@
//! Encode/Decode for messages
use std::borrow::Cow;
use std::sync::Arc;
use data_types::non_empty::NonEmptyString;
use http::{HeaderMap, HeaderValue};
use prost::Message;
use data_types::sequence::Sequence;
use crate::core::WriteBufferError;
use data_types::{non_empty::NonEmptyString, sequence::Sequence};
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use generated_types::google::FromOptionalField;
use generated_types::influxdata::iox::delete::v1::DeletePayload;
use generated_types::influxdata::iox::write_buffer::v1::write_buffer_payload::Payload;
use generated_types::influxdata::iox::write_buffer::v1::WriteBufferPayload;
use generated_types::{
google::FromOptionalField,
influxdata::iox::{
delete::v1::DeletePayload,
write_buffer::v1::{write_buffer_payload::Payload, WriteBufferPayload},
},
};
use http::{HeaderMap, HeaderValue};
use iox_time::Time;
use mutable_batch_pb::decode::decode_database_batch;
use trace::ctx::SpanContext;
use trace::TraceCollector;
use prost::Message;
use std::{borrow::Cow, sync::Arc};
use trace::{ctx::SpanContext, TraceCollector};
use trace_http::ctx::{format_jaeger_trace_context, TraceHeaderParser};
use crate::core::WriteBufferError;
/// Pbdata based content type
pub const CONTENT_TYPE_PROTOBUF: &str =
r#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#;

View File

@ -1,11 +1,14 @@
use std::{
collections::{BTreeMap, BTreeSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
use self::{
aggregator::DmlAggregator,
config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig},
};
use crate::{
codec::IoxHeaders,
core::{
WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler,
WriteBufferWriting,
},
};
use async_trait::async_trait;
use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig};
use dml::{DmlMeta, DmlOperation};
@ -19,20 +22,14 @@ use rskafka::client::{
producer::{BatchProducer, BatchProducerBuilder},
ClientBuilder,
};
use trace::TraceCollector;
use crate::{
codec::IoxHeaders,
core::{
WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler,
WriteBufferWriting,
use std::{
collections::{BTreeMap, BTreeSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use self::{
aggregator::DmlAggregator,
config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig},
};
use trace::TraceCollector;
mod aggregator;
mod config;
@ -361,15 +358,7 @@ async fn setup_topic(
#[cfg(test)]
mod tests {
use std::num::NonZeroU32;
use data_types::{delete_predicate::DeletePredicate, timestamp::TimestampRange};
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite};
use futures::{stream::FuturesUnordered, TryStreamExt};
use rskafka::{client::partition::Compression, record::Record};
use test_helpers::assert_contains;
use trace::{ctx::SpanContext, RingBufferTraceCollector};
use super::*;
use crate::{
core::test_utils::{
assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name,
@ -377,8 +366,13 @@ mod tests {
},
maybe_skip_kafka_integration,
};
use super::*;
use data_types2::{DeletePredicate, TimestampRange};
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite};
use futures::{stream::FuturesUnordered, TryStreamExt};
use rskafka::{client::partition::Compression, record::Record};
use std::num::NonZeroU32;
use test_helpers::assert_contains;
use trace::{ctx::SpanContext, RingBufferTraceCollector};
struct RSKafkaTestAdapter {
conn: String,