Merge pull request #3628 from influxdata/dom/catalog-entry-creation

feat(router2): namespace auto-creation
kodiakhq[bot] 2022-02-04 15:46:58 +00:00 committed by GitHub
commit fa7fccde6f
6 changed files with 383 additions and 39 deletions


@@ -16,7 +16,7 @@ use crate::{
};
use observability_deps::tracing::*;
use router2::{
- dml_handlers::{SchemaValidator, ShardedWriteBuffer},
+ dml_handlers::{NamespaceAutocreation, SchemaValidator, ShardedWriteBuffer},
namespace_cache::MemoryNamespaceCache,
sequencer::Sequencer,
server::{http::HttpDelegate, RouterServer},
@@ -67,6 +67,14 @@ pub struct Config {
#[clap(flatten)]
pub(crate) write_buffer_config: WriteBufferConfig,
/// Query pool name to dispatch writes to.
#[clap(
long = "--query-pool",
env = "INFLUXDB_IOX_QUERY_POOL_NAME",
default_value = "iox-shared"
)]
pub(crate) query_pool_name: String,
}
pub async fn command(config: Config) -> Result<()> {
@@ -83,7 +91,56 @@ pub async fn command(config: Config) -> Result<()> {
.await?;
let ns_cache = Arc::new(MemoryNamespaceCache::default());
- let handler_stack = SchemaValidator::new(write_buffer, catalog, ns_cache);
+ let handler_stack =
+     SchemaValidator::new(write_buffer, Arc::clone(&catalog), Arc::clone(&ns_cache));
////////////////////////////////////////////////////////////////////////////
//
// THIS CODE IS FOR TESTING ONLY.
//
// In a prod deployment, the source of truth for the kafka topics & query
// pools will be read from the DB rather than from CLI args.
//
////////////////////////////////////////////////////////////////////////////
//
// Look up the kafka topic ID needed to populate namespace creation
// requests.
//
// This code / auto-creation is for architecture testing purposes only - a
// prod deployment would expect namespaces to be explicitly created and this
// layer would be removed.
let topic_id = catalog
.kafka_topics()
.get_by_name(&config.write_buffer_config.topic)
.await?
.map(|v| v.id)
.unwrap_or_else(|| {
panic!(
"no kafka topic named {} in catalog",
&config.write_buffer_config.topic
)
});
let query_id = catalog
.query_pools()
.create_or_get(&config.query_pool_name)
.await
.map(|v| v.id)
.unwrap_or_else(|e| {
panic!(
"failed to upsert query pool {} in catalog: {}",
&config.query_pool_name, e
)
});
let handler_stack = NamespaceAutocreation::new(
catalog,
ns_cache,
topic_id,
query_id,
iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
handler_stack,
);
//
////////////////////////////////////////////////////////////////////////////
let http = HttpDelegate::new(config.run_config.max_http_request_size, handler_stack);
let router_server = RouterServer::new(


@@ -25,6 +25,9 @@ const SHARED_KAFKA_TOPIC: &str = "iox_shared";
const SHARED_QUERY_POOL: &str = SHARED_KAFKA_TOPIC;
const TIME_COLUMN: &str = "time";
/// A string value representing an infinite retention policy.
pub const INFINITE_RETENTION_POLICY: &str = "inf";
pub mod interface;
pub mod mem;
pub mod postgres;
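
The new constant is consumed by the router2 command wiring above. As a minimal sketch of the call it feeds into, using the `Catalog::namespaces().create()` signature shown later in this diff (`catalog`, `topic_id` and `query_id` assumed in scope; `example_ns` is a hypothetical name):

```rust
// Sketch only: create a namespace with an infinite retention policy.
let namespace = catalog
    .namespaces()
    .create(
        "example_ns",
        iox_catalog::INFINITE_RETENTION_POLICY,
        topic_id,
        query_id,
    )
    .await?;
```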


@@ -4,44 +4,65 @@
//! processing handler chain:
//!
//! ```text
- //!
- //! ┌──────────────┐ ┌──────────────┐
- //! │ HTTP API │ │ gRPC API │
- //! └──────────────┘ └──────────────┘
- //! │ │
- //! └─────────┬─────────┘
- //! │
- //! ▼
- //! ╔═ DmlHandler Stack ═════╗
- //! ║ ║
- //! ║ ┌──────────────────┐ ║
- //! ║ │ Schema │ ║
- //! ║ │ Validation │ ║
- //! ║ └──────────────────┘ ║
- //! ║ │ ║
- //! ║ ▼ ║
- //! ║ ┌──────────────────┐ ║ ┌─────────┐
- //! ║ │ShardedWriteBuffer│◀───▶│ Sharder │
- //! ║ └──────────────────┘ ║ └─────────┘
- //! ║ │ ║
- //! ╚════════════│═══════════╝
- //! │
- //! ▼
- //! ┌──────────────┐
- //! │ Write Buffer │
- //! └──────────────┘
- //! │
- //! │
- //! ┌────────▼─────┐
- //! │ Kafka ├┐
- //! └┬─────────────┘├┐
- //! └┬─────────────┘│
- //! └──────────────┘
+ //! ┌──────────────┐ ┌──────────────┐
+ //! │ HTTP API │ │ gRPC API │
+ //! └──────────────┘ └──────────────┘
+ //! │ │
+ //! └─────────┬─────────┘
+ //! │
+ //! ▼
+ //! ╔═ DmlHandler Stack ═════╗
+ //! ║ ║
+ //! ║ ┌──────────────────┐ ║
+ //! ║ │ Namespace │ ║
+ //! ║ │ Autocreation │ ─║─ ─ ─ ─ ─ ─
+ //! ║ └──────────────────┘ ║ │
+ //! ║ │ ║ ┌─────────────────┐
+ //! ║ ▼ ║ │ Namespace Cache │
+ //! ║ ┌──────────────────┐ ║ └─────────────────┘
+ //! ║ │ Schema │ ║ │
+ //! ║ │ Validation │ ─║─ ─ ─ ─ ─ ─
+ //! ║ └──────────────────┘ ║
+ //! ║ │ ║
+ //! ║ ▼ ║
+ //! ┌───────┐ ║ ┌──────────────────┐ ║
+ //! │Sharder│◀ ─ ─ ▶│ShardedWriteBuffer│ ║
+ //! └───────┘ ║ └──────────────────┘ ║
+ //! ║ │ ║
+ //! ╚════════════│═══════════╝
+ //! │
+ //! ▼
+ //! ┌──────────────┐
+ //! │ Write Buffer │
+ //! └──────────────┘
+ //! │
+ //! │
+ //! ┌────────▼─────┐
+ //! │ Kafka ├┐
+ //! └┬─────────────┘├┐
+ //! └┬─────────────┘│
+ //! └──────────────┘
//! ```
//!
//! The HTTP / gRPC APIs decode their respective request format and funnel the
//! resulting operation through the common [`DmlHandler`] composed of the layers
//! described above.
//!
//! The [`NamespaceAutocreation`] handler (for testing only) populates the
//! global catalog with an entry for each namespace it observes, using the
//! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending
//! requests to the catalog for namespaces that are known to exist.
//!
//! Writes pass through the [`SchemaValidator`] applying schema enforcement (a
//! NOP layer for deletes) which pushes additive schema changes to the catalog
//! and populates the [`NamespaceCache`], converging it to match the set of
//! [`NamespaceSchema`] in the global catalog.
//!
//! The [`ShardedWriteBuffer`] uses a sharder implementation to direct the DML
//! operations into a fixed set of sequencers.
//!
//! [`NamespaceCache`]: crate::namespace_cache::NamespaceCache
//! [`NamespaceSchema`]: iox_catalog::interface::NamespaceSchema
mod r#trait;
pub use r#trait::*;
@@ -54,5 +75,8 @@ pub mod nop;
mod sharded_write_buffer;
pub use sharded_write_buffer::*;
mod ns_autocreation;
pub use ns_autocreation::*;
#[cfg(test)]
pub mod mock;
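
The stack documented above composes by wrapping: each layer owns an `inner` handler and delegates to it after doing its own work. As a minimal sketch (not part of this PR), a hypothetical pass-through layer could follow the same pattern, assuming the `DmlHandler` trait matches the signatures used by the `NamespaceAutocreation` impl in `ns_autocreation.rs` below:

```rust
use async_trait::async_trait;
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use trace::ctx::SpanContext;

use super::DmlHandler;

/// A hypothetical layer that logs each write before delegating to `inner`.
#[derive(Debug)]
pub struct WriteLogger<D> {
    inner: D,
}

#[async_trait]
impl<D> DmlHandler for WriteLogger<D>
where
    D: DmlHandler,
{
    // Pass the inner handler's error types straight through.
    type WriteError = D::WriteError;
    type DeleteError = D::DeleteError;

    async fn write(
        &self,
        namespace: DatabaseName<'static>,
        batches: HashMap<String, MutableBatch>,
        span_ctx: Option<SpanContext>,
    ) -> Result<(), Self::WriteError> {
        // Do this layer's work, then delegate down the stack.
        debug!(%namespace, tables = batches.len(), "observed write");
        self.inner.write(namespace, batches, span_ctx).await
    }

    async fn delete<'a>(
        &self,
        namespace: DatabaseName<'static>,
        table_name: impl Into<String> + Send + Sync + 'a,
        predicate: DeletePredicate,
        span_ctx: Option<SpanContext>,
    ) -> Result<(), Self::DeleteError> {
        // Deletes are passed through untouched.
        self.inner.delete(namespace, table_name, predicate, span_ctx).await
    }
}
```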


@@ -0,0 +1,256 @@
use std::sync::Arc;
use async_trait::async_trait;
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
use hashbrown::HashMap;
use iox_catalog::interface::{Catalog, KafkaTopicId, QueryPoolId};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use thiserror::Error;
use trace::ctx::SpanContext;
use crate::namespace_cache::NamespaceCache;
use super::{DmlError, DmlHandler};
/// An error auto-creating the request namespace.
#[derive(Debug, Error)]
pub enum NamespaceCreationError {
/// An error returned from a namespace creation request.
#[error("failed to create namespace: {0}")]
Create(iox_catalog::interface::Error),
/// The inner DML handler returned an error.
#[error(transparent)]
Inner(Box<DmlError>),
}
/// A layer to populate the [`Catalog`] with all the namespaces the router
/// observes.
///
/// Uses a [`NamespaceCache`] to limit issuing create requests to namespaces the
/// router has not yet observed a schema for.
#[derive(Debug)]
pub struct NamespaceAutocreation<D, C> {
catalog: Arc<dyn Catalog>,
cache: C,
topic_id: KafkaTopicId,
query_id: QueryPoolId,
retention: String,
inner: D,
}
impl<D, C> NamespaceAutocreation<D, C> {
/// Return a new [`NamespaceAutocreation`] layer that ensures a requested
/// namespace exists in `catalog` before passing the request to `inner`.
///
/// If the namespace does not exist, it is created with the specified
/// `topic_id`, `query_id` and `retention` policy.
///
/// Namespaces are looked up in `cache`, skipping the creation request to
/// the catalog if there's a hit.
pub fn new(
catalog: Arc<dyn Catalog>,
cache: C,
topic_id: KafkaTopicId,
query_id: QueryPoolId,
retention: String,
inner: D,
) -> Self {
Self {
catalog,
cache,
topic_id,
query_id,
retention,
inner,
}
}
}
#[async_trait]
impl<D, C> DmlHandler for NamespaceAutocreation<D, C>
where
D: DmlHandler,
C: NamespaceCache,
{
type WriteError = NamespaceCreationError;
type DeleteError = NamespaceCreationError;
/// Write `batches` to `namespace`.
async fn write(
&self,
namespace: DatabaseName<'static>,
batches: HashMap<String, MutableBatch>,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::WriteError> {
// If the namespace does not exist in the schema cache (populated by the
// schema validator), request an (idempotent) creation.
if self.cache.get_schema(&namespace).is_none() {
trace!(%namespace, "namespace auto-create cache miss");
match self
.catalog
.namespaces()
.create(
namespace.as_str(),
&self.retention,
self.topic_id,
self.query_id,
)
.await
{
Ok(_) => {
debug!(%namespace, "created namespace");
}
Err(iox_catalog::interface::Error::NameExists { .. }) => {
// Either the cache has not yet converged to include this
// namespace, or another thread raced populating the catalog
// and beat this thread to it.
debug!(%namespace, "spurious namespace create failed");
}
Err(e) => {
error!(error=%e, %namespace, "failed to auto-create namespace");
return Err(NamespaceCreationError::Create(e));
}
}
}
self.inner
.write(namespace, batches, span_ctx)
.await
.map_err(|e| NamespaceCreationError::Inner(Box::new(e.into())))
}
/// Delete the data specified in `delete`.
async fn delete<'a>(
&self,
namespace: DatabaseName<'static>,
table_name: impl Into<String> + Send + Sync + 'a,
predicate: DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
self.inner
.delete(namespace, table_name, predicate, span_ctx)
.await
.map_err(|e| NamespaceCreationError::Inner(Box::new(e.into())))
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use iox_catalog::{
interface::{Namespace, NamespaceId, NamespaceSchema},
mem::MemCatalog,
};
use crate::{
dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall},
namespace_cache::MemoryNamespaceCache,
};
use super::*;
#[tokio::test]
async fn test_cache_hit() {
let ns = DatabaseName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
cache.put_schema(
ns.clone(),
NamespaceSchema {
id: NamespaceId::new(1),
kafka_topic_id: KafkaTopicId::new(2),
query_pool_id: QueryPoolId::new(3),
tables: Default::default(),
},
);
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
let mock_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let creator = NamespaceAutocreation::new(
Arc::clone(&catalog),
cache,
KafkaTopicId::new(42),
QueryPoolId::new(42),
"inf".to_owned(),
Arc::clone(&mock_handler),
);
creator
.write(ns.clone(), Default::default(), None)
.await
.expect("handler should succeed");
// The cache hit should mean the catalog SHOULD NOT see a create request
// for the namespace.
assert!(
catalog
.namespaces()
.get_by_name(ns.as_str())
.await
.expect("lookup should not error")
.is_none(),
"expected no request to the catalog"
);
// And the DML handler must be called.
assert_matches!(mock_handler.calls().as_slice(), [MockDmlHandlerCall::Write { namespace, .. }] => {
assert_eq!(*namespace, *ns);
});
}
#[tokio::test]
async fn test_cache_miss() {
let ns = DatabaseName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
let mock_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let creator = NamespaceAutocreation::new(
Arc::clone(&catalog),
cache,
KafkaTopicId::new(42),
QueryPoolId::new(42),
"inf".to_owned(),
Arc::clone(&mock_handler),
);
creator
.write(ns.clone(), Default::default(), None)
.await
.expect("handler should succeed");
// The cache miss should mean the catalog MUST see a create request for
// the namespace.
let got = catalog
.namespaces()
.get_by_name(ns.as_str())
.await
.expect("lookup should not error")
.expect("creation request should be sent to catalog");
assert_eq!(
got,
Namespace {
id: NamespaceId::new(1),
name: ns.to_string(),
retention_duration: Some("inf".to_owned()),
kafka_topic_id: KafkaTopicId::new(42),
query_pool_id: QueryPoolId::new(42),
}
);
// And the DML handler must be called.
assert_matches!(mock_handler.calls().as_slice(), [MockDmlHandlerCall::Write { namespace, .. }] => {
assert_eq!(*namespace, *ns);
});
}
}
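
One more case worth sketching (not part of this PR's tests): the race documented in `write()` above, where the namespace already exists in the catalog but is absent from this router's cache. Assuming `MemCatalog` reports `Error::NameExists` for a duplicate create, the spurious error should be swallowed and the write should still reach the inner handler:

```rust
#[tokio::test]
async fn test_create_race_is_not_an_error() {
    let ns = DatabaseName::try_from("bananas").unwrap();
    let cache = Arc::new(MemoryNamespaceCache::default());
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());

    // Pre-populate the catalog, simulating another router having won the
    // creation race while this router's cache is still cold.
    catalog
        .namespaces()
        .create(ns.as_str(), "inf", KafkaTopicId::new(42), QueryPoolId::new(42))
        .await
        .expect("initial create should succeed");

    let mock_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
    let creator = NamespaceAutocreation::new(
        Arc::clone(&catalog),
        cache,
        KafkaTopicId::new(42),
        QueryPoolId::new(42),
        "inf".to_owned(),
        Arc::clone(&mock_handler),
    );

    // The duplicate create must not be propagated to the caller...
    creator
        .write(ns.clone(), Default::default(), None)
        .await
        .expect("spurious NameExists should be swallowed");

    // ...and the write must still reach the inner handler.
    assert_matches!(mock_handler.calls().as_slice(), [MockDmlHandlerCall::Write { namespace, .. }] => {
        assert_eq!(*namespace, *ns);
    });
}
```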


@@ -8,7 +8,7 @@ use mutable_batch::MutableBatch;
use thiserror::Error;
use trace::ctx::SpanContext;
- use super::{SchemaError, ShardError};
+ use super::{NamespaceCreationError, SchemaError, ShardError};
/// Errors emitted by a [`DmlHandler`] implementation during DML request
/// processing.
@@ -26,6 +26,10 @@ pub enum DmlError {
#[error(transparent)]
Schema(#[from] SchemaError),
/// Failed to create the request namespace.
#[error(transparent)]
NamespaceCreation(#[from] NamespaceCreationError),
/// An unknown error occurred while processing the DML request.
#[error("internal dml handler error: {0}")]
Internal(Box<dyn Error + Send + Sync>),


@@ -85,9 +85,9 @@ impl Error {
// https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13
StatusCode::UNSUPPORTED_MEDIA_TYPE
}
- Error::DmlHandler(DmlError::Internal(_) | DmlError::WriteBuffer(_)) => {
-     StatusCode::INTERNAL_SERVER_ERROR
- }
+ Error::DmlHandler(
+     DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_),
+ ) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}