diff --git a/server/src/db.rs b/server/src/db.rs index eece23ef64..3f9ace91fa 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -235,6 +235,9 @@ pub struct Db { /// Metric labels metric_labels: Vec, + /// Metrics for tracking the number of errors that occur while ingesting data + ingest_errors: metrics::Counter, + /// Optionally connect to a write buffer for either buffering writes or reading buffered writes write_buffer: Option, @@ -267,6 +270,12 @@ impl Db { let store = Arc::clone(&database_to_commit.object_store); let metrics_registry = Arc::clone(&database_to_commit.catalog.metrics_registry); let metric_labels = database_to_commit.catalog.metric_labels.clone(); + + let ingest_domain = + metrics_registry.register_domain_with_labels("ingest", metric_labels.clone()); + let ingest_errors = + ingest_domain.register_counter_metric("errors", None, "Number of errors during ingest"); + let catalog = Arc::new(database_to_commit.catalog); let catalog_access = QueryCatalogAccess::new( @@ -294,6 +303,7 @@ impl Db { worker_iterations_lifecycle: AtomicUsize::new(0), worker_iterations_cleanup: AtomicUsize::new(0), metric_labels, + ingest_errors, write_buffer: database_to_commit.write_buffer, cleanup_lock: Default::default(), } @@ -584,8 +594,7 @@ impl Db { Ok(sequenced_entry) => sequenced_entry, Err(e) => { debug!(?e, "Error converting write buffer data to SequencedEntry"); - // TODO: add to metrics - // self.ingest_errors.add_with_labels(1, &labels); + self.ingest_errors.add(1); return; } }; @@ -597,8 +606,7 @@ impl Db { ?e, "Error storing SequencedEntry from write buffer in database" ); - // TODO: add to metrics - // self.ingest_errors.add_with_labels(1, &labels); + self.ingest_errors.add(1); } }) .await @@ -1095,6 +1103,53 @@ mod tests { assert_batches_eq!(expected, &batches); } + #[tokio::test] + async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() { + let write_buffer = Arc::new(MockBufferForReading::new(vec![Err(String::from( + "Something bad happened on the way to creating a SequencedEntry", + ) + .into())])); + + let test_db = TestDb::builder() + .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _)) + .build() + .await; + + let db = Arc::new(test_db.db); + let metrics = test_db.metric_registry; + + // do: start background task loop + let shutdown: CancellationToken = Default::default(); + let shutdown_captured = shutdown.clone(); + let db_captured = Arc::clone(&db); + let join_handle = + tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); + + // check: after a while the error should be reported in the database's metrics + let t_0 = Instant::now(); + loop { + let family = metrics.try_has_metric_family("ingest_errors_total"); + + if let Ok(metric) = family { + if metric + .with_labels(&[("db_name", "placeholder"), ("svr_id", "1")]) + .counter() + .eq(1.0) + .is_ok() + { + break; + } + } + + assert!(t_0.elapsed() < Duration::from_secs(10)); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // do: stop background task loop + shutdown.cancel(); + join_handle.await.unwrap(); + } + #[tokio::test] async fn cant_write_when_reading_from_write_buffer() { // Validate that writes are rejected if this database is reading from the write buffer