refactor: avoid constructing DmlOperation
Instead of converting the set of MutableBatches into a DmlOperation to shard into more DmlOperation instances, the sharder can operate directly on the MutableBatches.pull/24376/head
parent
7f99d18dd1
commit
885c831aff
|
@ -3641,8 +3641,10 @@ dependencies = [
|
|||
"flate2",
|
||||
"futures",
|
||||
"generated_types",
|
||||
"hashbrown",
|
||||
"hyper",
|
||||
"metric",
|
||||
"mutable_batch",
|
||||
"mutable_batch_lp",
|
||||
"observability_deps",
|
||||
"parking_lot",
|
||||
|
|
|
@ -13,8 +13,10 @@ dml = { path = "../dml" }
|
|||
flate2 = "1.0"
|
||||
futures = "0.3.19"
|
||||
generated_types = { path = "../generated_types" }
|
||||
hashbrown = "0.11"
|
||||
hyper = "0.14"
|
||||
metric = { path = "../metric" }
|
||||
mutable_batch = { path = "../mutable_batch" }
|
||||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.11"
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
use std::{collections::VecDeque, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use dml::DmlOperation;
|
||||
use data_types::DatabaseName;
|
||||
|
||||
use hashbrown::HashMap;
|
||||
use mutable_batch::MutableBatch;
|
||||
use mutable_batch_lp::PayloadStatistics;
|
||||
use parking_lot::Mutex;
|
||||
use trace::ctx::SpanContext;
|
||||
|
||||
use super::{DmlError, DmlHandler};
|
||||
|
||||
|
@ -11,7 +15,7 @@ use super::{DmlError, DmlHandler};
|
|||
pub enum MockDmlHandlerCall {
|
||||
Dispatch {
|
||||
db_name: String,
|
||||
op: DmlOperation,
|
||||
batches: HashMap<String, MutableBatch>,
|
||||
payload_stats: PayloadStatistics,
|
||||
body_len: usize,
|
||||
},
|
||||
|
@ -57,18 +61,19 @@ macro_rules! record_and_return {
|
|||
|
||||
#[async_trait]
|
||||
impl DmlHandler for Arc<MockDmlHandler> {
|
||||
async fn dispatch<'a>(
|
||||
async fn write<'a>(
|
||||
&'a self,
|
||||
db_name: impl Into<String> + Send + Sync + 'a,
|
||||
op: impl Into<DmlOperation> + Send + Sync + 'a,
|
||||
db_name: DatabaseName<'_>,
|
||||
batches: HashMap<String, MutableBatch>,
|
||||
payload_stats: PayloadStatistics,
|
||||
body_len: usize,
|
||||
_span_ctx: Option<SpanContext>,
|
||||
) -> Result<(), DmlError> {
|
||||
record_and_return!(
|
||||
self,
|
||||
MockDmlHandlerCall::Dispatch {
|
||||
db_name: db_name.into(),
|
||||
op: op.into(),
|
||||
batches,
|
||||
payload_stats,
|
||||
body_len,
|
||||
},
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
//! A NOP implementation of [`DmlHandler`].
|
||||
|
||||
use async_trait::async_trait;
|
||||
use dml::DmlOperation;
|
||||
use data_types::DatabaseName;
|
||||
|
||||
use hashbrown::HashMap;
|
||||
use mutable_batch::MutableBatch;
|
||||
use mutable_batch_lp::PayloadStatistics;
|
||||
use observability_deps::tracing::*;
|
||||
use trace::ctx::SpanContext;
|
||||
|
||||
use super::{DmlError, DmlHandler};
|
||||
|
||||
|
@ -13,16 +17,15 @@ pub struct NopDmlHandler;
|
|||
|
||||
#[async_trait]
|
||||
impl DmlHandler for NopDmlHandler {
|
||||
async fn dispatch<'a>(
|
||||
async fn write<'a>(
|
||||
&'a self,
|
||||
db_name: impl Into<String> + Send + Sync + 'a,
|
||||
op: impl Into<DmlOperation> + Send + Sync + 'a,
|
||||
db_name: DatabaseName<'_>,
|
||||
batches: HashMap<String, MutableBatch>,
|
||||
_payload_stats: PayloadStatistics,
|
||||
_body_len: usize,
|
||||
_span_ctx: Option<SpanContext>,
|
||||
) -> Result<(), DmlError> {
|
||||
let db_name = db_name.into();
|
||||
let op = op.into();
|
||||
info!(%db_name, ?op, "dropping dml operation");
|
||||
info!(%db_name, ?batches, "dropping write operation");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
use std::fmt::Debug;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use dml::DmlOperation;
|
||||
use data_types::DatabaseName;
|
||||
|
||||
use hashbrown::HashMap;
|
||||
use mutable_batch::MutableBatch;
|
||||
use mutable_batch_lp::PayloadStatistics;
|
||||
use thiserror::Error;
|
||||
use trace::ctx::SpanContext;
|
||||
|
||||
/// Errors emitted by a [`DmlHandler`] implementation during DML request
|
||||
/// processing.
|
||||
|
@ -21,12 +25,13 @@ pub enum DmlError {
|
|||
/// An abstract handler of [`DmlOperation`] requests.
|
||||
#[async_trait]
|
||||
pub trait DmlHandler: Debug + Send + Sync {
|
||||
/// Apply `op` to `db_name`.
|
||||
async fn dispatch<'a>(
|
||||
/// Write `batches` to `db_name`.
|
||||
async fn write<'a>(
|
||||
&'a self,
|
||||
db_name: impl Into<String> + Send + Sync + 'a,
|
||||
op: impl Into<DmlOperation> + Send + Sync + 'a,
|
||||
db_name: DatabaseName<'_>,
|
||||
batches: HashMap<String, MutableBatch>,
|
||||
payload_stats: PayloadStatistics,
|
||||
body_len: usize,
|
||||
span_ctx: Option<SpanContext>,
|
||||
) -> Result<(), DmlError>;
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::str::Utf8Error;
|
|||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError};
|
||||
use dml::{DmlMeta, DmlWrite};
|
||||
|
||||
use futures::StreamExt;
|
||||
use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode};
|
||||
use observability_deps::tracing::*;
|
||||
|
@ -184,7 +184,7 @@ where
|
|||
// contain a timestamp
|
||||
let default_time = self.time_provider.now().timestamp_nanos();
|
||||
|
||||
let (tables, stats) = match mutable_batch_lp::lines_to_batches_stats(body, default_time) {
|
||||
let (batches, stats) = match mutable_batch_lp::lines_to_batches_stats(body, default_time) {
|
||||
Ok(v) => v,
|
||||
Err(mutable_batch_lp::Error::EmptyPayload) => {
|
||||
debug!("nothing to write");
|
||||
|
@ -193,9 +193,8 @@ where
|
|||
Err(e) => return Err(Error::ParseLineProtocol(e)),
|
||||
};
|
||||
|
||||
let op = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx));
|
||||
self.dml_handler
|
||||
.dispatch(db_name, op, stats, body.len())
|
||||
.write(db_name, batches, stats, body.len(), span_ctx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
@ -275,7 +274,7 @@ mod tests {
|
|||
use std::{io::Write, iter, sync::Arc};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use dml::DmlOperation;
|
||||
|
||||
use flate2::{write::GzEncoder, Compression};
|
||||
use hyper::header::HeaderValue;
|
||||
|
||||
|
@ -360,12 +359,10 @@ mod tests {
|
|||
assert_eq!(calls.len(), 1);
|
||||
|
||||
// Validate the write op
|
||||
let op = assert_matches!(&calls[0], MockDmlHandlerCall::Dispatch{ db_name, op, body_len, .. } => {
|
||||
assert_matches!(&calls[0], MockDmlHandlerCall::Dispatch{ db_name, body_len, .. } => {
|
||||
assert_eq!(db_name, $want_write_db);
|
||||
assert_eq!(*body_len, want_body_len);
|
||||
op
|
||||
});
|
||||
assert_matches!(op, DmlOperation::Write(_));
|
||||
})
|
||||
} else {
|
||||
assert!(calls.is_empty());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue