Merge pull request #1928 from influxdata/er/refactor/logging

refactor: log information for compaction results
pull/24376/head
kodiakhq[bot] 2021-07-08 11:57:11 +00:00 committed by GitHub
commit 4c05d7ccfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 5 deletions

View File

@ -93,9 +93,6 @@ impl TableSummary {
+ mem::size_of::<Self>() // Add size of this struct that points to + mem::size_of::<Self>() // Add size of this struct that points to
// table and ColumnSummary // table and ColumnSummary
} }
pub fn has_table(&self, table_name: &str) -> bool {
self.name.eq(table_name)
}
/// Updates the table summary with combined stats from the other. Counts are /// Updates the table summary with combined stats from the other. Counts are
/// treated as non-overlapping so they're just added together. If the /// treated as non-overlapping so they're just added together. If the

View File

@ -1,4 +1,4 @@
use std::str::FromStr; use std::{fmt::Display, str::FromStr};
use indexmap::{map::Iter, IndexMap}; use indexmap::{map::Iter, IndexMap};
use itertools::Itertools; use itertools::Itertools;
@ -133,6 +133,31 @@ impl<'a> SortKey<'a> {
} }
} }
// Produces a human-readable representation of a sort key that looks like:
//
// "host, region DESC, env NULLS FIRST, time"
//
impl<'a> Display for SortKey<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
for (i, (name, options)) in self.columns.iter().enumerate() {
write!(f, "{}", name)?;
if options.descending {
write!(f, " DESC")?;
}
if !options.nulls_first {
// less common case
write!(f, " NULLS LAST")?;
}
write!(f, ",")?;
if i < self.columns.len() - 1 {
write!(f, " ")?;
}
}
Ok(())
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -89,7 +89,7 @@ impl Chunk {
} }
/// The total number of row groups in all tables in this chunk. /// The total number of row groups in all tables in this chunk.
pub(crate) fn row_groups(&self) -> usize { pub fn row_groups(&self) -> usize {
self.table.row_groups() self.table.row_groups()
} }

View File

@ -5,6 +5,7 @@ use std::sync::Arc;
use data_types::job::Job; use data_types::job::Job;
use lifecycle::LifecycleWriteGuard; use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::info;
use query::exec::ExecutorType; use query::exec::ExecutorType;
use query::frontend::reorg::ReorgPlanner; use query::frontend::reorg::ReorgPlanner;
use query::QueryChunkMeta; use query::QueryChunkMeta;
@ -30,6 +31,7 @@ pub(crate) fn compact_chunks(
TaskTracker<Job>, TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>, TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> { )> {
let now = std::time::Instant::now(); // time compaction duration.
let db = Arc::clone(&partition.data().db); let db = Arc::clone(&partition.data().db);
let table_name = partition.table_name().to_string(); let table_name = partition.table_name().to_string();
let chunk_ids: Vec<_> = chunks.iter().map(|x| x.id()).collect(); let chunk_ids: Vec<_> = chunks.iter().map(|x| x.id()).collect();
@ -42,6 +44,7 @@ pub(crate) fn compact_chunks(
}); });
// Mark and snapshot chunks, then drop locks // Mark and snapshot chunks, then drop locks
let mut input_rows = 0;
let query_chunks = chunks let query_chunks = chunks
.into_iter() .into_iter()
.map(|mut chunk| { .map(|mut chunk| {
@ -49,6 +52,7 @@ pub(crate) fn compact_chunks(
assert!(Arc::ptr_eq(&db, &chunk.data().db)); assert!(Arc::ptr_eq(&db, &chunk.data().db));
assert_eq!(chunk.table_name().as_ref(), table_name.as_str()); assert_eq!(chunk.table_name().as_ref(), table_name.as_str());
input_rows += chunk.table_summary().count();
chunk.set_compacting(&registration)?; chunk.set_compacting(&registration)?;
Ok(DbChunk::snapshot(&*chunk)) Ok(DbChunk::snapshot(&*chunk))
}) })
@ -71,6 +75,7 @@ pub(crate) fn compact_chunks(
let fut = async move { let fut = async move {
let key = compute_sort_key(query_chunks.iter().map(|x| x.summary())); let key = compute_sort_key(query_chunks.iter().map(|x| x.summary()));
let key_str = format!("\"{}\"", key); // for logging
// Cannot move query_chunks as the sort key borrows the column names // Cannot move query_chunks as the sort key borrows the column names
let (schema, plan) = let (schema, plan) =
@ -79,6 +84,7 @@ pub(crate) fn compact_chunks(
let physical_plan = ctx.prepare_plan(&plan)?; let physical_plan = ctx.prepare_plan(&plan)?;
let stream = ctx.execute(physical_plan).await?; let stream = ctx.execute(physical_plan).await?;
collect_rub(stream, &mut rb_chunk).await?; collect_rub(stream, &mut rb_chunk).await?;
let rb_row_groups = rb_chunk.row_groups();
let new_chunk = { let new_chunk = {
let mut partition = partition.write(); let mut partition = partition.write();
@ -89,6 +95,16 @@ pub(crate) fn compact_chunks(
}; };
let guard = new_chunk.read(); let guard = new_chunk.read();
let elapsed = now.elapsed();
assert!(guard.table_summary().count() > 0, "chunk has zero rows");
// input rows per second
let throughput = (input_rows as u128 * 1_000_000_000) / elapsed.as_nanos();
info!(input_chunks=query_chunks.len(), rub_row_groups=rb_row_groups,
input_rows=input_rows, output_rows=guard.table_summary().count(),
sort_key=%key_str, compaction_took = ?elapsed, rows_per_sec=?throughput, "chunk(s) compacted");
Ok(DbChunk::snapshot(&guard)) Ok(DbChunk::snapshot(&guard))
}; };