diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs
index 5b8e3fce40..05d282b668 100644
--- a/influxdb_iox/src/commands/run/router2.rs
+++ b/influxdb_iox/src/commands/run/router2.rs
@@ -16,7 +16,7 @@ use crate::{
 };
 use observability_deps::tracing::*;
 use router2::{
-    dml_handlers::{SchemaValidator, ShardedWriteBuffer},
+    dml_handlers::{NamespaceAutocreation, SchemaValidator, ShardedWriteBuffer},
     namespace_cache::MemoryNamespaceCache,
     sequencer::Sequencer,
     server::{http::HttpDelegate, RouterServer},
@@ -67,6 +67,14 @@ pub struct Config {
 
     #[clap(flatten)]
     pub(crate) write_buffer_config: WriteBufferConfig,
+
+    /// Query pool name to dispatch writes to.
+    #[clap(
+        long = "--query-pool",
+        env = "INFLUXDB_IOX_QUERY_POOL_NAME",
+        default_value = "iox-shared"
+    )]
+    pub(crate) query_pool_name: String,
 }
 
 pub async fn command(config: Config) -> Result<()> {
@@ -83,7 +91,56 @@ pub async fn command(config: Config) -> Result<()> {
         .await?;
 
     let ns_cache = Arc::new(MemoryNamespaceCache::default());
-    let handler_stack = SchemaValidator::new(write_buffer, catalog, ns_cache);
+    let handler_stack =
+        SchemaValidator::new(write_buffer, Arc::clone(&catalog), Arc::clone(&ns_cache));
+
+    ////////////////////////////////////////////////////////////////////////////
+    //
+    // THIS CODE IS FOR TESTING ONLY.
+    //
+    // The source of truth for the kafka topics & query pools will be read from
+    // the DB, rather than CLI args for a prod deployment.
+    //
+    ////////////////////////////////////////////////////////////////////////////
+    //
+    // Look up the kafka topic ID needed to populate namespace creation
+    // requests.
+    //
+    // This code / auto-creation is for architecture testing purposes only - a
+    // prod deployment would expect namespaces to be explicitly created and this
+    // layer would be removed.
+    let topic_id = catalog
+        .kafka_topics()
+        .get_by_name(&config.write_buffer_config.topic)
+        .await?
+        .map(|v| v.id)
+        .unwrap_or_else(|| {
+            panic!(
+                "no kafka topic named {} in catalog",
+                &config.write_buffer_config.topic
+            )
+        });
+    let query_id = catalog
+        .query_pools()
+        .create_or_get(&config.query_pool_name)
+        .await
+        .map(|v| v.id)
+        .unwrap_or_else(|e| {
+            panic!(
+                "failed to upsert query pool {} in catalog: {}",
+                &config.query_pool_name, e
+            )
+        });
+    let handler_stack = NamespaceAutocreation::new(
+        catalog,
+        ns_cache,
+        topic_id,
+        query_id,
+        iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
+        handler_stack,
+    );
+    //
+    ////////////////////////////////////////////////////////////////////////////
 
     let http = HttpDelegate::new(config.run_config.max_http_request_size, handler_stack);
     let router_server = RouterServer::new(
diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs
index 227945eb59..5543c4979e 100644
--- a/iox_catalog/src/lib.rs
+++ b/iox_catalog/src/lib.rs
@@ -25,6 +25,9 @@ const SHARED_KAFKA_TOPIC: &str = "iox_shared";
 const SHARED_QUERY_POOL: &str = SHARED_KAFKA_TOPIC;
 const TIME_COLUMN: &str = "time";
 
+/// A string value representing an infinite retention policy.
+pub const INFINITE_RETENTION_POLICY: &str = "inf";
+
 pub mod interface;
 pub mod mem;
 pub mod postgres;
diff --git a/router2/src/dml_handlers/mod.rs b/router2/src/dml_handlers/mod.rs
index afe91517e1..eeb9d6ca2b 100644
--- a/router2/src/dml_handlers/mod.rs
+++ b/router2/src/dml_handlers/mod.rs
@@ -4,44 +4,65 @@
 //! processing handler chain:
 //!
 //! ```text
-//!
-//!  ┌──────────────┐    ┌──────────────┐
-//!  │   HTTP API   │    │   gRPC API   │
-//!  └──────────────┘    └──────────────┘
-//!         │                   │
-//!         └─────────┬─────────┘
-//!                   │
-//!                   ▼
-//!      ╔═ DmlHandler Stack ═════╗
-//!      ║                        ║
-//!      ║  ┌──────────────────┐  ║
-//!      ║  │      Schema      │  ║
-//!      ║  │    Validation    │  ║
-//!      ║  └──────────────────┘  ║
-//!      ║            │           ║
-//!      ║            ▼           ║
-//!      ║  ┌──────────────────┐  ║  ┌─────────┐
-//!      ║  │ShardedWriteBuffer│◀───▶│ Sharder │
-//!      ║  └──────────────────┘  ║  └─────────┘
-//!      ║            │           ║
-//!      ╚════════════│═══════════╝
-//!                   │
-//!                   ▼
-//!           ┌──────────────┐
-//!           │ Write Buffer │
-//!           └──────────────┘
-//!                   │
-//!                   │
-//!          ┌────────▼─────┐
-//!          │    Kafka     ├┐
-//!          └┬─────────────┘├┐
-//!           └┬─────────────┘│
-//!            └──────────────┘
+//!          ┌──────────────┐    ┌──────────────┐
+//!          │   HTTP API   │    │   gRPC API   │
+//!          └──────────────┘    └──────────────┘
+//!                 │                   │
+//!                 └─────────┬─────────┘
+//!                           │
+//!                           ▼
+//!              ╔═ DmlHandler Stack ═════╗
+//!              ║                        ║
+//!              ║  ┌──────────────────┐  ║
+//!              ║  │    Namespace     │  ║
+//!              ║  │   Autocreation   │ ─║─ ─ ─ ─ ─ ─
+//!              ║  └──────────────────┘  ║          │
+//!              ║            │           ║ ┌─────────────────┐
+//!              ║            ▼           ║ │ Namespace Cache │
+//!              ║  ┌──────────────────┐  ║ └─────────────────┘
+//!              ║  │      Schema      │  ║          │
+//!              ║  │    Validation    │ ─║─ ─ ─ ─ ─ ─
+//!              ║  └──────────────────┘  ║
+//!              ║            │           ║
+//!              ║            ▼           ║
+//!  ┌───────┐   ║  ┌──────────────────┐  ║
+//!  │Sharder│◀ ─ ─ ▶│ShardedWriteBuffer│  ║
+//!  └───────┘   ║  └──────────────────┘  ║
+//!              ║            │           ║
+//!              ╚════════════│═══════════╝
+//!                           │
+//!                           ▼
+//!                   ┌──────────────┐
+//!                   │ Write Buffer │
+//!                   └──────────────┘
+//!                           │
+//!                           │
+//!                  ┌────────▼─────┐
+//!                  │    Kafka     ├┐
+//!                  └┬─────────────┘├┐
+//!                   └┬─────────────┘│
+//!                    └──────────────┘
 //! ```
 //!
 //! The HTTP / gRPC APIs decode their respective request format and funnel the
 //! resulting operation through the common [`DmlHandler`] composed of the layers
 //! described above.
+//!
+//! The [`NamespaceAutocreation`] handler (for testing only) populates the
+//! global catalog with an entry for each namespace it observes, using the
+//! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending
+//! requests to the catalog for namespaces that are known to exist.
+//!
+//! Writes pass through the [`SchemaValidator`], which applies schema
+//! enforcement (a NOP layer for deletes), pushes additive schema changes to
+//! the catalog, and populates the [`NamespaceCache`], converging it to match
+//! the set of [`NamespaceSchema`] in the global catalog.
+//!
+//! The [`ShardedWriteBuffer`] uses a sharder implementation to direct the DML
+//! operations into a fixed set of sequencers.
+//!
+//! [`NamespaceCache`]: crate::namespace_cache::NamespaceCache
+//! [`NamespaceSchema`]: iox_catalog::interface::NamespaceSchema
 
 mod r#trait;
 pub use r#trait::*;
@@ -54,5 +75,8 @@ pub mod nop;
 mod sharded_write_buffer;
 pub use sharded_write_buffer::*;
 
+mod ns_autocreation;
+pub use ns_autocreation::*;
+
 #[cfg(test)]
 pub mod mock;
diff --git a/router2/src/dml_handlers/ns_autocreation.rs b/router2/src/dml_handlers/ns_autocreation.rs
new file mode 100644
index 0000000000..ef161375b4
--- /dev/null
+++ b/router2/src/dml_handlers/ns_autocreation.rs
@@ -0,0 +1,256 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use data_types::{delete_predicate::DeletePredicate, DatabaseName};
+use hashbrown::HashMap;
+use iox_catalog::interface::{Catalog, KafkaTopicId, QueryPoolId};
+use mutable_batch::MutableBatch;
+use observability_deps::tracing::*;
+use thiserror::Error;
+use trace::ctx::SpanContext;
+
+use crate::namespace_cache::NamespaceCache;
+
+use super::{DmlError, DmlHandler};
+
+/// An error auto-creating the request namespace.
+#[derive(Debug, Error)]
+pub enum NamespaceCreationError {
+    /// An error returned from a namespace creation request.
+    #[error("failed to create namespace: {0}")]
+    Create(iox_catalog::interface::Error),
+
+    /// The inner DML handler returned an error.
+    #[error(transparent)]
+    Inner(Box<DmlError>),
+}
+
+/// A layer to populate the [`Catalog`] with all the namespaces the router
+/// observes.
+///
+/// Uses a [`NamespaceCache`] to limit issuing create requests to namespaces the
+/// router has not yet observed a schema for.
+#[derive(Debug)]
+pub struct NamespaceAutocreation<C, D> {
+    catalog: Arc<dyn Catalog>,
+    cache: C,
+
+    topic_id: KafkaTopicId,
+    query_id: QueryPoolId,
+    retention: String,
+
+    inner: D,
+}
+
+impl<C, D> NamespaceAutocreation<C, D> {
+    /// Return a new [`NamespaceAutocreation`] layer that ensures a requested
+    /// namespace exists in `catalog` before passing the request to `inner`.
+    ///
+    /// If the namespace does not exist, it is created with the specified
+    /// `topic_id`, `query_id` and `retention` policy.
+    ///
+    /// Namespaces are looked up in `cache`, skipping the creation request to
+    /// the catalog if there's a hit.
+    pub fn new(
+        catalog: Arc<dyn Catalog>,
+        cache: C,
+        topic_id: KafkaTopicId,
+        query_id: QueryPoolId,
+        retention: String,
+        inner: D,
+    ) -> Self {
+        Self {
+            catalog,
+            cache,
+            topic_id,
+            query_id,
+            retention,
+            inner,
+        }
+    }
+}
+
+#[async_trait]
+impl<C, D> DmlHandler for NamespaceAutocreation<C, D>
+where
+    D: DmlHandler,
+    C: NamespaceCache,
+{
+    type WriteError = NamespaceCreationError;
+    type DeleteError = NamespaceCreationError;
+
+    /// Write `batches` to `namespace`.
+    async fn write(
+        &self,
+        namespace: DatabaseName<'static>,
+        batches: HashMap<String, MutableBatch>,
+        span_ctx: Option<SpanContext>,
+    ) -> Result<(), Self::WriteError> {
+        // If the namespace does not exist in the schema cache (populated by the
+        // schema validator) request an (idempotent) creation.
+        if self.cache.get_schema(&namespace).is_none() {
+            trace!(%namespace, "namespace auto-create cache miss");
+
+            match self
+                .catalog
+                .namespaces()
+                .create(
+                    namespace.as_str(),
+                    &self.retention,
+                    self.topic_id,
+                    self.query_id,
+                )
+                .await
+            {
+                Ok(_) => {
+                    debug!(%namespace, "created namespace");
+                }
+                Err(iox_catalog::interface::Error::NameExists { .. }) => {
+                    // Either the cache has not yet converged to include this
+                    // namespace, or another thread raced populating the catalog
+                    // and beat this thread to it.
+                    debug!(%namespace, "spurious namespace create failed");
+                }
+                Err(e) => {
+                    error!(error=%e, %namespace, "failed to auto-create namespace");
+                    return Err(NamespaceCreationError::Create(e));
+                }
+            }
+        }
+
+        self.inner
+            .write(namespace, batches, span_ctx)
+            .await
+            .map_err(|e| NamespaceCreationError::Inner(Box::new(e.into())))
+    }
+
+    /// Delete the data specified in `delete`.
+    async fn delete<'a>(
+        &self,
+        namespace: DatabaseName<'static>,
+        table_name: impl Into<String> + Send + Sync + 'a,
+        predicate: DeletePredicate,
+        span_ctx: Option<SpanContext>,
+    ) -> Result<(), Self::DeleteError> {
+        self.inner
+            .delete(namespace, table_name, predicate, span_ctx)
+            .await
+            .map_err(|e| NamespaceCreationError::Inner(Box::new(e.into())))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use assert_matches::assert_matches;
+    use iox_catalog::{
+        interface::{Namespace, NamespaceId, NamespaceSchema},
+        mem::MemCatalog,
+    };
+
+    use crate::{
+        dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall},
+        namespace_cache::MemoryNamespaceCache,
+    };
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_cache_hit() {
+        let ns = DatabaseName::try_from("bananas").unwrap();
+
+        let cache = Arc::new(MemoryNamespaceCache::default());
+        cache.put_schema(
+            ns.clone(),
+            NamespaceSchema {
+                id: NamespaceId::new(1),
+                kafka_topic_id: KafkaTopicId::new(2),
+                query_pool_id: QueryPoolId::new(3),
+                tables: Default::default(),
+            },
+        );
+
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let mock_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
+
+        let creator = NamespaceAutocreation::new(
+            Arc::clone(&catalog),
+            cache,
+            KafkaTopicId::new(42),
+            QueryPoolId::new(42),
+            "inf".to_owned(),
+            Arc::clone(&mock_handler),
+        );
+
+        creator
+            .write(ns.clone(), Default::default(), None)
+            .await
+            .expect("handler should succeed");
+
+        // The cache hit should mean the catalog SHOULD NOT see a create request
+        // for the namespace.
+        assert!(
+            catalog
+                .namespaces()
+                .get_by_name(ns.as_str())
+                .await
+                .expect("lookup should not error")
+                .is_none(),
+            "expected no request to the catalog"
+        );
+
+        // And the DML handler must be called.
+        assert_matches!(mock_handler.calls().as_slice(), [MockDmlHandlerCall::Write { namespace, .. }] => {
+            assert_eq!(*namespace, *ns);
+        });
+    }
+
+    #[tokio::test]
+    async fn test_cache_miss() {
+        let ns = DatabaseName::try_from("bananas").unwrap();
+
+        let cache = Arc::new(MemoryNamespaceCache::default());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let mock_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
+
+        let creator = NamespaceAutocreation::new(
+            Arc::clone(&catalog),
+            cache,
+            KafkaTopicId::new(42),
+            QueryPoolId::new(42),
+            "inf".to_owned(),
+            Arc::clone(&mock_handler),
+        );
+
+        creator
+            .write(ns.clone(), Default::default(), None)
+            .await
+            .expect("handler should succeed");
+
+        // The cache miss should mean the catalog MUST see a create request for
+        // the namespace.
+        let got = catalog
+            .namespaces()
+            .get_by_name(ns.as_str())
+            .await
+            .expect("lookup should not error")
+            .expect("creation request should be sent to catalog");
+
+        assert_eq!(
+            got,
+            Namespace {
+                id: NamespaceId::new(1),
+                name: ns.to_string(),
+                retention_duration: Some("inf".to_owned()),
+                kafka_topic_id: KafkaTopicId::new(42),
+                query_pool_id: QueryPoolId::new(42),
+            }
+        );
+
+        // And the DML handler must be called.
+        assert_matches!(mock_handler.calls().as_slice(), [MockDmlHandlerCall::Write { namespace, .. }] => {
+            assert_eq!(*namespace, *ns);
+        });
+    }
+}
diff --git a/router2/src/dml_handlers/trait.rs b/router2/src/dml_handlers/trait.rs
index 86dbe8d3c0..f19d02cb6b 100644
--- a/router2/src/dml_handlers/trait.rs
+++ b/router2/src/dml_handlers/trait.rs
@@ -8,7 +8,7 @@ use mutable_batch::MutableBatch;
 use thiserror::Error;
 use trace::ctx::SpanContext;
 
-use super::{SchemaError, ShardError};
+use super::{NamespaceCreationError, SchemaError, ShardError};
 
 /// Errors emitted by a [`DmlHandler`] implementation during DML request
 /// processing.
@@ -26,6 +26,10 @@ pub enum DmlError {
     #[error(transparent)]
     Schema(#[from] SchemaError),
 
+    /// Failed to create the request namespace.
+    #[error(transparent)]
+    NamespaceCreation(#[from] NamespaceCreationError),
+
     /// An unknown error occurred while processing the DML request.
     #[error("internal dml handler error: {0}")]
     Internal(Box<dyn std::error::Error + Send + Sync>),
diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs
index 72c8f41a9a..1052cdae17 100644
--- a/router2/src/server/http.rs
+++ b/router2/src/server/http.rs
@@ -85,9 +85,9 @@ impl Error {
                 // https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13
                 StatusCode::UNSUPPORTED_MEDIA_TYPE
             }
-            Error::DmlHandler(DmlError::Internal(_) | DmlError::WriteBuffer(_)) => {
-                StatusCode::INTERNAL_SERVER_ERROR
-            }
+            Error::DmlHandler(
+                DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_),
+            ) => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
 }
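
Taken together, the patch wires a new outermost layer into the handler stack that `router2.rs` assembles. The layering itself is ordinary type composition: each layer owns its `inner` handler and delegates to it after doing its own work, so the stack is built inside-out and the outermost layer sees each request first. Below is a minimal, synchronous sketch of that composition pattern; the trait and every type in it are simplified stand-ins for illustration, not the real async `DmlHandler` API.

```rust
// Simplified stand-in for the async DmlHandler trait: one write method,
// string errors, no spans or batches.
trait DmlHandler {
    fn write(&self, namespace: &str, payload: &str) -> Result<(), String>;
}

/// Innermost layer: stands in for sharding and publishing to the write buffer.
struct ShardedWriteBuffer;

impl DmlHandler for ShardedWriteBuffer {
    fn write(&self, namespace: &str, payload: &str) -> Result<(), String> {
        println!("publishing {payload:?} for namespace {namespace:?}");
        Ok(())
    }
}

/// Middle layer: validates the payload, then delegates to `inner`.
struct SchemaValidator<D> {
    inner: D,
}

impl<D: DmlHandler> DmlHandler for SchemaValidator<D> {
    fn write(&self, namespace: &str, payload: &str) -> Result<(), String> {
        if payload.is_empty() {
            return Err("rejected: empty write".to_owned());
        }
        self.inner.write(namespace, payload)
    }
}

/// Outermost layer: ensures the namespace exists, then delegates to `inner`.
struct NamespaceAutocreation<D> {
    inner: D,
}

impl<D: DmlHandler> DmlHandler for NamespaceAutocreation<D> {
    fn write(&self, namespace: &str, payload: &str) -> Result<(), String> {
        println!("ensuring namespace {namespace:?} exists");
        self.inner.write(namespace, payload)
    }
}

fn main() {
    // Built inside-out, mirroring the order used in router2.rs: namespace
    // autocreation runs before schema validation, which runs before sharding.
    let stack = NamespaceAutocreation {
        inner: SchemaValidator {
            inner: ShardedWriteBuffer,
        },
    };

    stack.write("bananas", "cpu,host=a usage=1").unwrap();
}
```

Running `NamespaceAutocreation` outermost is what lets it guarantee the namespace row exists before `SchemaValidator` tries to read or extend that namespace's schema.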
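The control flow at the heart of `ns_autocreation.rs` is a cache-gated, idempotent create: a cache hit skips the catalog round-trip entirely; on a miss, a `NameExists` error is treated as benign (either the cache has not yet converged, or another thread raced the creation and won); any other catalog error propagates. Here is that pattern in isolation, as a hedged sketch using stand-in types rather than the IOx `Catalog` and `NamespaceCache` traits:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

#[derive(Debug)]
enum CreateError {
    NameExists,
    Internal(String), // stands in for any other catalog failure
}

/// Stand-in catalog: creating a namespace that already exists is an error,
/// mirroring iox_catalog's Error::NameExists behaviour.
#[derive(Default)]
struct Catalog {
    namespaces: Mutex<HashSet<String>>,
}

impl Catalog {
    fn create(&self, name: &str) -> Result<(), CreateError> {
        if !self.namespaces.lock().unwrap().insert(name.to_owned()) {
            return Err(CreateError::NameExists);
        }
        Ok(())
    }
}

/// Stand-in read-through cache. In the real stack this is populated by the
/// schema validator, never by the autocreation layer itself.
#[derive(Default)]
struct Cache {
    schemas: Mutex<HashMap<String, ()>>,
}

fn ensure_namespace(catalog: &Catalog, cache: &Cache, name: &str) -> Result<(), CreateError> {
    // Cache hit: the namespace is known to exist, skip the catalog entirely.
    if cache.schemas.lock().unwrap().contains_key(name) {
        return Ok(());
    }

    match catalog.create(name) {
        Ok(()) => Ok(()),
        // Spurious failure: a stale cache or a lost race - both are fine,
        // because a creation has definitely succeeded somewhere.
        Err(CreateError::NameExists) => Ok(()),
        // Real failures still propagate to the caller.
        Err(e) => Err(e),
    }
}

fn main() {
    let catalog = Catalog::default();
    let cache = Cache::default();

    // The first call creates the namespace; the second hits NameExists in
    // the catalog (the cache is still cold) and treats it as success.
    ensure_namespace(&catalog, &cache, "bananas").unwrap();
    ensure_namespace(&catalog, &cache, "bananas").unwrap();
}
```

This is also why the layer never writes to the cache: converging the cache is the schema validator's job, and a redundant create against the catalog is harmless by construction.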
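The new `--query-pool` flag resolves in the usual clap order: an explicit command-line value wins, otherwise the `INFLUXDB_IOX_QUERY_POOL_NAME` environment variable is consulted, and finally the `iox-shared` default applies. The diff uses the older `#[clap(...)]` attribute form; the following equivalent sketch assumes clap 4 with the `derive` and `env` features enabled:

```rust
use clap::Parser;

#[derive(Debug, Parser)]
struct Config {
    /// Query pool name to dispatch writes to.
    #[arg(
        long = "query-pool",
        env = "INFLUXDB_IOX_QUERY_POOL_NAME",
        default_value = "iox-shared"
    )]
    query_pool_name: String,
}

fn main() {
    let config = Config::parse();

    // Prints "iox-shared" unless the flag or environment variable is set.
    println!("query pool: {}", config.query_pool_name);
}
```

The leading dashes in the diff's `long = "--query-pool"` appear to be accepted by the clap version IOx pinned at the time; current clap derive expects the bare name.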
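Finally, the `http.rs` hunk folds the new error variant into the catch-all 500 arm: a namespace-creation failure is a server-side fault the client cannot correct by changing its request. A reduced sketch of that mapping, where the enum variants and the `Schema` → 400 arm are illustrative stand-ins rather than the full IOx error surface (this assumes the `http` crate for `StatusCode`):

```rust
use http::StatusCode;

#[derive(Debug)]
enum DmlError {
    Schema(String),            // client-correctable: the request is at fault
    WriteBuffer(String),       // server-side failure
    NamespaceCreation(String), // server-side failure (new in this patch)
    Internal(String),          // server-side failure
}

fn as_status_code(e: &DmlError) -> StatusCode {
    match e {
        DmlError::Schema(_) => StatusCode::BAD_REQUEST,
        // Nothing the client can do differently: report a 500.
        DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_) => {
            StatusCode::INTERNAL_SERVER_ERROR
        }
    }
}

fn main() {
    let err = DmlError::NamespaceCreation("catalog unavailable".to_owned());
    assert_eq!(as_status_code(&err), StatusCode::INTERNAL_SERVER_ERROR);
    println!("{err:?} -> {}", as_status_code(&err));
}
```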