From 885c831aff2d01bdd3a43273c433d35ae898f68f Mon Sep 17 00:00:00 2001 From: Dom Date: Fri, 14 Jan 2022 16:52:03 +0000 Subject: [PATCH] refactor: avoid constructing DmlOperation Instead of converting the set of MutableBatches into a DmlOperation to shard into more DmlOperation instances, the sharder can operate directly on the MutableBatches. --- Cargo.lock | 2 ++ router2/Cargo.toml | 2 ++ router2/src/dml_handler/mock.rs | 17 +++++++++++------ router2/src/dml_handler/nop.rs | 17 ++++++++++------- router2/src/dml_handler/trait.rs | 15 ++++++++++----- router2/src/server/http.rs | 15 ++++++--------- 6 files changed, 41 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 49cb080e8b..4257d2a4d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3641,8 +3641,10 @@ dependencies = [ "flate2", "futures", "generated_types", + "hashbrown", "hyper", "metric", + "mutable_batch", "mutable_batch_lp", "observability_deps", "parking_lot", diff --git a/router2/Cargo.toml b/router2/Cargo.toml index b028e9acde..0c9701a800 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -13,8 +13,10 @@ dml = { path = "../dml" } flate2 = "1.0" futures = "0.3.19" generated_types = { path = "../generated_types" } +hashbrown = "0.11" hyper = "0.14" metric = { path = "../metric" } +mutable_batch = { path = "../mutable_batch" } mutable_batch_lp = { path = "../mutable_batch_lp" } observability_deps = { path = "../observability_deps" } parking_lot = "0.11" diff --git a/router2/src/dml_handler/mock.rs b/router2/src/dml_handler/mock.rs index 73b44e058b..36dbd29eb3 100644 --- a/router2/src/dml_handler/mock.rs +++ b/router2/src/dml_handler/mock.rs @@ -1,9 +1,13 @@ use std::{collections::VecDeque, sync::Arc}; use async_trait::async_trait; -use dml::DmlOperation; +use data_types::DatabaseName; + +use hashbrown::HashMap; +use mutable_batch::MutableBatch; use mutable_batch_lp::PayloadStatistics; use parking_lot::Mutex; +use trace::ctx::SpanContext; use super::{DmlError, DmlHandler}; @@ 
-11,7 +15,7 @@ use super::{DmlError, DmlHandler}; pub enum MockDmlHandlerCall { Dispatch { db_name: String, - op: DmlOperation, + batches: HashMap<String, MutableBatch>, payload_stats: PayloadStatistics, body_len: usize, }, @@ -57,18 +61,19 @@ macro_rules! record_and_return { #[async_trait] impl DmlHandler for Arc<MockDmlHandler> { - async fn dispatch<'a>( + async fn write<'a>( &'a self, - db_name: impl Into<String> + Send + Sync + 'a, - op: impl Into<DmlOperation> + Send + Sync + 'a, + db_name: DatabaseName<'_>, + batches: HashMap<String, MutableBatch>, payload_stats: PayloadStatistics, body_len: usize, + _span_ctx: Option<SpanContext>, ) -> Result<(), DmlError> { record_and_return!( self, MockDmlHandlerCall::Dispatch { db_name: db_name.into(), - op: op.into(), + batches, payload_stats, body_len, }, diff --git a/router2/src/dml_handler/nop.rs b/router2/src/dml_handler/nop.rs index 024a97d59a..5b4eff2abf 100644 --- a/router2/src/dml_handler/nop.rs +++ b/router2/src/dml_handler/nop.rs @@ -1,9 +1,13 @@ //! A NOP implementation of [`DmlHandler`]. use async_trait::async_trait; -use dml::DmlOperation; +use data_types::DatabaseName; + +use hashbrown::HashMap; +use mutable_batch::MutableBatch; use mutable_batch_lp::PayloadStatistics; use observability_deps::tracing::*; +use trace::ctx::SpanContext; use super::{DmlError, DmlHandler}; @@ -13,16 +17,15 @@ pub struct NopDmlHandler; #[async_trait] impl DmlHandler for NopDmlHandler { - async fn dispatch<'a>( + async fn write<'a>( &'a self, - db_name: impl Into<String> + Send + Sync + 'a, - op: impl Into<DmlOperation> + Send + Sync + 'a, + db_name: DatabaseName<'_>, + batches: HashMap<String, MutableBatch>, _payload_stats: PayloadStatistics, _body_len: usize, + _span_ctx: Option<SpanContext>, ) -> Result<(), DmlError> { - let db_name = db_name.into(); - let op = op.into(); - info!(%db_name, ?op, "dropping dml operation"); + info!(%db_name, ?batches, "dropping write operation"); Ok(()) } } diff --git a/router2/src/dml_handler/trait.rs b/router2/src/dml_handler/trait.rs index bb87c7b616..da5aedd755 100644 --- a/router2/src/dml_handler/trait.rs +++ 
b/router2/src/dml_handler/trait.rs @@ -1,9 +1,13 @@ use std::fmt::Debug; use async_trait::async_trait; -use dml::DmlOperation; +use data_types::DatabaseName; + +use hashbrown::HashMap; +use mutable_batch::MutableBatch; use mutable_batch_lp::PayloadStatistics; use thiserror::Error; +use trace::ctx::SpanContext; /// Errors emitted by a [`DmlHandler`] implementation during DML request /// processing. @@ -21,12 +25,13 @@ pub enum DmlError { /// An abstract handler of [`DmlOperation`] requests. #[async_trait] pub trait DmlHandler: Debug + Send + Sync { - /// Apply `op` to `db_name`. - async fn dispatch<'a>( + /// Write `batches` to `db_name`. + async fn write<'a>( &'a self, - db_name: impl Into<String> + Send + Sync + 'a, - op: impl Into<DmlOperation> + Send + Sync + 'a, + db_name: DatabaseName<'_>, + batches: HashMap<String, MutableBatch>, payload_stats: PayloadStatistics, body_len: usize, + span_ctx: Option<SpanContext>, ) -> Result<(), DmlError>; } diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 1042ad71e2..06ed587609 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -4,7 +4,7 @@ use std::str::Utf8Error; use bytes::{Bytes, BytesMut}; use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError}; -use dml::{DmlMeta, DmlWrite}; + use futures::StreamExt; use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode}; use observability_deps::tracing::*; @@ -184,7 +184,7 @@ where // contain a timestamp let default_time = self.time_provider.now().timestamp_nanos(); - let (tables, stats) = match mutable_batch_lp::lines_to_batches_stats(body, default_time) { + let (batches, stats) = match mutable_batch_lp::lines_to_batches_stats(body, default_time) { Ok(v) => v, Err(mutable_batch_lp::Error::EmptyPayload) => { debug!("nothing to write"); @@ -193,9 +193,8 @@ where Err(e) => return Err(Error::ParseLineProtocol(e)), }; - let op = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx)); self.dml_handler - .dispatch(db_name, op, stats, body.len()) + 
.write(db_name, batches, stats, body.len(), span_ctx) .await?; Ok(()) @@ -275,7 +274,7 @@ mod tests { use std::{io::Write, iter, sync::Arc}; use assert_matches::assert_matches; - use dml::DmlOperation; + use flate2::{write::GzEncoder, Compression}; use hyper::header::HeaderValue; @@ -360,12 +359,10 @@ mod tests { assert_eq!(calls.len(), 1); // Validate the write op - let op = assert_matches!(&calls[0], MockDmlHandlerCall::Dispatch{ db_name, op, body_len, .. } => { + assert_matches!(&calls[0], MockDmlHandlerCall::Dispatch{ db_name, body_len, .. } => { assert_eq!(db_name, $want_write_db); assert_eq!(*body_len, want_body_len); - op - }); - assert_matches!(op, DmlOperation::Write(_)); + }) } else { assert!(calls.is_empty()); }