diff --git a/Cargo.lock b/Cargo.lock index 83ead87d0b..22861d8c6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3901,6 +3901,7 @@ dependencies = [ "generated_types", "hashbrown 0.12.0", "hyper", + "iox_catalog", "metric", "mutable_batch", "mutable_batch_lp", @@ -3909,6 +3910,7 @@ dependencies = [ "paste", "predicate", "rand", + "schema", "serde", "serde_urlencoded", "siphasher", diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs index a69b9b4ea7..cd5abcb5fa 100644 --- a/influxdb_iox/src/commands/run/router2.rs +++ b/influxdb_iox/src/commands/run/router2.rs @@ -12,9 +12,10 @@ use crate::{ }, }, }; +use iox_catalog::{interface::Catalog, postgres::PostgresCatalog}; use observability_deps::tracing::*; use router2::{ - dml_handlers::ShardedWriteBuffer, + dml_handlers::{SchemaValidator, ShardedWriteBuffer}, sequencer::Sequencer, server::{http::HttpDelegate, RouterServer}, sharder::TableNamespaceSharder, @@ -31,6 +32,9 @@ pub enum Error { #[error("Invalid config: {0}")] InvalidConfig(#[from] CommonServerStateError), + #[error("Catalog error: {0}")] + Catalog(#[from] iox_catalog::interface::Error), + #[error("failed to initialise write buffer connection: {0}")] WriteBuffer(#[from] WriteBufferError), } @@ -58,12 +62,25 @@ pub struct Config { #[clap(flatten)] pub(crate) write_buffer_config: WriteBufferConfig, + + /// Postgres connection string + #[clap(env = "INFLUXDB_IOX_CATALOG_DSN")] + pub catalog_dsn: String, } pub async fn command(config: Config) -> Result<()> { let common_state = CommonServerState::from_config(config.run_config.clone())?; let metrics = Arc::new(metric::Registry::default()); + let catalog: Arc = Arc::new( + PostgresCatalog::connect( + "router2", + iox_catalog::postgres::SCHEMA_NAME, + &config.catalog_dsn, + ) + .await?, + ); + let write_buffer = init_write_buffer( &config, Arc::clone(&metrics), @@ -71,7 +88,9 @@ pub async fn command(config: Config) -> Result<()> { ) .await?; - let http = 
HttpDelegate::new(config.run_config.max_http_request_size, write_buffer); + let handler_stack = SchemaValidator::new(write_buffer, catalog); + + let http = HttpDelegate::new(config.run_config.max_http_request_size, handler_stack); let router_server = RouterServer::new( http, Default::default(), diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 1ada77aec9..043855e05d 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -528,10 +528,7 @@ impl NamespaceSchema { } /// Gets the namespace schema including all tables and columns. -pub async fn get_schema_by_name( - name: &str, - catalog: &dyn Catalog, -) -> Result> { +pub async fn get_schema_by_name(name: &str, catalog: &dyn Catalog) -> Result { let namespace = catalog .namespaces() .get_by_name(name) @@ -578,7 +575,7 @@ pub async fn get_schema_by_name( namespace.tables.insert(table_name, schema); } - Ok(Some(namespace)) + Ok(namespace) } /// Data object for a table diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 62b3720667..0500fcc5b7 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -259,8 +259,7 @@ mod tests { // cached schema. 
let db_schema = get_schema_by_name(NAMESPACE_NAME, &repo) .await - .expect("database failed to query for namespace schema") - .expect("no schema found"); + .expect("database failed to query for namespace schema"); assert_eq!(schema, db_schema, "schema in DB and cached schema differ"); // Generate the map of tables => desired column types diff --git a/router2/Cargo.toml b/router2/Cargo.toml index f238f1e92f..7bcb52154c 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -15,6 +15,7 @@ futures = "0.3.19" generated_types = { path = "../generated_types" } hashbrown = "0.12" hyper = "0.14" +iox_catalog = { path = "../iox_catalog" } metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch" } mutable_batch_lp = { path = "../mutable_batch_lp" } @@ -37,6 +38,7 @@ assert_matches = "1.5" criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] } paste = "1.0.6" rand = "0.8.3" +schema = { path = "../schema" } [[bench]] name = "sharder" diff --git a/router2/src/dml_handlers/mod.rs b/router2/src/dml_handlers/mod.rs index 8e84da874f..afe91517e1 100644 --- a/router2/src/dml_handlers/mod.rs +++ b/router2/src/dml_handlers/mod.rs @@ -46,6 +46,9 @@ mod r#trait; pub use r#trait::*; +mod schema_validation; +pub use schema_validation::*; + pub mod nop; mod sharded_write_buffer; diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs new file mode 100644 index 0000000000..7eafa384ac --- /dev/null +++ b/router2/src/dml_handlers/schema_validation.rs @@ -0,0 +1,449 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use data_types::{delete_predicate::DeletePredicate, DatabaseName}; +use hashbrown::HashMap; +use iox_catalog::{ + interface::{get_schema_by_name, Catalog, NamespaceSchema}, + validate_or_insert_schema, +}; +use mutable_batch::MutableBatch; +use observability_deps::tracing::*; +use parking_lot::RwLock; +use thiserror::Error; +use trace::ctx::SpanContext; + +use super::{DmlError, 
DmlHandler}; + +/// Errors emitted during schema validation. +#[derive(Debug, Error)] +pub enum SchemaError { + /// The requested namespace could not be found in the catalog. + #[error("failed to read namespace schema from catalog: {0}")] + NamespaceLookup(iox_catalog::interface::Error), + + /// The request failed during schema validation. + /// + /// NOTE: this may be due to transient I/O errors while interrogating the + /// global catalog - the caller should inspect the inner error to determine + /// the failure reason. + #[error(transparent)] + Validate(iox_catalog::interface::Error), + + /// The inner DML handler returned an error. + #[error(transparent)] + Inner(Box), +} + +/// A [`SchemaValidator`] checks the schema of incoming writes against a +/// centralised schema store, maintaining an in-memory cache of all observed +/// schemas. +/// +/// # Caching +/// +/// This validator attempts to incrementally build an in-memory cache of all +/// table schemas it observes (never evicting the schemas). +/// +/// All schema operations are scoped to a single namespace. +/// +/// When a request contains columns that do not exist in the cached schema, the +/// catalog is queried for an existing column by the same name and (atomically) +/// the column is created if it does not exist. If the requested column type +/// conflicts with the catalog column, the request is rejected and the cached +/// schema is not updated. +/// +/// Any successful write that adds new columns causes the new schema to be +/// cached. +/// +/// To minimise locking, this cache is designed to allow (and tolerate) spurious +/// cache "updates" racing with each other and overwriting newer schemas with +/// older schemas. This is acceptable due to the incremental, additive schema +/// creation never allowing a column to change or be removed, therefore columns +/// lost by racy schema cache overwrites are "discovered" in subsequent +/// requests. 
This overwriting is scoped to the namespace, and is expected to be +/// relatively rare - it results in additional requests being made to the +/// catalog until the cached schema converges to match the catalog schema. +/// +/// # Correctness +/// +/// The correct functioning of this schema validator relies on the catalog +/// implementation correctly validating new column requests against any existing +/// (but uncached) columns, returning an error if the requested type does not +/// match. +/// +/// The catalog must serialise column creation to avoid `set(a=tag)` overwriting +/// a prior `set(a=int)` write to the catalog (returning an error that `a` is an +/// `int` for the second request). +/// +/// Because each column is set incrementally and independently of other new +/// columns in the request, racing multiple requests for the same table can +/// produce incorrect schemas ([#3573]). +/// +/// [#3573]: https://github.com/influxdata/influxdb_iox/issues/3573 +#[derive(Debug)] +pub struct SchemaValidator { + inner: D, + catalog: Arc, + + cache: RwLock, Arc>>, +} + +impl SchemaValidator { + /// Initialise a new [`SchemaValidator`] decorator, loading schemas from + /// `catalog` and passing acceptable requests through to `inner`. + pub fn new(inner: D, catalog: Arc) -> Self { + Self { + inner, + catalog, + cache: Default::default(), + } + } +} + +#[async_trait] +impl DmlHandler for SchemaValidator +where + D: DmlHandler, +{ + type WriteError = SchemaError; + type DeleteError = D::DeleteError; + + /// Validate the schema of all the writes in `batches` before passing the + /// request onto the inner handler. + /// + /// # Errors + /// + /// If `namespace` does not exist, [`SchemaError::NamespaceLookup`] is + /// returned. + /// + /// If the schema validation fails, [`SchemaError::Validate`] is returned. + /// Callers should inspect the inner error to determine if the failure was + /// caused by catalog I/O, or a schema conflict. 
+ /// + /// A request that fails validation on one or more tables fails the request + /// as a whole - calling this method has "all or nothing" semantics. + /// + /// If the inner handler returns an error (wrapped in a + /// [`SchemaError::Inner`]), the semantics of the inner handler write apply. + async fn write( + &self, + namespace: DatabaseName<'static>, + batches: HashMap, + span_ctx: Option, + ) -> Result<(), Self::WriteError> { + // Load the namespace schema from the cache, falling back to pulling it + // from the global catalog (if it exists). + let schema = self.cache.read().get(&namespace).map(Arc::clone); + let schema = match schema { + Some(v) => v, + None => { + // Pull the schema from the global catalog or error if it does + // not exist. + let schema = get_schema_by_name(&namespace, &*self.catalog) + .await + .map_err(|e| { + warn!(error=%e, %namespace, "failed to retrieve namespace schema"); + SchemaError::NamespaceLookup(e) + }) + .map(Arc::new)?; + + self.cache + .write() + .insert(namespace.clone(), Arc::clone(&schema)); + + trace!(%namespace, "schema cache populated"); + schema + } + }; + + let maybe_new_schema = validate_or_insert_schema(&batches, &schema, &*self.catalog) + .await + .map_err(|e| { + warn!(error=%e, %namespace, "schema validation failed"); + SchemaError::Validate(e) + })? + .map(Arc::new); + + trace!(%namespace, "schema validation complete"); + + // If the schema has been updated, immediately add it to the cache + // (before passing through the write) in order to allow subsequent, + // parallel requests to use it while waiting on the inner DML handler to + // perform the write. + match maybe_new_schema { + Some(v) => { + // This call MAY overwrite a more-up-to-date cache entry if + // racing with another request for the same namespace, but the + // cache will eventually converge in subsequent requests. 
+ self.cache.write().insert(namespace.clone(), v); + trace!(%namespace, "schema cache updated"); + } + None => { + trace!(%namespace, "schema unchanged"); + } + } + + self.inner + .write(namespace, batches, span_ctx) + .await + .map_err(|e| SchemaError::Inner(Box::new(e.into()))) + } + + /// This call is passed through to `D` - no schema validation is performed + /// on deletes. + async fn delete<'a>( + &self, + namespace: DatabaseName<'static>, + table_name: impl Into + Send + Sync + 'a, + predicate: DeletePredicate, + span_ctx: Option, + ) -> Result<(), Self::DeleteError> { + self.inner + .delete(namespace, table_name, predicate, span_ctx) + .await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use data_types::timestamp::TimestampRange; + use iox_catalog::{ + interface::{ColumnType, KafkaTopicId, QueryPoolId}, + mem::MemCatalog, + }; + use schema::{InfluxColumnType, InfluxFieldType}; + + use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}; + + use super::*; + + const NAMESPACE: &str = "bananas"; + + #[derive(Debug, Error)] + enum MockError { + #[error("terrible things")] + Terrible, + } + + // Parse `lp` into a table-keyed MutableBatch map. + fn lp_to_writes(lp: &str) -> HashMap { + let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42) + .expect("failed to build test writes from LP"); + writes + } + + /// Initialise an in-memory [`MemCatalog`] and create a single namespace + /// named [`NAMESPACE`]. + async fn create_catalog() -> Arc { + let catalog: Arc = Arc::new(MemCatalog::new()); + catalog + .namespaces() + .create( + NAMESPACE, + "inf", + KafkaTopicId::new(42), + QueryPoolId::new(24), + ) + .await + .expect("failed to create test namespace"); + catalog + } + + fn assert_cache(handler: &SchemaValidator, table: &str, col: &str, want: ColumnType) { + // The cache should be populated. 
+ let cached = handler.cache.read(); + let ns = cached + .get(&NAMESPACE.try_into().unwrap()) + .expect("cache should be populated"); + let table = ns.tables.get(table).expect("table should exist in cache"); + assert_eq!( + table + .columns + .get(col) + .expect("column not cached") + .column_type, + want + ); + } + + #[tokio::test] + async fn test_write_ok() { + let catalog = create_catalog().await; + let mock = Arc::new(MockDmlHandler::default().with_write_return(vec![Ok(())])); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); + handler + .write(NAMESPACE.try_into().unwrap(), writes, None) + .await + .expect("request should succeed"); + + // The mock should observe exactly one write. + assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, ..}] => { + assert_eq!(namespace, NAMESPACE); + }); + + // The cache should be populated. + assert_cache(&handler, "bananas", "tag1", ColumnType::Tag); + assert_cache(&handler, "bananas", "tag2", ColumnType::Tag); + assert_cache(&handler, "bananas", "val", ColumnType::I64); + assert_cache(&handler, "bananas", "time", ColumnType::Time); + } + + #[tokio::test] + async fn test_write_schema_not_found() { + let catalog = create_catalog().await; + let mock = Arc::new(MockDmlHandler::default().with_write_return(vec![Ok(())])); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); + let err = handler + .write("A_DIFFERENT_NAMESPACE".try_into().unwrap(), writes, None) + .await + .expect_err("request should fail"); + + assert_matches!(err, SchemaError::NamespaceLookup(_)); + assert!(mock.calls().is_empty()); + + // The cache should not have retained the schema. 
+ assert!(handler.cache.read().is_empty()); + } + + #[tokio::test] + async fn test_write_validation_failure() { + let catalog = create_catalog().await; + let mock = Arc::new(MockDmlHandler::default().with_write_return(vec![Ok(())])); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + // First write sets the schema + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64 + handler + .write(NAMESPACE.try_into().unwrap(), writes, None) + .await + .expect("request should succeed"); + + // Second write attempts to violate it causing an error + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42.0 123456"); // val=float + let err = handler + .write(NAMESPACE.try_into().unwrap(), writes, None) + .await + .expect_err("request should fail"); + + assert_matches!(err, SchemaError::Validate(_)); + + // The mock should observe exactly one write from the first call. + assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, batches}] => { + assert_eq!(namespace, NAMESPACE); + let batch = batches.get("bananas").expect("table not found in write"); + assert_eq!(batch.rows(), 1); + let col = batch.column("val").expect("column not found in write"); + assert_matches!(col.influx_type(), InfluxColumnType::Field(InfluxFieldType::Integer)); + }); + + // The cache should retain the original schema. 
+ assert_cache(&handler, "bananas", "tag1", ColumnType::Tag); + assert_cache(&handler, "bananas", "tag2", ColumnType::Tag); + assert_cache(&handler, "bananas", "val", ColumnType::I64); // original type + assert_cache(&handler, "bananas", "time", ColumnType::Time); + } + + #[tokio::test] + async fn test_write_inner_handler_error() { + let catalog = create_catalog().await; + let mock = Arc::new( + MockDmlHandler::default() + .with_write_return(vec![Err(DmlError::Internal(MockError::Terrible.into()))]), + ); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); + let err = handler + .write(NAMESPACE.try_into().unwrap(), writes, None) + .await + .expect_err("request should return mock error"); + + assert_matches!(err, SchemaError::Inner(e) => { + assert_matches!(*e, DmlError::Internal(_)); + }); + + // The mock should observe exactly one write. + assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write { .. }]); + + // The cache should be populated as it has been flushed to the catalog. + assert_cache(&handler, "bananas", "tag1", ColumnType::Tag); + assert_cache(&handler, "bananas", "tag2", ColumnType::Tag); + assert_cache(&handler, "bananas", "val", ColumnType::I64); + assert_cache(&handler, "bananas", "time", ColumnType::Time); + } + + #[tokio::test] + async fn test_write_delete_passthrough_ok() { + const NAMESPACE: &str = "NAMESPACE_IS_NOT_VALIDATED"; + const TABLE: &str = "bananas"; + + let catalog = create_catalog().await; + let mock = Arc::new(MockDmlHandler::default().with_delete_return(vec![Ok(())])); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let predicate = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![], + }; + + handler + .delete( + NAMESPACE.try_into().unwrap(), + TABLE, + predicate.clone(), + None, + ) + .await + .expect("request should succeed"); + + // The mock should observe exactly one delete. 
+ assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Delete { namespace, predicate: got, table }] => { + assert_eq!(namespace, NAMESPACE); + assert_eq!(predicate, *got); + assert_eq!(table, TABLE); + }); + + // Deletes have no effect on the cache. + assert!(handler.cache.read().is_empty()); + } + + #[tokio::test] + async fn test_write_delete_passthrough_err() { + let catalog = create_catalog().await; + let mock = Arc::new( + MockDmlHandler::default() + .with_delete_return(vec![Err(DmlError::Internal(MockError::Terrible.into()))]), + ); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let predicate = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![], + }; + + let err = handler + .delete( + "NAMESPACE_IS_IGNORED".try_into().unwrap(), + "bananas", + predicate, + None, + ) + .await + .expect_err("request should return mock error"); + + assert_matches!(err, DmlError::Internal(_)); + + // The mock should observe exactly one delete. + assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Delete { .. }]); + + // Deletes have no effect on the cache. + assert!(handler.cache.read().is_empty()); + } +} diff --git a/router2/src/dml_handlers/trait.rs b/router2/src/dml_handlers/trait.rs index 5a9439cdef..86dbe8d3c0 100644 --- a/router2/src/dml_handlers/trait.rs +++ b/router2/src/dml_handlers/trait.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::{error::Error, fmt::Debug}; use async_trait::async_trait; use data_types::{delete_predicate::DeletePredicate, DatabaseName}; @@ -8,7 +8,7 @@ use mutable_batch::MutableBatch; use thiserror::Error; use trace::ctx::SpanContext; -use super::ShardError; +use super::{SchemaError, ShardError}; /// Errors emitted by a [`DmlHandler`] implementation during DML request /// processing. @@ -22,9 +22,13 @@ pub enum DmlError { #[error(transparent)] WriteBuffer(#[from] ShardError), + /// A schema validation failure. 
+ #[error(transparent)] + Schema(#[from] SchemaError), + /// An unknown error occured while processing the DML request. #[error("internal dml handler error: {0}")] - Internal(Box), + Internal(Box), } /// A composable, abstract handler of DML requests. @@ -34,9 +38,9 @@ pub trait DmlHandler: Debug + Send + Sync { /// requests. /// /// All errors must be mappable into the concrete [`DmlError`] type. - type WriteError: Into; + type WriteError: Error + Into + Send; /// The error type of the delete handler. - type DeleteError: Into; + type DeleteError: Error + Into + Send; /// Write `batches` to `namespace`. async fn write( diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 3ad3f97180..72c8f41a9a 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -80,6 +80,7 @@ impl Error { Error::ParseLineProtocol(_) => StatusCode::BAD_REQUEST, Error::ParseDelete(_) => StatusCode::BAD_REQUEST, Error::RequestSizeExceeded(_) => StatusCode::PAYLOAD_TOO_LARGE, + Error::DmlHandler(DmlError::Schema(_)) => StatusCode::BAD_REQUEST, Error::InvalidContentEncoding(_) => { // https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13 StatusCode::UNSUPPORTED_MEDIA_TYPE