refactor: resolve namespace before DML dispatch

This commit introduces a new (composable) trait; a NamespaceResolver is
an abstraction responsible for taking a string namespace from a user
request and mapping it to its catalog ID.

This allows the NamespaceId to be injected through the DmlHandler chain
in addition to the namespace name.

As part of this change, the NamespaceAutocreation layer was changed from
an implementer of the DmlHandler trait to a NamespaceResolver, as it
is a more appropriate abstraction for the functionality it provides.
pull/24376/head
Dom Dwyer 2022-10-26 16:33:01 +02:00
parent 0c5eb3f70f
commit d166de931d
21 changed files with 798 additions and 153 deletions

View File

@ -18,13 +18,13 @@ use object_store::DynObjectStore;
use observability_deps::tracing::info;
use router::{
dml_handlers::{
DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator,
NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer,
WriteSummaryAdapter,
DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioner,
SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter,
},
namespace_cache::{
metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache,
},
namespace_resolver::{NamespaceAutocreation, NamespaceResolver, NamespaceSchemaResolver},
server::{
grpc::{sharder::ShardService, GrpcDelegate},
http::HttpDelegate,
@ -66,14 +66,14 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub struct RouterServerType<D, S> {
server: RouterServer<D, S>,
pub struct RouterServerType<D, N, S> {
server: RouterServer<D, N, S>,
shutdown: CancellationToken,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<D, S> RouterServerType<D, S> {
pub fn new(server: RouterServer<D, S>, common_state: &CommonServerState) -> Self {
impl<D, N, S> RouterServerType<D, N, S> {
pub fn new(server: RouterServer<D, N, S>, common_state: &CommonServerState) -> Self {
Self {
server,
shutdown: CancellationToken::new(),
@ -82,17 +82,18 @@ impl<D, S> RouterServerType<D, S> {
}
}
impl<D, S> std::fmt::Debug for RouterServerType<D, S> {
impl<D, N, S> std::fmt::Debug for RouterServerType<D, N, S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Router")
}
}
#[async_trait]
impl<D, S> ServerType for RouterServerType<D, S>
impl<D, N, S> ServerType for RouterServerType<D, N, S>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
S: Sharder<(), Item = Arc<Shard>> + Clone + 'static,
N: NamespaceResolver + 'static,
{
/// Return the [`metric::Registry`] used by the router.
fn metric_registry(&self) -> Arc<Registry> {
@ -210,6 +211,10 @@ pub async fn create_router_server_type(
});
let partitioner = InstrumentationDecorator::new("partitioner", &*metrics, partitioner);
// Initialise the Namespace ID lookup + cache
let namespace_resolver =
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
////////////////////////////////////////////////////////////////////////////
//
// THIS CODE IS FOR TESTING ONLY.
@ -247,9 +252,10 @@ pub async fn create_router_server_type(
});
txn.commit().await?;
let ns_creator = NamespaceAutocreation::new(
let namespace_resolver = NamespaceAutocreation::new(
namespace_resolver,
Arc::clone(&ns_cache),
Arc::clone(&catalog),
ns_cache,
topic_id,
query_id,
iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
@ -262,8 +268,7 @@ pub async fn create_router_server_type(
// Build the chain of DML handlers that forms the request processing
// pipeline, starting with the namespace creator (for testing purposes) and
// write partitioner that yields a set of partitioned batches.
let handler_stack = ns_creator
.and_then(schema_validator)
let handler_stack = schema_validator
.and_then(partitioner)
// Once writes have been partitioned, they are processed in parallel.
//
@ -288,6 +293,7 @@ pub async fn create_router_server_type(
let http = HttpDelegate::new(
common_state.run_config().max_http_request_size,
request_limit,
namespace_resolver,
Arc::clone(&handler_stack),
&metrics,
);

View File

@ -1,7 +1,7 @@
use std::{collections::BTreeSet, iter, sync::Arc};
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use data_types::{PartitionTemplate, TemplatePart};
use data_types::{NamespaceId, PartitionTemplate, TemplatePart};
use hyper::{Body, Request};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use router::{
@ -10,6 +10,7 @@ use router::{
WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
namespace_resolver::mock::MockNamespaceResolver,
server::http::HttpDelegate,
shard::Shard,
};
@ -74,7 +75,16 @@ fn e2e_benchmarks(c: &mut Criterion) {
partitioner.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer))),
);
HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics)
let namespace_resolver =
MockNamespaceResolver::default().with_mapping("bananas", NamespaceId::new(42));
HttpDelegate::new(
1024,
100,
namespace_resolver,
Arc::new(handler_stack),
&metrics,
)
};
let body_str = "platanos,tag1=A,tag2=B val=42i 123456";

View File

@ -4,7 +4,7 @@ use criterion::{
criterion_group, criterion_main, measurement::WallTime, BatchSize, BenchmarkGroup, Criterion,
Throughput,
};
use data_types::DatabaseName;
use data_types::{DatabaseName, NamespaceId};
use hashbrown::HashMap;
use iox_catalog::mem::MemCatalog;
use mutable_batch::MutableBatch;
@ -49,7 +49,7 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
for i in 0..65_000 {
let write = lp_to_writes(format!("{}{}", i + 10_000_000, generate_lp(1, 1)).as_str());
let _ = runtime().block_on(validator.write(&*NAMESPACE, write, None));
let _ = runtime().block_on(validator.write(&*NAMESPACE, NamespaceId::new(42), write, None));
}
let write = lp_to_writes(&generate_lp(tables, columns_per_table));
@ -61,7 +61,7 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
group.bench_function(format!("{tables}x{columns_per_table}"), |b| {
b.to_async(runtime()).iter_batched(
|| write.clone(),
|write| validator.write(&*NAMESPACE, write, None),
|write| validator.write(&*NAMESPACE, NamespaceId::new(42), write, None),
BatchSize::SmallInput,
);
});

View File

@ -1,5 +1,5 @@
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate};
use data_types::{DatabaseName, DeletePredicate, NamespaceId};
use trace::ctx::SpanContext;
use super::{DmlError, DmlHandler};
@ -59,17 +59,18 @@ where
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
let output = self
.first
.write(namespace, input, span_ctx.clone())
.write(namespace, namespace_id, input, span_ctx.clone())
.await
.map_err(Into::into)?;
self.second
.write(namespace, output, span_ctx)
.write(namespace, namespace_id, output, span_ctx)
.await
.map_err(Into::into)
}

View File

@ -1,7 +1,7 @@
use std::{fmt::Debug, marker::PhantomData};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate};
use data_types::{DatabaseName, DeletePredicate, NamespaceId};
use futures::{stream::FuturesUnordered, TryStreamExt};
use trace::ctx::SpanContext;
@ -50,6 +50,7 @@ where
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -58,7 +59,11 @@ where
.map(|v| {
let namespace = namespace.clone();
let span_ctx = span_ctx.clone();
async move { self.inner.write(&namespace, v, span_ctx).await }
async move {
self.inner
.write(&namespace, namespace_id, v, span_ctx)
.await
}
})
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()

View File

@ -1,5 +1,5 @@
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate};
use data_types::{DatabaseName, DeletePredicate, NamespaceId};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
use trace::{
@ -68,6 +68,7 @@ where
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -77,7 +78,10 @@ where
let mut span_recorder =
SpanRecorder::new(span_ctx.clone().map(|parent| parent.child(self.name)));
let res = self.inner.write(namespace, input, span_ctx).await;
let res = self
.inner
.write(namespace, namespace_id, input, span_ctx)
.await;
// Avoid exploding if time goes backwards - simply drop the measurement
// if it happens.
@ -202,7 +206,7 @@ mod tests {
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler);
decorator
.write(&ns, (), Some(span))
.write(&ns, NamespaceId::new(42), (), Some(span))
.await
.expect("inner handler configured to succeed");
@ -225,7 +229,7 @@ mod tests {
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler);
let err = decorator
.write(&ns, (), Some(span))
.write(&ns, NamespaceId::new(42), (), Some(span))
.await
.expect_err("inner handler configured to fail");

View File

@ -1,7 +1,7 @@
use std::{collections::VecDeque, fmt::Debug};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate};
use data_types::{DatabaseName, DeletePredicate, NamespaceId};
use parking_lot::Mutex;
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
@ -14,6 +14,7 @@ use super::{DmlError, DmlHandler};
pub enum MockDmlHandlerCall<W> {
Write {
namespace: String,
namespace_id: NamespaceId,
write_input: W,
},
Delete {
@ -102,6 +103,7 @@ where
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
write_input: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -109,6 +111,7 @@ where
self,
MockDmlHandlerCall::Write {
namespace: namespace.into(),
namespace_id,
write_input,
},
write_return

View File

@ -4,27 +4,17 @@
//! processing handler chain:
//!
//! ```text
//! ┌──────────────┐ ┌──────────────┐
//! │ HTTP API │ │ gRPC API │
//! └──────────────┘ └──────────────┘
//! │ │
//! └─────────┬─────────┘
//! │
//! ▼
//! ╔═ DmlHandler Stack ═════╗
//! ┌─────────────────┐
//! │ Namespace Cache │
//! └─────────────────┘
//! ╔═ DmlHandler Stack ═════╗ │
//! ║ ║
//! ║ ┌──────────────────┐ ║
//! ║ │ Namespace │ ║
//! ║ │ Autocreation │─ ─ ─ ─ ─ ─ ─ ┐
//! ║ └──────────────────┘ ║
//! ║ │ ║ │
//! ║ ▼ ║
//! ║ ┌──────────────────┐ ║ │
//! ║ │ Partitioner │ ║
//! ║ └──────────────────┘ ║ │
//! ║ │ ║ ┌─────────────────┐
//! ║ ▼ ║ │ Namespace Cache
//! ║ ┌──────────────────┐ ║ └─────────────────┘
//! ║ │ ║
//! ║ ▼ ║ │
//! ║ ┌──────────────────┐ ║
//! ║ │ Schema │ ║ │
//! ║ │ Validation │ ─║─ ─ ─ ─ ─ ─
//! ║ └──────────────────┘ ║
@ -49,23 +39,17 @@
//! └──────────────┘
//! ```
//!
//! The HTTP / gRPC APIs decode their respective request format and funnel the
//! resulting operation through the common [`DmlHandler`] composed of the layers
//! described above.
//! The HTTP API decodes the request and funnels the resulting operation through
//! the [`DmlHandler`] stack composed of the layers described above.
//!
//! The [`NamespaceAutocreation`] handler (for testing only) populates the
//! global catalog with an entry for each namespace it observes, using the
//! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending
//! requests to the catalog for namespaces that are known to exist.
//!
//! Incoming line-protocol writes then pass through the [`Partitioner`], parsing
//! the LP and splitting them into batches per partition, before passing each
//! Incoming line-protocol writes pass through the [`Partitioner`], parsing the
//! LP and splitting them into batches per IOx partition, before passing each
//! partitioned batch through the rest of the request pipeline.
//!
//! Writes then pass through the [`SchemaValidator`] applying schema enforcement
//! (a NOP layer for deletes) which pushes additive schema changes to the
//! catalog and populates the [`NamespaceCache`], converging it to match the set
//! of [`NamespaceSchema`] in the global catalog.
//! Writes then pass through the [`SchemaValidator`] applying schema & limit
//! enforcement (a NOP layer for deletes) which pushes additive schema changes
//! to the catalog and populates the [`NamespaceCache`], converging it to match
//! the set of [`NamespaceSchema`] in the global catalog.
//!
//! The [`ShardedWriteBuffer`] uses a sharder implementation to direct the DML
//! operations into a fixed set of shards.
@ -84,9 +68,6 @@ pub mod nop;
mod sharded_write_buffer;
pub use sharded_write_buffer::*;
mod ns_autocreation;
pub use ns_autocreation::*;
mod partitioner;
pub use partitioner::*;

View File

@ -3,7 +3,7 @@
use std::{fmt::Debug, marker::PhantomData};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate};
use data_types::{DatabaseName, DeletePredicate, NamespaceId};
use observability_deps::tracing::*;
use trace::ctx::SpanContext;
@ -32,10 +32,11 @@ where
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
batches: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
info!(%namespace, ?batches, "dropping write operation");
info!(%namespace, %namespace_id, ?batches, "dropping write operation");
Ok(batches)
}

View File

@ -1,5 +1,5 @@
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate, PartitionKey, PartitionTemplate};
use data_types::{DatabaseName, DeletePredicate, NamespaceId, PartitionKey, PartitionTemplate};
use hashbrown::HashMap;
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
use observability_deps::tracing::*;
@ -71,6 +71,7 @@ impl DmlHandler for Partitioner {
async fn write(
&self,
_namespace: &DatabaseName<'static>,
_namespace_id: NamespaceId,
batch: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -145,7 +146,7 @@ mod tests {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats($lp, DEFAULT_TIMESTAMP_NANOS).expect("failed to parse test LP");
let handler_ret = partitioner.write(&ns, writes, None).await;
let handler_ret = partitioner.write(&ns, NamespaceId::new(42), writes, None).await;
assert_matches!(handler_ret, $($want_handler_ret)+);
// Check the partition -> table mapping.

View File

@ -1,7 +1,7 @@
use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate, NamespaceSchema};
use data_types::{DatabaseName, DeletePredicate, NamespaceId, NamespaceSchema};
use hashbrown::HashMap;
use iox_catalog::{
interface::{get_schema_by_name, Catalog, Error as CatalogError},
@ -167,6 +167,7 @@ where
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
batches: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -183,7 +184,12 @@ where
let schema = get_schema_by_name(namespace, repos.deref_mut())
.await
.map_err(|e| {
warn!(error=%e, %namespace, "failed to retrieve namespace schema");
warn!(
error=%e,
%namespace,
%namespace_id,
"failed to retrieve namespace schema"
);
SchemaError::NamespaceLookup(e)
})
.map(Arc::new)?;
@ -197,7 +203,12 @@ where
};
validate_column_limits(&batches, &schema).map_err(|e| {
warn!(%namespace, error=%e, "service protection limit reached");
warn!(
%namespace,
%namespace_id,
error=%e,
"service protection limit reached"
);
self.service_limit_hit.inc(1);
SchemaError::ServiceLimit(Box::new(e))
})?;
@ -218,6 +229,7 @@ where
} => {
warn!(
%namespace,
%namespace_id,
column_name=%name,
existing_column_type=%existing,
request_column_type=%new,
@ -230,12 +242,22 @@ where
// Service limits
CatalogError::ColumnCreateLimitError { .. }
| CatalogError::TableCreateLimitError { .. } => {
warn!(%namespace, error=%e, "service protection limit reached");
warn!(
%namespace,
%namespace_id,
error=%e,
"service protection limit reached"
);
self.service_limit_hit.inc(1);
SchemaError::ServiceLimit(Box::new(e.into_err()))
}
_ => {
error!(%namespace, error=%e, "schema validation failed");
error!(
%namespace,
%namespace_id,
error=%e,
"schema validation failed"
);
SchemaError::UnexpectedCatalogError(e.into_err())
}
}
@ -453,12 +475,12 @@ mod tests {
// namespace schema gets cached
let writes1_valid = lp_to_writes("dragonfruit val=42i 123456");
handler1
.write(&*NAMESPACE, writes1_valid, None)
.write(&*NAMESPACE, NamespaceId::new(42), writes1_valid, None)
.await
.expect("request should succeed");
let writes2_valid = lp_to_writes("dragonfruit val=43i 123457");
handler2
.write(&*NAMESPACE, writes2_valid, None)
.write(&*NAMESPACE, NamespaceId::new(42), writes2_valid, None)
.await
.expect("request should succeed");
@ -466,12 +488,12 @@ mod tests {
// putting the table over the limit
let writes1_add_column = lp_to_writes("dragonfruit,tag1=A val=42i 123456");
handler1
.write(&*NAMESPACE, writes1_add_column, None)
.write(&*NAMESPACE, NamespaceId::new(42), writes1_add_column, None)
.await
.expect("request should succeed");
let writes2_add_column = lp_to_writes("dragonfruit,tag2=B val=43i 123457");
handler2
.write(&*NAMESPACE, writes2_add_column, None)
.write(&*NAMESPACE, NamespaceId::new(42), writes2_add_column, None)
.await
.expect("request should succeed");
@ -542,7 +564,7 @@ mod tests {
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
handler
.write(&*NAMESPACE, writes, None)
.write(&*NAMESPACE, NamespaceId::new(42), writes, None)
.await
.expect("request should succeed");
@ -567,7 +589,7 @@ mod tests {
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
let err = handler
.write(&ns, writes, None)
.write(&ns, NamespaceId::new(42), writes, None)
.await
.expect_err("request should fail");
@ -590,7 +612,7 @@ mod tests {
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64
let got = handler
.write(&*NAMESPACE, writes.clone(), None)
.write(&*NAMESPACE, NamespaceId::new(42), writes.clone(), None)
.await
.expect("request should succeed");
assert_eq!(writes.len(), got.len());
@ -598,7 +620,7 @@ mod tests {
// Second write attempts to violate it causing an error
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42.0 123456"); // val=float
let err = handler
.write(&*NAMESPACE, writes, None)
.write(&*NAMESPACE, NamespaceId::new(42), writes, None)
.await
.expect_err("request should fail");
@ -628,7 +650,7 @@ mod tests {
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
let got = handler
.write(&*NAMESPACE, writes.clone(), None)
.write(&*NAMESPACE, NamespaceId::new(42), writes.clone(), None)
.await
.expect("request should succeed");
assert_eq!(writes.len(), got.len());
@ -646,7 +668,7 @@ mod tests {
// Second write attempts to violate limits, causing an error
let writes = lp_to_writes("bananas2,tag1=A,tag2=B val=42i 123456");
let err = handler
.write(&*NAMESPACE, writes, None)
.write(&*NAMESPACE, NamespaceId::new(42), writes, None)
.await
.expect_err("request should fail");
@ -667,7 +689,7 @@ mod tests {
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
let got = handler
.write(&*NAMESPACE, writes.clone(), None)
.write(&*NAMESPACE, NamespaceId::new(42), writes.clone(), None)
.await
.expect("request should succeed");
assert_eq!(writes.len(), got.len());
@ -683,7 +705,7 @@ mod tests {
// Second write attempts to violate limits, causing an error
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456");
let err = handler
.write(&*NAMESPACE, writes, None)
.write(&*NAMESPACE, NamespaceId::new(42), writes, None)
.await
.expect_err("request should fail");
@ -714,7 +736,7 @@ mod tests {
// First write attempts to add columns over the limit, causing an error
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456");
let err = handler
.write(&*NAMESPACE, writes, None)
.write(&*NAMESPACE, NamespaceId::new(42), writes, None)
.await
.expect_err("request should fail");

View File

@ -6,7 +6,7 @@ use std::{
};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate, NonEmptyString};
use data_types::{DatabaseName, DeletePredicate, NamespaceId, NonEmptyString};
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use futures::{stream::FuturesUnordered, StreamExt};
use hashbrown::HashMap;
@ -94,6 +94,7 @@ where
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
writes: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, ShardError> {
@ -129,6 +130,7 @@ where
kafka_partition=%shard.shard_index(),
tables=%dml.table_count(),
%namespace,
%namespace_id,
approx_size=%dml.size(),
"routing writes to shard"
);
@ -274,7 +276,9 @@ mod tests {
// Call the ShardedWriteBuffer and drive the test
let ns = DatabaseName::new("bananas").unwrap();
w.write(&ns, writes, None).await.expect("write failed");
w.write(&ns, NamespaceId::new(42), writes, None)
.await
.expect("write failed");
// Assert the sharder saw all the tables
let calls = sharder.calls();
@ -340,7 +344,9 @@ mod tests {
// Call the ShardedWriteBuffer and drive the test
let ns = DatabaseName::new("bananas").unwrap();
w.write(&ns, writes, None).await.expect("write failed");
w.write(&ns, NamespaceId::new(42), writes, None)
.await
.expect("write failed");
// Assert the sharder saw all the tables
let calls = sharder.calls();
@ -417,7 +423,7 @@ mod tests {
// Call the ShardedWriteBuffer and drive the test
let ns = DatabaseName::new("bananas").unwrap();
let err = w
.write(&ns, writes, None)
.write(&ns, NamespaceId::new(42), writes, None)
.await
.expect_err("write should return a failure");
assert_matches!(err, ShardError::WriteBufferErrors{successes, errs} => {

View File

@ -1,11 +1,11 @@
use std::{error::Error, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate};
use data_types::{DatabaseName, DeletePredicate, NamespaceId};
use thiserror::Error;
use trace::ctx::SpanContext;
use super::{partitioner::PartitionError, NamespaceCreationError, SchemaError, ShardError};
use super::{partitioner::PartitionError, SchemaError, ShardError};
/// Errors emitted by a [`DmlHandler`] implementation during DML request
/// processing.
@ -23,10 +23,6 @@ pub enum DmlError {
#[error(transparent)]
Schema(#[from] SchemaError),
/// Failed to create the request namespace.
#[error(transparent)]
NamespaceCreation(#[from] NamespaceCreationError),
/// An error partitioning the request.
#[error(transparent)]
Partition(#[from] PartitionError),
@ -63,6 +59,7 @@ pub trait DmlHandler: Debug + Send + Sync {
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError>;
@ -90,10 +87,13 @@ where
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
(**self).write(namespace, input, span_ctx).await
(**self)
.write(namespace, namespace_id, input, span_ctx)
.await
}
/// Delete the data specified in `delete`.

View File

@ -1,7 +1,7 @@
use std::fmt::Debug;
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate};
use data_types::{DatabaseName, DeletePredicate, NamespaceId};
use dml::DmlMeta;
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
@ -40,10 +40,14 @@ where
async fn write(
&self,
namespace: &DatabaseName<'static>,
namespace_id: NamespaceId,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
let metas = self.inner.write(namespace, input, span_ctx).await?;
let metas = self
.inner
.write(namespace, namespace_id, input, span_ctx)
.await?;
Ok(WriteSummary::new(metas))
}

View File

@ -1,14 +1,96 @@
//! IOx router role implementation.
//! IOx router implementation.
//!
//! An IOx router is responsible for:
//!
//! * Creating IOx namespaces & synchronising them within the catalog.
//! * Handling writes:
//! * Receiving IOx write/delete requests via HTTP and gRPC endpoints.
//! * Receiving IOx write/delete requests via HTTP.
//! * Enforcing schema validation & synchronising it within the catalog.
//! * Deriving the partition key of each DML operation.
//! * Applying sharding logic.
//! * Push resulting operations into the appropriate shards (Kafka partitions if using Kafka).
//! * Push resulting operations into the appropriate shards (Kafka
//! partitions if using Kafka).
//!
//! The router is composed of singly-responsible components that each apply a
//! transformation to the request:
//!
//! ```text
//! ┌──────────────┐
//! │ HTTP API │
//! └──────────────┘
//! │
//! │
//! ▼
//! ╔═ NamespaceResolver ════╗
//! ║ ║
//! ║ ┌──────────────────┐ ║
//! ║ │ Namespace │ ║
//! ║ │ Autocreation │ ─║─ ─ ─ ─ ─ ─
//! ║ └──────────────────┘ ║ │
//! ║ │ ║
//! ║ ▼ ║ │
//! ║ ┌──────────────────┐ ║
//! ║ │ NamespaceSchema │ ║ │
//! ║ │ Resolver │ ─║─ ─ ─ ─ ─ ─
//! ║ └──────────────────┘ ║ │
//! ║ │ ║
//! ╚════════════│═══════════╝ │
//! │ ┌─────────────────┐
//! │ │ Namespace Cache │
//! │ └─────────────────┘
//! ╔═ DmlHandler│Stack ═════╗ │
//! ║ ▼ ║
//! ║ ┌──────────────────┐ ║ │
//! ║ │ Partitioner │ ║
//! ║ └──────────────────┘ ║ │
//! ║ │ ║
//! ║ ▼ ║ │
//! ║ ┌──────────────────┐ ║
//! ║ │ Schema │ ║ │
//! ║ │ Validation │ ─║─ ─ ─ ─ ─ ─
//! ║ └──────────────────┘ ║
//! ║ │ ║
//! ║ ▼ ║
//! ┌───────┐ ║ ┌──────────────────┐ ║
//! │Sharder│◀ ─ ─ ▶│ShardedWriteBuffer│ ║
//! └───────┘ ║ └──────────────────┘ ║
//! ║ │ ║
//! ╚════════════│═══════════╝
//! │
//! ▼
//! ┌──────────────┐
//! │ Write Buffer │
//! └──────────────┘
//! │
//! │
//! ┌────────▼─────┐
//! │ Kafka ├┐
//! └┬─────────────┘├┐
//! └┬─────────────┘│
//! └──────────────┘
//! ```
//!
//! The [`NamespaceAutocreation`] handler (for testing only) populates the
//! global catalog with an entry for each namespace it observes, using the
//! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending
//! requests to the catalog for namespaces that are known to exist.
//!
//! A [`NamespaceResolver`] maps a user-provided namespace string to the
//! catalog ID ([`NamespaceId`]). The [`NamespaceSchemaResolver`] achieves this
//! by inspecting the contents of the [`NamespaceCache`]. If a cache-miss
//! occurs the [`NamespaceSchemaResolver`] queries the catalog and populates the
//! cache for subsequent requests.
//!
//! Once the [`NamespaceId`] has been resolved, the request is passed into the
//! [`DmlHandler`] stack.
//!
//! [`NamespaceAutocreation`]: crate::namespace_resolver::NamespaceAutocreation
//! [`NamespaceSchemaResolver`]: crate::namespace_resolver::NamespaceSchemaResolver
//! [`NamespaceResolver`]: crate::namespace_resolver
//! [`NamespaceId`]: data_types::NamespaceId
//! [`NamespaceCache`]: crate::namespace_cache::NamespaceCache
//! [`NamespaceSchema`]: data_types::NamespaceSchema
//! [`DmlHandler`]: crate::dml_handlers
#![deny(
rustdoc::broken_intra_doc_links,
@ -28,5 +110,6 @@
pub mod dml_handlers;
pub mod namespace_cache;
pub mod namespace_resolver;
pub mod server;
pub mod shard;

View File

@ -0,0 +1,209 @@
//! A trait to abstract resolving a [`DatabaseName`] to a [`NamespaceId`], and a
//! collection of composable implementations.
use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{DatabaseName, NamespaceId};
use iox_catalog::interface::{get_schema_by_name, Catalog};
use observability_deps::tracing::*;
use thiserror::Error;
use crate::namespace_cache::NamespaceCache;
pub mod mock;
mod ns_autocreation;
pub use ns_autocreation::*;
/// Error states encountered during [`NamespaceId`] lookup.
#[derive(Debug, Error)]
pub enum Error {
    /// An error occurred when attempting to fetch the namespace ID from the
    /// catalog — including the case where the namespace does not exist.
    #[error("failed to resolve namespace ID: {0}")]
    Lookup(iox_catalog::interface::Error),

    /// An error state for errors returned by [`NamespaceAutocreation`].
    #[error(transparent)]
    Create(#[from] NamespaceCreationError),
}
/// An abstract resolver of [`DatabaseName`] to [`NamespaceId`].
#[async_trait]
pub trait NamespaceResolver: std::fmt::Debug + Send + Sync {
    /// Return the [`NamespaceId`] for the given [`DatabaseName`].
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the ID cannot be resolved — for example, when
    /// the namespace does not exist in the catalog.
    async fn get_namespace_id(
        &self,
        namespace: &DatabaseName<'static>,
    ) -> Result<NamespaceId, Error>;
}
/// An implementation of [`NamespaceResolver`] that queries the [`Catalog`] to
/// resolve a [`NamespaceId`], and populates the [`NamespaceCache`] as a side
/// effect.
#[derive(Debug)]
pub struct NamespaceSchemaResolver<C> {
    // Global catalog, queried only on a cache miss.
    catalog: Arc<dyn Catalog>,
    // Cache of `NamespaceSchema` consulted before hitting the catalog.
    cache: C,
}

impl<C> NamespaceSchemaResolver<C> {
    /// Construct a new [`NamespaceSchemaResolver`] that fetches schemas from
    /// `catalog` and caches them in `cache`.
    pub fn new(catalog: Arc<dyn Catalog>, cache: C) -> Self {
        Self { catalog, cache }
    }
}
#[async_trait]
impl<C> NamespaceResolver for NamespaceSchemaResolver<C>
where
    C: NamespaceCache,
{
    /// Resolve `namespace` to its catalog ID, preferring the local schema
    /// cache and falling back to a catalog query (which populates the cache
    /// for subsequent requests).
    async fn get_namespace_id(
        &self,
        namespace: &DatabaseName<'static>,
    ) -> Result<NamespaceId, Error> {
        // Fast path: the schema (and therefore the ID) is already cached.
        if let Some(cached) = self.cache.get_schema(namespace) {
            return Ok(cached.id);
        }

        // Slow path: pull the schema from the global catalog, erroring if the
        // namespace does not exist.
        let schema = {
            let mut repos = self.catalog.repositories().await;
            get_schema_by_name(namespace, repos.deref_mut())
                .await
                .map(Arc::new)
                .map_err(|e| {
                    warn!(
                        error=%e,
                        %namespace,
                        "failed to retrieve namespace schema"
                    );
                    Error::Lookup(e)
                })?
        };

        // Cache population MAY race with other threads and lead to
        // overwrites, but an entry will always exist once inserted, and
        // the schemas will eventually converge.
        self.cache
            .put_schema(namespace.clone(), Arc::clone(&schema));
        trace!(%namespace, "schema cache populated");

        Ok(schema.id)
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use data_types::{NamespaceId, NamespaceSchema, QueryPoolId, TopicId};
    use iox_catalog::mem::MemCatalog;

    use super::*;
    use crate::namespace_cache::MemoryNamespaceCache;

    /// A pre-populated cache entry must satisfy the lookup without any
    /// request reaching the catalog.
    #[tokio::test]
    async fn test_cache_hit() {
        let ns = DatabaseName::try_from("bananas").unwrap();

        // Prep the cache before the test to cause a hit
        let cache = Arc::new(MemoryNamespaceCache::default());
        cache.put_schema(
            ns.clone(),
            NamespaceSchema {
                id: NamespaceId::new(42),
                topic_id: TopicId::new(2),
                query_pool_id: QueryPoolId::new(3),
                tables: Default::default(),
                max_columns_per_table: 4,
            },
        );

        let metrics = Arc::new(metric::Registry::new());
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
        let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));

        // Drive the code under test
        resolver
            .get_namespace_id(&ns)
            .await
            .expect("lookup should succeed");

        // The entry must remain in the cache after the lookup.
        assert!(cache.get_schema(&ns).is_some());

        // The cache hit should mean the catalog SHOULD NOT see a create request
        // for the namespace.
        let mut repos = catalog.repositories().await;
        assert!(
            repos
                .namespaces()
                .get_by_name(ns.as_str())
                .await
                .expect("lookup should not error")
                .is_none(),
            "expected no request to the catalog"
        );
    }

    /// A cache miss must fall back to the catalog, and populate the cache as
    /// a side effect of a successful lookup.
    #[tokio::test]
    async fn test_cache_miss() {
        let ns = DatabaseName::try_from("bananas").unwrap();

        let cache = Arc::new(MemoryNamespaceCache::default());
        let metrics = Arc::new(metric::Registry::new());
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));

        // Create the namespace in the catalog
        {
            let mut repos = catalog.repositories().await;
            let topic = repos.topics().create_or_get("bananas").await.unwrap();
            let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap();
            repos
                .namespaces()
                .create(
                    &ns,
                    iox_catalog::INFINITE_RETENTION_POLICY,
                    topic.id,
                    query_pool.id,
                )
                .await
                .expect("failed to setup catalog state");
        }

        let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));

        resolver
            .get_namespace_id(&ns)
            .await
            .expect("lookup should succeed");

        // The cache should be populated as a result of the lookup.
        assert!(cache.get_schema(&ns).is_some());
    }

    /// A lookup for a namespace that exists in neither the cache nor the
    /// catalog must return a lookup error and leave the cache untouched.
    #[tokio::test]
    async fn test_cache_miss_does_not_exist() {
        let ns = DatabaseName::try_from("bananas").unwrap();

        let cache = Arc::new(MemoryNamespaceCache::default());
        let metrics = Arc::new(metric::Registry::new());
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));

        let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));

        let err = resolver
            .get_namespace_id(&ns)
            .await
            .expect_err("lookup should error");

        assert_matches!(err, Error::Lookup(_));
        assert!(cache.get_schema(&ns).is_none());
    }
}

View File

@ -0,0 +1,45 @@
//! A mock implementation of [`NamespaceResolver`].
#![allow(missing_docs)]
use std::collections::HashMap;
use async_trait::async_trait;
use data_types::{DatabaseName, NamespaceId};
use parking_lot::Mutex;
use super::NamespaceResolver;
/// A mock [`NamespaceResolver`] that resolves namespace names using a static,
/// in-memory set of mappings configured by the test.
#[derive(Debug, Default)]
pub struct MockNamespaceResolver {
    // Interior-mutable map of namespace name -> pre-configured ID, so
    // mappings can be added through a shared reference.
    map: Mutex<HashMap<DatabaseName<'static>, NamespaceId>>,
}
impl MockNamespaceResolver {
    /// Initialise a mock pre-populated with the name -> ID mappings in `map`.
    pub fn new(map: HashMap<DatabaseName<'static>, NamespaceId>) -> Self {
        let map = Mutex::new(map);
        Self { map }
    }

    /// Add a mapping from `name` to `id`, panicking if `name` is not a valid
    /// namespace name or was already mapped.
    pub fn with_mapping(self, name: impl Into<String> + 'static, id: NamespaceId) -> Self {
        let name = DatabaseName::try_from(name.into()).unwrap();
        let previous = self.map.lock().insert(name, id);
        assert!(previous.is_none());
        self
    }
}
#[async_trait]
impl NamespaceResolver for MockNamespaceResolver {
    /// Return the [`NamespaceId`] for the given [`DatabaseName`].
    ///
    /// # Panics
    ///
    /// Panics if no mapping was configured for `namespace`.
    async fn get_namespace_id(
        &self,
        namespace: &DatabaseName<'static>,
    ) -> Result<NamespaceId, super::Error> {
        let guard = self.map.lock();
        let id = guard
            .get(namespace)
            .expect("mock namespace resolver does not have ID");
        Ok(*id)
    }
}

View File

@ -0,0 +1,221 @@
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{DatabaseName, NamespaceId, QueryPoolId, TopicId};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::*;
use thiserror::Error;
use super::NamespaceResolver;
use crate::namespace_cache::NamespaceCache;
/// An error auto-creating the request namespace.
#[derive(Debug, Error)]
pub enum NamespaceCreationError {
    /// An error returned from a namespace creation request.
    ///
    /// The underlying catalog error is embedded in the rendered message.
    #[error("failed to create namespace: {0}")]
    Create(iox_catalog::interface::Error),
}
/// A layer to populate the [`Catalog`] with all the namespaces the router
/// observes.
///
/// Uses a [`NamespaceCache`] to limit issuing create requests to namespaces the
/// router has not yet observed a schema for.
#[derive(Debug)]
pub struct NamespaceAutocreation<C, T> {
    // The inner NamespaceResolver the ID lookup is delegated to.
    inner: T,
    // Cache consulted before issuing a create request; a hit skips the
    // catalog round-trip entirely.
    cache: C,
    // Catalog the namespace is created in on a cache miss.
    catalog: Arc<dyn Catalog>,
    // Topic ID assigned to auto-created namespaces.
    topic_id: TopicId,
    // Query pool ID assigned to auto-created namespaces.
    query_id: QueryPoolId,
    // Retention policy string assigned to auto-created namespaces.
    retention: String,
}
impl<C, T> NamespaceAutocreation<C, T> {
    /// Construct a [`NamespaceAutocreation`] layer that guarantees a
    /// requested namespace exists in `catalog` before delegating the ID
    /// lookup to `inner`.
    ///
    /// A namespace that is absent from the catalog is created with the given
    /// `topic_id`, `query_id` and `retention` policy.
    ///
    /// The `cache` is consulted first: a cache hit for the namespace skips
    /// the creation request to the catalog entirely.
    pub fn new(
        inner: T,
        cache: C,
        catalog: Arc<dyn Catalog>,
        topic_id: TopicId,
        query_id: QueryPoolId,
        retention: String,
    ) -> Self {
        Self {
            retention,
            query_id,
            topic_id,
            catalog,
            cache,
            inner,
        }
    }
}
#[async_trait]
impl<C, T> NamespaceResolver for NamespaceAutocreation<C, T>
where
    C: NamespaceCache,
    T: NamespaceResolver,
{
    /// Force the creation of `namespace` if it does not already exist in the
    /// cache, before passing the request through to the inner delegate.
    async fn get_namespace_id(
        &self,
        namespace: &DatabaseName<'static>,
    ) -> Result<NamespaceId, super::Error> {
        // A cached schema implies the namespace already exists in the
        // catalog, so only a cache miss needs a (possibly redundant) create.
        if self.cache.get_schema(namespace).is_none() {
            trace!(%namespace, "namespace auto-create cache miss");

            let mut repos = self.catalog.repositories().await;
            let create_result = repos
                .namespaces()
                .create(
                    namespace.as_str(),
                    &self.retention,
                    self.topic_id,
                    self.query_id,
                )
                .await;

            match create_result {
                Ok(_) => debug!(%namespace, "created namespace"),
                Err(iox_catalog::interface::Error::NameExists { .. }) => {
                    // Either the cache has not yet converged to include this
                    // namespace, or another thread raced populating the catalog
                    // and beat this thread to it.
                    debug!(%namespace, "spurious namespace create failed");
                }
                Err(e) => {
                    error!(error=%e, %namespace, "failed to auto-create namespace");
                    return Err(NamespaceCreationError::Create(e).into());
                }
            }
        }

        // The namespace is now known to exist; delegate the ID lookup.
        self.inner.get_namespace_id(namespace).await
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use data_types::{Namespace, NamespaceId, NamespaceSchema};
    use iox_catalog::mem::MemCatalog;

    use super::*;
    use crate::{
        namespace_cache::MemoryNamespaceCache, namespace_resolver::mock::MockNamespaceResolver,
    };

    /// A schema already present in the cache must suppress the catalog
    /// create request, with the ID coming from the inner (mock) resolver.
    #[tokio::test]
    async fn test_cache_hit() {
        const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);

        let ns = DatabaseName::try_from("bananas").unwrap();

        // Prep the cache before the test to cause a hit
        let cache = Arc::new(MemoryNamespaceCache::default());
        cache.put_schema(
            ns.clone(),
            NamespaceSchema {
                id: NAMESPACE_ID,
                topic_id: TopicId::new(2),
                query_pool_id: QueryPoolId::new(3),
                tables: Default::default(),
                max_columns_per_table: 4,
            },
        );

        let metrics = Arc::new(metric::Registry::new());
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));

        let creator = NamespaceAutocreation::new(
            MockNamespaceResolver::default().with_mapping(ns.clone(), NAMESPACE_ID),
            cache,
            Arc::clone(&catalog),
            TopicId::new(42),
            QueryPoolId::new(42),
            "inf".to_owned(),
        );

        // Drive the code under test
        let got = creator
            .get_namespace_id(&ns)
            .await
            .expect("handler should succeed");
        // The ID is the one the inner mock resolver was configured with.
        assert_eq!(got, NAMESPACE_ID);

        // The cache hit should mean the catalog SHOULD NOT see a create request
        // for the namespace.
        let mut repos = catalog.repositories().await;
        assert!(
            repos
                .namespaces()
                .get_by_name(ns.as_str())
                .await
                .expect("lookup should not error")
                .is_none(),
            "expected no request to the catalog"
        );
    }

    /// A cache miss must cause the layer to create the namespace in the
    /// catalog with the configured topic, query pool and retention values.
    #[tokio::test]
    async fn test_cache_miss() {
        let ns = DatabaseName::try_from("bananas").unwrap();

        let cache = Arc::new(MemoryNamespaceCache::default());
        let metrics = Arc::new(metric::Registry::new());
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));

        let creator = NamespaceAutocreation::new(
            MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
            cache,
            Arc::clone(&catalog),
            TopicId::new(42),
            QueryPoolId::new(42),
            "inf".to_owned(),
        );

        let created_id = creator
            .get_namespace_id(&ns)
            .await
            .expect("handler should succeed");

        // The cache miss should mean the catalog MUST see a create request for
        // the namespace.
        let mut repos = catalog.repositories().await;
        let got = repos
            .namespaces()
            .get_by_name(ns.as_str())
            .await
            .expect("lookup should not error")
            .expect("creation request should be sent to catalog");

        // NOTE(review): this relies on MemCatalog assigning ID 1 to the first
        // created namespace so that it matches the mock's configured mapping.
        assert_eq!(got.id, created_id);
        assert_eq!(
            got,
            Namespace {
                id: NamespaceId::new(1),
                name: ns.to_string(),
                retention_duration: Some("inf".to_owned()),
                topic_id: TopicId::new(42),
                query_pool_id: QueryPoolId::new(42),
                max_tables: iox_catalog::DEFAULT_MAX_TABLES,
                max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
            }
        );
    }
}

View File

@ -15,19 +15,19 @@ pub mod http;
/// The [`RouterServer`] manages the lifecycle and contains all state for a
/// `router` server instance.
#[derive(Debug)]
pub struct RouterServer<D, S> {
pub struct RouterServer<D, N, S> {
metrics: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
http: HttpDelegate<D>,
http: HttpDelegate<D, N>,
grpc: GrpcDelegate<S>,
}
impl<D, S> RouterServer<D, S> {
impl<D, N, S> RouterServer<D, N, S> {
/// Initialise a new [`RouterServer`] using the provided HTTP and gRPC
/// handlers.
pub fn new(
http: HttpDelegate<D>,
http: HttpDelegate<D, N>,
grpc: GrpcDelegate<S>,
metrics: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
@ -51,12 +51,12 @@ impl<D, S> RouterServer<D, S> {
}
}
impl<D, S> RouterServer<D, S>
impl<D, N, S> RouterServer<D, N, S>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>>,
{
/// Get a reference to the router http delegate.
pub fn http(&self) -> &HttpDelegate<D> {
pub fn http(&self) -> &HttpDelegate<D, N> {
&self.http
}

View File

@ -22,7 +22,10 @@ use trace::ctx::SpanContext;
use write_summary::WriteSummary;
use self::delete_predicate::parse_http_delete_request;
use crate::dml_handlers::{DmlError, DmlHandler, PartitionError, SchemaError};
use crate::{
dml_handlers::{DmlError, DmlHandler, PartitionError, SchemaError},
namespace_resolver::NamespaceResolver,
};
const WRITE_TOKEN_HTTP_HEADER: &str = "X-IOx-Write-Token";
@ -77,6 +80,13 @@ pub enum Error {
#[error("dml handler error: {0}")]
DmlHandler(#[from] DmlError),
/// An error that occurs when attempting to map the user-provided namespace
/// name into a [`NamespaceId`].
///
/// [`NamespaceId`]: data_types::NamespaceId
#[error("failed to resolve namespace ID: {0}")]
NamespaceResolver(#[from] crate::namespace_resolver::Error),
/// The router is currently servicing the maximum permitted number of
/// simultaneous requests.
#[error("this service is overloaded, please try again later")]
@ -103,6 +113,7 @@ impl Error {
StatusCode::UNSUPPORTED_MEDIA_TYPE
}
Error::DmlHandler(err) => StatusCode::from(err),
Error::NamespaceResolver(_) => StatusCode::INTERNAL_SERVER_ERROR,
Error::RequestLimit => StatusCode::SERVICE_UNAVAILABLE,
}
}
@ -128,9 +139,7 @@ impl From<&DmlError> for StatusCode {
StatusCode::INTERNAL_SERVER_ERROR
}
DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
DmlError::Internal(_) | DmlError::WriteBuffer(_) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::Partition(PartitionError::BatchWrite(_)) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
@ -216,9 +225,10 @@ impl<T> TryFrom<&Request<T>> for WriteInfo {
/// server runner framework takes care of implementing the heath endpoint,
/// metrics, pprof, etc.
#[derive(Debug)]
pub struct HttpDelegate<D, T = SystemProvider> {
pub struct HttpDelegate<D, N, T = SystemProvider> {
max_request_bytes: usize,
time_provider: T,
namespace_resolver: N,
dml_handler: Arc<D>,
// A request limiter to restrict the number of simultaneous requests this
@ -239,7 +249,7 @@ pub struct HttpDelegate<D, T = SystemProvider> {
request_limit_rejected: U64Counter,
}
impl<D> HttpDelegate<D, SystemProvider> {
impl<D, N> HttpDelegate<D, N, SystemProvider> {
/// Initialise a new [`HttpDelegate`] passing valid requests to the
/// specified `dml_handler`.
///
@ -248,6 +258,7 @@ impl<D> HttpDelegate<D, SystemProvider> {
pub fn new(
max_request_bytes: usize,
max_requests: usize,
namespace_resolver: N,
dml_handler: Arc<D>,
metrics: &metric::Registry,
) -> Self {
@ -297,6 +308,7 @@ impl<D> HttpDelegate<D, SystemProvider> {
Self {
max_request_bytes,
time_provider: SystemProvider::default(),
namespace_resolver,
dml_handler,
request_sem: Semaphore::new(max_requests),
write_metric_lines,
@ -310,9 +322,10 @@ impl<D> HttpDelegate<D, SystemProvider> {
}
}
impl<D, T> HttpDelegate<D, T>
impl<D, N, T> HttpDelegate<D, N, T>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary>,
N: NamespaceResolver,
T: TimeProvider,
{
/// Routes `req` to the appropriate handler, if any, returning the handler
@ -395,9 +408,12 @@ where
"routing write",
);
// Retrieve the namespace ID for this namespace.
let namespace_id = self.namespace_resolver.get_namespace_id(&namespace).await?;
let summary = self
.dml_handler
.write(&namespace, batches, span_ctx)
.write(&namespace, namespace_id, batches, span_ctx)
.await
.map_err(Into::into)?;
@ -522,6 +538,7 @@ mod tests {
use std::{io::Write, iter, sync::Arc, time::Duration};
use assert_matches::assert_matches;
use data_types::NamespaceId;
use flate2::{write::GzEncoder, Compression};
use hyper::header::HeaderValue;
use metric::{Attributes, Metric};
@ -531,9 +548,13 @@ mod tests {
use tokio_stream::wrappers::ReceiverStream;
use super::*;
use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall};
use crate::{
dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall},
namespace_resolver::mock::MockNamespaceResolver,
};
const MAX_BYTES: usize = 1024;
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
fn summary() -> WriteSummary {
WriteSummary::default()
@ -620,12 +641,20 @@ mod tests {
// encoding
test_http_handler!(encoding_header=$encoding, request);
let mock_namespace_resolver = MockNamespaceResolver::default()
.with_mapping("bananas_test", NAMESPACE_ID);
let dml_handler = Arc::new(MockDmlHandler::default()
.with_write_return($dml_write_handler)
.with_delete_return($dml_delete_handler)
);
let metrics = Arc::new(metric::Registry::default());
let delegate = HttpDelegate::new(MAX_BYTES, 100, Arc::clone(&dml_handler), &metrics);
let delegate = HttpDelegate::new(
MAX_BYTES,
100,
mock_namespace_resolver,
Arc::clone(&dml_handler),
&metrics
);
let got = delegate.route(request).await;
assert_matches!(got, $want_result);
@ -732,8 +761,9 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 1647622847".as_bytes(),
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, "bananas_test");
assert_eq!(*namespace_id, NAMESPACE_ID);
let table = write_input.get("platanos").expect("table not found");
let ts = table.timestamp_summary().expect("no timestamp summary");
@ -747,8 +777,9 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 1647622847000".as_bytes(),
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, "bananas_test");
assert_eq!(*namespace_id, NAMESPACE_ID);
let table = write_input.get("platanos").expect("table not found");
let ts = table.timestamp_summary().expect("no timestamp summary");
@ -762,8 +793,9 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000".as_bytes(),
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, "bananas_test");
assert_eq!(*namespace_id, NAMESPACE_ID);
let table = write_input.get("platanos").expect("table not found");
let ts = table.timestamp_summary().expect("no timestamp summary");
@ -777,8 +809,9 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, "bananas_test");
assert_eq!(*namespace_id, NAMESPACE_ID);
let table = write_input.get("platanos").expect("table not found");
let ts = table.timestamp_summary().expect("no timestamp summary");
@ -906,8 +939,9 @@ mod tests {
body = "test field=1u 100\ntest field=2u 100".as_bytes(),
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, "bananas_test");
assert_eq!(*namespace_id, NAMESPACE_ID);
let table = write_input.get("test").expect("table not in write");
let col = table.column("field").expect("column missing");
assert_matches!(col.data(), ColumnData::U64(data, _) => {
@ -1039,7 +1073,7 @@ mod tests {
body = "whydo InputPower=300i,InputPower=300i".as_bytes(),
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => {
assert_eq!(namespace, "bananas_test");
let table = write_input.get("whydo").expect("table not in write");
let col = table.column("InputPower").expect("column missing");
@ -1056,7 +1090,7 @@ mod tests {
body = "whydo InputPower=300i,InputPower=42i".as_bytes(),
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => {
assert_eq!(namespace, "bananas_test");
let table = write_input.get("whydo").expect("table not in write");
let col = table.column("InputPower").expect("column missing");
@ -1154,11 +1188,15 @@ mod tests {
// number of simultaneous requests are being serviced.
#[tokio::test]
async fn test_request_limit_enforced() {
let mock_namespace_resolver =
MockNamespaceResolver::default().with_mapping("bananas", NamespaceId::new(42));
let dml_handler = Arc::new(MockDmlHandler::default());
let metrics = Arc::new(metric::Registry::default());
let delegate = Arc::new(HttpDelegate::new(
MAX_BYTES,
1,
mock_namespace_resolver,
Arc::clone(&dml_handler),
&metrics,
));

View File

@ -10,11 +10,11 @@ use metric::{Attributes, DurationHistogram, Metric, Registry, U64Counter};
use mutable_batch::MutableBatch;
use router::{
dml_handlers::{
Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator,
NamespaceAutocreation, Partitioned, Partitioner, SchemaError, SchemaValidator,
ShardedWriteBuffer, WriteSummaryAdapter,
Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioned,
Partitioner, SchemaError, SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
namespace_resolver::{NamespaceAutocreation, NamespaceSchemaResolver},
server::http::HttpDelegate,
shard::Shard,
};
@ -46,16 +46,7 @@ pub struct TestContext {
type HttpDelegateStack = HttpDelegate<
InstrumentationDecorator<
Chain<
Chain<
Chain<
NamespaceAutocreation<
Arc<ShardedCache<Arc<MemoryNamespaceCache>>>,
HashMap<String, MutableBatch>,
>,
SchemaValidator<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
>,
Partitioner,
>,
Chain<SchemaValidator<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>, Partitioner>,
WriteSummaryAdapter<
FanOutAdaptor<
ShardedWriteBuffer<JumpHash<Arc<Shard>>>,
@ -64,6 +55,10 @@ type HttpDelegateStack = HttpDelegate<
>,
>,
>,
NamespaceAutocreation<
Arc<ShardedCache<Arc<MemoryNamespaceCache>>>,
NamespaceSchemaResolver<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
>,
>;
/// A [`router`] stack configured with the various DML handlers using mock
@ -95,29 +90,39 @@ impl TestContext {
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
));
let ns_creator = NamespaceAutocreation::new(
Arc::clone(&catalog),
let schema_validator =
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics);
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});
let handler_stack =
schema_validator
.and_then(partitioner)
.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(
sharded_write_buffer,
)));
let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack);
let namespace_resolver =
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let namespace_resolver = NamespaceAutocreation::new(
namespace_resolver,
Arc::clone(&ns_cache),
Arc::clone(&catalog),
TopicId::new(TEST_TOPIC_ID),
QueryPoolId::new(TEST_QUERY_POOL_ID),
iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
);
let schema_validator = SchemaValidator::new(Arc::clone(&catalog), ns_cache, &*metrics);
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});
let handler_stack = ns_creator
.and_then(schema_validator)
.and_then(partitioner)
.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(
sharded_write_buffer,
)));
let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack);
let delegate = HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics);
let delegate = HttpDelegate::new(
1024,
100,
namespace_resolver,
Arc::new(handler_stack),
&metrics,
);
Self {
delegate,