Merge branch 'main' into dom/schema-cache-warm

pull/24376/head
Authored by kodiakhq[bot], committed by GitHub, 2022-04-27 21:24:57 +00:00
commit b0ba0361ed
10 changed files with 146 additions and 122 deletions

Cargo.lock (generated)

@@ -665,9 +665,9 @@ dependencies = [
 [[package]]
 name = "clap"
-version = "3.1.11"
+version = "3.1.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "423af4bd829996d7de0b8bf0279f5b91a20306937be514e972a95c3d3ab13f33"
+checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db"
 dependencies = [
  "atty",
  "bitflags",
@@ -684,7 +684,7 @@ dependencies = [
 name = "clap_blocks"
 version = "0.1.0"
 dependencies = [
- "clap 3.1.11",
+ "clap 3.1.12",
  "data_types",
  "futures",
  "iox_catalog",
@@ -2197,7 +2197,7 @@ dependencies = [
  "byteorder",
  "bytes",
  "chrono",
- "clap 3.1.11",
+ "clap 3.1.12",
  "clap_blocks",
  "comfy-table",
  "compactor",
@@ -2519,7 +2519,7 @@ version = "0.1.0"
 dependencies = [
  "chrono",
  "chrono-english",
- "clap 3.1.11",
+ "clap 3.1.12",
  "criterion",
  "data_types",
  "futures",
@@ -2548,7 +2548,7 @@ dependencies = [
  "assert_matches",
  "async-trait",
  "chrono",
- "clap 3.1.11",
+ "clap 3.1.12",
  "dotenv",
  "futures",
  "glob",
@@ -2651,7 +2651,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "chrono",
- "clap 3.1.11",
+ "clap 3.1.12",
  "clap_blocks",
  "data_types",
  "dml",
@@ -2724,7 +2724,7 @@ dependencies = [
  "arrow_util",
  "async-trait",
  "bytes",
- "clap 3.1.11",
+ "clap 3.1.12",
  "clap_blocks",
  "data_types",
  "db",
@@ -2900,7 +2900,7 @@ name = "ioxd_test"
 version = "0.1.0"
 dependencies = [
  "async-trait",
- "clap 3.1.11",
+ "clap 3.1.12",
  "generated_types",
  "hyper",
  "ioxd_common",
@@ -6464,7 +6464,7 @@ version = "0.1.0"
 dependencies = [
  "async-trait",
  "chrono",
- "clap 3.1.11",
+ "clap 3.1.12",
  "futures",
  "observability_deps",
  "snafu",
@@ -6610,7 +6610,7 @@ dependencies = [
 name = "trogging"
 version = "0.1.0"
 dependencies = [
- "clap 3.1.11",
+ "clap 3.1.12",
  "logfmt",
  "observability_deps",
  "regex",


@@ -94,7 +94,7 @@ message Predicate {
   // Optional arbitrary predicates on the special `_value` column. These expressions are applied to
   // `field_columns` projections in the form of `CASE` statement conditions.
-  repeated BinaryExpr value_expr = 5;
+  repeated ValueExpr value_expr = 5;
 }
 
 // Specifies a continuous range of nanosecond timestamps.
@@ -143,11 +143,9 @@ message LogicalExprNode {
   }
 }
 
-// A representation of the `BinaryExpr` variant of a DataFusion expression for value expressions.
-message BinaryExpr {
-  Column left = 1;
-  string op = 2;
-  LogicalExprNode right = 3;
+// A wrapper around a DataFusion expression against `_value` columns
+message ValueExpr {
+  LogicalExprNode expr = 1;
 }
 
 message BinaryExprNode {
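
Worth noting: prost (the crate IOx uses for protobuf codegen) renders message-typed fields as `Option`-wrapped, which is why the conversion code in the next file has to `unwrap_field("expr")` before validating. A sketch of the likely generated shape, with `LogicalExprNode` reduced to an empty stand-in; this is an assumption about the codegen output, not copied from the build:

```rust
// Stand-in for the real node, which carries the serialized DataFusion
// expression tree.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LogicalExprNode {}

// Approximate prost output for the new message; message-typed fields are
// always optional on the wire.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValueExpr {
    #[prost(message, optional, tag = "1")]
    pub expr: ::core::option::Option<LogicalExprNode>,
}

fn main() {
    // prost-generated messages implement Default; the field starts as None.
    let v = ValueExpr::default();
    assert!(v.expr.is_none());
}
```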


@@ -18,7 +18,7 @@ use datafusion::{
         sha384, sha512, trim, upper,
     },
 };
-use predicate::{BinaryExpr, Predicate};
+use predicate::{Predicate, ValueExpr};
 
 impl TryFrom<proto::IngesterQueryRequest> for IngesterQueryRequest {
     type Error = FieldViolation;
@@ -121,16 +121,18 @@ impl TryFrom<proto::Predicate> for Predicate {
         let value_expr = value_expr
             .into_iter()
             .map(|ve| {
-                let left = ve.left.unwrap_field("left")?;
-                let right = ve.right.unwrap_field("right")?;
-                Ok(BinaryExpr {
-                    left: from_proto_column(left),
-                    op: from_proto_binary_op("op", &ve.op)?,
-                    right: from_proto_expr(right)?,
-                })
+                let expr = ve.expr.unwrap_field("expr")?;
+                let expr = from_proto_expr(expr)?;
+                expr.try_into() // into ValueExpr
+                    .map_err(|e| FieldViolation {
+                        field: "expr".into(),
+                        description: format!(
+                            "Internal: Serialized expr is not a valid ValueExpr: {:?}",
+                            e
+                        ),
+                    })
             })
-            .collect::<Result<Vec<BinaryExpr>, FieldViolation>>()?;
+            .collect::<Result<Vec<ValueExpr>, FieldViolation>>()?;
 
         Ok(Self {
             field_columns,
@@ -142,16 +144,12 @@ impl TryFrom<proto::Predicate> for Predicate {
     }
 }
 
-impl TryFrom<BinaryExpr> for proto::BinaryExpr {
+impl TryFrom<ValueExpr> for proto::ValueExpr {
     type Error = FieldViolation;
 
-    fn try_from(bin_expr: BinaryExpr) -> Result<Self, Self::Error> {
-        let BinaryExpr { left, op, right } = bin_expr;
+    fn try_from(value_expr: ValueExpr) -> Result<Self, Self::Error> {
         Ok(Self {
-            left: Some(from_column(left)),
-            op: op.to_string(),
-            right: Some(from_expr(right)?),
+            expr: Some(from_expr(value_expr.into())?),
         })
     }
 }
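
The `map_err` above adapts a conversion that hands its input back on failure into a field-scoped violation. A minimal, self-contained sketch of that error-shaping pattern; `FieldViolation` and the helper here are stand-ins for the crate's real types:

```rust
#[derive(Debug)]
struct FieldViolation {
    field: String,
    description: String,
}

// Stand-in for `ValueExpr::try_from(expr)`: accepts only expressions that
// mention `_value`, returning the input unchanged otherwise.
fn try_into_value_expr(expr: String) -> Result<String, String> {
    if expr.contains("_value") {
        Ok(expr)
    } else {
        Err(expr)
    }
}

// The rejected input is folded into the violation's description, so the
// error names both the offending field and the offending expression.
fn decode(expr: String) -> Result<String, FieldViolation> {
    try_into_value_expr(expr).map_err(|e| FieldViolation {
        field: "expr".into(),
        description: format!("Internal: Serialized expr is not a valid ValueExpr: {:?}", e),
    })
}

fn main() {
    assert!(decode("_value > 1.88".to_string()).is_ok());
    let err = decode("temp > 1.88".to_string()).unwrap_err();
    assert_eq!(err.field, "expr");
    println!("{}", err.description);
}
```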


@@ -35,7 +35,7 @@ async fn test_git_version() {
         .arg("--version")
         .assert()
         .success()
-        .stderr(
+        .stdout(
             predicate::str::contains("UNKNOWN")
                 .not()
                 .and(predicate::str::is_match("revision [0-9a-f]{40}").unwrap()),


@@ -483,7 +483,7 @@ async fn release_database() {
         .arg(addr)
         .assert()
         .failure()
-        .stdout(predicate::str::contains(
+        .stderr(predicate::str::contains(
             r#"Invalid value "foo" for '--uuid <UUID>'"#,
         ));
@@ -636,7 +636,7 @@ async fn claim_database() {
         .arg(addr)
         .assert()
         .failure()
-        .stdout(predicate::str::contains(
+        .stderr(predicate::str::contains(
             r#"Invalid value "foo" for '<UUID>'"#,
         ));


@@ -8,7 +8,7 @@ async fn test_git_version() {
         .arg("--version")
         .assert()
         .success()
-        .stderr(
+        .stdout(
             predicate::str::contains("UNKNOWN")
                 .not()
                 .and(predicate::str::is_match("revision [0-9a-f]{40}").unwrap()),


@@ -645,7 +645,7 @@ impl NamespaceData {
             return table_data
                 .partition_data
                 .get_mut(&partition_info.partition.partition_key)
-                .map(|partition_data| {
+                .and_then(|partition_data| {
                     partition_data.snapshot_to_persisting_batch(
                         partition_info.partition.sequencer_id,
                         partition_info.partition.table_id,
@@ -958,7 +958,7 @@ impl PartitionData {
         table_id: TableId,
         partition_id: PartitionId,
         table_name: &str,
-    ) -> Arc<PersistingBatch> {
+    ) -> Option<Arc<PersistingBatch>> {
         self.data
             .snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name)
     }
@@ -1027,13 +1027,16 @@ impl PartitionData {
         // First apply the tombstone on all in-memory & non-persisting data
         // Make a QueryableBatch for all buffer + snapshots + the given tombstone
         let max_sequencer_number = tombstone.sequence_number;
-        let query_batch = self
+        let query_batch = match self
             .data
-            .snapshot_to_queryable_batch(table_name, Some(tombstone.clone()));
-        if query_batch.is_empty() {
-            // No need to proceed further
-            return;
-        }
+            .snapshot_to_queryable_batch(table_name, Some(tombstone.clone()))
+        {
+            Some(query_batch) if !query_batch.is_empty() => query_batch,
+            _ => {
+                // No need to proceed further
+                return;
+            }
+        };
 
         let (min_sequencer_number, _) = query_batch.min_max_sequence_numbers();
         assert!(min_sequencer_number <= max_sequencer_number);
@@ -1201,7 +1204,7 @@ impl DataBuffer {
         &mut self,
         table_name: &str,
         tombstone: Option<Tombstone>,
-    ) -> QueryableBatch {
+    ) -> Option<QueryableBatch> {
         self.snapshot()
             .expect("This mutable batch snapshot error should be impossible.");
@@ -1212,7 +1215,13 @@ impl DataBuffer {
         if let Some(tombstone) = tombstone {
             tombstones.push(tombstone);
         }
-        QueryableBatch::new(table_name, data, tombstones)
+
+        // only produce batch if there is any data
+        if data.is_empty() {
+            None
+        } else {
+            Some(QueryableBatch::new(table_name, data, tombstones))
+        }
     }
 
     /// Returns all existing snapshots plus data in the buffer
@@ -1230,32 +1239,36 @@ impl DataBuffer {
         Ok(snapshots)
     }
 
-    /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`. Returns error
-    /// if there is already a persisting batch.
+    /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`.
+    ///
+    /// # Panics
+    /// Panics if there is already a persisting batch.
     pub fn snapshot_to_persisting(
         &mut self,
         sequencer_id: SequencerId,
         table_id: TableId,
         partition_id: PartitionId,
         table_name: &str,
-    ) -> Arc<PersistingBatch> {
+    ) -> Option<Arc<PersistingBatch>> {
         if self.persisting.is_some() {
             panic!("Unable to snapshot while persisting. This is an unexpected state.")
         }
 
-        let queryable_batch = self.snapshot_to_queryable_batch(table_name, None);
-
-        let persisting_batch = Arc::new(PersistingBatch {
-            sequencer_id,
-            table_id,
-            partition_id,
-            object_store_id: Uuid::new_v4(),
-            data: Arc::new(queryable_batch),
-        });
-
-        self.persisting = Some(Arc::clone(&persisting_batch));
-
-        persisting_batch
+        if let Some(queryable_batch) = self.snapshot_to_queryable_batch(table_name, None) {
+            let persisting_batch = Arc::new(PersistingBatch {
+                sequencer_id,
+                table_id,
+                partition_id,
+                object_store_id: Uuid::new_v4(),
+                data: Arc::new(queryable_batch),
+            });
 
+            self.persisting = Some(Arc::clone(&persisting_batch));
 
+            Some(persisting_batch)
+        } else {
+            None
+        }
     }
 
     /// Add a persisting batch into the buffer persisting list
@@ -1990,12 +2003,14 @@ mod tests {
         // ------------------------------------------
         // Persisting
-        let p_batch = p.snapshot_to_persisting_batch(
-            SequencerId::new(s_id),
-            TableId::new(t_id),
-            PartitionId::new(p_id),
-            table_name,
-        );
+        let p_batch = p
+            .snapshot_to_persisting_batch(
+                SequencerId::new(s_id),
+                TableId::new(t_id),
+                PartitionId::new(p_id),
+                table_name,
+            )
+            .unwrap();
 
         // verify data
         assert!(p.data.buffer.is_none()); // always empty after issuing persist
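
The common thread in this file's changes: snapshotting an empty buffer now yields `None`, and callers thread the absence through with `and_then`/`?` instead of special-casing empty batches. A minimal sketch of the pattern, with hypothetical types standing in for the ingester's real ones:

```rust
#[derive(Debug)]
struct QueryableBatch {
    rows: Vec<u64>,
}

#[derive(Default)]
struct DataBuffer {
    snapshots: Vec<u64>,
}

impl DataBuffer {
    /// Only produce a batch if there is any data, mirroring the new
    /// `snapshot_to_queryable_batch` signature.
    fn snapshot_to_queryable_batch(&mut self) -> Option<QueryableBatch> {
        let data = std::mem::take(&mut self.snapshots);
        if data.is_empty() {
            None
        } else {
            Some(QueryableBatch { rows: data })
        }
    }

    /// `?` propagates the `None` upward, mirroring the new
    /// `snapshot_to_persisting` signature.
    fn snapshot_to_persisting(&mut self) -> Option<QueryableBatch> {
        let batch = self.snapshot_to_queryable_batch()?;
        Some(batch)
    }
}

fn main() {
    let mut empty = DataBuffer::default();
    assert!(empty.snapshot_to_persisting().is_none());

    let mut full = DataBuffer { snapshots: vec![1, 2, 3] };
    let batch = full.snapshot_to_persisting().expect("buffer had rows");
    assert_eq!(batch.rows.len(), 3);
}
```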


@@ -16,11 +16,12 @@ pub mod rpc_predicate;
 use data_types::timestamp::{TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME};
 use datafusion::{
     error::DataFusionError,
-    logical_plan::{col, lit_timestamp_nano, Column, Expr, Operator},
+    logical_plan::{binary_expr, col, lit_timestamp_nano, Expr, Operator},
     optimizer::utils,
 };
 use datafusion_util::{make_range_expr, AndExprBuilder};
 use observability_deps::tracing::debug;
+use rpc_predicate::VALUE_COLUMN_NAME;
 use schema::TIME_COLUMN_NAME;
 use std::{
     collections::{BTreeSet, HashSet},
@@ -66,7 +67,7 @@ pub struct Predicate {
     /// Optional arbitrary predicates on the special `_value` column. These
     /// expressions are applied to `field_columns` projections in the form of
     /// `CASE` statement conditions.
-    pub value_expr: Vec<BinaryExpr>,
+    pub value_expr: Vec<ValueExpr>,
 }
 
 impl Predicate {
@@ -450,12 +451,52 @@ impl PredicateBuilder {
     }
 }
 
-// A representation of the `BinaryExpr` variant of a Datafusion expression.
+// Wrapper around `Expr::BinaryExpr` where the left input is known to be a
+// single Column reference to the `_value` column
 #[derive(Clone, Debug, PartialEq, PartialOrd)]
-pub struct BinaryExpr {
-    pub left: Column,
-    pub op: Operator,
-    pub right: Expr,
+pub struct ValueExpr {
+    expr: Expr,
 }
+
+impl TryFrom<Expr> for ValueExpr {
+    /// Returns the original Expr if conversion doesn't work
+    type Error = Expr;
+
+    /// Tries to create a new ValueExpr. If `expr` follows the
+    /// expected pattern, returns Ok(Self); if not, returns Err(expr)
+    fn try_from(expr: Expr) -> Result<Self, Self::Error> {
+        if let Expr::BinaryExpr {
+            left,
+            op: _,
+            right: _,
+        } = &expr
+        {
+            if let Expr::Column(inner) = left.as_ref() {
+                if inner.name == VALUE_COLUMN_NAME {
+                    return Ok(Self { expr });
+                }
+            }
+        }
+        Err(expr)
+    }
+}
+
+impl ValueExpr {
+    /// Returns a new [`Expr`] with the reference to the `_value`
+    /// column replaced with the specified column name
+    pub fn replace_col(&self, name: &str) -> Expr {
+        if let Expr::BinaryExpr { left: _, op, right } = &self.expr {
+            binary_expr(col(name), *op, right.as_ref().clone())
+        } else {
+            unreachable!("Unexpected content in ValueExpr")
+        }
+    }
+}
+
+impl From<ValueExpr> for Expr {
+    fn from(value_expr: ValueExpr) -> Self {
+        value_expr.expr
+    }
+}
 
 #[cfg(test)]
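
Taken together, `TryFrom` guards construction and `replace_col` rewrites the guarded expression. A self-contained sketch of that lifecycle using simplified stand-ins for DataFusion's types (string operators instead of `Operator`, an owned enum instead of the real `Expr`):

```rust
const VALUE_COLUMN_NAME: &str = "_value";

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(f64),
    BinaryExpr {
        left: Box<Expr>,
        op: String,
        right: Box<Expr>,
    },
}

#[derive(Debug, Clone, PartialEq)]
struct ValueExpr {
    expr: Expr,
}

impl TryFrom<Expr> for ValueExpr {
    /// Hand the original expression back if it isn't a `_value` comparison.
    type Error = Expr;

    fn try_from(expr: Expr) -> Result<Self, Self::Error> {
        if let Expr::BinaryExpr { left, .. } = &expr {
            if matches!(left.as_ref(), Expr::Column(name) if name == VALUE_COLUMN_NAME) {
                return Ok(Self { expr });
            }
        }
        Err(expr)
    }
}

impl ValueExpr {
    /// Swap the `_value` column for a concrete field column name.
    fn replace_col(&self, name: &str) -> Expr {
        match &self.expr {
            Expr::BinaryExpr { op, right, .. } => Expr::BinaryExpr {
                left: Box::new(Expr::Column(name.to_string())),
                op: op.clone(),
                right: right.clone(),
            },
            _ => unreachable!("constructor only accepts BinaryExpr"),
        }
    }
}

fn main() {
    let raw = Expr::BinaryExpr {
        left: Box::new(Expr::Column(VALUE_COLUMN_NAME.into())),
        op: ">".into(),
        right: Box::new(Expr::Literal(1.88)),
    };
    let value_expr = ValueExpr::try_from(raw).expect("valid _value comparison");

    let rewritten = value_expr.replace_col("usage_user");
    assert!(matches!(
        rewritten,
        Expr::BinaryExpr { left, .. } if *left == Expr::Column("usage_user".into())
    ));

    // A bare literal is rejected and returned intact.
    assert!(ValueExpr::try_from(Expr::Literal(1.0)).is_err());
}
```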


@@ -1,6 +1,6 @@
 //! Interface logic between IOx [`Predicate`] and predicates used by the
 //! InfluxDB Storage gRPC API
-use crate::{rewrite, BinaryExpr, Predicate};
+use crate::{rewrite, Predicate, ValueExpr};
 
 use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::context::ExecutionProps;
@@ -9,7 +9,6 @@ use datafusion::logical_plan::{
     Operator, SimplifyInfo,
 };
 use datafusion::scalar::ScalarValue;
-use datafusion_util::AsExpr;
 use schema::Schema;
 use std::collections::BTreeSet;
 use std::sync::Arc;
@@ -287,7 +286,7 @@ impl ExprRewriter for MeasurementRewriter<'_> {
 /// encountered expressions onto `value_exprs` so they can be moved onto column
 /// projections.
 fn rewrite_field_value_references(
-    value_exprs: &mut Vec<BinaryExpr>,
+    value_exprs: &mut Vec<ValueExpr>,
     expr: Expr,
 ) -> DataFusionResult<Expr> {
     let mut rewriter = FieldValueRewriter { value_exprs };
@@ -295,31 +294,21 @@ fn rewrite_field_value_references(
 }
 
 struct FieldValueRewriter<'a> {
-    value_exprs: &'a mut Vec<BinaryExpr>,
+    value_exprs: &'a mut Vec<ValueExpr>,
 }
 
 impl<'a> ExprRewriter for FieldValueRewriter<'a> {
     fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
-        Ok(match expr {
-            Expr::BinaryExpr {
-                ref left,
-                op,
-                ref right,
-            } => {
-                if let Expr::Column(inner) = &**left {
-                    if inner.name == VALUE_COLUMN_NAME {
-                        self.value_exprs.push(BinaryExpr {
-                            left: inner.to_owned(),
-                            op,
-                            right: right.as_expr(),
-                        });
-                        return Ok(lit(true));
-                    }
-                }
-                expr
-            }
-            _ => expr,
-        })
+        // try and convert Expr into a ValueExpr
+        match expr.try_into() {
+            // found a value expr. Save and replace with true
+            Ok(value_expr) => {
+                self.value_exprs.push(value_expr);
+                Ok(lit(true))
+            }
+            // not a ValueExpr, so leave the same
+            Err(expr) => Ok(expr),
+        }
     }
 }
@@ -389,13 +378,8 @@ mod tests {
             binary_expr(col(VALUE_COLUMN_NAME), Operator::Eq, lit(1.82)),
             // _value = 1.82 -> true
             lit(true),
-            vec![BinaryExpr {
-                left: Column {
-                    relation: None,
-                    name: VALUE_COLUMN_NAME.into(),
-                },
-                op: Operator::Eq,
-                right: lit(1.82),
+            vec![ValueExpr {
+                expr: binary_expr(col(VALUE_COLUMN_NAME), Operator::Eq, lit(1.82)),
             }],
         ),
     ];
@@ -412,19 +396,9 @@ mod tests {
         };
 
         let input = binary_expr(col(VALUE_COLUMN_NAME), Operator::Gt, lit(1.88));
-        let rewritten = input.rewrite(&mut rewriter).unwrap();
+        let rewritten = input.clone().rewrite(&mut rewriter).unwrap();
 
         assert_eq!(rewritten, lit(true));
-        assert_eq!(
-            rewriter.value_exprs,
-            &mut vec![BinaryExpr {
-                left: Column {
-                    relation: None,
-                    name: VALUE_COLUMN_NAME.into(),
-                },
-                op: Operator::Gt,
-                right: lit(1.88),
-            }]
-        );
+        assert_eq!(rewriter.value_exprs, &mut vec![ValueExpr { expr: input }]);
     }
 
     #[test]


@@ -9,7 +9,7 @@ use data_types::chunk_metadata::ChunkId;
 use datafusion::{
     error::DataFusionError,
     logical_plan::{
-        binary_expr, col, when, DFSchemaRef, Expr, ExprRewritable, ExprSchemable, LogicalPlan,
+        col, when, DFSchemaRef, Expr, ExprRewritable, ExprSchemable, LogicalPlan,
         LogicalPlanBuilder,
     },
 };
@@ -18,7 +18,7 @@ use datafusion_util::AsExpr;
 use hashbrown::HashSet;
 use observability_deps::tracing::{debug, trace};
 use predicate::rpc_predicate::InfluxRpcPredicate;
-use predicate::{BinaryExpr, Predicate, PredicateMatch};
+use predicate::{Predicate, PredicateMatch};
 use query_functions::{
     group_by::{Aggregate, WindowDuration},
     make_window_bound_expr,
@@ -1638,9 +1638,7 @@ fn filtered_fields_iter<'a>(
         let expr = predicate
             .value_expr
             .iter()
-            .map(|BinaryExpr { left: _, op, right }| {
-                binary_expr(col(f.name()), *op, right.as_expr())
-            })
+            .map(|value_expr| value_expr.replace_col(f.name()))
             .reduce(|a, b| a.or(b))
             .map(|when_expr| when(when_expr, col(f.name())).end())
             .unwrap_or_else(|| Ok(col(f.name())))
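
Net effect in the planner: for each selected field, every `_value` predicate is rewritten against that field via `replace_col`, the rewrites are OR'd together, and the result gates the field in a `CASE WHEN`. A string-based sketch of that assembly, with stand-ins rather than DataFusion exprs:

```rust
struct ValueExpr(String); // e.g. "_value > 1.88"

impl ValueExpr {
    /// Naive textual swap of `_value` for a field name; good enough
    /// for a sketch of what `replace_col` does structurally.
    fn replace_col(&self, name: &str) -> String {
        self.0.replace("_value", name)
    }
}

fn case_projection(field: &str, value_exprs: &[ValueExpr]) -> String {
    value_exprs
        .iter()
        .map(|ve| ve.replace_col(field))
        // OR all rewritten predicates together, as `.reduce(|a, b| a.or(b))`
        // does in the real code
        .reduce(|a, b| format!("({}) OR ({})", a, b))
        // gate the field behind the combined condition
        .map(|when| format!("CASE WHEN {} THEN {} END", when, field))
        // no _value predicates: the field passes through unchanged
        .unwrap_or_else(|| field.to_string())
}

fn main() {
    let exprs = vec![
        ValueExpr("_value > 1.88".to_string()),
        ValueExpr("_value = 0".to_string()),
    ];
    assert_eq!(
        case_projection("usage_user", &exprs),
        "CASE WHEN (usage_user > 1.88) OR (usage_user = 0) THEN usage_user END"
    );
    assert_eq!(case_projection("usage_user", &[]), "usage_user");
}
```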