feat: schema validation conflict/limit metrics

Emit metric counters tracking the number of schema conflicts, and number
of service limit errors observed.
pull/24376/head
Dom Dwyer 2022-03-30 11:42:58 +00:00
parent f2a3dd58b2
commit 91730d6a1c
5 changed files with 60 additions and 10 deletions

View File

@ -169,7 +169,8 @@ pub async fn create_router2_server_type(
));
// Initialise and instrument the schema validator
let schema_validator = SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let schema_validator =
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics);
let schema_validator =
InstrumentationDecorator::new("schema_validator", Arc::clone(&metrics), schema_validator);

View File

@ -60,7 +60,8 @@ fn e2e_benchmarks(c: &mut Criterion) {
));
let write_buffer = init_write_buffer(1);
let schema_validator = SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let schema_validator =
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics);
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});

View File

@ -46,7 +46,7 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
let ns_cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
));
let validator = SchemaValidator::new(catalog, ns_cache);
let validator = SchemaValidator::new(catalog, ns_cache, &*metrics);
for i in 0..65_000 {
let write = lp_to_writes(format!("{}{}", i + 10_000_000, generate_lp(1, 1)).as_str());

View File

@ -7,6 +7,7 @@ use iox_catalog::{
interface::{get_schema_by_name, Catalog, Error as CatalogError},
validate_or_insert_schema,
};
use metric::U64Counter;
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use std::{ops::DerefMut, sync::Arc};
@ -85,8 +86,10 @@ pub enum SchemaError {
#[derive(Debug)]
pub struct SchemaValidator<C = Arc<InstrumentedCache<MemoryNamespaceCache>>> {
catalog: Arc<dyn Catalog>,
cache: C,
service_limit_hit: U64Counter,
schema_conflict: U64Counter,
}
impl<C> SchemaValidator<C> {
@ -94,10 +97,25 @@ impl<C> SchemaValidator<C> {
/// `catalog`.
///
/// Schemas are cached in `ns_cache`.
pub fn new(catalog: Arc<dyn Catalog>, ns_cache: C) -> Self {
pub fn new(catalog: Arc<dyn Catalog>, ns_cache: C, metrics: &metric::Registry) -> Self {
let service_limit_hit = metrics
.register_metric::<U64Counter>(
"schema_validation_service_limit_reached",
"number of requests that have hit the namespace table/column limit",
)
.recorder(&[]);
let schema_conflict = metrics
.register_metric::<U64Counter>(
"schema_validation_schema_conflict",
"number of requests that fail due to a schema conflict",
)
.recorder(&[]);
Self {
catalog,
cache: ns_cache,
service_limit_hit,
schema_conflict,
}
}
}
@ -181,12 +199,14 @@ where
request_column_type=%new,
"schema conflict"
);
self.schema_conflict.inc(1);
SchemaError::Conflict(e)
}
// Service limits
CatalogError::ColumnCreateLimitError { .. }
| CatalogError::TableCreateLimitError { .. } => {
warn!(%namespace, error=%e, "service protection limit reached");
self.service_limit_hit.inc(1);
SchemaError::ServiceLimit(e)
}
e => {
@ -295,7 +315,12 @@ mod tests {
#[tokio::test]
async fn test_write_ok() {
let catalog = create_catalog().await;
let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default()));
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
handler
@ -313,7 +338,12 @@ mod tests {
#[tokio::test]
async fn test_write_schema_not_found() {
let catalog = create_catalog().await;
let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default()));
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
let ns = DatabaseName::try_from("A_DIFFERENT_NAMESPACE").unwrap();
@ -332,7 +362,12 @@ mod tests {
#[tokio::test]
async fn test_write_validation_failure() {
let catalog = create_catalog().await;
let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default()));
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64
@ -356,14 +391,18 @@ mod tests {
assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
assert_cache(&handler, "bananas", "val", ColumnType::I64); // original type
assert_cache(&handler, "bananas", "time", ColumnType::Time);
assert_eq!(1, handler.schema_conflict.fetch());
}
#[tokio::test]
async fn test_write_table_service_limit() {
let catalog = create_catalog().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
Arc::clone(&catalog),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
// First write sets the schema
@ -391,14 +430,17 @@ mod tests {
.expect_err("request should fail");
assert_matches!(err, SchemaError::ServiceLimit(_));
assert_eq!(1, handler.service_limit_hit.fetch());
}
#[tokio::test]
async fn test_write_column_service_limit() {
let catalog = create_catalog().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
Arc::clone(&catalog),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
// First write sets the schema
@ -426,6 +468,7 @@ mod tests {
.expect_err("request should fail");
assert_matches!(err, SchemaError::ServiceLimit(_));
assert_eq!(1, handler.service_limit_hit.fetch());
}
#[tokio::test]
@ -434,7 +477,12 @@ mod tests {
const TABLE: &str = "bananas";
let catalog = create_catalog().await;
let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default()));
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
let predicate = DeletePredicate {
range: TimestampRange::new(1, 2),

View File

@ -101,7 +101,7 @@ impl TestContext {
iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
);
let schema_validator = SchemaValidator::new(Arc::clone(&catalog), ns_cache);
let schema_validator = SchemaValidator::new(Arc::clone(&catalog), ns_cache, &*metrics);
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});