refactor: clean up querier code base (#6404)

* refactor: `s/QuerierChunk/QuerierParquetChunk/g`

* refactor: isolate parquet chunk creation code

* refactor: fuse `chunk` and `chunk_parts`

* refactor: pass catalog cache instead of chunk adapter to state reconciler

* refactor: move parquet chunks creation into its own method

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-12-15 07:01:11 +00:00 committed by GitHub
parent be45889be1
commit ffe8b98f47
13 changed files with 312 additions and 304 deletions
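
At a glance, the refactor renames `QuerierChunk` to `QuerierParquetChunk`, moves `ChunkAdapter` and all parquet chunk creation into a new `parquet::creation` module behind a single `new_chunks` entry point, and hands the `Reconciler` a `CatalogCache` instead of the whole adapter. A rough sketch of the resulting call path, condensed from the diffs below (types abbreviated, surrounding code omitted; illustrative rather than exact):

// Sketch only -- simplified from the diff below, not compilable on its own.
// QuerierTable::chunks() now delegates pruning, shuffling and concurrent
// chunk construction to the adapter in one call:
let parquet_chunks: Vec<QuerierParquetChunk> = chunk_adapter
    .new_chunks(
        Arc::clone(cached_table),         // Arc<CachedTable>
        Arc::clone(&parquet_files.files), // Arc<Vec<Arc<ParquetFile>>>
        &predicate,                       // used for early pruning on basic summaries
        MetricPruningObserver::new(Arc::clone(&prune_metrics)),
        span_recorder.child_span("new_chunks"),
    )
    .await;

// The state reconciler only needs the catalog cache now, not the adapter:
let reconciler = Reconciler::new(table_name, namespace_name, catalog_cache);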

View File

@@ -1,7 +1,7 @@
//! Cache for schema projections.
//!
//! While this is technically NOT caching catalog requests (i.e. CPU and IO work), it heavily reduced memory when
-//! creating [`QuerierChunk`](crate::chunk::QuerierChunk)s.
+//! creating [`QuerierParquetChunk`](crate::parquet::QuerierParquetChunk)s.
use std::{
    collections::HashMap,
    mem::{size_of, size_of_val},

View File

@@ -1,8 +1,8 @@
//! Database for the querier that contains all namespaces.

use crate::{
-    cache::CatalogCache, chunk::ChunkAdapter, ingester::IngesterConnection,
-    namespace::QuerierNamespace, query_log::QueryLog, table::PruneMetrics,
+    cache::CatalogCache, ingester::IngesterConnection, namespace::QuerierNamespace,
+    parquet::ChunkAdapter, query_log::QueryLog, table::PruneMetrics,
};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};

View File

@@ -12,11 +12,11 @@
)]

mod cache;
-mod chunk;
mod database;
mod handler;
mod ingester;
mod namespace;
+mod parquet;
mod poison;
mod query_log;
mod server;

View File

@@ -2,8 +2,8 @@
use crate::{
    cache::{namespace::CachedNamespace, CatalogCache},
-    chunk::ChunkAdapter,
    ingester::IngesterConnection,
+    parquet::ChunkAdapter,
    query_log::QueryLog,
    table::{PruneMetrics, QuerierTable, QuerierTableArgs},
};

View File

@@ -0,0 +1,235 @@
use std::{collections::HashSet, sync::Arc};
use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, TimestampMinMax};
use futures::StreamExt;
use iox_catalog::interface::Catalog;
use iox_query::{pruning::prune_summaries, util::create_basic_summary};
use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk;
use predicate::Predicate;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use schema::sort::SortKey;
use trace::span::{Span, SpanRecorder};
use uuid::Uuid;
use crate::{
cache::{namespace::CachedTable, CatalogCache},
parquet::QuerierParquetChunkMeta,
table::MetricPruningObserver,
};
use super::QuerierParquetChunk;
/// Number of concurrent chunk creation jobs.
///
/// This is mostly to fetch per-partition data concurrently.
const CONCURRENT_CHUNK_CREATION_JOBS: usize = 100;
/// Adapter that can create chunks.
#[derive(Debug)]
pub struct ChunkAdapter {
/// Cache
catalog_cache: Arc<CatalogCache>,
/// Metric registry.
metric_registry: Arc<metric::Registry>,
}
impl ChunkAdapter {
/// Create new adapter with empty cache.
pub fn new(catalog_cache: Arc<CatalogCache>, metric_registry: Arc<metric::Registry>) -> Self {
Self {
catalog_cache,
metric_registry,
}
}
/// Metric registry getter.
pub fn metric_registry(&self) -> Arc<metric::Registry> {
Arc::clone(&self.metric_registry)
}
/// Get underlying catalog cache.
pub fn catalog_cache(&self) -> &Arc<CatalogCache> {
&self.catalog_cache
}
/// Get underlying catalog.
pub fn catalog(&self) -> Arc<dyn Catalog> {
self.catalog_cache.catalog()
}
pub(crate) async fn new_chunks(
&self,
cached_table: Arc<CachedTable>,
files: Arc<Vec<Arc<ParquetFile>>>,
predicate: &Predicate,
early_pruning_observer: MetricPruningObserver,
span: Option<Span>,
) -> Vec<QuerierParquetChunk> {
let span_recorder = SpanRecorder::new(span);
let basic_summaries: Vec<_> = files
.iter()
.map(|p| {
Arc::new(create_basic_summary(
p.row_count as u64,
&cached_table.schema,
TimestampMinMax {
min: p.min_time.get(),
max: p.max_time.get(),
},
))
})
.collect();
// Prune on the most basic summary data (timestamps and column names) before trying to fully load the chunks
let keeps = match prune_summaries(
Arc::clone(&cached_table.schema),
&basic_summaries,
predicate,
) {
Ok(keeps) => keeps,
Err(reason) => {
// Ignore pruning failures here - the chunk pruner should have already logged them.
// Just skip pruning and gather all the metadata. We have another chance to prune them
// once all the metadata is available
debug!(?reason, "Could not prune before metadata fetch");
vec![true; basic_summaries.len()]
}
};
// Remove any unused parquet files up front to maximize the
// concurrent catalog requests that could be outstanding
let mut parquet_files = files
.iter()
.zip(keeps)
.filter_map(|(pf, keep)| {
if keep {
Some(Arc::clone(pf))
} else {
early_pruning_observer
.was_pruned_early(pf.row_count as u64, pf.file_size_bytes as u64);
None
}
})
.collect::<Vec<_>>();
// de-correlate parquet files so that subsequent items likely don't block/wait on the same cache lookup
// (they are likely ordered by partition)
let mut rng = StdRng::seed_from_u64(cached_table.id.get() as u64);
parquet_files.shuffle(&mut rng);
futures::stream::iter(parquet_files)
.map(|cached_parquet_file| {
let span_recorder = &span_recorder;
let cached_table = Arc::clone(&cached_table);
async move {
let span = span_recorder.child_span("new_chunk");
self.new_chunk(cached_table, cached_parquet_file, span)
.await
}
})
.buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
.filter_map(|x| async { x })
.collect()
.await
}
async fn new_chunk(
&self,
cached_table: Arc<CachedTable>,
parquet_file: Arc<ParquetFile>,
span: Option<Span>,
) -> Option<QuerierParquetChunk> {
let span_recorder = SpanRecorder::new(span);
let parquet_file_cols: HashSet<ColumnId> =
parquet_file.column_set.iter().copied().collect();
// relevant_pk_columns is everything from the primary key for the table, that is actually in this parquet file
let relevant_pk_columns: Vec<_> = cached_table
.primary_key_column_ids
.iter()
.filter(|c| parquet_file_cols.contains(c))
.copied()
.collect();
let partition_sort_key = self
.catalog_cache
.partition()
.sort_key(
Arc::clone(&cached_table),
parquet_file.partition_id,
&relevant_pk_columns,
span_recorder.child_span("cache GET partition sort key"),
)
.await
.expect("partition sort key should be set when a parquet file exists");
// NOTE: Because we've looked up the sort key AFTER the namespace schema, it may contain columns for which we
// don't have any schema information yet. This is OK because we've ensured that all file columns are known
// within the schema and if a column is NOT part of the file, it will also not be part of the chunk sort
// key, so we have consistency here.
// calculate schema
// IMPORTANT: Do NOT use the sort key to list columns because the sort key only contains primary-key columns.
// NOTE: The schema that we calculate here may have a different column order than the actual parquet file. This
// is OK because the IOx parquet reader can deal with that (see #4921).
let column_ids: Vec<_> = cached_table
.column_id_map
.keys()
.filter(|id| parquet_file_cols.contains(id))
.copied()
.collect();
let schema = self
.catalog_cache
.projected_schema()
.get(
Arc::clone(&cached_table),
column_ids,
span_recorder.child_span("cache GET projected schema"),
)
.await;
// calculate sort key
let sort_key = SortKey::from_columns(
partition_sort_key
.column_order
.iter()
.filter(|c_id| parquet_file_cols.contains(c_id))
.filter_map(|c_id| cached_table.column_id_map.get(c_id))
.cloned(),
);
assert!(
!sort_key.is_empty(),
"Sort key can never be empty because there should at least be a time column",
);
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
let order = ChunkOrder::new(parquet_file.max_sequence_number.get());
let meta = Arc::new(QuerierParquetChunkMeta {
parquet_file_id: parquet_file.id,
chunk_id,
order,
sort_key: Some(sort_key),
shard_id: parquet_file.shard_id,
partition_id: parquet_file.partition_id,
max_sequence_number: parquet_file.max_sequence_number,
compaction_level: parquet_file.compaction_level,
});
let parquet_chunk = Arc::new(ParquetChunk::new(
parquet_file,
schema,
self.catalog_cache.parquet_store(),
));
Some(QuerierParquetChunk::new(
parquet_chunk,
meta,
Some(Arc::clone(&partition_sort_key.sort_key)),
))
}
}

View File

@@ -1,24 +1,22 @@
//! Querier Chunks

-use crate::cache::namespace::CachedTable;
-use crate::cache::CatalogCache;
use data_types::{
-    ChunkId, ChunkOrder, ColumnId, CompactionLevel, DeletePredicate, ParquetFile, ParquetFileId,
-    PartitionId, SequenceNumber, ShardId, TableSummary,
+    ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, ParquetFileId, PartitionId,
+    SequenceNumber, ShardId, TableSummary,
};
-use iox_catalog::interface::Catalog;
use iox_query::util::create_basic_summary;
use parquet_file::chunk::ParquetChunk;
use schema::{sort::SortKey, Schema};
-use std::{collections::HashSet, sync::Arc};
-use trace::span::{Span, SpanRecorder};
-use uuid::Uuid;
+use std::sync::Arc;

+mod creation;
mod query_access;

-/// Immutable metadata attached to a [`QuerierChunk`].
+pub use creation::ChunkAdapter;
+
+/// Immutable metadata attached to a [`QuerierParquetChunk`].
#[derive(Debug)]
-pub struct ChunkMeta {
+pub struct QuerierParquetChunkMeta {
    /// ID of the Parquet file of the chunk
    parquet_file_id: ParquetFileId,
@@ -44,7 +42,7 @@ pub struct ChunkMeta {
    compaction_level: CompactionLevel,
}

-impl ChunkMeta {
+impl QuerierParquetChunkMeta {
    /// ID of the Parquet file of the chunk
    pub fn parquet_file_id(&self) -> ParquetFileId {
        self.parquet_file_id
@@ -82,9 +80,9 @@ impl ChunkMeta {
}

#[derive(Debug)]
-pub struct QuerierChunk {
+pub struct QuerierParquetChunk {
    /// Immutable chunk metadata
-    meta: Arc<ChunkMeta>,
+    meta: Arc<QuerierParquetChunkMeta>,

    /// Schema of the chunk
    schema: Arc<Schema>,
@@ -102,11 +100,11 @@ pub struct QuerierChunk {
    table_summary: Arc<TableSummary>,
}

-impl QuerierChunk {
+impl QuerierParquetChunk {
    /// Create new parquet-backed chunk (object store data).
    pub fn new(
        parquet_chunk: Arc<ParquetChunk>,
-        meta: Arc<ChunkMeta>,
+        meta: Arc<QuerierParquetChunkMeta>,
        partition_sort_key: Option<Arc<SortKey>>,
    ) -> Self {
        let schema = parquet_chunk.schema();
@@ -136,7 +134,7 @@ impl QuerierChunk {
    }

    /// Get metadata attached to the given chunk.
-    pub fn meta(&self) -> &ChunkMeta {
+    pub fn meta(&self) -> &QuerierParquetChunkMeta {
        self.meta.as_ref()
    }
@@ -165,180 +163,24 @@ impl QuerierChunk {
    }
}
/// Adapter that can create chunks.
#[derive(Debug)]
pub struct ChunkAdapter {
/// Cache
catalog_cache: Arc<CatalogCache>,
/// Metric registry.
metric_registry: Arc<metric::Registry>,
}
impl ChunkAdapter {
/// Create new adapter with empty cache.
pub fn new(catalog_cache: Arc<CatalogCache>, metric_registry: Arc<metric::Registry>) -> Self {
Self {
catalog_cache,
metric_registry,
}
}
/// Metric registry getter.
pub fn metric_registry(&self) -> Arc<metric::Registry> {
Arc::clone(&self.metric_registry)
}
/// Get underlying catalog cache.
pub fn catalog_cache(&self) -> &Arc<CatalogCache> {
&self.catalog_cache
}
/// Get underlying catalog.
pub fn catalog(&self) -> Arc<dyn Catalog> {
self.catalog_cache.catalog()
}
pub async fn new_chunk(
&self,
cached_table: Arc<CachedTable>,
parquet_file: Arc<ParquetFile>,
span: Option<Span>,
) -> Option<QuerierChunk> {
let span_recorder = SpanRecorder::new(span);
let parts = self
.chunk_parts(
cached_table,
Arc::clone(&parquet_file),
span_recorder.child_span("chunk_parts"),
)
.await?;
let parquet_chunk = Arc::new(ParquetChunk::new(
parquet_file,
parts.schema,
self.catalog_cache.parquet_store(),
));
Some(QuerierChunk::new(
parquet_chunk,
parts.meta,
parts.partition_sort_key,
))
}
async fn chunk_parts(
&self,
cached_table: Arc<CachedTable>,
parquet_file: Arc<ParquetFile>,
span: Option<Span>,
) -> Option<ChunkParts> {
let span_recorder = SpanRecorder::new(span);
let parquet_file_cols: HashSet<ColumnId> =
parquet_file.column_set.iter().copied().collect();
// relevant_pk_columns is everything from the primary key for the table, that is actually in this parquet file
let relevant_pk_columns: Vec<_> = cached_table
.primary_key_column_ids
.iter()
.filter(|c| parquet_file_cols.contains(c))
.copied()
.collect();
let partition_sort_key = self
.catalog_cache
.partition()
.sort_key(
Arc::clone(&cached_table),
parquet_file.partition_id,
&relevant_pk_columns,
span_recorder.child_span("cache GET partition sort key"),
)
.await
.expect("partition sort key should be set when a parquet file exists");
// NOTE: Because we've looked up the sort key AFTER the namespace schema, it may contain columns for which we
// don't have any schema information yet. This is OK because we've ensured that all file columns are known
// withing the schema and if a column is NOT part of the file, it will also not be part of the chunk sort
// key, so we have consistency here.
// calculate schema
// IMPORTANT: Do NOT use the sort key to list columns because the sort key only contains primary-key columns.
// NOTE: The schema that we calculate here may have a different column order than the actual parquet file. This
// is OK because the IOx parquet reader can deal with that (see #4921).
let column_ids: Vec<_> = cached_table
.column_id_map
.keys()
.filter(|id| parquet_file_cols.contains(id))
.copied()
.collect();
let schema = self
.catalog_cache
.projected_schema()
.get(
Arc::clone(&cached_table),
column_ids,
span_recorder.child_span("cache GET projected schema"),
)
.await;
// calculate sort key
let sort_key = SortKey::from_columns(
partition_sort_key
.column_order
.iter()
.filter(|c_id| parquet_file_cols.contains(c_id))
.filter_map(|c_id| cached_table.column_id_map.get(c_id))
.cloned(),
);
assert!(
!sort_key.is_empty(),
"Sort key can never be empty because there should at least be a time column",
);
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
let order = ChunkOrder::new(parquet_file.max_sequence_number.get());
let meta = Arc::new(ChunkMeta {
parquet_file_id: parquet_file.id,
chunk_id,
order,
sort_key: Some(sort_key),
shard_id: parquet_file.shard_id,
partition_id: parquet_file.partition_id,
max_sequence_number: parquet_file.max_sequence_number,
compaction_level: parquet_file.compaction_level,
});
Some(ChunkParts {
meta,
schema,
partition_sort_key: Some(Arc::clone(&partition_sort_key.sort_key)),
})
}
}
struct ChunkParts {
meta: Arc<ChunkMeta>,
schema: Arc<Schema>,
partition_sort_key: Option<Arc<SortKey>>,
}
#[cfg(test)]
pub mod tests {
-    use crate::cache::namespace::CachedNamespace;
+    use crate::{
+        cache::{namespace::CachedNamespace, CatalogCache},
+        table::MetricPruningObserver,
+    };

    use super::*;
    use arrow::{datatypes::DataType, record_batch::RecordBatch};
    use arrow_util::assert_batches_eq;
-    use data_types::{ColumnType, NamespaceSchema};
+    use data_types::{ColumnType, NamespaceSchema, ParquetFile};
    use iox_query::{
        exec::{ExecutorType, IOxSessionContext},
        QueryChunk, QueryChunkMeta,
    };
    use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFileBuilder};
    use metric::{Attributes, Observation, RawReporter};
+    use predicate::Predicate;
    use schema::{builder::SchemaBuilder, sort::SortKeyBuilder};
    use test_helpers::maybe_start_logging;
    use tokio::runtime::Handle;
@@ -449,17 +291,19 @@
        }
    }

-    async fn chunk(&self, namespace_schema: Arc<NamespaceSchema>) -> QuerierChunk {
+    async fn chunk(&self, namespace_schema: Arc<NamespaceSchema>) -> QuerierParquetChunk {
        let cached_namespace: CachedNamespace = namespace_schema.as_ref().clone().into();
        let cached_table = cached_namespace.tables.get("table").expect("table exists");

        self.adapter
-            .new_chunk(
+            .new_chunks(
                Arc::clone(cached_table),
-                Arc::clone(&self.parquet_file),
+                Arc::new(vec![Arc::clone(&self.parquet_file)]),
+                &Predicate::new(),
+                MetricPruningObserver::new_unregistered(),
                None,
            )
            .await
-            .unwrap()
+            .remove(0)
    }

    /// get catalog access metrics from metric registry
@@ -488,7 +332,7 @@
    }
}

-    fn assert_schema(chunk: &QuerierChunk) {
+    fn assert_schema(chunk: &QuerierParquetChunk) {
        let expected_schema = SchemaBuilder::new()
            .field("field_int", DataType::Int64)
            .unwrap()
@@ -500,7 +344,7 @@
        assert_eq!(actual_schema.as_ref(), &expected_schema);
    }

-    fn assert_sort_key(chunk: &QuerierChunk) {
+    fn assert_sort_key(chunk: &QuerierParquetChunk) {
        let expected_sort_key = SortKeyBuilder::new()
            .with_col("tag1")
            .with_col("time")
@@ -509,9 +353,9 @@
        assert_eq!(actual_sort_key, &expected_sort_key);
    }

-    async fn assert_content(chunk: &QuerierChunk, test_data: &TestData) {
+    async fn assert_content(chunk: &QuerierParquetChunk, test_data: &TestData) {
        let ctx = test_data.catalog.exec.new_context(ExecutorType::Query);
-        let parquet_store = test_data.adapter.catalog_cache.parquet_store();
+        let parquet_store = test_data.adapter.catalog_cache().parquet_store();

        ctx.inner().runtime_env().register_object_store(
            "iox",
            parquet_store.id(),

View File

@@ -1,4 +1,4 @@
-use crate::chunk::QuerierChunk;
+use crate::parquet::QuerierParquetChunk;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
use datafusion::error::DataFusionError;
use iox_query::{
@@ -9,7 +9,7 @@ use predicate::Predicate;
use schema::{sort::SortKey, Projection, Schema};
use std::{any::Any, sync::Arc};

-impl QueryChunkMeta for QuerierChunk {
+impl QueryChunkMeta for QuerierParquetChunk {
    fn summary(&self) -> Arc<TableSummary> {
        Arc::clone(&self.table_summary)
    }
@@ -35,7 +35,7 @@ impl QueryChunkMeta for QuerierChunk {
    }
}

-impl QueryChunk for QuerierChunk {
+impl QueryChunk for QuerierParquetChunk {
    fn id(&self) -> ChunkId {
        self.meta().chunk_id
    }

View File

@@ -1,24 +1,16 @@
use self::query_access::QuerierTableChunkPruner;
use self::state_reconciler::Reconciler;
-use crate::table::query_access::MetricPruningObserver;
use crate::{
-    chunk::ChunkAdapter,
    ingester::{self, IngesterPartition},
+    parquet::ChunkAdapter,
    IngesterConnection,
};
-use data_types::{
-    ColumnId, DeletePredicate, NamespaceId, PartitionId, ShardIndex, TableId, TimestampMinMax,
-};
+use data_types::{ColumnId, DeletePredicate, NamespaceId, PartitionId, ShardIndex, TableId};
use datafusion::error::DataFusionError;
-use futures::{join, StreamExt};
-use iox_query::pruning::prune_summaries;
-use iox_query::util::create_basic_summary;
+use futures::join;
use iox_query::{exec::Executor, provider, provider::ChunkPruner, QueryChunk};
use observability_deps::tracing::{debug, trace};
use predicate::Predicate;
-use rand::rngs::StdRng;
-use rand::seq::SliceRandom;
-use rand::SeedableRng;
use schema::Schema;
use sharder::JumpHash;
use snafu::{ResultExt, Snafu};
@@ -33,6 +25,7 @@ use trace::span::{Span, SpanRecorder};
use uuid::Uuid;

pub use self::query_access::metrics::PruneMetrics;
+pub(crate) use self::query_access::MetricPruningObserver;

mod query_access;
mod state_reconciler;
@@ -40,11 +33,6 @@ mod state_reconciler;
#[cfg(test)]
mod test_util;

-/// Number of concurrent chunk creation jobs.
-///
-/// This is mostly to fetch per-partition data concurrently.
-const CONCURRENT_CHUNK_CREATION_JOBS: usize = 100;
-
#[derive(Debug, Snafu)]
#[allow(clippy::large_enum_variant)]
pub enum Error {
@@ -348,86 +336,20 @@ impl QuerierTable {
        let reconciler = Reconciler::new(
            Arc::clone(&self.table_name),
            Arc::clone(&self.namespace_name),
-            Arc::clone(&self.chunk_adapter),
+            Arc::clone(self.chunk_adapter.catalog_cache()),
        );

        // create parquet files
-        let parquet_files: Vec<_> = {
-            // use nested scope because we span many child scopes here and it's easier to
-            // aggregate / collapse in the UI
-            let span_recorder = span_recorder.child("parquet chunks");
-
-            let basic_summaries: Vec<_> = parquet_files
-                .files
-                .iter()
-                .map(|p| {
-                    Arc::new(create_basic_summary(
-                        p.row_count as u64,
-                        &cached_table.schema,
-                        TimestampMinMax {
-                            min: p.min_time.get(),
-                            max: p.max_time.get(),
-                        },
-                    ))
-                })
-                .collect();
-
-            // Prune on the most basic summary data (timestamps and column names) before trying to fully load the chunks
-            let keeps = match prune_summaries(
-                Arc::clone(&cached_table.schema),
-                &basic_summaries,
-                &predicate,
-            ) {
-                Ok(keeps) => keeps,
-                Err(reason) => {
-                    // Ignore pruning failures here - the chunk pruner should have already logged them.
-                    // Just skip pruning and gather all the metadata. We have another chance to prune them
-                    // once all the metadata is available
-                    debug!(?reason, "Could not prune before metadata fetch");
-                    vec![true; basic_summaries.len()]
-                }
-            };
-
-            let early_pruning_observer =
-                &MetricPruningObserver::new(Arc::clone(&self.prune_metrics));
-
-            // Remove any unused parquet files up front to maximize the
-            // concurrent catalog requests that could be outstanding
-            let mut parquet_files = parquet_files
-                .files
-                .iter()
-                .zip(keeps)
-                .filter_map(|(pf, keep)| {
-                    if keep {
-                        Some(Arc::clone(pf))
-                    } else {
-                        early_pruning_observer
-                            .was_pruned_early(pf.row_count as u64, pf.file_size_bytes as u64);
-                        None
-                    }
-                })
-                .collect::<Vec<_>>();
-
-            // de-correlate parquet files so that subsequent items likely don't block/wait on the same cache lookup
-            // (they are likely ordered by partition)
-            let mut rng = StdRng::seed_from_u64(self.id().get() as u64);
-            parquet_files.shuffle(&mut rng);
-
-            futures::stream::iter(parquet_files)
-                .map(|cached_parquet_file| {
-                    let span_recorder = &span_recorder;
-                    async move {
-                        let span = span_recorder.child_span("new_chunk");
-                        self.chunk_adapter
-                            .new_chunk(Arc::clone(cached_table), cached_parquet_file, span)
-                            .await
-                    }
-                })
-                .buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
-                .filter_map(|x| async { x })
-                .collect()
-                .await
-        };
+        let parquet_files = self
+            .chunk_adapter
+            .new_chunks(
+                Arc::clone(cached_table),
+                Arc::clone(&parquet_files.files),
+                &predicate,
+                MetricPruningObserver::new(Arc::clone(&self.prune_metrics)),
+                span_recorder.child_span("new_chunks"),
+            )
+            .await;

        let chunks = reconciler
            .reconcile(

View File

@@ -124,4 +124,9 @@ impl PruneMetrics {
            could_not_prune_df,
        }
    }
+
+    #[cfg(test)]
+    pub(crate) fn new_unregistered() -> Self {
+        Self::new(&metric::Registry::new())
+    }
}

View File

@@ -19,7 +19,7 @@ use iox_query::{
use predicate::Predicate;
use schema::Schema;

-use crate::{chunk::QuerierChunk, ingester::IngesterChunk};
+use crate::{ingester::IngesterChunk, parquet::QuerierParquetChunk};

use self::metrics::PruneMetrics;
@@ -152,6 +152,11 @@ impl MetricPruningObserver {
        Self { metrics }
    }

+    #[cfg(test)]
+    pub(crate) fn new_unregistered() -> Self {
+        Self::new(Arc::new(PruneMetrics::new_unregistered()))
+    }
+
    /// Called when pruning a chunk before fully creating the chunk structure
    pub(crate) fn was_pruned_early(&self, row_count: u64, size_estimate: u64) {
        self.metrics.pruned_early.inc(1, row_count, size_estimate);
@@ -197,7 +202,7 @@ fn chunk_estimate_size(chunk: &dyn QueryChunk) -> usize {
    if let Some(chunk) = chunk.downcast_ref::<IngesterChunk>() {
        chunk.estimate_size()
-    } else if let Some(chunk) = chunk.downcast_ref::<QuerierChunk>() {
+    } else if let Some(chunk) = chunk.downcast_ref::<QuerierParquetChunk>() {
        chunk.estimate_size()
    } else {
        panic!("Unknown chunk type")
@@ -207,7 +212,7 @@ fn chunk_estimate_size(chunk: &dyn QueryChunk) -> usize {
fn chunk_rows(chunk: &dyn QueryChunk) -> usize {
    let chunk = chunk.as_any();

-    if let Some(chunk) = chunk.downcast_ref::<QuerierChunk>() {
+    if let Some(chunk) = chunk.downcast_ref::<QuerierParquetChunk>() {
        chunk.rows()
    } else if let Some(chunk) = chunk.downcast_ref::<IngesterChunk>() {
        chunk.rows()

View File

@@ -14,10 +14,8 @@ use std::{
use trace::span::{Span, SpanRecorder};

use crate::{
-    chunk::{ChunkAdapter, QuerierChunk},
-    ingester::IngesterChunk,
-    tombstone::QuerierTombstone,
-    IngesterPartition,
+    cache::CatalogCache, ingester::IngesterChunk, parquet::QuerierParquetChunk,
+    tombstone::QuerierTombstone, IngesterPartition,
};

use self::interface::{IngesterPartitionInfo, ParquetFileInfo, TombstoneInfo};
@@ -34,19 +32,19 @@ pub enum ReconcileError {
pub struct Reconciler {
    table_name: Arc<str>,
    namespace_name: Arc<str>,
-    chunk_adapter: Arc<ChunkAdapter>,
+    catalog_cache: Arc<CatalogCache>,
}

impl Reconciler {
    pub(crate) fn new(
        table_name: Arc<str>,
        namespace_name: Arc<str>,
-        chunk_adapter: Arc<ChunkAdapter>,
+        catalog_cache: Arc<CatalogCache>,
    ) -> Self {
        Self {
            table_name,
            namespace_name,
-            chunk_adapter,
+            catalog_cache,
        }
    }
@@ -57,7 +55,7 @@ impl Reconciler {
        ingester_partitions: Vec<IngesterPartition>,
        tombstones: Vec<Arc<Tombstone>>,
        retention_delete_pred: Option<DeletePredicate>,
-        parquet_files: Vec<QuerierChunk>,
+        parquet_files: Vec<QuerierParquetChunk>,
        span: Option<Span>,
    ) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
        let span_recorder = SpanRecorder::new(span);
@@ -88,7 +86,7 @@ impl Reconciler {
        ingester_partitions: &[IngesterPartition],
        tombstones: Vec<Arc<Tombstone>>,
        retention_delete_pred: Option<DeletePredicate>,
-        parquet_files: Vec<QuerierChunk>,
+        parquet_files: Vec<QuerierParquetChunk>,
        span: Option<Span>,
    ) -> Result<Vec<Box<dyn UpdatableQuerierChunk>>, ReconcileError> {
        let span_recorder = SpanRecorder::new(span);
@@ -180,8 +178,7 @@ impl Reconciler {
            // check if tombstone is marked as processed
            if self
-                .chunk_adapter
-                .catalog_cache()
+                .catalog_cache
                .processed_tombstones()
                .exists(
                    chunk.meta().parquet_file_id(),
@@ -281,7 +278,7 @@ trait UpdatableQuerierChunk: QueryChunk {
    fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk>;
}

-impl UpdatableQuerierChunk for QuerierChunk {
+impl UpdatableQuerierChunk for QuerierParquetChunk {
    fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>> {
        self.partition_sort_key_arc()
    }

View File

@@ -1,6 +1,6 @@
//! Interface for reconciling Ingester and catalog state

-use crate::{chunk::QuerierChunk, ingester::IngesterPartition};
+use crate::{ingester::IngesterPartition, parquet::QuerierParquetChunk};
use data_types::{
    CompactionLevel, ParquetFile, PartitionId, SequenceNumber, ShardId, Tombstone, TombstoneId,
};
@@ -78,7 +78,7 @@ impl ParquetFileInfo for Arc<ParquetFile> {
    }
}

-impl ParquetFileInfo for QuerierChunk {
+impl ParquetFileInfo for QuerierParquetChunk {
    fn partition_id(&self) -> PartitionId {
        self.meta().partition_id()
    }

View File

@@ -1,6 +1,6 @@
use super::{PruneMetrics, QuerierTable, QuerierTableArgs};
use crate::{
-    cache::CatalogCache, chunk::ChunkAdapter, create_ingester_connection_for_testing,
+    cache::CatalogCache, create_ingester_connection_for_testing, parquet::ChunkAdapter,
    IngesterPartition,
};
use arrow::record_batch::RecordBatch;