Merge pull request #3576 from influxdata/dom/schema-validation

feat(router2): write schema validation!
pull/24376/head
kodiakhq[bot] 2022-02-02 19:36:45 +00:00 committed by GitHub
commit 48daede8d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 490 additions and 14 deletions

2
Cargo.lock generated
View File

@ -3901,6 +3901,7 @@ dependencies = [
"generated_types",
"hashbrown 0.12.0",
"hyper",
"iox_catalog",
"metric",
"mutable_batch",
"mutable_batch_lp",
@ -3909,6 +3910,7 @@ dependencies = [
"paste",
"predicate",
"rand",
"schema",
"serde",
"serde_urlencoded",
"siphasher",

View File

@ -12,9 +12,10 @@ use crate::{
},
},
};
use iox_catalog::{interface::Catalog, postgres::PostgresCatalog};
use observability_deps::tracing::*;
use router2::{
dml_handlers::ShardedWriteBuffer,
dml_handlers::{SchemaValidator, ShardedWriteBuffer},
sequencer::Sequencer,
server::{http::HttpDelegate, RouterServer},
sharder::TableNamespaceSharder,
@ -31,6 +32,9 @@ pub enum Error {
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
#[error("Catalog error: {0}")]
Catalog(#[from] iox_catalog::interface::Error),
#[error("failed to initialise write buffer connection: {0}")]
WriteBuffer(#[from] WriteBufferError),
}
@ -58,12 +62,25 @@ pub struct Config {
#[clap(flatten)]
pub(crate) write_buffer_config: WriteBufferConfig,
/// Postgres connection string
#[clap(env = "INFLUXDB_IOX_CATALOG_DSN")]
pub catalog_dsn: String,
}
pub async fn command(config: Config) -> Result<()> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> = Arc::new(
PostgresCatalog::connect(
"router2",
iox_catalog::postgres::SCHEMA_NAME,
&config.catalog_dsn,
)
.await?,
);
let write_buffer = init_write_buffer(
&config,
Arc::clone(&metrics),
@ -71,7 +88,9 @@ pub async fn command(config: Config) -> Result<()> {
)
.await?;
let http = HttpDelegate::new(config.run_config.max_http_request_size, write_buffer);
let handler_stack = SchemaValidator::new(write_buffer, catalog);
let http = HttpDelegate::new(config.run_config.max_http_request_size, handler_stack);
let router_server = RouterServer::new(
http,
Default::default(),

View File

@ -528,10 +528,7 @@ impl NamespaceSchema {
}
/// Gets the namespace schema including all tables and columns.
pub async fn get_schema_by_name(
name: &str,
catalog: &dyn Catalog,
) -> Result<Option<NamespaceSchema>> {
pub async fn get_schema_by_name(name: &str, catalog: &dyn Catalog) -> Result<NamespaceSchema> {
let namespace = catalog
.namespaces()
.get_by_name(name)
@ -578,7 +575,7 @@ pub async fn get_schema_by_name(
namespace.tables.insert(table_name, schema);
}
Ok(Some(namespace))
Ok(namespace)
}
/// Data object for a table

View File

@ -259,8 +259,7 @@ mod tests {
// cached schema.
let db_schema = get_schema_by_name(NAMESPACE_NAME, &repo)
.await
.expect("database failed to query for namespace schema")
.expect("no schema found");
.expect("database failed to query for namespace schema");
assert_eq!(schema, db_schema, "schema in DB and cached schema differ");
// Generate the map of tables => desired column types

View File

@ -15,6 +15,7 @@ futures = "0.3.19"
generated_types = { path = "../generated_types" }
hashbrown = "0.12"
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
@ -37,6 +38,7 @@ assert_matches = "1.5"
criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] }
paste = "1.0.6"
rand = "0.8.3"
schema = { path = "../schema" }
[[bench]]
name = "sharder"

View File

@ -46,6 +46,9 @@
mod r#trait;
pub use r#trait::*;
mod schema_validation;
pub use schema_validation::*;
pub mod nop;
mod sharded_write_buffer;

View File

@ -0,0 +1,449 @@
use std::sync::Arc;
use async_trait::async_trait;
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
use hashbrown::HashMap;
use iox_catalog::{
interface::{get_schema_by_name, Catalog, NamespaceSchema},
validate_or_insert_schema,
};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use parking_lot::RwLock;
use thiserror::Error;
use trace::ctx::SpanContext;
use super::{DmlError, DmlHandler};
/// Errors emitted during schema validation.
#[derive(Debug, Error)]
pub enum SchemaError {
    /// The requested namespace could not be found in the catalog.
    #[error("failed to read namespace schema from catalog: {0}")]
    NamespaceLookup(iox_catalog::interface::Error),

    /// The request failed during schema validation.
    ///
    /// NOTE: this may be due to transient I/O errors while interrogating the
    /// global catalog - the caller should inspect the inner error to determine
    /// the failure reason.
    #[error(transparent)]
    Validate(iox_catalog::interface::Error),

    /// The inner DML handler returned an error.
    #[error(transparent)]
    Inner(Box<DmlError>),
}
/// A [`SchemaValidator`] checks the schema of incoming writes against a
/// centralised schema store, maintaining an in-memory cache of all observed
/// schemas.
///
/// # Caching
///
/// This validator attempts to incrementally build an in-memory cache of all
/// table schemas it observes (never evicting the schemas).
///
/// All schema operations are scoped to a single namespace.
///
/// When a request contains columns that do not exist in the cached schema, the
/// catalog is queried for an existing column by the same name and (atomically)
/// the column is created if it does not exist. If the requested column type
/// conflicts with the catalog column, the request is rejected and the cached
/// schema is not updated.
///
/// Any successful write that adds new columns causes the new schema to be
/// cached.
///
/// To minimise locking, this cache is designed to allow (and tolerate) spurious
/// cache "updates" racing with each other and overwriting newer schemas with
/// older schemas. This is acceptable due to the incremental, additive schema
/// creation never allowing a column to change or be removed, therefore columns
/// lost by racy schema cache overwrites are "discovered" in subsequent
/// requests. This overwriting is scoped to the namespace, and is expected to be
/// relatively rare - it results in additional requests being made to the
/// catalog until the cached schema converges to match the catalog schema.
///
/// # Correctness
///
/// The correct functioning of this schema validator relies on the catalog
/// implementation correctly validating new column requests against any existing
/// (but uncached) columns, returning an error if the requested type does not
/// match.
///
/// The catalog must serialise column creation to avoid `set(a=tag)` overwriting
/// a prior `set(a=int)` write to the catalog (returning an error that `a` is an
/// `int` for the second request).
///
/// Because each column is set incrementally and independently of other new
/// columns in the request, racing multiple requests for the same table can
/// produce incorrect schemas ([#3573]).
///
/// [#3573]: https://github.com/influxdata/influxdb_iox/issues/3573
#[derive(Debug)]
pub struct SchemaValidator<D> {
    /// The decorated inner [`DmlHandler`] that validated requests are passed
    /// on to.
    inner: D,
    /// Handle to the global catalog holding the authoritative schemas.
    catalog: Arc<dyn Catalog>,
    /// Per-namespace cache of observed schemas (never evicted - see the
    /// "Caching" section above).
    cache: RwLock<HashMap<DatabaseName<'static>, Arc<NamespaceSchema>>>,
}
impl<D> SchemaValidator<D> {
/// Initialise a new [`SchemaValidator`] decorator, loading schemas from
/// `catalog` and passing acceptable requests through to `inner`.
pub fn new(inner: D, catalog: Arc<dyn Catalog>) -> Self {
Self {
inner,
catalog,
cache: Default::default(),
}
}
}
#[async_trait]
impl<D> DmlHandler for SchemaValidator<D>
where
    D: DmlHandler,
{
    type WriteError = SchemaError;
    type DeleteError = D::DeleteError;

    /// Validate the schema of all the writes in `batches` before passing the
    /// request onto the inner handler.
    ///
    /// # Errors
    ///
    /// If `namespace` does not exist, [`SchemaError::NamespaceLookup`] is
    /// returned.
    ///
    /// If the schema validation fails, [`SchemaError::Validate`] is returned.
    /// Callers should inspect the inner error to determine if the failure was
    /// caused by catalog I/O, or a schema conflict.
    ///
    /// A request that fails validation on one or more tables fails the request
    /// as a whole - calling this method has "all or nothing" semantics.
    ///
    /// If the inner handler returns an error (wrapped in a
    /// [`SchemaError::Inner`]), the semantics of the inner handler write apply.
    async fn write(
        &self,
        namespace: DatabaseName<'static>,
        batches: HashMap<String, MutableBatch>,
        span_ctx: Option<SpanContext>,
    ) -> Result<(), Self::WriteError> {
        // Load the namespace schema from the cache, falling back to pulling it
        // from the global catalog (if it exists).
        //
        // The read guard is a temporary dropped at the end of this statement,
        // so no lock is held across the awaits below.
        let schema = self.cache.read().get(&namespace).map(Arc::clone);
        let schema = match schema {
            Some(v) => v,
            None => {
                // Pull the schema from the global catalog or error if it does
                // not exist.
                let schema = get_schema_by_name(&namespace, &*self.catalog)
                    .await
                    .map_err(|e| {
                        warn!(error=%e, %namespace, "failed to retrieve namespace schema");
                        SchemaError::NamespaceLookup(e)
                    })
                    .map(Arc::new)?;

                self.cache
                    .write()
                    .insert(namespace.clone(), Arc::clone(&schema));
                trace!(%namespace, "schema cache populated");
                schema
            }
        };

        // Returns Some(new schema) only when the write added columns/tables,
        // None when the cached schema already covered the request.
        let maybe_new_schema = validate_or_insert_schema(&batches, &schema, &*self.catalog)
            .await
            .map_err(|e| {
                warn!(error=%e, %namespace, "schema validation failed");
                SchemaError::Validate(e)
            })?
            .map(Arc::new);

        trace!(%namespace, "schema validation complete");

        // If the schema has been updated, immediately add it to the cache
        // (before passing through the write) in order to allow subsequent,
        // parallel requests to use it while waiting on the inner DML handler to
        // perform the write.
        match maybe_new_schema {
            Some(v) => {
                // This call MAY overwrite a more-up-to-date cache entry if
                // racing with another request for the same namespace, but the
                // cache will eventually converge in subsequent requests.
                self.cache.write().insert(namespace.clone(), v);
                trace!(%namespace, "schema cache updated");
            }
            None => {
                trace!(%namespace, "schema unchanged");
            }
        }

        self.inner
            .write(namespace, batches, span_ctx)
            .await
            .map_err(|e| SchemaError::Inner(Box::new(e.into())))
    }

    /// This call is passed through to `D` - no schema validation is performed
    /// on deletes.
    async fn delete<'a>(
        &self,
        namespace: DatabaseName<'static>,
        table_name: impl Into<String> + Send + Sync + 'a,
        predicate: DeletePredicate,
        span_ctx: Option<SpanContext>,
    ) -> Result<(), Self::DeleteError> {
        self.inner
            .delete(namespace, table_name, predicate, span_ctx)
            .await
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use data_types::timestamp::TimestampRange;
    use iox_catalog::{
        interface::{ColumnType, KafkaTopicId, QueryPoolId},
        mem::MemCatalog,
    };
    use schema::{InfluxColumnType, InfluxFieldType};

    use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall};

    use super::*;

    const NAMESPACE: &str = "bananas";

    /// Error type returned by the mock inner handler in failure tests.
    #[derive(Debug, Error)]
    enum MockError {
        #[error("terrible things")]
        Terrible,
    }

    // Parse `lp` into a table-keyed MutableBatch map.
    fn lp_to_writes(lp: &str) -> HashMap<String, MutableBatch> {
        let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
            .expect("failed to build test writes from LP");
        writes
    }

    /// Initialise an in-memory [`MemCatalog`] and create a single namespace
    /// named [`NAMESPACE`].
    async fn create_catalog() -> Arc<dyn Catalog> {
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
        catalog
            .namespaces()
            .create(
                NAMESPACE,
                "inf",
                KafkaTopicId::new(42),
                QueryPoolId::new(24),
            )
            .await
            .expect("failed to create test namespace");
        catalog
    }

    /// Assert that `handler`'s cache entry for [`NAMESPACE`] contains `table`
    /// with a column `col` of type `want`.
    fn assert_cache<D>(handler: &SchemaValidator<D>, table: &str, col: &str, want: ColumnType) {
        // The cache should be populated.
        let cached = handler.cache.read();
        let ns = cached
            .get(&NAMESPACE.try_into().unwrap())
            .expect("cache should be populated");
        let table = ns.tables.get(table).expect("table should exist in cache");
        assert_eq!(
            table
                .columns
                .get(col)
                .expect("column not cached")
                .column_type,
            want
        );
    }

    #[tokio::test]
    async fn test_write_ok() {
        let catalog = create_catalog().await;
        let mock = Arc::new(MockDmlHandler::default().with_write_return(vec![Ok(())]));
        let handler = SchemaValidator::new(Arc::clone(&mock), catalog);

        let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
        handler
            .write(NAMESPACE.try_into().unwrap(), writes, None)
            .await
            .expect("request should succeed");

        // The mock should observe exactly one write.
        assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, ..}] => {
            assert_eq!(namespace, NAMESPACE);
        });

        // The cache should be populated.
        assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
        assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
        assert_cache(&handler, "bananas", "val", ColumnType::I64);
        assert_cache(&handler, "bananas", "time", ColumnType::Time);
    }

    #[tokio::test]
    async fn test_write_schema_not_found() {
        let catalog = create_catalog().await;
        let mock = Arc::new(MockDmlHandler::default().with_write_return(vec![Ok(())]));
        let handler = SchemaValidator::new(Arc::clone(&mock), catalog);

        let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
        let err = handler
            .write("A_DIFFERENT_NAMESPACE".try_into().unwrap(), writes, None)
            .await
            .expect_err("request should fail");

        assert_matches!(err, SchemaError::NamespaceLookup(_));
        assert!(mock.calls().is_empty());

        // The cache should not have retained the schema.
        assert!(handler.cache.read().is_empty());
    }

    #[tokio::test]
    async fn test_write_validation_failure() {
        let catalog = create_catalog().await;
        let mock = Arc::new(MockDmlHandler::default().with_write_return(vec![Ok(())]));
        let handler = SchemaValidator::new(Arc::clone(&mock), catalog);

        // First write sets the schema
        let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64
        handler
            .write(NAMESPACE.try_into().unwrap(), writes, None)
            .await
            .expect("request should succeed");

        // Second write attempts to violate it causing an error
        let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42.0 123456"); // val=float
        let err = handler
            .write(NAMESPACE.try_into().unwrap(), writes, None)
            .await
            .expect_err("request should fail");

        assert_matches!(err, SchemaError::Validate(_));

        // The mock should observe exactly one write from the first call.
        assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, batches}] => {
            assert_eq!(namespace, NAMESPACE);
            let batch = batches.get("bananas").expect("table not found in write");
            assert_eq!(batch.rows(), 1);
            let col = batch.column("val").expect("column not found in write");
            assert_matches!(col.influx_type(), InfluxColumnType::Field(InfluxFieldType::Integer));
        });

        // The cache should retain the original schema.
        assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
        assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
        assert_cache(&handler, "bananas", "val", ColumnType::I64); // original type
        assert_cache(&handler, "bananas", "time", ColumnType::Time);
    }

    #[tokio::test]
    async fn test_write_inner_handler_error() {
        let catalog = create_catalog().await;
        let mock = Arc::new(
            MockDmlHandler::default()
                .with_write_return(vec![Err(DmlError::Internal(MockError::Terrible.into()))]),
        );
        let handler = SchemaValidator::new(Arc::clone(&mock), catalog);

        let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
        let err = handler
            .write(NAMESPACE.try_into().unwrap(), writes, None)
            .await
            .expect_err("request should return mock error");

        assert_matches!(err, SchemaError::Inner(e) => {
            assert_matches!(*e, DmlError::Internal(_));
        });

        // The mock should observe exactly one write.
        assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write { .. }]);

        // The cache should be populated as it has been flushed to the catalog.
        assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
        assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
        assert_cache(&handler, "bananas", "val", ColumnType::I64);
        assert_cache(&handler, "bananas", "time", ColumnType::Time);
    }

    #[tokio::test]
    async fn test_write_delete_passthrough_ok() {
        const NAMESPACE: &str = "NAMESPACE_IS_NOT_VALIDATED";
        const TABLE: &str = "bananas";

        let catalog = create_catalog().await;
        let mock = Arc::new(MockDmlHandler::default().with_delete_return(vec![Ok(())]));
        let handler = SchemaValidator::new(Arc::clone(&mock), catalog);

        let predicate = DeletePredicate {
            range: TimestampRange::new(1, 2),
            exprs: vec![],
        };

        handler
            .delete(
                NAMESPACE.try_into().unwrap(),
                TABLE,
                predicate.clone(),
                None,
            )
            .await
            .expect("request should succeed");

        // The mock should observe exactly one delete.
        assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Delete { namespace, predicate: got, table }] => {
            assert_eq!(namespace, NAMESPACE);
            assert_eq!(predicate, *got);
            assert_eq!(table, TABLE);
        });

        // Deletes have no effect on the cache.
        assert!(handler.cache.read().is_empty());
    }

    #[tokio::test]
    async fn test_write_delete_passthrough_err() {
        let catalog = create_catalog().await;
        let mock = Arc::new(
            MockDmlHandler::default()
                .with_delete_return(vec![Err(DmlError::Internal(MockError::Terrible.into()))]),
        );
        let handler = SchemaValidator::new(Arc::clone(&mock), catalog);

        let predicate = DeletePredicate {
            range: TimestampRange::new(1, 2),
            exprs: vec![],
        };

        let err = handler
            .delete(
                "NAMESPACE_IS_IGNORED".try_into().unwrap(),
                "bananas",
                predicate,
                None,
            )
            .await
            .expect_err("request should return mock error");

        assert_matches!(err, DmlError::Internal(_));

        // The mock should observe exactly one delete.
        assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Delete { .. }]);

        // Deletes have no effect on the cache.
        assert!(handler.cache.read().is_empty());
    }
}

