From d166de931d39c62664a1cee79330626f1ad1118a Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 26 Oct 2022 16:33:01 +0200 Subject: [PATCH] refactor: resolve namespace before DML dispatch This commit introduces a new (composable) trait; a NamespaceResolver is an abstraction responsible for taking a string namespace from a user request, and mapping it to its catalog ID. This allows the NamespaceId to be injected through the DmlHandler chain in addition to the namespace name. As part of this change, the NamespaceAutocreation layer was changed from an implementor of the DmlHandler trait, to a NamespaceResolver as it is a more appropriate abstraction for the functionality it provides. --- ioxd_router/src/lib.rs | 32 +-- router/benches/e2e.rs | 14 +- router/benches/schema_validator.rs | 6 +- router/src/dml_handlers/chain.rs | 7 +- router/src/dml_handlers/fan_out.rs | 9 +- router/src/dml_handlers/instrumentation.rs | 12 +- router/src/dml_handlers/mock.rs | 5 +- router/src/dml_handlers/mod.rs | 49 ++-- router/src/dml_handlers/nop.rs | 5 +- router/src/dml_handlers/partitioner.rs | 5 +- router/src/dml_handlers/schema_validation.rs | 58 +++-- .../src/dml_handlers/sharded_write_buffer.rs | 14 +- router/src/dml_handlers/trait.rs | 14 +- router/src/dml_handlers/write_summary.rs | 8 +- router/src/lib.rs | 89 ++++++- router/src/namespace_resolver.rs | 209 +++++++++++++++++ router/src/namespace_resolver/mock.rs | 45 ++++ .../src/namespace_resolver/ns_autocreation.rs | 221 ++++++++++++++++++ router/src/server.rs | 12 +- router/src/server/http.rs | 72 ++++-- router/tests/http.rs | 65 +++--- 21 files changed, 798 insertions(+), 153 deletions(-) create mode 100644 router/src/namespace_resolver.rs create mode 100644 router/src/namespace_resolver/mock.rs create mode 100644 router/src/namespace_resolver/ns_autocreation.rs diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index ba4e3777cb..8c23987422 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -18,13 
+18,13 @@ use object_store::DynObjectStore; use observability_deps::tracing::info; use router::{ dml_handlers::{ - DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, - NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer, - WriteSummaryAdapter, + DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioner, + SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter, }, namespace_cache::{ metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache, }, + namespace_resolver::{NamespaceAutocreation, NamespaceResolver, NamespaceSchemaResolver}, server::{ grpc::{sharder::ShardService, GrpcDelegate}, http::HttpDelegate, @@ -66,14 +66,14 @@ pub enum Error { pub type Result = std::result::Result; -pub struct RouterServerType { - server: RouterServer, +pub struct RouterServerType { + server: RouterServer, shutdown: CancellationToken, trace_collector: Option>, } -impl RouterServerType { - pub fn new(server: RouterServer, common_state: &CommonServerState) -> Self { +impl RouterServerType { + pub fn new(server: RouterServer, common_state: &CommonServerState) -> Self { Self { server, shutdown: CancellationToken::new(), @@ -82,17 +82,18 @@ impl RouterServerType { } } -impl std::fmt::Debug for RouterServerType { +impl std::fmt::Debug for RouterServerType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Router") } } #[async_trait] -impl ServerType for RouterServerType +impl ServerType for RouterServerType where D: DmlHandler, WriteOutput = WriteSummary> + 'static, S: Sharder<(), Item = Arc> + Clone + 'static, + N: NamespaceResolver + 'static, { /// Return the [`metric::Registry`] used by the router. 
fn metric_registry(&self) -> Arc { @@ -210,6 +211,10 @@ pub async fn create_router_server_type( }); let partitioner = InstrumentationDecorator::new("partitioner", &*metrics, partitioner); + // Initialise the Namespace ID lookup + cache + let namespace_resolver = + NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + //////////////////////////////////////////////////////////////////////////// // // THIS CODE IS FOR TESTING ONLY. @@ -247,9 +252,10 @@ pub async fn create_router_server_type( }); txn.commit().await?; - let ns_creator = NamespaceAutocreation::new( + let namespace_resolver = NamespaceAutocreation::new( + namespace_resolver, + Arc::clone(&ns_cache), Arc::clone(&catalog), - ns_cache, topic_id, query_id, iox_catalog::INFINITE_RETENTION_POLICY.to_owned(), @@ -262,8 +268,7 @@ pub async fn create_router_server_type( // Build the chain of DML handlers that forms the request processing // pipeline, starting with the namespace creator (for testing purposes) and // write partitioner that yields a set of partitioned batches. - let handler_stack = ns_creator - .and_then(schema_validator) + let handler_stack = schema_validator .and_then(partitioner) // Once writes have been partitioned, they are processed in parallel. 
// @@ -288,6 +293,7 @@ pub async fn create_router_server_type( let http = HttpDelegate::new( common_state.run_config().max_http_request_size, request_limit, + namespace_resolver, Arc::clone(&handler_stack), &metrics, ); diff --git a/router/benches/e2e.rs b/router/benches/e2e.rs index 321b6fbd6b..0935a3ed0f 100644 --- a/router/benches/e2e.rs +++ b/router/benches/e2e.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeSet, iter, sync::Arc}; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; -use data_types::{PartitionTemplate, TemplatePart}; +use data_types::{NamespaceId, PartitionTemplate, TemplatePart}; use hyper::{Body, Request}; use iox_catalog::{interface::Catalog, mem::MemCatalog}; use router::{ @@ -10,6 +10,7 @@ use router::{ WriteSummaryAdapter, }, namespace_cache::{MemoryNamespaceCache, ShardedCache}, + namespace_resolver::mock::MockNamespaceResolver, server::http::HttpDelegate, shard::Shard, }; @@ -74,7 +75,16 @@ fn e2e_benchmarks(c: &mut Criterion) { partitioner.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer))), ); - HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics) + let namespace_resolver = + MockNamespaceResolver::default().with_mapping("bananas", NamespaceId::new(42)); + + HttpDelegate::new( + 1024, + 100, + namespace_resolver, + Arc::new(handler_stack), + &metrics, + ) }; let body_str = "platanos,tag1=A,tag2=B val=42i 123456"; diff --git a/router/benches/schema_validator.rs b/router/benches/schema_validator.rs index ee20f8abba..975060ae9b 100644 --- a/router/benches/schema_validator.rs +++ b/router/benches/schema_validator.rs @@ -4,7 +4,7 @@ use criterion::{ criterion_group, criterion_main, measurement::WallTime, BatchSize, BenchmarkGroup, Criterion, Throughput, }; -use data_types::DatabaseName; +use data_types::{DatabaseName, NamespaceId}; use hashbrown::HashMap; use iox_catalog::mem::MemCatalog; use mutable_batch::MutableBatch; @@ -49,7 +49,7 @@ fn bench(group: &mut BenchmarkGroup, tables: usize, 
columns_per_table: for i in 0..65_000 { let write = lp_to_writes(format!("{}{}", i + 10_000_000, generate_lp(1, 1)).as_str()); - let _ = runtime().block_on(validator.write(&*NAMESPACE, write, None)); + let _ = runtime().block_on(validator.write(&*NAMESPACE, NamespaceId::new(42), write, None)); } let write = lp_to_writes(&generate_lp(tables, columns_per_table)); @@ -61,7 +61,7 @@ fn bench(group: &mut BenchmarkGroup, tables: usize, columns_per_table: group.bench_function(format!("{tables}x{columns_per_table}"), |b| { b.to_async(runtime()).iter_batched( || write.clone(), - |write| validator.write(&*NAMESPACE, write, None), + |write| validator.write(&*NAMESPACE, NamespaceId::new(42), write, None), BatchSize::SmallInput, ); }); diff --git a/router/src/dml_handlers/chain.rs b/router/src/dml_handlers/chain.rs index bf8b2f3619..2d6248667c 100644 --- a/router/src/dml_handlers/chain.rs +++ b/router/src/dml_handlers/chain.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use trace::ctx::SpanContext; use super::{DmlError, DmlHandler}; @@ -59,17 +59,18 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { let output = self .first - .write(namespace, input, span_ctx.clone()) + .write(namespace, namespace_id, input, span_ctx.clone()) .await .map_err(Into::into)?; self.second - .write(namespace, output, span_ctx) + .write(namespace, namespace_id, output, span_ctx) .await .map_err(Into::into) } diff --git a/router/src/dml_handlers/fan_out.rs b/router/src/dml_handlers/fan_out.rs index ec171e9fc7..5af7d966f7 100644 --- a/router/src/dml_handlers/fan_out.rs +++ b/router/src/dml_handlers/fan_out.rs @@ -1,7 +1,7 @@ use std::{fmt::Debug, marker::PhantomData}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, 
DeletePredicate, NamespaceId}; use futures::{stream::FuturesUnordered, TryStreamExt}; use trace::ctx::SpanContext; @@ -50,6 +50,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { @@ -58,7 +59,11 @@ where .map(|v| { let namespace = namespace.clone(); let span_ctx = span_ctx.clone(); - async move { self.inner.write(&namespace, v, span_ctx).await } + async move { + self.inner + .write(&namespace, namespace_id, v, span_ctx) + .await + } }) .collect::>() .try_collect::>() diff --git a/router/src/dml_handlers/instrumentation.rs b/router/src/dml_handlers/instrumentation.rs index 3289cfd4d0..b919ac6904 100644 --- a/router/src/dml_handlers/instrumentation.rs +++ b/router/src/dml_handlers/instrumentation.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric}; use trace::{ @@ -68,6 +68,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { @@ -77,7 +78,10 @@ where let mut span_recorder = SpanRecorder::new(span_ctx.clone().map(|parent| parent.child(self.name))); - let res = self.inner.write(namespace, input, span_ctx).await; + let res = self + .inner + .write(namespace, namespace_id, input, span_ctx) + .await; // Avoid exploding if time goes backwards - simply drop the measurement // if it happens. 
@@ -202,7 +206,7 @@ mod tests { let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler); decorator - .write(&ns, (), Some(span)) + .write(&ns, NamespaceId::new(42), (), Some(span)) .await .expect("inner handler configured to succeed"); @@ -225,7 +229,7 @@ mod tests { let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler); let err = decorator - .write(&ns, (), Some(span)) + .write(&ns, NamespaceId::new(42), (), Some(span)) .await .expect_err("inner handler configured to fail"); diff --git a/router/src/dml_handlers/mock.rs b/router/src/dml_handlers/mock.rs index 50da8c5aa3..3cfee1e7ed 100644 --- a/router/src/dml_handlers/mock.rs +++ b/router/src/dml_handlers/mock.rs @@ -1,7 +1,7 @@ use std::{collections::VecDeque, fmt::Debug}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use parking_lot::Mutex; use trace::ctx::SpanContext; use write_summary::WriteSummary; @@ -14,6 +14,7 @@ use super::{DmlError, DmlHandler}; pub enum MockDmlHandlerCall { Write { namespace: String, + namespace_id: NamespaceId, write_input: W, }, Delete { @@ -102,6 +103,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, write_input: Self::WriteInput, _span_ctx: Option, ) -> Result { @@ -109,6 +111,7 @@ where self, MockDmlHandlerCall::Write { namespace: namespace.into(), + namespace_id, write_input, }, write_return diff --git a/router/src/dml_handlers/mod.rs b/router/src/dml_handlers/mod.rs index 3990606026..94eca94a78 100644 --- a/router/src/dml_handlers/mod.rs +++ b/router/src/dml_handlers/mod.rs @@ -4,27 +4,17 @@ //! processing handler chain: //! //! ```text -//! ┌──────────────┐ ┌──────────────┐ -//! │ HTTP API │ │ gRPC API │ -//! └──────────────┘ └──────────────┘ -//! │ │ -//! └─────────┬─────────┘ -//! │ -//! ▼ -//! ╔═ DmlHandler Stack ═════╗ +//! ┌─────────────────┐ +//! │ Namespace Cache │ +//! 
└─────────────────┘ +//! ╔═ DmlHandler Stack ═════╗ │ //! ║ ║ -//! ║ ┌──────────────────┐ ║ -//! ║ │ Namespace │ ║ -//! ║ │ Autocreation │─ ─ ─ ─ ─ ─ ─ ┐ -//! ║ └──────────────────┘ ║ -//! ║ │ ║ │ -//! ║ ▼ ║ //! ║ ┌──────────────────┐ ║ │ //! ║ │ Partitioner │ ║ //! ║ └──────────────────┘ ║ │ -//! ║ │ ║ ┌─────────────────┐ -//! ║ ▼ ║ │ Namespace Cache │ -//! ║ ┌──────────────────┐ ║ └─────────────────┘ +//! ║ │ ║ +//! ║ ▼ ║ │ +//! ║ ┌──────────────────┐ ║ //! ║ │ Schema │ ║ │ //! ║ │ Validation │ ─║─ ─ ─ ─ ─ ─ //! ║ └──────────────────┘ ║ @@ -49,23 +39,17 @@ //! └──────────────┘ //! ``` //! -//! The HTTP / gRPC APIs decode their respective request format and funnel the -//! resulting operation through the common [`DmlHandler`] composed of the layers -//! described above. +//! The HTTP API decodes the request and funnels the resulting operation through +//! the [`DmlHandler`] stack composed of the layers described above. //! -//! The [`NamespaceAutocreation`] handler (for testing only) populates the -//! global catalog with an entry for each namespace it observes, using the -//! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending -//! requests to the catalog for namespaces that are known to exist. -//! -//! Incoming line-protocol writes then pass through the [`Partitioner`], parsing -//! the LP and splitting them into batches per partition, before passing each +//! Incoming line-protocol writes pass through the [`Partitioner`], parsing the +//! LP and splitting them into batches per IOx partition, before passing each //! partitioned batch through the rest of the request pipeline. //! -//! Writes then pass through the [`SchemaValidator`] applying schema enforcement -//! (a NOP layer for deletes) which pushes additive schema changes to the -//! catalog and populates the [`NamespaceCache`], converging it to match the set -//! of [`NamespaceSchema`] in the global catalog. +//! 
Writes then pass through the [`SchemaValidator`] applying schema & limit +//! enforcement (a NOP layer for deletes) which pushes additive schema changes +//! to the catalog and populates the [`NamespaceCache`], converging it to match +//! the set of [`NamespaceSchema`] in the global catalog. //! //! The [`ShardedWriteBuffer`] uses a sharder implementation to direct the DML //! operations into a fixed set of shards. @@ -84,9 +68,6 @@ pub mod nop; mod sharded_write_buffer; pub use sharded_write_buffer::*; -mod ns_autocreation; -pub use ns_autocreation::*; - mod partitioner; pub use partitioner::*; diff --git a/router/src/dml_handlers/nop.rs b/router/src/dml_handlers/nop.rs index cdad61186a..f3299fbe1f 100644 --- a/router/src/dml_handlers/nop.rs +++ b/router/src/dml_handlers/nop.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, marker::PhantomData}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use observability_deps::tracing::*; use trace::ctx::SpanContext; @@ -32,10 +32,11 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, batches: Self::WriteInput, _span_ctx: Option, ) -> Result { - info!(%namespace, ?batches, "dropping write operation"); + info!(%namespace, %namespace_id, ?batches, "dropping write operation"); Ok(batches) } diff --git a/router/src/dml_handlers/partitioner.rs b/router/src/dml_handlers/partitioner.rs index 099ffbff11..0bd948b413 100644 --- a/router/src/dml_handlers/partitioner.rs +++ b/router/src/dml_handlers/partitioner.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate, PartitionKey, PartitionTemplate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId, PartitionKey, PartitionTemplate}; use hashbrown::HashMap; use mutable_batch::{MutableBatch, PartitionWrite, WritePayload}; use observability_deps::tracing::*; @@ -71,6 +71,7 @@ impl DmlHandler for Partitioner { 
async fn write( &self, _namespace: &DatabaseName<'static>, + _namespace_id: NamespaceId, batch: Self::WriteInput, _span_ctx: Option, ) -> Result { @@ -145,7 +146,7 @@ mod tests { let (writes, _) = mutable_batch_lp::lines_to_batches_stats($lp, DEFAULT_TIMESTAMP_NANOS).expect("failed to parse test LP"); - let handler_ret = partitioner.write(&ns, writes, None).await; + let handler_ret = partitioner.write(&ns, NamespaceId::new(42), writes, None).await; assert_matches!(handler_ret, $($want_handler_ret)+); // Check the partition -> table mapping. diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs index f649f6abe5..8a2660292a 100644 --- a/router/src/dml_handlers/schema_validation.rs +++ b/router/src/dml_handlers/schema_validation.rs @@ -1,7 +1,7 @@ use std::{ops::DerefMut, sync::Arc}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate, NamespaceSchema}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId, NamespaceSchema}; use hashbrown::HashMap; use iox_catalog::{ interface::{get_schema_by_name, Catalog, Error as CatalogError}, @@ -167,6 +167,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, batches: Self::WriteInput, _span_ctx: Option, ) -> Result { @@ -183,7 +184,12 @@ where let schema = get_schema_by_name(namespace, repos.deref_mut()) .await .map_err(|e| { - warn!(error=%e, %namespace, "failed to retrieve namespace schema"); + warn!( + error=%e, + %namespace, + %namespace_id, + "failed to retrieve namespace schema" + ); SchemaError::NamespaceLookup(e) }) .map(Arc::new)?; @@ -197,7 +203,12 @@ where }; validate_column_limits(&batches, &schema).map_err(|e| { - warn!(%namespace, error=%e, "service protection limit reached"); + warn!( + %namespace, + %namespace_id, + error=%e, + "service protection limit reached" + ); self.service_limit_hit.inc(1); SchemaError::ServiceLimit(Box::new(e)) })?; @@ -218,6 +229,7 @@ where } => { warn!( 
%namespace, + %namespace_id, column_name=%name, existing_column_type=%existing, request_column_type=%new, @@ -230,12 +242,22 @@ where // Service limits CatalogError::ColumnCreateLimitError { .. } | CatalogError::TableCreateLimitError { .. } => { - warn!(%namespace, error=%e, "service protection limit reached"); + warn!( + %namespace, + %namespace_id, + error=%e, + "service protection limit reached" + ); self.service_limit_hit.inc(1); SchemaError::ServiceLimit(Box::new(e.into_err())) } _ => { - error!(%namespace, error=%e, "schema validation failed"); + error!( + %namespace, + %namespace_id, + error=%e, + "schema validation failed" + ); SchemaError::UnexpectedCatalogError(e.into_err()) } } @@ -453,12 +475,12 @@ mod tests { // namespace schema gets cached let writes1_valid = lp_to_writes("dragonfruit val=42i 123456"); handler1 - .write(&*NAMESPACE, writes1_valid, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes1_valid, None) .await .expect("request should succeed"); let writes2_valid = lp_to_writes("dragonfruit val=43i 123457"); handler2 - .write(&*NAMESPACE, writes2_valid, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes2_valid, None) .await .expect("request should succeed"); @@ -466,12 +488,12 @@ mod tests { // putting the table over the limit let writes1_add_column = lp_to_writes("dragonfruit,tag1=A val=42i 123456"); handler1 - .write(&*NAMESPACE, writes1_add_column, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes1_add_column, None) .await .expect("request should succeed"); let writes2_add_column = lp_to_writes("dragonfruit,tag2=B val=43i 123457"); handler2 - .write(&*NAMESPACE, writes2_add_column, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes2_add_column, None) .await .expect("request should succeed"); @@ -542,7 +564,7 @@ mod tests { let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect("request 
should succeed"); @@ -567,7 +589,7 @@ mod tests { let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); let err = handler - .write(&ns, writes, None) + .write(&ns, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); @@ -590,7 +612,7 @@ mod tests { // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64 let got = handler - .write(&*NAMESPACE, writes.clone(), None) + .write(&*NAMESPACE, NamespaceId::new(42), writes.clone(), None) .await .expect("request should succeed"); assert_eq!(writes.len(), got.len()); @@ -598,7 +620,7 @@ mod tests { // Second write attempts to violate it causing an error let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42.0 123456"); // val=float let err = handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); @@ -628,7 +650,7 @@ mod tests { // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); let got = handler - .write(&*NAMESPACE, writes.clone(), None) + .write(&*NAMESPACE, NamespaceId::new(42), writes.clone(), None) .await .expect("request should succeed"); assert_eq!(writes.len(), got.len()); @@ -646,7 +668,7 @@ mod tests { // Second write attempts to violate limits, causing an error let writes = lp_to_writes("bananas2,tag1=A,tag2=B val=42i 123456"); let err = handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); @@ -667,7 +689,7 @@ mod tests { // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); let got = handler - .write(&*NAMESPACE, writes.clone(), None) + .write(&*NAMESPACE, NamespaceId::new(42), writes.clone(), None) .await .expect("request should succeed"); assert_eq!(writes.len(), got.len()); @@ -683,7 +705,7 @@ mod tests { // Second write attempts to violate limits, 
causing an error let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456"); let err = handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); @@ -714,7 +736,7 @@ mod tests { // First write attempts to add columns over the limit, causing an error let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456"); let err = handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); diff --git a/router/src/dml_handlers/sharded_write_buffer.rs b/router/src/dml_handlers/sharded_write_buffer.rs index be5c34d9e1..11ef89b132 100644 --- a/router/src/dml_handlers/sharded_write_buffer.rs +++ b/router/src/dml_handlers/sharded_write_buffer.rs @@ -6,7 +6,7 @@ use std::{ }; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate, NonEmptyString}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId, NonEmptyString}; use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite}; use futures::{stream::FuturesUnordered, StreamExt}; use hashbrown::HashMap; @@ -94,6 +94,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, writes: Self::WriteInput, span_ctx: Option, ) -> Result { @@ -129,6 +130,7 @@ where kafka_partition=%shard.shard_index(), tables=%dml.table_count(), %namespace, + %namespace_id, approx_size=%dml.size(), "routing writes to shard" ); @@ -274,7 +276,9 @@ mod tests { // Call the ShardedWriteBuffer and drive the test let ns = DatabaseName::new("bananas").unwrap(); - w.write(&ns, writes, None).await.expect("write failed"); + w.write(&ns, NamespaceId::new(42), writes, None) + .await + .expect("write failed"); // Assert the sharder saw all the tables let calls = sharder.calls(); @@ -340,7 +344,9 @@ mod tests { // Call the ShardedWriteBuffer and drive the test let ns = DatabaseName::new("bananas").unwrap(); - 
w.write(&ns, writes, None).await.expect("write failed"); + w.write(&ns, NamespaceId::new(42), writes, None) + .await + .expect("write failed"); // Assert the sharder saw all the tables let calls = sharder.calls(); @@ -417,7 +423,7 @@ mod tests { // Call the ShardedWriteBuffer and drive the test let ns = DatabaseName::new("bananas").unwrap(); let err = w - .write(&ns, writes, None) + .write(&ns, NamespaceId::new(42), writes, None) .await .expect_err("write should return a failure"); assert_matches!(err, ShardError::WriteBufferErrors{successes, errs} => { diff --git a/router/src/dml_handlers/trait.rs b/router/src/dml_handlers/trait.rs index 784ea82584..5e75ab3788 100644 --- a/router/src/dml_handlers/trait.rs +++ b/router/src/dml_handlers/trait.rs @@ -1,11 +1,11 @@ use std::{error::Error, fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use thiserror::Error; use trace::ctx::SpanContext; -use super::{partitioner::PartitionError, NamespaceCreationError, SchemaError, ShardError}; +use super::{partitioner::PartitionError, SchemaError, ShardError}; /// Errors emitted by a [`DmlHandler`] implementation during DML request /// processing. @@ -23,10 +23,6 @@ pub enum DmlError { #[error(transparent)] Schema(#[from] SchemaError), - /// Failed to create the request namespace. - #[error(transparent)] - NamespaceCreation(#[from] NamespaceCreationError), - /// An error partitioning the request. 
#[error(transparent)] Partition(#[from] PartitionError), @@ -63,6 +59,7 @@ pub trait DmlHandler: Debug + Send + Sync { async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result; @@ -90,10 +87,13 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { - (**self).write(namespace, input, span_ctx).await + (**self) + .write(namespace, namespace_id, input, span_ctx) + .await } /// Delete the data specified in `delete`. diff --git a/router/src/dml_handlers/write_summary.rs b/router/src/dml_handlers/write_summary.rs index eefa1d6594..86f12f06c5 100644 --- a/router/src/dml_handlers/write_summary.rs +++ b/router/src/dml_handlers/write_summary.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use dml::DmlMeta; use trace::ctx::SpanContext; use write_summary::WriteSummary; @@ -40,10 +40,14 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { - let metas = self.inner.write(namespace, input, span_ctx).await?; + let metas = self + .inner + .write(namespace, namespace_id, input, span_ctx) + .await?; Ok(WriteSummary::new(metas)) } diff --git a/router/src/lib.rs b/router/src/lib.rs index 24cf7c77d0..3d17dd5937 100644 --- a/router/src/lib.rs +++ b/router/src/lib.rs @@ -1,14 +1,96 @@ -//! IOx router role implementation. +//! IOx router implementation. //! //! An IOx router is responsible for: //! //! * Creating IOx namespaces & synchronising them within the catalog. //! * Handling writes: -//! * Receiving IOx write/delete requests via HTTP and gRPC endpoints. +//! * Receiving IOx write/delete requests via HTTP. //! 
* Enforcing schema validation & synchronising it within the catalog. //! * Deriving the partition key of each DML operation. //! * Applying sharding logic. -//! * Push resulting operations into the appropriate shards (Kafka partitions if using Kafka). +//! * Push resulting operations into the appropriate shards (Kafka +//! partitions if using Kafka). +//! +//! The router is composed of singly-responsible components that each apply a +//! transformation to the request: +//! +//! ```text +//! ┌──────────────┐ +//! │ HTTP API │ +//! └──────────────┘ +//! │ +//! │ +//! ▼ +//! ╔═ NamespaceResolver ════╗ +//! ║ ║ +//! ║ ┌──────────────────┐ ║ +//! ║ │ Namespace │ ║ +//! ║ │ Autocreation │ ─║─ ─ ─ ─ ─ ─ +//! ║ └──────────────────┘ ║ │ +//! ║ │ ║ +//! ║ ▼ ║ │ +//! ║ ┌──────────────────┐ ║ +//! ║ │ NamespaceSchema │ ║ │ +//! ║ │ Resolver │ ─║─ ─ ─ ─ ─ ─ +//! ║ └──────────────────┘ ║ │ +//! ║ │ ║ +//! ╚════════════│═══════════╝ │ +//! │ ┌─────────────────┐ +//! │ │ Namespace Cache │ +//! │ └─────────────────┘ +//! ╔═ DmlHandler│Stack ═════╗ │ +//! ║ ▼ ║ +//! ║ ┌──────────────────┐ ║ │ +//! ║ │ Partitioner │ ║ +//! ║ └──────────────────┘ ║ │ +//! ║ │ ║ +//! ║ ▼ ║ │ +//! ║ ┌──────────────────┐ ║ +//! ║ │ Schema │ ║ │ +//! ║ │ Validation │ ─║─ ─ ─ ─ ─ ─ +//! ║ └──────────────────┘ ║ +//! ║ │ ║ +//! ║ ▼ ║ +//! ┌───────┐ ║ ┌──────────────────┐ ║ +//! │Sharder│◀ ─ ─ ▶│ShardedWriteBuffer│ ║ +//! └───────┘ ║ └──────────────────┘ ║ +//! ║ │ ║ +//! ╚════════════│═══════════╝ +//! │ +//! ▼ +//! ┌──────────────┐ +//! │ Write Buffer │ +//! └──────────────┘ +//! │ +//! │ +//! ┌────────▼─────┐ +//! │ Kafka ├┐ +//! └┬─────────────┘├┐ +//! └┬─────────────┘│ +//! └──────────────┘ +//! ``` +//! +//! The [`NamespaceAutocreation`] handler (for testing only) populates the +//! global catalog with an entry for each namespace it observes, using the +//! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending +//! requests to the catalog for namespaces that are known to exist. 
+//! +//! A [`NamespaceResolver`] maps a user-provided namespace string to the +//! catalog ID ([`NamespaceId`]). The [`NamespaceSchemaResolver`] achieves this +//! by inspecting the contents of the [`NamespaceCache`]. If a cache-miss +//! occurs the [`NamespaceSchemaResolver`] queries the catalog and populates the +//! cache for subsequent requests. +//! +//! Once the [`NamespaceId`] has been resolved, the request is passed into the +//! [`DmlHandler`] stack. +//! +//! [`NamespaceAutocreation`]: crate::namespace_resolver::NamespaceAutocreation +//! [`NamespaceSchemaResolver`]: crate::namespace_resolver::NamespaceSchemaResolver +//! [`NamespaceResolver`]: crate::namespace_resolver +//! [`NamespaceId`]: data_types::NamespaceId +//! [`NamespaceCache`]: crate::namespace_cache::NamespaceCache +//! [`NamespaceSchema`]: data_types::NamespaceSchema +//! [`DmlHandler`]: crate::dml_handlers #![deny( rustdoc::broken_intra_doc_links, @@ -28,5 +110,6 @@ pub mod dml_handlers; pub mod namespace_cache; +pub mod namespace_resolver; pub mod server; pub mod shard; diff --git a/router/src/namespace_resolver.rs b/router/src/namespace_resolver.rs new file mode 100644 index 0000000000..da1801a11f --- /dev/null +++ b/router/src/namespace_resolver.rs @@ -0,0 +1,209 @@ +//! A trait to abstract resolving a [`DatabaseName`] to [`NamespaceId`], and a +//! collection of composable implementations. + +use std::{ops::DerefMut, sync::Arc}; + +use async_trait::async_trait; +use data_types::{DatabaseName, NamespaceId}; +use iox_catalog::interface::{get_schema_by_name, Catalog}; +use observability_deps::tracing::*; +use thiserror::Error; + +use crate::namespace_cache::NamespaceCache; + +pub mod mock; +mod ns_autocreation; +pub use ns_autocreation::*; + +/// Error states encountered during [`NamespaceId`] lookup. +#[derive(Debug, Error)] +pub enum Error { + /// An error occurred when attempting to fetch the namespace ID.
+ #[error("failed to resolve namespace ID: {0}")] + Lookup(iox_catalog::interface::Error), + + /// An error state for errors returned by [`NamespaceAutocreation`]. + #[error(transparent)] + Create(#[from] NamespaceCreationError), +} + +/// An abstract resolver of [`DatabaseName`] to [`NamespaceId`]. +#[async_trait] +pub trait NamespaceResolver: std::fmt::Debug + Send + Sync { + /// Return the [`NamespaceId`] for the given [`DatabaseName`]. + async fn get_namespace_id( + &self, + namespace: &DatabaseName<'static>, + ) -> Result; +} + +/// An implementation of [`NamespaceResolver`] that queries the [`Catalog`] to +/// resolve a [`NamespaceId`], and populates the [`NamespaceCache`] as a side +/// effect. +#[derive(Debug)] +pub struct NamespaceSchemaResolver { + catalog: Arc, + cache: C, +} + +impl NamespaceSchemaResolver { + /// Construct a new [`NamespaceSchemaResolver`] that fetches schemas from + /// `catalog` and caches them in `cache`. + pub fn new(catalog: Arc, cache: C) -> Self { + Self { catalog, cache } + } +} + +#[async_trait] +impl NamespaceResolver for NamespaceSchemaResolver +where + C: NamespaceCache, +{ + async fn get_namespace_id( + &self, + namespace: &DatabaseName<'static>, + ) -> Result { + // Load the namespace schema from the cache, falling back to pulling it + // from the global catalog (if it exists). + match self.cache.get_schema(namespace) { + Some(v) => Ok(v.id), + None => { + let mut repos = self.catalog.repositories().await; + + // Pull the schema from the global catalog or error if it does + // not exist. + let schema = get_schema_by_name(namespace, repos.deref_mut()) + .await + .map_err(|e| { + warn!( + error=%e, + %namespace, + "failed to retrieve namespace schema" + ); + Error::Lookup(e) + }) + .map(Arc::new)?; + + // Cache population MAY race with other threads and lead to + // overwrites, but an entry will always exist once inserted, and + // the schemas will eventually converge. 
+ self.cache + .put_schema(namespace.clone(), Arc::clone(&schema)); + + trace!(%namespace, "schema cache populated"); + Ok(schema.id) + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use data_types::{NamespaceId, NamespaceSchema, QueryPoolId, TopicId}; + use iox_catalog::mem::MemCatalog; + + use super::*; + use crate::namespace_cache::MemoryNamespaceCache; + + #[tokio::test] + async fn test_cache_hit() { + let ns = DatabaseName::try_from("bananas").unwrap(); + + // Prep the cache before the test to cause a hit + let cache = Arc::new(MemoryNamespaceCache::default()); + cache.put_schema( + ns.clone(), + NamespaceSchema { + id: NamespaceId::new(42), + topic_id: TopicId::new(2), + query_pool_id: QueryPoolId::new(3), + tables: Default::default(), + max_columns_per_table: 4, + }, + ); + + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + + // Drive the code under test + resolver + .get_namespace_id(&ns) + .await + .expect("lookup should succeed"); + + assert!(cache.get_schema(&ns).is_some()); + + // The cache hit should mean the catalog SHOULD NOT see a create request + // for the namespace. 
+ let mut repos = catalog.repositories().await; + assert!( + repos + .namespaces() + .get_by_name(ns.as_str()) + .await + .expect("lookup should not error") + .is_none(), + "expected no request to the catalog" + ); + } + + #[tokio::test] + async fn test_cache_miss() { + let ns = DatabaseName::try_from("bananas").unwrap(); + + let cache = Arc::new(MemoryNamespaceCache::default()); + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + // Create the namespace in the catalog + { + let mut repos = catalog.repositories().await; + let topic = repos.topics().create_or_get("bananas").await.unwrap(); + let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap(); + repos + .namespaces() + .create( + &ns, + iox_catalog::INFINITE_RETENTION_POLICY, + topic.id, + query_pool.id, + ) + .await + .expect("failed to setup catalog state"); + } + + let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + + resolver + .get_namespace_id(&ns) + .await + .expect("lookup should succeed"); + + // The cache should be populated as a result of the lookup. + assert!(cache.get_schema(&ns).is_some()); + } + + #[tokio::test] + async fn test_cache_miss_does_not_exist() { + let ns = DatabaseName::try_from("bananas").unwrap(); + + let cache = Arc::new(MemoryNamespaceCache::default()); + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + + let err = resolver + .get_namespace_id(&ns) + .await + .expect_err("lookup should error"); + + assert_matches!(err, Error::Lookup(_)); + assert!(cache.get_schema(&ns).is_none()); + } +} diff --git a/router/src/namespace_resolver/mock.rs b/router/src/namespace_resolver/mock.rs new file mode 100644 index 0000000000..d90ad3afd3 --- /dev/null +++ b/router/src/namespace_resolver/mock.rs @@ -0,0 +1,45 @@ +//! 
A mock implementation of [`NamespaceResolver`]. + +#![allow(missing_docs)] + +use std::collections::HashMap; + +use async_trait::async_trait; +use data_types::{DatabaseName, NamespaceId}; +use parking_lot::Mutex; + +use super::NamespaceResolver; + +#[derive(Debug, Default)] +pub struct MockNamespaceResolver { + map: Mutex, NamespaceId>>, +} + +impl MockNamespaceResolver { + pub fn new(map: HashMap, NamespaceId>) -> Self { + Self { + map: Mutex::new(map), + } + } + + pub fn with_mapping(self, name: impl Into + 'static, id: NamespaceId) -> Self { + let name = DatabaseName::try_from(name.into()).unwrap(); + assert!(self.map.lock().insert(name, id).is_none()); + self + } +} + +#[async_trait] +impl NamespaceResolver for MockNamespaceResolver { + /// Return the [`NamespaceId`] for the given [`DatabaseName`]. + async fn get_namespace_id( + &self, + namespace: &DatabaseName<'static>, + ) -> Result { + Ok(*self + .map + .lock() + .get(namespace) + .expect("mock namespace resolver does not have ID")) + } +} diff --git a/router/src/namespace_resolver/ns_autocreation.rs b/router/src/namespace_resolver/ns_autocreation.rs new file mode 100644 index 0000000000..180f4d1a49 --- /dev/null +++ b/router/src/namespace_resolver/ns_autocreation.rs @@ -0,0 +1,221 @@ +use std::{fmt::Debug, sync::Arc}; + +use async_trait::async_trait; +use data_types::{DatabaseName, NamespaceId, QueryPoolId, TopicId}; +use iox_catalog::interface::Catalog; +use observability_deps::tracing::*; +use thiserror::Error; + +use super::NamespaceResolver; +use crate::namespace_cache::NamespaceCache; + +/// An error auto-creating the request namespace. +#[derive(Debug, Error)] +pub enum NamespaceCreationError { + /// An error returned from a namespace creation request. + #[error("failed to create namespace: {0}")] + Create(iox_catalog::interface::Error), +} + +/// A layer to populate the [`Catalog`] with all the namespaces the router +/// observes. 
+/// +/// Uses a [`NamespaceCache`] to limit issuing create requests to namespaces the +/// router has not yet observed a schema for. +#[derive(Debug)] +pub struct NamespaceAutocreation { + inner: T, + cache: C, + catalog: Arc, + + topic_id: TopicId, + query_id: QueryPoolId, + retention: String, +} + +impl NamespaceAutocreation { + /// Return a new [`NamespaceAutocreation`] layer that ensures a requested + /// namespace exists in `catalog`. + /// + /// If the namespace does not exist, it is created with the specified + /// `topic_id`, `query_id` and `retention` policy. + /// + /// Namespaces are looked up in `cache`, skipping the creation request to + /// the catalog if there's a hit. + pub fn new( + inner: T, + cache: C, + catalog: Arc, + topic_id: TopicId, + query_id: QueryPoolId, + retention: String, + ) -> Self { + Self { + inner, + cache, + catalog, + topic_id, + query_id, + retention, + } + } +} + +#[async_trait] +impl NamespaceResolver for NamespaceAutocreation +where + C: NamespaceCache, + T: NamespaceResolver, +{ + /// Force the creation of `namespace` if it does not already exist in the + /// cache, before passing the request through to the inner delegate. + async fn get_namespace_id( + &self, + namespace: &DatabaseName<'static>, + ) -> Result { + if self.cache.get_schema(namespace).is_none() { + trace!(%namespace, "namespace auto-create cache miss"); + + let mut repos = self.catalog.repositories().await; + + match repos + .namespaces() + .create( + namespace.as_str(), + &self.retention, + self.topic_id, + self.query_id, + ) + .await + { + Ok(_) => { + debug!(%namespace, "created namespace"); + } + Err(iox_catalog::interface::Error::NameExists { .. }) => { + // Either the cache has not yet converged to include this + // namespace, or another thread raced populating the catalog + // and beat this thread to it. 
+ debug!(%namespace, "spurious namespace create failed"); + } + Err(e) => { + error!(error=%e, %namespace, "failed to auto-create namespace"); + return Err(NamespaceCreationError::Create(e).into()); + } + } + } + + self.inner.get_namespace_id(namespace).await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use data_types::{Namespace, NamespaceId, NamespaceSchema}; + use iox_catalog::mem::MemCatalog; + + use super::*; + use crate::{ + namespace_cache::MemoryNamespaceCache, namespace_resolver::mock::MockNamespaceResolver, + }; + + #[tokio::test] + async fn test_cache_hit() { + const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); + + let ns = DatabaseName::try_from("bananas").unwrap(); + + // Prep the cache before the test to cause a hit + let cache = Arc::new(MemoryNamespaceCache::default()); + cache.put_schema( + ns.clone(), + NamespaceSchema { + id: NAMESPACE_ID, + topic_id: TopicId::new(2), + query_pool_id: QueryPoolId::new(3), + tables: Default::default(), + max_columns_per_table: 4, + }, + ); + + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + let creator = NamespaceAutocreation::new( + MockNamespaceResolver::default().with_mapping(ns.clone(), NAMESPACE_ID), + cache, + Arc::clone(&catalog), + TopicId::new(42), + QueryPoolId::new(42), + "inf".to_owned(), + ); + + // Drive the code under test + let got = creator + .get_namespace_id(&ns) + .await + .expect("handler should succeed"); + assert_eq!(got, NAMESPACE_ID); + + // The cache hit should mean the catalog SHOULD NOT see a create request + // for the namespace. 
+ let mut repos = catalog.repositories().await; + assert!( + repos + .namespaces() + .get_by_name(ns.as_str()) + .await + .expect("lookup should not error") + .is_none(), + "expected no request to the catalog" + ); + } + + #[tokio::test] + async fn test_cache_miss() { + let ns = DatabaseName::try_from("bananas").unwrap(); + + let cache = Arc::new(MemoryNamespaceCache::default()); + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + let creator = NamespaceAutocreation::new( + MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)), + cache, + Arc::clone(&catalog), + TopicId::new(42), + QueryPoolId::new(42), + "inf".to_owned(), + ); + + let created_id = creator + .get_namespace_id(&ns) + .await + .expect("handler should succeed"); + + // The cache miss should mean the catalog MUST see a create request for + // the namespace. + let mut repos = catalog.repositories().await; + let got = repos + .namespaces() + .get_by_name(ns.as_str()) + .await + .expect("lookup should not error") + .expect("creation request should be sent to catalog"); + + assert_eq!(got.id, created_id); + assert_eq!( + got, + Namespace { + id: NamespaceId::new(1), + name: ns.to_string(), + retention_duration: Some("inf".to_owned()), + topic_id: TopicId::new(42), + query_pool_id: QueryPoolId::new(42), + max_tables: iox_catalog::DEFAULT_MAX_TABLES, + max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, + } + ); + } +} diff --git a/router/src/server.rs b/router/src/server.rs index 10e3213c32..9e9b71489e 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -15,19 +15,19 @@ pub mod http; /// The [`RouterServer`] manages the lifecycle and contains all state for a /// `router` server instance. 
#[derive(Debug)] -pub struct RouterServer { +pub struct RouterServer { metrics: Arc, trace_collector: Option>, - http: HttpDelegate, + http: HttpDelegate, grpc: GrpcDelegate, } -impl RouterServer { +impl RouterServer { /// Initialise a new [`RouterServer`] using the provided HTTP and gRPC /// handlers. pub fn new( - http: HttpDelegate, + http: HttpDelegate, grpc: GrpcDelegate, metrics: Arc, trace_collector: Option>, @@ -51,12 +51,12 @@ impl RouterServer { } } -impl RouterServer +impl RouterServer where D: DmlHandler>, { /// Get a reference to the router http delegate. - pub fn http(&self) -> &HttpDelegate { + pub fn http(&self) -> &HttpDelegate { &self.http } diff --git a/router/src/server/http.rs b/router/src/server/http.rs index dbae3e6be8..8206e7d783 100644 --- a/router/src/server/http.rs +++ b/router/src/server/http.rs @@ -22,7 +22,10 @@ use trace::ctx::SpanContext; use write_summary::WriteSummary; use self::delete_predicate::parse_http_delete_request; -use crate::dml_handlers::{DmlError, DmlHandler, PartitionError, SchemaError}; +use crate::{ + dml_handlers::{DmlError, DmlHandler, PartitionError, SchemaError}, + namespace_resolver::NamespaceResolver, +}; const WRITE_TOKEN_HTTP_HEADER: &str = "X-IOx-Write-Token"; @@ -77,6 +80,13 @@ pub enum Error { #[error("dml handler error: {0}")] DmlHandler(#[from] DmlError), + /// An error that occurs when attempting to map the user-provided namespace + /// name into a [`NamespaceId`]. + /// + /// [`NamespaceId`]: data_types::NamespaceId + #[error("failed to resolve namespace ID: {0}")] + NamespaceResolver(#[from] crate::namespace_resolver::Error), + /// The router is currently servicing the maximum permitted number of /// simultaneous requests. 
#[error("this service is overloaded, please try again later")] @@ -103,6 +113,7 @@ impl Error { StatusCode::UNSUPPORTED_MEDIA_TYPE } Error::DmlHandler(err) => StatusCode::from(err), + Error::NamespaceResolver(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::RequestLimit => StatusCode::SERVICE_UNAVAILABLE, } } @@ -128,9 +139,7 @@ impl From<&DmlError> for StatusCode { StatusCode::INTERNAL_SERVER_ERROR } - DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_) => { - StatusCode::INTERNAL_SERVER_ERROR - } + DmlError::Internal(_) | DmlError::WriteBuffer(_) => StatusCode::INTERNAL_SERVER_ERROR, DmlError::Partition(PartitionError::BatchWrite(_)) => StatusCode::INTERNAL_SERVER_ERROR, } } @@ -216,9 +225,10 @@ impl TryFrom<&Request> for WriteInfo { /// server runner framework takes care of implementing the heath endpoint, /// metrics, pprof, etc. #[derive(Debug)] -pub struct HttpDelegate { +pub struct HttpDelegate { max_request_bytes: usize, time_provider: T, + namespace_resolver: N, dml_handler: Arc, // A request limiter to restrict the number of simultaneous requests this @@ -239,7 +249,7 @@ pub struct HttpDelegate { request_limit_rejected: U64Counter, } -impl HttpDelegate { +impl HttpDelegate { /// Initialise a new [`HttpDelegate`] passing valid requests to the /// specified `dml_handler`. 
/// @@ -248,6 +258,7 @@ impl HttpDelegate { pub fn new( max_request_bytes: usize, max_requests: usize, + namespace_resolver: N, dml_handler: Arc, metrics: &metric::Registry, ) -> Self { @@ -297,6 +308,7 @@ impl HttpDelegate { Self { max_request_bytes, time_provider: SystemProvider::default(), + namespace_resolver, dml_handler, request_sem: Semaphore::new(max_requests), write_metric_lines, @@ -310,9 +322,10 @@ impl HttpDelegate { } } -impl HttpDelegate +impl HttpDelegate where D: DmlHandler, WriteOutput = WriteSummary>, + N: NamespaceResolver, T: TimeProvider, { /// Routes `req` to the appropriate handler, if any, returning the handler @@ -395,9 +408,12 @@ where "routing write", ); + // Retrieve the namespace ID for this namespace. + let namespace_id = self.namespace_resolver.get_namespace_id(&namespace).await?; + let summary = self .dml_handler - .write(&namespace, batches, span_ctx) + .write(&namespace, namespace_id, batches, span_ctx) .await .map_err(Into::into)?; @@ -522,6 +538,7 @@ mod tests { use std::{io::Write, iter, sync::Arc, time::Duration}; use assert_matches::assert_matches; + use data_types::NamespaceId; use flate2::{write::GzEncoder, Compression}; use hyper::header::HeaderValue; use metric::{Attributes, Metric}; @@ -531,9 +548,13 @@ mod tests { use tokio_stream::wrappers::ReceiverStream; use super::*; - use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}; + use crate::{ + dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}, + namespace_resolver::mock::MockNamespaceResolver, + }; const MAX_BYTES: usize = 1024; + const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); fn summary() -> WriteSummary { WriteSummary::default() @@ -620,12 +641,20 @@ mod tests { // encoding test_http_handler!(encoding_header=$encoding, request); + let mock_namespace_resolver = MockNamespaceResolver::default() + .with_mapping("bananas_test", NAMESPACE_ID); let dml_handler = Arc::new(MockDmlHandler::default() .with_write_return($dml_write_handler) 
.with_delete_return($dml_delete_handler) ); let metrics = Arc::new(metric::Registry::default()); - let delegate = HttpDelegate::new(MAX_BYTES, 100, Arc::clone(&dml_handler), &metrics); + let delegate = HttpDelegate::new( + MAX_BYTES, + 100, + mock_namespace_resolver, + Arc::clone(&dml_handler), + &metrics + ); let got = delegate.route(request).await; assert_matches!(got, $want_result); @@ -732,8 +761,9 @@ mod tests { body = "platanos,tag1=A,tag2=B val=42i 1647622847".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("platanos").expect("table not found"); let ts = table.timestamp_summary().expect("no timestamp summary"); @@ -747,8 +777,9 @@ mod tests { body = "platanos,tag1=A,tag2=B val=42i 1647622847000".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("platanos").expect("table not found"); let ts = table.timestamp_summary().expect("no timestamp summary"); @@ -762,8 +793,9 @@ mod tests { body = "platanos,tag1=A,tag2=B val=42i 1647622847000000".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("platanos").expect("table not found"); let ts = table.timestamp_summary().expect("no timestamp summary"); @@ -777,8 
+809,9 @@ mod tests { body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("platanos").expect("table not found"); let ts = table.timestamp_summary().expect("no timestamp summary"); @@ -906,8 +939,9 @@ mod tests { body = "test field=1u 100\ntest field=2u 100".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("test").expect("table not in write"); let col = table.column("field").expect("column missing"); assert_matches!(col.data(), ColumnData::U64(data, _) => { @@ -1039,7 +1073,7 @@ mod tests { body = "whydo InputPower=300i,InputPower=300i".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => { assert_eq!(namespace, "bananas_test"); let table = write_input.get("whydo").expect("table not in write"); let col = table.column("InputPower").expect("column missing"); @@ -1056,7 +1090,7 @@ mod tests { body = "whydo InputPower=300i,InputPower=42i".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => { assert_eq!(namespace, "bananas_test"); let table = write_input.get("whydo").expect("table not in write"); let col = 
table.column("InputPower").expect("column missing"); @@ -1154,11 +1188,15 @@ mod tests { // number of simultaneous requests are being serviced. #[tokio::test] async fn test_request_limit_enforced() { + let mock_namespace_resolver = + MockNamespaceResolver::default().with_mapping("bananas", NamespaceId::new(42)); + let dml_handler = Arc::new(MockDmlHandler::default()); let metrics = Arc::new(metric::Registry::default()); let delegate = Arc::new(HttpDelegate::new( MAX_BYTES, 1, + mock_namespace_resolver, Arc::clone(&dml_handler), &metrics, )); diff --git a/router/tests/http.rs b/router/tests/http.rs index f990bac483..f4698c766a 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -10,11 +10,11 @@ use metric::{Attributes, DurationHistogram, Metric, Registry, U64Counter}; use mutable_batch::MutableBatch; use router::{ dml_handlers::{ - Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, - NamespaceAutocreation, Partitioned, Partitioner, SchemaError, SchemaValidator, - ShardedWriteBuffer, WriteSummaryAdapter, + Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioned, + Partitioner, SchemaError, SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter, }, namespace_cache::{MemoryNamespaceCache, ShardedCache}, + namespace_resolver::{NamespaceAutocreation, NamespaceSchemaResolver}, server::http::HttpDelegate, shard::Shard, }; @@ -46,16 +46,7 @@ pub struct TestContext { type HttpDelegateStack = HttpDelegate< InstrumentationDecorator< Chain< - Chain< - Chain< - NamespaceAutocreation< - Arc>>, - HashMap, - >, - SchemaValidator>>>, - >, - Partitioner, - >, + Chain>>>, Partitioner>, WriteSummaryAdapter< FanOutAdaptor< ShardedWriteBuffer>>, @@ -64,6 +55,10 @@ type HttpDelegateStack = HttpDelegate< >, >, >, + NamespaceAutocreation< + Arc>>, + NamespaceSchemaResolver>>>, + >, >; /// A [`router`] stack configured with the various DML handlers using mock @@ -95,29 +90,39 @@ impl TestContext { iter::repeat_with(|| 
Arc::new(MemoryNamespaceCache::default())).take(10), )); - let ns_creator = NamespaceAutocreation::new( - Arc::clone(&catalog), + let schema_validator = + SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics); + let partitioner = Partitioner::new(PartitionTemplate { + parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], + }); + + let handler_stack = + schema_validator + .and_then(partitioner) + .and_then(WriteSummaryAdapter::new(FanOutAdaptor::new( + sharded_write_buffer, + ))); + + let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack); + + let namespace_resolver = + NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + let namespace_resolver = NamespaceAutocreation::new( + namespace_resolver, Arc::clone(&ns_cache), + Arc::clone(&catalog), TopicId::new(TEST_TOPIC_ID), QueryPoolId::new(TEST_QUERY_POOL_ID), iox_catalog::INFINITE_RETENTION_POLICY.to_owned(), ); - let schema_validator = SchemaValidator::new(Arc::clone(&catalog), ns_cache, &*metrics); - let partitioner = Partitioner::new(PartitionTemplate { - parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], - }); - - let handler_stack = ns_creator - .and_then(schema_validator) - .and_then(partitioner) - .and_then(WriteSummaryAdapter::new(FanOutAdaptor::new( - sharded_write_buffer, - ))); - - let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack); - - let delegate = HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics); + let delegate = HttpDelegate::new( + 1024, + 100, + namespace_resolver, + Arc::new(handler_stack), + &metrics, + ); Self { delegate,