Merge pull request #5348 from influxdata/cn/upgrade-l0-metrics

feat: Add metrics on the size of files created by ingestion and used for compaction
kodiakhq[bot] 2022-08-17 16:08:59 +00:00 committed by GitHub
commit 8eb3a79d7f
8 changed files with 386 additions and 122 deletions
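
Both halves of this change follow the same pattern: register a U64Histogram with explicit byte-size buckets, then record one observation per file (the ingester for each persisted file, the compactor for each compaction input). A condensed sketch of the registration step, assuming the IOx workspace's internal metric crate; the helper function name is illustrative only:

use std::sync::Arc;

use metric::{Metric, U64Histogram, U64HistogramOptions};

// Illustrative helper: register two file-size histograms that share one set of
// bucket boundaries, as the compactor does below.
fn register_file_size_histograms(
    registry: &Arc<metric::Registry>,
) -> (Metric<U64Histogram>, Metric<U64Histogram>) {
    // Upper bounds of the histogram buckets, in bytes.
    let file_size_buckets = U64HistogramOptions::new([
        500 * 1024,       // 500 KB
        1024 * 1024,      // 1 MB
        3 * 1024 * 1024,  // 3 MB
        10 * 1024 * 1024, // 10 MB
        30 * 1024 * 1024, // 30 MB
        u64::MAX,         // Inf
    ]);

    // The options are supplied as a closure so each registration gets its own clone.
    let candidate_bytes = registry.register_metric_with_options(
        "parquet_file_candidate_bytes",
        "Number of bytes of Parquet file compaction candidates",
        || file_size_buckets.clone(),
    );
    let input_bytes = registry.register_metric_with_options(
        "compaction_input_file_bytes",
        "Number of bytes of Parquet files used as inputs to a successful compaction operation",
        || file_size_buckets.clone(),
    );

    (candidate_bytes, input_bytes)
}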

View File

@ -10,8 +10,8 @@ use iox_catalog::interface::{get_schema_by_id, Catalog};
use iox_query::exec::Executor;
use iox_time::TimeProvider;
use metric::{
Attributes, DurationHistogram, DurationHistogramOptions, Metric, U64Counter, U64Gauge,
DURATION_MAX,
Attributes, DurationHistogram, DurationHistogramOptions, Metric, U64Gauge, U64Histogram,
U64HistogramOptions, DURATION_MAX,
};
use observability_deps::tracing::debug;
use parquet_file::storage::ParquetStorage;
@ -98,20 +98,21 @@ pub struct Compactor {
/// Configuration options for the compactor
pub(crate) config: CompactorConfig,
/// Counter for the number of files compacted
pub(crate) compaction_counter: Metric<U64Counter>,
/// Gauge for the number of compaction partition candidates
/// Gauge for the number of compaction partition candidates before filtering
compaction_candidate_gauge: Metric<U64Gauge>,
/// Gauge for the number of Parquet file candidates. The recorded values have attributes for
/// the compaction level of the file and whether the file was selected for compaction or not.
/// Gauge for the number of Parquet file candidates after filtering. The recorded values have
/// attributes for the compaction level of the file and whether the file was selected for
/// compaction or not.
pub(crate) parquet_file_candidate_gauge: Metric<U64Gauge>,
/// Gauge for the number of bytes of Parquet file candidates. The recorded values have
/// attributes for the compaction level of the file and whether the file was selected for
/// compaction or not.
pub(crate) parquet_file_candidate_bytes_gauge: Metric<U64Gauge>,
/// Histogram for the number of bytes of Parquet files selected for compaction. The recorded
/// values have attributes for the compaction level of the file.
pub(crate) parquet_file_candidate_bytes: Metric<U64Histogram>,
/// After a successful compaction operation, track the sizes of the files that were used as the
/// inputs of that compaction, attributed by the sequencer ID of the compacted partition.
pub(crate) compaction_input_file_bytes: Metric<U64Histogram>,
/// Histogram for tracking the time to compact a partition
pub(crate) compaction_duration: Metric<DurationHistogram>,
@ -152,11 +153,6 @@ impl Compactor {
config: CompactorConfig,
registry: Arc<metric::Registry>,
) -> Self {
let compaction_counter = registry.register_metric(
"compactor_compacted_files_total",
"counter for the number of files compacted",
);
let compaction_candidate_gauge = registry.register_metric(
"compactor_candidates",
"gauge for the number of compaction candidates that are found when checked",
@ -167,9 +163,25 @@ impl Compactor {
"Number of Parquet file candidates",
);
let parquet_file_candidate_bytes_gauge = registry.register_metric(
let file_size_buckets = U64HistogramOptions::new([
500 * 1024, // 500 KB
1024 * 1024, // 1 MB
3 * 1024 * 1024, // 3 MB
10 * 1024 * 1024, // 10 MB
30 * 1024 * 1024, // 30 MB
u64::MAX, // Inf
]);
let parquet_file_candidate_bytes = registry.register_metric_with_options(
"parquet_file_candidate_bytes",
"Number of bytes of Parquet file candidates",
"Number of bytes of Parquet file compaction candidates",
|| file_size_buckets.clone(),
);
let compaction_input_file_bytes = registry.register_metric_with_options(
"compaction_input_file_bytes",
"Number of bytes of Parquet files used as inputs to a successful compaction operation",
|| file_size_buckets.clone(),
);
let duration_histogram_options = DurationHistogramOptions::new([
@ -215,10 +227,10 @@ impl Compactor {
time_provider,
backoff_config,
config,
compaction_counter,
compaction_candidate_gauge,
parquet_file_candidate_gauge,
parquet_file_candidate_bytes_gauge,
parquet_file_candidate_bytes,
compaction_input_file_bytes,
compaction_duration,
candidate_selection_duration,
partitions_extra_info_reading_duration,
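
Once a compaction succeeds, each input file's size is recorded against the partition's sequencer ID (see compact_parquet_files further down). A minimal sketch of that recording step; the helper name and parameter types are stand-ins for values the compactor already has on hand:

use metric::{Attributes, Metric, U64Histogram};

// Illustrative helper: record one histogram sample per input file, tagged with
// the sequencer the partition belongs to.
fn record_input_file_sizes(
    compaction_input_file_bytes: &Metric<U64Histogram>,
    sequencer_id: impl std::fmt::Display,
    file_sizes: &[i64],
) {
    let attributes = Attributes::from([(
        "sequencer_id",
        format!("{}", sequencer_id).into(),
    )]);
    let recorder = compaction_input_file_bytes.recorder(attributes);
    for &size in file_sizes {
        // The catalog stores file_size_bytes as i64; the histogram takes u64.
        recorder.record(size as u64);
    }
}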

View File

@ -66,7 +66,7 @@ pub(crate) async fn compact_hot_partition(
compactor.config.input_size_threshold_bytes(),
compactor.config.input_file_count_threshold(),
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes_gauge,
&compactor.parquet_file_candidate_bytes,
);
let compact_result = parquet_file_combining::compact_parquet_files(
@ -76,7 +76,7 @@ pub(crate) async fn compact_hot_partition(
compactor.store.clone(),
Arc::clone(&compactor.exec),
Arc::clone(&compactor.time_provider),
&compactor.compaction_counter,
&compactor.compaction_input_file_bytes,
compactor.config.max_desired_file_size_bytes(),
compactor.config.percentage_max_file_size(),
compactor.config.split_percentage(),
@ -120,7 +120,7 @@ pub(crate) async fn compact_cold_partition(
parquet_files_for_compaction,
compactor.config.cold_input_size_threshold_bytes(),
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes_gauge,
&compactor.parquet_file_candidate_bytes,
);
let compact_result =
@ -142,7 +142,7 @@ pub(crate) async fn compact_cold_partition(
compactor.store.clone(),
Arc::clone(&compactor.exec),
Arc::clone(&compactor.time_provider),
&compactor.compaction_counter,
&compactor.compaction_input_file_bytes,
compactor.config.max_desired_file_size_bytes(),
compactor.config.percentage_max_file_size(),
compactor.config.split_percentage(),
@ -712,7 +712,6 @@ mod tests {
//
// - the level 1 file that didn't overlap with anything
// - the newly created level 1 file that was only upgraded from level 0
// - the two newly created after compacting and splitting pf1, pf2, pf3, pf4, pf5
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 2);
let files_and_levels: Vec<_> = files

View File

@ -11,7 +11,7 @@ use iox_query::{
QueryChunk,
};
use iox_time::TimeProvider;
use metric::{Attributes, Metric, U64Counter};
use metric::{Attributes, Metric, U64Histogram};
use observability_deps::tracing::*;
use parquet_file::{chunk::ParquetChunk, metadata::IoxMetadata, storage::ParquetStorage};
use schema::{sort::SortKey, Schema};
@ -72,8 +72,8 @@ pub(crate) async fn compact_parquet_files(
// Executor for running queries, compacting, and persisting
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
// Counter for the number of files compacted
compaction_counter: &Metric<U64Counter>,
// Histogram for the sizes of the files compacted
compaction_input_file_bytes: &Metric<U64Histogram>,
// Desired max size of compacted parquet files.
// It is a target desired value, rather than a guarantee.
max_desired_file_size_bytes: u64,
@ -99,16 +99,22 @@ pub(crate) async fn compact_parquet_files(
}
);
// Find the total size of all files.
let total_size: i64 = files.iter().map(|f| f.file_size_bytes).sum();
// Save all file sizes for recording metrics if this compaction succeeds.
let file_sizes: Vec<_> = files.iter().map(|f| f.file_size_bytes).collect();
// Find the total size of all files, to be used to determine if the result should be one file
// or if the result should be split into multiple files.
let total_size: i64 = file_sizes.iter().sum();
let total_size = total_size as u64;
let mut files_by_level = BTreeMap::new();
// Compute the number of files per compaction level for logging
let mut num_files_by_level = BTreeMap::new();
for compaction_level in files.iter().map(|f| f.compaction_level) {
*files_by_level.entry(compaction_level).or_default() += 1;
*num_files_by_level.entry(compaction_level).or_default() += 1;
}
let num_level_0 = files_by_level.get(&CompactionLevel::Initial).unwrap_or(&0);
let num_level_1 = files_by_level
let num_level_0 = num_files_by_level
.get(&CompactionLevel::Initial)
.unwrap_or(&0);
let num_level_1 = num_files_by_level
.get(&CompactionLevel::FileNonOverlapped)
.unwrap_or(&0);
debug!(
@ -317,11 +323,9 @@ pub(crate) async fn compact_parquet_files(
"sequencer_id",
format!("{}", partition.sequencer_id()).into(),
)]);
for (compaction_level, file_count) in files_by_level {
let mut attributes = attributes.clone();
attributes.insert("compaction_level", format!("{}", compaction_level as i32));
let compaction_counter = compaction_counter.recorder(attributes);
compaction_counter.inc(file_count);
let compaction_input_file_bytes = compaction_input_file_bytes.recorder(attributes);
for size in file_sizes {
compaction_input_file_bytes.record(size as u64);
}
Ok(())
@ -462,8 +466,9 @@ mod tests {
use super::*;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
use data_types::{ColumnType, PartitionParam};
use data_types::{ColumnType, PartitionParam, SequencerId};
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
use metric::U64HistogramOptions;
use parquet_file::ParquetFilePath;
use test_helpers::assert_error;
@ -485,6 +490,7 @@ mod tests {
const DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES: u64 = 100 * 1024 * 1024;
const DEFAULT_PERCENTAGE_MAX_FILE_SIZE: u16 = 30;
const DEFAULT_SPLIT_PERCENTAGE: u16 = 80;
const BUCKET_500_KB: u64 = 500 * 1024;
struct TestSetup {
catalog: Arc<TestCatalog>,
@ -614,11 +620,22 @@ mod tests {
}
}
fn metrics() -> Metric<U64Counter> {
fn metrics() -> Metric<U64Histogram> {
let registry = Arc::new(metric::Registry::new());
registry.register_metric(
"compactor_compacted_files_total",
"counter for the number of files compacted",
registry.register_metric_with_options(
"compaction_input_file_bytes",
"Number of bytes of Parquet files used as inputs to a successful compaction \
operation",
|| {
U64HistogramOptions::new([
BUCKET_500_KB, // 500 KB
1024 * 1024, // 1 MB
3 * 1024 * 1024, // 3 MB
10 * 1024 * 1024, // 10 MB
30 * 1024 * 1024, // 30 MB
u64::MAX, // Inf
])
},
)
}
@ -631,7 +648,8 @@ mod tests {
candidate_partition,
..
} = test_setup().await;
let compaction_counter = metrics();
let compaction_input_file_bytes = metrics();
let sequencer_id = candidate_partition.sequencer_id();
let files = vec![];
let result = compact_parquet_files(
@ -641,13 +659,22 @@ mod tests {
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::clone(&catalog.exec),
Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
&compaction_counter,
&compaction_input_file_bytes,
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
DEFAULT_SPLIT_PERCENTAGE,
)
.await;
assert_error!(result, Error::NotEnoughParquetFiles { num_files: 0, .. });
// No metrics recorded because the compaction didn't succeed
assert_eq!(
extract_byte_metrics(&compaction_input_file_bytes, sequencer_id),
ExtractedByteMetrics {
sample_count: 0,
buckets_with_counts: vec![],
}
);
}
#[tokio::test]
@ -661,7 +688,8 @@ mod tests {
..
} = test_setup().await;
let table_id = candidate_partition.table_id();
let compaction_counter = metrics();
let compaction_input_file_bytes = metrics();
let sequencer_id = candidate_partition.sequencer_id();
let parquet_file = parquet_files.remove(0);
compact_parquet_files(
@ -671,7 +699,7 @@ mod tests {
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::clone(&catalog.exec),
Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
&compaction_counter,
&compaction_input_file_bytes,
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
DEFAULT_SPLIT_PERCENTAGE,
@ -701,6 +729,15 @@ mod tests {
(7, CompactionLevel::FileNonOverlapped),
]
);
// Verify the metrics
assert_eq!(
extract_byte_metrics(&compaction_input_file_bytes, sequencer_id),
ExtractedByteMetrics {
sample_count: 1,
buckets_with_counts: vec![(BUCKET_500_KB, 1)],
}
);
}
#[tokio::test]
@ -713,7 +750,8 @@ mod tests {
candidate_partition,
parquet_files,
} = test_setup().await;
let compaction_counter = metrics();
let compaction_input_file_bytes = metrics();
let sequencer_id = candidate_partition.sequencer_id();
compact_parquet_files(
parquet_files.into_iter().take(4).collect(),
@ -722,7 +760,7 @@ mod tests {
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::clone(&catalog.exec),
Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
&compaction_counter,
&compaction_input_file_bytes,
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
DEFAULT_SPLIT_PERCENTAGE,
@ -752,6 +790,15 @@ mod tests {
]
);
// Verify the metrics
assert_eq!(
extract_byte_metrics(&compaction_input_file_bytes, sequencer_id),
ExtractedByteMetrics {
sample_count: 4,
buckets_with_counts: vec![(BUCKET_500_KB, 4)],
}
);
// ------------------------------------------------
// Verify the parquet file content
@ -787,7 +834,8 @@ mod tests {
candidate_partition,
parquet_files,
} = test_setup().await;
let compaction_counter = metrics();
let compaction_input_file_bytes = metrics();
let sequencer_id = candidate_partition.sequencer_id();
compact_parquet_files(
parquet_files.into_iter().take(5).collect(),
@ -796,7 +844,7 @@ mod tests {
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::clone(&catalog.exec),
Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
&compaction_counter,
&compaction_input_file_bytes,
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
DEFAULT_SPLIT_PERCENTAGE,
@ -825,6 +873,15 @@ mod tests {
]
);
// Verify the metrics
assert_eq!(
extract_byte_metrics(&compaction_input_file_bytes, sequencer_id),
ExtractedByteMetrics {
sample_count: 5,
buckets_with_counts: vec![(BUCKET_500_KB, 4), (u64::MAX, 1)],
}
);
// ------------------------------------------------
// Verify the parquet file content
@ -873,7 +930,8 @@ mod tests {
candidate_partition,
parquet_files,
} = test_setup().await;
let compaction_counter = metrics();
let compaction_input_file_bytes = metrics();
let sequencer_id = candidate_partition.sequencer_id();
let files_to_compact: Vec<_> = parquet_files.into_iter().take(5).collect();
@ -888,7 +946,7 @@ mod tests {
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::clone(&catalog.exec),
Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
&compaction_counter,
&compaction_input_file_bytes,
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
split_percentage,
@ -908,7 +966,7 @@ mod tests {
.collect();
// 1 large file not included in compaction,
// 1 newly created CompactionLevel::FileNonOverlapped file as the result of
// compaction and NOT splitting
// compaction and NOT splitting
assert_eq!(
files_and_levels,
vec![
@ -917,6 +975,15 @@ mod tests {
]
);
// Verify the metrics
assert_eq!(
extract_byte_metrics(&compaction_input_file_bytes, sequencer_id),
ExtractedByteMetrics {
sample_count: 5,
buckets_with_counts: vec![(BUCKET_500_KB, 4), (u64::MAX, 1)],
}
);
// ------------------------------------------------
// Verify the parquet file content
@ -952,7 +1019,8 @@ mod tests {
candidate_partition,
parquet_files,
} = test_setup().await;
let compaction_counter = metrics();
let compaction_input_file_bytes = metrics();
let sequencer_id = candidate_partition.sequencer_id();
compact_parquet_files(
parquet_files,
@ -961,7 +1029,7 @@ mod tests {
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::clone(&catalog.exec),
Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
&compaction_counter,
&compaction_input_file_bytes,
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
DEFAULT_SPLIT_PERCENTAGE,
@ -988,6 +1056,15 @@ mod tests {
]
);
// Verify the metrics
assert_eq!(
extract_byte_metrics(&compaction_input_file_bytes, sequencer_id),
ExtractedByteMetrics {
sample_count: 6,
buckets_with_counts: vec![(BUCKET_500_KB, 4), (u64::MAX, 2)],
}
);
// ------------------------------------------------
// Verify the parquet file content
@ -1061,4 +1138,42 @@ mod tests {
.await
.unwrap()
}
#[derive(Debug, PartialEq)]
struct ExtractedByteMetrics {
sample_count: u64,
buckets_with_counts: Vec<(u64, u64)>,
}
fn extract_byte_metrics(
metric: &Metric<U64Histogram>,
sequencer_id: SequencerId,
) -> ExtractedByteMetrics {
let attributes = Attributes::from([("sequencer_id", format!("{}", sequencer_id).into())]);
let (sample_count, buckets_with_counts) =
if let Some(observer) = metric.get_observer(&attributes) {
let observer = observer.fetch();
let mut buckets_with_counts: Vec<_> = observer
.buckets
.iter()
.filter_map(|o| {
if o.count == 0 {
None
} else {
Some((o.le, o.count))
}
})
.collect();
buckets_with_counts.sort();
(observer.sample_count(), buckets_with_counts)
} else {
(0, vec![])
};
ExtractedByteMetrics {
sample_count,
buckets_with_counts,
}
}
}
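
The tests read the recorded distribution back through the metric's observer, which is what the extract_byte_metrics helpers in this diff do. Roughly, under the same assumptions as the sketches above:

use metric::{Attributes, Metric, U64Histogram};

// Illustrative helper: return the non-empty buckets as (upper bound, count)
// pairs for the given attribute set, or an empty Vec if nothing was recorded.
fn nonzero_buckets(metric: &Metric<U64Histogram>, attributes: &Attributes) -> Vec<(u64, u64)> {
    match metric.get_observer(attributes) {
        Some(observer) => {
            let observation = observer.fetch();
            observation
                .buckets
                .iter()
                .filter_map(|b| if b.count == 0 { None } else { Some((b.le, b.count)) })
                .collect()
        }
        None => vec![],
    }
}

A single input file of a few hundred kilobytes, for example, shows up as one sample in the 500 * 1024 bucket, which is what the assertions above check via BUCKET_500_KB.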

View File

@ -3,7 +3,7 @@
use crate::parquet_file_lookup::ParquetFilesForCompaction;
use data_types::ParquetFile;
use metric::{Attributes, Metric, U64Gauge};
use metric::{Attributes, Metric, U64Gauge, U64Histogram};
use observability_deps::tracing::*;
/// Given a list of hot level 0 files sorted by max sequence number and a list of level 1 files for
@ -28,8 +28,8 @@ pub(crate) fn filter_hot_parquet_files(
input_file_count_threshold: usize,
// Gauge for the number of Parquet file candidates
parquet_file_candidate_gauge: &Metric<U64Gauge>,
// Gauge for the number of bytes of Parquet file candidates
parquet_file_candidate_bytes_gauge: &Metric<U64Gauge>,
// Histogram for the number of bytes of Parquet file candidates
parquet_file_candidate_bytes: &Metric<U64Histogram>,
) -> Vec<ParquetFile> {
let ParquetFilesForCompaction {
level_0,
@ -118,9 +118,15 @@ pub(crate) fn filter_hot_parquet_files(
num_level_1_compacting as u64,
);
record_byte_metrics(
parquet_file_candidate_bytes_gauge,
total_level_0_bytes as u64,
total_level_1_bytes as u64,
parquet_file_candidate_bytes,
level_0_to_return
.iter()
.map(|pf| pf.file_size_bytes as u64)
.collect(),
files_to_return
.iter()
.map(|pf| pf.file_size_bytes as u64)
.collect(),
);
// Return the level 1 files first, followed by the level 0 files assuming we've maintained
@ -148,8 +154,8 @@ pub(crate) fn filter_cold_parquet_files(
max_bytes: u64,
// Gauge for the number of Parquet file candidates
parquet_file_candidate_gauge: &Metric<U64Gauge>,
// Gauge for the number of bytes of Parquet file candidates
parquet_file_candidate_bytes_gauge: &Metric<U64Gauge>,
// Histogram for the number of bytes of Parquet file candidates
parquet_file_candidate_bytes: &Metric<U64Histogram>,
) -> Vec<ParquetFile> {
let ParquetFilesForCompaction {
level_0,
@ -231,9 +237,15 @@ pub(crate) fn filter_cold_parquet_files(
num_level_1_compacting as u64,
);
record_byte_metrics(
parquet_file_candidate_bytes_gauge,
total_level_0_bytes as u64,
total_level_1_bytes as u64,
parquet_file_candidate_bytes,
level_0_to_return
.iter()
.map(|pf| pf.file_size_bytes as u64)
.collect(),
files_to_return
.iter()
.map(|pf| pf.file_size_bytes as u64)
.collect(),
);
// Return the level 1 files first, followed by the level 0 files assuming we've maintained
@ -284,17 +296,21 @@ fn record_file_metrics(
}
fn record_byte_metrics(
gauge: &Metric<U64Gauge>,
total_level_0_bytes: u64,
total_level_1_bytes: u64,
histogram: &Metric<U64Histogram>,
level_0_sizes: Vec<u64>,
level_1_sizes: Vec<u64>,
) {
let attributes = Attributes::from(&[("compaction_level", "0")]);
let recorder = gauge.recorder(attributes);
recorder.set(total_level_0_bytes);
let recorder = histogram.recorder(attributes);
for size in level_0_sizes {
recorder.record(size);
}
let attributes = Attributes::from(&[("compaction_level", "1")]);
let recorder = gauge.recorder(attributes);
recorder.set(total_level_1_bytes);
let recorder = histogram.recorder(attributes);
for size in level_1_sizes {
recorder.record(size);
}
}
#[cfg(test)]
@ -304,9 +320,13 @@ mod tests {
ColumnSet, CompactionLevel, NamespaceId, ParquetFileId, PartitionId, SequenceNumber,
SequencerId, TableId, Timestamp,
};
use metric::{ObservationBucket, U64HistogramOptions};
use std::sync::Arc;
use uuid::Uuid;
const BUCKET_500_KB: u64 = 500 * 1024;
const BUCKET_1_MB: u64 = 1024 * 1024;
#[test]
fn test_overlaps_in_time() {
assert_overlap((1, 3), (2, 4));
@ -359,7 +379,7 @@ mod tests {
);
}
fn metrics() -> (Metric<U64Gauge>, Metric<U64Gauge>) {
fn metrics() -> (Metric<U64Gauge>, Metric<U64Histogram>) {
let registry = Arc::new(metric::Registry::new());
let parquet_file_candidate_gauge = registry.register_metric(
@ -367,15 +387,22 @@ mod tests {
"Number of Parquet file candidates",
);
let parquet_file_candidate_bytes_gauge = registry.register_metric(
let parquet_file_candidate_bytes = registry.register_metric_with_options(
"parquet_file_candidate_bytes",
"Number of bytes of Parquet file candidates",
|| {
U64HistogramOptions::new([
BUCKET_500_KB, // 500 KB
BUCKET_1_MB, // 1 MB
3 * 1024 * 1024, // 3 MB
10 * 1024 * 1024, // 10 MB
30 * 1024 * 1024, // 30 MB
u64::MAX, // Inf
])
},
);
(
parquet_file_candidate_gauge,
parquet_file_candidate_bytes_gauge,
)
(parquet_file_candidate_gauge, parquet_file_candidate_bytes)
}
mod hot {
@ -552,7 +579,7 @@ mod tests {
.id(106)
.min_time(340)
.max_time(360)
.file_size_bytes(10)
.file_size_bytes(BUCKET_500_KB as i64 + 1) // exercise metrics
.build(),
// Does not overlap any level 0, times are too late
ParquetFileBuilder::level_1()
@ -589,8 +616,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 10,
level_1: 20,
level_0_sample_count: 1,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)],
level_1_sample_count: 2,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)],
}
);
@ -619,8 +648,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 20,
level_1: 40,
level_0_sample_count: 2,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)],
level_1_sample_count: 4,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)],
}
);
@ -649,8 +680,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 20,
level_1: 40,
level_0_sample_count: 2,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)],
level_1_sample_count: 4,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)],
}
);
@ -678,8 +711,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 30,
level_1: 50,
level_0_sample_count: 3,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 3)],
level_1_sample_count: 5,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4), (BUCKET_1_MB, 1)],
}
);
@ -708,8 +743,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 10,
level_1: 20,
level_0_sample_count: 1,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)],
level_1_sample_count: 2,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)],
}
);
@ -738,8 +775,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 10,
level_1: 20,
level_0_sample_count: 1,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)],
level_1_sample_count: 2,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)],
}
);
@ -768,8 +807,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 20,
level_1: 40,
level_0_sample_count: 2,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)],
level_1_sample_count: 4,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)],
}
);
}
@ -968,8 +1009,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 30,
level_1: 0,
level_0_sample_count: 3,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 3)],
level_1_sample_count: 0,
level_1_buckets_with_counts: vec![],
}
);
}
@ -1076,8 +1119,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 10,
level_1: 20,
level_0_sample_count: 1,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)],
level_1_sample_count: 2,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)],
}
);
@ -1105,8 +1150,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 20,
level_1: 40,
level_0_sample_count: 2,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)],
level_1_sample_count: 4,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)],
}
);
@ -1134,8 +1181,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 20,
level_1: 40,
level_0_sample_count: 2,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)],
level_1_sample_count: 4,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)],
}
);
@ -1161,8 +1210,10 @@ mod tests {
assert_eq!(
extract_byte_metrics(&bytes_metric),
ExtractedByteMetrics {
level_0: 30,
level_1: 50,
level_0_sample_count: 3,
level_0_buckets_with_counts: vec![(BUCKET_500_KB, 3)],
level_1_sample_count: 5,
level_1_buckets_with_counts: vec![(BUCKET_500_KB, 5)],
}
);
}
@ -1304,21 +1355,42 @@ mod tests {
#[derive(Debug, PartialEq)]
struct ExtractedByteMetrics {
level_0: u64,
level_1: u64,
level_0_sample_count: u64,
level_0_buckets_with_counts: Vec<(u64, u64)>,
level_1_sample_count: u64,
level_1_buckets_with_counts: Vec<(u64, u64)>,
}
fn extract_byte_metrics(metric: &Metric<U64Gauge>) -> ExtractedByteMetrics {
fn extract_byte_metrics(metric: &Metric<U64Histogram>) -> ExtractedByteMetrics {
let bucket_filter = |o: &ObservationBucket<u64>| {
if o.count == 0 {
None
} else {
Some((o.le, o.count))
}
};
let level_0 = metric
.get_observer(&Attributes::from(&[("compaction_level", "0")]))
.unwrap()
.fetch();
let mut level_0_buckets_with_counts: Vec<_> =
level_0.buckets.iter().filter_map(bucket_filter).collect();
level_0_buckets_with_counts.sort();
let level_1 = metric
.get_observer(&Attributes::from(&[("compaction_level", "1")]))
.unwrap()
.fetch();
let mut level_1_buckets_with_counts: Vec<_> =
level_1.buckets.iter().filter_map(bucket_filter).collect();
level_1_buckets_with_counts.sort();
ExtractedByteMetrics { level_0, level_1 }
ExtractedByteMetrics {
level_0_sample_count: level_0.sample_count(),
level_0_buckets_with_counts,
level_1_sample_count: level_1.sample_count(),
level_1_buckets_with_counts,
}
}
}

View File

@ -19,7 +19,7 @@ use futures::{Stream, StreamExt};
use iox_catalog::interface::{get_table_schema_by_id, Catalog};
use iox_query::exec::Executor;
use iox_time::SystemProvider;
use metric::U64Counter;
use metric::{Attributes, Metric, U64Counter, U64Histogram, U64HistogramOptions};
use mutable_batch::MutableBatch;
use object_store::DynObjectStore;
use observability_deps::tracing::{debug, warn};
@ -94,6 +94,9 @@ pub struct IngesterData {
/// Backoff config
backoff_config: BackoffConfig,
/// Metrics for file size of persisted Parquet files
persisted_file_size_bytes: Metric<U64Histogram>,
}
impl IngesterData {
@ -104,13 +107,30 @@ impl IngesterData {
sequencers: BTreeMap<SequencerId, SequencerData>,
exec: Arc<Executor>,
backoff_config: BackoffConfig,
metrics: Arc<metric::Registry>,
) -> Self {
let persisted_file_size_bytes = metrics.register_metric_with_options(
"ingester_persisted_file_size_bytes",
"Size of files persisted by the ingester",
|| {
U64HistogramOptions::new([
500 * 1024, // 500 KB
1024 * 1024, // 1 MB
3 * 1024 * 1024, // 3 MB
10 * 1024 * 1024, // 10 MB
30 * 1024 * 1024, // 30 MB
u64::MAX, // Inf
])
},
);
Self {
store: ParquetStorage::new(object_store),
catalog,
sequencers,
exec,
backoff_config,
persisted_file_size_bytes,
}
}
@ -344,6 +364,15 @@ impl Persister for IngesterData {
.await
.expect("retry forever");
// Record metrics
let attributes = Attributes::from([(
"sequencer_id",
format!("{}", partition_info.partition.sequencer_id).into(),
)]);
self.persisted_file_size_bytes
.recorder(attributes)
.record(file_size as u64);
// and remove the persisted data from memory
debug!(
?partition_id,
@ -1566,8 +1595,8 @@ pub(crate) type IngesterQueryPartitionStream =
/// Response streams for querier<>ingester requests.
///
/// The data structure is constructed to allow lazy/streaming data generation. For easier consumption according to the
/// wire protocol, use the [`flatten`](Self::flatten) method.
/// The data structure is constructed to allow lazy/streaming data generation. For easier
/// consumption according to the wire protocol, use the [`flatten`](Self::flatten) method.
pub struct IngesterQueryResponse {
/// Stream of partitions.
partitions: IngesterQueryPartitionStream,
@ -1649,13 +1678,15 @@ pub enum FlatIngesterQueryResponse {
/// Start a new snapshot.
///
/// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition) message.
/// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition)
/// message.
StartSnapshot {
/// Snapshot schema.
schema: Arc<arrow::datatypes::Schema>,
},
/// Add a record batch to the snapshot that was announced by the last [`StartSnapshot`](Self::StartSnapshot) message.
/// Add a record batch to the snapshot that was announced by the last
/// [`StartSnapshot`](Self::StartSnapshot) message.
RecordBatch {
/// Record batch.
batch: RecordBatch,
@ -1796,6 +1827,7 @@ mod tests {
sequencers,
Arc::new(Executor::new(1)),
BackoffConfig::default(),
Arc::clone(&metrics),
));
let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@ -1889,6 +1921,7 @@ mod tests {
sequencers,
Arc::new(Executor::new(1)),
BackoffConfig::default(),
Arc::clone(&metrics),
));
let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@ -1943,7 +1976,7 @@ mod tests {
let manager = LifecycleManager::new(
LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)),
metrics,
Arc::clone(&metrics),
Arc::new(SystemProvider::new()),
);
@ -2016,6 +2049,30 @@ mod tests {
assert_eq!(pf.sequencer_id, sequencer1.id);
assert!(pf.to_delete.is_none());
// This value should be recorded in the metrics asserted next; it is less than 500 KB
assert_eq!(pf.file_size_bytes, 1252);
// verify metrics
let persisted_file_size_bytes: Metric<U64Histogram> = metrics
.get_instrument("ingester_persisted_file_size_bytes")
.unwrap();
let observation = persisted_file_size_bytes
.get_observer(&Attributes::from([(
"sequencer_id",
format!("{}", sequencer1.id).into(),
)]))
.unwrap()
.fetch();
assert_eq!(observation.sample_count(), 1);
let buckets_with_counts: Vec<_> = observation
.buckets
.iter()
.filter_map(|o| if o.count == 0 { None } else { Some(o.le) })
.collect();
// Only the < 500 KB bucket has a count
assert_eq!(buckets_with_counts, &[500 * 1024]);
// verify it set a sort key on the partition in the catalog
let partition_info = repos
.partitions()
@ -2083,6 +2140,7 @@ mod tests {
sequencers,
Arc::new(Executor::new(1)),
BackoffConfig::default(),
Arc::clone(&metrics),
));
let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@ -2284,7 +2342,8 @@ mod tests {
50, // max time of data to get deleted
"city=Boston", // delete predicate
);
// two rows will get deleted, one from existing snapshot, one from the buffer being moved to snpashot
// two rows will get deleted, one from existing snapshot, one from the buffer being moved
// to snapshot
p.buffer_tombstone(&exec, "restaurant", ts).await;
// verify data
@ -2341,9 +2400,11 @@ mod tests {
// verify data
assert!(p.data.buffer.is_none()); // always empty after delete
// no snpashots becasue buffer has not data yet and the sanpshot was empty too
// no snapshots because buffer has no data yet and the
// snapshot was empty too
assert_eq!(p.data.snapshots.len(), 0);
assert_eq!(p.data.deletes_during_persisting.len(), 1); // tombstone added since data is persisting
assert_eq!(p.data.deletes_during_persisting.len(), 1); // tombstone added since data is
// persisting
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
// ------------------------------------------
@ -2362,7 +2423,7 @@ mod tests {
assert_eq!(
p.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(8)
); // 1 newlly added mutable batch of 3 rows of data
); // 1 newly added mutable batch of 3 rows of data
assert_eq!(p.data.snapshots.len(), 0); // still empty
assert_eq!(p.data.deletes_during_persisting.len(), 1);
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
@ -2622,6 +2683,7 @@ mod tests {
sequencers,
Arc::new(Executor::new(1)),
BackoffConfig::default(),
Arc::clone(&metrics),
));
let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);

View File

@ -157,6 +157,7 @@ impl IngestHandlerImpl {
sequencers,
exec,
BackoffConfig::default(),
Arc::clone(&metric_registry),
));
let ingester_data = Arc::clone(&data);

View File

@ -701,13 +701,14 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa
sequencers,
exec,
backoff::BackoffConfig::default(),
metrics,
)
}
pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
// Whatever data because they won't be used in the tests
let metrics: Arc<metric::Registry> = Default::default();
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let object_store = Arc::new(InMemory::new());
let exec = Arc::new(iox_query::exec::Executor::new(1));
@ -745,6 +746,7 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa
sequencers,
exec,
backoff::BackoffConfig::default(),
metrics,
)
}

View File

@ -704,6 +704,7 @@ impl MockIngester {
sequencers,
catalog.exec(),
BackoffConfig::default(),
catalog.metric_registry(),
));
Self {