View File

@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{error::Error, fmt::Debug};
use async_trait::async_trait;
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
@ -8,7 +8,7 @@ use mutable_batch::MutableBatch;
use thiserror::Error;
use trace::ctx::SpanContext;
use super::ShardError;
use super::{SchemaError, ShardError};
/// Errors emitted by a [`DmlHandler`] implementation during DML request
/// processing.
@ -22,9 +22,13 @@ pub enum DmlError {
#[error(transparent)]
WriteBuffer(#[from] ShardError),
/// A schema validation failure.
#[error(transparent)]
Schema(#[from] SchemaError),
/// An unknown error occurred while processing the DML request.
#[error("internal dml handler error: {0}")]
Internal(Box<dyn std::error::Error + Send + Sync>),
Internal(Box<dyn Error + Send + Sync>),
}
/// A composable, abstract handler of DML requests.
@ -34,9 +38,9 @@ pub trait DmlHandler: Debug + Send + Sync {
/// requests.
///
/// All errors must be mappable into the concrete [`DmlError`] type.
type WriteError: Into<DmlError>;
type WriteError: Error + Into<DmlError> + Send;
/// The error type of the delete handler.
type DeleteError: Into<DmlError>;
type DeleteError: Error + Into<DmlError> + Send;
/// Write `batches` to `namespace`.
async fn write(

View File

@ -80,6 +80,7 @@ impl Error {
Error::ParseLineProtocol(_) => StatusCode::BAD_REQUEST,
Error::ParseDelete(_) => StatusCode::BAD_REQUEST,
Error::RequestSizeExceeded(_) => StatusCode::PAYLOAD_TOO_LARGE,
Error::DmlHandler(DmlError::Schema(_)) => StatusCode::BAD_REQUEST,
Error::InvalidContentEncoding(_) => {
// https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13
StatusCode::UNSUPPORTED_MEDIA_TYPE