From f9f9beac36f849ed1c57abf98e7a409c3e8a05cf Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 28 Jan 2022 16:24:29 +0000 Subject: [PATCH 1/6] refactor: get_schema_by_name no Option Fetching a NamespaceSchema always succeeds and never returns None. --- iox_catalog/src/interface.rs | 7 ++----- iox_catalog/src/lib.rs | 3 +-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 1ada77aec9..043855e05d 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -528,10 +528,7 @@ impl NamespaceSchema { } /// Gets the namespace schema including all tables and columns. -pub async fn get_schema_by_name( - name: &str, - catalog: &dyn Catalog, -) -> Result> { +pub async fn get_schema_by_name(name: &str, catalog: &dyn Catalog) -> Result { let namespace = catalog .namespaces() .get_by_name(name) @@ -578,7 +575,7 @@ pub async fn get_schema_by_name( namespace.tables.insert(table_name, schema); } - Ok(Some(namespace)) + Ok(namespace) } /// Data object for a table diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 62b3720667..0500fcc5b7 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -259,8 +259,7 @@ mod tests { // cached schema. let db_schema = get_schema_by_name(NAMESPACE_NAME, &repo) .await - .expect("database failed to query for namespace schema") - .expect("no schema found"); + .expect("database failed to query for namespace schema"); assert_eq!(schema, db_schema, "schema in DB and cached schema differ"); // Generate the map of tables => desired column types From c81f20729823575e3c42d0b00f5292f561c6db34 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 28 Jan 2022 16:25:20 +0000 Subject: [PATCH 2/6] feat: schema validation Implements a write schema validation DML handler, denying requests that conflict with the schema within the global catalog. Additive schema changes are accepted, incrementally updating the global catalog schema. 
Deletes are passed through unchanged and unvalidated. --- Cargo.lock | 2 + router2/Cargo.toml | 2 + router2/src/dml_handlers/mod.rs | 3 + router2/src/dml_handlers/schema_validation.rs | 312 ++++++++++++++++++ router2/src/dml_handlers/trait.rs | 14 +- router2/src/server/http.rs | 1 + 6 files changed, 329 insertions(+), 5 deletions(-) create mode 100644 router2/src/dml_handlers/schema_validation.rs diff --git a/Cargo.lock b/Cargo.lock index 6865f616ed..032de23c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3900,6 +3900,7 @@ dependencies = [ "generated_types", "hashbrown 0.12.0", "hyper", + "iox_catalog", "metric", "mutable_batch", "mutable_batch_lp", @@ -3908,6 +3909,7 @@ dependencies = [ "paste", "predicate", "rand", + "schema", "serde", "serde_urlencoded", "siphasher", diff --git a/router2/Cargo.toml b/router2/Cargo.toml index f238f1e92f..7bcb52154c 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -15,6 +15,7 @@ futures = "0.3.19" generated_types = { path = "../generated_types" } hashbrown = "0.12" hyper = "0.14" +iox_catalog = { path = "../iox_catalog" } metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch" } mutable_batch_lp = { path = "../mutable_batch_lp" } @@ -37,6 +38,7 @@ assert_matches = "1.5" criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] } paste = "1.0.6" rand = "0.8.3" +schema = { path = "../schema" } [[bench]] name = "sharder" diff --git a/router2/src/dml_handlers/mod.rs b/router2/src/dml_handlers/mod.rs index 8e84da874f..afe91517e1 100644 --- a/router2/src/dml_handlers/mod.rs +++ b/router2/src/dml_handlers/mod.rs @@ -46,6 +46,9 @@ mod r#trait; pub use r#trait::*; +mod schema_validation; +pub use schema_validation::*; + pub mod nop; mod sharded_write_buffer; diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs new file mode 100644 index 0000000000..1f005dac1b --- /dev/null +++ b/router2/src/dml_handlers/schema_validation.rs @@ -0,0 +1,312 
@@ +use std::sync::Arc; + +use async_trait::async_trait; +use data_types::{delete_predicate::DeletePredicate, DatabaseName}; +use hashbrown::HashMap; +use iox_catalog::{ + interface::{get_schema_by_name, Catalog}, + validate_or_insert_schema, +}; +use mutable_batch::MutableBatch; +use thiserror::Error; +use trace::ctx::SpanContext; + +use super::{DmlError, DmlHandler}; + +/// Errors emitted during schema validation. +#[derive(Debug, Error)] +pub enum SchemaError { + /// The requested namespace could not be found in the catalog. + #[error("failed to read namespace schema from catalog: {0}")] + NamespaceLookup(iox_catalog::interface::Error), + + /// The request failed during schema validation. + /// + /// NOTE: this may be due to transient I/O errors while interrogating the + /// global catalog - the caller should inspect the inner error to determine + /// the failure reason. + #[error(transparent)] + Validate(iox_catalog::interface::Error), + + /// The inner DML handler returned an error. + #[error(transparent)] + Inner(Box), +} + +/// A [`SchemaValidator`] checks the schema of incoming writes against a +/// centralised schema store. +#[derive(Debug)] +pub struct SchemaValidator { + inner: D, + catalog: Arc, +} + +impl SchemaValidator { + /// Initialise a new [`SchemaValidator`] decorator, loading schemas from + /// `catalog` and passing acceptable requests through to `inner`. + pub fn new(inner: D, catalog: Arc) -> Self { + Self { inner, catalog } + } +} + +#[async_trait] +impl DmlHandler for SchemaValidator +where + D: DmlHandler, +{ + type WriteError = SchemaError; + type DeleteError = D::DeleteError; + + /// Validate the schema of all the writes in `batches` before passing the + /// request onto the inner handler. + /// + /// # Errors + /// + /// If `namespace` does not exist, [`SchemaError::NamespaceLookup`] is + /// returned. + /// + /// If the schema validation fails, [`SchemaError::Validate`] is returned. 
+ /// Callers should inspect the inner error to determine if the failure was + /// caused by catalog I/O, or a schema conflict. + /// + /// A request that fails validation on one or more tables fails the request + /// as a whole - calling this method has "all or nothing" semantics. + /// + /// If the inner handler returns an error (wrapped in a + /// [`SchemaError::Inner`]), the semantics of the inner handler write apply. + async fn write( + &self, + namespace: DatabaseName<'static>, + batches: HashMap, + span_ctx: Option, + ) -> Result<(), Self::WriteError> { + let schema = get_schema_by_name(&namespace, &*self.catalog) + .await + .map_err(SchemaError::NamespaceLookup)?; + + let _new_schema = validate_or_insert_schema(&batches, &schema, &*self.catalog) + .await + .map_err(SchemaError::Validate)?; + + self.inner + .write(namespace, batches, span_ctx) + .await + .map_err(|e| SchemaError::Inner(Box::new(e.into())))?; + + Ok(()) + } + + /// This call is passed through to `D` - no schema validation is performed + /// on deletes. + async fn delete<'a>( + &self, + namespace: DatabaseName<'static>, + table_name: impl Into + Send + Sync + 'a, + predicate: DeletePredicate, + span_ctx: Option, + ) -> Result<(), Self::DeleteError> { + self.inner + .delete(namespace, table_name, predicate, span_ctx) + .await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use data_types::timestamp::TimestampRange; + use iox_catalog::{ + interface::{KafkaTopicId, QueryPoolId}, + mem::MemCatalog, + }; + use schema::{InfluxColumnType, InfluxFieldType}; + + use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}; + + use super::*; + + const NAMESPACE: &str = "bananas"; + + #[derive(Debug, Error)] + enum MockError { + #[error("terrible things")] + Terrible, + } + + // Parse `lp` into a table-keyed MutableBatch map. 
+ fn lp_to_writes(lp: &str) -> HashMap { + let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42) + .expect("failed to build test writes from LP"); + writes + } + + /// Initialise an in-memory [`MemCatalog`] and create a single namespace + /// named [`NAMESPACE`]. + async fn create_catalog() -> Arc { + let catalog: Arc = Arc::new(MemCatalog::new()); + catalog + .namespaces() + .create( + NAMESPACE, + "inf", + KafkaTopicId::new(42), + QueryPoolId::new(24), + ) + .await + .expect("failed to create test namespace"); + catalog + } + + #[tokio::test] + async fn test_write_ok() { + let catalog = create_catalog().await; + let mock = Arc::new(MockDmlHandler::default().with_write_return(vec![Ok(())])); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); + handler + .write(NAMESPACE.try_into().unwrap(), writes, None) + .await + .expect("request should succeed"); + + // THe mock should observe exactly one write. 
+ assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, ..}] => { + assert_eq!(namespace, NAMESPACE); + }); + } + + #[tokio::test] + async fn test_write_schema_not_found() { + let catalog = create_catalog().await; + let mock = Arc::new(MockDmlHandler::default().with_write_return(vec![Ok(())])); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); + let err = handler + .write("A_DIFFERENT_NAMESPACE".try_into().unwrap(), writes, None) + .await + .expect_err("request should fail"); + + assert_matches!(err, SchemaError::NamespaceLookup(_)); + assert!(mock.calls().is_empty()); + } + + #[tokio::test] + async fn test_write_validation_failure() { + let catalog = create_catalog().await; + let mock = Arc::new(MockDmlHandler::default().with_write_return(vec![Ok(())])); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + // First write sets the schema + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64 + handler + .write(NAMESPACE.try_into().unwrap(), writes, None) + .await + .expect("request should succeed"); + + // Second write attempts to violate it causing an error + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42.0 123456"); // val=float + let err = handler + .write(NAMESPACE.try_into().unwrap(), writes, None) + .await + .expect_err("request should fail"); + + assert_matches!(err, SchemaError::Validate(_)); + + // THe mock should observe exactly one write from the first call. 
+ assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, batches}] => { + assert_eq!(namespace, NAMESPACE); + let batch = batches.get("bananas").expect("table not found in write"); + assert_eq!(batch.rows(), 1); + let col = batch.column("val").expect("column not found in write"); + assert_matches!(col.influx_type(), InfluxColumnType::Field(InfluxFieldType::Integer)); + }); + } + + #[tokio::test] + async fn test_write_inner_handler_error() { + let catalog = create_catalog().await; + let mock = Arc::new( + MockDmlHandler::default() + .with_write_return(vec![Err(DmlError::Internal(MockError::Terrible.into()))]), + ); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); + let err = handler + .write(NAMESPACE.try_into().unwrap(), writes, None) + .await + .expect_err("request should return mock error"); + + assert_matches!(err, SchemaError::Inner(e) => { + assert_matches!(*e, DmlError::Internal(_)); + }); + + // The mock should observe exactly one write. + assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write { .. }]); + } + + #[tokio::test] + async fn test_write_delete_passthrough_ok() { + const NAMESPACE: &str = "NAMESPACE_IS_NOT_VALIDATED"; + const TABLE: &str = "bananas"; + + let catalog = create_catalog().await; + let mock = Arc::new(MockDmlHandler::default().with_delete_return(vec![Ok(())])); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let predicate = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![], + }; + + handler + .delete( + NAMESPACE.try_into().unwrap(), + TABLE, + predicate.clone(), + None, + ) + .await + .expect("request should succeed"); + + // The mock should observe exactly one delete. 
+ assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Delete { namespace, predicate: got, table }] => { + assert_eq!(namespace, NAMESPACE); + assert_eq!(predicate, *got); + assert_eq!(table, TABLE); + }); + } + + #[tokio::test] + async fn test_write_delete_passthrough_err() { + let catalog = create_catalog().await; + let mock = Arc::new( + MockDmlHandler::default() + .with_delete_return(vec![Err(DmlError::Internal(MockError::Terrible.into()))]), + ); + let handler = SchemaValidator::new(Arc::clone(&mock), catalog); + + let predicate = DeletePredicate { + range: TimestampRange::new(1, 2), + exprs: vec![], + }; + + let err = handler + .delete( + "NAMESPACE_IS_IGNORED".try_into().unwrap(), + "bananas", + predicate, + None, + ) + .await + .expect_err("request should return mock error"); + + assert_matches!(err, DmlError::Internal(_)); + + // The mock should observe exactly one delete. + assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Delete { .. }]); + } +} diff --git a/router2/src/dml_handlers/trait.rs b/router2/src/dml_handlers/trait.rs index 5a9439cdef..ee3316e832 100644 --- a/router2/src/dml_handlers/trait.rs +++ b/router2/src/dml_handlers/trait.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::{error::Error, fmt::Debug}; use async_trait::async_trait; use data_types::{delete_predicate::DeletePredicate, DatabaseName}; @@ -8,7 +8,7 @@ use mutable_batch::MutableBatch; use thiserror::Error; use trace::ctx::SpanContext; -use super::ShardError; +use super::{SchemaError, ShardError}; /// Errors emitted by a [`DmlHandler`] implementation during DML request /// processing. @@ -22,9 +22,13 @@ pub enum DmlError { #[error(transparent)] WriteBuffer(#[from] ShardError), + /// A schema validation failure. + #[error(transparent)] + Schema(#[from] SchemaError), + /// An unknown error occured while processing the DML request. #[error("internal dml handler error: {0}")] - Internal(Box), + Internal(Box), } /// A composable, abstract handler of DML requests. 
@@ -34,9 +38,9 @@ pub trait DmlHandler: Debug + Send + Sync { /// requests. /// /// All errors must be mappable into the concrete [`DmlError`] type. - type WriteError: Into; + type WriteError: Error + Into; /// The error type of the delete handler. - type DeleteError: Into; + type DeleteError: Error + Into; /// Write `batches` to `namespace`. async fn write( diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 3ad3f97180..72c8f41a9a 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -80,6 +80,7 @@ impl Error { Error::ParseLineProtocol(_) => StatusCode::BAD_REQUEST, Error::ParseDelete(_) => StatusCode::BAD_REQUEST, Error::RequestSizeExceeded(_) => StatusCode::PAYLOAD_TOO_LARGE, + Error::DmlHandler(DmlError::Schema(_)) => StatusCode::BAD_REQUEST, Error::InvalidContentEncoding(_) => { // https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13 StatusCode::UNSUPPORTED_MEDIA_TYPE From 659802372682f59b93c5bca121b568cc8fb48f2c Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 28 Jan 2022 14:42:31 +0000 Subject: [PATCH 3/6] feat: cache NamespaceSchema in validator Adds an in-memory cache of table schemas to the SchemaValidator DML handler. The cache pulls from the global catalog when observing a column for the first time, and pushes the column type to set it for subsequent requests if it does not exist (this pull & push is done atomically by the catalog in an "upsert" call). The in-memory cache is sharded by namespace, with each shard guarded by an individual lock to minimise contention between readers (the expected average case) and writers (only when adding new columns/tables). Relies on the catalog to serialise new column creation and validate parallel creation requests. 
--- Cargo.lock | 12 ++ router2/Cargo.toml | 1 + router2/src/dml_handlers/schema_validation.rs | 166 +++++++++++++++++- 3 files changed, 170 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 032de23c92..3ca9b0e393 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,6 +841,17 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b799062aaf67eb976af3bdca031ee6f846d2f0a5710ddbb0d2efee33f3cc4760" +dependencies = [ + "cfg-if", + "num_cpus", + "parking_lot", +] + [[package]] name = "data_types" version = "0.1.0" @@ -3893,6 +3904,7 @@ dependencies = [ "async-trait", "bytes", "criterion", + "dashmap", "data_types", "dml", "flate2", diff --git a/router2/Cargo.toml b/router2/Cargo.toml index 7bcb52154c..ffc70e7c12 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] async-trait = "0.1" bytes = "1.1" +dashmap = "5.0.0" data_types = { path = "../data_types" } dml = { path = "../dml" } flate2 = "1.0" diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs index 1f005dac1b..0155c92033 100644 --- a/router2/src/dml_handlers/schema_validation.rs +++ b/router2/src/dml_handlers/schema_validation.rs @@ -1,13 +1,15 @@ use std::sync::Arc; use async_trait::async_trait; +use dashmap::{mapref::entry::Entry, DashMap}; use data_types::{delete_predicate::DeletePredicate, DatabaseName}; use hashbrown::HashMap; use iox_catalog::{ - interface::{get_schema_by_name, Catalog}, + interface::{get_schema_by_name, Catalog, NamespaceSchema}, validate_or_insert_schema, }; use mutable_batch::MutableBatch; +use observability_deps::tracing::*; use thiserror::Error; use trace::ctx::SpanContext; @@ -34,18 +36,71 @@ pub enum SchemaError { } /// A [`SchemaValidator`] checks the schema of incoming writes against a -/// centralised schema store. 
+/// centralised schema store, maintaining an in-memory cache of all observed +/// schemas. +/// +/// # Caching +/// +/// This validator attempts to incrementally build an in-memory cache of all +/// table schemas it observes (never evicting the schemas). +/// +/// All schema operations are scoped to a single namespace. +/// +/// When a request contains columns that do not exist in the cached schema, the +/// catalog is queried for an existing column by the same name and (atomically) +/// the column is created if it does not exist. If the requested column type +/// conflicts with the catalog column, the request is rejected and the cached +/// schema is not updated. +/// +/// Any successful write that adds new columns causes the new schema to be +/// cached. +/// +/// To minimise locking, this cache is designed to allow (and tolerate) spurious +/// cache "updates" racing with each other and overwriting newer schemas with +/// older schemas. This is acceptable due to the incremental, additive schema +/// creation never allowing a column to change or be removed, therefore columns +/// lost by racy schema cache overwrites are "discovered" in subsequent +/// requests. This overwriting is scoped to the namespace, and is expected to be +/// relatively rare - it results in additional requests being made to the +/// catalog until the cached schema converges to match the catalog schema. +/// +/// # Correctness +/// +/// The correct functioning of this schema validator relies on the catalog +/// implementation correctly validating new column requests against any existing +/// (but uncached) columns, returning an error if the requested type does not +/// match. +/// +/// The catalog must serialise column creation to avoid `set(a=tag)` overwriting +/// a prior `set(a=int)` write to the catalog (returning an error that `a` is an +/// `int` for the second request). 
+/// +/// Because each column is set incrementally and independently of other new +/// columns in the request, racing multiple requests for the same table can +/// produce incorrect schemas ([#3573]). +/// +/// [#3573]: https://github.com/influxdata/influxdb_iox/issues/3573 #[derive(Debug)] pub struct SchemaValidator { inner: D, catalog: Arc, + + // A map guarded by locks sharded by key. + // + // No documentation is provided as to the read/write bias or other locking + // semantics. + cache: DashMap, Arc>, } impl SchemaValidator { /// Initialise a new [`SchemaValidator`] decorator, loading schemas from /// `catalog` and passing acceptable requests through to `inner`. pub fn new(inner: D, catalog: Arc) -> Self { - Self { inner, catalog } + Self { + inner, + catalog, + cache: Default::default(), + } } } @@ -80,13 +135,59 @@ where batches: HashMap, span_ctx: Option, ) -> Result<(), Self::WriteError> { - let schema = get_schema_by_name(&namespace, &*self.catalog) - .await - .map_err(SchemaError::NamespaceLookup)?; + // Load the namespace schema from the cache, falling back to pulling it + // from the global catalog (if it exists). + let schema = match self.cache.entry(namespace.clone()) { + Entry::Occupied(v) => { + trace!(%namespace, "schema cache hit"); + Arc::clone(v.get()) + } + Entry::Vacant(v) => { + trace!(%namespace, "schema cache miss"); - let _new_schema = validate_or_insert_schema(&batches, &schema, &*self.catalog) + // Pull the schema from the global catalog or error if it does + // not exist. 
+ let schema = get_schema_by_name(&namespace, &*self.catalog) + .await + .map_err(|e| { + warn!(error=%e, %namespace, "failed to retrieve namespace schema"); + SchemaError::NamespaceLookup(e) + }) + .map(Arc::new)?; + + v.insert(Arc::clone(&schema)); + trace!(%namespace, "schema cache populated"); + + schema + } + }; + + let maybe_new_schema = validate_or_insert_schema(&batches, &schema, &*self.catalog) .await - .map_err(SchemaError::Validate)?; + .map_err(|e| { + warn!(error=%e, %namespace, "schema validation failed"); + SchemaError::Validate(e) + })? + .map(Arc::new); + + trace!(%namespace, "schema validation complete"); + + // If the schema has been updated, immediately add it to the cache + // (before passing through the write) in order to allow subsequent, + // parallel requests to use it while waiting on the inner DML handler to + // perform the write. + match maybe_new_schema { + Some(v) => { + // This call MAY overwrite a more-up-to-date cache entry if + // racing with another request for the same namespace, but the + // cache will eventually converge in subsequent requests. + self.cache.insert(namespace.clone(), v); + trace!(%namespace, "schema cache updated"); + } + None => { + trace!(%namespace, "schema unchanged"); + } + } self.inner .write(namespace, batches, span_ctx) @@ -118,7 +219,7 @@ mod tests { use assert_matches::assert_matches; use data_types::timestamp::TimestampRange; use iox_catalog::{ - interface::{KafkaTopicId, QueryPoolId}, + interface::{ColumnType, KafkaTopicId, QueryPoolId}, mem::MemCatalog, }; use schema::{InfluxColumnType, InfluxFieldType}; @@ -159,6 +260,26 @@ mod tests { catalog } + fn assert_cache(handler: &SchemaValidator, table: &str, col: &str, want: ColumnType) { + // The cache should be populated. 
+ let cached = handler + .cache + .get(&NAMESPACE.try_into().unwrap()) + .expect("cache should be populated"); + let table = cached + .tables + .get(table) + .expect("table should exist in cache"); + assert_eq!( + table + .columns + .get(col) + .expect("column not cached") + .column_type, + want + ); + } + #[tokio::test] async fn test_write_ok() { let catalog = create_catalog().await; @@ -175,6 +296,12 @@ mod tests { assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, ..}] => { assert_eq!(namespace, NAMESPACE); }); + + // The cache should be populated. + assert_cache(&handler, "bananas", "tag1", ColumnType::Tag); + assert_cache(&handler, "bananas", "tag2", ColumnType::Tag); + assert_cache(&handler, "bananas", "val", ColumnType::I64); + assert_cache(&handler, "bananas", "time", ColumnType::Time); } #[tokio::test] @@ -191,6 +318,9 @@ mod tests { assert_matches!(err, SchemaError::NamespaceLookup(_)); assert!(mock.calls().is_empty()); + + // The cache should not have retained the schema. + assert!(handler.cache.is_empty()); } #[tokio::test] @@ -223,6 +353,12 @@ mod tests { let col = batch.column("val").expect("column not found in write"); assert_matches!(col.influx_type(), InfluxColumnType::Field(InfluxFieldType::Integer)); }); + + // The cache should retain the original schema. + assert_cache(&handler, "bananas", "tag1", ColumnType::Tag); + assert_cache(&handler, "bananas", "tag2", ColumnType::Tag); + assert_cache(&handler, "bananas", "val", ColumnType::I64); // original type + assert_cache(&handler, "bananas", "time", ColumnType::Time); } #[tokio::test] @@ -246,6 +382,12 @@ mod tests { // The mock should observe exactly one write. assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write { .. }]); + + // The cache should be populated as it has been flushed to the catalog. 
+ assert_cache(&handler, "bananas", "tag1", ColumnType::Tag); + assert_cache(&handler, "bananas", "tag2", ColumnType::Tag); + assert_cache(&handler, "bananas", "val", ColumnType::I64); + assert_cache(&handler, "bananas", "time", ColumnType::Time); } #[tokio::test] @@ -278,6 +420,9 @@ mod tests { assert_eq!(predicate, *got); assert_eq!(table, TABLE); }); + + // Deletes have no effect on the cache. + assert!(handler.cache.is_empty()); } #[tokio::test] @@ -308,5 +453,8 @@ mod tests { // The mock should observe exactly one delete. assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Delete { .. }]); + + // Deletes have no effect on the cache. + assert!(handler.cache.is_empty()); } } From 39d489d9e717de7a2348db9dffa570b101fc8767 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 28 Jan 2022 15:45:56 +0000 Subject: [PATCH 4/6] refactor: enable schema validation Adds the SchemaValidator to the DML handler stack - this adds it into the request path in router2. --- Cargo.lock | 2 +- influxdb_iox/src/commands/run/router2.rs | 23 +++++++++++++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ca9b0e393..96cd89dc55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -849,7 +849,7 @@ checksum = "b799062aaf67eb976af3bdca031ee6f846d2f0a5710ddbb0d2efee33f3cc4760" dependencies = [ "cfg-if", "num_cpus", - "parking_lot", + "parking_lot 0.11.2", ] [[package]] diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs index a69b9b4ea7..cd5abcb5fa 100644 --- a/influxdb_iox/src/commands/run/router2.rs +++ b/influxdb_iox/src/commands/run/router2.rs @@ -12,9 +12,10 @@ use crate::{ }, }, }; +use iox_catalog::{interface::Catalog, postgres::PostgresCatalog}; use observability_deps::tracing::*; use router2::{ - dml_handlers::ShardedWriteBuffer, + dml_handlers::{SchemaValidator, ShardedWriteBuffer}, sequencer::Sequencer, server::{http::HttpDelegate, RouterServer}, sharder::TableNamespaceSharder, @@ 
-31,6 +32,9 @@ pub enum Error { #[error("Invalid config: {0}")] InvalidConfig(#[from] CommonServerStateError), + #[error("Catalog error: {0}")] + Catalog(#[from] iox_catalog::interface::Error), + #[error("failed to initialise write buffer connection: {0}")] WriteBuffer(#[from] WriteBufferError), } @@ -58,12 +62,25 @@ pub struct Config { #[clap(flatten)] pub(crate) write_buffer_config: WriteBufferConfig, + + /// Postgres connection string + #[clap(env = "INFLUXDB_IOX_CATALOG_DSN")] + pub catalog_dsn: String, } pub async fn command(config: Config) -> Result<()> { let common_state = CommonServerState::from_config(config.run_config.clone())?; let metrics = Arc::new(metric::Registry::default()); + let catalog: Arc = Arc::new( + PostgresCatalog::connect( + "router2", + iox_catalog::postgres::SCHEMA_NAME, + &config.catalog_dsn, + ) + .await?, + ); + let write_buffer = init_write_buffer( &config, Arc::clone(&metrics), @@ -71,7 +88,9 @@ pub async fn command(config: Config) -> Result<()> { ) .await?; - let http = HttpDelegate::new(config.run_config.max_http_request_size, write_buffer); + let handler_stack = SchemaValidator::new(write_buffer, catalog); + + let http = HttpDelegate::new(config.run_config.max_http_request_size, handler_stack); let router_server = RouterServer::new( http, Default::default(), From 4744c5804e0d180b18e988ec8a7691d53fdb53f8 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 28 Jan 2022 17:14:18 +0000 Subject: [PATCH 5/6] refactor: remove Dashmap Swap Dashmap for a regular RwLock<HashMap<_, _>> due to soundness issues: https://rustsec.org/advisories/RUSTSEC-2022-0002 --- Cargo.lock | 12 ------ router2/Cargo.toml | 1 - router2/src/dml_handlers/schema_validation.rs | 43 ++++++++----------- router2/src/dml_handlers/trait.rs | 4 +- 4 files changed, 19 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96cd89dc55..032de23c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,6 +841,17 @@ dependencies = [ "syn", ] -[[package]] -name = "dashmap" 
-version = "5.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b799062aaf67eb976af3bdca031ee6f846d2f0a5710ddbb0d2efee33f3cc4760" -dependencies = [ - "cfg-if", - "num_cpus", - "parking_lot 0.11.2", -] - [[package]] name = "data_types" version = "0.1.0" @@ -3904,7 +3893,6 @@ dependencies = [ "async-trait", "bytes", "criterion", - "dashmap", "data_types", "dml", "flate2", diff --git a/router2/Cargo.toml b/router2/Cargo.toml index ffc70e7c12..7bcb52154c 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] async-trait = "0.1" bytes = "1.1" -dashmap = "5.0.0" data_types = { path = "../data_types" } dml = { path = "../dml" } flate2 = "1.0" diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs index 0155c92033..eadad55c28 100644 --- a/router2/src/dml_handlers/schema_validation.rs +++ b/router2/src/dml_handlers/schema_validation.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use async_trait::async_trait; -use dashmap::{mapref::entry::Entry, DashMap}; use data_types::{delete_predicate::DeletePredicate, DatabaseName}; use hashbrown::HashMap; use iox_catalog::{ @@ -10,6 +9,7 @@ use iox_catalog::{ }; use mutable_batch::MutableBatch; use observability_deps::tracing::*; +use parking_lot::RwLock; use thiserror::Error; use trace::ctx::SpanContext; @@ -85,11 +85,7 @@ pub struct SchemaValidator { inner: D, catalog: Arc, - // A map guarded by locks sharded by key. - // - // No documentation is provided as to the read/write bias or other locking - // semantics. - cache: DashMap, Arc>, + cache: RwLock, Arc>>, } impl SchemaValidator { @@ -137,14 +133,10 @@ where ) -> Result<(), Self::WriteError> { // Load the namespace schema from the cache, falling back to pulling it // from the global catalog (if it exists). 
- let schema = match self.cache.entry(namespace.clone()) { - Entry::Occupied(v) => { - trace!(%namespace, "schema cache hit"); - Arc::clone(v.get()) - } - Entry::Vacant(v) => { - trace!(%namespace, "schema cache miss"); - + let schema = self.cache.read().get(&namespace).map(Arc::clone); + let schema = match schema { + Some(v) => v, + None => { // Pull the schema from the global catalog or error if it does // not exist. let schema = get_schema_by_name(&namespace, &*self.catalog) @@ -155,9 +147,11 @@ where }) .map(Arc::new)?; - v.insert(Arc::clone(&schema)); - trace!(%namespace, "schema cache populated"); + self.cache + .write() + .insert(namespace.clone(), Arc::clone(&schema)); + trace!(%namespace, "schema cache populated"); schema } }; @@ -181,7 +175,7 @@ where // This call MAY overwrite a more-up-to-date cache entry if // racing with another request for the same namespace, but the // cache will eventually converge in subsequent requests. - self.cache.insert(namespace.clone(), v); + self.cache.write().insert(namespace.clone(), v); trace!(%namespace, "schema cache updated"); } None => { @@ -262,14 +256,11 @@ mod tests { fn assert_cache(handler: &SchemaValidator, table: &str, col: &str, want: ColumnType) { // The cache should be populated. - let cached = handler - .cache + let cached = handler.cache.read(); + let ns = cached .get(&NAMESPACE.try_into().unwrap()) .expect("cache should be populated"); - let table = cached - .tables - .get(table) - .expect("table should exist in cache"); + let table = ns.tables.get(table).expect("table should exist in cache"); assert_eq!( table .columns @@ -320,7 +311,7 @@ mod tests { assert!(mock.calls().is_empty()); // The cache should not have retained the schema. - assert!(handler.cache.is_empty()); + assert!(handler.cache.read().is_empty()); } #[tokio::test] @@ -422,7 +413,7 @@ mod tests { }); // Deletes have no effect on the cache. 
- assert!(handler.cache.is_empty()); + assert!(handler.cache.read().is_empty()); } #[tokio::test] @@ -455,6 +446,6 @@ mod tests { assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Delete { .. }]); // Deletes have no effect on the cache. - assert!(handler.cache.is_empty()); + assert!(handler.cache.read().is_empty()); } } diff --git a/router2/src/dml_handlers/trait.rs b/router2/src/dml_handlers/trait.rs index ee3316e832..86dbe8d3c0 100644 --- a/router2/src/dml_handlers/trait.rs +++ b/router2/src/dml_handlers/trait.rs @@ -38,9 +38,9 @@ pub trait DmlHandler: Debug + Send + Sync { /// requests. /// /// All errors must be mappable into the concrete [`DmlError`] type. - type WriteError: Error + Into; + type WriteError: Error + Into + Send; /// The error type of the delete handler. - type DeleteError: Error + Into; + type DeleteError: Error + Into + Send; /// Write `batches` to `namespace`. async fn write( From 26c033d529c40589aad7e5b6f3d4b9dda5a8da08 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 2 Feb 2022 15:20:48 +0000 Subject: [PATCH 6/6] style: return directly --- router2/src/dml_handlers/schema_validation.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs index eadad55c28..7eafa384ac 100644 --- a/router2/src/dml_handlers/schema_validation.rs +++ b/router2/src/dml_handlers/schema_validation.rs @@ -186,9 +186,7 @@ where self.inner .write(namespace, batches, span_ctx) .await - .map_err(|e| SchemaError::Inner(Box::new(e.into())))?; - - Ok(()) + .map_err(|e| SchemaError::Inner(Box::new(e.into()))) } /// This call is passed through to `D` - no schema validation is performed