feat: skip ingester buffering if INFLUXDB_IOX_INGESTER_SKIP_BUFFER is set (#5115)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
5c5c964dfe
commit
64b6b4fd6f
|
@ -5,6 +5,7 @@ use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl};
|
|||
use async_trait::async_trait;
|
||||
use data_types::SequencerId;
|
||||
use dml::DmlOperation;
|
||||
use observability_deps::tracing::warn;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Provides a [`DmlSink`] implementation for a [`IngesterData`] instance.
|
||||
|
@ -13,6 +14,8 @@ pub struct IngestSinkAdaptor {
|
|||
ingest_data: Arc<IngesterData>,
|
||||
lifecycle_handle: LifecycleHandleImpl,
|
||||
sequencer_id: SequencerId,
|
||||
/// Skip buffering
|
||||
skip_buffering: bool,
|
||||
}
|
||||
|
||||
impl IngestSinkAdaptor {
|
||||
|
@ -23,10 +26,28 @@ impl IngestSinkAdaptor {
|
|||
lifecycle_handle: LifecycleHandleImpl,
|
||||
sequencer_id: SequencerId,
|
||||
) -> Self {
|
||||
// Temporary setting (see
|
||||
// https://github.com/influxdata/conductor/issues/1034) that
|
||||
// can be used to have the ingesters read from kafka, but
|
||||
// ignore (do not buffer or persist)
|
||||
let env_name = "INFLUXDB_IOX_INGESTER_SKIP_BUFFER";
|
||||
let skip_buffering = match std::env::var(env_name) {
|
||||
Ok(s) if s.to_lowercase() == "true" => {
|
||||
warn!(name=%env_name, value=%s, "Skipping buffering due to environment request");
|
||||
true
|
||||
}
|
||||
Ok(s) => {
|
||||
warn!(name=%env_name, value=%s, "Unknown value for environment request. Expected 'true'");
|
||||
false
|
||||
}
|
||||
Err(_) => false,
|
||||
};
|
||||
|
||||
Self {
|
||||
ingest_data,
|
||||
lifecycle_handle,
|
||||
sequencer_id,
|
||||
skip_buffering,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -34,8 +55,13 @@ impl IngestSinkAdaptor {
|
|||
#[async_trait]
|
||||
impl DmlSink for IngestSinkAdaptor {
|
||||
async fn apply(&self, op: DmlOperation) -> Result<bool, crate::data::Error> {
|
||||
self.ingest_data
|
||||
.buffer_operation(self.sequencer_id, op, &self.lifecycle_handle)
|
||||
.await
|
||||
if self.skip_buffering {
|
||||
// 'false' means do not pause
|
||||
Ok(false)
|
||||
} else {
|
||||
self.ingest_data
|
||||
.buffer_operation(self.sequencer_id, op, &self.lifecycle_handle)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue