Merge pull request #7797 from influxdata/dom/file-metrics

feat(metrics): Parquet file attribute statistics
Dom 2023-05-16 14:35:25 +01:00 committed by GitHub
commit f6c3531df7
9 changed files with 552 additions and 51 deletions

View File

@@ -36,8 +36,8 @@ use crate::{
ingest_state::IngestState,
ingester_id::IngesterId,
persist::{
completion_observer::NopObserver, handle::PersistHandle,
hot_partitions::HotPartitionPersister,
completion_observer::NopObserver, file_metrics::ParquetFileInstrumentation,
handle::PersistHandle, hot_partitions::HotPartitionPersister,
},
query::{
exec_instrumentation::QueryExecInstrumentation,
@@ -292,7 +292,9 @@ where
persist_executor,
object_store,
Arc::clone(&catalog),
NopObserver::default(),
// Register a post-persistence observer that emits Parquet file
// attributes as metrics.
ParquetFileInstrumentation::new(NopObserver::default(), &metrics),
&metrics,
);
let persist_handle = Arc::new(persist_handle);
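The registration above composes the persistence observer chain as a stack of decorators: `ParquetFileInstrumentation` records its metrics, then forwards each notification to the observer it wraps (here the terminal `NopObserver`). A minimal, self-contained sketch of that composition, using a simplified synchronous trait in place of `PersistCompletionObserver` (all names below are illustrative, not part of this PR):

```rust
// Simplified stand-in for the `PersistCompletionObserver` trait.
trait Observer {
    fn notify(&self, file_size_bytes: usize);
}

// Terminal no-op layer, playing the role of `NopObserver`.
#[derive(Default)]
struct Nop;
impl Observer for Nop {
    fn notify(&self, _file_size_bytes: usize) {}
}

// Instrumentation layer: observe the value, then forward inward.
struct Instrumentation<T> {
    inner: T,
}
impl<T: Observer> Observer for Instrumentation<T> {
    fn notify(&self, file_size_bytes: usize) {
        println!("recording metric: {file_size_bytes} bytes");
        self.inner.notify(file_size_bytes);
    }
}

fn main() {
    // Mirrors `ParquetFileInstrumentation::new(NopObserver::default(), ..)`;
    // further layers can be nested between the two without changing either.
    let chain = Instrumentation { inner: Nop };
    chain.notify(42_424_242);
}
```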

View File

@@ -1,7 +1,9 @@
use std::{fmt::Debug, sync::Arc};
use std::{fmt::Debug, sync::Arc, time::Duration};
use async_trait::async_trait;
use data_types::{sequence_number_set::SequenceNumberSet, NamespaceId, PartitionId, TableId};
use data_types::{
sequence_number_set::SequenceNumberSet, NamespaceId, ParquetFileParams, PartitionId, TableId,
};
/// An abstract observer of persistence completion events.
///
@@ -24,10 +26,8 @@ pub(crate) trait PersistCompletionObserver: Send + Sync + Debug {
/// A set of details describing the persisted data.
#[derive(Debug)]
pub struct CompletedPersist {
/// The catalog identifiers for the persisted partition.
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
/// The catalog metadata for the persist operation.
meta: ParquetFileParams,
/// The [`SequenceNumberSet`] of the persisted data.
sequence_numbers: SequenceNumberSet,
@@ -35,33 +35,26 @@ pub struct CompletedPersist {
impl CompletedPersist {
/// Construct a new completion notification.
pub(crate) fn new(
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
sequence_numbers: SequenceNumberSet,
) -> Self {
pub(crate) fn new(meta: ParquetFileParams, sequence_numbers: SequenceNumberSet) -> Self {
Self {
namespace_id,
table_id,
partition_id,
meta,
sequence_numbers,
}
}
/// Returns the [`NamespaceId`] of the persisted data.
pub(crate) fn namespace_id(&self) -> NamespaceId {
self.namespace_id
self.meta.namespace_id
}
/// Returns the [`TableId`] of the persisted data.
pub(crate) fn table_id(&self) -> TableId {
self.table_id
self.meta.table_id
}
/// Returns the [`PartitionId`] of the persisted data.
pub(crate) fn partition_id(&self) -> PartitionId {
self.partition_id
self.meta.partition_id
}
/// Returns the [`SequenceNumberSet`] of the persisted data.
@@ -85,6 +78,31 @@ impl CompletedPersist {
.map(|v| v.into_sequence_numbers())
.unwrap_or_else(|v| v.sequence_numbers().clone())
}
/// The number of rows persisted.
pub fn row_count(&self) -> usize {
self.meta.row_count as _
}
/// The number of columns persisted.
pub fn column_count(&self) -> usize {
self.meta.column_set.len()
}
/// The byte size of the generated Parquet file.
pub fn parquet_file_bytes(&self) -> usize {
self.meta.file_size_bytes as _
}
/// The duration of time covered by this file (the difference between
/// the min and max timestamps).
pub fn timestamp_range(&self) -> Duration {
let min = iox_time::Time::from(self.meta.min_time);
let max = iox_time::Time::from(self.meta.max_time);
max.checked_duration_since(min)
.expect("parquet min/max file timestamp difference is negative")
}
}
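The `owned_sequence_numbers` body above (the `map`/`unwrap_or_else` pair on `Arc::try_unwrap`) is the standard clone-avoidance pattern: when the notification has a sole owner, the set is moved out for free; otherwise it falls back to a clone. A standalone sketch of the same pattern, assuming nothing beyond `std`:

```rust
use std::sync::Arc;

fn into_owned(v: Arc<Vec<u64>>) -> Vec<u64> {
    // Sole owner: `try_unwrap` succeeds and the inner value moves out.
    // Shared: it hands the `Arc` back and we fall back to a clone.
    Arc::try_unwrap(v).unwrap_or_else(|shared| (*shared).clone())
}

fn main() {
    let sole = Arc::new(vec![1, 2, 3]);
    assert_eq!(into_owned(sole), vec![1, 2, 3]); // moved, no clone

    let shared = Arc::new(vec![4, 5, 6]);
    let _other_handle = Arc::clone(&shared);
    assert_eq!(into_owned(shared), vec![4, 5, 6]); // cloned
}
```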
/// A no-op implementation of the [`PersistCompletionObserver`] trait.
@@ -138,10 +156,31 @@ pub(crate) mod mock {
#[cfg(test)]
mod tests {
use data_types::SequenceNumber;
use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp};
use super::*;
const NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
const TABLE_ID: TableId = TableId::new(1);
const PARTITION_ID: PartitionId = PartitionId::new(1);
fn arbitrary_file_meta() -> ParquetFileParams {
ParquetFileParams {
namespace_id: NAMESPACE_ID,
table_id: TABLE_ID,
partition_id: PARTITION_ID,
object_store_id: Default::default(),
min_time: Timestamp::new(42),
max_time: Timestamp::new(42),
file_size_bytes: 42424242,
row_count: 24,
compaction_level: data_types::CompactionLevel::Initial,
created_at: Timestamp::new(1234),
column_set: ColumnSet::new([1, 2, 3, 4].into_iter().map(ColumnId::new)),
max_l0_created_at: Timestamp::new(42),
}
}
#[test]
fn test_owned_sequence_numbers_only_ref() {
let orig_set = [SequenceNumber::new(42)]
@@ -149,9 +188,7 @@ mod tests {
.collect::<SequenceNumberSet>();
let note = Arc::new(CompletedPersist::new(
NamespaceId::new(1),
TableId::new(2),
PartitionId::new(3),
arbitrary_file_meta(),
orig_set.clone(),
));
@@ -165,9 +202,7 @@
.collect::<SequenceNumberSet>();
let note = Arc::new(CompletedPersist::new(
NamespaceId::new(1),
TableId::new(2),
PartitionId::new(3),
arbitrary_file_meta(),
orig_set.clone(),
));
@@ -176,4 +211,53 @@ mod tests {
assert_eq!(orig_set, note.owned_sequence_numbers());
assert_eq!(orig_set, note2.owned_sequence_numbers());
}
#[test]
fn test_accessors() {
let meta = arbitrary_file_meta();
let note = CompletedPersist::new(meta.clone(), Default::default());
assert_eq!(note.namespace_id(), meta.namespace_id);
assert_eq!(note.table_id(), meta.table_id);
assert_eq!(note.partition_id(), meta.partition_id);
assert_eq!(note.column_count(), meta.column_set.len());
assert_eq!(note.row_count(), meta.row_count as usize);
assert_eq!(note.parquet_file_bytes(), meta.file_size_bytes as usize);
}
#[test]
fn test_timestamp_range() {
const RANGE: Duration = Duration::from_secs(42);
let min = iox_time::Time::from_timestamp_nanos(0);
let max = min.checked_add(RANGE).unwrap();
let mut meta = arbitrary_file_meta();
meta.min_time = Timestamp::from(min);
meta.max_time = Timestamp::from(max);
let note = CompletedPersist::new(meta, Default::default());
assert_eq!(note.timestamp_range(), RANGE);
}
#[test]
#[should_panic(expected = "parquet min/max file timestamp difference is negative")]
fn test_timestamp_range_negative() {
const RANGE: Duration = Duration::from_secs(42);
let min = iox_time::Time::from_timestamp_nanos(0);
let max = min.checked_add(RANGE).unwrap();
let mut meta = arbitrary_file_meta();
// Values are the wrong way around!
meta.max_time = Timestamp::from(min);
meta.min_time = Timestamp::from(max);
let note = CompletedPersist::new(meta, Default::default());
let _ = note.timestamp_range();
}
}

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
use data_types::{NamespaceId, ParquetFileParams, PartitionId, PartitionKey, TableId};
use observability_deps::tracing::*;
use parking_lot::Mutex;
use schema::sort::SortKey;
@@ -217,8 +217,12 @@ impl Context {
// Call [`PartitionData::mark_complete`] to finalise the persistence job,
// emit a log for the user, and notify the observer of this persistence
// task, if any.
pub(super) async fn mark_complete<O>(self, object_store_id: Uuid, completion_observer: &O)
where
pub(super) async fn mark_complete<O>(
self,
object_store_id: Uuid,
metadata: ParquetFileParams,
completion_observer: &O,
) where
O: PersistCompletionObserver,
{
// Mark the partition as having completed persistence, causing it to
@@ -234,12 +238,7 @@ impl Context {
// Dispatch the completion notification into the observer chain before
// completing the persist operation.
completion_observer
.persist_complete(Arc::new(CompletedPersist::new(
self.namespace_id,
self.table_id,
self.partition_id,
sequence_numbers,
)))
.persist_complete(Arc::new(CompletedPersist::new(metadata, sequence_numbers)))
.await;
let now = Instant::now();

View File

@@ -0,0 +1,228 @@
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use metric::{
DurationHistogram, DurationHistogramOptions, U64Histogram, U64HistogramOptions, DURATION_MAX,
};
use super::completion_observer::{CompletedPersist, PersistCompletionObserver};
const MINUTES: Duration = Duration::from_secs(60);
#[derive(Debug)]
pub(crate) struct ParquetFileInstrumentation<T> {
inner: T,
row_count: U64Histogram,
column_count: U64Histogram,
file_size_bytes: U64Histogram,
file_time_range: DurationHistogram,
}
impl<T> ParquetFileInstrumentation<T> {
pub(crate) fn new(inner: T, metrics: &metric::Registry) -> Self {
// A metric capturing the duration difference between min & max
// timestamps.
let file_time_range: DurationHistogram = metrics
.register_metric_with_options::<DurationHistogram, _>(
"ingester_persist_parquet_file_time_range",
"range from min to max timestamp in output parquet file",
|| {
DurationHistogramOptions::new([
30 * MINUTES, // 30m
60 * MINUTES, // 1h
120 * MINUTES, // 2h
240 * MINUTES, // 4h
480 * MINUTES, // 8h
960 * MINUTES, // 16h
1_920 * MINUTES, // 32h
DURATION_MAX,
])
},
)
.recorder(&[]);
// File size distribution.
let file_size_bytes: U64Histogram = metrics
.register_metric_with_options::<U64Histogram, _>(
"ingester_persist_parquet_file_size_bytes",
"distribution of output parquet file size in bytes",
|| {
U64HistogramOptions::new([
4_u64.pow(5), // 1 kibibyte
4_u64.pow(6), // 4 kibibytes
4_u64.pow(7), // 16 kibibytes
4_u64.pow(8), // 64 kibibytes
4_u64.pow(9), // 256 kibibytes
4_u64.pow(10), // 1 mebibyte
4_u64.pow(11), // 4 mebibytes
4_u64.pow(12), // 16 mebibytes
4_u64.pow(13), // 64 mebibytes
4_u64.pow(14), // 256 mebibytes
4_u64.pow(15), // 1 gibibyte
4_u64.pow(16), // 4 gibibytes
u64::MAX,
])
},
)
.recorder(&[]);
// Row count distribution.
let row_count: U64Histogram = metrics
.register_metric_with_options::<U64Histogram, _>(
"ingester_persist_parquet_file_row_count",
"distribution of row count in output parquet files",
|| {
U64HistogramOptions::new([
4_u64.pow(3), // 64
4_u64.pow(4), // 256
4_u64.pow(5), // 1,024
4_u64.pow(6), // 4,096
4_u64.pow(7), // 16,384
4_u64.pow(8), // 65,536
4_u64.pow(9), // 262,144
4_u64.pow(10), // 1,048,576
4_u64.pow(11), // 4,194,304
4_u64.pow(12), // 16,777,216
u64::MAX,
])
},
)
.recorder(&[]);
// Column count distribution.
//
// Because the column count is, by default, limited per table, this
// range should exceed that limit by some degree to discover overshoot
// (limits are eventually consistent) and correctly measure workloads
// that have been configured with a higher limit.
let column_count: U64Histogram = metrics
.register_metric_with_options::<U64Histogram, _>(
"ingester_persist_parquet_file_column_count",
"distribution of column count in output parquet files",
|| {
U64HistogramOptions::new([
2_u64.pow(1), // 2
2_u64.pow(2), // 4
2_u64.pow(3), // 8
2_u64.pow(4), // 16
2_u64.pow(5), // 32
2_u64.pow(6), // 64
2_u64.pow(7), // 128
2_u64.pow(8), // 256
2_u64.pow(9), // 512
2_u64.pow(10), // 1,024
2_u64.pow(11), // 2,048
u64::MAX,
])
},
)
.recorder(&[]);
Self {
inner,
row_count,
column_count,
file_size_bytes,
file_time_range,
}
}
}
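The bucket boundaries above are successive powers of four (or two, for column counts), giving logarithmically spaced histograms that span several orders of magnitude with a handful of buckets. A quick standalone check of the arithmetic behind the inline bucket comments:

```rust
fn main() {
    // File-size buckets: each boundary is 4x the previous one.
    assert_eq!(4_u64.pow(5), 1 << 10); // 1 kibibyte
    assert_eq!(4_u64.pow(10), 1 << 20); // 1 mebibyte
    assert_eq!(4_u64.pow(15), 1 << 30); // 1 gibibyte

    // Column-count buckets use powers of 2 instead, since the expected
    // range (bounded by the per-table column limit) is much narrower.
    assert_eq!(2_u64.pow(10), 1_024);
}
```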
#[async_trait]
impl<T> PersistCompletionObserver for ParquetFileInstrumentation<T>
where
T: PersistCompletionObserver,
{
async fn persist_complete(&self, note: Arc<CompletedPersist>) {
// Observe the persistence notification values.
self.row_count.record(note.row_count() as _);
self.column_count.record(note.column_count() as _);
self.file_size_bytes.record(note.parquet_file_bytes() as _);
self.file_time_range.record(note.timestamp_range());
// Forward on the notification to the next handler.
self.inner.persist_complete(note).await;
}
}
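Any type implementing `PersistCompletionObserver` can be slotted into the chain the same way. As a hedged sketch only (`FileSizeLogger` is illustrative, not part of this PR), a further decorator following the exact pattern of `ParquetFileInstrumentation`:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use observability_deps::tracing::info;

use super::completion_observer::{CompletedPersist, PersistCompletionObserver};

/// Hypothetical decorator: log each persisted file's size, then forward.
#[derive(Debug)]
pub(crate) struct FileSizeLogger<T> {
    inner: T,
}

#[async_trait]
impl<T> PersistCompletionObserver for FileSizeLogger<T>
where
    T: PersistCompletionObserver,
{
    async fn persist_complete(&self, note: Arc<CompletedPersist>) {
        info!(bytes = note.parquet_file_bytes() as u64, "persisted parquet file");
        // Always forward the notification so the rest of the chain runs.
        self.inner.persist_complete(note).await;
    }
}
```

At the wiring site this would nest as `ParquetFileInstrumentation::new(FileSizeLogger { inner: NopObserver::default() }, &metrics)`.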
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::{
sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, NamespaceId,
ParquetFileParams, PartitionId, TableId, Timestamp,
};
use metric::assert_histogram;
use crate::persist::completion_observer::mock::MockCompletionObserver;
use super::*;
const NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
const TABLE_ID: TableId = TableId::new(1);
const PARTITION_ID: PartitionId = PartitionId::new(1);
#[tokio::test]
async fn test_persisted_file_metrics() {
let inner = Arc::new(MockCompletionObserver::default());
let metrics = metric::Registry::default();
let decorator = ParquetFileInstrumentation::new(Arc::clone(&inner), &metrics);
let meta = ParquetFileParams {
namespace_id: NAMESPACE_ID,
table_id: TABLE_ID,
partition_id: PARTITION_ID,
object_store_id: Default::default(),
min_time: Timestamp::new(Duration::from_secs(1_000).as_nanos() as _),
max_time: Timestamp::new(Duration::from_secs(1_042).as_nanos() as _), // 42 seconds later
file_size_bytes: 42424242,
row_count: 24,
compaction_level: data_types::CompactionLevel::Initial,
created_at: Timestamp::new(1234),
column_set: ColumnSet::new([1, 2, 3, 4].into_iter().map(ColumnId::new)),
max_l0_created_at: Timestamp::new(42),
};
decorator
.persist_complete(Arc::new(CompletedPersist::new(
meta.clone(),
SequenceNumberSet::default(),
)))
.await;
assert_histogram!(
metrics,
DurationHistogram,
"ingester_persist_parquet_file_time_range",
samples = 1,
sum = Duration::from_secs(42),
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_persist_parquet_file_size_bytes",
samples = 1,
sum = meta.file_size_bytes as u64,
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_persist_parquet_file_row_count",
samples = 1,
sum = meta.row_count as u64,
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_persist_parquet_file_column_count",
samples = 1,
sum = meta.column_set.len() as u64,
);
}
}

View File

@@ -5,6 +5,7 @@ pub(super) mod compact;
pub(crate) mod completion_observer;
mod context;
pub(crate) mod drain_buffer;
pub(crate) mod file_metrics;
pub(crate) mod handle;
pub(crate) mod hot_partitions;
pub mod queue;

View File

@@ -135,12 +135,17 @@ pub(super) async fn run_task<O>(
};
// Make the newly uploaded parquet file visible to other nodes.
let object_store_id = update_catalog_parquet(&ctx, &worker_state, parquet_table_data).await;
let object_store_id =
update_catalog_parquet(&ctx, &worker_state, &parquet_table_data).await;
// And finally mark the persist job as complete and notify any
// observers.
ctx.mark_complete(object_store_id, &worker_state.completion_observer)
.await;
ctx.mark_complete(
object_store_id,
parquet_table_data,
&worker_state.completion_observer,
)
.await;
// Capture the time spent actively persisting.
let now = Instant::now();
@@ -459,7 +464,7 @@ where
async fn update_catalog_parquet<O>(
ctx: &Context,
worker_state: &SharedWorkerState<O>,
parquet_table_data: ParquetFileParams,
parquet_table_data: &ParquetFileParams,
) -> Uuid
where
O: Send + Sync,

View File

@@ -209,7 +209,9 @@ mod tests {
use assert_matches::assert_matches;
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionId, TableId};
use data_types::{
ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionId, TableId, Timestamp,
};
use futures::{task::Context, Future, FutureExt};
use metric::{assert_counter, U64Gauge};
use parking_lot::Mutex;
@@ -262,9 +264,20 @@
T: IntoIterator<Item = i64>,
{
Arc::new(CompletedPersist::new(
NamespaceId::new(1),
TableId::new(2),
PartitionId::new(3),
ParquetFileParams {
namespace_id: NamespaceId::new(1),
table_id: TableId::new(2),
partition_id: PartitionId::new(3),
object_store_id: Default::default(),
min_time: Timestamp::new(42),
max_time: Timestamp::new(42),
file_size_bytes: 42424242,
row_count: 24,
compaction_level: data_types::CompactionLevel::Initial,
created_at: Timestamp::new(1234),
column_set: ColumnSet::new([1, 2, 3, 4].into_iter().map(ColumnId::new)),
max_l0_created_at: Timestamp::new(42),
},
new_set(vals),
))
}

View File

@@ -1,11 +1,14 @@
use arrow_util::assert_batches_sorted_eq;
use assert_matches::assert_matches;
use data_types::PartitionKey;
use data_types::{PartitionKey, TableId, Timestamp};
use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest;
use ingester_test_ctx::TestContextBuilder;
use ingester_test_ctx::{TestContextBuilder, DEFAULT_MAX_PERSIST_QUEUE_DEPTH};
use iox_catalog::interface::Catalog;
use metric::{DurationHistogram, U64Histogram};
use std::sync::Arc;
use metric::{
assert_counter, assert_histogram, DurationHistogram, U64Counter, U64Gauge, U64Histogram,
};
use parquet_file::ParquetFilePath;
use std::{sync::Arc, time::Duration};
// Write data to an ingester through the RPC interface and query the data, validating the contents.
#[tokio::test]
@@ -89,6 +92,161 @@ async fn write_query() {
assert_eq!(hist.total, 2);
}
// Write data to an ingester through the RPC interface and persist the data.
#[tokio::test]
async fn write_persist() {
let namespace_name = "write_query_test_namespace";
let mut ctx = TestContextBuilder::default().build().await;
let ns = ctx.ensure_namespace(namespace_name, None).await;
let partition_key = PartitionKey::from("1970-01-01");
ctx.write_lp(
namespace_name,
r#"bananas count=42,greatness="inf" 200"#,
partition_key.clone(),
42,
)
.await;
// Perform a query to validate the actual data buffered.
let table_id = ctx.table_id(namespace_name, "bananas").await.get();
let data: Vec<_> = ctx
.query(IngesterQueryRequest {
namespace_id: ns.id.get(),
table_id,
columns: vec![],
predicate: None,
})
.await
.expect("query request failed");
let expected = vec![
"+-------+-----------+--------------------------------+",
"| count | greatness | time |",
"+-------+-----------+--------------------------------+",
"| 42.0 | inf | 1970-01-01T00:00:00.000000200Z |",
"+-------+-----------+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &data);
// Persist the data.
ctx.persist(namespace_name).await;
// Ensure the data is no longer buffered.
let data: Vec<_> = ctx
.query(IngesterQueryRequest {
namespace_id: ns.id.get(),
table_id,
columns: vec![],
predicate: None,
})
.await
.expect("query request failed");
assert!(data.is_empty());
// Validate the parquet file was added to the catalog
let parquet_files = ctx.catalog_parquet_file_records(namespace_name).await;
let (path, want_file_size) = assert_matches!(parquet_files.as_slice(), [f] => {
assert_eq!(f.namespace_id, ns.id);
assert_eq!(f.table_id, TableId::new(table_id));
assert_eq!(f.min_time, Timestamp::new(200));
assert_eq!(f.max_time, Timestamp::new(200));
assert_eq!(f.to_delete, None);
assert_eq!(f.row_count, 1);
assert_eq!(f.column_set.len(), 3);
assert_eq!(f.max_l0_created_at, f.created_at);
(ParquetFilePath::from(f), f.file_size_bytes)
});
// Validate the file exists at the expected object store path.
let file_size = ctx
.object_store()
.get(&path.object_store_path())
.await
.expect("parquet file must exist in object store")
.bytes()
.await
.expect("failed to read parquet file bytes")
.len();
assert_eq!(file_size, want_file_size as usize);
// And that the persist metrics were recorded.
let metrics = ctx.metrics();
////////////////////////////////////////////////////////////////////////////
// Config reflection metrics
assert_counter!(
metrics,
U64Gauge,
"ingester_persist_max_parallelism",
value = 5,
);
assert_counter!(
metrics,
U64Gauge,
"ingester_persist_max_queue_depth",
value = DEFAULT_MAX_PERSIST_QUEUE_DEPTH as u64,
);
////////////////////////////////////////////////////////////////////////////
// Persist worker metrics
assert_histogram!(
metrics,
DurationHistogram,
"ingester_persist_active_duration",
samples = 1,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_persist_enqueue_duration",
samples = 1,
);
assert_counter!(
metrics,
U64Counter,
"ingester_persist_enqueued_jobs",
value = 1,
);
////////////////////////////////////////////////////////////////////////////
// Parquet file metrics
assert_histogram!(
metrics,
DurationHistogram,
"ingester_persist_parquet_file_time_range",
samples = 1,
sum = Duration::from_secs(0),
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_persist_parquet_file_size_bytes",
samples = 1,
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_persist_parquet_file_row_count",
samples = 1,
sum = 1,
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_persist_parquet_file_column_count",
samples = 1,
sum = 3,
);
}
// Write data to the ingester, which writes it to the WAL, then drop and recreate the WAL and
// validate the data is replayed from the WAL into memory.
#[tokio::test]

View File

@@ -35,6 +35,7 @@ use iox_time::TimeProvider;
use metric::{Attributes, Metric, MetricObserver};
use mutable_batch_lp::lines_to_batches;
use mutable_batch_pb::encode::encode_write;
use object_store::ObjectStore;
use observability_deps::tracing::*;
use parquet_file::storage::ParquetStorage;
use tempfile::TempDir;
@@ -158,7 +159,7 @@ impl TestContextBuilder {
shutdown_tx,
_dir: dir,
catalog,
_storage: storage,
storage,
metrics,
namespaces: Default::default(),
}
@@ -174,7 +175,7 @@ pub struct TestContext<T> {
ingester: IngesterGuard<T>,
shutdown_tx: oneshot::Sender<CancellationToken>,
catalog: Arc<dyn Catalog>,
_storage: ParquetStorage,
storage: ParquetStorage,
metrics: Arc<metric::Registry>,
/// Once the last [`TempDir`] reference is dropped, the directory it
@@ -415,6 +416,11 @@ where
.recorder()
}
/// Return the metric registry for the [`TestContext`].
pub fn metrics(&self) -> &metric::Registry {
&self.metrics
}
/// Retrieve the Parquet files in the catalog for the specified namespace.
pub async fn catalog_parquet_file_records(&self, namespace: &str) -> Vec<ParquetFile> {
let namespace_id = self.namespace_id(namespace).await;
@@ -432,6 +438,11 @@
Arc::clone(&self.catalog)
}
/// Return the [`ObjectStore`] for this [`TestContext`].
pub fn object_store(&self) -> Arc<dyn ObjectStore> {
Arc::clone(self.storage.object_store())
}
/// Return the [`IngesterRpcInterface`] for this [`TestContext`].
///
/// The duration of calls made through this interface measures the cost of the