diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs
index d6084db4d9..90f8021b48 100644
--- a/data_types/src/partition_metadata.rs
+++ b/data_types/src/partition_metadata.rs
@@ -93,9 +93,6 @@ impl TableSummary {
             + mem::size_of::<Self>() // Add size of this struct that points to
                                      // table and ColumnSummary
     }
-    pub fn has_table(&self, table_name: &str) -> bool {
-        self.name.eq(table_name)
-    }
 
     /// Updates the table summary with combined stats from the other. Counts are
     /// treated as non-overlapping so they're just added together. If the
diff --git a/internal_types/src/schema/sort.rs b/internal_types/src/schema/sort.rs
index ebfd3ab8b9..6775695e0b 100644
--- a/internal_types/src/schema/sort.rs
+++ b/internal_types/src/schema/sort.rs
@@ -1,4 +1,4 @@
-use std::str::FromStr;
+use std::{fmt::Display, str::FromStr};
 
 use indexmap::{map::Iter, IndexMap};
 use itertools::Itertools;
@@ -133,6 +133,30 @@ impl<'a> SortKey<'a> {
     }
 }
 
+// Produces a human-readable representation of a sort key that looks like:
+//
+//   "host, region DESC, env NULLS FIRST, time"
+//
+impl<'a> Display for SortKey<'a> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
+        for (i, (name, options)) in self.columns.iter().enumerate() {
+            write!(f, "{}", name)?;
+            if options.descending {
+                write!(f, " DESC")?;
+            }
+            if !options.nulls_first {
+                // less common case
+                write!(f, " NULLS LAST")?;
+            }
+            // Separate columns with ", "; no trailing separator after the last one.
+            if i < self.columns.len() - 1 {
+                write!(f, ", ")?;
+            }
+        }
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs
index 1ea94bf23a..e2ea5e51dc 100644
--- a/read_buffer/src/chunk.rs
+++ b/read_buffer/src/chunk.rs
@@ -89,7 +89,7 @@ impl Chunk {
     }
 
     /// The total number of row groups in all tables in this chunk.
-    pub(crate) fn row_groups(&self) -> usize {
+    pub fn row_groups(&self) -> usize {
         self.table.row_groups()
     }
 
diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs
index e304cae0eb..51604298e5 100644
--- a/server/src/db/lifecycle/compact.rs
+++ b/server/src/db/lifecycle/compact.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
 
 use data_types::job::Job;
 use lifecycle::LifecycleWriteGuard;
+use observability_deps::tracing::info;
 use query::exec::ExecutorType;
 use query::frontend::reorg::ReorgPlanner;
 use query::QueryChunkMeta;
@@ -30,6 +31,7 @@ pub(crate) fn compact_chunks(
     TaskTracker<Job>,
     TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
 )> {
+    let now = std::time::Instant::now(); // time compaction duration.
     let db = Arc::clone(&partition.data().db);
     let table_name = partition.table_name().to_string();
     let chunk_ids: Vec<_> = chunks.iter().map(|x| x.id()).collect();
@@ -42,6 +44,7 @@ pub(crate) fn compact_chunks(
     });
 
     // Mark and snapshot chunks, then drop locks
+    let mut input_rows = 0;
     let query_chunks = chunks
         .into_iter()
         .map(|mut chunk| {
@@ -49,6 +52,7 @@ pub(crate) fn compact_chunks(
             assert!(Arc::ptr_eq(&db, &chunk.data().db));
             assert_eq!(chunk.table_name().as_ref(), table_name.as_str());
 
+            input_rows += chunk.table_summary().count();
             chunk.set_compacting(&registration)?;
             Ok(DbChunk::snapshot(&*chunk))
         })
@@ -71,6 +75,7 @@ pub(crate) fn compact_chunks(
 
     let fut = async move {
         let key = compute_sort_key(query_chunks.iter().map(|x| x.summary()));
+        let key_str = format!("\"{}\"", key); // for logging
 
         // Cannot move query_chunks as the sort key borrows the column names
         let (schema, plan) =
@@ -79,6 +84,7 @@ pub(crate) fn compact_chunks(
         let physical_plan = ctx.prepare_plan(&plan)?;
         let stream = ctx.execute(physical_plan).await?;
         collect_rub(stream, &mut rb_chunk).await?;
+        let rb_row_groups = rb_chunk.row_groups();
 
         let new_chunk = {
             let mut partition = partition.write();
@@ -89,6 +95,16 @@ pub(crate) fn compact_chunks(
         };
 
         let guard = new_chunk.read();
+        let elapsed = now.elapsed();
+
+        assert!(guard.table_summary().count() > 0, "chunk has zero rows");
+        // input rows per second
+        let throughput = (input_rows as u128 * 1_000_000_000) / elapsed.as_nanos();
+
+        info!(input_chunks=query_chunks.len(), rub_row_groups=rb_row_groups,
+            input_rows=input_rows, output_rows=guard.table_summary().count(),
+            sort_key=%key_str, compaction_took = ?elapsed, rows_per_sec=?throughput, "chunk(s) compacted");
+
         Ok(DbChunk::snapshot(&guard))
     };
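
Note on the rows-per-second metric added above: a minimal, self-contained sketch of the same integer arithmetic, runnable with std only. The row count and the sleep are hypothetical stand-ins for the chunk summaries and the actual compaction work; only the throughput expression mirrors the diff.

    use std::thread;
    use std::time::{Duration, Instant};

    fn main() {
        let now = Instant::now();
        let input_rows: usize = 1_000_000; // hypothetical input row count
        thread::sleep(Duration::from_millis(250)); // stand-in for the compaction plan
        let elapsed = now.elapsed();
        // Scale rows to nanoseconds before dividing so integer division
        // doesn't truncate to zero for sub-second durations.
        let throughput = (input_rows as u128 * 1_000_000_000) / elapsed.as_nanos();
        println!("rows_per_sec={}", throughput); // ~4,000,000 for these inputs
    }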