Merge pull request #5411 from influxdata/dom/table-scoped-errs
feat: table name in schema validation errorspull/24376/head
commit
a6f28d0709
|
@ -2236,6 +2236,7 @@ dependencies = [
|
|||
"sqlx-hotswap-pool",
|
||||
"tempfile",
|
||||
"test_helpers",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"uuid 1.1.2",
|
||||
"workspace-hack",
|
||||
|
|
|
@ -15,6 +15,7 @@ observability_deps = { path = "../observability_deps" }
|
|||
snafu = "0.7"
|
||||
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid" ] }
|
||||
sqlx-hotswap-pool = { path = "../sqlx-hotswap-pool" }
|
||||
thiserror = "1.0.32"
|
||||
tokio = { version = "1.20", features = ["io-util", "macros", "parking_lot", "rt-multi-thread", "time"] }
|
||||
uuid = { version = "1", features = ["v4"] }
|
||||
workspace-hack = { path = "../workspace-hack"}
|
||||
|
|
|
@ -18,6 +18,7 @@ use data_types::{
|
|||
};
|
||||
use mutable_batch::MutableBatch;
|
||||
use std::{borrow::Cow, collections::BTreeMap};
|
||||
use thiserror::Error;
|
||||
|
||||
const SHARED_KAFKA_TOPIC: &str = "iox-shared";
|
||||
const SHARED_QUERY_POOL: &str = SHARED_KAFKA_TOPIC;
|
||||
|
@ -31,6 +32,28 @@ pub mod mem;
|
|||
pub mod metrics;
|
||||
pub mod postgres;
|
||||
|
||||
/// An [`crate::interface::Error`] scoped to a single table for schema validation errors.
|
||||
#[derive(Debug, Error)]
|
||||
#[error("table {}, {}", .0, .1)]
|
||||
pub struct TableScopedError(String, Error);
|
||||
|
||||
impl TableScopedError {
|
||||
/// Return the table name for this error.
|
||||
pub fn table(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
|
||||
/// Return a reference to the error.
|
||||
pub fn err(&self) -> &Error {
|
||||
&self.1
|
||||
}
|
||||
|
||||
/// Return ownership of the error, discarding the table name.
|
||||
pub fn into_err(self) -> Error {
|
||||
self.1
|
||||
}
|
||||
}
|
||||
|
||||
/// Given an iterator of `(table_name, batch)` to validate, this function
|
||||
/// ensures all the columns within `batch` match the existing schema for
|
||||
/// `table_name` in `schema`. If the column does not already exist in `schema`,
|
||||
|
@ -43,7 +66,7 @@ pub async fn validate_or_insert_schema<'a, T, U, R>(
|
|||
tables: T,
|
||||
schema: &NamespaceSchema,
|
||||
repos: &mut R,
|
||||
) -> Result<Option<NamespaceSchema>>
|
||||
) -> Result<Option<NamespaceSchema>, TableScopedError>
|
||||
where
|
||||
T: IntoIterator<IntoIter = U, Item = (&'a str, &'a MutableBatch)> + Send + Sync,
|
||||
U: Iterator<Item = T::Item> + Send,
|
||||
|
@ -55,7 +78,9 @@ where
|
|||
let mut schema = Cow::Borrowed(schema);
|
||||
|
||||
for (table_name, batch) in tables {
|
||||
validate_mutable_batch(batch, table_name, &mut schema, repos).await?;
|
||||
validate_mutable_batch(batch, table_name, &mut schema, repos)
|
||||
.await
|
||||
.map_err(|e| TableScopedError(table_name.to_string(), e))?;
|
||||
}
|
||||
|
||||
match schema {
|
||||
|
@ -258,7 +283,7 @@ mod tests {
|
|||
.await;
|
||||
|
||||
match got {
|
||||
Err(Error::ColumnTypeMismatch{ .. }) => {
|
||||
Err(TableScopedError(_, Error::ColumnTypeMismatch{ .. })) => {
|
||||
observed_conflict = true;
|
||||
schema
|
||||
},
|
||||
|
|
|
@ -27,7 +27,7 @@ pub enum SchemaError {
|
|||
|
||||
/// The request schema conflicts with the existing namespace schema.
|
||||
#[error("schema conflict: {0}")]
|
||||
Conflict(iox_catalog::interface::Error),
|
||||
Conflict(iox_catalog::TableScopedError),
|
||||
|
||||
/// A catalog error during schema validation.
|
||||
///
|
||||
|
@ -185,7 +185,7 @@ where
|
|||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
match e {
|
||||
match e.err() {
|
||||
// Schema conflicts
|
||||
CatalogError::ColumnTypeMismatch {
|
||||
ref name,
|
||||
|
@ -197,6 +197,7 @@ where
|
|||
column_name=%name,
|
||||
existing_column_type=%existing,
|
||||
request_column_type=%new,
|
||||
table_name=%e.table(),
|
||||
"schema conflict"
|
||||
);
|
||||
self.schema_conflict.inc(1);
|
||||
|
@ -207,11 +208,11 @@ where
|
|||
| CatalogError::TableCreateLimitError { .. } => {
|
||||
warn!(%namespace, error=%e, "service protection limit reached");
|
||||
self.service_limit_hit.inc(1);
|
||||
SchemaError::ServiceLimit(e)
|
||||
SchemaError::ServiceLimit(e.into_err())
|
||||
}
|
||||
e => {
|
||||
_ => {
|
||||
error!(%namespace, error=%e, "schema validation failed");
|
||||
SchemaError::UnexpectedCatalogError(e)
|
||||
SchemaError::UnexpectedCatalogError(e.into_err())
|
||||
}
|
||||
}
|
||||
})?
|
||||
|
@ -383,7 +384,9 @@ mod tests {
|
|||
.await
|
||||
.expect_err("request should fail");
|
||||
|
||||
assert_matches!(err, SchemaError::Conflict(_));
|
||||
assert_matches!(err, SchemaError::Conflict(e) => {
|
||||
assert_eq!(e.table(), "bananas");
|
||||
});
|
||||
|
||||
// The cache should retain the original schema.
|
||||
assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
|
||||
|
|
|
@ -276,17 +276,19 @@ async fn test_schema_conflict() {
|
|||
router::server::http::Error::DmlHandler(
|
||||
DmlError::Schema(
|
||||
SchemaError::Conflict(
|
||||
iox_catalog::interface::Error::ColumnTypeMismatch {
|
||||
name,
|
||||
existing,
|
||||
new,
|
||||
}
|
||||
e
|
||||
)
|
||||
)
|
||||
) => {
|
||||
assert_eq!(name, "val");
|
||||
assert_eq!(existing, "i64");
|
||||
assert_eq!(new, "iox::column_type::field::float");
|
||||
assert_matches!(e.err(), iox_catalog::interface::Error::ColumnTypeMismatch {
|
||||
name,
|
||||
existing,
|
||||
new,
|
||||
} => {
|
||||
assert_eq!(name, "val");
|
||||
assert_eq!(existing, "i64");
|
||||
assert_eq!(new, "iox::column_type::field::float");
|
||||
});
|
||||
}
|
||||
);
|
||||
assert_eq!(err.as_status_code(), StatusCode::BAD_REQUEST);
|
||||
|
|
Loading…
Reference in New Issue