diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs
index 7f57af82f7..6cc32e5e8c 100644
--- a/iox_query/src/provider.rs
+++ b/iox_query/src/provider.rs
@@ -247,16 +247,11 @@ impl TableProvider for ChunkTableProvider {
         // This debug shows the self.arrow_schema() includes all columns in all chunks
         // which means the schema of all chunks are merged before invoking this scan
         debug!(schema=?self.arrow_schema(), "All chunks schema");
-        // However, the schema of each chunk is still in its original form which does not
-        // include the merged columns of other chunks. The code below (put in comments on purpose) proves it
-        // for chunk in chunks.clone() {
-        //     trace!("Schema of chunk {}: {:#?}", chunk.id(), chunk.schema());
-        // }
 
         // Note that `filters` don't actually need to be evaluated in
         // the scan for the plans to be correct, they are an extra
         // optimization for providers which can offer them
-        let predicate = Predicate::default().with_pushdown_exprs(filters);
+        let predicate = Predicate::default().with_exprs(filters.to_vec());
 
         let deduplicate = Deduplicater::new(self.ctx.child_ctx("deduplicator"))
             .enable_deduplication(self.deduplication());
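For context on the provider change: by the time DataFusion invokes `TableProvider::scan`, the planner has already split the `WHERE` clause into conjuncts (visible as the comma-separated `partial_filters=[...]` entries in the plans below), so `with_exprs(filters.to_vec())` records each clause individually rather than pre-filtering for "pushdownable" shapes. A minimal sketch of what `filters` holds (the query and expressions are invented for illustration; `col`/`lit` are DataFusion's expression helpers):

    use datafusion::prelude::{col, lit, Expr};

    // Roughly what `scan` receives for
    // `WHERE count > 200 AND town != 'tewsbury'`: one Expr per conjunct.
    fn example_filters() -> Vec<Expr> {
        vec![
            col("count").gt(lit(200)),
            col("town").not_eq(lit("tewsbury")),
        ]
    }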
diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs
index 1bf4b8cb2a..3c8ffa98ef 100644
--- a/predicate/src/lib.rs
+++ b/predicate/src/lib.rs
@@ -27,7 +27,7 @@ use datafusion::{
         binary_expr,
         expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
         utils::expr_to_columns,
-        BinaryExpr, Operator,
+        BinaryExpr,
     },
     optimizer::utils::split_conjunction,
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
@@ -443,9 +443,8 @@ impl Predicate {
     }
 
     /// Adds an expression to the list of general purpose predicates
-    pub fn with_expr(mut self, expr: Expr) -> Self {
-        self.exprs.push(expr);
-        self
+    pub fn with_expr(self, expr: Expr) -> Self {
+        self.with_exprs([expr])
     }
 
     /// Adds a ValueExpr to the list of value expressons
@@ -489,6 +488,12 @@ impl Predicate {
         self
     }
 
+    /// Adds all expressions to the list of general purpose predicates
+    pub fn with_exprs(mut self, filters: impl IntoIterator<Item = Expr>) -> Self {
+        self.exprs.extend(filters.into_iter());
+        self
+    }
+
     /// Remove any clauses of this predicate that can not be run before deduplication.
     ///
     /// See for more details.
@@ -536,60 +541,6 @@ impl Predicate {
             value_expr: vec![],
         }
     }
-
-    /// Adds only the expressions from `filters` that can be pushed down to
-    /// execution engines.
-    pub fn with_pushdown_exprs(mut self, filters: &[Expr]) -> Self {
-        // For each expression of the filters, recursively split it, if it is is an AND conjunction
-        // For example, expression (x AND y) will be split into a vector of 2 expressions [x, y]
-        let mut exprs = filters.iter().flat_map(split_conjunction);
-
-        // Only keep single_column and primitive binary expressions
-        let mut pushdown_exprs: Vec<Expr> = vec![];
-        let exprs_result = exprs.try_for_each::<_, Result<_, DataFusionError>>(|expr| {
-            let mut columns = HashSet::new();
-            expr_to_columns(expr, &mut columns)?;
-
-            if columns.len() == 1 && Self::primitive_binary_expr(expr) {
-                pushdown_exprs.push(expr.clone());
-            }
-            Ok(())
-        });
-
-        match exprs_result {
-            Ok(()) => {
-                // Return the builder with only the pushdownable expressions on it.
-                self.exprs.append(&mut pushdown_exprs);
-            }
-            Err(e) => {
-                debug!("Error, {}, building push-down predicates for filters: {:#?}. No predicates are pushed down", e, filters);
-            }
-        }
-
-        self
-    }
-
-    /// Return true if the given expression is in a primitive binary in the form: `column op constant`
-    // and op must be a comparison one
-    pub fn primitive_binary_expr(expr: &Expr) -> bool {
-        match expr {
-            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
-                matches!(
-                    (&**left, &**right),
-                    (Expr::Column(_), Expr::Literal(_)) | (Expr::Literal(_), Expr::Column(_))
-                ) && matches!(
-                    op,
-                    Operator::Eq
-                        | Operator::NotEq
-                        | Operator::Lt
-                        | Operator::LtEq
-                        | Operator::Gt
-                        | Operator::GtEq
-                )
-            }
-            _ => false,
-        }
-    }
 }
 
 // Wrapper around `Expr::BinaryExpr` where left input is known to be
@@ -719,94 +670,6 @@ mod tests {
         assert!(!p.is_empty());
     }
 
-    #[test]
-    fn test_pushdown_predicates() {
-        let mut filters = vec![];
-
-        // state = CA
-        let expr1 = col("state").eq(lit("CA"));
-        filters.push(expr1);
-
-        // "price > 10"
-        let expr2 = col("price").gt(lit(10));
-        filters.push(expr2);
-
-        // a < 10 AND b >= 50 --> will be split to [a < 10, b >= 50]
-        let expr3 = col("a").lt(lit(10)).and(col("b").gt_eq(lit(50)));
-        filters.push(expr3);
-
-        // c != 3 OR d = 8 --> won't be pushed down
-        let expr4 = col("c").not_eq(lit(3)).or(col("d").eq(lit(8)));
-        filters.push(expr4);
-
-        // e is null --> won't be pushed down
-        let expr5 = col("e").is_null();
-        filters.push(expr5);
-
-        // f <= 60
-        let expr6 = col("f").lt_eq(lit(60));
-        filters.push(expr6);
-
-        // g is not null --> won't be pushed down
-        let expr7 = col("g").is_not_null();
-        filters.push(expr7);
-
-        // h + i --> won't be pushed down
-        let expr8 = col("h") + col("i");
-        filters.push(expr8);
-
-        // city = Boston
-        let expr9 = col("city").eq(lit("Boston"));
-        filters.push(expr9);
-
-        // city != Braintree
-        let expr9 = col("city").not_eq(lit("Braintree"));
-        filters.push(expr9);
-
-        // city != state --> won't be pushed down
-        let expr10 = col("city").not_eq(col("state"));
-        filters.push(expr10);
-
-        // city = state --> won't be pushed down
-        let expr11 = col("city").eq(col("state"));
-        filters.push(expr11);
-
-        // city_state = city + state --> won't be pushed down
-        let expr12 = col("city_sate").eq(col("city") + col("state"));
-        filters.push(expr12);
-
-        // city = city + 5 --> won't be pushed down
-        let expr13 = col("city").eq(col("city") + lit(5));
-        filters.push(expr13);
-
-        // city = city --> won't be pushed down
-        let expr14 = col("city").eq(col("city"));
-        filters.push(expr14);
-
-        // city + 5 = city --> won't be pushed down
-        let expr15 = (col("city") + lit(5)).eq(col("city"));
-        filters.push(expr15);
-
-        // 5 = city
-        let expr16 = lit(5).eq(col("city"));
-        filters.push(expr16);
-
-        println!(" --------------- Filters: {:#?}", filters);
-
-        // Expected pushdown predicates: [state = CA, price > 10, a < 10, b >= 50, f <= 60, city = Boston, city != Braintree, 5 = city]
-        let predicate = Predicate::default().with_pushdown_exprs(&filters);
-
-        println!(" ------------- Predicates: {:#?}", predicate);
-        assert_eq!(predicate.exprs.len(), 8);
-        assert_eq!(predicate.exprs[0], col("state").eq(lit("CA")));
-        assert_eq!(predicate.exprs[1], col("price").gt(lit(10)));
-        assert_eq!(predicate.exprs[2], col("a").lt(lit(10)));
-        assert_eq!(predicate.exprs[3], col("b").gt_eq(lit(50)));
-        assert_eq!(predicate.exprs[4], col("f").lt_eq(lit(60)));
-        assert_eq!(predicate.exprs[5], col("city").eq(lit("Boston")));
-        assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree")));
-        assert_eq!(predicate.exprs[7], lit(5).eq(col("city")));
-    }
 
     #[test]
     fn predicate_display_ts() {
         // TODO make this a doc example?
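A short sketch of the new builder method in use (written in the style of this crate's tests; the expressions are invented). Unlike the removed `with_pushdown_exprs`, nothing is discarded: an expression such as `city = state`, which the old code rejected as non-primitive, is now kept and left for DataFusion to evaluate or prune downstream:

    use datafusion::prelude::{col, lit, Expr};

    #[test]
    fn with_exprs_keeps_every_filter() {
        let filters: Vec<Expr> = vec![
            col("state").eq(lit("CA")),   // the old path kept this one
            col("city").eq(col("state")), // ...but silently dropped this one
        ];
        let predicate = Predicate::default().with_exprs(filters);
        assert_eq!(predicate.exprs.len(), 2);
    }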
diff --git a/query_tests/cases/in/pushdown.expected b/query_tests/cases/in/pushdown.expected
index 8347c216aa..df86ad5a0a 100644
--- a/query_tests/cases/in/pushdown.expected
+++ b/query_tests/cases/in/pushdown.expected
@@ -37,34 +37,34 @@
 +-------+--------+--------------------------------+-----------+
 -- SQL: EXPLAIN SELECT * from restaurant where count > 200;
 -- Results After Normalizing UUIDs
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
-| | Filter: CAST(restaurant.count AS Int64) > Int64(200) |
-| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200)] |
-| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
-| | CoalesceBatchesExec: target_batch_size=4096 |
-| | FilterExec: CAST(count@0 AS Int64) > 200 |
-| | RepartitionExec: partitioning=RoundRobinBatch(4) |
-| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], projection=[count, system, time, town] |
-| | |
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
+| | Filter: CAST(restaurant.count AS Int64) > Int64(200) |
+| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200)] |
+| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
+| | CoalesceBatchesExec: target_batch_size=4096 |
+| | FilterExec: CAST(count@0 AS Int64) > 200 |
+| | RepartitionExec: partitioning=RoundRobinBatch(4) |
+| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true, projection=[count, system, time, town] |
+| | |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
 -- SQL: EXPLAIN SELECT * from restaurant where count > 200.0;
 -- Results After Normalizing UUIDs
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
-| | Filter: CAST(restaurant.count AS Float64) > Float64(200) |
-| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Float64) > Float64(200)] |
-| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
-| | CoalesceBatchesExec: target_batch_size=4096 |
-| | FilterExec: CAST(count@0 AS Float64) > 200 |
-| | RepartitionExec: partitioning=RoundRobinBatch(4) |
-| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], projection=[count, system, time, town] |
-| | |
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
+| | Filter: CAST(restaurant.count AS Float64) > Float64(200) |
+| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Float64) > Float64(200)] |
+| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
+| | CoalesceBatchesExec: target_batch_size=4096 |
+| | FilterExec: CAST(count@0 AS Float64) > 200 |
+| | RepartitionExec: partitioning=RoundRobinBatch(4) |
+| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true, projection=[count, system, time, town] |
+| | |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
 -- SQL: EXPLAIN SELECT * from restaurant where system > 4.0;
 -- Results After Normalizing UUIDs
 +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
@@ -93,19 +93,19 @@
 +-------+--------+--------------------------------+-----------+
 -- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury';
 -- Results After Normalizing UUIDs
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
-| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) |
-| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury"))] |
-| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
-| | CoalesceBatchesExec: target_batch_size=4096 |
-| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury |
-| | RepartitionExec: partitioning=RoundRobinBatch(4) |
-| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=town_min@0 != tewsbury OR tewsbury != town_max@1, projection=[count, system, time, town] |
-| | |
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
+| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) |
+| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury"))] |
+| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
+| | CoalesceBatchesExec: target_batch_size=4096 |
+| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury |
+| | RepartitionExec: partitioning=RoundRobinBatch(4) |
+| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true AND town_min@0 != tewsbury OR tewsbury != town_max@1, projection=[count, system, time, town] |
+| | |
++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 -- SQL: SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
 -- Results After Sorting
 +-------+--------+--------------------------------+-----------+
@@ -118,19 +118,19 @@
 +-------+--------+--------------------------------+-----------+
 -- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
 -- Results After Normalizing UUIDs
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
-| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND (restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))) |
-| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))] |
-| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
-| | CoalesceBatchesExec: target_batch_size=4096 |
-| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence |
-| | RepartitionExec: partitioning=RoundRobinBatch(4) |
-| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=town_min@0 != tewsbury OR tewsbury != town_max@1, projection=[count, system, time, town] |
-| | |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
+| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND (restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))) |
+| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))] |
+| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
+| | CoalesceBatchesExec: target_batch_size=4096 |
+| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence |
+| | RepartitionExec: partitioning=RoundRobinBatch(4) |
+| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true AND town_min@0 != tewsbury OR tewsbury != town_max@1 AND system_min@2 <= 5 AND 5 <= system_max@3 OR town_min@0 <= lawrence AND lawrence <= town_max@1, projection=[count, system, time, town] |
+| | |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 -- SQL: SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000;
 -- Results After Sorting
 +-------+--------+--------------------------------+-----------+
@@ -154,7 +154,7 @@
 | | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND town@4 != tewsbury AND system@2 = 5 OR town@4 = lawrence AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 |
 | | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
 | | RepartitionExec: partitioning=RoundRobinBatch(4) |
-| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=town_min@0 != tewsbury OR tewsbury != town_max@1, projection=[count, system, time, town] |
+| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true AND town_min@0 != tewsbury OR tewsbury != town_max@1 AND system_min@2 <= 5 AND 5 <= system_max@3 OR town_min@0 <= lawrence AND lawrence <= town_max@1 AND true, projection=[count, system, time, town] |
 | | |
 +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 -- SQL: SELECT * from restaurant where count > 200 and count < 40000;
@@ -182,7 +182,7 @@
 | | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 |
 | | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
 | | RepartitionExec: partitioning=RoundRobinBatch(4) |
-| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], projection=[count, system, time, town] |
+| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true AND true, projection=[count, system, time, town] |
 | | |
 +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 -- SQL: SELECT * from restaurant where system > 4.0 and system < 7.0;
@@ -278,7 +278,7 @@
 | | CoalesceBatchesExec: target_batch_size=4096 |
 | | FilterExec: system@1 > 5 AND tewsbury != town@3 AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR town@3 = reading |
 | | RepartitionExec: partitioning=RoundRobinBatch(4) |
-| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7, projection=[count, system, time, town] |
+| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7 AND true OR town_min@1 <= reading AND reading <= town_max@2, projection=[count, system, time, town] |
 | | |
 +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 -- SQL: SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');
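Reading the new `predicate=` output in the plans above: DataFusion's `PruningPredicate` rewrites each filter clause into an expression over per-file min/max statistics, and a Parquet file is skipped only when that expression is provably false. Clauses whose column statistics are unusable (for example `count > 200`, hidden behind a `CAST`) degrade to the literal `true`, which is why `predicate=true AND ...` now appears: the whole filter is handed over, not just the pre-screened clauses. A rough sketch of the rewrite, illustrative only and not DataFusion's actual implementation:

    /// Illustrative only: how a row-level clause maps onto a pruning
    /// expression over column min/max statistics.
    fn pruning_sketch(clause: &str) -> &'static str {
        match clause {
            // A file can be skipped only if every value equals 'tewsbury'.
            "town != 'tewsbury'" => "town_min != 'tewsbury' OR 'tewsbury' != town_max",
            // A file can be skipped if 5 lies outside its [min, max] range.
            "system = 5" => "system_min <= 5 AND 5 <= system_max",
            // No usable statistics: the clause cannot prune anything.
            _ => "true",
        }
    }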
diff --git a/router/src/dml_handlers/mod.rs b/router/src/dml_handlers/mod.rs
index 839ec87b12..04d8f5ee78 100644
--- a/router/src/dml_handlers/mod.rs
+++ b/router/src/dml_handlers/mod.rs
@@ -83,6 +83,9 @@ pub use chain::*;
 mod fan_out;
 pub use fan_out::*;
 
+mod rpc_write;
+pub use rpc_write::*;
+
 mod write_summary;
 pub use self::write_summary::*;
 
diff --git a/router/src/dml_handlers/partitioner.rs b/router/src/dml_handlers/partitioner.rs
index f1ecf1f8d0..c6aeea61a8 100644
--- a/router/src/dml_handlers/partitioner.rs
+++ b/router/src/dml_handlers/partitioner.rs
@@ -126,7 +126,7 @@ mod tests {
     use super::*;
 
     // Parse `lp` into a table-keyed MutableBatch map.
-    fn lp_to_writes(lp: &str) -> HashMap<String, MutableBatch> {
+    pub(crate) fn lp_to_writes(lp: &str) -> HashMap<String, MutableBatch> {
         let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
             .expect("failed to build test writes from LP");
 
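`lp_to_writes` is widened to `pub(crate)` so other test modules in the router crate can build table-keyed batches from line protocol without duplicating the parsing boilerplate. A usage sketch (the LP input is invented):

    // In another #[cfg(test)] module within the router crate:
    let writes = lp_to_writes("bananas,tag1=A val=42i 1\nplatanos,tag1=B val=1i 2");
    assert_eq!(writes.len(), 2); // one MutableBatch per table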
diff --git a/router/src/dml_handlers/rpc_write.rs b/router/src/dml_handlers/rpc_write.rs
new file mode 100644
index 0000000000..ab2c7a679d
--- /dev/null
+++ b/router/src/dml_handlers/rpc_write.rs
@@ -0,0 +1,291 @@
+mod client;
+
+use std::{fmt::Debug, time::Duration};
+
+use async_trait::async_trait;
+use data_types::{DeletePredicate, NamespaceId, NamespaceName, TableId};
+use dml::{DmlMeta, DmlWrite};
+use generated_types::influxdata::iox::ingester::v1::{
+    write_service_client::WriteServiceClient, WriteRequest,
+};
+use hashbrown::HashMap;
+use mutable_batch::MutableBatch;
+use mutable_batch_pb::encode::encode_write;
+use observability_deps::tracing::*;
+use sharder::RoundRobin;
+use thiserror::Error;
+use tonic::transport::Channel;
+use trace::ctx::SpanContext;
+
+use super::{DmlHandler, Partitioned};
+
+/// The bound on RPC request duration.
+///
+/// This includes the time taken to send the request, and wait for the response.
+pub const RPC_TIMEOUT: Duration = Duration::from_secs(5);
+
+/// Errors experienced when submitting an RPC write request to an Ingester.
+#[derive(Debug, Error)]
+pub enum RpcWriteError {
+    /// The upstream ingester returned an error response.
+    #[error("upstream ingester error: {0}")]
+    Upstream(#[from] tonic::Status),
+
+    /// The RPC call timed out after [`RPC_TIMEOUT`].
+    #[error("timeout writing to upstream ingester")]
+    Timeout(#[from] tokio::time::error::Elapsed),
+
+    /// A delete request was rejected (not supported).
+    #[error("deletes are not supported")]
+    DeletesUnsupported,
+}
+
+/// A convenience alias for the generated gRPC client.
+type GrpcClient = WriteServiceClient<Channel>;
+
+/// An [`RpcWrite`] handler submits a write directly to an Ingester via the
+/// [gRPC write service].
+///
+/// Requests are sent to an arbitrary downstream Ingester, and request load is
+/// distributed approximately uniformly across all downstream Ingesters. There
+/// is no effort made to enforce or attempt data locality.
+///
+/// # Deletes
+///
+/// This handler drops delete requests, logging the attempt and returning an
+/// error to the client.
+///
+/// [gRPC write service]: WriteServiceClient
+#[derive(Debug)]
+pub struct RpcWrite<C = GrpcClient> {
+    endpoints: RoundRobin<C>,
+}
+
+impl<C> RpcWrite<C> {
+    /// Initialise a new [`RpcWrite`] that sends requests to an arbitrary
+    /// downstream Ingester, using a round-robin strategy.
+    pub fn new(endpoints: RoundRobin<C>) -> Self {
+        Self { endpoints }
+    }
+}
+
+#[async_trait]
+impl<C> DmlHandler for RpcWrite<C>
+where
+    C: client::WriteClient,
+{
+    type WriteInput = Partitioned<HashMap<TableId, (String, MutableBatch)>>;
+    type WriteOutput = ();
+
+    type WriteError = RpcWriteError;
+    type DeleteError = RpcWriteError;
+
+    async fn write(
+        &self,
+        namespace: &NamespaceName<'static>,
+        namespace_id: NamespaceId,
+        writes: Self::WriteInput,
+        span_ctx: Option<SpanContext>,
+    ) -> Result<Self::WriteOutput, Self::WriteError> {
+        // Extract the partition key & DML writes.
+        let (partition_key, writes) = writes.into_parts();
+
+        // Drop the table names from the value tuple.
+        let writes = writes
+            .into_iter()
+            .map(|(id, (_name, data))| (id, data))
+            .collect();
+
+        // Build the DmlWrite
+        let op = DmlWrite::new(
+            namespace_id,
+            writes,
+            partition_key.clone(),
+            DmlMeta::unsequenced(span_ctx.clone()),
+        );
+
+        // Serialise this write into the wire format.
+        let req = WriteRequest {
+            payload: Some(encode_write(namespace_id.get(), &op)),
+        };
+
+        // Perform the gRPC write to an ingester.
+        //
+        // This includes a dirt simple retry mechanism that WILL need improving
+        // (#6173).
+        tokio::time::timeout(RPC_TIMEOUT, async {
+            loop {
+                match self.endpoints.next().write(req.clone()).await {
+                    Ok(()) => break,
+                    Err(e) => warn!(error=%e, "failed ingester rpc write"),
+                };
+            }
+        })
+        .await?;
+
+        debug!(
+            %partition_key,
+            table_count=op.table_count(),
+            %namespace,
+            %namespace_id,
+            approx_size=%op.size(),
+            "dispatched write to ingester"
+        );
+
+        Ok(())
+    }
+
+    async fn delete(
+        &self,
+        namespace: &NamespaceName<'static>,
+        namespace_id: NamespaceId,
+        table_name: &str,
+        _predicate: &DeletePredicate,
+        _span_ctx: Option<SpanContext>,
+    ) -> Result<(), RpcWriteError> {
+        warn!(
+            %namespace,
+            %namespace_id,
+            %table_name,
+            "dropping delete request"
+        );
+
+        Err(RpcWriteError::DeletesUnsupported)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::HashSet, sync::Arc};
+
+    use assert_matches::assert_matches;
+    use data_types::PartitionKey;
+
+    use super::{client::mock::MockWriteClient, *};
+
+    // Parse `lp` into a table-keyed MutableBatch map.
+    pub(crate) fn lp_to_writes(lp: &str) -> HashMap<TableId, (String, MutableBatch)> {
+        let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
+            .expect("failed to build test writes from LP");
+
+        writes
+            .into_iter()
+            .enumerate()
+            .map(|(i, (name, data))| (TableId::new(i as _), (name, data)))
+            .collect()
+    }
+
+    const NAMESPACE_NAME: &str = "bananas";
+    const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
+
+    #[tokio::test]
+    async fn test_write() {
+        let batches = lp_to_writes(
+            "\
+            bananas,tag1=A,tag2=B val=42i 1\n\
+            platanos,tag1=A,tag2=B value=42i 2\n\
+            another,tag1=A,tag2=B value=42i 3\n\
+            bananas,tag1=A,tag2=B val=42i 2\n\
+            table,tag1=A,tag2=B val=42i 1\n\
+            ",
+        );
+
+        // Wrap the table batches in a partition key
+        let input = Partitioned::new(PartitionKey::from("2022-01-01"), batches.clone());
+
+        // Init the write handler with a mock client to capture the rpc calls.
+        let client = Arc::new(MockWriteClient::default());
+        let handler = RpcWrite::new(RoundRobin::new([Arc::clone(&client)]));
+
+        // Drive the RPC writer
+        let got = handler
+            .write(
+                &NamespaceName::new(NAMESPACE_NAME).unwrap(),
+                NAMESPACE_ID,
+                input,
+                None,
+            )
+            .await;
+        assert_matches!(got, Ok(()));
+
+        // Inspect the resulting RPC call
+        let call = {
+            let mut calls = client.calls();
+            assert_eq!(calls.len(), 1);
+            calls.pop().unwrap()
+        };
+
+        let payload = assert_matches!(call.payload, Some(p) => p);
+        assert_eq!(payload.database_id, NAMESPACE_ID.get());
+        assert_eq!(payload.partition_key, "2022-01-01");
+        assert_eq!(payload.table_batches.len(), 4);
+
+        let got_tables = payload
+            .table_batches
+            .into_iter()
+            .map(|t| t.table_id)
+            .collect::<HashSet<_>>();
+
+        let want_tables = batches
+            .into_iter()
+            .map(|(id, (_name, _data))| id.get())
+            .collect::<HashSet<_>>();
+
+        assert_eq!(got_tables, want_tables);
+    }
+
+    #[tokio::test]
+    async fn test_write_retries() {
+        let batches = lp_to_writes("bananas,tag1=A,tag2=B val=42i 1");
+
+        // Wrap the table batches in a partition key
+        let input = Partitioned::new(PartitionKey::from("2022-01-01"), batches.clone());
+
+        // Init the write handler with a mock client to capture the rpc calls.
+        let client1 = Arc::new(
+            MockWriteClient::default()
+                .with_ret([Err(RpcWriteError::Upstream(tonic::Status::internal("")))]),
+        );
+        let client2 = Arc::new(MockWriteClient::default());
+        let handler = RpcWrite::new(RoundRobin::new([
+            Arc::clone(&client1),
+            Arc::clone(&client2),
+        ]));
+
+        // Drive the RPC writer
+        let got = handler
+            .write(
+                &NamespaceName::new(NAMESPACE_NAME).unwrap(),
+                NAMESPACE_ID,
+                input,
+                None,
+            )
+            .await;
+        assert_matches!(got, Ok(()));
+
+        // Ensure client 2 observed a write.
+        let call = {
+            let mut calls = client2.calls();
+            assert_eq!(calls.len(), 1);
+            calls.pop().unwrap()
+        };
+
+        let payload = assert_matches!(call.payload, Some(p) => p);
+        assert_eq!(payload.database_id, NAMESPACE_ID.get());
+        assert_eq!(payload.partition_key, "2022-01-01");
+        assert_eq!(payload.table_batches.len(), 1);
+
+        let got_tables = payload
+            .table_batches
+            .into_iter()
+            .map(|t| t.table_id)
+            .collect::<HashSet<_>>();
+
+        let want_tables = batches
+            .into_iter()
+            .map(|(id, (_name, _data))| id.get())
+            .collect::<HashSet<_>>();
+
+        assert_eq!(got_tables, want_tables);
+    }
+}
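The retry loop in `write()` above couples two mechanisms: it moves to the next round-robin endpoint on every failure, and it bounds the whole operation with `RPC_TIMEOUT` rather than timing out individual attempts. A standalone sketch of that pattern (`do_rpc` is a stand-in for the real client call; error type simplified to `String`):

    use std::{future::Future, time::Duration};

    /// Retry `do_rpc` until it succeeds, but give up entirely once
    /// `timeout` has elapsed across all attempts.
    async fn retry_until_ok<F, Fut>(
        timeout: Duration,
        mut do_rpc: F,
    ) -> Result<(), tokio::time::error::Elapsed>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<(), String>>,
    {
        tokio::time::timeout(timeout, async {
            loop {
                match do_rpc().await {
                    Ok(()) => break,
                    Err(e) => eprintln!("failed ingester rpc write: {e}"),
                }
            }
        })
        .await
    }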
diff --git a/router/src/dml_handlers/rpc_write/client.rs b/router/src/dml_handlers/rpc_write/client.rs
new file mode 100644
index 0000000000..07736955c9
--- /dev/null
+++ b/router/src/dml_handlers/rpc_write/client.rs
@@ -0,0 +1,64 @@
+use async_trait::async_trait;
+use generated_types::influxdata::iox::ingester::v1::{
+    write_service_client::WriteServiceClient, WriteRequest,
+};
+use tonic::transport::Channel;
+
+use super::RpcWriteError;
+
+/// An abstract RPC client that pushes `op` to an opaque receiver.
+#[async_trait]
+pub(super) trait WriteClient: Send + Sync + std::fmt::Debug {
+    /// Write `op` and wait for a response.
+    async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError>;
+}
+
+/// An implementation of [`WriteClient`] for the tonic gRPC client.
+#[async_trait]
+impl WriteClient for WriteServiceClient<Channel> {
+    async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
+        WriteServiceClient::write(&mut self.clone(), op).await?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+pub(crate) mod mock {
+    use std::{collections::VecDeque, sync::Arc};
+
+    use parking_lot::Mutex;
+
+    use super::*;
+
+    #[derive(Debug, Default)]
+    struct State {
+        calls: Vec<WriteRequest>,
+        ret: VecDeque<Result<(), RpcWriteError>>,
+    }
+
+    /// A mock implementation of the [`WriteClient`] for testing purposes.
+    #[derive(Debug, Default)]
+    pub(crate) struct MockWriteClient {
+        state: Mutex<State>,
+    }
+
+    impl MockWriteClient {
+        pub(crate) fn calls(&self) -> Vec<WriteRequest> {
+            self.state.lock().calls.clone()
+        }
+
+        pub(crate) fn with_ret(self, ret: impl Into<VecDeque<Result<(), RpcWriteError>>>) -> Self {
+            self.state.lock().ret = ret.into();
+            self
+        }
+    }
+
+    #[async_trait]
+    impl WriteClient for Arc<MockWriteClient> {
+        async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
+            let mut guard = self.state.lock();
+            guard.calls.push(op);
+            guard.ret.pop_front().unwrap_or(Ok(()))
+        }
+    }
+}
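Note the mock's shape: call recording and canned responses live behind a single `Mutex<State>`, and `WriteClient` is implemented for `Arc<MockWriteClient>` rather than for the struct itself, so the handler can own cheap clones while the test keeps a handle for inspection. A minimal standalone sketch of that pattern (names invented, using std's `Mutex` instead of `parking_lot`):

    use std::sync::{Arc, Mutex};

    trait Client {
        fn send(&self, req: String);
    }

    #[derive(Debug, Default)]
    struct MockClient {
        calls: Mutex<Vec<String>>,
    }

    // Implementing on Arc<MockClient> lets the system under test own
    // clones while the test retains a handle to the recorded calls.
    impl Client for Arc<MockClient> {
        fn send(&self, req: String) {
            self.calls.lock().unwrap().push(req);
        }
    }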
diff --git a/router/src/dml_handlers/trait.rs b/router/src/dml_handlers/trait.rs
index 5d4bb152c6..78ae9e4a5b 100644
--- a/router/src/dml_handlers/trait.rs
+++ b/router/src/dml_handlers/trait.rs
@@ -6,7 +6,8 @@ use thiserror::Error;
 use trace::ctx::SpanContext;
 
 use super::{
-    partitioner::PartitionError, retention_validator::RetentionError, SchemaError, ShardError,
+    partitioner::PartitionError, retention_validator::RetentionError, RpcWriteError, SchemaError,
+    ShardError,
 };
 
 /// Errors emitted by a [`DmlHandler`] implementation during DML request
@@ -21,6 +22,11 @@ pub enum DmlError {
     #[error(transparent)]
     WriteBuffer(#[from] ShardError),
 
+    /// An error pushing the request to a downstream ingester via a direct RPC
+    /// call.
+    #[error(transparent)]
+    RpcWrite(#[from] RpcWriteError),
+
     /// A schema validation failure.
     #[error(transparent)]
     Schema(#[from] SchemaError),
 
diff --git a/router/src/server/http.rs b/router/src/server/http.rs
index f6ee9f2b80..735b43a7db 100644
--- a/router/src/server/http.rs
+++ b/router/src/server/http.rs
@@ -22,7 +22,9 @@ use write_summary::WriteSummary;
 
 use self::delete_predicate::parse_http_delete_request;
 use crate::{
-    dml_handlers::{DmlError, DmlHandler, PartitionError, RetentionError, SchemaError},
+    dml_handlers::{
+        DmlError, DmlHandler, PartitionError, RetentionError, RpcWriteError, SchemaError,
+    },
     namespace_resolver::NamespaceResolver,
 };
 
@@ -144,6 +146,9 @@ impl From<&DmlError> for StatusCode {
                 StatusCode::INTERNAL_SERVER_ERROR
             }
             DmlError::Retention(RetentionError::OutsideRetention(_)) => StatusCode::FORBIDDEN,
+            DmlError::RpcWrite(RpcWriteError::Upstream(_)) => StatusCode::INTERNAL_SERVER_ERROR,
+            DmlError::RpcWrite(RpcWriteError::DeletesUnsupported) => StatusCode::NOT_IMPLEMENTED,
+            DmlError::RpcWrite(RpcWriteError::Timeout(_)) => StatusCode::GATEWAY_TIMEOUT,
         }
     }
 }
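The three new arms complete the HTTP story for the RPC write path: upstream failures surface as 500, unsupported deletes as 501, and client-side timeouts as 504. A sketch of how this mapping is exercised (a hypothetical unit test, not part of this change; `StatusCode` assumed to be hyper's, as used by `http.rs`):

    #[test]
    fn rpc_write_errors_map_to_http_status() {
        use hyper::StatusCode;

        let err = DmlError::RpcWrite(RpcWriteError::DeletesUnsupported);
        assert_eq!(StatusCode::from(&err), StatusCode::NOT_IMPLEMENTED);

        let err = DmlError::RpcWrite(RpcWriteError::Upstream(tonic::Status::internal("")));
        assert_eq!(StatusCode::from(&err), StatusCode::INTERNAL_SERVER_ERROR);
    }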