Merge branch 'main' into dom/isolated-catalog-tests
commit 74768db827

@@ -17,8 +17,11 @@ use crate::{
 use data_types::database_rules::{PartitionTemplate, TemplatePart};
 use observability_deps::tracing::*;
 use router2::{
-    dml_handlers::{NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer},
-    namespace_cache::{MemoryNamespaceCache, ShardedCache},
+    dml_handlers::{
+        InstrumentationDecorator, NamespaceAutocreation, Partitioner, SchemaValidator,
+        ShardedWriteBuffer,
+    },
+    namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, ShardedCache},
     sequencer::Sequencer,
     server::{http::HttpDelegate, RouterServer},
     sharder::JumpHash,
@@ -94,14 +97,24 @@ pub async fn command(config: Config) -> Result<()> {
     )
     .await?;
 
-    // Initialise a namespace cache to be shared with the schema validator, and
-    // namespace auto-creator.
-    let ns_cache = Arc::new(ShardedCache::new(
-        iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
+    // Wrap the write buffer with metrics
+    let handler_stack =
+        InstrumentationDecorator::new("sharded_write_buffer", Arc::clone(&metrics), write_buffer);
+
+    // Initialise an instrumented namespace cache to be shared with the schema
+    // validator, and namespace auto-creator that reports cache hit/miss/update
+    // metrics.
+    let ns_cache = Arc::new(InstrumentedCache::new(
+        Arc::new(ShardedCache::new(
+            iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
+        )),
+        Arc::clone(&metrics),
     ));
-
+    // Add the schema validator layer.
     let handler_stack =
-        SchemaValidator::new(write_buffer, Arc::clone(&catalog), Arc::clone(&ns_cache));
+        SchemaValidator::new(handler_stack, Arc::clone(&catalog), Arc::clone(&ns_cache));
+    let handler_stack =
+        InstrumentationDecorator::new("schema_validator", Arc::clone(&metrics), handler_stack);
 
     // Add a write partitioner into the handler stack that splits by the date
     // portion of the write's timestamp.
@@ -111,6 +124,8 @@ pub async fn command(config: Config) -> Result<()> {
             parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
         },
     );
+    let handler_stack =
+        InstrumentationDecorator::new("partitioner", Arc::clone(&metrics), handler_stack);
 
     ////////////////////////////////////////////////////////////////////////////
     //
@@ -163,6 +178,10 @@ pub async fn command(config: Config) -> Result<()> {
     //
     ////////////////////////////////////////////////////////////////////////////
 
+    // Record the overall request handling latency
+    let handler_stack =
+        InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack);
+
     let http = HttpDelegate::new(config.run_config.max_http_request_size, handler_stack);
     let router_server = RouterServer::new(
         http,
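The hunks above build the router's DML pipeline by repeatedly shadowing `handler_stack`, wrapping each layer in an `InstrumentationDecorator` before handing the finished stack to `HttpDelegate`. A minimal, std-only sketch of that decorator layering — `Handler`, `WriteBuffer`, and `Instrumented` are hypothetical stand-ins for `DmlHandler`, `ShardedWriteBuffer`, and `InstrumentationDecorator`, and the real decorator records to histograms rather than printing:

```rust
use std::time::Instant;

/// Hypothetical stand-in for the router's `DmlHandler` trait.
trait Handler {
    fn handle(&self, payload: &str) -> Result<(), String>;
}

/// Terminal layer, standing in for `ShardedWriteBuffer`.
struct WriteBuffer;

impl Handler for WriteBuffer {
    fn handle(&self, _payload: &str) -> Result<(), String> {
        Ok(())
    }
}

/// Decorator recording the latency of the layer it wraps, in the role of
/// `InstrumentationDecorator`.
struct Instrumented<T> {
    name: &'static str,
    inner: T,
}

impl<T: Handler> Handler for Instrumented<T> {
    fn handle(&self, payload: &str) -> Result<(), String> {
        let start = Instant::now();
        let res = self.inner.handle(payload);
        let result = if res.is_ok() { "success" } else { "error" };
        // The real decorator records into a per-(handler, result) histogram.
        println!("{}: {:?} ({})", self.name, start.elapsed(), result);
        res
    }
}

fn main() {
    // Built innermost-first; each `let handler_stack = ...` in the diff
    // shadows the previous binding, adding one more layer.
    let handler_stack = Instrumented {
        name: "sharded_write_buffer",
        inner: WriteBuffer,
    };
    let handler_stack = Instrumented {
        name: "request",
        inner: handler_stack,
    };
    handler_stack.handle("cpu,host=a usage=1 42").unwrap();
}
```

Because an outer layer times its entire call into the layer below, a handler's reported latency includes all of its children — the caveat the new `instrumentation` module documents later in this diff.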
@@ -894,7 +894,7 @@ mod tests {
     }
 
     fn table_predicate(predicate: InfluxRpcPredicate) -> Predicate {
-        let predicates = predicate.table_predicates(&Tables::new(&["foo"]));
+        let predicates = predicate.table_predicates(&Tables::new(&["foo"])).unwrap();
         assert_eq!(predicates.len(), 1);
         predicates.into_iter().next().unwrap().1
     }
@@ -986,7 +986,7 @@ mod tests {
 
         let tables = Tables::new(&["foo", "bar"]);
 
-        let table_predicates = predicate.table_predicates(&tables);
+        let table_predicates = predicate.table_predicates(&tables).unwrap();
         assert_eq!(table_predicates.len(), 2);
 
         for (expected_table, (table, predicate)) in tables.table_names.iter().zip(table_predicates)

@@ -2,7 +2,7 @@
 //! InfluxDB Storage gRPC API
 use crate::{rewrite, BinaryExpr, Predicate};
 
-use datafusion::error::Result as DataFusionResult;
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::context::ExecutionProps;
 use datafusion::logical_plan::{
     lit, Column, Expr, ExprRewritable, ExprRewriter, ExprSchema, ExprSchemable, ExprSimplifiable,
@@ -84,7 +84,7 @@ impl InfluxRpcPredicate {
     pub fn table_predicates<D: QueryDatabaseMeta>(
         &self,
         table_info: &D,
-    ) -> Vec<(String, Predicate)> {
+    ) -> DataFusionResult<Vec<(String, Predicate)>> {
         let table_names = match &self.table_names {
             Some(table_names) => itertools::Either::Left(table_names.iter().cloned()),
             None => itertools::Either::Right(table_info.table_names().into_iter()),
@@ -93,9 +93,9 @@ impl InfluxRpcPredicate {
         table_names
             .map(|table| {
                 let schema = table_info.table_schema(&table);
-                let predicate = normalize_predicate(&table, schema, &self.inner);
+                let predicate = normalize_predicate(&table, schema, &self.inner)?;
 
-                (table, predicate)
+                Ok((table, predicate))
             })
             .collect()
     }
@@ -154,7 +154,7 @@ fn normalize_predicate(
     table_name: &str,
     schema: Option<Arc<Schema>>,
     predicate: &Predicate,
-) -> Predicate {
+) -> DataFusionResult<Predicate> {
     let mut predicate = predicate.clone();
     let mut field_projections = BTreeSet::new();
     let mut field_value_exprs = vec![];
@@ -162,35 +162,31 @@ fn normalize_predicate(
     predicate.exprs = predicate
         .exprs
         .into_iter()
-        .map(|e| rewrite_measurement_references(table_name, e))
-        // Rewrite any references to `_value = some_value` to literal true values.
-        // Keeps track of these expressions, which can then be used to
-        // augment field projections with conditions using `CASE` statements.
-        .map(|e| rewrite_field_value_references(&mut field_value_exprs, e))
         .map(|e| {
-            // Rewrite any references to `_field = a_field_name` with a literal true
-            // and keep track of referenced field names to add to the field
-            // column projection set.
-            rewrite_field_column_references(&mut field_projections, e)
+            rewrite_measurement_references(table_name, e)
+                // Rewrite any references to `_value = some_value` to literal true values.
+                // Keeps track of these expressions, which can then be used to
+                // augment field projections with conditions using `CASE` statements.
+                .and_then(|e| rewrite_field_value_references(&mut field_value_exprs, e))
+                // Rewrite any references to `_field = a_field_name` with a literal true
+                // and keep track of referenced field names to add to the field
+                // column projection set.
+                .and_then(|e| rewrite_field_column_references(&mut field_projections, e))
+                // apply IOx specific rewrites (that unlock other simplifications)
+                .and_then(rewrite::rewrite)
+                // Call the core DataFusion simplification logic
+                .and_then(|e| {
+                    if let Some(schema) = &schema {
+                        let adapter = SimplifyAdapter::new(schema.as_ref());
+                        // simplify twice to ensure "full" cleanup
                        e.simplify(&adapter)?.simplify(&adapter)
+                    } else {
+                        Ok(e)
+                    }
+                })
+                .and_then(rewrite::simplify_predicate)
         })
-        .map(|e| {
-            // apply IOx specific rewrites (that unlock other simplifications)
-            rewrite::rewrite(e).expect("rewrite failed")
-        })
-        .map(|e| {
-            if let Some(schema) = &schema {
-                let adapter = SimplifyAdapter::new(schema.as_ref());
-                // simplify twice to ensure "full" cleanup
-                e.simplify(&adapter)
-                    .expect("Expression simplificiation round 1 failed")
-                    .simplify(&adapter)
-                    .expect("Expression simplificiation round 2 failed")
-            } else {
-                e
-            }
-        })
-        .map(|e| rewrite::simplify_predicate(e).expect("simplify failed"))
-        .collect::<Vec<_>>();
+        .collect::<DataFusionResult<Vec<_>>>()?;
     // Store any field value (`_value`) expressions on the `Predicate`.
     predicate.value_expr = field_value_exprs;
 
@@ -200,7 +196,7 @@ fn normalize_predicate(
             None => predicate.field_columns = Some(field_projections),
         };
     }
-    predicate
+    Ok(predicate)
 }
 
 struct SimplifyAdapter<'a> {
@@ -254,18 +250,17 @@ impl<'a> ExprSchema for SimplifyAdapter<'a> {
 
     fn data_type(&self, col: &Column) -> DataFusionResult<&arrow::datatypes::DataType> {
         assert!(col.relation.is_none());
-        Ok(self
-            .field(&col.name)
+        self.field(&col.name)
             .map(|f| f.data_type())
-            .expect("found field for datatype"))
+            .ok_or_else(|| DataFusionError::Plan(format!("Unknown field {}", &col.name)))
     }
 }
 
 /// Rewrites all references to the [MEASUREMENT_COLUMN_NAME] column
 /// with the actual table name
-fn rewrite_measurement_references(table_name: &str, expr: Expr) -> Expr {
+fn rewrite_measurement_references(table_name: &str, expr: Expr) -> DataFusionResult<Expr> {
     let mut rewriter = MeasurementRewriter { table_name };
-    expr.rewrite(&mut rewriter).expect("rewrite is infallible")
+    expr.rewrite(&mut rewriter)
 }
 
 struct MeasurementRewriter<'a> {
@@ -291,9 +286,12 @@ impl ExprRewriter for MeasurementRewriter<'_> {
 /// Rewrites an expression on `_value` as a boolean true literal, pushing any
 /// encountered expressions onto `value_exprs` so they can be moved onto column
 /// projections.
-fn rewrite_field_value_references(value_exprs: &mut Vec<BinaryExpr>, expr: Expr) -> Expr {
+fn rewrite_field_value_references(
+    value_exprs: &mut Vec<BinaryExpr>,
+    expr: Expr,
+) -> DataFusionResult<Expr> {
     let mut rewriter = FieldValueRewriter { value_exprs };
-    expr.rewrite(&mut rewriter).expect("rewrite is infallible")
+    expr.rewrite(&mut rewriter)
 }
 
 struct FieldValueRewriter<'a> {
@@ -334,9 +332,9 @@ impl<'a> ExprRewriter for FieldValueRewriter<'a> {
 fn rewrite_field_column_references(
     field_projections: &'_ mut BTreeSet<String>,
     expr: Expr,
-) -> Expr {
+) -> DataFusionResult<Expr> {
     let mut rewriter = FieldColumnRewriter { field_projections };
-    expr.rewrite(&mut rewriter).expect("rewrite is infallible")
+    expr.rewrite(&mut rewriter)
 }
 
 struct FieldColumnRewriter<'a> {
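The centerpiece here is `normalize_predicate`: a chain of `.map(...).expect(...)` calls that panicked on malformed predicates becomes a chain of fallible steps joined with `and_then`, and the per-expression `Result`s are collected into one `DataFusionResult<Vec<_>>`. A std-only sketch of that pattern (the `step_*` functions and error strings are illustrative, not from the codebase):

```rust
/// Two fallible rewrite steps standing in for `rewrite_measurement_references`,
/// `rewrite_field_value_references`, and friends.
fn step_one(x: i64) -> Result<i64, String> {
    Ok(x + 1)
}

fn step_two(x: i64) -> Result<i64, String> {
    if x % 2 == 0 {
        Ok(x * 10)
    } else {
        Err(format!("odd value: {x}"))
    }
}

fn normalize_all(values: Vec<i64>) -> Result<Vec<i64>, String> {
    values
        .into_iter()
        .map(|v| {
            step_one(v)
                // Later steps run only if every earlier one succeeded.
                .and_then(step_two)
        })
        // `collect` on an iterator of `Result`s short-circuits on the first
        // `Err` — the same trick as `.collect::<DataFusionResult<Vec<_>>>()?`.
        .collect()
}

fn main() {
    assert_eq!(normalize_all(vec![1, 3]), Ok(vec![20, 40]));
    assert!(normalize_all(vec![2]).is_err()); // 2 + 1 = 3 is odd
    println!("ok");
}
```

Returning `DataFusionResult` from every rewrite helper also removes the `expect("rewrite is infallible")` calls, so a failing rewrite now propagates to the caller instead of aborting the process.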
@@ -96,6 +96,11 @@ pub enum Error {
         source: crate::provider::Error,
     },
 
+    #[snafu(display("gRPC planner got error creating predicates: {}", source))]
+    CreatingPredicates {
+        source: datafusion::error::DataFusionError,
+    },
+
     #[snafu(display("gRPC planner got error building plan: {}", source))]
     BuildingPlan {
         source: datafusion::error::DataFusionError,
@@ -227,7 +232,9 @@ impl InfluxRpcPlanner {
         // Mapping between table and chunks that need full plan
         let mut full_plan_table_chunks = BTreeMap::new();
 
-        let table_predicates = rpc_predicate.table_predicates(database);
+        let table_predicates = rpc_predicate
+            .table_predicates(database)
+            .context(CreatingPredicatesSnafu)?;
         for (table_name, predicate) in &table_predicates {
             // Identify which chunks can answer from its metadata and then record its table,
             // and which chunks needs full plan and group them into their table
@@ -334,7 +341,9 @@ impl InfluxRpcPlanner {
         let mut need_full_plans = BTreeMap::new();
         let mut known_columns = BTreeSet::new();
 
-        let table_predicates = rpc_predicate.table_predicates(database);
+        let table_predicates = rpc_predicate
+            .table_predicates(database)
+            .context(CreatingPredicatesSnafu)?;
         for (table_name, predicate) in &table_predicates {
             for chunk in database.chunks(table_name, predicate) {
                 // If there are delete predicates, we need to scan (or do full plan) the data to eliminate
@@ -464,7 +473,9 @@ impl InfluxRpcPlanner {
         let mut need_full_plans = BTreeMap::new();
         let mut known_values = BTreeSet::new();
 
-        let table_predicates = rpc_predicate.table_predicates(database);
+        let table_predicates = rpc_predicate
+            .table_predicates(database)
+            .context(CreatingPredicatesSnafu)?;
         for (table_name, predicate) in &table_predicates {
             for chunk in database.chunks(table_name, predicate) {
                 // If there are delete predicates, we need to scan (or do full plan) the data to eliminate
@@ -629,7 +640,9 @@ impl InfluxRpcPlanner {
         // The executor then figures out which columns have non-null
         // values and stops the plan executing once it has them
 
-        let table_predicates = rpc_predicate.table_predicates(database);
+        let table_predicates = rpc_predicate
+            .table_predicates(database)
+            .context(CreatingPredicatesSnafu)?;
         let mut field_list_plan = FieldListPlan::with_capacity(table_predicates.len());
 
         for (table_name, predicate) in &table_predicates {
@@ -680,7 +693,9 @@ impl InfluxRpcPlanner {
     {
        debug!(?rpc_predicate, "planning read_filter");
 
-        let table_predicates = rpc_predicate.table_predicates(database);
+        let table_predicates = rpc_predicate
+            .table_predicates(database)
+            .context(CreatingPredicatesSnafu)?;
         let mut ss_plans = Vec::with_capacity(table_predicates.len());
         for (table_name, predicate) in &table_predicates {
             let chunks = database.chunks(table_name, predicate);
@@ -736,7 +751,9 @@ impl InfluxRpcPlanner {
     {
         debug!(?rpc_predicate, ?agg, "planning read_group");
 
-        let table_predicates = rpc_predicate.table_predicates(database);
+        let table_predicates = rpc_predicate
+            .table_predicates(database)
+            .context(CreatingPredicatesSnafu)?;
         let mut ss_plans = Vec::with_capacity(table_predicates.len());
 
         for (table_name, predicate) in &table_predicates {
@@ -798,7 +815,9 @@ impl InfluxRpcPlanner {
         );
 
         // group tables by chunk, pruning if possible
-        let table_predicates = rpc_predicate.table_predicates(database);
+        let table_predicates = rpc_predicate
+            .table_predicates(database)
+            .context(CreatingPredicatesSnafu)?;
         let mut ss_plans = Vec::with_capacity(table_predicates.len());
         for (table_name, predicate) in &table_predicates {
             let chunks = database.chunks(table_name, predicate);
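Every planner entry point now attaches the new `CreatingPredicates` variant via snafu's `.context(CreatingPredicatesSnafu)?`. A dependency-free sketch of what that buys — wrapping the DataFusion failure as a `source` under a planner-level message (the types here are stand-ins; snafu generates roughly this `map_err` plus the `Display` impl):

```rust
use std::fmt;

/// Stand-in for `DataFusionError`.
#[derive(Debug)]
struct DataFusionError(String);

/// Mirrors the new `CreatingPredicates` variant: it keeps the underlying
/// error as `source`, so callers see planner context on top of the cause.
#[derive(Debug)]
enum Error {
    CreatingPredicates { source: DataFusionError },
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::CreatingPredicates { source } => {
                write!(f, "gRPC planner got error creating predicates: {}", source.0)
            }
        }
    }
}

/// Stand-in for `InfluxRpcPredicate::table_predicates`, now fallible.
fn table_predicates() -> Result<Vec<(String, String)>, DataFusionError> {
    Err(DataFusionError("'Utf8 = Int32' can't be evaluated".into()))
}

fn plan() -> Result<(), Error> {
    // `.context(CreatingPredicatesSnafu)?` expands to roughly this
    // `map_err`: attach the planner-level variant, keep the cause as `source`.
    let _tables = table_predicates().map_err(|source| Error::CreatingPredicates { source })?;
    Ok(())
}

fn main() {
    println!("{}", plan().unwrap_err());
}
```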
@@ -1,21 +1,25 @@
 //! Tests for the Influx gRPC queries
+use std::sync::Arc;
+
 #[cfg(test)]
 use crate::scenarios::{
     DbScenario, DbSetup, NoData, TwoMeasurements, TwoMeasurementsManyFields,
     TwoMeasurementsWithDelete, TwoMeasurementsWithDeleteAll,
 };
 use crate::{
-    influxrpc::util::run_series_set_plan,
+    influxrpc::util::run_series_set_plan_maybe_error,
     scenarios::{
         MeasurementStatusCode, MeasurementsForDefect2845, MeasurementsSortableTags,
         MeasurementsSortableTagsWithDelete, TwoMeasurementsMultiSeries,
         TwoMeasurementsMultiSeriesWithDelete, TwoMeasurementsMultiSeriesWithDeleteAll,
     },
 };
-use datafusion::logical_plan::{col, lit};
+use datafusion::logical_plan::{col, lit, when};
+use db::Db;
 use predicate::rpc_predicate::InfluxRpcPredicate;
 use predicate::PredicateBuilder;
 use query::frontend::influxrpc::InfluxRpcPlanner;
+use test_helpers::assert_contains;
 
 /// runs read_filter(predicate) and compares it to the expected
 /// output
@@ -34,14 +38,9 @@ async fn run_read_filter_test_case<D>(
         } = scenario;
         println!("Running scenario '{}'", scenario_name);
         println!("Predicate: '{:#?}'", predicate);
-        let planner = InfluxRpcPlanner::new();
-
-        let plan = planner
-            .read_filter(db.as_ref(), predicate.clone())
-            .expect("built plan successfully");
-
-        let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
-        let string_results = run_series_set_plan(&ctx, plan).await;
+        let string_results = run_read_filter(predicate.clone(), db)
+            .await
+            .expect("Unexpected error running read filter");
 
         assert_eq!(
             expected_results, string_results,
@@ -51,6 +50,48 @@ async fn run_read_filter_test_case<D>(
     }
 }
 
+/// runs read_filter(predicate) and compares it to the expected
+/// output
+async fn run_read_filter(
+    predicate: InfluxRpcPredicate,
+    db: Arc<Db>,
+) -> Result<Vec<String>, String> {
+    let planner = InfluxRpcPlanner::new();
+
+    let plan = planner
+        .read_filter(db.as_ref(), predicate)
+        .map_err(|e| e.to_string())?;
+
+    let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
+    run_series_set_plan_maybe_error(&ctx, plan)
+        .await
+        .map_err(|e| e.to_string())
+}
+
+/// runs read_filter(predicate), expecting an error and compares to expected message
+async fn run_read_filter_error_case<D>(
+    db_setup: D,
+    predicate: InfluxRpcPredicate,
+    expected_error: &str,
+) where
+    D: DbSetup,
+{
+    test_helpers::maybe_start_logging();
+
+    for scenario in db_setup.make().await {
+        let DbScenario {
+            scenario_name, db, ..
+        } = scenario;
+        println!("Running scenario '{}'", scenario_name);
+        println!("Predicate: '{:#?}'", predicate);
+        let result = run_read_filter(predicate.clone(), db)
+            .await
+            .expect_err("Unexpected success running error case");
+
+        assert_contains!(result.to_string(), expected_error);
+    }
+}
+
 #[tokio::test]
 async fn test_read_filter_no_data_no_pred() {
     let expected_results = vec![] as Vec<&str>;
@@ -138,13 +179,65 @@ async fn test_read_filter_data_tag_predicate() {
     run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
 }
 
+#[tokio::test]
+async fn test_read_filter_invalid_predicate() {
+    let predicate = PredicateBuilder::new()
+        // region > 5 (region is a tag(string) column, so this predicate is invalid)
+        .add_expr(col("region").gt(lit(5i32)))
+        .build();
+    let predicate = InfluxRpcPredicate::new(None, predicate);
+
+    let expected_error = "Error during planning: 'Dictionary(Int32, Utf8) > Int32' can't be evaluated because there isn't a common type to coerce the types to";
+
+    run_read_filter_error_case(TwoMeasurements {}, predicate, expected_error).await;
+}
+
+#[tokio::test]
+async fn test_read_filter_invalid_predicate_case() {
+    let predicate = PredicateBuilder::new()
+        // https://github.com/influxdata/influxdb_iox/issues/3635
+        // model what happens when a field is treated like a tag
+        // CASE WHEN system" IS NULL THEN '' ELSE system END = 5;
+        .add_expr(
+            when(col("system").is_null(), lit(""))
+                .otherwise(col("system"))
+                .unwrap()
+                .eq(lit(5i32)),
+        )
+        .build();
+    let predicate = InfluxRpcPredicate::new(None, predicate);
+
+    let expected_error = "gRPC planner got error creating predicates: Error during planning: 'Utf8 = Int32' can't be evaluated because there isn't a common type to coerce the types to";
+
+    run_read_filter_error_case(TwoMeasurements {}, predicate, expected_error).await;
+}
+
+#[tokio::test]
+async fn test_read_filter_unknown_column_in_predicate() {
+    let predicate = PredicateBuilder::new()
+        // mystery_region is not a real column, so this predicate is
+        // invalid but IOx should be able to handle it (and produce no results)
+        .add_expr(
+            col("baz")
+                .eq(lit(4i32))
+                .or(col("bar").and(col("mystery_region").gt(lit(5i32)))),
+        )
+        .build();
+
+    let predicate = InfluxRpcPredicate::new(None, predicate);
+
+    let expected_results = vec![];
+
+    run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
+}
+
 #[tokio::test]
 async fn test_read_filter_data_no_pred_with_delete() {
     let expected_results = vec![
-        "Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n  FloatPoints timestamps: [100], values: [70.4]",
-        "Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n  FloatPoints timestamps: [350], values: [90.0]",
-        "Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n  FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
-        "Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n  FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
+        "Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n  FloatPoints timestamps: [100], values: [70.4]",
+        "Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n  FloatPoints timestamps: [350], values: [90.0]",
+        "Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n  FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
+        "Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n  FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
     ];
 
     run_read_filter_test_case(
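The test scaffolding change factors one fallible `run_read_filter` out of the success-path runner and adds an error-path runner that asserts on a substring of the message. A self-contained sketch of that split (names and error strings here are illustrative):

```rust
/// Stand-in for the shared fallible runner (`run_read_filter` in the diff).
fn run_query(predicate: &str) -> Result<Vec<String>, String> {
    if predicate.contains("region > 5") {
        return Err("Error during planning: 'Dictionary(Int32, Utf8) > Int32' \
                    can't be evaluated"
            .to_owned());
    }
    Ok(vec!["Series ...".to_owned()])
}

/// Success path: any error fails the test (`expect`).
fn run_success_case(predicate: &str, expected: &[&str]) {
    let results = run_query(predicate).expect("Unexpected error running read filter");
    assert_eq!(results, expected);
}

/// Error path: success fails the test (`expect_err`), and the message is
/// matched by substring, like `assert_contains!`, so the test tolerates
/// unrelated changes to the full error string.
fn run_error_case(predicate: &str, expected_error: &str) {
    let err = run_query(predicate).expect_err("Unexpected success running error case");
    assert!(
        err.contains(expected_error),
        "error {err:?} does not contain {expected_error:?}"
    );
}

fn main() {
    run_success_case("ok", &["Series ..."]);
    run_error_case("region > 5", "can't be evaluated");
    println!("ok");
}
```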
@@ -1,3 +1,4 @@
+use datafusion::error::DataFusionError;
 use query::exec::IOxExecutionContext;
 use query::plan::seriesset::SeriesSetPlans;
 
@@ -9,10 +10,21 @@ use query::plan::seriesset::SeriesSetPlans;
 /// items are returned.
 #[cfg(test)]
 pub async fn run_series_set_plan(ctx: &IOxExecutionContext, plans: SeriesSetPlans) -> Vec<String> {
-    ctx.to_series_and_groups(plans)
+    run_series_set_plan_maybe_error(ctx, plans)
         .await
         .expect("running plans")
+}
+
+/// Run a series set plan to completion and produce a Result<Vec<String>> representation
+#[cfg(test)]
+pub async fn run_series_set_plan_maybe_error(
+    ctx: &IOxExecutionContext,
+    plans: SeriesSetPlans,
+) -> Result<Vec<String>, DataFusionError> {
+    Ok(ctx
+        .to_series_and_groups(plans)
+        .await?
         .into_iter()
         .map(|series_or_group| series_or_group.to_string())
-        .collect()
+        .collect())
 }

@@ -0,0 +1,127 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use data_types::{delete_predicate::DeletePredicate, DatabaseName};
+use metric::{Metric, U64Histogram, U64HistogramOptions};
+use time::{SystemProvider, TimeProvider};
+use trace::ctx::SpanContext;
+
+use super::DmlHandler;
+
+/// An instrumentation decorator recording call latencies for [`DmlHandler`]
+/// implementations.
+///
+/// Metrics are broken down by operation (write/delete) and result
+/// (success/error) with call latency reported in milliseconds.
+///
+/// # Chained / Nested Handlers
+///
+/// Because [`DmlHandler`] implementations are constructed as a chain of
+/// decorators to build up a full request handling pipeline, the reported call
+/// latency of a given handler is a cumulative measure of the execution time for
+/// handler and all of its children.
+#[derive(Debug)]
+pub struct InstrumentationDecorator<T, P = SystemProvider> {
+    inner: T,
+    time_provider: P,
+
+    write_success: U64Histogram,
+    write_error: U64Histogram,
+
+    delete_success: U64Histogram,
+    delete_error: U64Histogram,
+}
+
+impl<T> InstrumentationDecorator<T> {
+    /// Wrap a new [`InstrumentationDecorator`] over `T` exposing metrics
+    /// labelled with `handler=name`.
+    pub fn new(name: &'static str, registry: Arc<metric::Registry>, inner: T) -> Self {
+        let buckets = || {
+            U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX])
+        };
+
+        let write: Metric<U64Histogram> = registry.register_metric_with_options(
+            "dml_handler_write_duration_ms",
+            "write handler call duration in milliseconds",
+            buckets,
+        );
+        let delete: Metric<U64Histogram> = registry.register_metric_with_options(
+            "dml_handler_delete_duration_ms",
+            "delete handler call duration in milliseconds",
+            buckets,
+        );
+
+        let write_success = write.recorder(&[("handler", name), ("result", "success")]);
+        let write_error = write.recorder(&[("handler", name), ("result", "error")]);
+
+        let delete_success = delete.recorder(&[("handler", name), ("result", "success")]);
+        let delete_error = delete.recorder(&[("handler", name), ("result", "error")]);
+
+        Self {
+            inner,
+            time_provider: Default::default(),
+            write_success,
+            write_error,
+            delete_success,
+            delete_error,
+        }
+    }
+}
+
+#[async_trait]
+impl<T> DmlHandler for InstrumentationDecorator<T>
+where
+    T: DmlHandler,
+{
+    type WriteInput = T::WriteInput;
+    type WriteError = T::WriteError;
+    type DeleteError = T::DeleteError;
+
+    /// Call the inner `write` method and record the call latency.
+    async fn write(
+        &self,
+        namespace: DatabaseName<'static>,
+        input: Self::WriteInput,
+        span_ctx: Option<SpanContext>,
+    ) -> Result<(), Self::WriteError> {
+        let t = self.time_provider.now();
+        let res = self.inner.write(namespace, input, span_ctx).await;
+
+        // Avoid exploding if time goes backwards - simply drop the measurement
+        // if it happens.
+        if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
+            match &res {
+                Ok(_) => self.write_success.record(delta.as_millis() as _),
+                Err(_) => self.write_error.record(delta.as_millis() as _),
+            };
+        }
+
+        res
+    }
+
+    /// Call the inner `delete` method and record the call latency.
+    async fn delete<'a>(
+        &self,
+        namespace: DatabaseName<'static>,
+        table_name: impl Into<String> + Send + Sync + 'a,
+        predicate: DeletePredicate,
+        span_ctx: Option<SpanContext>,
+    ) -> Result<(), Self::DeleteError> {
+        let t = self.time_provider.now();
+        let res = self
+            .inner
+            .delete(namespace, table_name, predicate, span_ctx)
+            .await;
+
+        // Avoid exploding if time goes backwards - simply drop the measurement
+        // if it happens.
+        if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
+            match &res {
+                Ok(_) => self.delete_success.record(delta.as_millis() as _),
+                Err(_) => self.delete_error.record(delta.as_millis() as _),
+            };
+        }
+
+        res
+    }
+}
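One detail worth noting in the new decorator: it reads a wall-clock `TimeProvider` twice and uses `checked_duration_since`, dropping the sample if the clock stepped backwards between the two readings rather than panicking mid-request. A std-only sketch of the same guard using `SystemTime` (the `timed` helper is hypothetical):

```rust
use std::time::SystemTime;

/// Run `f`, returning its output plus the elapsed wall-clock milliseconds,
/// or `None` if the clock went backwards while `f` ran.
fn timed<T>(f: impl FnOnce() -> T) -> (T, Option<u128>) {
    let start = SystemTime::now();
    let out = f();
    // `duration_since` fails when `start` is later than "now"; treat that
    // as "no measurement", mirroring the decorator's `if let Some(delta)`.
    let millis = SystemTime::now()
        .duration_since(start)
        .ok()
        .map(|d| d.as_millis());
    (out, millis)
}

fn main() {
    let (sum, elapsed_ms) = timed(|| (0u64..1_000_000).sum::<u64>());
    match elapsed_ms {
        Some(ms) => println!("sum={sum} took {ms} ms"),
        None => println!("sum={sum}, clock went backwards; sample dropped"),
    }
}
```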
@@ -90,5 +90,8 @@ pub use ns_autocreation::*;
 mod partitioner;
 pub use partitioner::*;
 
+mod instrumentation;
+pub use instrumentation::*;
+
 #[cfg(test)]
 pub mod mock;

@@ -12,7 +12,7 @@ use observability_deps::tracing::*;
 use thiserror::Error;
 use trace::ctx::SpanContext;
 
-use crate::namespace_cache::{MemoryNamespaceCache, NamespaceCache};
+use crate::namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache};
 
 use super::{DmlError, DmlHandler, Partitioned};
 
@@ -82,7 +82,7 @@ pub enum SchemaError {
 ///
 /// [#3573]: https://github.com/influxdata/influxdb_iox/issues/3573
 #[derive(Debug)]
-pub struct SchemaValidator<D, C = Arc<MemoryNamespaceCache>> {
+pub struct SchemaValidator<D, C = Arc<InstrumentedCache<MemoryNamespaceCache>>> {
     inner: D,
     catalog: Arc<dyn Catalog>,
 
@@ -266,7 +266,10 @@ mod tests {
         catalog
     }
 
-    fn assert_cache<D>(handler: &SchemaValidator<D>, table: &str, col: &str, want: ColumnType) {
+    fn assert_cache<D, C>(handler: &SchemaValidator<D, C>, table: &str, col: &str, want: ColumnType)
+    where
+        C: NamespaceCache,
+    {
         // The cache should be populated.
         let ns = handler
             .cache
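The signature change above swaps the *default* for the cache type parameter: `SchemaValidator<D>` now means `SchemaValidator<D, Arc<InstrumentedCache<MemoryNamespaceCache>>>`, so production call sites pick up metrics without edits, while the test helper gains an explicit `C: NamespaceCache` bound. A minimal sketch of how a default type parameter behaves (all types here are stand-ins):

```rust
/// Stand-ins for the real cache types.
#[derive(Default)]
struct MemoryNamespaceCache;

#[derive(Default)]
struct InstrumentedCache<T>(T);

/// Sketch of the changed signature: the default for `C` is now the
/// instrumented wrapper, so `SchemaValidator<D>` gets metrics "for free".
struct SchemaValidator<D, C = InstrumentedCache<MemoryNamespaceCache>> {
    inner: D,
    cache: C,
}

impl<D, C: Default> SchemaValidator<D, C> {
    fn new(inner: D) -> Self {
        Self {
            inner,
            cache: C::default(),
        }
    }
}

fn main() {
    // Uses the default: C = InstrumentedCache<MemoryNamespaceCache>.
    let v: SchemaValidator<&str> = SchemaValidator::new("handler");
    // Tests can still pin any cache type explicitly.
    let t: SchemaValidator<&str, MemoryNamespaceCache> = SchemaValidator::new("handler");
    let _ = (v.inner, t.inner, v.cache, t.cache);
    println!("ok");
}
```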
@@ -6,6 +6,8 @@ pub use memory::*;
 mod sharded_cache;
 pub use sharded_cache::*;
 
+pub mod metrics;
+
 use std::{fmt::Debug, sync::Arc};
 
 use data_types::DatabaseName;

@@ -0,0 +1,84 @@
+//! Metric instrumentation for a [`NamespaceCache`] implementation.
+
+use std::sync::Arc;
+
+use data_types::DatabaseName;
+use iox_catalog::interface::NamespaceSchema;
+use metric::{Metric, U64Counter};
+
+use super::NamespaceCache;
+
+/// An [`InstrumentedCache`] decorates a [`NamespaceCache`] with cache read
+/// hit/miss and cache put insert/update metrics.
+#[derive(Debug)]
+pub struct InstrumentedCache<T> {
+    inner: T,
+
+    /// A cache read hit
+    get_hit_counter: U64Counter,
+    /// A cache read miss
+    get_miss_counter: U64Counter,
+
+    /// A cache put for a namespace that did not previously exist.
+    put_insert_counter: U64Counter,
+    /// A cache put replacing a namespace that previously had a cache entry.
+    put_update_counter: U64Counter,
+}
+
+impl<T> InstrumentedCache<T> {
+    /// Instrument `T`, recording cache operations to `registry`.
+    pub fn new(inner: T, registry: Arc<metric::Registry>) -> Self {
+        let get_counter: Metric<U64Counter> =
+            registry.register_metric("namespace_cache_get_count", "cache read requests");
+        let get_hit_counter = get_counter.recorder(&[("result", "hit")]);
+        let get_miss_counter = get_counter.recorder(&[("result", "miss")]);
+
+        let put_counter: Metric<U64Counter> =
+            registry.register_metric("namespace_cache_put_count", "cache put requests");
+        let put_insert_counter = put_counter.recorder(&[("op", "insert")]);
+        let put_update_counter = put_counter.recorder(&[("op", "update")]);
+
+        Self {
+            inner,
+            get_hit_counter,
+            get_miss_counter,
+            put_insert_counter,
+            put_update_counter,
+        }
+    }
+}
+
+impl<T> NamespaceCache for Arc<InstrumentedCache<T>>
+where
+    T: NamespaceCache,
+{
+    fn get_schema(&self, namespace: &DatabaseName<'_>) -> Option<Arc<NamespaceSchema>> {
+        match self.inner.get_schema(namespace) {
+            Some(v) => {
+                self.get_hit_counter.inc(1);
+                Some(v)
+            }
+            None => {
+                self.get_miss_counter.inc(1);
+                None
+            }
+        }
+    }
+
+    fn put_schema(
+        &self,
+        namespace: DatabaseName<'static>,
+        schema: impl Into<Arc<NamespaceSchema>>,
+    ) -> Option<Arc<NamespaceSchema>> {
+        match self.inner.put_schema(namespace, schema) {
+            Some(v) => {
+                self.put_update_counter.inc(1);
+                Some(v)
+            }
+            None => {
+                self.put_insert_counter.inc(1);
+                None
+            }
+        }
+    }
+}