From 11900cea4d2fde77d33ccaf8569c9422f888fc2e Mon Sep 17 00:00:00 2001 From: Luke Bond Date: Wed, 12 Oct 2022 13:10:20 +0100 Subject: [PATCH] chore: add some tracing logs to the ingester (#5839) --- ingester/src/data/partition.rs | 14 +++++++++++--- ingester/src/data/table.rs | 6 ++++++ ingester/src/lifecycle.rs | 20 +++++++++++++++++++- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index 6e31899d82..61dd4c36d2 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use arrow::record_batch::RecordBatch; use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId}; use mutable_batch::MutableBatch; +use observability_deps::tracing::*; use schema::{selection::Selection, sort::SortKey}; use snafu::ResultExt; use uuid::Uuid; @@ -232,19 +233,26 @@ impl PartitionData { sequence_number: SequenceNumber, mb: MutableBatch, ) -> Result<(), super::Error> { - match &mut self.data.buffer { + let (min_sequence_number, max_sequence_number) = match &mut self.data.buffer { Some(buf) => { buf.max_sequence_number = sequence_number.max(buf.max_sequence_number); buf.data.extend_from(&mb).context(super::BufferWriteSnafu)?; + (buf.min_sequence_number, buf.max_sequence_number) } None => { self.data.buffer = Some(BufferBatch { min_sequence_number: sequence_number, max_sequence_number: sequence_number, data: mb, - }) + }); + (sequence_number, sequence_number) } - } + }; + trace!( + min_sequence_number=?min_sequence_number, + max_sequence_number=?max_sequence_number, + "buffered write" + ); Ok(()) } diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 709055dd88..8ebaa7a192 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -4,6 +4,7 @@ use std::{collections::HashMap, sync::Arc}; use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId}; use mutable_batch::MutableBatch; +use observability_deps::tracing::*; use write_summary::ShardProgress; use super::partition::{resolver::PartitionProvider, PartitionData, UnpersistedPartitionData}; @@ -159,6 +160,11 @@ impl TableData { // skip the write if it has already been persisted if let Some(max) = partition_data.max_persisted_sequence_number() { if max >= sequence_number { + trace!( + shard_id=%self.shard_id, + op_sequence_number=?sequence_number, + "skipping already-persisted write" + ); return Ok(false); } } diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs index 01b9ff2f33..d15389ed60 100644 --- a/ingester/src/lifecycle.rs +++ b/ingester/src/lifecycle.rs @@ -12,7 +12,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, TableId}; use iox_time::{Time, TimeProvider}; use metric::{Metric, U64Counter}; -use observability_deps::tracing::{error, info, warn}; +use observability_deps::tracing::{error, info, trace, warn}; use parking_lot::Mutex; use tokio_util::sync::CancellationToken; use tracker::TrackedFutureExt; @@ -97,6 +97,18 @@ impl LifecycleHandle for LifecycleHandleImpl { stats.last_write = now; stats.rows_written += rows_written; + trace!( + shard_id=%stats.shard_id, + partition_id=%stats.partition_id, + namespace_id=%stats.namespace_id, + table_id=%stats.table_id, + first_write=%stats.first_write, + last_write=%stats.last_write, + bytes_written=%stats.bytes_written, + first_sequence_number=?stats.first_sequence_number, + "logged write" + ); + s.total_bytes += bytes_written; // Pause if the server has exceeded the configured memory limit. @@ -538,6 +550,12 @@ impl LifecycleManager { .map(|s| s.first_sequence_number) .min() .unwrap_or(sequence_number); + trace!( + min_unpersisted_sequence_number=?min, + shard_id=%shard_id, + sequence_number=?sequence_number, + "updated min_unpersisted_sequence_number for persisted shard" + ); persister .update_min_unpersisted_sequence_number(shard_id, min) .await;