Merge pull request #1879 from influxdata/cn/kafka-read-metrics-and-e2e-tests

feat: Collect metrics on failed ingest from Kafka
kodiakhq[bot] 2021-07-08 20:46:57 +00:00 committed by GitHub
commit 299269a161
1 changed file with 59 additions and 4 deletions


@@ -235,6 +235,9 @@ pub struct Db {
/// Metric labels
metric_labels: Vec<KeyValue>,
/// Metrics for tracking the number of errors that occur while ingesting data
ingest_errors: metrics::Counter,
/// Optionally connect to a write buffer for either buffering writes or reading buffered writes
write_buffer: Option<WriteBufferConfig>,
@@ -267,6 +270,12 @@ impl Db {
let store = Arc::clone(&database_to_commit.object_store);
let metrics_registry = Arc::clone(&database_to_commit.catalog.metrics_registry);
let metric_labels = database_to_commit.catalog.metric_labels.clone();
let ingest_domain =
metrics_registry.register_domain_with_labels("ingest", metric_labels.clone());
let ingest_errors =
ingest_domain.register_counter_metric("errors", None, "Number of errors during ingest");
let catalog = Arc::new(database_to_commit.catalog);
let catalog_access = QueryCatalogAccess::new(
@@ -294,6 +303,7 @@ impl Db {
worker_iterations_lifecycle: AtomicUsize::new(0),
worker_iterations_cleanup: AtomicUsize::new(0),
metric_labels,
ingest_errors,
write_buffer: database_to_commit.write_buffer,
cleanup_lock: Default::default(),
}
@@ -584,8 +594,7 @@ impl Db {
Ok(sequenced_entry) => sequenced_entry,
Err(e) => {
debug!(?e, "Error converting write buffer data to SequencedEntry");
// TODO: add to metrics
// self.ingest_errors.add_with_labels(1, &labels);
self.ingest_errors.add(1);
return;
}
};
@@ -597,8 +606,7 @@ impl Db {
?e,
"Error storing SequencedEntry from write buffer in database"
);
// TODO: add to metrics
// self.ingest_errors.add_with_labels(1, &labels);
self.ingest_errors.add(1);
}
})
.await
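
The new hunks above follow one pattern: register a labelled "errors" counter in an "ingest" metrics domain once at construction time, then bump it on every error path in the write-buffer ingest loop instead of only logging. A minimal, self-contained sketch of that pattern is below; the Counter type is a simplified stand-in for illustration, not the IOx metrics::Counter registered above.

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };

    /// Simplified stand-in for a monotonically increasing error counter;
    /// the real code uses the counter returned by register_counter_metric.
    #[derive(Clone, Default)]
    struct Counter(Arc<AtomicU64>);

    impl Counter {
        fn add(&self, n: u64) {
            self.0.fetch_add(n, Ordering::Relaxed);
        }

        fn value(&self) -> u64 {
            self.0.load(Ordering::Relaxed)
        }
    }

    /// Mirrors the shape of the ingest loop: each failed entry increments
    /// the counter instead of being dropped silently.
    fn ingest(raw: Result<&str, &str>, ingest_errors: &Counter) {
        match raw {
            Ok(entry) => println!("stored entry: {}", entry),
            Err(e) => {
                eprintln!("error converting write buffer data: {}", e);
                ingest_errors.add(1);
            }
        }
    }

    fn main() {
        let ingest_errors = Counter::default();
        ingest(Ok("cpu,host=a usage=1"), &ingest_errors);
        ingest(Err("bad payload"), &ingest_errors);
        assert_eq!(ingest_errors.value(), 1);
    }
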
@@ -1095,6 +1103,53 @@ mod tests {
assert_batches_eq!(expected, &batches);
}

#[tokio::test]
async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() {
    let write_buffer = Arc::new(MockBufferForReading::new(vec![Err(String::from(
        "Something bad happened on the way to creating a SequencedEntry",
    )
    .into())]));
    let test_db = TestDb::builder()
        .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _))
        .build()
        .await;
    let db = Arc::new(test_db.db);
    let metrics = test_db.metric_registry;

    // do: start background task loop
    let shutdown: CancellationToken = Default::default();
    let shutdown_captured = shutdown.clone();
    let db_captured = Arc::clone(&db);
    let join_handle =
        tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });

    // check: after a while the error should be reported in the database's metrics
    let t_0 = Instant::now();
    loop {
        let family = metrics.try_has_metric_family("ingest_errors_total");
        if let Ok(metric) = family {
            if metric
                .with_labels(&[("db_name", "placeholder"), ("svr_id", "1")])
                .counter()
                .eq(1.0)
                .is_ok()
            {
                break;
            }
        }

        assert!(t_0.elapsed() < Duration::from_secs(10));
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // do: stop background task loop
    shutdown.cancel();
    join_handle.await.unwrap();
}
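
The check above is a poll-until-true loop: keep re-reading the registry for the ingest_errors_total family (labelled with the test database's db_name and svr_id) until the counter reaches 1, and fail hard if that takes longer than ten seconds. The same idea in isolation, as a sketch with a hypothetical wait_for helper (assumes the tokio crate; not part of the IOx test utilities):

    use std::time::{Duration, Instant};

    /// Hypothetical helper: re-check `condition` every `interval` until it
    /// holds or `timeout` elapses, returning whether it ever held.
    async fn wait_for(
        mut condition: impl FnMut() -> bool,
        timeout: Duration,
        interval: Duration,
    ) -> bool {
        let t_0 = Instant::now();
        while t_0.elapsed() < timeout {
            if condition() {
                return true;
            }
            tokio::time::sleep(interval).await;
        }
        false
    }

    #[tokio::main]
    async fn main() {
        let started = Instant::now();
        // Stand-in condition that becomes true after roughly 300 ms,
        // playing the role of "the metric family shows the expected count".
        let ok = wait_for(
            || started.elapsed() > Duration::from_millis(300),
            Duration::from_secs(10),
            Duration::from_millis(100),
        )
        .await;
        assert!(ok);
    }
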
#[tokio::test]
async fn cant_write_when_reading_from_write_buffer() {
    // Validate that writes are rejected if this database is reading from the write buffer