Merge branch 'main' into dom/ingester-rpc-write

pull/24376/head
Dom 2022-11-18 16:34:36 +00:00 committed by GitHub
commit 7103ecd733
9 changed files with 437 additions and 210 deletions

View File

@@ -247,16 +247,11 @@ impl TableProvider for ChunkTableProvider {
// This debug shows that self.arrow_schema() includes all columns in all chunks,
// which means the schemas of all chunks are merged before invoking this scan
debug!(schema=?self.arrow_schema(), "All chunks schema");
// However, the schema of each chunk is still in its original form, which does not
// include the merged columns of other chunks. The code below (kept in comments on
// purpose) demonstrates this:
// for chunk in chunks.clone() {
//     trace!("Schema of chunk {}: {:#?}", chunk.id(), chunk.schema());
// }
// Note that `filters` don't actually need to be evaluated in
// the scan for the plans to be correct; they are an extra
// optimization for providers that can apply them
let predicate = Predicate::default().with_pushdown_exprs(filters);
let predicate = Predicate::default().with_exprs(filters.to_vec());
let deduplicate = Deduplicater::new(self.ctx.child_ctx("deduplicator"))
.enable_deduplication(self.deduplication());

View File

@@ -27,7 +27,7 @@ use datafusion::{
binary_expr,
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
utils::expr_to_columns,
BinaryExpr, Operator,
BinaryExpr,
},
optimizer::utils::split_conjunction,
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
@@ -443,9 +443,8 @@ impl Predicate {
}
/// Adds an expression to the list of general purpose predicates
pub fn with_expr(mut self, expr: Expr) -> Self {
self.exprs.push(expr);
self
pub fn with_expr(self, expr: Expr) -> Self {
self.with_exprs([expr])
}
/// Adds a ValueExpr to the list of value expressions
@@ -489,6 +488,12 @@ impl Predicate {
self
}
/// Adds all expressions to the list of general purpose predicates
pub fn with_exprs(mut self, filters: impl IntoIterator<Item = Expr>) -> Self {
self.exprs.extend(filters);
self
}
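A minimal sketch of the resulting builder API, assuming DataFusion's `col` and `lit` helpers are in scope; the column names are made up for illustration:

let predicate = Predicate::default()
    .with_expr(col("state").eq(lit("CA")))
    .with_exprs([col("price").gt(lit(10)), col("f").lt_eq(lit(60))]);
assert_eq!(predicate.exprs.len(), 3);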
/// Remove any clauses of this predicate that can not be run before deduplication.
///
/// See <https://github.com/influxdata/influxdb_iox/issues/6066> for more details.
@@ -536,60 +541,6 @@ impl Predicate {
value_expr: vec![],
}
}
/// Adds only the expressions from `filters` that can be pushed down to
/// execution engines.
pub fn with_pushdown_exprs(mut self, filters: &[Expr]) -> Self {
// For each expression of the filters, recursively split it if it is an AND conjunction
// For example, expression (x AND y) will be split into a vector of 2 expressions [x, y]
let mut exprs = filters.iter().flat_map(split_conjunction);
// Only keep single_column and primitive binary expressions
let mut pushdown_exprs: Vec<Expr> = vec![];
let exprs_result = exprs.try_for_each::<_, Result<_, DataFusionError>>(|expr| {
let mut columns = HashSet::new();
expr_to_columns(expr, &mut columns)?;
if columns.len() == 1 && Self::primitive_binary_expr(expr) {
pushdown_exprs.push(expr.clone());
}
Ok(())
});
match exprs_result {
Ok(()) => {
// Return the builder with only the pushdownable expressions on it.
self.exprs.append(&mut pushdown_exprs);
}
Err(e) => {
debug!("Error, {}, building push-down predicates for filters: {:#?}. No predicates are pushed down", e, filters);
}
}
self
}
/// Return true if the given expression is a primitive binary expression of the
/// form `column op constant`, where `op` is a comparison operator
pub fn primitive_binary_expr(expr: &Expr) -> bool {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
matches!(
(&**left, &**right),
(Expr::Column(_), Expr::Literal(_)) | (Expr::Literal(_), Expr::Column(_))
) && matches!(
op,
Operator::Eq
| Operator::NotEq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq
)
}
_ => false,
}
}
}
// Wrapper around `Expr::BinaryExpr` where left input is known to be
@@ -719,94 +670,6 @@ mod tests {
assert!(!p.is_empty());
}
#[test]
fn test_pushdown_predicates() {
let mut filters = vec![];
// state = CA
let expr1 = col("state").eq(lit("CA"));
filters.push(expr1);
// "price > 10"
let expr2 = col("price").gt(lit(10));
filters.push(expr2);
// a < 10 AND b >= 50 --> will be split to [a < 10, b >= 50]
let expr3 = col("a").lt(lit(10)).and(col("b").gt_eq(lit(50)));
filters.push(expr3);
// c != 3 OR d = 8 --> won't be pushed down
let expr4 = col("c").not_eq(lit(3)).or(col("d").eq(lit(8)));
filters.push(expr4);
// e is null --> won't be pushed down
let expr5 = col("e").is_null();
filters.push(expr5);
// f <= 60
let expr6 = col("f").lt_eq(lit(60));
filters.push(expr6);
// g is not null --> won't be pushed down
let expr7 = col("g").is_not_null();
filters.push(expr7);
// h + i --> won't be pushed down
let expr8 = col("h") + col("i");
filters.push(expr8);
// city = Boston
let expr9 = col("city").eq(lit("Boston"));
filters.push(expr9);
// city != Braintree
let expr9 = col("city").not_eq(lit("Braintree"));
filters.push(expr9);
// city != state --> won't be pushed down
let expr10 = col("city").not_eq(col("state"));
filters.push(expr10);
// city = state --> won't be pushed down
let expr11 = col("city").eq(col("state"));
filters.push(expr11);
// city_state = city + state --> won't be pushed down
let expr12 = col("city_state").eq(col("city") + col("state"));
filters.push(expr12);
// city = city + 5 --> won't be pushed down
let expr13 = col("city").eq(col("city") + lit(5));
filters.push(expr13);
// city = city --> won't be pushed down
let expr14 = col("city").eq(col("city"));
filters.push(expr14);
// city + 5 = city --> won't be pushed down
let expr15 = (col("city") + lit(5)).eq(col("city"));
filters.push(expr15);
// 5 = city
let expr16 = lit(5).eq(col("city"));
filters.push(expr16);
println!(" --------------- Filters: {:#?}", filters);
// Expected pushdown predicates: [state = CA, price > 10, a < 10, b >= 50, f <= 60, city = Boston, city != Braintree, 5 = city]
let predicate = Predicate::default().with_pushdown_exprs(&filters);
println!(" ------------- Predicates: {:#?}", predicate);
assert_eq!(predicate.exprs.len(), 8);
assert_eq!(predicate.exprs[0], col("state").eq(lit("CA")));
assert_eq!(predicate.exprs[1], col("price").gt(lit(10)));
assert_eq!(predicate.exprs[2], col("a").lt(lit(10)));
assert_eq!(predicate.exprs[3], col("b").gt_eq(lit(50)));
assert_eq!(predicate.exprs[4], col("f").lt_eq(lit(60)));
assert_eq!(predicate.exprs[5], col("city").eq(lit("Boston")));
assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree")));
assert_eq!(predicate.exprs[7], lit(5).eq(col("city")));
}
#[test]
fn predicate_display_ts() {
// TODO make this a doc example?

View File

@@ -37,34 +37,34 @@
+-------+--------+--------------------------------+-----------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200;
-- Results After Normalizing UUIDs
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64) > Int64(200) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(count@0 AS Int64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], projection=[count, system, time, town] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64) > Int64(200) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(count@0 AS Int64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true, projection=[count, system, time, town] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200.0;
-- Results After Normalizing UUIDs
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Float64) > Float64(200) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Float64) > Float64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(count@0 AS Float64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], projection=[count, system, time, town] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Float64) > Float64(200) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Float64) > Float64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(count@0 AS Float64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true, projection=[count, system, time, town] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 4.0;
-- Results After Normalizing UUIDs
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
@@ -93,19 +93,19 @@
+-------+--------+--------------------------------+-----------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury';
-- Results After Normalizing UUIDs
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury"))] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=town_min@0 != tewsbury OR tewsbury != town_max@1, projection=[count, system, time, town] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury"))] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true AND town_min@0 != tewsbury OR tewsbury != town_max@1, projection=[count, system, time, town] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
-- Results After Sorting
+-------+--------+--------------------------------+-----------+
@@ -118,19 +118,19 @@
+-------+--------+--------------------------------+-----------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
-- Results After Normalizing UUIDs
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND (restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=town_min@0 != tewsbury OR tewsbury != town_max@1, projection=[count, system, time, town] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND (restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true AND town_min@0 != tewsbury OR tewsbury != town_max@1 AND system_min@2 <= 5 AND 5 <= system_max@3 OR town_min@0 <= lawrence AND lawrence <= town_max@1, projection=[count, system, time, town] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000;
-- Results After Sorting
+-------+--------+--------------------------------+-----------+
@@ -154,7 +154,7 @@
| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND town@4 != tewsbury AND system@2 = 5 OR town@4 = lawrence AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 |
| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=town_min@0 != tewsbury OR tewsbury != town_max@1, projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true AND town_min@0 != tewsbury OR tewsbury != town_max@1 AND system_min@2 <= 5 AND 5 <= system_max@3 OR town_min@0 <= lawrence AND lawrence <= town_max@1 AND true, projection=[count, system, time, town] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * from restaurant where count > 200 and count < 40000;
@@ -182,7 +182,7 @@
| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 |
| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=true AND true, projection=[count, system, time, town] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * from restaurant where system > 4.0 and system < 7.0;
@@ -278,7 +278,7 @@
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: system@1 > 5 AND tewsbury != town@3 AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR town@3 = reading |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7, projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7 AND true OR town_min@1 <= reading AND reading <= town_max@2, projection=[count, system, time, town] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');

View File

@@ -83,6 +83,9 @@ pub use chain::*;
mod fan_out;
pub use fan_out::*;
mod rpc_write;
pub use rpc_write::*;
mod write_summary;
pub use self::write_summary::*;

View File

@@ -126,7 +126,7 @@ mod tests {
use super::*;
// Parse `lp` into a table-keyed MutableBatch map.
fn lp_to_writes(lp: &str) -> HashMap<TableId, (String, MutableBatch)> {
pub(crate) fn lp_to_writes(lp: &str) -> HashMap<TableId, (String, MutableBatch)> {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
.expect("failed to build test writes from LP");

View File

@@ -0,0 +1,291 @@
mod client;
use std::{fmt::Debug, time::Duration};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName, TableId};
use dml::{DmlMeta, DmlWrite};
use generated_types::influxdata::iox::ingester::v1::{
write_service_client::WriteServiceClient, WriteRequest,
};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use mutable_batch_pb::encode::encode_write;
use observability_deps::tracing::*;
use sharder::RoundRobin;
use thiserror::Error;
use tonic::transport::Channel;
use trace::ctx::SpanContext;
use super::{DmlHandler, Partitioned};
/// The bound on RPC request duration.
///
/// This includes the time taken to send the request, and wait for the response.
pub const RPC_TIMEOUT: Duration = Duration::from_secs(5);
/// Errors experienced when submitting an RPC write request to an Ingester.
#[derive(Debug, Error)]
pub enum RpcWriteError {
/// The upstream ingester returned an error response.
#[error("upstream ingester error: {0}")]
Upstream(#[from] tonic::Status),
/// The RPC call timed out after [`RPC_TIMEOUT`].
#[error("timeout writing to upstream ingester")]
Timeout(#[from] tokio::time::error::Elapsed),
/// A delete request was rejected (not supported).
#[error("deletes are not supported")]
DeletesUnsupported,
}
/// A convenience alias for the generated gRPC client.
type GrpcClient = WriteServiceClient<Channel>;
/// An [`RpcWrite`] handler submits a write directly to an Ingester via the
/// [gRPC write service].
///
/// Requests are sent to an arbitrary downstream Ingester, and request load is
/// distributed approximately uniformly across all downstream Ingesters. No
/// effort is made to provide or enforce data locality.
///
/// # Deletes
///
/// This handler drops delete requests, logging the attempt and returning an
/// error to the client.
///
/// [gRPC write service]: WriteServiceClient
#[derive(Debug)]
pub struct RpcWrite<C = GrpcClient> {
endpoints: RoundRobin<C>,
}
impl<C> RpcWrite<C> {
/// Initialise a new [`RpcWrite`] that sends requests to an arbitrary
/// downstream Ingester, using a round-robin strategy.
pub fn new(endpoints: RoundRobin<C>) -> Self {
Self { endpoints }
}
}
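A hedged sketch of wiring up the handler against real gRPC endpoints; the ingester addresses and port are hypothetical, and `connect` is the standard tonic-generated client constructor:

let endpoints = RoundRobin::new([
    WriteServiceClient::connect("http://ingester-0:8082").await?,
    WriteServiceClient::connect("http://ingester-1:8082").await?,
]);
let handler = RpcWrite::new(endpoints);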
#[async_trait]
impl<C> DmlHandler for RpcWrite<C>
where
C: client::WriteClient,
{
type WriteInput = Partitioned<HashMap<TableId, (String, MutableBatch)>>;
type WriteOutput = ();
type WriteError = RpcWriteError;
type DeleteError = RpcWriteError;
async fn write(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
writes: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, RpcWriteError> {
// Extract the partition key & DML writes.
let (partition_key, writes) = writes.into_parts();
// Drop the table names from the value tuple.
let writes = writes
.into_iter()
.map(|(id, (_name, data))| (id, data))
.collect();
// Build the DmlWrite
let op = DmlWrite::new(
namespace_id,
writes,
partition_key.clone(),
DmlMeta::unsequenced(span_ctx.clone()),
);
// Serialise this write into the wire format.
let req = WriteRequest {
payload: Some(encode_write(namespace_id.get(), &op)),
};
// Perform the gRPC write to an ingester.
//
// This includes a dirt-simple retry mechanism that WILL need improving
// (#6173).
tokio::time::timeout(RPC_TIMEOUT, async {
loop {
match self.endpoints.next().write(req.clone()).await {
Ok(()) => break,
Err(e) => warn!(error=%e, "failed ingester rpc write"),
};
}
})
.await?;
debug!(
%partition_key,
table_count=op.table_count(),
%namespace,
%namespace_id,
approx_size=%op.size(),
"dispatched write to ingester"
);
Ok(())
}
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
_predicate: &DeletePredicate,
_span_ctx: Option<SpanContext>,
) -> Result<(), RpcWriteError> {
warn!(
%namespace,
%namespace_id,
%table_name,
"dropping delete request"
);
Err(RpcWriteError::DeletesUnsupported)
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashSet, sync::Arc};
use assert_matches::assert_matches;
use data_types::PartitionKey;
use super::{client::mock::MockWriteClient, *};
// Parse `lp` into a table-keyed MutableBatch map.
pub(crate) fn lp_to_writes(lp: &str) -> HashMap<TableId, (String, MutableBatch)> {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
.expect("failed to build test writes from LP");
writes
.into_iter()
.enumerate()
.map(|(i, (name, data))| (TableId::new(i as _), (name, data)))
.collect()
}
const NAMESPACE_NAME: &str = "bananas";
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
#[tokio::test]
async fn test_write() {
let batches = lp_to_writes(
"\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 2\n\
another,tag1=A,tag2=B value=42i 3\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1\n\
",
);
// Wrap the table batches in a partition key
let input = Partitioned::new(PartitionKey::from("2022-01-01"), batches.clone());
// Init the write handler with a mock client to capture the rpc calls.
let client = Arc::new(MockWriteClient::default());
let handler = RpcWrite::new(RoundRobin::new([Arc::clone(&client)]));
// Drive the RPC writer
let got = handler
.write(
&NamespaceName::new(NAMESPACE_NAME).unwrap(),
NAMESPACE_ID,
input,
None,
)
.await;
assert_matches!(got, Ok(()));
// Inspect the resulting RPC call
let call = {
let mut calls = client.calls();
assert_eq!(calls.len(), 1);
calls.pop().unwrap()
};
let payload = assert_matches!(call.payload, Some(p) => p);
assert_eq!(payload.database_id, NAMESPACE_ID.get());
assert_eq!(payload.partition_key, "2022-01-01");
assert_eq!(payload.table_batches.len(), 4);
let got_tables = payload
.table_batches
.into_iter()
.map(|t| t.table_id)
.collect::<HashSet<_>>();
let want_tables = batches
.into_iter()
.map(|(id, (_name, _data))| id.get())
.collect::<HashSet<_>>();
assert_eq!(got_tables, want_tables);
}
#[tokio::test]
async fn test_write_retries() {
let batches = lp_to_writes("bananas,tag1=A,tag2=B val=42i 1");
// Wrap the table batches in a partition key
let input = Partitioned::new(PartitionKey::from("2022-01-01"), batches.clone());
// Init the write handler with a mock client to capture the rpc calls.
let client1 = Arc::new(
MockWriteClient::default()
.with_ret([Err(RpcWriteError::Upstream(tonic::Status::internal("")))]),
);
let client2 = Arc::new(MockWriteClient::default());
let handler = RpcWrite::new(RoundRobin::new([
Arc::clone(&client1),
Arc::clone(&client2),
]));
// Drive the RPC writer
let got = handler
.write(
&NamespaceName::new(NAMESPACE_NAME).unwrap(),
NAMESPACE_ID,
input,
None,
)
.await;
assert_matches!(got, Ok(()));
// Ensure client 2 observed a write.
let call = {
let mut calls = client2.calls();
assert_eq!(calls.len(), 1);
calls.pop().unwrap()
};
let payload = assert_matches!(call.payload, Some(p) => p);
assert_eq!(payload.database_id, NAMESPACE_ID.get());
assert_eq!(payload.partition_key, "2022-01-01");
assert_eq!(payload.table_batches.len(), 1);
let got_tables = payload
.table_batches
.into_iter()
.map(|t| t.table_id)
.collect::<HashSet<_>>();
let want_tables = batches
.into_iter()
.map(|(id, (_name, _data))| id.get())
.collect::<HashSet<_>>();
assert_eq!(got_tables, want_tables);
}
}

View File

@@ -0,0 +1,64 @@
use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::{
write_service_client::WriteServiceClient, WriteRequest,
};
use tonic::transport::Channel;
use super::RpcWriteError;
/// An abstract RPC client that pushes `op` to an opaque receiver.
#[async_trait]
pub(super) trait WriteClient: Send + Sync + std::fmt::Debug {
/// Write `op` and wait for a response.
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError>;
}
/// An implementation of [`WriteClient`] for the tonic gRPC client.
#[async_trait]
impl WriteClient for WriteServiceClient<Channel> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
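// Tonic-generated clients are cheap to clone (they share the underlying
// `Channel`), so cloning here satisfies the `&mut self` receiver of the
// generated `write` method.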
WriteServiceClient::write(&mut self.clone(), op).await?;
Ok(())
}
}
#[cfg(test)]
pub(crate) mod mock {
use std::{collections::VecDeque, sync::Arc};
use parking_lot::Mutex;
use super::*;
#[derive(Debug, Default)]
struct State {
calls: Vec<WriteRequest>,
ret: VecDeque<Result<(), RpcWriteError>>,
}
/// A mock implementation of the [`WriteClient`] for testing purposes.
#[derive(Debug, Default)]
pub(crate) struct MockWriteClient {
state: Mutex<State>,
}
impl MockWriteClient {
pub(crate) fn calls(&self) -> Vec<WriteRequest> {
self.state.lock().calls.clone()
}
pub(crate) fn with_ret(self, ret: impl Into<VecDeque<Result<(), RpcWriteError>>>) -> Self {
self.state.lock().ret = ret.into();
self
}
}
#[async_trait]
impl WriteClient for Arc<MockWriteClient> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
let mut guard = self.state.lock();
guard.calls.push(op);
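// Return the next queued response, defaulting to success once exhausted.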
guard.ret.pop_front().unwrap_or(Ok(()))
}
}
}

View File

@@ -6,7 +6,8 @@ use thiserror::Error;
use trace::ctx::SpanContext;
use super::{
partitioner::PartitionError, retention_validator::RetentionError, SchemaError, ShardError,
partitioner::PartitionError, retention_validator::RetentionError, RpcWriteError, SchemaError,
ShardError,
};
/// Errors emitted by a [`DmlHandler`] implementation during DML request
@@ -21,6 +22,11 @@ pub enum DmlError {
#[error(transparent)]
WriteBuffer(#[from] ShardError),
/// An error pushing the request to a downstream ingester via a direct RPC
/// call.
#[error(transparent)]
RpcWrite(#[from] RpcWriteError),
/// A schema validation failure.
#[error(transparent)]
Schema(#[from] SchemaError),

View File

@@ -22,7 +22,9 @@ use write_summary::WriteSummary;
use self::delete_predicate::parse_http_delete_request;
use crate::{
dml_handlers::{DmlError, DmlHandler, PartitionError, RetentionError, SchemaError},
dml_handlers::{
DmlError, DmlHandler, PartitionError, RetentionError, RpcWriteError, SchemaError,
},
namespace_resolver::NamespaceResolver,
};
@@ -144,6 +146,9 @@ impl From<&DmlError> for StatusCode {
StatusCode::INTERNAL_SERVER_ERROR
}
DmlError::Retention(RetentionError::OutsideRetention(_)) => StatusCode::FORBIDDEN,
DmlError::RpcWrite(RpcWriteError::Upstream(_)) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::RpcWrite(RpcWriteError::DeletesUnsupported) => StatusCode::NOT_IMPLEMENTED,
DmlError::RpcWrite(RpcWriteError::Timeout(_)) => StatusCode::GATEWAY_TIMEOUT,
}
}
}
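A hedged sketch of a unit test exercising the new mappings, assuming the `From<&DmlError>` impl above is in scope; the test name is made up:

#[test]
fn rpc_write_errors_map_to_expected_status_codes() {
    // Deletes are rejected by the RPC write path.
    assert_eq!(
        StatusCode::from(&DmlError::RpcWrite(RpcWriteError::DeletesUnsupported)),
        StatusCode::NOT_IMPLEMENTED
    );
    // Upstream ingester failures surface as internal errors.
    assert_eq!(
        StatusCode::from(&DmlError::RpcWrite(RpcWriteError::Upstream(
            tonic::Status::internal("")
        ))),
        StatusCode::INTERNAL_SERVER_ERROR
    );
}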