diff --git a/ioxd/src/server_type/router2.rs b/ioxd/src/server_type/router2.rs index b16acda70a..d76c33e523 100644 --- a/ioxd/src/server_type/router2.rs +++ b/ioxd/src/server_type/router2.rs @@ -156,7 +156,7 @@ pub async fn create_router2_server_type( ) .await?; let write_buffer = - InstrumentationDecorator::new("sharded_write_buffer", Arc::clone(&metrics), write_buffer); + InstrumentationDecorator::new("sharded_write_buffer", &*metrics, write_buffer); // Initialise an instrumented namespace cache to be shared with the schema // validator, and namespace auto-creator that reports cache hit/miss/update @@ -169,17 +169,17 @@ pub async fn create_router2_server_type( )); // Initialise and instrument the schema validator - let schema_validator = SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); let schema_validator = - InstrumentationDecorator::new("schema_validator", Arc::clone(&metrics), schema_validator); + SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics); + let schema_validator = + InstrumentationDecorator::new("schema_validator", &*metrics, schema_validator); // Add a write partitioner into the handler stack that splits by the date // portion of the write's timestamp. let partitioner = Partitioner::new(PartitionTemplate { parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], }); - let partitioner = - InstrumentationDecorator::new("partitioner", Arc::clone(&metrics), partitioner); + let partitioner = InstrumentationDecorator::new("partitioner", &*metrics, partitioner); //////////////////////////////////////////////////////////////////////////// // @@ -247,13 +247,12 @@ pub async fn create_router2_server_type( // operation. .and_then(InstrumentationDecorator::new( "parallel_write", - Arc::clone(&metrics), + &*metrics, FanOutAdaptor::new(write_buffer), )); // Record the overall request handling latency - let handler_stack = - InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack); + let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack); // Initialise the API delegates, sharing the handler stack between them. let handler_stack = Arc::new(handler_stack); diff --git a/router2/benches/e2e.rs b/router2/benches/e2e.rs index db3cbb28ed..0b67b2311c 100644 --- a/router2/benches/e2e.rs +++ b/router2/benches/e2e.rs @@ -60,7 +60,8 @@ fn e2e_benchmarks(c: &mut Criterion) { )); let write_buffer = init_write_buffer(1); - let schema_validator = SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + let schema_validator = + SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics); let partitioner = Partitioner::new(PartitionTemplate { parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], }); diff --git a/router2/benches/schema_validator.rs b/router2/benches/schema_validator.rs index 0899597406..6665d69dd8 100644 --- a/router2/benches/schema_validator.rs +++ b/router2/benches/schema_validator.rs @@ -46,7 +46,7 @@ fn bench(group: &mut BenchmarkGroup, tables: usize, columns_per_table: let ns_cache = Arc::new(ShardedCache::new( iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), )); - let validator = SchemaValidator::new(catalog, ns_cache); + let validator = SchemaValidator::new(catalog, ns_cache, &*metrics); for i in 0..65_000 { let write = lp_to_writes(format!("{}{}", i + 10_000_000, generate_lp(1, 1)).as_str()); diff --git a/router2/src/dml_handlers/instrumentation.rs b/router2/src/dml_handlers/instrumentation.rs index 7ae88c84f2..6c4bfabc43 100644 --- a/router2/src/dml_handlers/instrumentation.rs +++ b/router2/src/dml_handlers/instrumentation.rs @@ -2,7 +2,6 @@ use super::DmlHandler; use async_trait::async_trait; use data_types2::{DatabaseName, DeletePredicate}; use metric::{Metric, U64Histogram, U64HistogramOptions}; -use std::sync::Arc; use time::{SystemProvider, TimeProvider}; use trace::{ctx::SpanContext, span::SpanRecorder}; @@ -27,7 +26,7 @@ pub struct InstrumentationDecorator { impl InstrumentationDecorator { /// Wrap a new [`InstrumentationDecorator`] over `T` exposing metrics /// labelled with `handler=name`. - pub fn new(name: &'static str, registry: Arc, inner: T) -> Self { + pub fn new(name: &'static str, registry: &metric::Registry, inner: T) -> Self { let buckets = || { U64HistogramOptions::new([ 5, @@ -215,7 +214,7 @@ mod tests { let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); let span = SpanContext::new(Arc::clone(&traces)); - let decorator = InstrumentationDecorator::new(HANDLER_NAME, Arc::clone(&metrics), handler); + let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler); decorator .write(&ns, (), Some(span)) @@ -238,7 +237,7 @@ mod tests { let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); let span = SpanContext::new(Arc::clone(&traces)); - let decorator = InstrumentationDecorator::new(HANDLER_NAME, Arc::clone(&metrics), handler); + let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler); let err = decorator .write(&ns, (), Some(span)) @@ -260,7 +259,7 @@ mod tests { let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); let span = SpanContext::new(Arc::clone(&traces)); - let decorator = InstrumentationDecorator::new(HANDLER_NAME, Arc::clone(&metrics), handler); + let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler); let pred = DeletePredicate { range: TimestampRange::new(1, 2), @@ -288,7 +287,7 @@ mod tests { let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); let span = SpanContext::new(Arc::clone(&traces)); - let decorator = InstrumentationDecorator::new(HANDLER_NAME, Arc::clone(&metrics), handler); + let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler); let pred = DeletePredicate { range: TimestampRange::new(1, 2), diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs index a899e0368e..b3ee23d68a 100644 --- a/router2/src/dml_handlers/schema_validation.rs +++ b/router2/src/dml_handlers/schema_validation.rs @@ -7,6 +7,7 @@ use iox_catalog::{ interface::{get_schema_by_name, Catalog, Error as CatalogError}, validate_or_insert_schema, }; +use metric::U64Counter; use mutable_batch::MutableBatch; use observability_deps::tracing::*; use std::{ops::DerefMut, sync::Arc}; @@ -85,8 +86,10 @@ pub enum SchemaError { #[derive(Debug)] pub struct SchemaValidator>> { catalog: Arc, - cache: C, + + service_limit_hit: U64Counter, + schema_conflict: U64Counter, } impl SchemaValidator { @@ -94,10 +97,25 @@ impl SchemaValidator { /// `catalog`. /// /// Schemas are cached in `ns_cache`. - pub fn new(catalog: Arc, ns_cache: C) -> Self { + pub fn new(catalog: Arc, ns_cache: C, metrics: &metric::Registry) -> Self { + let service_limit_hit = metrics + .register_metric::( + "schema_validation_service_limit_reached", + "number of requests that have hit the namespace table/column limit", + ) + .recorder(&[]); + let schema_conflict = metrics + .register_metric::( + "schema_validation_schema_conflict", + "number of requests that fail due to a schema conflict", + ) + .recorder(&[]); + Self { catalog, cache: ns_cache, + service_limit_hit, + schema_conflict, } } } @@ -181,12 +199,14 @@ where request_column_type=%new, "schema conflict" ); + self.schema_conflict.inc(1); SchemaError::Conflict(e) } // Service limits CatalogError::ColumnCreateLimitError { .. } | CatalogError::TableCreateLimitError { .. } => { warn!(%namespace, error=%e, "service protection limit reached"); + self.service_limit_hit.inc(1); SchemaError::ServiceLimit(e) } e => { @@ -295,7 +315,12 @@ mod tests { #[tokio::test] async fn test_write_ok() { let catalog = create_catalog().await; - let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default())); + let metrics = Arc::new(metric::Registry::default()); + let handler = SchemaValidator::new( + catalog, + Arc::new(MemoryNamespaceCache::default()), + &*metrics, + ); let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); handler @@ -313,7 +338,12 @@ mod tests { #[tokio::test] async fn test_write_schema_not_found() { let catalog = create_catalog().await; - let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default())); + let metrics = Arc::new(metric::Registry::default()); + let handler = SchemaValidator::new( + catalog, + Arc::new(MemoryNamespaceCache::default()), + &*metrics, + ); let ns = DatabaseName::try_from("A_DIFFERENT_NAMESPACE").unwrap(); @@ -332,7 +362,12 @@ mod tests { #[tokio::test] async fn test_write_validation_failure() { let catalog = create_catalog().await; - let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default())); + let metrics = Arc::new(metric::Registry::default()); + let handler = SchemaValidator::new( + catalog, + Arc::new(MemoryNamespaceCache::default()), + &*metrics, + ); // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64 @@ -356,14 +391,18 @@ mod tests { assert_cache(&handler, "bananas", "tag2", ColumnType::Tag); assert_cache(&handler, "bananas", "val", ColumnType::I64); // original type assert_cache(&handler, "bananas", "time", ColumnType::Time); + + assert_eq!(1, handler.schema_conflict.fetch()); } #[tokio::test] async fn test_write_table_service_limit() { let catalog = create_catalog().await; + let metrics = Arc::new(metric::Registry::default()); let handler = SchemaValidator::new( Arc::clone(&catalog), Arc::new(MemoryNamespaceCache::default()), + &*metrics, ); // First write sets the schema @@ -391,14 +430,17 @@ mod tests { .expect_err("request should fail"); assert_matches!(err, SchemaError::ServiceLimit(_)); + assert_eq!(1, handler.service_limit_hit.fetch()); } #[tokio::test] async fn test_write_column_service_limit() { let catalog = create_catalog().await; + let metrics = Arc::new(metric::Registry::default()); let handler = SchemaValidator::new( Arc::clone(&catalog), Arc::new(MemoryNamespaceCache::default()), + &*metrics, ); // First write sets the schema @@ -426,6 +468,7 @@ mod tests { .expect_err("request should fail"); assert_matches!(err, SchemaError::ServiceLimit(_)); + assert_eq!(1, handler.service_limit_hit.fetch()); } #[tokio::test] @@ -434,7 +477,12 @@ mod tests { const TABLE: &str = "bananas"; let catalog = create_catalog().await; - let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default())); + let metrics = Arc::new(metric::Registry::default()); + let handler = SchemaValidator::new( + catalog, + Arc::new(MemoryNamespaceCache::default()), + &*metrics, + ); let predicate = DeletePredicate { range: TimestampRange::new(1, 2), diff --git a/router2/tests/http.rs b/router2/tests/http.rs index a60a6df5b5..676110a959 100644 --- a/router2/tests/http.rs +++ b/router2/tests/http.rs @@ -101,7 +101,7 @@ impl TestContext { iox_catalog::INFINITE_RETENTION_POLICY.to_owned(), ); - let schema_validator = SchemaValidator::new(Arc::clone(&catalog), ns_cache); + let schema_validator = SchemaValidator::new(Arc::clone(&catalog), ns_cache, &*metrics); let partitioner = Partitioner::new(PartitionTemplate { parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], }); @@ -111,8 +111,7 @@ impl TestContext { .and_then(partitioner) .and_then(FanOutAdaptor::new(sharded_write_buffer)); - let handler_stack = - InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack); + let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack); let delegate = HttpDelegate::new(1024, Arc::new(handler_stack), &metrics);