fix: only emit ttbr metric for applied ops (#5854)

* fix: only emit ttbr metric for applied ops

* fix: move DmlApplyAction to s/w accessible

* chore: test for skipped ingest; comments and log improvements

* fix: fixed ingester test re skipping write

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Luke Bond 2022-10-14 13:06:49 +01:00 committed by GitHub
parent efb964c390
commit 475c8a0704
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 133 additions and 71 deletions

View File

@ -172,7 +172,7 @@ impl IngesterData {
shard_id: ShardId,
dml_operation: DmlOperation,
lifecycle_handle: &dyn LifecycleHandle,
) -> Result<bool> {
) -> Result<DmlApplyAction> {
let shard_data = self
.shards
.get(&shard_id)
@ -541,6 +541,16 @@ impl Persister for IngesterData {
}
}
/// The outcome of successfully processing a DML operation.
///
/// A successful DML apply operation can perform one of these actions.
/// Callers use this to decide whether to pause ingest and whether to emit
/// per-op metrics (e.g. TTBR is only reported for `Applied` ops).
#[derive(Clone, Copy, Debug)]
pub enum DmlApplyAction {
    /// The DML operation was successful; bool indicates if ingestion should be paused
    /// (the lifecycle manager may request a pause under memory pressure).
    Applied(bool),
    /// The DML operation was skipped because it has already been applied
    /// (i.e. the write was already persisted), so nothing was buffered.
    Skipped,
}
#[cfg(test)]
mod tests {
use std::{ops::DerefMut, sync::Arc, time::Duration};
@ -634,7 +644,7 @@ mod tests {
metrics,
Arc::new(SystemProvider::new()),
);
let should_pause = data
let action = data
.buffer_operation(
shard1.id,
DmlOperation::Write(w1.clone()),
@ -642,12 +652,12 @@ mod tests {
)
.await
.unwrap();
assert!(!should_pause);
let should_pause = data
assert_matches!(action, DmlApplyAction::Applied(false));
let action = data
.buffer_operation(shard1.id, DmlOperation::Write(w1), &manager.handle())
.await
.unwrap();
assert!(should_pause);
assert_matches!(action, DmlApplyAction::Applied(true));
}
#[tokio::test]
@ -715,13 +725,13 @@ mod tests {
Arc::new(SystemProvider::new()),
);
let should_pause = data
let action = data
.buffer_operation(shard1.id, DmlOperation::Write(w1), &manager.handle())
.await
.unwrap();
// Exceeding the row count doesn't pause ingest (like other partition
// limits)
assert!(!should_pause);
assert_matches!(action, DmlApplyAction::Applied(false));
let (table_id, partition_id) = {
let sd = data.shards.get(&shard1.id).unwrap();
@ -1291,7 +1301,7 @@ mod tests {
// w1 should be ignored because the per-partition replay offset is set
// to 1 already, so it shouldn't be buffered and the buffer should
// remain empty.
let should_pause = data
let action = data
.buffer_operation(DmlOperation::Write(w1), &catalog, &manager.handle())
.await
.unwrap();
@ -1305,7 +1315,7 @@ mod tests {
);
assert!(p.data.buffer.is_none());
}
assert!(!should_pause);
assert_matches!(action, DmlApplyAction::Skipped);
// w2 should be in the buffer
data.buffer_operation(DmlOperation::Write(w2), &catalog, &manager.handle())

View File

@ -17,7 +17,7 @@ use super::{
partition::resolver::PartitionProvider,
table::{TableData, TableName},
};
use crate::lifecycle::LifecycleHandle;
use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle};
/// A double-referenced map where [`TableData`] can be looked up by name, or ID.
#[derive(Debug, Default)]
@ -177,7 +177,7 @@ impl NamespaceData {
dml_operation: DmlOperation,
catalog: &Arc<dyn Catalog>,
lifecycle_handle: &dyn LifecycleHandle,
) -> Result<bool, super::Error> {
) -> Result<DmlApplyAction, super::Error> {
let sequence_number = dml_operation
.meta()
.sequence()
@ -194,6 +194,7 @@ impl NamespaceData {
match dml_operation {
DmlOperation::Write(write) => {
let mut pause_writes = false;
let mut all_skipped = true;
// Extract the partition key derived by the router.
let partition_key = write
@ -211,7 +212,7 @@ impl NamespaceData {
{
// lock scope
let mut table_data = table_data.write().await;
let should_pause = table_data
let action = table_data
.buffer_table_write(
sequence_number,
b,
@ -219,13 +220,21 @@ impl NamespaceData {
lifecycle_handle,
)
.await?;
pause_writes = pause_writes || should_pause;
if let DmlApplyAction::Applied(should_pause) = action {
pause_writes = pause_writes || should_pause;
all_skipped = false;
}
}
#[cfg(test)]
self.test_triggers.on_write().await;
}
Ok(pause_writes)
if all_skipped {
Ok(DmlApplyAction::Skipped)
} else {
// at least some were applied
Ok(DmlApplyAction::Applied(pause_writes))
}
}
DmlOperation::Delete(delete) => {
// Deprecated delete support:
@ -239,7 +248,7 @@ impl NamespaceData {
"discarding unsupported delete op"
);
Ok(false)
Ok(DmlApplyAction::Applied(false))
}
}
}

View File

@ -13,6 +13,7 @@ use write_summary::ShardProgress;
use super::{
namespace::{NamespaceData, NamespaceName},
partition::resolver::PartitionProvider,
DmlApplyAction,
};
use crate::lifecycle::LifecycleHandle;
@ -99,7 +100,7 @@ impl ShardData {
dml_operation: DmlOperation,
catalog: &Arc<dyn Catalog>,
lifecycle_handle: &dyn LifecycleHandle,
) -> Result<bool, super::Error> {
) -> Result<DmlApplyAction, super::Error> {
let namespace_data = match self.namespace(&NamespaceName::from(dml_operation.namespace())) {
Some(d) => d,
None => {

View File

@ -8,7 +8,7 @@ use observability_deps::tracing::*;
use write_summary::ShardProgress;
use super::partition::{resolver::PartitionProvider, PartitionData, UnpersistedPartitionData};
use crate::{lifecycle::LifecycleHandle, querier_handler::PartitionStatus};
use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle, querier_handler::PartitionStatus};
/// A double-referenced map where [`PartitionData`] can be looked up by
/// [`PartitionKey`], or ID.
@ -137,7 +137,7 @@ impl TableData {
batch: MutableBatch,
partition_key: PartitionKey,
lifecycle_handle: &dyn LifecycleHandle,
) -> Result<bool, super::Error> {
) -> Result<DmlApplyAction, super::Error> {
let partition_data = match self.partition_data.by_key.get_mut(&partition_key) {
Some(p) => p,
None => {
@ -165,7 +165,7 @@ impl TableData {
op_sequence_number=?sequence_number,
"skipping already-persisted write"
);
return Ok(false);
return Ok(DmlApplyAction::Skipped);
}
}
@ -188,7 +188,7 @@ impl TableData {
rows,
);
Ok(should_pause)
Ok(DmlApplyAction::Applied(should_pause))
}
/// Return the [`PartitionData`] for the specified ID.
@ -332,7 +332,7 @@ mod tests {
assert!(table.partition_data.by_id_mut(PARTITION_ID).is_none());
// Write some test data
let pause = table
let action = table
.buffer_table_write(
SequenceNumber::new(42),
batch,
@ -341,7 +341,7 @@ mod tests {
)
.await
.expect("buffer op should succeed");
assert!(!pause);
assert_matches!(action, DmlApplyAction::Applied(false));
// Referencing the partition should succeed
assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some());
@ -394,7 +394,7 @@ mod tests {
assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none());
// Write some test data
let pause = table
let action = table
.buffer_table_write(
SequenceNumber::new(42),
batch,
@ -403,7 +403,7 @@ mod tests {
)
.await
.expect("buffer op should succeed");
assert!(!pause);
assert_matches!(action, DmlApplyAction::Applied(false));
// Referencing the partition should succeed
assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some());

View File

@ -12,7 +12,10 @@ use tokio_util::sync::CancellationToken;
use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};
use super::DmlSink;
use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
use crate::{
data::DmlApplyAction,
lifecycle::{LifecycleHandle, LifecycleHandleImpl},
};
/// When the [`LifecycleManager`] indicates that ingest should be paused because
/// of memory pressure, the shard will loop, sleeping this long between
@ -384,7 +387,7 @@ something clever.",
op.meta().duration_since_production(&self.time_provider);
let should_pause = match self.sink.apply(op).await {
Ok(should_pause) => {
Ok(DmlApplyAction::Applied(should_pause)) => {
trace!(
kafka_topic=%self.topic_name,
shard_index=%self.shard_index,
@ -393,8 +396,31 @@ something clever.",
?op_sequence_number,
"successfully applied dml operation"
);
// we only want to report the TTBR if anything was applied
if let Some(delta) = duration_since_production {
// Update the TTBR metric before potentially sleeping.
self.time_to_be_readable.set(delta);
trace!(
kafka_topic=%self.topic_name,
shard_index=%self.shard_index,
shard_id=%self.shard_id,
delta=%delta.as_millis(),
"reporting TTBR for shard (ms)"
);
}
should_pause
}
Ok(DmlApplyAction::Skipped) => {
trace!(
kafka_topic=%self.topic_name,
shard_index=%self.shard_index,
shard_id=%self.shard_id,
false,
?op_sequence_number,
"did not apply dml operation (op was already persisted previously)"
);
false
}
Err(e) => {
error!(
error=%e,
@ -410,18 +436,6 @@ something clever.",
}
};
if let Some(delta) = duration_since_production {
// Update the TTBR metric before potentially sleeping.
self.time_to_be_readable.set(delta);
trace!(
kafka_topic=%self.topic_name,
shard_index=%self.shard_index,
shard_id=%self.shard_id,
delta=%delta.as_millis(),
"reporting TTBR for shard (ms)"
);
}
if should_pause {
// The lifecycle manager may temporarily pause ingest - wait for
// persist operations to shed memory pressure if needed.
@ -772,7 +786,7 @@ mod tests {
stream_ops = vec![
vec![Ok(DmlOperation::Write(make_write("bananas", 42)))]
],
sink_rets = [Ok(true)],
sink_rets = [Ok(DmlApplyAction::Applied(true))],
want_ttbr = 42,
want_reset = 0,
want_err_metrics = [],
@ -788,7 +802,7 @@ mod tests {
stream_ops = vec![
vec![Ok(DmlOperation::Delete(make_delete("platanos", 24)))]
],
sink_rets = [Ok(true)],
sink_rets = [Ok(DmlApplyAction::Applied(true))],
want_ttbr = 24,
want_reset = 0,
want_err_metrics = [],
@ -806,7 +820,7 @@ mod tests {
Err(WriteBufferError::new(WriteBufferErrorKind::IO, "explosions")),
Ok(DmlOperation::Write(make_write("bananas", 13)))
]],
sink_rets = [Ok(true)],
sink_rets = [Ok(DmlApplyAction::Applied(true))],
want_ttbr = 13,
want_reset = 0,
want_err_metrics = [
@ -829,7 +843,7 @@ mod tests {
Err(WriteBufferError::new(WriteBufferErrorKind::SequenceNumberNoLongerExists, "explosions")),
Ok(DmlOperation::Write(make_write("bananas", 31)))
]],
sink_rets = [Ok(true)],
sink_rets = [Ok(DmlApplyAction::Applied(true))],
want_ttbr = 31,
want_reset = 0,
want_err_metrics = [
@ -858,7 +872,7 @@ mod tests {
],
vec![Ok(DmlOperation::Write(make_write("bananas", 31)))],
],
sink_rets = [Ok(true)],
sink_rets = [Ok(DmlApplyAction::Applied(true))],
want_ttbr = 31,
want_reset = 1,
want_err_metrics = [
@ -880,7 +894,7 @@ mod tests {
Err(WriteBufferError::new(WriteBufferErrorKind::InvalidData, "explosions")),
Ok(DmlOperation::Write(make_write("bananas", 50)))
]],
sink_rets = [Ok(true)],
sink_rets = [Ok(DmlApplyAction::Applied(true))],
want_ttbr = 50,
want_reset = 0,
want_err_metrics = [
@ -902,7 +916,7 @@ mod tests {
Err(WriteBufferError::new(WriteBufferErrorKind::Unknown, "explosions")),
Ok(DmlOperation::Write(make_write("bananas", 60)))
]],
sink_rets = [Ok(true)],
sink_rets = [Ok(DmlApplyAction::Applied(true))],
want_ttbr = 60,
want_reset = 0,
want_err_metrics = [
@ -932,7 +946,7 @@ mod tests {
want_sink = []
);
// Asserts the TTBR is uses the last value in the stream.
// Asserts the TTBR used is the last value in the stream.
test_stream_handler!(
reports_last_ttbr,
skip_to_oldest_available = false,
@ -942,7 +956,7 @@ mod tests {
Ok(DmlOperation::Write(make_write("bananas", 3))),
Ok(DmlOperation::Write(make_write("bananas", 42))),
]],
sink_rets = [Ok(true), Ok(false), Ok(true), Ok(false),],
sink_rets = [Ok(DmlApplyAction::Applied(true)), Ok(DmlApplyAction::Applied(false)), Ok(DmlApplyAction::Applied(true)), Ok(DmlApplyAction::Applied(false)),],
want_ttbr = 42,
want_reset = 0,
want_err_metrics = [
@ -967,7 +981,7 @@ mod tests {
]],
sink_rets = [
Err(crate::data::Error::NamespaceNotFound{namespace: "bananas".to_string() }),
Ok(true),
Ok(DmlApplyAction::Applied(true)),
],
want_ttbr = 2,
want_reset = 0,
@ -986,6 +1000,21 @@ mod tests {
}
);
test_stream_handler!(
skipped_op_no_ttbr,
skip_to_oldest_available = false,
stream_ops = vec![vec![Ok(DmlOperation::Write(make_write("some_op", 1)))]],
sink_rets = [Ok(DmlApplyAction::Skipped)],
want_ttbr = 0,
want_reset = 0,
want_err_metrics = [],
want_sink = [
DmlOperation::Write(op),
] => {
assert_eq!(op.namespace(), "some_op");
}
);
#[derive(Debug)]
struct EmptyWriteBufferStreamHandler {}

View File

@ -5,11 +5,12 @@ use dml::DmlOperation;
use parking_lot::Mutex;
use super::DmlSink;
use crate::data::DmlApplyAction;
#[derive(Debug, Default)]
struct MockDmlSinkState {
calls: Vec<DmlOperation>,
ret: VecDeque<Result<bool, crate::data::Error>>,
ret: VecDeque<Result<DmlApplyAction, crate::data::Error>>,
}
#[derive(Debug, Default)]
@ -20,7 +21,7 @@ pub struct MockDmlSink {
impl MockDmlSink {
pub fn with_apply_return(
self,
ret: impl Into<VecDeque<Result<bool, crate::data::Error>>>,
ret: impl Into<VecDeque<Result<DmlApplyAction, crate::data::Error>>>,
) -> Self {
self.state.lock().ret = ret.into();
self
@ -33,7 +34,7 @@ impl MockDmlSink {
#[async_trait]
impl DmlSink for MockDmlSink {
async fn apply(&self, op: DmlOperation) -> Result<bool, crate::data::Error> {
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error> {
let mut state = self.state.lock();
state.calls.push(op);
state.ret.pop_front().expect("no mock sink value to return")

View File

@ -3,12 +3,15 @@ use std::{fmt::Debug, ops::Deref, sync::Arc};
use async_trait::async_trait;
use dml::DmlOperation;
use crate::data::DmlApplyAction;
/// A [`DmlSink`] handles [`DmlOperation`] instances read from a shard.
#[async_trait]
pub(crate) trait DmlSink: Debug + Send + Sync {
/// Apply `op` read from a shard, returning `Ok(true)` if ingest should
/// be paused.
async fn apply(&self, op: DmlOperation) -> Result<bool, crate::data::Error>;
/// Apply `op` read from a shard, returning `Ok(DmlApplyAction::Applied(bool))`, the bool indicating if the
/// ingest should be paused. Returns `Ok(DmlApplyAction::Skipped)` if the operation has been
/// applied previously and was skipped.
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error>;
}
#[async_trait]
@ -16,7 +19,7 @@ impl<T> DmlSink for Arc<T>
where
T: DmlSink,
{
async fn apply(&self, op: DmlOperation) -> Result<bool, crate::data::Error> {
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error> {
self.deref().apply(op).await
}
}

View File

@ -7,7 +7,10 @@ use data_types::ShardId;
use dml::DmlOperation;
use super::DmlSink;
use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl};
use crate::{
data::{DmlApplyAction, IngesterData},
lifecycle::LifecycleHandleImpl,
};
/// Provides a [`DmlSink`] implementation for a [`IngesterData`] instance.
#[derive(Debug)]
@ -35,7 +38,7 @@ impl IngestSinkAdaptor {
#[async_trait]
impl DmlSink for IngestSinkAdaptor {
async fn apply(&self, op: DmlOperation) -> Result<bool, crate::data::Error> {
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error> {
self.ingest_data
.buffer_operation(self.shard_id, op, &self.lifecycle_handle)
.await

View File

@ -9,6 +9,8 @@ use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationHistogram, U64Counter, U64Gauge};
use trace::span::{SpanExt, SpanRecorder};
use crate::data::DmlApplyAction;
use super::DmlSink;
/// A [`WatermarkFetcher`] abstracts a source of the write buffer high watermark
@ -155,7 +157,7 @@ where
T: DmlSink,
P: TimeProvider,
{
async fn apply(&self, op: DmlOperation) -> Result<bool, crate::data::Error> {
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error> {
let meta = op.meta();
// Immediately increment the "bytes read" metric as it records the
@ -292,9 +294,9 @@ mod tests {
async fn test(
op: impl Into<DmlOperation> + Send,
metrics: &metric::Registry,
with_sink_return: Result<bool, crate::data::Error>,
with_sink_return: Result<DmlApplyAction, crate::data::Error>,
with_fetcher_return: Option<i64>,
) -> Result<bool, crate::data::Error> {
) -> Result<DmlApplyAction, crate::data::Error> {
let op = op.into();
let inner = MockDmlSink::default().with_apply_return([with_sink_return]);
let instrumentation = SinkInstrumentation::new(
@ -342,8 +344,8 @@ mod tests {
);
let op = make_write(meta);
let got = test(op, &metrics, Ok(true), Some(12345)).await;
assert_matches!(got, Ok(true));
let got = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), Some(12345)).await;
assert_matches!(got, Ok(DmlApplyAction::Applied(true)));
// Validate the various write buffer metrics
assert_matches!(
@ -487,8 +489,8 @@ mod tests {
);
let op = make_write(meta);
let got = test(op, &metrics, Ok(true), None).await;
assert_matches!(got, Ok(true));
let got = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), None).await;
assert_matches!(got, Ok(DmlApplyAction::Applied(true)));
// Validate the various write buffer metrics
assert_matches!(
@ -556,8 +558,8 @@ mod tests {
);
let op = make_write(meta);
let got = test(op, &metrics, Ok(true), Some(1)).await;
assert_matches!(got, Ok(true));
let got = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), Some(1)).await;
assert_matches!(got, Ok(DmlApplyAction::Applied(true)));
// Validate the various write buffer metrics
assert_matches!(
@ -617,7 +619,7 @@ mod tests {
let meta = DmlMeta::unsequenced(None);
let op = make_write(meta);
let _ = test(op, &metrics, Ok(true), Some(12345)).await;
let _ = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), Some(12345)).await;
}
// The instrumentation emits per-shard metrics, so upon observing an op
@ -639,6 +641,6 @@ mod tests {
);
let op = make_write(meta);
let _ = test(op, &metrics, Ok(true), Some(12345)).await;
let _ = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), Some(12345)).await;
}
}

View File

@ -14,7 +14,9 @@ use generated_types::{
};
use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError};
use ingester::{
data::{partition::resolver::CatalogPartitionResolver, IngesterData, Persister},
data::{
partition::resolver::CatalogPartitionResolver, DmlApplyAction, IngesterData, Persister,
},
lifecycle::mock_handle::MockLifecycleHandle,
querier_handler::{prepare_data_to_querier, FlatIngesterQueryResponse, IngesterQueryResponse},
};
@ -721,12 +723,14 @@ impl MockIngester {
async fn buffer_operation(&mut self, dml_operation: DmlOperation) {
let lifecycle_handle = MockLifecycleHandle::default();
let should_pause = self
let action = self
.ingester_data
.buffer_operation(self.shard.shard.id, dml_operation, &lifecycle_handle)
.await
.unwrap();
assert!(!should_pause);
if let DmlApplyAction::Applied(should_pause) = action {
assert!(!should_pause);
}
}
/// Persists the given set of partitions.