Merge pull request #4175 from influxdata/dom/schema-metrics

feat: schema validation metrics
pull/24376/head
kodiakhq[bot] 2022-03-30 14:17:42 +00:00 committed by GitHub
commit a1339aa194
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 71 additions and 25 deletions

View File

@@ -156,7 +156,7 @@ pub async fn create_router2_server_type(
)
.await?;
let write_buffer =
InstrumentationDecorator::new("sharded_write_buffer", Arc::clone(&metrics), write_buffer);
InstrumentationDecorator::new("sharded_write_buffer", &*metrics, write_buffer);
// Initialise an instrumented namespace cache to be shared with the schema
// validator, and namespace auto-creator that reports cache hit/miss/update
@@ -169,17 +169,17 @@ pub async fn create_router2_server_type(
));
// Initialise and instrument the schema validator
let schema_validator = SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let schema_validator =
InstrumentationDecorator::new("schema_validator", Arc::clone(&metrics), schema_validator);
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics);
let schema_validator =
InstrumentationDecorator::new("schema_validator", &*metrics, schema_validator);
// Add a write partitioner into the handler stack that splits by the date
// portion of the write's timestamp.
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});
let partitioner =
InstrumentationDecorator::new("partitioner", Arc::clone(&metrics), partitioner);
let partitioner = InstrumentationDecorator::new("partitioner", &*metrics, partitioner);
////////////////////////////////////////////////////////////////////////////
//
@@ -247,13 +247,12 @@ pub async fn create_router2_server_type(
// operation.
.and_then(InstrumentationDecorator::new(
"parallel_write",
Arc::clone(&metrics),
&*metrics,
FanOutAdaptor::new(write_buffer),
));
// Record the overall request handling latency
let handler_stack =
InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack);
let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack);
// Initialise the API delegates, sharing the handler stack between them.
let handler_stack = Arc::new(handler_stack);

View File

@@ -60,7 +60,8 @@ fn e2e_benchmarks(c: &mut Criterion) {
));
let write_buffer = init_write_buffer(1);
let schema_validator = SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let schema_validator =
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics);
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});

View File

@@ -46,7 +46,7 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
let ns_cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
));
let validator = SchemaValidator::new(catalog, ns_cache);
let validator = SchemaValidator::new(catalog, ns_cache, &*metrics);
for i in 0..65_000 {
let write = lp_to_writes(format!("{}{}", i + 10_000_000, generate_lp(1, 1)).as_str());

View File

@@ -2,7 +2,6 @@ use super::DmlHandler
use async_trait::async_trait;
use data_types2::{DatabaseName, DeletePredicate};
use metric::{Metric, U64Histogram, U64HistogramOptions};
use std::sync::Arc;
use time::{SystemProvider, TimeProvider};
use trace::{ctx::SpanContext, span::SpanRecorder};
@@ -27,7 +26,7 @@ pub struct InstrumentationDecorator<T, P = SystemProvider> {
impl<T> InstrumentationDecorator<T> {
/// Wrap a new [`InstrumentationDecorator`] over `T` exposing metrics
/// labelled with `handler=name`.
pub fn new(name: &'static str, registry: Arc<metric::Registry>, inner: T) -> Self {
pub fn new(name: &'static str, registry: &metric::Registry, inner: T) -> Self {
let buckets = || {
U64HistogramOptions::new([
5,
@@ -215,7 +214,7 @@ mod tests {
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
let decorator = InstrumentationDecorator::new(HANDLER_NAME, Arc::clone(&metrics), handler);
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler);
decorator
.write(&ns, (), Some(span))
@@ -238,7 +237,7 @@ mod tests {
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
let decorator = InstrumentationDecorator::new(HANDLER_NAME, Arc::clone(&metrics), handler);
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler);
let err = decorator
.write(&ns, (), Some(span))
@@ -260,7 +259,7 @@ mod tests {
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
let decorator = InstrumentationDecorator::new(HANDLER_NAME, Arc::clone(&metrics), handler);
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler);
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
@@ -288,7 +287,7 @@ mod tests {
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
let decorator = InstrumentationDecorator::new(HANDLER_NAME, Arc::clone(&metrics), handler);
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &*metrics, handler);
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),

View File

@@ -7,6 +7,7 @@ use iox_catalog::{
interface::{get_schema_by_name, Catalog, Error as CatalogError},
validate_or_insert_schema,
};
use metric::U64Counter;
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use std::{ops::DerefMut, sync::Arc};
@@ -85,8 +86,10 @@ pub enum SchemaError {
#[derive(Debug)]
pub struct SchemaValidator<C = Arc<InstrumentedCache<MemoryNamespaceCache>>> {
catalog: Arc<dyn Catalog>,
cache: C,
service_limit_hit: U64Counter,
schema_conflict: U64Counter,
}
impl<C> SchemaValidator<C> {
@@ -94,10 +97,25 @@ impl<C> SchemaValidator<C> {
/// `catalog`.
///
/// Schemas are cached in `ns_cache`.
pub fn new(catalog: Arc<dyn Catalog>, ns_cache: C) -> Self {
pub fn new(catalog: Arc<dyn Catalog>, ns_cache: C, metrics: &metric::Registry) -> Self {
let service_limit_hit = metrics
.register_metric::<U64Counter>(
"schema_validation_service_limit_reached",
"number of requests that have hit the namespace table/column limit",
)
.recorder(&[]);
let schema_conflict = metrics
.register_metric::<U64Counter>(
"schema_validation_schema_conflict",
"number of requests that fail due to a schema conflict",
)
.recorder(&[]);
Self {
catalog,
cache: ns_cache,
service_limit_hit,
schema_conflict,
}
}
}
@@ -181,12 +199,14 @@ where
request_column_type=%new,
"schema conflict"
);
self.schema_conflict.inc(1);
SchemaError::Conflict(e)
}
// Service limits
CatalogError::ColumnCreateLimitError { .. }
| CatalogError::TableCreateLimitError { .. } => {
warn!(%namespace, error=%e, "service protection limit reached");
self.service_limit_hit.inc(1);
SchemaError::ServiceLimit(e)
}
e => {
@@ -295,7 +315,12 @@ mod tests {
#[tokio::test]
async fn test_write_ok() {
let catalog = create_catalog().await;
let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default()));
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
handler
@@ -313,7 +338,12 @@ mod tests {
#[tokio::test]
async fn test_write_schema_not_found() {
let catalog = create_catalog().await;
let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default()));
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
let ns = DatabaseName::try_from("A_DIFFERENT_NAMESPACE").unwrap();
@@ -332,7 +362,12 @@ mod tests {
#[tokio::test]
async fn test_write_validation_failure() {
let catalog = create_catalog().await;
let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default()));
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64
@@ -356,14 +391,18 @@ mod tests {
assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
assert_cache(&handler, "bananas", "val", ColumnType::I64); // original type
assert_cache(&handler, "bananas", "time", ColumnType::Time);
assert_eq!(1, handler.schema_conflict.fetch());
}
#[tokio::test]
async fn test_write_table_service_limit() {
let catalog = create_catalog().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
Arc::clone(&catalog),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
// First write sets the schema
@@ -391,14 +430,17 @@ mod tests {
.expect_err("request should fail");
assert_matches!(err, SchemaError::ServiceLimit(_));
assert_eq!(1, handler.service_limit_hit.fetch());
}
#[tokio::test]
async fn test_write_column_service_limit() {
let catalog = create_catalog().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
Arc::clone(&catalog),
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
// First write sets the schema
@@ -426,6 +468,7 @@ mod tests {
.expect_err("request should fail");
assert_matches!(err, SchemaError::ServiceLimit(_));
assert_eq!(1, handler.service_limit_hit.fetch());
}
#[tokio::test]
@@ -434,7 +477,12 @@ mod tests {
const TABLE: &str = "bananas";
let catalog = create_catalog().await;
let handler = SchemaValidator::new(catalog, Arc::new(MemoryNamespaceCache::default()));
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog,
Arc::new(MemoryNamespaceCache::default()),
&*metrics,
);
let predicate = DeletePredicate {
range: TimestampRange::new(1, 2),

View File

@@ -101,7 +101,7 @@ impl TestContext {
iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
);
let schema_validator = SchemaValidator::new(Arc::clone(&catalog), ns_cache);
let schema_validator = SchemaValidator::new(Arc::clone(&catalog), ns_cache, &*metrics);
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});
@@ -111,8 +111,7 @@ impl TestContext {
.and_then(partitioner)
.and_then(FanOutAdaptor::new(sharded_write_buffer));
let handler_stack =
InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack);
let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack);
let delegate = HttpDelegate::new(1024, Arc::new(handler_stack), &metrics);