chore: add some tracing logs to the ingester (#5839)

pull/24376/head
Luke Bond 2022-10-12 13:10:20 +01:00 committed by GitHub
parent b7153862b0
commit 11900cea4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 4 deletions

View File

@ -5,6 +5,7 @@ use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use schema::{selection::Selection, sort::SortKey};
use snafu::ResultExt;
use uuid::Uuid;
@ -232,19 +233,26 @@ impl PartitionData {
sequence_number: SequenceNumber,
mb: MutableBatch,
) -> Result<(), super::Error> {
match &mut self.data.buffer {
let (min_sequence_number, max_sequence_number) = match &mut self.data.buffer {
Some(buf) => {
buf.max_sequence_number = sequence_number.max(buf.max_sequence_number);
buf.data.extend_from(&mb).context(super::BufferWriteSnafu)?;
(buf.min_sequence_number, buf.max_sequence_number)
}
None => {
self.data.buffer = Some(BufferBatch {
min_sequence_number: sequence_number,
max_sequence_number: sequence_number,
data: mb,
})
});
(sequence_number, sequence_number)
}
}
};
trace!(
min_sequence_number=?min_sequence_number,
max_sequence_number=?max_sequence_number,
"buffered write"
);
Ok(())
}

View File

@ -4,6 +4,7 @@ use std::{collections::HashMap, sync::Arc};
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use write_summary::ShardProgress;
use super::partition::{resolver::PartitionProvider, PartitionData, UnpersistedPartitionData};
@ -159,6 +160,11 @@ impl TableData {
// skip the write if it has already been persisted
if let Some(max) = partition_data.max_persisted_sequence_number() {
if max >= sequence_number {
trace!(
shard_id=%self.shard_id,
op_sequence_number=?sequence_number,
"skipping already-persisted write"
);
return Ok(false);
}
}

View File

@ -12,7 +12,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};
use iox_time::{Time, TimeProvider};
use metric::{Metric, U64Counter};
use observability_deps::tracing::{error, info, warn};
use observability_deps::tracing::{error, info, trace, warn};
use parking_lot::Mutex;
use tokio_util::sync::CancellationToken;
use tracker::TrackedFutureExt;
@ -97,6 +97,18 @@ impl LifecycleHandle for LifecycleHandleImpl {
stats.last_write = now;
stats.rows_written += rows_written;
trace!(
shard_id=%stats.shard_id,
partition_id=%stats.partition_id,
namespace_id=%stats.namespace_id,
table_id=%stats.table_id,
first_write=%stats.first_write,
last_write=%stats.last_write,
bytes_written=%stats.bytes_written,
first_sequence_number=?stats.first_sequence_number,
"logged write"
);
s.total_bytes += bytes_written;
// Pause if the server has exceeded the configured memory limit.
@ -538,6 +550,12 @@ impl LifecycleManager {
.map(|s| s.first_sequence_number)
.min()
.unwrap_or(sequence_number);
trace!(
min_unpersisted_sequence_number=?min,
shard_id=%shard_id,
sequence_number=?sequence_number,
"updated min_unpersisted_sequence_number for persisted shard"
);
persister
.update_min_unpersisted_sequence_number(shard_id, min)
.await;