feat: cache NamespaceSchema in validator

Adds an in-memory cache of table schemas to the SchemaValidator DML
handler.

The cache pulls from the global catalog when observing a column for the
first time, and pushes the column type to the catalog so it is set for
subsequent requests if it does not already exist (this pull & push is
done atomically by the catalog in an "upsert" call).

The in-memory cache is sharded by namespace, with each shard guarded by
an individual lock to minimise contention between readers (the expected
average case) and writers (only when adding new columns/tables).

Relies on the catalog to serialise new column creation and validate
parallel creation requests.
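
To make the caching scheme concrete, the sketch below shows the shape of the
pattern in isolation: a DashMap keyed by namespace, a read-through miss path,
and a last-writer-wins update after a successful write. The types and the
synchronous, infallible `fetch` closure are simplified stand-ins for the async
catalog calls in the real handler (see the diff below), not the router's
actual API.

use std::sync::Arc;

use dashmap::{mapref::entry::Entry, DashMap};

/// Toy stand-in for the catalog's NamespaceSchema.
#[derive(Debug)]
struct NamespaceSchema {
    // tables, columns, ...
}

#[derive(Default)]
struct SchemaCache {
    // Sharded by key internally; each shard has its own lock, so readers of
    // different namespaces (the common case) do not contend.
    cache: DashMap<String, Arc<NamespaceSchema>>,
}

impl SchemaCache {
    /// Return the cached schema for `namespace`, pulling it via `fetch` (the
    /// stand-in for the catalog lookup) the first time it is observed.
    fn get_or_fetch(
        &self,
        namespace: &str,
        fetch: impl FnOnce() -> NamespaceSchema,
    ) -> Arc<NamespaceSchema> {
        match self.cache.entry(namespace.to_string()) {
            Entry::Occupied(v) => Arc::clone(v.get()),
            Entry::Vacant(v) => {
                let schema = Arc::new(fetch());
                v.insert(Arc::clone(&schema));
                schema
            }
        }
    }

    /// Overwrite the cached schema after a successful write added columns.
    ///
    /// A racing overwrite with an older schema is tolerated: columns are only
    /// ever added, so a "lost" column is re-discovered (and re-cached) by a
    /// later request.
    fn put(&self, namespace: &str, schema: Arc<NamespaceSchema>) {
        self.cache.insert(namespace.to_string(), schema);
    }
}

In this sketch the entry() API keeps the miss path's check-and-insert under
the shard lock, so two concurrent first requests for the same namespace do not
both populate the entry.
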
pull/24376/head
Dom Dwyer 2022-01-28 14:42:31 +00:00
parent c81f207298
commit 6598023726
3 changed files with 170 additions and 9 deletions

Cargo.lock generated

@@ -841,6 +841,17 @@ dependencies = [
"syn",
]
[[package]]
name = "dashmap"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b799062aaf67eb976af3bdca031ee6f846d2f0a5710ddbb0d2efee33f3cc4760"
dependencies = [
"cfg-if",
"num_cpus",
"parking_lot",
]
[[package]]
name = "data_types"
version = "0.1.0"
@@ -3893,6 +3904,7 @@ dependencies = [
"async-trait",
"bytes",
"criterion",
"dashmap",
"data_types",
"dml",
"flate2",


@@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
bytes = "1.1"
dashmap = "5.0.0"
data_types = { path = "../data_types" }
dml = { path = "../dml" }
flate2 = "1.0"


@@ -1,13 +1,15 @@
use std::sync::Arc;
use async_trait::async_trait;
use dashmap::{mapref::entry::Entry, DashMap};
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
use hashbrown::HashMap;
use iox_catalog::{
    interface::{get_schema_by_name, Catalog, NamespaceSchema},
    validate_or_insert_schema,
};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use thiserror::Error;
use trace::ctx::SpanContext;
@@ -34,18 +36,71 @@ pub enum SchemaError {
}

/// A [`SchemaValidator`] checks the schema of incoming writes against a
/// centralised schema store, maintaining an in-memory cache of all observed
/// schemas.
///
/// # Caching
///
/// This validator attempts to incrementally build an in-memory cache of all
/// table schemas it observes (never evicting the schemas).
///
/// All schema operations are scoped to a single namespace.
///
/// When a request contains columns that do not exist in the cached schema, the
/// catalog is queried for an existing column by the same name and (atomically)
/// the column is created if it does not exist. If the requested column type
/// conflicts with the catalog column, the request is rejected and the cached
/// schema is not updated.
///
/// Any successful write that adds new columns causes the new schema to be
/// cached.
///
/// To minimise locking, this cache is designed to allow (and tolerate) spurious
/// cache "updates" racing with each other and overwriting newer schemas with
/// older schemas. This is acceptable because schema creation is incremental
/// and additive (a column never changes type and is never removed), so columns
/// lost by racy schema cache overwrites are simply "discovered" again in
/// subsequent requests. This overwriting is scoped to the namespace, and is
/// expected to be
/// relatively rare - it results in additional requests being made to the
/// catalog until the cached schema converges to match the catalog schema.
///
/// # Correctness
///
/// The correct functioning of this schema validator relies on the catalog
/// implementation correctly validating new column requests against any existing
/// (but uncached) columns, returning an error if the requested type does not
/// match.
///
/// The catalog must serialise column creation to avoid `set(a=tag)` overwriting
/// a prior `set(a=int)` write to the catalog (returning an error that `a` is an
/// `int` for the second request).
///
/// Because each column is set incrementally and independently of other new
/// columns in the request, racing multiple requests for the same table can
/// produce incorrect schemas ([#3573]).
///
/// [#3573]: https://github.com/influxdata/influxdb_iox/issues/3573
#[derive(Debug)]
pub struct SchemaValidator<D> {
    inner: D,
    catalog: Arc<dyn Catalog>,

    // A map guarded by locks sharded by key.
    //
    // No documentation is provided as to the read/write bias or other locking
    // semantics.
    cache: DashMap<DatabaseName<'static>, Arc<NamespaceSchema>>,
}

impl<D> SchemaValidator<D> {
    /// Initialise a new [`SchemaValidator`] decorator, loading schemas from
    /// `catalog` and passing acceptable requests through to `inner`.
    pub fn new(inner: D, catalog: Arc<dyn Catalog>) -> Self {
        Self {
            inner,
            catalog,
            cache: Default::default(),
        }
    }
}
@@ -80,13 +135,59 @@ where
        batches: HashMap<String, MutableBatch>,
        span_ctx: Option<SpanContext>,
    ) -> Result<(), Self::WriteError> {
        // Load the namespace schema from the cache, falling back to pulling it
        // from the global catalog (if it exists).
        let schema = match self.cache.entry(namespace.clone()) {
            Entry::Occupied(v) => {
                trace!(%namespace, "schema cache hit");
                Arc::clone(v.get())
            }
            Entry::Vacant(v) => {
                trace!(%namespace, "schema cache miss");

                // Pull the schema from the global catalog or error if it does
                // not exist.
                let schema = get_schema_by_name(&namespace, &*self.catalog)
                    .await
                    .map_err(|e| {
                        warn!(error=%e, %namespace, "failed to retrieve namespace schema");
                        SchemaError::NamespaceLookup(e)
                    })
                    .map(Arc::new)?;

                v.insert(Arc::clone(&schema));
                trace!(%namespace, "schema cache populated");
                schema
            }
        };

        let maybe_new_schema = validate_or_insert_schema(&batches, &schema, &*self.catalog)
            .await
            .map_err(|e| {
                warn!(error=%e, %namespace, "schema validation failed");
                SchemaError::Validate(e)
            })?
            .map(Arc::new);

        trace!(%namespace, "schema validation complete");

        // If the schema has been updated, immediately add it to the cache
        // (before passing through the write) in order to allow subsequent,
        // parallel requests to use it while waiting on the inner DML handler to
        // perform the write.
        match maybe_new_schema {
            Some(v) => {
                // This call MAY overwrite a more-up-to-date cache entry if
                // racing with another request for the same namespace, but the
                // cache will eventually converge in subsequent requests.
                self.cache.insert(namespace.clone(), v);
                trace!(%namespace, "schema cache updated");
            }
            None => {
                trace!(%namespace, "schema unchanged");
            }
        }

        self.inner
            .write(namespace, batches, span_ctx)
@@ -118,7 +219,7 @@ mod tests {
    use assert_matches::assert_matches;
    use data_types::timestamp::TimestampRange;
    use iox_catalog::{
        interface::{ColumnType, KafkaTopicId, QueryPoolId},
        mem::MemCatalog,
    };
    use schema::{InfluxColumnType, InfluxFieldType};

@@ -159,6 +260,26 @@ mod tests {
        catalog
    }
    fn assert_cache<D>(handler: &SchemaValidator<D>, table: &str, col: &str, want: ColumnType) {
        // The cache should be populated.
        let cached = handler
            .cache
            .get(&NAMESPACE.try_into().unwrap())
            .expect("cache should be populated");
        let table = cached
            .tables
            .get(table)
            .expect("table should exist in cache");
        assert_eq!(
            table
                .columns
                .get(col)
                .expect("column not cached")
                .column_type,
            want
        );
    }
    #[tokio::test]
    async fn test_write_ok() {
        let catalog = create_catalog().await;

@@ -175,6 +296,12 @@ mod tests {
        assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, ..}] => {
            assert_eq!(namespace, NAMESPACE);
        });

        // The cache should be populated.
        assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
        assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
        assert_cache(&handler, "bananas", "val", ColumnType::I64);
        assert_cache(&handler, "bananas", "time", ColumnType::Time);
    }
    #[tokio::test]

@@ -191,6 +318,9 @@ mod tests {
        assert_matches!(err, SchemaError::NamespaceLookup(_));
        assert!(mock.calls().is_empty());

        // The cache should not have retained the schema.
        assert!(handler.cache.is_empty());
    }
    #[tokio::test]

@@ -223,6 +353,12 @@ mod tests {
            let col = batch.column("val").expect("column not found in write");
            assert_matches!(col.influx_type(), InfluxColumnType::Field(InfluxFieldType::Integer));
        });

        // The cache should retain the original schema.
        assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
        assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
        assert_cache(&handler, "bananas", "val", ColumnType::I64); // original type
        assert_cache(&handler, "bananas", "time", ColumnType::Time);
    }
    #[tokio::test]

@@ -246,6 +382,12 @@ mod tests {
        // The mock should observe exactly one write.
        assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write { .. }]);

        // The cache should be populated as it has been flushed to the catalog.
        assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
        assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
        assert_cache(&handler, "bananas", "val", ColumnType::I64);
        assert_cache(&handler, "bananas", "time", ColumnType::Time);
    }
    #[tokio::test]

@@ -278,6 +420,9 @@ mod tests {
            assert_eq!(predicate, *got);
            assert_eq!(table, TABLE);
        });

        // Deletes have no effect on the cache.
        assert!(handler.cache.is_empty());
    }
    #[tokio::test]

@@ -308,5 +453,8 @@ mod tests {
        // The mock should observe exactly one delete.
        assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Delete { .. }]);

        // Deletes have no effect on the cache.
        assert!(handler.cache.is_empty());
    }
}
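
For reference, the catalog behaviour that the "# Correctness" doc comment
above depends on can be summarised as a column "upsert" contract along these
lines. This is a hypothetical, simplified trait written for illustration only;
the real definitions live in iox_catalog::interface and differ in naming and
types.

/// Column types, simplified for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnType {
    Tag,
    I64,
    Time,
}

/// Errors the upsert may return.
#[derive(Debug)]
enum UpsertError {
    /// The column already exists with a different type; the caller's write
    /// must be rejected (e.g. `set(a=tag)` after a prior `set(a=int)`).
    TypeMismatch {
        existing: ColumnType,
        requested: ColumnType,
    },
}

/// Hypothetical sketch of the contract: the existence check and the insert
/// must happen as one serialised operation inside the catalog, so concurrent
/// creations of the same column cannot overwrite each other's type.
#[async_trait::async_trait]
trait ColumnUpsert {
    async fn create_or_get(
        &self,
        table: &str,
        column: &str,
        ty: ColumnType,
    ) -> Result<ColumnType, UpsertError>;
}

With a contract of this shape, the validator only has to trust the error path:
a type conflict surfaces as a validation error and, as described above, the
cached schema is left untouched.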