Merge pull request #4351 from influxdata/dom/revert-service-limits

fix: revert column service limits (#4179)

commit 7ac7fdcd5b
@@ -58,16 +58,6 @@ pub enum Error {
         table_id: TableId,
     },
 
-    #[snafu(display(
-        "partial failure: inserted {} of {} column names in multi insert.",
-        input_column_count,
-        inserted_column_count,
-    ))]
-    PartialColumnCreateError {
-        input_column_count: usize,
-        inserted_column_count: usize,
-    },
-
     #[snafu(display(
         "couldn't create table {}; limit reached on namespace {}",
         table_name,
@@ -357,13 +347,6 @@ pub trait ColumnRepo: Send + Sync {
     /// Implementations make no guarantees as to the ordering or atomicity of
     /// the batch of column upsert operations - a batch upsert may partially
     /// commit, in which case an error MUST be returned by the implementation.
-    ///
-    /// Partial failure will occur when column limits per table are hit. For example, if namespace
-    /// A has a limit of 10 columns per table and so does namespace B, the DB already has 5 columns
-    /// for table A in namespace A and nothing for namespace B, and a batch is passed to this
-    /// function containing 10 inserts for table A and 10 inserts for some table in namespace B,
-    /// ALL of the inserts for table A will fail and all of the inserts for the table in namespace
-    /// B will succeed, and an error will be returned telling the user it was a partial failure.
     async fn create_or_get_many(
         &mut self,
         columns: &[ColumnUpsertRequest<'_>],
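The doc-comment paragraph removed above defined the partial-failure contract that this PR reverts. Purely as illustration, a caller under the old contract might have looked like the sketch below; `upsert_batch` is a hypothetical function, not part of this codebase:

    // Hypothetical caller under the pre-revert contract: a batch could
    // partially commit, surfacing Error::PartialColumnCreateError.
    async fn upsert_batch(
        repo: &mut dyn ColumnRepo,
        batch: &[ColumnUpsertRequest<'_>],
    ) -> Result<Vec<Column>> {
        match repo.create_or_get_many(batch).await {
            // every requested column was created or already existed
            Ok(cols) => Ok(cols),
            // inserts for any table over its column limit were rejected
            // wholesale; inserts for other tables still went through
            Err(e @ Error::PartialColumnCreateError { .. }) => Err(e),
            Err(e) => Err(e),
        }
    }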
@@ -1084,8 +1067,7 @@ pub(crate) mod test_helpers {
         want.extend(cols3);
         assert_eq!(want, columns);
 
-        // test per-namespace column limits (not testing _many() version, that's tested in
-        // postgres.rs because it's closely tied to the complex SQL statement)
+        // test per-namespace column limits
         repos
             .namespaces()
             .update_column_limit("namespace_column_test", 1)
@@ -151,8 +151,6 @@ where
         }
 
         if !column_batch.is_empty() {
-            // Remember that create_or_get_many() can partially fail if per-table column limits are hit
-            // for a namespace
             repos
                 .columns()
                 .create_or_get_many(&column_batch)
@@ -553,20 +553,7 @@ impl ColumnRepo for MemTxn {
         for column in columns {
             out.push(
                 ColumnRepo::create_or_get(self, column.name, column.table_id, column.column_type)
-                    .await
-                    .map_err(|e| match e {
-                        // create_or_get_many returns a different error than the single version in
-                        // the postgres impl. since the mem impl is only used in tests we can just
-                        // do this sloppy conversion of one error type to another
-                        Error::ColumnCreateLimitError {
-                            column_name: _,
-                            table_id: _,
-                        } => Error::PartialColumnCreateError {
-                            input_column_count: columns.len(),
-                            inserted_column_count: 0,
-                        },
-                        _ => e,
-                    })?,
+                    .await?,
             );
         }
         Ok(out)
@@ -20,7 +20,7 @@ use observability_deps::tracing::{info, warn};
 use sqlx::types::Uuid;
 use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row};
 use sqlx_hotswap_pool::HotSwapPool;
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
 use time::{SystemProvider, TimeProvider};
 
 const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
@@ -880,9 +880,22 @@ RETURNING *;
         Ok(rec)
     }
 
-    // Will partially fail if namespace limits on the number of columns per table are hit- all
-    // inserts for that table will fail but inserts for other tables will succeed. A partial
-    // failure error will be returned in this case.
+    async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
+        let rec = sqlx::query_as::<_, Column>(
+            r#"
+SELECT column_name.* FROM table_name
+INNER JOIN column_name on column_name.table_id = table_name.id
+WHERE table_name.namespace_id = $1;
+            "#,
+        )
+        .bind(&namespace_id)
+        .fetch_all(&mut self.inner)
+        .await
+        .map_err(|e| Error::SqlxError { source: e })?;
+
+        Ok(rec)
+    }
+
     async fn create_or_get_many(
         &mut self,
         columns: &[ColumnUpsertRequest<'_>],
@@ -899,31 +912,7 @@ RETURNING *;
         let out = sqlx::query_as::<_, Column>(
             r#"
 INSERT INTO column_name ( name, table_id, column_type )
-SELECT column_name, input.table_id, column_type
-FROM UNNEST($1, $2, $3) AS input(column_name, table_id, column_type)
-INNER JOIN
-  (
-    -- this subquery counts up the number of columns per table in the input and adds that as
-    -- 'input_per_table_count' that we can use later in the WHERE clause
-    SELECT table_id, COUNT(table_id) AS input_per_table_count
-    FROM UNNEST($1, $2, $3) AS input2(column_name, table_id, column_type)
-    GROUP BY table_id
-  ) AS get_input_count(table_id, input_per_table_count)
-  ON input.table_id = get_input_count.table_id
-INNER JOIN
-  (
-    -- this subquery gets the limit, and the current count of columns per table in the database
-    -- so we can later check if the inserts would go over the limit by comparing against
-    -- input_per_table_count in the WHERE clause
-    SELECT max_columns_per_table, namespace.id, table_name.id AS table_id, COUNT(column_name.*) AS count_existing
-    FROM namespace
-    LEFT JOIN table_name ON namespace.id = table_name.namespace_id
-    LEFT JOIN column_name ON table_name.id = column_name.table_id
-    GROUP BY namespace.max_columns_per_table, namespace.id, table_name.id
-  ) AS get_existing_count(max_columns_per_table, namespace_id, table_id, count_existing)
-  ON input.table_id = get_existing_count.table_id
--- this is where the limits are actually checked
-WHERE count_existing + input_per_table_count <= max_columns_per_table
+SELECT name, table_id, column_type FROM UNNEST($1, $2, $3) as a(name, table_id, column_type)
 ON CONFLICT ON CONSTRAINT column_name_unique
 DO UPDATE SET name = column_name.name
 RETURNING *;
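The WHERE clause removed above was the core of the reverted enforcement: per table, existing columns plus columns in this batch must not exceed max_columns_per_table, and a table that fails the check contributes no rows to the INSERT. A self-contained Rust sketch of the same filter (types and names here are illustrative, not from the codebase):

    use std::collections::HashMap;

    /// Hypothetical stand-in for one row of the UNNEST input.
    struct Upsert {
        table_id: i64,
        name: String,
    }

    /// Keep only the upserts whose table stays within its column limit,
    /// mirroring the reverted WHERE clause:
    /// count_existing + input_per_table_count <= max_columns_per_table
    fn within_limits(
        input: Vec<Upsert>,
        existing_counts: &HashMap<i64, usize>, // columns already in the DB, per table
        max_columns_per_table: usize,
    ) -> Vec<Upsert> {
        // equivalent of the get_input_count subquery
        let mut input_counts: HashMap<i64, usize> = HashMap::new();
        for u in &input {
            *input_counts.entry(u.table_id).or_default() += 1;
        }
        input
            .into_iter()
            .filter(|u| {
                let existing = existing_counts.get(&u.table_id).copied().unwrap_or(0);
                existing + input_counts[&u.table_id] <= max_columns_per_table
            })
            .collect()
    }

Note the check is all-or-nothing per table: either every column in the batch for that table fits, or none are inserted, which is exactly what made the partial-failure error reporting necessary.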
@@ -942,51 +931,24 @@ RETURNING *;
                 }
             })?;
 
-        if columns.len() != out.len() {
-            return Err(Error::PartialColumnCreateError {
-                input_column_count: columns.len(),
-                inserted_column_count: out.len(),
-            });
-        }
+        assert_eq!(columns.len(), out.len());
 
-        let want_types: HashMap<String, i16> = columns
-            .iter()
-            .map(|req| (req.name.to_string(), req.column_type as i16))
-            .collect();
         out.into_iter()
-            .map(|existing| {
-                let wanted_type = *want_types
-                    .get(&existing.name)
-                    .expect("Got back a record from multi column insert that we didn't insert!");
-                if existing.column_type as i16 != wanted_type {
+            .zip(v_column_type)
+            .map(|(existing, want)| {
+                if existing.column_type != want {
                     return Err(Error::ColumnTypeMismatch {
                         name: existing.name,
                         existing: ColumnType::try_from(existing.column_type)
                             .unwrap()
                             .to_string(),
-                        new: ColumnType::try_from(wanted_type).unwrap().to_string(),
+                        new: ColumnType::try_from(want).unwrap().to_string(),
                     });
                 }
                 Ok(existing)
             })
             .collect()
     }
-
-    async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
-        let rec = sqlx::query_as::<_, Column>(
-            r#"
-SELECT column_name.* FROM table_name
-INNER JOIN column_name on column_name.table_id = table_name.id
-WHERE table_name.namespace_id = $1;
-            "#,
-        )
-        .bind(&namespace_id)
-        .fetch_all(&mut self.inner)
-        .await
-        .map_err(|e| Error::SqlxError { source: e })?;
-
-        Ok(rec)
-    }
 }
 
 #[async_trait]
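After the revert, the Postgres implementation pairs the returned rows with the requested column types positionally via zip, on the assumption that rows come back in input order. A standalone sketch of that validation pattern (illustrative types only, not the crate's own):

    #[derive(Debug)]
    struct TypeMismatch {
        name: String,
        existing: i16,
        want: i16,
    }

    /// Pair each returned row with the type that was requested for it and
    /// fail on mismatch, relying on the rows coming back in input order
    /// (the post-revert assumption).
    fn check_types(
        returned: Vec<(String, i16)>, // (name, column_type) as stored
        wanted: Vec<i16>,             // requested types, same order as the input
    ) -> Result<Vec<(String, i16)>, TypeMismatch> {
        assert_eq!(returned.len(), wanted.len());
        returned
            .into_iter()
            .zip(wanted)
            .map(|((name, existing), want)| {
                if existing != want {
                    return Err(TypeMismatch { name, existing, want });
                }
                Ok((name, existing))
            })
            .collect()
    }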
@@ -2485,16 +2447,12 @@ mod tests {
         // column values if successful.
         if let Ok(got) = &got {
             assert_eq!(insert.len(), got.len());
-            let got_map: HashMap<String, Column> = got
-                .into_iter()
-                .map(|col| (col.name.to_string(), col.clone()))
-                .collect();
-            insert.iter().for_each(|req| {
-                let got_col = got_map.get(req.name).expect("should get back what was inserted");
-                assert_eq!(req.table_id, got_col.table_id);
+            insert.iter().zip(got).for_each(|(req, got)| {
+                assert_eq!(req.name, got.name);
+                assert_eq!(req.table_id, got.table_id);
                 assert_eq!(
                     req.column_type,
-                    ColumnType::try_from(got_col.column_type).expect("invalid column type")
+                    ColumnType::try_from(got.column_type).expect("invalid column type")
                 );
             });
             assert_metric_hit(&metrics, "column_create_or_get_many");
@@ -2617,225 +2575,4 @@ mod tests {
         },
         want = Err(Error::SqlxError{ .. })
     );
-
-    macro_rules! test_column_create_or_get_many_two_tables_limits {
-        (
-            $name:ident,
-            limit = $col_limit:literal,
-            preseed = $col_preseed_count:literal,
-            preseed_table = $preseed_table_id:literal,
-            calls = {$([$(($col_name:literal, $table_id:literal) => $col_type:expr),+ $(,)?]),+},
-            want = $($want:tt)+
-        ) => {
-            paste::paste! {
-                #[tokio::test]
-                async fn [<test_column_create_or_get_many_column_limits_$name>]() {
-                    // If running an integration test on your laptop, this requires that you have
-                    // Postgres running and that you've done the sqlx migrations. See the README in
-                    // this crate for info to set it up.
-                    maybe_skip_integration!();
-
-                    let postgres = setup_db().await;
-
-                    let postgres: Arc<dyn Catalog> = Arc::new(postgres);
-                    let mut txn = postgres.start_transaction().await.expect("txn start");
-                    let (kafka, query, _sequencers) = create_or_get_default_records(1, txn.deref_mut())
-                        .await
-                        .expect("db init failed");
-                    txn.commit().await.expect("txn commit");
-
-                    let namespace_id = postgres
-                        .repositories()
-                        .await
-                        .namespaces()
-                        .create("ns4", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id)
-                        .await
-                        .expect("namespace create failed")
-                        .id;
-                    postgres
-                        .repositories()
-                        .await
-                        .tables()
-                        .create_or_get("table1", namespace_id)
-                        .await
-                        .expect("create table failed");
-                    postgres
-                        .repositories()
-                        .await
-                        .tables()
-                        .create_or_get("table2", namespace_id)
-                        .await
-                        .expect("create table failed");
-
-                    // create the desired number of columns for the table to preseed the test so we
-                    // can more easily test the limits
-                    let mut column_names: Vec<String> = Vec::with_capacity($col_preseed_count);
-                    for n in 1..=$col_preseed_count {
-                        column_names.push(format!("a{}", n));
-                    }
-                    let preseed_inserts: Vec<ColumnUpsertRequest> = column_names
-                        .iter()
-                        .map(|name| ColumnUpsertRequest {
-                            name: name.as_str(),
-                            table_id: TableId::new($preseed_table_id),
-                            column_type: ColumnType::Tag,
-                        })
-                        .collect();
-                    if $col_preseed_count > 0 {
-                        postgres
-                            .repositories()
-                            .await
-                            .columns()
-                            .create_or_get_many(&preseed_inserts)
-                            .await
-                            .expect("couldn't insert preseed columns");
-                    }
-
-                    // deliberately setting the limits after pre-seeding with data in case we want
-                    // to write tests for the case where the limits are set below the amount of
-                    // columns we already have in the DB (which can happen- setting limits doesn't
-                    // truncate the data, or fail if data already exceeds it)
-                    postgres
-                        .repositories()
-                        .await
-                        .namespaces()
-                        .update_column_limit("ns4", $col_limit)
-                        .await
-                        .expect("namespace limit update failed");
-
-                    $(
-                        let insert = [
-                            $(
-                                ColumnUpsertRequest {
-                                    name: $col_name,
-                                    table_id: TableId::new($table_id),
-                                    column_type: $col_type,
-                                },
-                            )+
-                        ];
-                        let got = postgres
-                            .repositories()
-                            .await
-                            .columns()
-                            .create_or_get_many(&insert)
-                            .await;
-                    )+
-
-                    assert_matches!(got, $($want)+);
-                }
-            }
-        }
-    }
-
-    test_column_create_or_get_many_two_tables_limits!(
-        single_table_ok_exact_limit,
-        limit = 5,
-        preseed = 1,
-        preseed_table = 1,
-        calls = {
-            [
-                ("test1", 1) => ColumnType::Tag,
-                ("test2", 1) => ColumnType::Tag,
-                ("test3", 1) => ColumnType::Tag,
-                ("test4", 1) => ColumnType::Tag,
-            ]
-        },
-        want = Ok(_)
-    );
-
-    test_column_create_or_get_many_two_tables_limits!(
-        single_table_err_one_over,
-        limit = 5,
-        preseed = 1,
-        preseed_table = 1,
-        calls = {
-            [
-                ("test1", 1) => ColumnType::Tag,
-                ("test2", 1) => ColumnType::Tag,
-                ("test3", 1) => ColumnType::Tag,
-                ("test4", 1) => ColumnType::Tag,
-                ("test5", 1) => ColumnType::Tag,
-            ]
-        },
-        want = Err(Error::PartialColumnCreateError {
-            input_column_count: 5,
-            inserted_column_count: 0
-        })
-    );
-
-    test_column_create_or_get_many_two_tables_limits!(
-        single_table_ok_one_under,
-        limit = 5,
-        preseed = 2,
-        preseed_table = 1,
-        calls = {
-            [
-                ("test1", 1) => ColumnType::Tag,
-                ("test2", 1) => ColumnType::Tag,
-            ]
-        },
-        want = Ok(_)
-    );
-
-    test_column_create_or_get_many_two_tables_limits!(
-        multi_table_ok,
-        limit = 5,
-        preseed = 1,
-        preseed_table = 1,
-        calls = {
-            [
-                ("test1", 1) => ColumnType::Tag,
-                ("test2", 1) => ColumnType::Tag,
-                ("test3", 1) => ColumnType::Tag,
-                ("test4", 1) => ColumnType::Tag,
-                ("test1", 2) => ColumnType::Tag,
-                ("test2", 2) => ColumnType::Tag,
-                ("test3", 2) => ColumnType::Tag,
-                ("test4", 2) => ColumnType::Tag,
-                ("test5", 2) => ColumnType::Tag,
-            ]
-        },
-        want = Ok(_)
-    );
-
-    test_column_create_or_get_many_two_tables_limits!(
-        multi_table_partial,
-        limit = 5,
-        preseed = 3,
-        preseed_table = 1,
-        calls = {
-            [
-                ("test1", 1) => ColumnType::Tag,
-                ("test2", 1) => ColumnType::Tag,
-                ("test3", 1) => ColumnType::Tag,
-                ("test4", 1) => ColumnType::Tag,
-                ("test5", 1) => ColumnType::Tag,
-                ("test1", 2) => ColumnType::Tag,
-                ("test2", 2) => ColumnType::Tag,
-                ("test3", 2) => ColumnType::Tag,
-                ("test4", 2) => ColumnType::Tag,
-                ("test5", 2) => ColumnType::Tag,
-            ]
-        },
-        want = Err(Error::PartialColumnCreateError {
-            input_column_count: 10,
-            inserted_column_count: 5
-        })
-    );
-
-    test_column_create_or_get_many_two_tables_limits!(
-        invalid_table_id,
-        limit = 5,
-        preseed = 1,
-        preseed_table = 1,
-        calls = {
-            [
-                ("test1", 3) => ColumnType::Tag,
-            ]
-        },
-        want = Err(Error::PartialColumnCreateError {
-            input_column_count: 1,
-            inserted_column_count: 0
-        })
-    );
 }
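The expectations in the deleted tests follow directly from the limit arithmetic: a call fits when preseeded + inserted <= limit, checked per table. A tiny standalone check of those numbers (illustrative code, not part of the test suite):

    // Mirrors the deleted cases' arithmetic: a table may hold at most
    // `limit` columns, counting any preseeded ones.
    fn call_fits(limit: usize, preseeded: usize, inserting: usize) -> bool {
        preseeded + inserting <= limit
    }

    fn main() {
        assert!(call_fits(5, 1, 4));  // single_table_ok_exact_limit: 1 + 4 == 5
        assert!(!call_fits(5, 1, 5)); // single_table_err_one_over: 1 + 5 > 5
        assert!(call_fits(5, 2, 2));  // single_table_ok_one_under: 2 + 2 < 5
        // multi_table_partial: table 1 fails (3 + 5 > 5) while table 2
        // fits (0 + 5 == 5), hence input_column_count: 10, inserted: 5.
        assert!(!call_fits(5, 3, 5));
        assert!(call_fits(5, 0, 5));
    }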
@@ -204,8 +204,7 @@ where
             }
             // Service limits
             CatalogError::ColumnCreateLimitError { .. }
-            | CatalogError::TableCreateLimitError { .. }
-            | CatalogError::PartialColumnCreateError { .. } => {
+            | CatalogError::TableCreateLimitError { .. } => {
                 warn!(%namespace, error=%e, "service protection limit reached");
                 self.service_limit_hit.inc(1);
                 SchemaError::ServiceLimit(e)