feat: Enforce table column limits from the schema cache (#5819)

* fix: Avoid some allocations by collecting instead of inserting into a vec

* refactor: Encode that adding columns is for one table at a time

* test: Add another test of column limits

* test: Add below/above limit tests for create_or_get_many

* fix: Explicitly DO NOT check column limits when inserting many columns

* feat: Cache the max_columns_per_table on the NamespaceSchema

* feat: Add a function to validate column limits in-memory

* fix: Provide more useful information when over column limits

* fix: Swap types to remove intermediate allocation

* docs: Explain the interactions of the cache and the column limits

* test: Actually set up test that showcases column limit race condition

* fix: Allow writing to existing columns even if table is over column limit

Co-authored-by: Dom <dom@itsallbroken.com>
Carol (Nichols || Goulding) 2022-10-14 07:34:17 -04:00 committed by GitHub
parent 81722dc19b
commit efb964c390
19 changed files with 507 additions and 150 deletions

Cargo.lock (generated)

@ -4447,6 +4447,7 @@ dependencies = [
"hashbrown",
"hyper",
"iox_catalog",
"iox_tests",
"iox_time",
"metric",
"mutable_batch",


@ -25,7 +25,7 @@ use snafu::{ResultExt, Snafu};
use sqlx::postgres::PgHasArrayType;
use std::{
borrow::{Borrow, Cow},
collections::{BTreeMap, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
convert::TryFrom,
fmt::{Display, Write},
mem::{self, size_of_val},
@ -464,16 +464,24 @@ pub struct NamespaceSchema {
pub query_pool_id: QueryPoolId,
/// the tables in the namespace by name
pub tables: BTreeMap<String, TableSchema>,
/// the number of columns per table this namespace allows
pub max_columns_per_table: usize,
}
impl NamespaceSchema {
/// Create a new `NamespaceSchema`
pub fn new(id: NamespaceId, topic_id: TopicId, query_pool_id: QueryPoolId) -> Self {
pub fn new(
id: NamespaceId,
topic_id: TopicId,
query_pool_id: QueryPoolId,
max_columns_per_table: i32,
) -> Self {
Self {
id,
tables: BTreeMap::new(),
topic_id,
query_pool_id,
max_columns_per_table: max_columns_per_table as usize,
}
}
@ -547,6 +555,12 @@ impl TableSchema {
.map(|(name, c)| (c.id, name.as_str()))
.collect()
}
/// Return the set of column names for this table. Used in combination with a write operation's
/// column names to determine whether a write would exceed the max allowed columns.
pub fn column_names(&self) -> BTreeSet<&str> {
self.columns.keys().map(|name| name.as_str()).collect()
}
}
/// Data object for a column
@ -3367,12 +3381,14 @@ mod tests {
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(3),
tables: BTreeMap::from([]),
max_columns_per_table: 4,
};
let schema2 = NamespaceSchema {
id: NamespaceId::new(1),
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(3),
tables: BTreeMap::from([(String::from("foo"), TableSchema::new(TableId::new(1)))]),
max_columns_per_table: 4,
};
assert!(schema1.size() < schema2.size());
}
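The new `column_names()` helpers on `TableSchema` (above) and on `MutableBatch` (later in this diff) let the router decide entirely in memory whether a write would push a table past `max_columns_per_table`. A minimal standalone sketch of that set arithmetic, using plain `BTreeSet`s as hypothetical stand-ins for both sides:

use std::collections::BTreeSet;

fn would_reject(existing: &BTreeSet<&str>, incoming: &BTreeSet<&str>, limit: usize) -> bool {
    // Merge the cached table's column names with the write's column names.
    let merged: BTreeSet<&str> = existing.union(incoming).copied().collect();
    // Reject only when the write adds new columns AND the merged count exceeds
    // the limit; writes that touch only existing columns always pass.
    merged.len() > existing.len() && merged.len() > limit
}

fn main() {
    let existing = BTreeSet::from(["greatness", "tastiness", "time"]);
    // At a limit of 3, writing to an existing column is still allowed...
    assert!(!would_reject(&existing, &BTreeSet::from(["greatness"]), 3));
    // ...but introducing a fourth column is rejected.
    assert!(would_reject(&existing, &BTreeSet::from(["i_got_music"]), 3));
}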


@ -220,7 +220,6 @@ where
// column doesn't exist; add it
column_batch.push(ColumnUpsertRequest {
name: tag.name.as_str(),
table_id: table.id,
column_type: ColumnType::Tag,
});
}
@ -257,7 +256,6 @@ where
// column doesn't exist; add it
column_batch.push(ColumnUpsertRequest {
name: field.name.as_str(),
table_id: table.id,
column_type: ColumnType::from(influx_column_type),
});
}
@ -270,7 +268,10 @@ where
// that with short-lived loop variables.
// Since this is a CLI tool rather than something called a lot on the write path, I
// figure it's okay.
repos.columns().create_or_get_many(&column_batch).await?;
repos
.columns()
.create_or_get_many_unchecked(table.id, &column_batch)
.await?;
}
// create a partition for every day in the date range.
// N.B. this will need updating if we someday support partitioning by inputs other than


@ -599,7 +599,7 @@ mod tests {
Arc::clone(&metrics),
));
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id);
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100);
let ignored_ts = Time::from_timestamp_millis(42);
@ -681,7 +681,7 @@ mod tests {
Arc::clone(&metrics),
));
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id);
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100);
let w1 = DmlWrite::new(
"foo",
@ -788,7 +788,7 @@ mod tests {
Arc::clone(&metrics),
));
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id);
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100);
let ignored_ts = Time::from_timestamp_millis(42);
@ -1058,7 +1058,7 @@ mod tests {
Arc::clone(&metrics),
));
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id);
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100);
let ignored_ts = Time::from_timestamp_millis(42);
@ -1174,7 +1174,7 @@ mod tests {
.await
.unwrap();
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id);
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100);
let ignored_ts = Time::from_timestamp_millis(42);
@ -1357,7 +1357,7 @@ mod tests {
Arc::clone(&metrics),
));
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id);
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100);
let ignored_ts = Time::from_timestamp_millis(42);


@ -456,6 +456,7 @@ mod tests {
ingester.namespace.id,
ingester.topic.id,
ingester.query_pool.id,
100,
);
let mut txn = ingester.catalog.start_transaction().await.unwrap();
let ingest_ts1 = Time::from_timestamp_millis(42);
@ -708,7 +709,12 @@ mod tests {
let write_buffer_state =
MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(1).unwrap());
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id);
let schema = NamespaceSchema::new(
namespace.id,
topic.id,
query_pool.id,
namespace.max_columns_per_table,
);
for write_operation in write_operations {
validate_or_insert_schema(write_operation.tables(), &schema, txn.deref_mut())
.await


@ -332,13 +332,11 @@ pub trait TableRepo: Send + Sync {
}
/// Parameters necessary to perform a batch insert of
/// [`ColumnRepo::create_or_get()`].
/// [`ColumnRepo::create_or_get()`] for one table (specified separately).
#[derive(Debug)]
pub struct ColumnUpsertRequest<'a> {
/// The name of the column.
pub name: &'a str,
/// The table ID to which it belongs.
pub table_id: TableId,
/// The data type of the column.
pub column_type: ColumnType,
}
@ -361,8 +359,13 @@ pub trait ColumnRepo: Send + Sync {
/// Implementations make no guarantees as to the ordering or atomicity of
/// the batch of column upsert operations - a batch upsert may partially
/// commit, in which case an error MUST be returned by the implementation.
async fn create_or_get_many(
///
/// Per-namespace limits on the number of columns allowed per table are explicitly NOT checked
/// by this function, hence the name containing `unchecked`. It is expected that the caller
/// will check this first, and yes, this is racy.
async fn create_or_get_many_unchecked(
&mut self,
table_id: TableId,
columns: &[ColumnUpsertRequest<'_>],
) -> Result<Vec<Column>>;
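For orientation, a sketch of the call sequence this contract implies on the write path, assembled from pieces that appear later in this diff (the router's `validate_column_limits` and the schema validation call site); setup and error handling are elided:

// 1. Check the write against the cached NamespaceSchema. This is the
//    in-memory (and racy) limit check.
validate_column_limits(&batches, &schema)?;

// 2. Bulk-upsert any new columns with no further limit check; the table ID
//    is now passed once instead of per column.
repos
    .columns()
    .create_or_get_many_unchecked(table.id, &column_batch)
    .await?;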
@ -714,8 +717,12 @@ where
let columns = repos.columns().list_by_namespace_id(namespace.id).await?;
let tables = repos.tables().list_by_namespace_id(namespace.id).await?;
let mut namespace =
NamespaceSchema::new(namespace.id, namespace.topic_id, namespace.query_pool_id);
let mut namespace = NamespaceSchema::new(
namespace.id,
namespace.topic_id,
namespace.query_pool_id,
namespace.max_columns_per_table,
);
let mut table_id_to_schema = BTreeMap::new();
for t in tables {
@ -846,7 +853,8 @@ pub async fn list_schemas(
// was created, or have no tables/columns (and therefore have no entry
// in "joined").
.filter_map(move |v| {
let mut ns = NamespaceSchema::new(v.id, v.topic_id, v.query_pool_id);
let mut ns =
NamespaceSchema::new(v.id, v.topic_id, v.query_pool_id, v.max_columns_per_table);
ns.tables = joined.remove(&v.id)?;
Some((v, ns))
});
@ -1179,14 +1187,7 @@ pub(crate) mod test_helpers {
.create_or_get("column_test", table.id, ColumnType::U64)
.await
.expect_err("should error with wrong column type");
assert!(matches!(
err,
Error::ColumnTypeMismatch {
name: _,
existing: _,
new: _
}
));
assert!(matches!(err, Error::ColumnTypeMismatch { .. }));
// test that we can create a column of the same name under a different table
let table2 = repos
@ -1201,23 +1202,6 @@ pub(crate) mod test_helpers {
.unwrap();
assert_ne!(c, ccc);
let cols3 = repos
.columns()
.create_or_get_many(&[
ColumnUpsertRequest {
name: "a",
table_id: table2.id,
column_type: ColumnType::U64,
},
ColumnUpsertRequest {
name: "a",
table_id: table.id,
column_type: ColumnType::U64,
},
])
.await
.unwrap();
let columns = repos
.columns()
.list_by_namespace_id(namespace.id)
@ -1225,12 +1209,11 @@ pub(crate) mod test_helpers {
.unwrap();
let mut want = vec![c.clone(), ccc];
want.extend(cols3.clone());
assert_eq!(want, columns);
let columns = repos.columns().list_by_table_id(table.id).await.unwrap();
let want2 = vec![c, cols3[1].clone()];
let want2 = vec![c];
assert_eq!(want2, columns);
// Add another tag column into table2
@ -1252,7 +1235,7 @@ pub(crate) mod test_helpers {
},
ColumnTypeCount {
col_type: ColumnType::U64,
count: 2,
count: 1,
},
];
expect.sort_by_key(|c| c.col_type);
@ -1264,6 +1247,28 @@ pub(crate) mod test_helpers {
want.extend([c3]);
assert_eq!(list, want);
// test create_or_get_many_unchecked, below column limit
let table1_columns = repos
.columns()
.create_or_get_many_unchecked(
table.id,
&[
ColumnUpsertRequest {
name: "column_test",
column_type: ColumnType::Tag,
},
ColumnUpsertRequest {
name: "new_column",
column_type: ColumnType::Tag,
},
],
)
.await
.unwrap();
let mut table1_column_names: Vec<_> = table1_columns.iter().map(|c| &c.name).collect();
table1_column_names.sort();
assert_eq!(table1_column_names, vec!["column_test", "new_column"]);
// test per-namespace column limits
repos
.namespaces()
@ -1282,6 +1287,33 @@ pub(crate) mod test_helpers {
table_id: _,
}
));
// test per-namespace column limits are NOT enforced with create_or_get_many_unchecked
let table3 = repos
.tables()
.create_or_get("test_table_3", namespace.id)
.await
.unwrap();
let table3_columns = repos
.columns()
.create_or_get_many_unchecked(
table3.id,
&[
ColumnUpsertRequest {
name: "apples",
column_type: ColumnType::Tag,
},
ColumnUpsertRequest {
name: "oranges",
column_type: ColumnType::Tag,
},
],
)
.await
.unwrap();
let mut table3_column_names: Vec<_> = table3_columns.iter().map(|c| &c.name).collect();
table3_column_names.sort();
assert_eq!(table3_column_names, vec!["apples", "oranges"]);
}
async fn test_shards(catalog: Arc<dyn Catalog>) {
@ -3989,7 +4021,7 @@ pub(crate) mod test_helpers {
let batches = mutable_batch_lp::lines_to_batches(lines, 42).unwrap();
let batches = batches.iter().map(|(table, batch)| (table.as_str(), batch));
let ns = NamespaceSchema::new(namespace.id, topic.id, pool.id);
let ns = NamespaceSchema::new(namespace.id, topic.id, pool.id, 1000);
let schema = validate_or_insert_schema(batches, &ns, repos)
.await


@ -144,43 +144,47 @@ where
// If the table itself needs to be updated during column validation it
// becomes a Cow::owned() copy and the modified copy should be inserted into
// the schema before returning.
let mut column_batch = Vec::default();
for (name, col) in mb.columns() {
// Check if the column exists in the cached schema.
//
// If it does, validate it. If it does not exist, create it and insert
// it into the cached schema.
match table.columns.get(name.as_str()) {
Some(existing) if existing.matches_type(col.influx_type()) => {
// No action is needed as the column matches the existing column
// schema.
}
Some(existing) => {
// The column schema, and the column in the mutable batch are of
// different types.
return ColumnTypeMismatchSnafu {
name,
existing: existing.column_type,
new: col.influx_type(),
let column_batch: Vec<_> = mb
.columns()
.filter_map(|(name, col)| {
// Check if the column exists in the cached schema.
//
// If it does, validate it. If it does not exist, create it and insert
// it into the cached schema.
match table.columns.get(name.as_str()) {
Some(existing) if existing.matches_type(col.influx_type()) => {
// No action is needed as the column matches the existing column
// schema.
None
}
Some(existing) => {
// The column schema, and the column in the mutable batch are of
// different types.
Some(
ColumnTypeMismatchSnafu {
name,
existing: existing.column_type,
new: col.influx_type(),
}
.fail(),
)
}
None => {
// The column does not exist in the cache, add it to the column
// batch to be bulk inserted later.
Some(Ok(ColumnUpsertRequest {
name: name.as_str(),
column_type: ColumnType::from(col.influx_type()),
}))
}
.fail();
}
None => {
// The column does not exist in the cache, add it to the column
// batch to be bulk inserted later.
column_batch.push(ColumnUpsertRequest {
name: name.as_str(),
table_id: table.id,
column_type: ColumnType::from(col.influx_type()),
});
}
};
}
})
.collect::<Result<Vec<_>>>()?;
if !column_batch.is_empty() {
repos
.columns()
.create_or_get_many(&column_batch)
.create_or_get_many_unchecked(table.id, &column_batch)
.await?
.into_iter()
.for_each(|c| table.to_mut().add_column(&c));
@ -270,6 +274,7 @@ mod tests {
namespace.id,
namespace.topic_id,
namespace.query_pool_id,
namespace.max_columns_per_table,
);
// Apply all the lp literals as individual writes, feeding


@ -522,17 +522,51 @@ impl ColumnRepo for MemTxn {
Ok(column.clone())
}
async fn create_or_get_many(
async fn create_or_get_many_unchecked(
&mut self,
table_id: TableId,
columns: &[ColumnUpsertRequest<'_>],
) -> Result<Vec<Column>> {
let mut out = Vec::new();
for column in columns {
out.push(
ColumnRepo::create_or_get(self, column.name, column.table_id, column.column_type)
.await?,
);
}
// Explicitly NOT using `create_or_get` in this function: the Postgres catalog doesn't
// check column limits when inserting many columns because it's complicated and expensive,
// and for testing purposes the in-memory catalog needs to match its functionality.
let stage = self.stage();
let out: Vec<_> = columns
.iter()
.map(|column| {
match stage
.columns
.iter()
.find(|t| t.name == column.name && t.table_id == table_id)
{
Some(c) => {
ensure!(
column.column_type == c.column_type,
ColumnTypeMismatchSnafu {
name: column.name,
existing: c.column_type,
new: column.column_type
}
);
Ok(c.clone())
}
None => {
let new_column = Column {
id: ColumnId::new(stage.columns.len() as i64 + 1),
table_id,
name: column.name.to_string(),
column_type: column.column_type,
};
stage.columns.push(new_column);
Ok(stage.columns.last().unwrap().clone())
}
}
})
.collect::<Result<Vec<Column>>>()?;
Ok(out)
}


@ -219,7 +219,7 @@ decorate!(
"column_create_or_get" = create_or_get(&mut self, name: &str, table_id: TableId, column_type: ColumnType) -> Result<Column>;
"column_list_by_namespace_id" = list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Column>>;
"column_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Column>>;
"column_create_or_get_many" = create_or_get_many(&mut self, columns: &[ColumnUpsertRequest<'_>]) -> Result<Vec<Column>>;
"column_create_or_get_many_unchecked" = create_or_get_many_unchecked(&mut self, table_id: TableId, columns: &[ColumnUpsertRequest<'_>]) -> Result<Vec<Column>>;
"column_list" = list(&mut self) -> Result<Vec<Column>>;
"column_list_type_count_by_table_id" = list_type_count_by_table_id(&mut self, table_id: TableId) -> Result<Vec<ColumnTypeCount>>;
]


@ -939,31 +939,30 @@ WHERE table_id = $1;
Ok(rec)
}
async fn create_or_get_many(
async fn create_or_get_many_unchecked(
&mut self,
table_id: TableId,
columns: &[ColumnUpsertRequest<'_>],
) -> Result<Vec<Column>> {
let mut v_name = Vec::new();
let mut v_table_id = Vec::new();
let mut v_column_type = Vec::new();
for c in columns {
v_name.push(c.name.to_string());
v_table_id.push(c.table_id.get());
v_column_type.push(c.column_type as i16);
}
let out = sqlx::query_as::<_, Column>(
r#"
INSERT INTO column_name ( name, table_id, column_type )
SELECT name, table_id, column_type FROM UNNEST($1, $2, $3) as a(name, table_id, column_type)
SELECT name, $1, column_type FROM UNNEST($2, $3) as a(name, column_type)
ON CONFLICT ON CONSTRAINT column_name_unique
DO UPDATE SET name = column_name.name
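-- the no-op update (rather than DO NOTHING) makes pre-existing conflicting
-- rows visible to RETURNING, which is what gives this upsert semantics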
RETURNING *;
"#,
)
.bind(&v_name)
.bind(&v_table_id)
.bind(&v_column_type)
.bind(&table_id) // $1
.bind(&v_name) // $2
.bind(&v_column_type) // $3
.fetch_all(&mut self.inner)
.await
.map_err(|e| {
@ -2622,7 +2621,7 @@ mod tests {
assert_eq!(application_name, TEST_APPLICATION_NAME_NEW);
}
macro_rules! test_column_create_or_get_many {
macro_rules! test_column_create_or_get_many_unchecked {
(
$name:ident,
calls = {$([$($col_name:literal => $col_type:expr),+ $(,)?]),+},
@ -2630,7 +2629,7 @@ mod tests {
) => {
paste::paste! {
#[tokio::test]
async fn [<test_column_create_or_get_many_ $name>]() {
async fn [<test_column_create_or_get_many_unchecked_ $name>]() {
// If running an integration test on your laptop, this requires that you have
// Postgres running and that you've done the sqlx migrations. See the README in
// this crate for info to set it up.
@ -2668,7 +2667,6 @@ mod tests {
$(
ColumnUpsertRequest {
name: $col_name,
table_id,
column_type: $col_type,
},
)+
@ -2677,7 +2675,7 @@ mod tests {
.repositories()
.await
.columns()
.create_or_get_many(&insert)
.create_or_get_many_unchecked(table_id, &insert)
.await;
// The returned columns MUST always match the requested
@ -2686,13 +2684,13 @@ mod tests {
assert_eq!(insert.len(), got.len());
insert.iter().zip(got).for_each(|(req, got)| {
assert_eq!(req.name, got.name);
assert_eq!(req.table_id, got.table_id);
assert_eq!(table_id, got.table_id);
assert_eq!(
req.column_type,
ColumnType::try_from(got.column_type).expect("invalid column type")
);
});
assert_metric_hit(&metrics, "column_create_or_get_many");
assert_metric_hit(&metrics, "column_create_or_get_many_unchecked");
}
)+
@ -2704,7 +2702,7 @@ mod tests {
// Issue a few calls to create_or_get_many that contain distinct columns and
// covers the full set of column types.
test_column_create_or_get_many!(
test_column_create_or_get_many_unchecked!(
insert,
calls = {
[
@ -2726,7 +2724,7 @@ mod tests {
// Issue two calls with overlapping columns - request should succeed (upsert
// semantics).
test_column_create_or_get_many!(
test_column_create_or_get_many_unchecked!(
partial_upsert,
calls = {
[
@ -2750,7 +2748,7 @@ mod tests {
);
// Issue two calls with the same columns and types.
test_column_create_or_get_many!(
test_column_create_or_get_many_unchecked!(
full_upsert,
calls = {
[
@ -2771,7 +2769,7 @@ mod tests {
// Issue two calls with overlapping columns with conflicting types and
// observe a correctly populated ColumnTypeMismatch error.
test_column_create_or_get_many!(
test_column_create_or_get_many_unchecked!(
partial_type_conflict,
calls = {
[
@ -2802,7 +2800,7 @@ mod tests {
// Issue one call containing a column specified twice, with differing types
// and observe an error different from the above test case.
test_column_create_or_get_many!(
test_column_create_or_get_many_unchecked!(
intra_request_type_conflict,
calls = {
[


@ -265,6 +265,16 @@ impl TestNamespace {
.await
.unwrap()
}
/// Set the number of columns per table allowed in this namespace.
pub async fn update_column_limit(&self, new_max: i32) {
let mut repos = self.catalog.catalog.repositories().await;
repos
.namespaces()
.update_column_limit(&self.namespace.name, new_max)
.await
.unwrap();
}
}
/// A test shard with its namespace in the catalog


@ -25,7 +25,7 @@ use iox_time::Time;
use schema::selection::Selection;
use schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME};
use snafu::{OptionExt, ResultExt, Snafu};
use std::ops::Range;
use std::{collections::BTreeSet, ops::Range};
pub mod column;
pub mod payload;
@ -137,6 +137,12 @@ impl MutableBatch {
.map(move |(name, idx)| (name, &self.columns[*idx]))
}
/// Return the set of column names for this table. Used in combination with a write operation's
/// column names to determine whether a write would exceed the max allowed columns.
pub fn column_names(&self) -> BTreeSet<&str> {
self.column_names.keys().map(|name| name.as_str()).collect()
}
/// Return the number of rows in this chunk
pub fn rows(&self) -> usize {
self.row_count


@ -43,6 +43,7 @@ write_summary = { path = "../write_summary" }
[dev-dependencies]
assert_matches = "1.5"
criterion = { version = "0.4", default-features = false, features = ["async_tokio", "rayon"]}
iox_tests = { path = "../iox_tests" }
once_cell = "1"
paste = "1.0.9"
pretty_assertions = "1.3.0"


@ -149,6 +149,7 @@ mod tests {
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(3),
tables: Default::default(),
max_columns_per_table: 4,
},
);


@ -1,7 +1,7 @@
use super::DmlHandler;
use crate::namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate};
use data_types::{DatabaseName, DeletePredicate, NamespaceSchema};
use hashbrown::HashMap;
use iox_catalog::{
interface::{get_schema_by_name, Catalog, Error as CatalogError},
@ -23,7 +23,7 @@ pub enum SchemaError {
/// The user has hit their column/table limit.
#[error("service limit reached: {0}")]
ServiceLimit(iox_catalog::interface::Error),
ServiceLimit(Box<dyn std::error::Error + Send + Sync + 'static>),
/// The request schema conflicts with the existing namespace schema.
#[error("schema conflict: {0}")]
@ -67,6 +67,22 @@ pub enum SchemaError {
/// relatively rare - it results in additional requests being made to the
/// catalog until the cached schema converges to match the catalog schema.
///
/// Note that the namespace-wide limit of the number of columns allowed per table
/// is also cached, which has two implications:
///
/// 1. If the namespace's column limit is updated in the catalog, the new limit
/// will not be enforced until the whole namespace is recached, likely only
/// on startup. In other words, updating the namespace's column limit requires
/// both a catalog update and service restart.
/// 2. There's a race condition that can result in a table ending up with more
/// columns than the namespace limit should allow. When multiple concurrent
/// writes come in to different service instances that each have their own
/// cache, and each of those writes adds a disjoint set of new columns, every
/// request succeeds because, checked against its own instance's cache, none
/// of them exceeds the limit. Once all the writes have completed, the table's
/// total column count can exceed the limit (in the worst case by roughly a
/// factor of the number of instances).
///
/// # Correctness
///
/// The correct functioning of this schema validator relies on the catalog
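To make implication 2 above concrete, here is a tiny self-contained simulation (plain sets as hypothetical stand-ins for each instance's cache, mirroring the race-condition test later in this file):

use std::collections::BTreeSet;

// The in-memory check: passes when the write adds no new columns or the
// merged column count stays within the limit.
fn check_passes(cache: &BTreeSet<&str>, new_cols: &[&str], limit: usize) -> bool {
    let merged: BTreeSet<&str> =
        cache.iter().copied().chain(new_cols.iter().copied()).collect();
    !(merged.len() > cache.len() && merged.len() > limit)
}

fn main() {
    let limit = 3;
    // Two service instances have cached the same two-column table.
    let mut cache_a = BTreeSet::from(["val", "time"]);
    let mut cache_b = BTreeSet::from(["val", "time"]);

    // Concurrent writes each add one disjoint column; both checks pass (3 <= 3).
    assert!(check_passes(&cache_a, &["tag1"], limit));
    assert!(check_passes(&cache_b, &["tag2"], limit));
    cache_a.insert("tag1");
    cache_b.insert("tag2");

    // The catalog now holds the union: four columns, over the limit of three.
    let catalog: BTreeSet<&str> = cache_a.union(&cache_b).copied().collect();
    assert_eq!(catalog.len(), 4);
}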
@ -178,6 +194,12 @@ where
}
};
validate_column_limits(&batches, &schema).map_err(|e| {
warn!(%namespace, error=%e, "service protection limit reached");
self.service_limit_hit.inc(1);
SchemaError::ServiceLimit(Box::new(e))
})?;
let maybe_new_schema = validate_or_insert_schema(
batches.iter().map(|(k, v)| (k.as_str(), v)),
&schema,
@ -208,7 +230,7 @@ where
| CatalogError::TableCreateLimitError { .. } => {
warn!(%namespace, error=%e, "service protection limit reached");
self.service_limit_hit.inc(1);
SchemaError::ServiceLimit(e.into_err())
SchemaError::ServiceLimit(Box::new(e.into_err()))
}
_ => {
error!(%namespace, error=%e, "schema validation failed");
@ -253,17 +275,221 @@ where
}
}
#[derive(Debug, Error)]
#[error(
"couldn't create columns in table `{table_name}`; table contains \
{existing_column_count} existing columns, applying this write would result \
in {merged_column_count} columns, limit is {max_columns_per_table}"
)]
struct OverColumnLimit {
table_name: String,
// Number of columns already in the table.
existing_column_count: usize,
// Number of resultant columns after merging the write with existing columns.
merged_column_count: usize,
// The configured limit.
max_columns_per_table: usize,
}
fn validate_column_limits(
batches: &HashMap<String, MutableBatch>,
schema: &NamespaceSchema,
) -> Result<(), OverColumnLimit> {
for (table_name, batch) in batches {
let mut existing_columns = schema
.tables
.get(table_name)
.map(|t| t.column_names())
.unwrap_or_default();
let existing_column_count = existing_columns.len();
let merged_column_count = {
existing_columns.append(&mut batch.column_names());
existing_columns.len()
};
// A write that touches only columns the table already has is allowed, even if
// the table is currently over the column limit; only adding new columns past
// the limit is an error.
let columns_were_added_in_this_batch = merged_column_count > existing_column_count;
let column_limit_exceeded = merged_column_count > schema.max_columns_per_table;
if columns_were_added_in_this_batch && column_limit_exceeded {
return Err(OverColumnLimit {
table_name: table_name.into(),
merged_column_count,
existing_column_count,
max_columns_per_table: schema.max_columns_per_table,
});
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use data_types::{ColumnType, QueryPoolId, TimestampRange, TopicId};
use iox_catalog::mem::MemCatalog;
use data_types::{ColumnType, TimestampRange};
use iox_tests::util::{TestCatalog, TestNamespace};
use once_cell::sync::Lazy;
use std::sync::Arc;
static NAMESPACE: Lazy<DatabaseName<'static>> = Lazy::new(|| "bananas".try_into().unwrap());
#[tokio::test]
async fn validate_limits() {
let (catalog, namespace) = test_setup().await;
namespace.update_column_limit(3).await;
// Table not found in schema,
{
let schema = namespace.schema().await;
// Columns under the limit is ok
let batches = lp_to_writes("nonexistent val=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
// Columns over the limit is an error
let batches = lp_to_writes("nonexistent,tag1=A,tag2=B val=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
table_name: _,
existing_column_count: 0,
merged_column_count: 4,
max_columns_per_table: 3,
})
);
}
// Table exists but no columns in schema,
{
namespace.create_table("no_columns_in_schema").await;
let schema = namespace.schema().await;
// Columns under the limit is ok
let batches = lp_to_writes("no_columns_in_schema val=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
// Columns over the limit is an error
let batches = lp_to_writes("no_columns_in_schema,tag1=A,tag2=B val=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
table_name: _,
existing_column_count: 0,
merged_column_count: 4,
max_columns_per_table: 3,
})
);
}
// Table exists with a column in the schema,
{
let table = namespace.create_table("i_got_columns").await;
table.create_column("i_got_music", ColumnType::I64).await;
let schema = namespace.schema().await;
// Columns already existing is ok
let batches = lp_to_writes("i_got_columns i_got_music=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
// Adding columns under the limit is ok
let batches = lp_to_writes("i_got_columns,tag1=A i_got_music=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
// Adding columns over the limit is an error
let batches = lp_to_writes("i_got_columns,tag1=A,tag2=B i_got_music=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
table_name: _,
existing_column_count: 1,
merged_column_count: 4,
max_columns_per_table: 3,
})
);
}
// Table exists and is at the column limit,
{
let table = namespace.create_table("bananas").await;
table.create_column("greatness", ColumnType::I64).await;
table.create_column("tastiness", ColumnType::I64).await;
table
.create_column(schema::TIME_COLUMN_NAME, ColumnType::Time)
.await;
let schema = namespace.schema().await;
// Columns already existing is allowed
let batches = lp_to_writes("bananas greatness=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
// Adding columns over the limit is an error
let batches = lp_to_writes("bananas i_got_music=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
table_name: _,
existing_column_count: 3,
merged_column_count: 4,
max_columns_per_table: 3,
})
);
}
// Table exists and is over the column limit because of the race condition,
{
// Make two schema validator instances each with their own cache
let handler1 = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&catalog.metric_registry,
);
let handler2 = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&catalog.metric_registry,
);
// Make a valid write with one column + timestamp through each validator so the
// namespace schema gets cached
let writes1_valid = lp_to_writes("dragonfruit val=42i 123456");
handler1
.write(&*NAMESPACE, writes1_valid, None)
.await
.expect("request should succeed");
let writes2_valid = lp_to_writes("dragonfruit val=43i 123457");
handler2
.write(&*NAMESPACE, writes2_valid, None)
.await
.expect("request should succeed");
// Make "valid" writes through each validator that each add a different column, thus
// putting the table over the limit
let writes1_add_column = lp_to_writes("dragonfruit,tag1=A val=42i 123456");
handler1
.write(&*NAMESPACE, writes1_add_column, None)
.await
.expect("request should succeed");
let writes2_add_column = lp_to_writes("dragonfruit,tag2=B val=43i 123457");
handler2
.write(&*NAMESPACE, writes2_add_column, None)
.await
.expect("request should succeed");
let schema = namespace.schema().await;
// Columns already existing is allowed
let batches = lp_to_writes("dragonfruit val=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
// Adding more columns over the limit is an error
let batches = lp_to_writes("dragonfruit i_got_music=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
table_name: _,
existing_column_count: 4,
merged_column_count: 5,
max_columns_per_table: 3,
})
);
}
}
// Parse `lp` into a table-keyed MutableBatch map.
fn lp_to_writes(lp: &str) -> HashMap<String, MutableBatch> {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
@ -273,23 +499,11 @@ mod tests {
/// Initialise an in-memory [`MemCatalog`] and create a single namespace
/// named [`NAMESPACE`].
async fn create_catalog() -> Arc<dyn Catalog> {
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
async fn test_setup() -> (Arc<TestCatalog>, Arc<TestNamespace>) {
let catalog = TestCatalog::new();
let namespace = catalog.create_namespace(&NAMESPACE).await;
let mut repos = catalog.repositories().await;
repos
.namespaces()
.create(
NAMESPACE.as_str(),
"inf",
TopicId::new(42),
QueryPoolId::new(24),
)
.await
.expect("failed to create test namespace");
catalog
(catalog, namespace)
}
fn assert_cache<C>(handler: &SchemaValidator<C>, table: &str, col: &str, want: ColumnType)
@ -314,10 +528,10 @@ mod tests {
#[tokio::test]
async fn test_write_ok() {
let catalog = create_catalog().await;
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
@ -337,10 +551,10 @@ mod tests {
#[tokio::test]
async fn test_write_schema_not_found() {
let catalog = create_catalog().await;
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
@ -361,10 +575,10 @@ mod tests {
#[tokio::test]
async fn test_write_validation_failure() {
let catalog = create_catalog().await;
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
@ -399,10 +613,10 @@ mod tests {
#[tokio::test]
async fn test_write_table_service_limit() {
let catalog = create_catalog().await;
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
Arc::clone(&catalog),
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
@ -417,6 +631,7 @@ mod tests {
// Configure the service limit to be hit next request
catalog
.catalog()
.repositories()
.await
.namespaces()
@ -437,10 +652,10 @@ mod tests {
#[tokio::test]
async fn test_write_column_service_limit() {
let catalog = create_catalog().await;
let (catalog, namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
Arc::clone(&catalog),
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
@ -453,16 +668,46 @@ mod tests {
.expect("request should succeed");
assert_eq!(writes.len(), got.len());
// Configure the service limit to be hit next request
namespace.update_column_limit(1).await;
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
// Second write attempts to violate limits, causing an error
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456");
let err = handler
.write(&*NAMESPACE, writes, None)
.await
.expect_err("request should fail");
assert_matches!(err, SchemaError::ServiceLimit(_));
assert_eq!(1, handler.service_limit_hit.fetch());
}
#[tokio::test]
async fn test_first_write_many_columns_service_limit() {
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
// Configure the service limit to be hit next request
catalog
.catalog()
.repositories()
.await
.namespaces()
.update_column_limit(NAMESPACE.as_str(), 1)
.update_column_limit(NAMESPACE.as_str(), 3)
.await
.expect("failed to set column limit");
// Second write attempts to violate limits, causing an error
// First write attempts to add columns over the limit, causing an error
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456");
let err = handler
.write(&*NAMESPACE, writes, None)
@ -478,10 +723,10 @@ mod tests {
const NAMESPACE: &str = "NAMESPACE_IS_NOT_VALIDATED";
const TABLE: &str = "bananas";
let catalog = create_catalog().await;
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);


@ -42,6 +42,7 @@ mod tests {
topic_id: TopicId::new(24),
query_pool_id: QueryPoolId::new(1234),
tables: Default::default(),
max_columns_per_table: 50,
};
assert!(cache.put_schema(ns.clone(), schema1.clone()).is_none());
assert_eq!(*cache.get_schema(&ns).expect("lookup failure"), schema1);
@ -51,6 +52,7 @@ mod tests {
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(2),
tables: Default::default(),
max_columns_per_table: 10,
};
assert_eq!(


@ -194,6 +194,7 @@ mod tests {
topic_id: TopicId::new(24),
query_pool_id: QueryPoolId::new(1234),
tables,
max_columns_per_table: 100,
}
}


@ -60,6 +60,7 @@ mod tests {
topic_id: TopicId::new(1),
query_pool_id: QueryPoolId::new(1),
tables: Default::default(),
max_columns_per_table: 7,
}
}


@ -330,16 +330,13 @@ async fn test_schema_limit() {
&err,
router::server::http::Error::DmlHandler(
DmlError::Schema(
SchemaError::ServiceLimit(
iox_catalog::interface::Error::TableCreateLimitError {
table_name,
namespace_id,
}
)
SchemaError::ServiceLimit(e)
)
) => {
assert_eq!(table_name, "platanos2");
assert_eq!(namespace_id.to_string(), "1");
assert_eq!(
e.to_string(),
"couldn't create table platanos2; limit reached on namespace 1"
);
}
);
assert_eq!(err.as_status_code(), StatusCode::TOO_MANY_REQUESTS);