diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index ba4e3777cb..8c23987422 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -18,13 +18,13 @@ use object_store::DynObjectStore; use observability_deps::tracing::info; use router::{ dml_handlers::{ - DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, - NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer, - WriteSummaryAdapter, + DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioner, + SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter, }, namespace_cache::{ metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache, }, + namespace_resolver::{NamespaceAutocreation, NamespaceResolver, NamespaceSchemaResolver}, server::{ grpc::{sharder::ShardService, GrpcDelegate}, http::HttpDelegate, @@ -66,14 +66,14 @@ pub enum Error { pub type Result = std::result::Result; -pub struct RouterServerType { - server: RouterServer, +pub struct RouterServerType { + server: RouterServer, shutdown: CancellationToken, trace_collector: Option>, } -impl RouterServerType { - pub fn new(server: RouterServer, common_state: &CommonServerState) -> Self { +impl RouterServerType { + pub fn new(server: RouterServer, common_state: &CommonServerState) -> Self { Self { server, shutdown: CancellationToken::new(), @@ -82,17 +82,18 @@ impl RouterServerType { } } -impl std::fmt::Debug for RouterServerType { +impl std::fmt::Debug for RouterServerType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Router") } } #[async_trait] -impl ServerType for RouterServerType +impl ServerType for RouterServerType where D: DmlHandler, WriteOutput = WriteSummary> + 'static, S: Sharder<(), Item = Arc> + Clone + 'static, + N: NamespaceResolver + 'static, { /// Return the [`metric::Registry`] used by the router. 
fn metric_registry(&self) -> Arc { @@ -210,6 +211,10 @@ pub async fn create_router_server_type( }); let partitioner = InstrumentationDecorator::new("partitioner", &*metrics, partitioner); + // Initialise the Namespace ID lookup + cache + let namespace_resolver = + NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + //////////////////////////////////////////////////////////////////////////// // // THIS CODE IS FOR TESTING ONLY. @@ -247,9 +252,10 @@ pub async fn create_router_server_type( }); txn.commit().await?; - let ns_creator = NamespaceAutocreation::new( + let namespace_resolver = NamespaceAutocreation::new( + namespace_resolver, + Arc::clone(&ns_cache), Arc::clone(&catalog), - ns_cache, topic_id, query_id, iox_catalog::INFINITE_RETENTION_POLICY.to_owned(), @@ -262,8 +268,7 @@ pub async fn create_router_server_type( // Build the chain of DML handlers that forms the request processing // pipeline, starting with the namespace creator (for testing purposes) and // write partitioner that yields a set of partitioned batches. - let handler_stack = ns_creator - .and_then(schema_validator) + let handler_stack = schema_validator .and_then(partitioner) // Once writes have been partitioned, they are processed in parallel. 
// @@ -288,6 +293,7 @@ pub async fn create_router_server_type( let http = HttpDelegate::new( common_state.run_config().max_http_request_size, request_limit, + namespace_resolver, Arc::clone(&handler_stack), &metrics, ); diff --git a/router/benches/e2e.rs b/router/benches/e2e.rs index 321b6fbd6b..0935a3ed0f 100644 --- a/router/benches/e2e.rs +++ b/router/benches/e2e.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeSet, iter, sync::Arc}; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; -use data_types::{PartitionTemplate, TemplatePart}; +use data_types::{NamespaceId, PartitionTemplate, TemplatePart}; use hyper::{Body, Request}; use iox_catalog::{interface::Catalog, mem::MemCatalog}; use router::{ @@ -10,6 +10,7 @@ use router::{ WriteSummaryAdapter, }, namespace_cache::{MemoryNamespaceCache, ShardedCache}, + namespace_resolver::mock::MockNamespaceResolver, server::http::HttpDelegate, shard::Shard, }; @@ -74,7 +75,16 @@ fn e2e_benchmarks(c: &mut Criterion) { partitioner.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer))), ); - HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics) + let namespace_resolver = + MockNamespaceResolver::default().with_mapping("bananas", NamespaceId::new(42)); + + HttpDelegate::new( + 1024, + 100, + namespace_resolver, + Arc::new(handler_stack), + &metrics, + ) }; let body_str = "platanos,tag1=A,tag2=B val=42i 123456"; diff --git a/router/benches/schema_validator.rs b/router/benches/schema_validator.rs index ee20f8abba..975060ae9b 100644 --- a/router/benches/schema_validator.rs +++ b/router/benches/schema_validator.rs @@ -4,7 +4,7 @@ use criterion::{ criterion_group, criterion_main, measurement::WallTime, BatchSize, BenchmarkGroup, Criterion, Throughput, }; -use data_types::DatabaseName; +use data_types::{DatabaseName, NamespaceId}; use hashbrown::HashMap; use iox_catalog::mem::MemCatalog; use mutable_batch::MutableBatch; @@ -49,7 +49,7 @@ fn bench(group: &mut BenchmarkGroup, tables: usize, 
columns_per_table: for i in 0..65_000 { let write = lp_to_writes(format!("{}{}", i + 10_000_000, generate_lp(1, 1)).as_str()); - let _ = runtime().block_on(validator.write(&*NAMESPACE, write, None)); + let _ = runtime().block_on(validator.write(&*NAMESPACE, NamespaceId::new(42), write, None)); } let write = lp_to_writes(&generate_lp(tables, columns_per_table)); @@ -61,7 +61,7 @@ fn bench(group: &mut BenchmarkGroup, tables: usize, columns_per_table: group.bench_function(format!("{tables}x{columns_per_table}"), |b| { b.to_async(runtime()).iter_batched( || write.clone(), - |write| validator.write(&*NAMESPACE, write, None), + |write| validator.write(&*NAMESPACE, NamespaceId::new(42), write, None), BatchSize::SmallInput, ); }); diff --git a/router/src/dml_handlers/chain.rs b/router/src/dml_handlers/chain.rs index bf8b2f3619..2d6248667c 100644 --- a/router/src/dml_handlers/chain.rs +++ b/router/src/dml_handlers/chain.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use trace::ctx::SpanContext; use super::{DmlError, DmlHandler}; @@ -59,17 +59,18 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { let output = self .first - .write(namespace, input, span_ctx.clone()) + .write(namespace, namespace_id, input, span_ctx.clone()) .await .map_err(Into::into)?; self.second - .write(namespace, output, span_ctx) + .write(namespace, namespace_id, output, span_ctx) .await .map_err(Into::into) } diff --git a/router/src/dml_handlers/fan_out.rs b/router/src/dml_handlers/fan_out.rs index ec171e9fc7..5af7d966f7 100644 --- a/router/src/dml_handlers/fan_out.rs +++ b/router/src/dml_handlers/fan_out.rs @@ -1,7 +1,7 @@ use std::{fmt::Debug, marker::PhantomData}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, 
DeletePredicate, NamespaceId}; use futures::{stream::FuturesUnordered, TryStreamExt}; use trace::ctx::SpanContext; @@ -50,6 +50,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { @@ -58,7 +59,11 @@ where .map(|v| { let namespace = namespace.clone(); let span_ctx = span_ctx.clone(); - async move { self.inner.write(&namespace, v, span_ctx).await } + async move { + self.inner + .write(&namespace, namespace_id, v, span_ctx) + .await + } }) .collect::>() .try_collect::>() diff --git a/router/src/dml_handlers/instrumentation.rs b/router/src/dml_handlers/instrumentation.rs index 3289cfd4d0..b919ac6904 100644 --- a/router/src/dml_handlers/instrumentation.rs +++ b/router/src/dml_handlers/instrumentation.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric}; use trace::{ @@ -68,6 +68,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { @@ -77,7 +78,10 @@ where let mut span_recorder = SpanRecorder::new(span_ctx.clone().map(|parent| parent.child(self.name))); - let res = self.inner.write(namespace, input, span_ctx).await; + let res = self + .inner + .write(namespace, namespace_id, input, span_ctx) + .await; // Avoid exploding if time goes backwards - simply drop the measurement // if it happens. 
@@ -202,7 +206,7 @@ mod tests { let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler); decorator - .write(&ns, (), Some(span)) + .write(&ns, NamespaceId::new(42), (), Some(span)) .await .expect("inner handler configured to succeed"); @@ -225,7 +229,7 @@ mod tests { let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler); let err = decorator - .write(&ns, (), Some(span)) + .write(&ns, NamespaceId::new(42), (), Some(span)) .await .expect_err("inner handler configured to fail"); diff --git a/router/src/dml_handlers/mock.rs b/router/src/dml_handlers/mock.rs index 50da8c5aa3..3cfee1e7ed 100644 --- a/router/src/dml_handlers/mock.rs +++ b/router/src/dml_handlers/mock.rs @@ -1,7 +1,7 @@ use std::{collections::VecDeque, fmt::Debug}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use parking_lot::Mutex; use trace::ctx::SpanContext; use write_summary::WriteSummary; @@ -14,6 +14,7 @@ use super::{DmlError, DmlHandler}; pub enum MockDmlHandlerCall { Write { namespace: String, + namespace_id: NamespaceId, write_input: W, }, Delete { @@ -102,6 +103,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, write_input: Self::WriteInput, _span_ctx: Option, ) -> Result { @@ -109,6 +111,7 @@ where self, MockDmlHandlerCall::Write { namespace: namespace.into(), + namespace_id, write_input, }, write_return diff --git a/router/src/dml_handlers/mod.rs b/router/src/dml_handlers/mod.rs index 3990606026..94eca94a78 100644 --- a/router/src/dml_handlers/mod.rs +++ b/router/src/dml_handlers/mod.rs @@ -4,27 +4,17 @@ //! processing handler chain: //! //! ```text -//! ┌──────────────┐ ┌──────────────┐ -//! │ HTTP API │ │ gRPC API │ -//! └──────────────┘ └──────────────┘ -//! │ │ -//! └─────────┬─────────┘ -//! │ -//! ▼ -//! ╔═ DmlHandler Stack ═════╗ +//! ┌─────────────────┐ +//! │ Namespace Cache │ +//! 
└─────────────────┘ +//! ╔═ DmlHandler Stack ═════╗ │ //! ║ ║ -//! ║ ┌──────────────────┐ ║ -//! ║ │ Namespace │ ║ -//! ║ │ Autocreation │─ ─ ─ ─ ─ ─ ─ ┐ -//! ║ └──────────────────┘ ║ -//! ║ │ ║ │ -//! ║ ▼ ║ //! ║ ┌──────────────────┐ ║ │ //! ║ │ Partitioner │ ║ //! ║ └──────────────────┘ ║ │ -//! ║ │ ║ ┌─────────────────┐ -//! ║ ▼ ║ │ Namespace Cache │ -//! ║ ┌──────────────────┐ ║ └─────────────────┘ +//! ║ │ ║ +//! ║ ▼ ║ │ +//! ║ ┌──────────────────┐ ║ //! ║ │ Schema │ ║ │ //! ║ │ Validation │ ─║─ ─ ─ ─ ─ ─ //! ║ └──────────────────┘ ║ @@ -49,23 +39,17 @@ //! └──────────────┘ //! ``` //! -//! The HTTP / gRPC APIs decode their respective request format and funnel the -//! resulting operation through the common [`DmlHandler`] composed of the layers -//! described above. +//! The HTTP API decodes the request and funnels the resulting operation through +//! the [`DmlHandler`] stack composed of the layers described above. //! -//! The [`NamespaceAutocreation`] handler (for testing only) populates the -//! global catalog with an entry for each namespace it observes, using the -//! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending -//! requests to the catalog for namespaces that are known to exist. -//! -//! Incoming line-protocol writes then pass through the [`Partitioner`], parsing -//! the LP and splitting them into batches per partition, before passing each +//! Incoming line-protocol writes pass through the [`Partitioner`], parsing the +//! LP and splitting them into batches per IOx partition, before passing each //! partitioned batch through the rest of the request pipeline. //! -//! Writes then pass through the [`SchemaValidator`] applying schema enforcement -//! (a NOP layer for deletes) which pushes additive schema changes to the -//! catalog and populates the [`NamespaceCache`], converging it to match the set -//! of [`NamespaceSchema`] in the global catalog. +//! 
Writes then pass through the [`SchemaValidator`] applying schema & limit +//! enforcement (a NOP layer for deletes) which pushes additive schema changes +//! to the catalog and populates the [`NamespaceCache`], converging it to match +//! the set of [`NamespaceSchema`] in the global catalog. //! //! The [`ShardedWriteBuffer`] uses a sharder implementation to direct the DML //! operations into a fixed set of shards. @@ -84,9 +68,6 @@ pub mod nop; mod sharded_write_buffer; pub use sharded_write_buffer::*; -mod ns_autocreation; -pub use ns_autocreation::*; - mod partitioner; pub use partitioner::*; diff --git a/router/src/dml_handlers/nop.rs b/router/src/dml_handlers/nop.rs index cdad61186a..f3299fbe1f 100644 --- a/router/src/dml_handlers/nop.rs +++ b/router/src/dml_handlers/nop.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, marker::PhantomData}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use observability_deps::tracing::*; use trace::ctx::SpanContext; @@ -32,10 +32,11 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, batches: Self::WriteInput, _span_ctx: Option, ) -> Result { - info!(%namespace, ?batches, "dropping write operation"); + info!(%namespace, %namespace_id, ?batches, "dropping write operation"); Ok(batches) } diff --git a/router/src/dml_handlers/partitioner.rs b/router/src/dml_handlers/partitioner.rs index 099ffbff11..0bd948b413 100644 --- a/router/src/dml_handlers/partitioner.rs +++ b/router/src/dml_handlers/partitioner.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate, PartitionKey, PartitionTemplate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId, PartitionKey, PartitionTemplate}; use hashbrown::HashMap; use mutable_batch::{MutableBatch, PartitionWrite, WritePayload}; use observability_deps::tracing::*; @@ -71,6 +71,7 @@ impl DmlHandler for Partitioner { 
async fn write( &self, _namespace: &DatabaseName<'static>, + _namespace_id: NamespaceId, batch: Self::WriteInput, _span_ctx: Option, ) -> Result { @@ -145,7 +146,7 @@ mod tests { let (writes, _) = mutable_batch_lp::lines_to_batches_stats($lp, DEFAULT_TIMESTAMP_NANOS).expect("failed to parse test LP"); - let handler_ret = partitioner.write(&ns, writes, None).await; + let handler_ret = partitioner.write(&ns, NamespaceId::new(42), writes, None).await; assert_matches!(handler_ret, $($want_handler_ret)+); // Check the partition -> table mapping. diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs index f649f6abe5..8a2660292a 100644 --- a/router/src/dml_handlers/schema_validation.rs +++ b/router/src/dml_handlers/schema_validation.rs @@ -1,7 +1,7 @@ use std::{ops::DerefMut, sync::Arc}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate, NamespaceSchema}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId, NamespaceSchema}; use hashbrown::HashMap; use iox_catalog::{ interface::{get_schema_by_name, Catalog, Error as CatalogError}, @@ -167,6 +167,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, batches: Self::WriteInput, _span_ctx: Option, ) -> Result { @@ -183,7 +184,12 @@ where let schema = get_schema_by_name(namespace, repos.deref_mut()) .await .map_err(|e| { - warn!(error=%e, %namespace, "failed to retrieve namespace schema"); + warn!( + error=%e, + %namespace, + %namespace_id, + "failed to retrieve namespace schema" + ); SchemaError::NamespaceLookup(e) }) .map(Arc::new)?; @@ -197,7 +203,12 @@ where }; validate_column_limits(&batches, &schema).map_err(|e| { - warn!(%namespace, error=%e, "service protection limit reached"); + warn!( + %namespace, + %namespace_id, + error=%e, + "service protection limit reached" + ); self.service_limit_hit.inc(1); SchemaError::ServiceLimit(Box::new(e)) })?; @@ -218,6 +229,7 @@ where } => { warn!( 
%namespace, + %namespace_id, column_name=%name, existing_column_type=%existing, request_column_type=%new, @@ -230,12 +242,22 @@ where // Service limits CatalogError::ColumnCreateLimitError { .. } | CatalogError::TableCreateLimitError { .. } => { - warn!(%namespace, error=%e, "service protection limit reached"); + warn!( + %namespace, + %namespace_id, + error=%e, + "service protection limit reached" + ); self.service_limit_hit.inc(1); SchemaError::ServiceLimit(Box::new(e.into_err())) } _ => { - error!(%namespace, error=%e, "schema validation failed"); + error!( + %namespace, + %namespace_id, + error=%e, + "schema validation failed" + ); SchemaError::UnexpectedCatalogError(e.into_err()) } } @@ -453,12 +475,12 @@ mod tests { // namespace schema gets cached let writes1_valid = lp_to_writes("dragonfruit val=42i 123456"); handler1 - .write(&*NAMESPACE, writes1_valid, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes1_valid, None) .await .expect("request should succeed"); let writes2_valid = lp_to_writes("dragonfruit val=43i 123457"); handler2 - .write(&*NAMESPACE, writes2_valid, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes2_valid, None) .await .expect("request should succeed"); @@ -466,12 +488,12 @@ mod tests { // putting the table over the limit let writes1_add_column = lp_to_writes("dragonfruit,tag1=A val=42i 123456"); handler1 - .write(&*NAMESPACE, writes1_add_column, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes1_add_column, None) .await .expect("request should succeed"); let writes2_add_column = lp_to_writes("dragonfruit,tag2=B val=43i 123457"); handler2 - .write(&*NAMESPACE, writes2_add_column, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes2_add_column, None) .await .expect("request should succeed"); @@ -542,7 +564,7 @@ mod tests { let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect("request 
should succeed"); @@ -567,7 +589,7 @@ mod tests { let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); let err = handler - .write(&ns, writes, None) + .write(&ns, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); @@ -590,7 +612,7 @@ mod tests { // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64 let got = handler - .write(&*NAMESPACE, writes.clone(), None) + .write(&*NAMESPACE, NamespaceId::new(42), writes.clone(), None) .await .expect("request should succeed"); assert_eq!(writes.len(), got.len()); @@ -598,7 +620,7 @@ mod tests { // Second write attempts to violate it causing an error let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42.0 123456"); // val=float let err = handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); @@ -628,7 +650,7 @@ mod tests { // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); let got = handler - .write(&*NAMESPACE, writes.clone(), None) + .write(&*NAMESPACE, NamespaceId::new(42), writes.clone(), None) .await .expect("request should succeed"); assert_eq!(writes.len(), got.len()); @@ -646,7 +668,7 @@ mod tests { // Second write attempts to violate limits, causing an error let writes = lp_to_writes("bananas2,tag1=A,tag2=B val=42i 123456"); let err = handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); @@ -667,7 +689,7 @@ mod tests { // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); let got = handler - .write(&*NAMESPACE, writes.clone(), None) + .write(&*NAMESPACE, NamespaceId::new(42), writes.clone(), None) .await .expect("request should succeed"); assert_eq!(writes.len(), got.len()); @@ -683,7 +705,7 @@ mod tests { // Second write attempts to violate limits, 
causing an error let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456"); let err = handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); @@ -714,7 +736,7 @@ mod tests { // First write attempts to add columns over the limit, causing an error let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456"); let err = handler - .write(&*NAMESPACE, writes, None) + .write(&*NAMESPACE, NamespaceId::new(42), writes, None) .await .expect_err("request should fail"); diff --git a/router/src/dml_handlers/sharded_write_buffer.rs b/router/src/dml_handlers/sharded_write_buffer.rs index be5c34d9e1..11ef89b132 100644 --- a/router/src/dml_handlers/sharded_write_buffer.rs +++ b/router/src/dml_handlers/sharded_write_buffer.rs @@ -6,7 +6,7 @@ use std::{ }; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate, NonEmptyString}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId, NonEmptyString}; use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite}; use futures::{stream::FuturesUnordered, StreamExt}; use hashbrown::HashMap; @@ -94,6 +94,7 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, writes: Self::WriteInput, span_ctx: Option, ) -> Result { @@ -129,6 +130,7 @@ where kafka_partition=%shard.shard_index(), tables=%dml.table_count(), %namespace, + %namespace_id, approx_size=%dml.size(), "routing writes to shard" ); @@ -274,7 +276,9 @@ mod tests { // Call the ShardedWriteBuffer and drive the test let ns = DatabaseName::new("bananas").unwrap(); - w.write(&ns, writes, None).await.expect("write failed"); + w.write(&ns, NamespaceId::new(42), writes, None) + .await + .expect("write failed"); // Assert the sharder saw all the tables let calls = sharder.calls(); @@ -340,7 +344,9 @@ mod tests { // Call the ShardedWriteBuffer and drive the test let ns = DatabaseName::new("bananas").unwrap(); - 
w.write(&ns, writes, None).await.expect("write failed"); + w.write(&ns, NamespaceId::new(42), writes, None) + .await + .expect("write failed"); // Assert the sharder saw all the tables let calls = sharder.calls(); @@ -417,7 +423,7 @@ mod tests { // Call the ShardedWriteBuffer and drive the test let ns = DatabaseName::new("bananas").unwrap(); let err = w - .write(&ns, writes, None) + .write(&ns, NamespaceId::new(42), writes, None) .await .expect_err("write should return a failure"); assert_matches!(err, ShardError::WriteBufferErrors{successes, errs} => { diff --git a/router/src/dml_handlers/trait.rs b/router/src/dml_handlers/trait.rs index 784ea82584..5e75ab3788 100644 --- a/router/src/dml_handlers/trait.rs +++ b/router/src/dml_handlers/trait.rs @@ -1,11 +1,11 @@ use std::{error::Error, fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use thiserror::Error; use trace::ctx::SpanContext; -use super::{partitioner::PartitionError, NamespaceCreationError, SchemaError, ShardError}; +use super::{partitioner::PartitionError, SchemaError, ShardError}; /// Errors emitted by a [`DmlHandler`] implementation during DML request /// processing. @@ -23,10 +23,6 @@ pub enum DmlError { #[error(transparent)] Schema(#[from] SchemaError), - /// Failed to create the request namespace. - #[error(transparent)] - NamespaceCreation(#[from] NamespaceCreationError), - /// An error partitioning the request. 
#[error(transparent)] Partition(#[from] PartitionError), @@ -63,6 +59,7 @@ pub trait DmlHandler: Debug + Send + Sync { async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result; @@ -90,10 +87,13 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { - (**self).write(namespace, input, span_ctx).await + (**self) + .write(namespace, namespace_id, input, span_ctx) + .await } /// Delete the data specified in `delete`. diff --git a/router/src/dml_handlers/write_summary.rs b/router/src/dml_handlers/write_summary.rs index eefa1d6594..86f12f06c5 100644 --- a/router/src/dml_handlers/write_summary.rs +++ b/router/src/dml_handlers/write_summary.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use async_trait::async_trait; -use data_types::{DatabaseName, DeletePredicate}; +use data_types::{DatabaseName, DeletePredicate, NamespaceId}; use dml::DmlMeta; use trace::ctx::SpanContext; use write_summary::WriteSummary; @@ -40,10 +40,14 @@ where async fn write( &self, namespace: &DatabaseName<'static>, + namespace_id: NamespaceId, input: Self::WriteInput, span_ctx: Option, ) -> Result { - let metas = self.inner.write(namespace, input, span_ctx).await?; + let metas = self + .inner + .write(namespace, namespace_id, input, span_ctx) + .await?; Ok(WriteSummary::new(metas)) } diff --git a/router/src/lib.rs b/router/src/lib.rs index 24cf7c77d0..3d17dd5937 100644 --- a/router/src/lib.rs +++ b/router/src/lib.rs @@ -1,14 +1,96 @@ -//! IOx router role implementation. +//! IOx router implementation. //! //! An IOx router is responsible for: //! //! * Creating IOx namespaces & synchronising them within the catalog. //! * Handling writes: -//! * Receiving IOx write/delete requests via HTTP and gRPC endpoints. +//! * Receiving IOx write/delete requests via HTTP. //! 
* Enforcing schema validation & synchronising it within the catalog. //! * Deriving the partition key of each DML operation. //! * Applying sharding logic. -//! * Push resulting operations into the appropriate shards (Kafka partitions if using Kafka). +//! * Push resulting operations into the appropriate shards (Kafka +//! partitions if using Kafka). +//! +//! The router is composed of singly-responsible components that each apply a +//! transformation to the request: +//! +//! ```text +//! ┌──────────────┐ +//! │ HTTP API │ +//! └──────────────┘ +//! │ +//! │ +//! ▼ +//! ╔═ NamespaceResolver ════╗ +//! ║ ║ +//! ║ ┌──────────────────┐ ║ +//! ║ │ Namespace │ ║ +//! ║ │ Autocreation │ ─║─ ─ ─ ─ ─ ─ +//! ║ └──────────────────┘ ║ │ +//! ║ │ ║ +//! ║ ▼ ║ │ +//! ║ ┌──────────────────┐ ║ +//! ║ │ NamespaceSchema │ ║ │ +//! ║ │ Resolver │ ─║─ ─ ─ ─ ─ ─ +//! ║ └──────────────────┘ ║ │ +//! ║ │ ║ +//! ╚════════════│═══════════╝ │ +//! │ ┌─────────────────┐ +//! │ │ Namespace Cache │ +//! │ └─────────────────┘ +//! ╔═ DmlHandler│Stack ═════╗ │ +//! ║ ▼ ║ +//! ║ ┌──────────────────┐ ║ │ +//! ║ │ Partitioner │ ║ +//! ║ └──────────────────┘ ║ │ +//! ║ │ ║ +//! ║ ▼ ║ │ +//! ║ ┌──────────────────┐ ║ +//! ║ │ Schema │ ║ │ +//! ║ │ Validation │ ─║─ ─ ─ ─ ─ ─ +//! ║ └──────────────────┘ ║ +//! ║ │ ║ +//! ║ ▼ ║ +//! ┌───────┐ ║ ┌──────────────────┐ ║ +//! │Sharder│◀ ─ ─ ▶│ShardedWriteBuffer│ ║ +//! └───────┘ ║ └──────────────────┘ ║ +//! ║ │ ║ +//! ╚════════════│═══════════╝ +//! │ +//! ▼ +//! ┌──────────────┐ +//! │ Write Buffer │ +//! └──────────────┘ +//! │ +//! │ +//! ┌────────▼─────┐ +//! │ Kafka ├┐ +//! └┬─────────────┘├┐ +//! └┬─────────────┘│ +//! └──────────────┘ +//! ``` +//! +//! The [`NamespaceAutocreation`] handler (for testing only) populates the +//! global catalog with an entry for each namespace it observes, using the +//! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending +//! requests to the catalog for namespaces that are known to exist. 
+//! +//! A [`NamespaceResolver`] maps a user-provided namespace string to the +//! catalog ID ([`NamespaceId`]). The [`NamespaceSchemaResolver`] achieves this +//! by inspecting the contents of the [`NamespaceCache`]. If a cache-miss +//! occurs the [`NamespaceSchemaResolver`] queries the catalog and populates the +//! cache for subsequent requests. +//! +//! Once the [`NamespaceId`] has been resolved, the request is passed into the +//! [`DmlHandler`] stack. +//! +//! [`NamespaceAutocreation`]: crate::namespace_resolver::NamespaceAutocreation +//! [`NamespaceSchemaResolver`]: crate::namespace_resolver::NamespaceSchemaResolver +//! [`NamespaceResolver`]: crate::namespace_resolver +//! [`NamespaceId`]: data_types::NamespaceId +//! [`NamespaceCache`]: crate::namespace_cache::NamespaceCache +//! [`NamespaceSchema`]: data_types::NamespaceSchema +//! [`DmlHandler`]: crate::dml_handlers #![deny( rustdoc::broken_intra_doc_links, @@ -28,5 +110,6 @@ pub mod dml_handlers; pub mod namespace_cache; +pub mod namespace_resolver; pub mod server; pub mod shard; diff --git a/router/src/namespace_resolver.rs b/router/src/namespace_resolver.rs new file mode 100644 index 0000000000..da1801a11f --- /dev/null +++ b/router/src/namespace_resolver.rs @@ -0,0 +1,209 @@ +//! A trait to abstract resolving a [`DatabaseName`] to [`NamespaceId`], and a +//! collection of composable implementations. + +use std::{ops::DerefMut, sync::Arc}; + +use async_trait::async_trait; +use data_types::{DatabaseName, NamespaceId}; +use iox_catalog::interface::{get_schema_by_name, Catalog}; +use observability_deps::tracing::*; +use thiserror::Error; + +use crate::namespace_cache::NamespaceCache; + +pub mod mock; +mod ns_autocreation; +pub use ns_autocreation::*; + +/// Error states encountered during [`NamespaceId`] lookup. +#[derive(Debug, Error)] +pub enum Error { + /// An error occurred when attempting to fetch the namespace ID.
+ #[error("failed to resolve namespace ID: {0}")] + Lookup(iox_catalog::interface::Error), + + /// An error state for errors returned by [`NamespaceAutocreation`]. + #[error(transparent)] + Create(#[from] NamespaceCreationError), +} + +/// An abstract resolver of [`DatabaseName`] to [`NamespaceId`]. +#[async_trait] +pub trait NamespaceResolver: std::fmt::Debug + Send + Sync { + /// Return the [`NamespaceId`] for the given [`DatabaseName`]. + async fn get_namespace_id( + &self, + namespace: &DatabaseName<'static>, + ) -> Result; +} + +/// An implementation of [`NamespaceResolver`] that queries the [`Catalog`] to +/// resolve a [`NamespaceId`], and populates the [`NamespaceCache`] as a side +/// effect. +#[derive(Debug)] +pub struct NamespaceSchemaResolver { + catalog: Arc, + cache: C, +} + +impl NamespaceSchemaResolver { + /// Construct a new [`NamespaceSchemaResolver`] that fetches schemas from + /// `catalog` and caches them in `cache`. + pub fn new(catalog: Arc, cache: C) -> Self { + Self { catalog, cache } + } +} + +#[async_trait] +impl NamespaceResolver for NamespaceSchemaResolver +where + C: NamespaceCache, +{ + async fn get_namespace_id( + &self, + namespace: &DatabaseName<'static>, + ) -> Result { + // Load the namespace schema from the cache, falling back to pulling it + // from the global catalog (if it exists). + match self.cache.get_schema(namespace) { + Some(v) => Ok(v.id), + None => { + let mut repos = self.catalog.repositories().await; + + // Pull the schema from the global catalog or error if it does + // not exist. + let schema = get_schema_by_name(namespace, repos.deref_mut()) + .await + .map_err(|e| { + warn!( + error=%e, + %namespace, + "failed to retrieve namespace schema" + ); + Error::Lookup(e) + }) + .map(Arc::new)?; + + // Cache population MAY race with other threads and lead to + // overwrites, but an entry will always exist once inserted, and + // the schemas will eventually converge. 
+ self.cache + .put_schema(namespace.clone(), Arc::clone(&schema)); + + trace!(%namespace, "schema cache populated"); + Ok(schema.id) + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use data_types::{NamespaceId, NamespaceSchema, QueryPoolId, TopicId}; + use iox_catalog::mem::MemCatalog; + + use super::*; + use crate::namespace_cache::MemoryNamespaceCache; + + #[tokio::test] + async fn test_cache_hit() { + let ns = DatabaseName::try_from("bananas").unwrap(); + + // Prep the cache before the test to cause a hit + let cache = Arc::new(MemoryNamespaceCache::default()); + cache.put_schema( + ns.clone(), + NamespaceSchema { + id: NamespaceId::new(42), + topic_id: TopicId::new(2), + query_pool_id: QueryPoolId::new(3), + tables: Default::default(), + max_columns_per_table: 4, + }, + ); + + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + + // Drive the code under test + resolver + .get_namespace_id(&ns) + .await + .expect("lookup should succeed"); + + assert!(cache.get_schema(&ns).is_some()); + + // The cache hit should mean the catalog SHOULD NOT see a create request + // for the namespace. 
+ let mut repos = catalog.repositories().await; + assert!( + repos + .namespaces() + .get_by_name(ns.as_str()) + .await + .expect("lookup should not error") + .is_none(), + "expected no request to the catalog" + ); + } + + #[tokio::test] + async fn test_cache_miss() { + let ns = DatabaseName::try_from("bananas").unwrap(); + + let cache = Arc::new(MemoryNamespaceCache::default()); + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + // Create the namespace in the catalog + { + let mut repos = catalog.repositories().await; + let topic = repos.topics().create_or_get("bananas").await.unwrap(); + let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap(); + repos + .namespaces() + .create( + &ns, + iox_catalog::INFINITE_RETENTION_POLICY, + topic.id, + query_pool.id, + ) + .await + .expect("failed to setup catalog state"); + } + + let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + + resolver + .get_namespace_id(&ns) + .await + .expect("lookup should succeed"); + + // The cache should be populated as a result of the lookup. + assert!(cache.get_schema(&ns).is_some()); + } + + #[tokio::test] + async fn test_cache_miss_does_not_exist() { + let ns = DatabaseName::try_from("bananas").unwrap(); + + let cache = Arc::new(MemoryNamespaceCache::default()); + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + + let err = resolver + .get_namespace_id(&ns) + .await + .expect_err("lookup should error"); + + assert_matches!(err, Error::Lookup(_)); + assert!(cache.get_schema(&ns).is_none()); + } +} diff --git a/router/src/namespace_resolver/mock.rs b/router/src/namespace_resolver/mock.rs new file mode 100644 index 0000000000..d90ad3afd3 --- /dev/null +++ b/router/src/namespace_resolver/mock.rs @@ -0,0 +1,45 @@ +//! 
A mock implementation of [`NamespaceResolver`]. + +#![allow(missing_docs)] + +use std::collections::HashMap; + +use async_trait::async_trait; +use data_types::{DatabaseName, NamespaceId}; +use parking_lot::Mutex; + +use super::NamespaceResolver; + +#[derive(Debug, Default)] +pub struct MockNamespaceResolver { + map: Mutex, NamespaceId>>, +} + +impl MockNamespaceResolver { + pub fn new(map: HashMap, NamespaceId>) -> Self { + Self { + map: Mutex::new(map), + } + } + + pub fn with_mapping(self, name: impl Into + 'static, id: NamespaceId) -> Self { + let name = DatabaseName::try_from(name.into()).unwrap(); + assert!(self.map.lock().insert(name, id).is_none()); + self + } +} + +#[async_trait] +impl NamespaceResolver for MockNamespaceResolver { + /// Return the [`NamespaceId`] for the given [`DatabaseName`]. + async fn get_namespace_id( + &self, + namespace: &DatabaseName<'static>, + ) -> Result { + Ok(*self + .map + .lock() + .get(namespace) + .expect("mock namespace resolver does not have ID")) + } +} diff --git a/router/src/namespace_resolver/ns_autocreation.rs b/router/src/namespace_resolver/ns_autocreation.rs new file mode 100644 index 0000000000..180f4d1a49 --- /dev/null +++ b/router/src/namespace_resolver/ns_autocreation.rs @@ -0,0 +1,221 @@ +use std::{fmt::Debug, sync::Arc}; + +use async_trait::async_trait; +use data_types::{DatabaseName, NamespaceId, QueryPoolId, TopicId}; +use iox_catalog::interface::Catalog; +use observability_deps::tracing::*; +use thiserror::Error; + +use super::NamespaceResolver; +use crate::namespace_cache::NamespaceCache; + +/// An error auto-creating the request namespace. +#[derive(Debug, Error)] +pub enum NamespaceCreationError { + /// An error returned from a namespace creation request. + #[error("failed to create namespace: {0}")] + Create(iox_catalog::interface::Error), +} + +/// A layer to populate the [`Catalog`] with all the namespaces the router +/// observes. 
+/// +/// Uses a [`NamespaceCache`] to limit issuing create requests to namespaces the +/// router has not yet observed a schema for. +#[derive(Debug)] +pub struct NamespaceAutocreation { + inner: T, + cache: C, + catalog: Arc, + + topic_id: TopicId, + query_id: QueryPoolId, + retention: String, +} + +impl NamespaceAutocreation { + /// Return a new [`NamespaceAutocreation`] layer that ensures a requested + /// namespace exists in `catalog`. + /// + /// If the namespace does not exist, it is created with the specified + /// `topic_id`, `query_id` and `retention` policy. + /// + /// Namespaces are looked up in `cache`, skipping the creation request to + /// the catalog if there's a hit. + pub fn new( + inner: T, + cache: C, + catalog: Arc, + topic_id: TopicId, + query_id: QueryPoolId, + retention: String, + ) -> Self { + Self { + inner, + cache, + catalog, + topic_id, + query_id, + retention, + } + } +} + +#[async_trait] +impl NamespaceResolver for NamespaceAutocreation +where + C: NamespaceCache, + T: NamespaceResolver, +{ + /// Force the creation of `namespace` if it does not already exist in the + /// cache, before passing the request through to the inner delegate. + async fn get_namespace_id( + &self, + namespace: &DatabaseName<'static>, + ) -> Result { + if self.cache.get_schema(namespace).is_none() { + trace!(%namespace, "namespace auto-create cache miss"); + + let mut repos = self.catalog.repositories().await; + + match repos + .namespaces() + .create( + namespace.as_str(), + &self.retention, + self.topic_id, + self.query_id, + ) + .await + { + Ok(_) => { + debug!(%namespace, "created namespace"); + } + Err(iox_catalog::interface::Error::NameExists { .. }) => { + // Either the cache has not yet converged to include this + // namespace, or another thread raced populating the catalog + // and beat this thread to it. 
+ debug!(%namespace, "spurious namespace create failed"); + } + Err(e) => { + error!(error=%e, %namespace, "failed to auto-create namespace"); + return Err(NamespaceCreationError::Create(e).into()); + } + } + } + + self.inner.get_namespace_id(namespace).await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use data_types::{Namespace, NamespaceId, NamespaceSchema}; + use iox_catalog::mem::MemCatalog; + + use super::*; + use crate::{ + namespace_cache::MemoryNamespaceCache, namespace_resolver::mock::MockNamespaceResolver, + }; + + #[tokio::test] + async fn test_cache_hit() { + const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); + + let ns = DatabaseName::try_from("bananas").unwrap(); + + // Prep the cache before the test to cause a hit + let cache = Arc::new(MemoryNamespaceCache::default()); + cache.put_schema( + ns.clone(), + NamespaceSchema { + id: NAMESPACE_ID, + topic_id: TopicId::new(2), + query_pool_id: QueryPoolId::new(3), + tables: Default::default(), + max_columns_per_table: 4, + }, + ); + + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + let creator = NamespaceAutocreation::new( + MockNamespaceResolver::default().with_mapping(ns.clone(), NAMESPACE_ID), + cache, + Arc::clone(&catalog), + TopicId::new(42), + QueryPoolId::new(42), + "inf".to_owned(), + ); + + // Drive the code under test + let got = creator + .get_namespace_id(&ns) + .await + .expect("handler should succeed"); + assert_eq!(got, NAMESPACE_ID); + + // The cache hit should mean the catalog SHOULD NOT see a create request + // for the namespace. 
+ let mut repos = catalog.repositories().await; + assert!( + repos + .namespaces() + .get_by_name(ns.as_str()) + .await + .expect("lookup should not error") + .is_none(), + "expected no request to the catalog" + ); + } + + #[tokio::test] + async fn test_cache_miss() { + let ns = DatabaseName::try_from("bananas").unwrap(); + + let cache = Arc::new(MemoryNamespaceCache::default()); + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + let creator = NamespaceAutocreation::new( + MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)), + cache, + Arc::clone(&catalog), + TopicId::new(42), + QueryPoolId::new(42), + "inf".to_owned(), + ); + + let created_id = creator + .get_namespace_id(&ns) + .await + .expect("handler should succeed"); + + // The cache miss should mean the catalog MUST see a create request for + // the namespace. + let mut repos = catalog.repositories().await; + let got = repos + .namespaces() + .get_by_name(ns.as_str()) + .await + .expect("lookup should not error") + .expect("creation request should be sent to catalog"); + + assert_eq!(got.id, created_id); + assert_eq!( + got, + Namespace { + id: NamespaceId::new(1), + name: ns.to_string(), + retention_duration: Some("inf".to_owned()), + topic_id: TopicId::new(42), + query_pool_id: QueryPoolId::new(42), + max_tables: iox_catalog::DEFAULT_MAX_TABLES, + max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, + } + ); + } +} diff --git a/router/src/server.rs b/router/src/server.rs index 10e3213c32..9e9b71489e 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -15,19 +15,19 @@ pub mod http; /// The [`RouterServer`] manages the lifecycle and contains all state for a /// `router` server instance. 
#[derive(Debug)] -pub struct RouterServer { +pub struct RouterServer { metrics: Arc, trace_collector: Option>, - http: HttpDelegate, + http: HttpDelegate, grpc: GrpcDelegate, } -impl RouterServer { +impl RouterServer { /// Initialise a new [`RouterServer`] using the provided HTTP and gRPC /// handlers. pub fn new( - http: HttpDelegate, + http: HttpDelegate, grpc: GrpcDelegate, metrics: Arc, trace_collector: Option>, @@ -51,12 +51,12 @@ impl RouterServer { } } -impl RouterServer +impl RouterServer where D: DmlHandler>, { /// Get a reference to the router http delegate. - pub fn http(&self) -> &HttpDelegate { + pub fn http(&self) -> &HttpDelegate { &self.http } diff --git a/router/src/server/http.rs b/router/src/server/http.rs index dbae3e6be8..8206e7d783 100644 --- a/router/src/server/http.rs +++ b/router/src/server/http.rs @@ -22,7 +22,10 @@ use trace::ctx::SpanContext; use write_summary::WriteSummary; use self::delete_predicate::parse_http_delete_request; -use crate::dml_handlers::{DmlError, DmlHandler, PartitionError, SchemaError}; +use crate::{ + dml_handlers::{DmlError, DmlHandler, PartitionError, SchemaError}, + namespace_resolver::NamespaceResolver, +}; const WRITE_TOKEN_HTTP_HEADER: &str = "X-IOx-Write-Token"; @@ -77,6 +80,13 @@ pub enum Error { #[error("dml handler error: {0}")] DmlHandler(#[from] DmlError), + /// An error that occurs when attempting to map the user-provided namespace + /// name into a [`NamespaceId`]. + /// + /// [`NamespaceId`]: data_types::NamespaceId + #[error("failed to resolve namespace ID: {0}")] + NamespaceResolver(#[from] crate::namespace_resolver::Error), + /// The router is currently servicing the maximum permitted number of /// simultaneous requests. 
#[error("this service is overloaded, please try again later")] @@ -103,6 +113,7 @@ impl Error { StatusCode::UNSUPPORTED_MEDIA_TYPE } Error::DmlHandler(err) => StatusCode::from(err), + Error::NamespaceResolver(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::RequestLimit => StatusCode::SERVICE_UNAVAILABLE, } } @@ -128,9 +139,7 @@ impl From<&DmlError> for StatusCode { StatusCode::INTERNAL_SERVER_ERROR } - DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_) => { - StatusCode::INTERNAL_SERVER_ERROR - } + DmlError::Internal(_) | DmlError::WriteBuffer(_) => StatusCode::INTERNAL_SERVER_ERROR, DmlError::Partition(PartitionError::BatchWrite(_)) => StatusCode::INTERNAL_SERVER_ERROR, } } @@ -216,9 +225,10 @@ impl TryFrom<&Request> for WriteInfo { /// server runner framework takes care of implementing the heath endpoint, /// metrics, pprof, etc. #[derive(Debug)] -pub struct HttpDelegate { +pub struct HttpDelegate { max_request_bytes: usize, time_provider: T, + namespace_resolver: N, dml_handler: Arc, // A request limiter to restrict the number of simultaneous requests this @@ -239,7 +249,7 @@ pub struct HttpDelegate { request_limit_rejected: U64Counter, } -impl HttpDelegate { +impl HttpDelegate { /// Initialise a new [`HttpDelegate`] passing valid requests to the /// specified `dml_handler`. 
/// @@ -248,6 +258,7 @@ impl HttpDelegate { pub fn new( max_request_bytes: usize, max_requests: usize, + namespace_resolver: N, dml_handler: Arc, metrics: &metric::Registry, ) -> Self { @@ -297,6 +308,7 @@ impl HttpDelegate { Self { max_request_bytes, time_provider: SystemProvider::default(), + namespace_resolver, dml_handler, request_sem: Semaphore::new(max_requests), write_metric_lines, @@ -310,9 +322,10 @@ impl HttpDelegate { } } -impl HttpDelegate +impl HttpDelegate where D: DmlHandler, WriteOutput = WriteSummary>, + N: NamespaceResolver, T: TimeProvider, { /// Routes `req` to the appropriate handler, if any, returning the handler @@ -395,9 +408,12 @@ where "routing write", ); + // Retrieve the namespace ID for this namespace. + let namespace_id = self.namespace_resolver.get_namespace_id(&namespace).await?; + let summary = self .dml_handler - .write(&namespace, batches, span_ctx) + .write(&namespace, namespace_id, batches, span_ctx) .await .map_err(Into::into)?; @@ -522,6 +538,7 @@ mod tests { use std::{io::Write, iter, sync::Arc, time::Duration}; use assert_matches::assert_matches; + use data_types::NamespaceId; use flate2::{write::GzEncoder, Compression}; use hyper::header::HeaderValue; use metric::{Attributes, Metric}; @@ -531,9 +548,13 @@ mod tests { use tokio_stream::wrappers::ReceiverStream; use super::*; - use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}; + use crate::{ + dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}, + namespace_resolver::mock::MockNamespaceResolver, + }; const MAX_BYTES: usize = 1024; + const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); fn summary() -> WriteSummary { WriteSummary::default() @@ -620,12 +641,20 @@ mod tests { // encoding test_http_handler!(encoding_header=$encoding, request); + let mock_namespace_resolver = MockNamespaceResolver::default() + .with_mapping("bananas_test", NAMESPACE_ID); let dml_handler = Arc::new(MockDmlHandler::default() .with_write_return($dml_write_handler) 
.with_delete_return($dml_delete_handler) ); let metrics = Arc::new(metric::Registry::default()); - let delegate = HttpDelegate::new(MAX_BYTES, 100, Arc::clone(&dml_handler), &metrics); + let delegate = HttpDelegate::new( + MAX_BYTES, + 100, + mock_namespace_resolver, + Arc::clone(&dml_handler), + &metrics + ); let got = delegate.route(request).await; assert_matches!(got, $want_result); @@ -732,8 +761,9 @@ mod tests { body = "platanos,tag1=A,tag2=B val=42i 1647622847".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("platanos").expect("table not found"); let ts = table.timestamp_summary().expect("no timestamp summary"); @@ -747,8 +777,9 @@ mod tests { body = "platanos,tag1=A,tag2=B val=42i 1647622847000".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("platanos").expect("table not found"); let ts = table.timestamp_summary().expect("no timestamp summary"); @@ -762,8 +793,9 @@ mod tests { body = "platanos,tag1=A,tag2=B val=42i 1647622847000000".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("platanos").expect("table not found"); let ts = table.timestamp_summary().expect("no timestamp summary"); @@ -777,8 
+809,9 @@ mod tests { body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("platanos").expect("table not found"); let ts = table.timestamp_summary().expect("no timestamp summary"); @@ -906,8 +939,9 @@ mod tests { body = "test field=1u 100\ntest field=2u 100".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => { assert_eq!(namespace, "bananas_test"); + assert_eq!(*namespace_id, NAMESPACE_ID); let table = write_input.get("test").expect("table not in write"); let col = table.column("field").expect("column missing"); assert_matches!(col.data(), ColumnData::U64(data, _) => { @@ -1039,7 +1073,7 @@ mod tests { body = "whydo InputPower=300i,InputPower=300i".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => { assert_eq!(namespace, "bananas_test"); let table = write_input.get("whydo").expect("table not in write"); let col = table.column("InputPower").expect("column missing"); @@ -1056,7 +1090,7 @@ mod tests { body = "whydo InputPower=300i,InputPower=42i".as_bytes(), dml_handler = [Ok(summary())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => { assert_eq!(namespace, "bananas_test"); let table = write_input.get("whydo").expect("table not in write"); let col = 
table.column("InputPower").expect("column missing"); @@ -1154,11 +1188,15 @@ mod tests { // number of simultaneous requests are being serviced. #[tokio::test] async fn test_request_limit_enforced() { + let mock_namespace_resolver = + MockNamespaceResolver::default().with_mapping("bananas", NamespaceId::new(42)); + let dml_handler = Arc::new(MockDmlHandler::default()); let metrics = Arc::new(metric::Registry::default()); let delegate = Arc::new(HttpDelegate::new( MAX_BYTES, 1, + mock_namespace_resolver, Arc::clone(&dml_handler), &metrics, )); diff --git a/router/tests/http.rs b/router/tests/http.rs index f990bac483..f4698c766a 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -10,11 +10,11 @@ use metric::{Attributes, DurationHistogram, Metric, Registry, U64Counter}; use mutable_batch::MutableBatch; use router::{ dml_handlers::{ - Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, - NamespaceAutocreation, Partitioned, Partitioner, SchemaError, SchemaValidator, - ShardedWriteBuffer, WriteSummaryAdapter, + Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioned, + Partitioner, SchemaError, SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter, }, namespace_cache::{MemoryNamespaceCache, ShardedCache}, + namespace_resolver::{NamespaceAutocreation, NamespaceSchemaResolver}, server::http::HttpDelegate, shard::Shard, }; @@ -46,16 +46,7 @@ pub struct TestContext { type HttpDelegateStack = HttpDelegate< InstrumentationDecorator< Chain< - Chain< - Chain< - NamespaceAutocreation< - Arc>>, - HashMap, - >, - SchemaValidator>>>, - >, - Partitioner, - >, + Chain>>>, Partitioner>, WriteSummaryAdapter< FanOutAdaptor< ShardedWriteBuffer>>, @@ -64,6 +55,10 @@ type HttpDelegateStack = HttpDelegate< >, >, >, + NamespaceAutocreation< + Arc>>, + NamespaceSchemaResolver>>>, + >, >; /// A [`router`] stack configured with the various DML handlers using mock @@ -95,29 +90,39 @@ impl TestContext { iter::repeat_with(|| 
Arc::new(MemoryNamespaceCache::default())).take(10), )); - let ns_creator = NamespaceAutocreation::new( - Arc::clone(&catalog), + let schema_validator = + SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics); + let partitioner = Partitioner::new(PartitionTemplate { + parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], + }); + + let handler_stack = + schema_validator + .and_then(partitioner) + .and_then(WriteSummaryAdapter::new(FanOutAdaptor::new( + sharded_write_buffer, + ))); + + let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack); + + let namespace_resolver = + NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + let namespace_resolver = NamespaceAutocreation::new( + namespace_resolver, Arc::clone(&ns_cache), + Arc::clone(&catalog), TopicId::new(TEST_TOPIC_ID), QueryPoolId::new(TEST_QUERY_POOL_ID), iox_catalog::INFINITE_RETENTION_POLICY.to_owned(), ); - let schema_validator = SchemaValidator::new(Arc::clone(&catalog), ns_cache, &*metrics); - let partitioner = Partitioner::new(PartitionTemplate { - parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], - }); - - let handler_stack = ns_creator - .and_then(schema_validator) - .and_then(partitioner) - .and_then(WriteSummaryAdapter::new(FanOutAdaptor::new( - sharded_write_buffer, - ))); - - let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack); - - let delegate = HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics); + let delegate = HttpDelegate::new( + 1024, + 100, + namespace_resolver, + Arc::new(handler_stack), + &metrics, + ); Self { delegate,