chore: back-port catalog limit refactor from enterprise (#26278)
This back-ports changes from enterprise to how limits are enforced on the catalog. Among them are changes that catch some potential bugs.
parent ab13c05c3f
commit e4cfbf71f7
@@ -122,11 +122,11 @@ impl CreateTableQuery<'_> {
     pub fn with_fields(
         mut self,
-        fields: impl IntoIterator<Item = (impl Into<String>, String)>,
+        fields: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
     ) -> Self {
         self.fields = fields
             .into_iter()
-            .map(|(name, dt)| (name.into(), dt))
+            .map(|(name, dt)| (name.into(), dt.into()))
             .collect();
         self
     }

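The `with_fields` change loosens the field data type from `String` to `impl Into<String>`, so callers can pass plain `&str` for both halves of each pair. A minimal sketch of the resulting ergonomics; `TableBuilder` here is a hypothetical stand-in for the real `CreateTableQuery`:

```rust
// Hypothetical stand-in for `CreateTableQuery`, reduced to the fields list.
#[derive(Debug, Default)]
struct TableBuilder {
    fields: Vec<(String, String)>,
}

impl TableBuilder {
    fn with_fields(
        mut self,
        fields: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        self.fields = fields
            .into_iter()
            .map(|(name, dt)| (name.into(), dt.into()))
            .collect();
        self
    }
}

fn main() {
    // Both the field name and its data type can now be `&str`s; before the
    // change, the data type had to be an owned `String`.
    let builder = TableBuilder::default().with_fields([("f1", "string"), ("f2", "float")]);
    println!("{builder:?}");
}
```
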
@@ -66,6 +66,9 @@ pub const INTERNAL_DB_NAME: &str = "_internal";
 const DEFAULT_ADMIN_TOKEN_NAME: &str = "_admin";
 
+/// Limit for the number of tag columns on a table
+pub(crate) const NUM_TAG_COLUMNS_LIMIT: usize = 250;
+
 /// The sequence number of a batch of WAL operations.
 #[derive(
     Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
 )]

@@ -109,6 +112,7 @@ pub struct Catalog {
     metrics: Arc<CatalogMetrics>,
     /// In-memory representation of the catalog
     pub(crate) inner: RwLock<InnerCatalog>,
+    limits: CatalogLimits,
 }
 
 /// Custom implementation of `Debug` for the `Catalog` type to avoid serializing the object store

@@ -122,15 +126,30 @@ impl std::fmt::Debug for Catalog {
 const CATALOG_CHECKPOINT_INTERVAL: u64 = 100;
 
+#[derive(Debug, Clone, Copy)]
+pub struct CatalogLimits {
+    num_dbs: usize,
+    num_tables: usize,
+    num_columns_per_table: usize,
+}
+
+impl Default for CatalogLimits {
+    fn default() -> Self {
+        Self {
+            num_dbs: Catalog::NUM_DBS_LIMIT,
+            num_tables: Catalog::NUM_TABLES_LIMIT,
+            num_columns_per_table: Catalog::NUM_COLUMNS_PER_TABLE_LIMIT,
+        }
+    }
+}
+
 impl Catalog {
     /// Limit for the number of Databases that InfluxDB 3 Core can have
-    pub(crate) const NUM_DBS_LIMIT: usize = 5;
+    pub const NUM_DBS_LIMIT: usize = 5;
     /// Limit for the number of columns per table that InfluxDB 3 Core can have
-    pub(crate) const NUM_COLUMNS_PER_TABLE_LIMIT: usize = 500;
+    pub const NUM_COLUMNS_PER_TABLE_LIMIT: usize = 500;
     /// Limit for the number of tables across all DBs that InfluxDB 3 Core can have
-    pub(crate) const NUM_TABLES_LIMIT: usize = 2000;
-    /// Limit for the number of tag columns on a table
-    pub(crate) const NUM_TAG_COLUMNS_LIMIT: usize = 250;
+    pub const NUM_TABLES_LIMIT: usize = 2000;
 
     pub async fn new(
         node_id: impl Into<Arc<str>>,

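The associated consts stay as Core's defaults while the new `CatalogLimits` field turns the effective limits into an instance property, presumably so the enterprise build can construct a catalog with different values. A self-contained sketch of the pattern, with `Catalog` reduced to the relevant parts:

```rust
// Defaults-from-consts pattern: `CatalogLimits::default()` reads the Core
// limits, but any `CatalogLimits` value can be installed on a catalog.
#[derive(Debug, Clone, Copy)]
pub struct CatalogLimits {
    num_dbs: usize,
    num_tables: usize,
    num_columns_per_table: usize,
}

impl Default for CatalogLimits {
    fn default() -> Self {
        Self {
            num_dbs: Catalog::NUM_DBS_LIMIT,
            num_tables: Catalog::NUM_TABLES_LIMIT,
            num_columns_per_table: Catalog::NUM_COLUMNS_PER_TABLE_LIMIT,
        }
    }
}

// Simplified stand-in for the real `Catalog`.
pub struct Catalog {
    limits: CatalogLimits,
}

impl Catalog {
    pub const NUM_DBS_LIMIT: usize = 5;
    pub const NUM_TABLES_LIMIT: usize = 2000;
    pub const NUM_COLUMNS_PER_TABLE_LIMIT: usize = 500;

    // All limit checks go through accessors, so swapping the limits value
    // changes the enforced limits everywhere at once.
    fn num_dbs_limit(&self) -> usize {
        self.limits.num_dbs
    }
}

fn main() {
    let catalog = Catalog { limits: CatalogLimits::default() };
    assert_eq!(catalog.num_dbs_limit(), 5);
}
```
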
@@ -154,6 +173,7 @@ impl Catalog {
             store,
             metrics,
             inner,
+            limits: Default::default(),
         });
 
         create_internal_db(&mut catalog).await;

@@ -191,6 +211,18 @@ impl Catalog {
         Ok(catalog)
     }
 
+    fn num_dbs_limit(&self) -> usize {
+        self.limits.num_dbs
+    }
+
+    fn num_tables_limit(&self) -> usize {
+        self.limits.num_tables
+    }
+
+    fn num_columns_per_table_limit(&self) -> usize {
+        self.limits.num_columns_per_table
+    }
+
     pub fn object_store_prefix(&self) -> Arc<str> {
         Arc::clone(&self.store.prefix)
     }

@@ -288,8 +320,8 @@ impl Catalog {
             None => {
                 let mut inner = self.inner.write();
 
-                if inner.database_count() >= Self::NUM_DBS_LIMIT {
-                    return Err(CatalogError::TooManyDbs);
+                if inner.database_count() >= self.num_dbs_limit() {
+                    return Err(CatalogError::TooManyDbs(self.num_dbs_limit()));
                 }
 
                 info!(database_name = db_name, "creating new database");

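The check now reads the limit through the instance accessor and threads that value into the error, so the message always reports the limit that was actually enforced. A minimal sketch, with a simplified stand-in for `CatalogError`:

```rust
// Simplified stand-in for `CatalogError`; only the relevant variant is kept.
#[derive(Debug)]
enum CatalogError {
    TooManyDbs(usize),
}

fn create_database(database_count: usize, num_dbs_limit: usize) -> Result<(), CatalogError> {
    // The error carries the limit that was enforced, rather than baking a
    // compile-time constant into the message.
    if database_count >= num_dbs_limit {
        return Err(CatalogError::TooManyDbs(num_dbs_limit));
    }
    Ok(())
}

fn main() {
    assert!(create_database(4, 5).is_ok());
    assert!(matches!(create_database(5, 5), Err(CatalogError::TooManyDbs(5))));
}
```
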
@@ -536,6 +568,7 @@ impl Catalog {
             store,
             metrics: Arc::new(CatalogMetrics::new(&metric_registry)),
             inner: RwLock::new(inner),
+            limits: Default::default(),
         })
     }
 }

@@ -860,22 +893,16 @@ impl InnerCatalog {
     }
 
     fn apply_database_batch(&mut self, database_batch: &DatabaseBatch) -> Result<bool> {
-        let table_count = self.table_count();
         if let Some(db) = self.databases.get_by_id(&database_batch.database_id) {
             let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(&db, database_batch)?
             else {
                 return Ok(false);
             };
-            check_overall_table_count(Some(&db), &new_db, table_count)?;
             self.databases
                 .update(db.id, new_db)
                 .expect("existing database should be updated");
         } else {
-            if self.database_count() >= Catalog::NUM_DBS_LIMIT {
-                return Err(CatalogError::TooManyDbs);
-            }
             let new_db = DatabaseSchema::new_from_batch(database_batch)?;
-            check_overall_table_count(None, &new_db, table_count)?;
             self.databases
                 .insert(new_db.id, new_db)
                 .expect("new database should be inserted");

@@ -888,30 +915,6 @@ impl InnerCatalog {
     }
 }
 
-fn check_overall_table_count(
-    existing_db: Option<&Arc<DatabaseSchema>>,
-    new_db: &DatabaseSchema,
-    current_table_count: usize,
-) -> Result<()> {
-    let existing_table_count = if let Some(existing_db) = existing_db {
-        existing_db.table_count()
-    } else {
-        0
-    };
-    let new_table_count = new_db.table_count();
-    match new_table_count.cmp(&existing_table_count) {
-        Ordering::Less | Ordering::Equal => Ok(()),
-        Ordering::Greater => {
-            let newly_added_table_count = new_db.table_count() - existing_table_count;
-            if current_table_count + newly_added_table_count > Catalog::NUM_TABLES_LIMIT {
-                Err(CatalogError::TooManyTables)
-            } else {
-                Ok(())
-            }
-        }
-    }
-}
-
 #[derive(Debug, Eq, PartialEq, Clone)]
 pub struct NodeDefinition {
     pub(crate) node_id: Arc<str>,

@@ -1375,11 +1378,6 @@ impl TableDefinition {
         columns: Vec<(ColumnId, Arc<str>, InfluxColumnType)>,
         series_key: Vec<ColumnId>,
     ) -> Result<Self> {
-        // ensure we're under the column limit
-        if columns.len() > Catalog::NUM_COLUMNS_PER_TABLE_LIMIT {
-            return Err(CatalogError::TooManyColumns);
-        }
-
         // Use a BTree to ensure that the columns are ordered:
         let mut ordered_columns = BTreeMap::new();
         for (col_id, name, column_type) in &columns {

@@ -1538,16 +1536,6 @@ impl TableDefinition {
             }
         }
 
-        // ensure we don't go over the column limit
-        if cols.len() > Catalog::NUM_COLUMNS_PER_TABLE_LIMIT {
-            return Err(CatalogError::TooManyColumns);
-        }
-
-        // ensure we don't go over the tag column limit
-        if self.series_key.len() > Catalog::NUM_TAG_COLUMNS_LIMIT {
-            return Err(CatalogError::TooManyTagColumns);
-        }
-
         let mut schema_builder = SchemaBuilder::with_capacity(cols.len());
         // TODO: may need to capture some schema-level metadata, currently, this causes trouble in
         // tests, so I am omitting this for now:

@@ -1587,6 +1575,13 @@ impl TableDefinition {
         self.influx_schema().len()
     }
 
+    pub fn num_tag_columns(&self) -> usize {
+        self.columns
+            .resource_iter()
+            .filter(|c| matches!(c.data_type, InfluxColumnType::Tag))
+            .count()
+    }
+
     pub fn field_type_by_name(&self, name: impl AsRef<str>) -> Option<InfluxColumnType> {
         self.columns
             .get_by_name(name.as_ref())

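`num_tag_columns` counts tags with a `filter`/`count` pass over the column definitions. The same iterator pattern in self-contained form; `ColumnDef` and the plain slice are simplified stand-ins for the diff's column repository and its `resource_iter`:

```rust
// Counting tag columns, mirroring `TableDefinition::num_tag_columns`.
#[derive(Debug)]
enum InfluxColumnType {
    Tag,
    Field,
    Timestamp,
}

struct ColumnDef {
    data_type: InfluxColumnType,
}

fn num_tag_columns(columns: &[ColumnDef]) -> usize {
    columns
        .iter()
        .filter(|c| matches!(c.data_type, InfluxColumnType::Tag))
        .count()
}

fn main() {
    let cols = vec![
        ColumnDef { data_type: InfluxColumnType::Tag },
        ColumnDef { data_type: InfluxColumnType::Field },
        ColumnDef { data_type: InfluxColumnType::Timestamp },
        ColumnDef { data_type: InfluxColumnType::Tag },
    ];
    assert_eq!(num_tag_columns(&cols), 2);
}
```
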
@@ -2591,43 +2586,18 @@ mod tests {
     async fn apply_catalog_batch_fails_for_add_fields_past_tag_limit() {
         let catalog = Catalog::new_in_memory("host").await.unwrap();
         catalog.create_database("foo").await.unwrap();
-        let tags = (0..Catalog::NUM_TAG_COLUMNS_LIMIT)
+        let tags = (0..NUM_TAG_COLUMNS_LIMIT)
             .map(|i| format!("tag_{i}"))
             .collect::<Vec<_>>();
         catalog
             .create_table("foo", "bar", &tags, &[("f1", FieldDataType::String)])
             .await
             .unwrap();
-        let db_id = catalog.db_name_to_id("foo").unwrap();
-        let table_id = catalog
-            .db_schema_by_id(&db_id)
-            .unwrap()
-            .table_definition("bar")
-            .unwrap()
-            .table_id;
-
-        let catalog_batch = create::catalog_batch(
-            db_id,
-            "foo",
-            0,
-            [create::add_fields_op(
-                db_id,
-                "foo",
-                table_id,
-                "bar",
-                [create::field_def(
-                    ColumnId::new(500),
-                    "tag_too_much",
-                    FieldDataType::Tag,
-                )],
-            )],
-        );
-        debug!("getting write lock");
-        let mut inner = catalog.inner.write();
-        let sequence = inner.sequence_number();
-        let err = inner
-            .apply_catalog_batch(&catalog_batch, sequence.next())
-            .expect_err("should fail to apply AddFields operation for tags past the limit");
+        let mut txn = catalog.begin("foo").unwrap();
+        let err = txn
+            .column_or_create("bar", "tag_too_much", FieldDataType::Tag)
+            .unwrap_err();
         assert_contains!(
             err.to_string(),
             "Update to schema would exceed number of tag columns per table limit of 250 columns"

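The test now drives the failure through the transaction API (`begin` plus `column_or_create`) instead of hand-assembling a catalog batch and applying it against the inner catalog. A rough sketch of that flow with simplified stand-in types; the real `DatabaseCatalogTransaction` tracks much more state:

```rust
// Simplified stand-ins for the transaction API exercised by the test.
const NUM_TAG_COLUMNS_LIMIT: usize = 250;

#[derive(Debug)]
enum CatalogError {
    TooManyTagColumns,
}

impl std::fmt::Display for CatalogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Update to schema would exceed number of tag columns per table limit of {} columns",
            NUM_TAG_COLUMNS_LIMIT
        )
    }
}

enum FieldDataType {
    Tag,
}

struct Txn {
    num_tag_columns: usize,
}

impl Txn {
    // The real method also takes the table name; this stand-in assumes one table.
    fn column_or_create(&mut self, _name: &str, dt: FieldDataType) -> Result<(), CatalogError> {
        if matches!(dt, FieldDataType::Tag) && self.num_tag_columns >= NUM_TAG_COLUMNS_LIMIT {
            return Err(CatalogError::TooManyTagColumns);
        }
        self.num_tag_columns += 1;
        Ok(())
    }
}

fn main() {
    // A table already at the tag limit rejects one more tag column.
    let mut txn = Txn { num_tag_columns: NUM_TAG_COLUMNS_LIMIT };
    let err = txn.column_or_create("tag_too_much", FieldDataType::Tag).unwrap_err();
    assert!(err.to_string().contains("tag columns per table limit of 250"));
}
```
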
@@ -2638,7 +2608,7 @@ mod tests {
     async fn apply_catalog_batch_fails_to_create_table_with_too_many_tags() {
         let catalog = Catalog::new_in_memory("host").await.unwrap();
         catalog.create_database("foo").await.unwrap();
-        let tags = (0..Catalog::NUM_TAG_COLUMNS_LIMIT + 1)
+        let tags = (0..NUM_TAG_COLUMNS_LIMIT + 1)
             .map(|i| format!("tag_{i}"))
             .collect::<Vec<_>>();
         let err = catalog

@@ -12,7 +12,7 @@ use super::{
 };
 use crate::{
     CatalogError, Result,
-    catalog::NodeDefinition,
+    catalog::{NUM_TAG_COLUMNS_LIMIT, NodeDefinition},
     log::{
         AddFieldsLog, CatalogBatch, CreateDatabaseLog, CreateTableLog, DatabaseCatalogOp,
         DeleteDistinctCacheLog, DeleteLastCacheLog, DeleteTokenDetails, DeleteTriggerLog,

@@ -33,14 +33,16 @@ impl Catalog {
             Some(database_schema) => Ok(DatabaseCatalogTransaction {
                 catalog_sequence: inner.sequence_number(),
                 current_table_count: inner.table_count(),
+                table_limit: self.num_tables_limit(),
                 time_ns: self.time_provider.now().timestamp_nanos(),
                 database_schema: Arc::clone(&database_schema),
                 ops: vec![],
+                columns_per_table_limit: self.num_columns_per_table_limit(),
             }),
             None => {
                 let inner = self.inner.read();
-                if inner.database_count() >= Self::NUM_DBS_LIMIT {
-                    return Err(CatalogError::TooManyDbs);
+                if inner.database_count() >= self.num_dbs_limit() {
+                    return Err(CatalogError::TooManyDbs(self.num_dbs_limit()));
                 }
                 let database_id = inner.databases.next_id();
                 let database_name = Arc::from(db_name);

@@ -54,9 +56,11 @@ impl Catalog {
                 Ok(DatabaseCatalogTransaction {
                     catalog_sequence: inner.sequence_number(),
                     current_table_count: inner.table_count(),
+                    table_limit: self.num_tables_limit(),
                     time_ns,
                     database_schema,
                     ops,
+                    columns_per_table_limit: self.num_columns_per_table_limit(),
                 })
             }
         }

@@ -858,6 +862,8 @@ impl CatalogUpdate {
 pub struct DatabaseCatalogTransaction {
     catalog_sequence: CatalogSequenceNumber,
     current_table_count: usize,
+    table_limit: usize,
+    columns_per_table_limit: usize,
     time_ns: i64,
     database_schema: Arc<DatabaseSchema>,
     ops: Vec<DatabaseCatalogOp>,

@@ -889,6 +895,9 @@ impl DatabaseCatalogTransaction {
         match self.database_schema.table_definition(table_name) {
             Some(def) => Ok(def),
             None => {
+                if self.current_table_count >= self.table_limit {
+                    return Err(CatalogError::TooManyTables(self.table_limit));
+                }
                 let database_id = self.database_schema.id;
                 let database_name = Arc::clone(&self.database_schema.name);
                 let db_schema = Arc::make_mut(&mut self.database_schema);

@@ -929,6 +938,14 @@ impl DatabaseCatalogTransaction {
                 got: column_type.into(),
             }),
             None => {
+                if table_def.num_columns() >= self.columns_per_table_limit {
+                    return Err(CatalogError::TooManyColumns(self.columns_per_table_limit));
+                }
+                if matches!(column_type, FieldDataType::Tag)
+                    && table_def.num_tag_columns() >= NUM_TAG_COLUMNS_LIMIT
+                {
+                    return Err(CatalogError::TooManyTagColumns);
+                }
                 let database_id = self.database_schema.id;
                 let database_name = Arc::clone(&self.database_schema.name);
                 let db_schema = Arc::make_mut(&mut self.database_schema);

@@ -969,8 +986,14 @@ impl DatabaseCatalogTransaction {
         if self.database_schema.table_definition(table_name).is_some() {
             return Err(CatalogError::AlreadyExists);
         }
-        if self.current_table_count >= Catalog::NUM_TABLES_LIMIT {
-            return Err(CatalogError::TooManyTables);
+        if self.current_table_count >= self.table_limit {
+            return Err(CatalogError::TooManyTables(self.table_limit));
         }
+        if tags.len() > NUM_TAG_COLUMNS_LIMIT {
+            return Err(CatalogError::TooManyTagColumns);
+        }
+        if tags.len() + fields.len() > self.columns_per_table_limit - 1 {
+            return Err(CatalogError::TooManyColumns(self.columns_per_table_limit));
+        }
         let db_schema = Arc::make_mut(&mut self.database_schema);
         let mut table_def_arc = db_schema.create_new_empty_table(table_name)?;

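Note the `columns_per_table_limit - 1` headroom at table creation: presumably the implicit `time` column counts toward the per-table column limit, so a new table may carry at most `limit - 1` user-defined tags and fields. A worked check under that assumption:

```rust
// Worked check of the `limit - 1` headroom, assuming the implicit `time`
// column counts toward the per-table column limit.
fn create_table_fits(num_tags: usize, num_fields: usize, columns_per_table_limit: usize) -> bool {
    num_tags + num_fields <= columns_per_table_limit - 1
}

fn main() {
    // With the default limit of 500, 250 tags + 249 fields still fit
    // (plus `time` makes exactly 500), but 250 tags + 250 fields do not.
    assert!(create_table_fits(250, 249, 500));
    assert!(!create_table_fits(250, 250, 500));
}
```
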
@@ -3,7 +3,10 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use schema::InfluxColumnType;
 
-use crate::{catalog::Catalog, channel::SubscriptionError, object_store::ObjectStoreCatalogError};
+use crate::{
+    catalog::NUM_TAG_COLUMNS_LIMIT, channel::SubscriptionError,
+    object_store::ObjectStoreCatalogError,
+};
 
 #[derive(Debug, thiserror::Error)]
 pub enum CatalogError {

@@ -44,29 +47,20 @@ pub enum CatalogError {
     #[error("invalid node registration")]
     InvalidNodeRegistration,
 
-    #[error(
-        "Update to schema would exceed number of columns per table limit of {} columns",
-        Catalog::NUM_COLUMNS_PER_TABLE_LIMIT - 1
-    )]
-    TooManyColumns,
+    #[error("Update to schema would exceed number of columns per table limit of {0} columns")]
+    TooManyColumns(usize),
 
     #[error(
         "Update to schema would exceed number of tag columns per table limit of {} columns",
-        Catalog::NUM_TAG_COLUMNS_LIMIT
+        NUM_TAG_COLUMNS_LIMIT
     )]
     TooManyTagColumns,
 
-    #[error(
-        "Update to schema would exceed number of tables limit of {} tables",
-        Catalog::NUM_TABLES_LIMIT
-    )]
-    TooManyTables,
+    #[error("Update to schema would exceed number of tables limit of {0} tables")]
+    TooManyTables(usize),
 
-    #[error(
-        "Adding a new database would exceed limit of {} databases",
-        Catalog::NUM_DBS_LIMIT
-    )]
-    TooManyDbs,
+    #[error("Adding a new database would exceed limit of {0} databases")]
+    TooManyDbs(usize),
 
     #[error("Table {} not in DB schema for {}", table_name, db_name)]
     TableNotFound {

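The error variants swap const interpolation in the `#[error]` attribute for a `{0}` positional payload, which is what lets the message carry the runtime limit. A minimal standalone example of that interpolation, assuming the `thiserror` crate as a dependency:

```rust
// `{0}` in a thiserror message interpolates the first tuple field of the
// variant, as the reworked TooManyTables/TooManyDbs/TooManyColumns do.
use thiserror::Error;

#[derive(Debug, Error)]
enum LimitError {
    #[error("Update to schema would exceed number of tables limit of {0} tables")]
    TooManyTables(usize),
}

fn main() {
    let err = LimitError::TooManyTables(2000);
    assert_eq!(
        err.to_string(),
        "Update to schema would exceed number of tables limit of 2000 tables"
    );
}
```
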
@@ -274,7 +274,10 @@ impl IntoResponse for CatalogError {
                 .status(StatusCode::BAD_REQUEST)
                 .body(Body::from(self.to_string()))
                 .unwrap(),
-            Self::TooManyColumns | Self::TooManyTables | Self::TooManyDbs => {
+            Self::TooManyColumns(_)
+            | Self::TooManyTables(_)
+            | Self::TooManyDbs(_)
+            | Self::TooManyTagColumns => {
                 let err: ErrorMessage<()> = ErrorMessage {
                     error: self.to_string(),
                     data: None,

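With the payloads added, the response handler groups the variants using `|` patterns with `(_)` wildcards, and `TooManyTagColumns` now joins the same arm so it produces the same structured error body. A small sketch of that match shape; the numeric status here is arbitrary for the example:

```rust
// Grouping payload-carrying variants in one match arm, as the
// `IntoResponse` impl now does.
enum CatalogError {
    TooManyColumns(usize),
    TooManyTables(usize),
    TooManyDbs(usize),
    TooManyTagColumns,
}

fn status_code(err: &CatalogError) -> u16 {
    match err {
        CatalogError::TooManyColumns(_)
        | CatalogError::TooManyTables(_)
        | CatalogError::TooManyDbs(_)
        | CatalogError::TooManyTagColumns => 422,
    }
}

fn main() {
    assert_eq!(status_code(&CatalogError::TooManyDbs(5)), 422);
}
```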