Merge pull request #4709 from influxdata/cn/fetch-from-parquet-file

feat: Make a QuerierRBChunk wrapper that implements QueryChunk and QueryChunkMeta
kodiakhq[bot] 2022-05-27 17:14:05 +00:00 committed by GitHub
commit 0a84727c72
12 changed files with 624 additions and 87 deletions
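
For orientation, here is a minimal, self-contained sketch of the wrapper pattern this PR introduces: one struct backed by a read buffer chunk that implements both a metadata trait and a query trait. The trait and type definitions below are simplified stand-ins for iox_query::QueryChunkMeta, iox_query::QueryChunk and read_buffer::RBChunk, not their real signatures.

// Simplified stand-ins for the iox_query traits (illustrative only; the real
// QueryChunkMeta/QueryChunk have many more methods and different signatures).
trait QueryChunkMeta {
    fn summary(&self) -> Option<&str>;
    fn timestamp_min_max(&self) -> Option<(i64, i64)>;
}

trait QueryChunk: QueryChunkMeta {
    fn id(&self) -> u128;
    fn chunk_type(&self) -> &str;
}

// Hypothetical stand-in for read_buffer::RBChunk.
struct RbChunk {
    table_summary: String,
    time_range: Option<(i64, i64)>,
}

// The wrapper pattern: own the read buffer chunk plus whatever metadata the
// query layer asks for, mirroring the QuerierRBChunk introduced in this PR.
struct QuerierRbChunkSketch {
    id: u128,
    rb_chunk: RbChunk,
}

impl QueryChunkMeta for QuerierRbChunkSketch {
    fn summary(&self) -> Option<&str> {
        Some(self.rb_chunk.table_summary.as_str())
    }

    fn timestamp_min_max(&self) -> Option<(i64, i64)> {
        self.rb_chunk.time_range
    }
}

impl QueryChunk for QuerierRbChunkSketch {
    fn id(&self) -> u128 {
        self.id
    }

    fn chunk_type(&self) -> &str {
        // Same string the real implementation reports further below.
        "read_buffer"
    }
}

fn main() {
    let chunk = QuerierRbChunkSketch {
        id: 1,
        rb_chunk: RbChunk {
            table_summary: "cpu".to_string(),
            time_range: Some((0, 100)),
        },
    };
    assert_eq!(chunk.chunk_type(), "read_buffer");
    assert_eq!(chunk.timestamp_min_max(), Some((0, 100)));
}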

View File

@ -1102,7 +1102,7 @@ mod tests {
use iox_time::SystemProvider;
use querier::{
cache::CatalogCache,
chunk::{collect_read_filter, ParquetChunkAdapter},
chunk::{collect_read_filter, ChunkAdapter},
};
use std::sync::atomic::{AtomicI64, Ordering};
@ -1208,7 +1208,7 @@ mod tests {
// ------------------------------------------------
// Verify the parquet file content
let adapter = ParquetChunkAdapter::new(
let adapter = ChunkAdapter::new(
Arc::new(CatalogCache::new(
catalog.catalog(),
catalog.time_provider(),
@ -1429,7 +1429,7 @@ mod tests {
// ------------------------------------------------
// Verify the parquet file content
let adapter = ParquetChunkAdapter::new(
let adapter = ChunkAdapter::new(
Arc::new(CatalogCache::new(
catalog.catalog(),
catalog.time_provider(),

View File

@ -13,7 +13,10 @@
use influxdb_line_protocol::FieldValue;
use observability_deps::tracing::warn;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema};
use schema::{
builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema,
TIME_COLUMN_NAME,
};
use snafu::{ResultExt, Snafu};
use std::{
borrow::{Borrow, Cow},
@ -1923,6 +1926,22 @@ impl TableSummary {
size + mem::size_of::<Self>() // Add size of this struct that points to
// table and ColumnSummary
}
/// Extracts min/max values of the timestamp column, if possible
pub fn time_range(&self) -> Option<TimestampMinMax> {
self.column(TIME_COLUMN_NAME).and_then(|c| {
if let Statistics::I64(StatValues {
min: Some(min),
max: Some(max),
..
}) = &c.stats
{
Some(TimestampMinMax::new(*min, *max))
} else {
None
}
})
}
}
/// Kafka partition ID plus offset
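
A small, self-contained usage sketch of the TableSummary::time_range() helper added above. The types here are simplified stand-ins for the real data_types structures (TableSummary, ColumnSummary statistics, TimestampMinMax); only the extraction logic mirrors the diff.

const TIME_COLUMN_NAME: &str = "time";

#[derive(Debug, PartialEq)]
struct TimestampMinMax {
    min: i64,
    max: i64,
}

struct ColumnSummary {
    name: String,
    min: Option<i64>,
    max: Option<i64>,
}

struct TableSummary {
    columns: Vec<ColumnSummary>,
}

impl TableSummary {
    // Mirrors the new helper: only a time column with both bounds present
    // yields a range; anything else returns None.
    fn time_range(&self) -> Option<TimestampMinMax> {
        self.columns
            .iter()
            .find(|c| c.name == TIME_COLUMN_NAME)
            .and_then(|c| match (c.min, c.max) {
                (Some(min), Some(max)) => Some(TimestampMinMax { min, max }),
                _ => None,
            })
    }
}

fn main() {
    let summary = TableSummary {
        columns: vec![ColumnSummary {
            name: "time".to_string(),
            min: Some(10),
            max: Some(20),
        }],
    };
    assert_eq!(
        summary.time_range(),
        Some(TimestampMinMax { min: 10, max: 20 })
    );

    let no_time = TableSummary { columns: vec![] };
    assert!(no_time.time_range().is_none());
}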

View File

@ -6,11 +6,11 @@ use crate::{
storage::ParquetStorage,
};
use data_types::{
ParquetFile, ParquetFileWithMetadata, Statistics, TableSummary, TimestampMinMax, TimestampRange,
ParquetFile, ParquetFileWithMetadata, TableSummary, TimestampMinMax, TimestampRange,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use predicate::Predicate;
use schema::{selection::Selection, Schema, TIME_COLUMN_NAME};
use schema::{selection::Selection, Schema};
use std::{collections::BTreeSet, mem, sync::Arc};
#[derive(Debug)]
@ -83,7 +83,7 @@ impl ParquetChunk {
let columns = decoded.read_statistics(&schema).unwrap();
let table_summary = TableSummary { columns };
let rows = decoded.row_count();
let timestamp_min_max = extract_range(&table_summary);
let timestamp_min_max = table_summary.time_range();
let file_size_bytes = decoded_parquet_file.parquet_file.file_size_bytes as usize;
Self {
@ -186,18 +186,6 @@ impl ParquetChunk {
}
}
/// Extracts min/max values of the timestamp column, from the TableSummary, if possible
fn extract_range(table_summary: &TableSummary) -> Option<TimestampMinMax> {
table_summary.column(TIME_COLUMN_NAME).and_then(|c| {
if let Statistics::I64(s) = &c.stats {
if let (Some(min), Some(max)) = (s.min, s.max) {
return Some(TimestampMinMax::new(min, max));
}
}
None
})
}
/// Parquet file with decoded metadata.
#[derive(Debug)]
#[allow(missing_docs)]

View File

@ -7,8 +7,8 @@ use std::sync::Arc;
use self::{
namespace::NamespaceCache, parquet_file::ParquetFileCache, partition::PartitionCache,
processed_tombstones::ProcessedTombstonesCache, ram::RamSize, table::TableCache,
tombstones::TombstoneCache,
processed_tombstones::ProcessedTombstonesCache, ram::RamSize, read_buffer::ReadBufferCache,
table::TableCache, tombstones::TombstoneCache,
};
pub mod namespace;
@ -161,4 +161,9 @@ impl CatalogCache {
pub(crate) fn tombstone(&self) -> &TombstoneCache {
&self.tombstone_cache
}
/// Read buffer chunk cache.
pub(crate) fn read_buffer(&self) -> &ReadBufferCache {
unimplemented!("Deliberately not hooking up this cache yet");
}
}

View File

@ -15,6 +15,7 @@ use data_types::{ParquetFile, ParquetFileId};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use iox_time::TimeProvider;
use parquet_file::chunk::DecodedParquetFile;
use read_buffer::RBChunk;
use snafu::{ResultExt, Snafu};
use std::{collections::HashMap, mem, sync::Arc};
@ -84,9 +85,11 @@ impl ReadBufferCache {
Self { cache, _backend }
}
/// Get read buffer chunks by Parquet file id
pub async fn get(&self, parquet_file_id: ParquetFileId) -> Arc<RBChunk> {
self.cache.get(parquet_file_id).await
/// Get read buffer chunks from the cache or the Parquet file
pub async fn get(&self, decoded_parquet_file: &DecodedParquetFile) -> Arc<RBChunk> {
let parquet_file = &decoded_parquet_file.parquet_file;
self.cache.get(parquet_file.id).await
}
}
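
The signature change above means callers pass the whole DecodedParquetFile and the cache extracts the Parquet file id itself. A self-contained, synchronous sketch of that lookup pattern (the real cache is async and backed by the querier cache infrastructure, not a plain HashMap):

use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the real types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ParquetFileId(i64);

struct DecodedParquetFile {
    id: ParquetFileId,
}

#[derive(Clone, Debug, PartialEq)]
struct RbChunk {
    loaded_from: ParquetFileId,
}

struct ReadBufferCache {
    by_id: HashMap<ParquetFileId, RbChunk>,
}

impl ReadBufferCache {
    // Mirrors the signature change above: the caller hands over the decoded
    // file and the cache derives the lookup key (the Parquet file id) itself.
    fn get(&mut self, decoded: &DecodedParquetFile) -> RbChunk {
        self.by_id
            .entry(decoded.id)
            .or_insert_with(|| RbChunk {
                loaded_from: decoded.id,
            })
            .clone()
    }
}

fn main() {
    let mut cache = ReadBufferCache {
        by_id: HashMap::new(),
    };
    let file = DecodedParquetFile {
        id: ParquetFileId(7),
    };
    let first = cache.get(&file);
    let second = cache.get(&file);
    assert_eq!(first, second); // same file id -> same cached chunk
}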

View File

@ -4,7 +4,7 @@ use crate::cache::CatalogCache;
use arrow::record_batch::RecordBatch;
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, ParquetFileId, ParquetFileWithMetadata, PartitionId,
SequenceNumber, SequencerId, TimestampMinMax,
SequenceNumber, SequencerId, TableSummary, TimestampMinMax, TimestampRange,
};
use futures::StreamExt;
use iox_catalog::interface::Catalog;
@ -14,7 +14,8 @@ use parquet_file::{
chunk::{ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk},
storage::ParquetStorage,
};
use schema::{selection::Selection, sort::SortKey};
use read_buffer::RBChunk;
use schema::{selection::Selection, sort::SortKey, Schema};
use std::sync::Arc;
use uuid::Uuid;
@ -80,6 +81,98 @@ impl ChunkMeta {
}
}
/// Chunk representation of `read_buffer::RBChunk`s for the querier.
#[derive(Debug)]
pub struct QuerierRBChunk {
/// ID of the Parquet file of the chunk
parquet_file_id: ParquetFileId,
/// Underlying read buffer chunk
rb_chunk: Arc<RBChunk>,
/// Table summary
table_summary: TableSummary,
/// min/max time range of this table (extracted from TableSummary), if known
timestamp_min_max: Option<TimestampMinMax>,
/// Immutable chunk metadata
meta: Arc<ChunkMeta>,
/// Schema of the chunk
schema: Arc<Schema>,
/// Delete predicates to be combined with the chunk
delete_predicates: Vec<Arc<DeletePredicate>>,
/// Partition sort key (how does the read buffer use this?)
partition_sort_key: Arc<Option<SortKey>>,
}
impl QuerierRBChunk {
/// Create new read-buffer-backed chunk
pub fn new(
parquet_file_id: ParquetFileId,
rb_chunk: Arc<RBChunk>,
meta: Arc<ChunkMeta>,
schema: Arc<Schema>,
partition_sort_key: Arc<Option<SortKey>>,
) -> Self {
let table_summary = rb_chunk.table_summary();
let timestamp_min_max = table_summary.time_range();
Self {
parquet_file_id,
rb_chunk,
table_summary,
timestamp_min_max,
meta,
schema,
delete_predicates: Vec::new(),
partition_sort_key,
}
}
/// Set delete predicates of the given chunk.
pub fn with_delete_predicates(self, delete_predicates: Vec<Arc<DeletePredicate>>) -> Self {
Self {
delete_predicates,
..self
}
}
/// Get metadata attached to the given chunk.
pub fn meta(&self) -> &ChunkMeta {
self.meta.as_ref()
}
/// Parquet file ID
pub fn parquet_file_id(&self) -> ParquetFileId {
self.parquet_file_id
}
/// Set partition sort key
pub fn with_partition_sort_key(self, partition_sort_key: Arc<Option<SortKey>>) -> Self {
Self {
partition_sort_key,
..self
}
}
/// Return true if this chunk contains values within the time range, or if the range is `None`.
pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool {
match (self.timestamp_min_max, timestamp_range) {
(Some(timestamp_min_max), Some(timestamp_range)) => {
timestamp_min_max.overlaps(*timestamp_range)
}
// If this chunk doesn't have a time column it can't match
(None, Some(_)) => false,
// If no range is specified, the chunk matches
(_, None) => true,
}
}
}
/// Chunk representation of Parquet file chunks for the querier.
///
/// These chunks are usually created on-demand. The querier cache system does not really have a
@ -160,7 +253,7 @@ impl QuerierParquetChunk {
/// Adapter that can create chunks.
#[derive(Debug)]
pub struct ParquetChunkAdapter {
pub struct ChunkAdapter {
/// Cache
catalog_cache: Arc<CatalogCache>,
@ -175,7 +268,7 @@ pub struct ParquetChunkAdapter {
time_provider: Arc<dyn TimeProvider>,
}
impl ParquetChunkAdapter {
impl ChunkAdapter {
/// Create new adapter with empty cache.
pub fn new(
catalog_cache: Arc<CatalogCache>,
@ -276,6 +369,67 @@ impl ParquetChunkAdapter {
partition_sort_key,
))
}
/// Create read buffer chunk. May be from the cache, may be from the parquet file.
pub async fn new_rb_chunk(
&self,
decoded_parquet_file: &DecodedParquetFile,
) -> Option<QuerierRBChunk> {
let parquet_file = &decoded_parquet_file.parquet_file;
let rb_chunk = self
.catalog_cache()
.read_buffer()
.get(decoded_parquet_file)
.await;
let decoded = decoded_parquet_file
.parquet_metadata
.as_ref()
.decode()
.unwrap();
let schema = decoded.read_schema().unwrap();
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
let table_name = self
.catalog_cache
.table()
.name(parquet_file.table_id)
.await?;
let iox_metadata = &decoded_parquet_file.iox_metadata;
// Somewhat hacky workaround because of implicit chunk orders, use min sequence number and
// hope it doesn't overflow u32. Order is non-zero, so we need to add 1.
let order = ChunkOrder::new(1 + iox_metadata.min_sequence_number.get() as u32)
.expect("cannot be zero");
// Read partition sort key
let partition_sort_key = self
.catalog_cache()
.partition()
.sort_key(iox_metadata.partition_id)
.await;
let meta = Arc::new(ChunkMeta {
chunk_id,
table_name,
order,
sort_key: iox_metadata.sort_key.clone(),
sequencer_id: iox_metadata.sequencer_id,
partition_id: iox_metadata.partition_id,
min_sequence_number: parquet_file.min_sequence_number,
max_sequence_number: parquet_file.max_sequence_number,
});
Some(QuerierRBChunk::new(
parquet_file.id,
rb_chunk,
meta,
schema,
partition_sort_key,
))
}
}
/// collect data for the given chunk
@ -307,7 +461,7 @@ pub mod tests {
async fn test_create_record() {
let catalog = TestCatalog::new();
let adapter = ParquetChunkAdapter::new(
let adapter = ChunkAdapter::new(
Arc::new(CatalogCache::new(
catalog.catalog(),
catalog.time_provider(),
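
The implicit chunk order workaround in new_rb_chunk() above (add 1 to the min sequence number so the order is never zero) can be illustrated with a small self-contained sketch; chunk_order_from_min_sequence_number is a hypothetical helper, not part of the crate.

use std::num::NonZeroU32;

// Stand-in for ChunkOrder::new(); the real type also wraps a non-zero u32.
fn chunk_order_from_min_sequence_number(min_sequence_number: i64) -> Option<NonZeroU32> {
    // Orders are implicit, so the min sequence number is reused. It has to fit
    // into u32 and is shifted by one because an order of zero is not allowed.
    let shifted = u32::try_from(min_sequence_number).ok()?.checked_add(1)?;
    NonZeroU32::new(shifted)
}

fn main() {
    // A min sequence number of 0 still yields a valid, non-zero order of 1.
    assert_eq!(chunk_order_from_min_sequence_number(0), NonZeroU32::new(1));
    assert_eq!(chunk_order_from_min_sequence_number(41), NonZeroU32::new(42));
    // Negative (or overflowing) sequence numbers produce no order in this
    // sketch, whereas the real code simply expects them not to occur.
    assert!(chunk_order_from_min_sequence_number(-1).is_none());
}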

View File

@ -1,21 +1,44 @@
use crate::chunk::QuerierParquetChunk;
use crate::chunk::{QuerierParquetChunk, QuerierRBChunk};
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch};
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
};
use datafusion::physical_plan::RecordBatchStream;
use iox_query::{QueryChunk, QueryChunkError, QueryChunkMeta};
use observability_deps::tracing::debug;
use predicate::PredicateMatch;
use read_buffer::ReadFilterResults;
use schema::{sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
task::{Context, Poll},
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))]
ParquetFileChunkError {
ParquetFileChunk {
source: parquet_file::storage::ReadError,
chunk_id: ChunkId,
},
#[snafu(display("Read Buffer Error in chunk {}: {}", chunk_id, source))]
RBChunk {
source: read_buffer::Error,
chunk_id: ChunkId,
},
#[snafu(display(
"Could not find column name '{}' in read buffer column_values results for chunk {}",
column_name,
chunk_id,
))]
ColumnNameNotFound {
column_name: String,
chunk_id: ChunkId,
},
}
impl QueryChunkMeta for QuerierParquetChunk {
@ -135,3 +158,328 @@ impl QueryChunk for QuerierParquetChunk {
self.meta().order()
}
}
impl QueryChunkMeta for QuerierRBChunk {
fn summary(&self) -> Option<&TableSummary> {
Some(&self.table_summary)
}
fn schema(&self) -> Arc<Schema> {
Arc::clone(&self.schema)
}
fn partition_sort_key(&self) -> Option<&SortKey> {
self.partition_sort_key.as_ref().as_ref()
}
fn partition_id(&self) -> Option<PartitionId> {
Some(self.meta.partition_id())
}
fn sort_key(&self) -> Option<&SortKey> {
self.meta().sort_key()
}
fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
&self.delete_predicates
}
fn timestamp_min_max(&self) -> Option<TimestampMinMax> {
self.timestamp_min_max
}
}
impl QueryChunk for QuerierRBChunk {
fn id(&self) -> ChunkId {
self.meta().chunk_id
}
fn table_name(&self) -> &str {
self.meta().table_name.as_ref()
}
fn may_contain_pk_duplicates(&self) -> bool {
false
}
fn apply_predicate_to_metadata(
&self,
predicate: &predicate::Predicate,
) -> Result<predicate::PredicateMatch, QueryChunkError> {
let pred_result = if predicate.has_exprs() || self.has_timerange(predicate.range.as_ref()) {
PredicateMatch::Unknown
} else {
PredicateMatch::Zero
};
Ok(pred_result)
}
fn column_names(
&self,
mut ctx: iox_query::exec::IOxSessionContext,
predicate: &predicate::Predicate,
columns: schema::selection::Selection<'_>,
) -> Result<Option<iox_query::exec::stringset::StringSet>, QueryChunkError> {
ctx.set_metadata("storage", "read_buffer");
ctx.set_metadata("projection", format!("{}", columns));
ctx.set_metadata("predicate", format!("{}", &predicate));
let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(
?predicate,
%e,
"read buffer predicate not supported for column_names, falling back"
);
return Ok(None);
}
};
ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate));
// TODO(edd): wire up delete predicates to be pushed down to
// the read buffer.
let names = self
.rb_chunk
.column_names(rb_predicate, vec![], columns, BTreeSet::new())
.context(RBChunkSnafu {
chunk_id: self.id(),
})?;
ctx.set_metadata("output_values", names.len() as i64);
Ok(Some(names))
}
fn column_values(
&self,
mut ctx: iox_query::exec::IOxSessionContext,
column_name: &str,
predicate: &predicate::Predicate,
) -> Result<Option<iox_query::exec::stringset::StringSet>, QueryChunkError> {
ctx.set_metadata("storage", "read_buffer");
ctx.set_metadata("column_name", column_name.to_string());
ctx.set_metadata("predicate", format!("{}", &predicate));
let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(
?predicate,
%e,
"read buffer predicate not supported for column_values, falling back"
);
return Ok(None);
}
};
ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate));
let mut values = self.rb_chunk.column_values(
rb_predicate,
schema::selection::Selection::Some(&[column_name]),
BTreeMap::new(),
)?;
// The InfluxRPC frontend only supports getting column values
// for one column at a time (this is a restriction on the Influx
// Read gRPC API too). However, the Read Buffer supports multiple
// columns and will return a map - we just need to pull the
// column out to get the set of values.
let values = values
.remove(column_name)
.context(ColumnNameNotFoundSnafu {
chunk_id: self.id(),
column_name,
})?;
ctx.set_metadata("output_values", values.len() as i64);
Ok(Some(values))
}
fn read_filter(
&self,
mut ctx: iox_query::exec::IOxSessionContext,
predicate: &predicate::Predicate,
selection: schema::selection::Selection<'_>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, QueryChunkError> {
let delete_predicates: Vec<_> = self
.delete_predicates()
.iter()
.map(|pred| Arc::new(pred.as_ref().clone().into()))
.collect();
ctx.set_metadata("delete_predicates", delete_predicates.len() as i64);
// merge the negated delete predicates into the select predicate
let mut pred_with_deleted_exprs = predicate.clone();
pred_with_deleted_exprs.merge_delete_predicates(&delete_predicates);
debug!(?pred_with_deleted_exprs, "Merged negated predicate");
ctx.set_metadata("predicate", format!("{}", &pred_with_deleted_exprs));
ctx.set_metadata("storage", "read_buffer");
ctx.set_metadata("projection", format!("{}", selection));
// Only apply pushdownable predicates
let rb_predicate = self
.rb_chunk
// A predicate unsupported by the Read Buffer or against this chunk's schema is
// replaced with a default empty predicate.
.validate_predicate(to_read_buffer_predicate(predicate).unwrap_or_default())
.unwrap_or_default();
debug!(?rb_predicate, "RB predicate");
ctx.set_metadata("predicate", format!("{}", &rb_predicate));
// convert all delete expressions into the RB's negated predicates
let negated_delete_exprs = to_read_buffer_negated_predicates(&delete_predicates)?
.into_iter()
// Any delete predicates unsupported by the Read Buffer will be elided.
.filter_map(|p| self.rb_chunk.validate_predicate(p).ok())
.collect::<Vec<_>>();
debug!(?negated_delete_exprs, "Negated Predicate pushed down to RB");
let read_results = self
.rb_chunk
.read_filter(rb_predicate, selection, negated_delete_exprs)
.context(RBChunkSnafu {
chunk_id: self.id(),
})?;
let schema = self
.rb_chunk
.read_filter_table_schema(selection)
.context(RBChunkSnafu {
chunk_id: self.id(),
})?;
Ok(Box::pin(ReadFilterResultsStream::new(
ctx,
read_results,
schema.into(),
)))
}
fn chunk_type(&self) -> &str {
"read_buffer"
}
fn order(&self) -> ChunkOrder {
self.meta().order()
}
}
#[derive(Debug)]
struct ReadBufferPredicateConversionError {
msg: String,
predicate: predicate::Predicate,
}
impl std::fmt::Display for ReadBufferPredicateConversionError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Error translating predicate: {}, {:#?}",
self.msg, self.predicate
)
}
}
impl std::error::Error for ReadBufferPredicateConversionError {}
/// Converts a [`predicate::Predicate`] into [`read_buffer::Predicate`], suitable for evaluating on
/// the ReadBuffer.
///
/// NOTE: a valid Read Buffer predicate is not guaranteed to be applicable to an arbitrary Read
/// Buffer chunk, because the applicability of a predicate depends on the schema of the chunk.
///
/// Callers should validate predicates against chunks they are to be executed against using
/// `read_buffer::Chunk::validate_predicate`
fn to_read_buffer_predicate(
predicate: &predicate::Predicate,
) -> Result<read_buffer::Predicate, ReadBufferPredicateConversionError> {
// Try to convert non-time column expressions into binary expressions that are compatible with
// the read buffer.
match predicate
.exprs
.iter()
.map(read_buffer::BinaryExpr::try_from)
.collect::<Result<Vec<_>, _>>()
{
Ok(exprs) => {
// Construct a `ReadBuffer` predicate with or without InfluxDB-specific expressions on
// the time column.
Ok(match predicate.range {
Some(range) => {
read_buffer::Predicate::with_time_range(&exprs, range.start(), range.end())
}
None => read_buffer::Predicate::new(exprs),
})
}
Err(e) => Err(ReadBufferPredicateConversionError {
msg: e,
predicate: predicate.clone(),
}),
}
}
/// NOTE: valid Read Buffer predicates are not guaranteed to be applicable to an arbitrary Read
/// Buffer chunk, because the applicability of a predicate depends on the schema of the chunk.
/// Callers should validate predicates against chunks they are to be executed against using
/// `read_buffer::Chunk::validate_predicate`
fn to_read_buffer_negated_predicates(
delete_predicates: &[Arc<predicate::Predicate>],
) -> Result<Vec<read_buffer::Predicate>, ReadBufferPredicateConversionError> {
let mut rb_preds: Vec<read_buffer::Predicate> = vec![];
for pred in delete_predicates {
let rb_pred = to_read_buffer_predicate(pred)?;
rb_preds.push(rb_pred);
}
debug!(?rb_preds, "read buffer delete predicates");
Ok(rb_preds)
}
/// Adapter which will take a ReadFilterResults and make it an async stream
pub struct ReadFilterResultsStream {
read_results: ReadFilterResults,
schema: SchemaRef,
ctx: iox_query::exec::IOxSessionContext,
}
impl ReadFilterResultsStream {
pub fn new(
ctx: iox_query::exec::IOxSessionContext,
read_results: ReadFilterResults,
schema: SchemaRef,
) -> Self {
Self {
ctx,
read_results,
schema,
}
}
}
impl RecordBatchStream for ReadFilterResultsStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
impl futures::Stream for ReadFilterResultsStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut ctx = self.ctx.child_ctx("next_row_group");
let rb = self.read_results.next();
if let Some(rb) = &rb {
ctx.set_metadata("output_rows", rb.num_rows() as i64);
}
Poll::Ready(Ok(rb).transpose())
}
// TODO is there a useful size_hint to pass?
}
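
The column_names and column_values implementations above share a fallback pattern: if the predicate cannot be translated into a read buffer predicate, they return Ok(None) instead of an error so the query layer can fall back to a generic plan. A self-contained sketch of that pattern, with hypothetical stand-in types:

#[derive(Debug)]
struct Predicate {
    exprs: Vec<String>,
}

// Stand-in for the read buffer predicate type.
#[derive(Debug)]
struct RbPredicate(Vec<String>);

// Stand-in for to_read_buffer_predicate(): only "supported" expressions convert.
fn to_rb_predicate(predicate: &Predicate) -> Result<RbPredicate, String> {
    if predicate.exprs.iter().any(|e| e.contains("regex")) {
        return Err(format!("unsupported expression in {:?}", predicate));
    }
    Ok(RbPredicate(predicate.exprs.clone()))
}

// Mirrors the fallback: an untranslatable predicate yields Ok(None), not Err,
// so the caller can fall back to a generic plan.
fn column_names(predicate: &Predicate) -> Result<Option<Vec<String>>, String> {
    let rb_predicate = match to_rb_predicate(predicate) {
        Ok(p) => p,
        Err(_) => return Ok(None),
    };
    // Pretend the read buffer answered using the converted predicate.
    Ok(Some(vec![format!("names for {:?}", rb_predicate)]))
}

fn main() {
    let supported = Predicate {
        exprs: vec!["host = 'a'".to_string()],
    };
    assert!(column_names(&supported).unwrap().is_some());

    let unsupported = Predicate {
        exprs: vec!["regex match".to_string()],
    };
    assert!(column_names(&unsupported).unwrap().is_none());
}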

View File

@ -1,7 +1,7 @@
//! Database for the querier that contains all namespaces.
use crate::{
cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection,
cache::CatalogCache, chunk::ChunkAdapter, ingester::IngesterConnection,
namespace::QuerierNamespace, query_log::QueryLog,
};
use async_trait::async_trait;
@ -29,7 +29,7 @@ pub struct QuerierDatabase {
catalog_cache: Arc<CatalogCache>,
/// Adapter to create chunks.
chunk_adapter: Arc<ParquetChunkAdapter>,
chunk_adapter: Arc<ChunkAdapter>,
/// Metric registry
#[allow(dead_code)]
@ -63,7 +63,7 @@ impl QuerierDatabase {
exec: Arc<Executor>,
ingester_connection: Arc<dyn IngesterConnection>,
) -> Self {
let chunk_adapter = Arc::new(ParquetChunkAdapter::new(
let chunk_adapter = Arc::new(ChunkAdapter::new(
Arc::clone(&catalog_cache),
store,
Arc::clone(&metric_registry),

View File

@ -1,8 +1,8 @@
//! Namespace within the whole database.
use crate::{
cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection,
query_log::QueryLog, table::QuerierTable,
cache::CatalogCache, chunk::ChunkAdapter, ingester::IngesterConnection, query_log::QueryLog,
table::QuerierTable,
};
use data_types::{NamespaceId, NamespaceSchema};
use iox_query::exec::Executor;
@ -46,7 +46,7 @@ pub struct QuerierNamespace {
impl QuerierNamespace {
/// Create new namespace for given schema.
pub fn new(
chunk_adapter: Arc<ParquetChunkAdapter>,
chunk_adapter: Arc<ChunkAdapter>,
schema: Arc<NamespaceSchema>,
name: Arc<str>,
exec: Arc<Executor>,
@ -98,7 +98,7 @@ impl QuerierNamespace {
ingester_connection: Arc<dyn IngesterConnection>,
) -> Self {
let time_provider = catalog_cache.time_provider();
let chunk_adapter = Arc::new(ParquetChunkAdapter::new(
let chunk_adapter = Arc::new(ChunkAdapter::new(
catalog_cache,
store,
metric_registry,

View File

@ -1,7 +1,7 @@
use self::query_access::QuerierTableChunkPruner;
use self::state_reconciler::Reconciler;
use crate::{
chunk::ParquetChunkAdapter,
chunk::ChunkAdapter,
ingester::{self, IngesterPartition},
IngesterConnection,
};
@ -68,7 +68,7 @@ pub struct QuerierTable {
ingester_connection: Arc<dyn IngesterConnection>,
/// Interface to create chunks for this table.
chunk_adapter: Arc<ParquetChunkAdapter>,
chunk_adapter: Arc<ChunkAdapter>,
/// Handle reconciling ingester and catalog data
reconciler: Reconciler,
@ -82,7 +82,7 @@ impl QuerierTable {
table_name: Arc<str>,
schema: Arc<Schema>,
ingester_connection: Arc<dyn IngesterConnection>,
chunk_adapter: Arc<ParquetChunkAdapter>,
chunk_adapter: Arc<ChunkAdapter>,
) -> Self {
let reconciler = Reconciler::new(
Arc::clone(&table_name),

View File

@ -14,7 +14,7 @@ use std::{
use crate::{
cache::parquet_file::CachedParquetFiles,
chunk::{ParquetChunkAdapter, QuerierParquetChunk},
chunk::{ChunkAdapter, QuerierParquetChunk, QuerierRBChunk},
tombstone::QuerierTombstone,
IngesterPartition,
};
@ -32,14 +32,14 @@ pub enum ReconcileError {
pub struct Reconciler {
table_name: Arc<str>,
namespace_name: Arc<str>,
chunk_adapter: Arc<ParquetChunkAdapter>,
chunk_adapter: Arc<ChunkAdapter>,
}
impl Reconciler {
pub(crate) fn new(
table_name: Arc<str>,
namespace_name: Arc<str>,
chunk_adapter: Arc<ParquetChunkAdapter>,
chunk_adapter: Arc<ChunkAdapter>,
) -> Self {
Self {
table_name,
@ -48,9 +48,8 @@ impl Reconciler {
}
}
/// Reconciles ingester state (ingester_partitions) and catalog
/// state (parquet_files and tombstones), producing a list of
/// chunks to query
/// Reconciles ingester state (ingester_partitions) and catalog state (parquet_files and
/// tombstones), producing a list of chunks to query
pub(crate) async fn reconcile(
&self,
ingester_partitions: Vec<IngesterPartition>,
@ -58,7 +57,7 @@ impl Reconciler {
parquet_files: Arc<CachedParquetFiles>,
) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
let mut chunks = self
.build_parquet_chunks(&ingester_partitions, tombstones, parquet_files)
.build_chunks_from_parquet(&ingester_partitions, tombstones, parquet_files)
.await?;
chunks.extend(self.build_ingester_chunks(ingester_partitions));
debug!(num_chunks=%chunks.len(), "Final chunk count after reconciliation");
@ -73,7 +72,7 @@ impl Reconciler {
Ok(chunks)
}
async fn build_parquet_chunks(
async fn build_chunks_from_parquet(
&self,
ingester_partitions: &[IngesterPartition],
tombstones: Vec<Arc<Tombstone>>,
@ -112,29 +111,30 @@ impl Reconciler {
"Parquet files after filtering"
);
// convert parquet files and tombstones into QuerierParquetChunks
let mut parquet_chunks = Vec::with_capacity(parquet_files.len());
// convert parquet files and tombstones into chunks
let mut chunks_from_parquet = Vec::with_capacity(parquet_files.len());
for cached_parquet_file in parquet_files {
if let Some(chunk) = self
.chunk_adapter
.new_querier_parquet_chunk(&cached_parquet_file)
.await
{
parquet_chunks.push(chunk);
chunks_from_parquet.push(chunk);
}
}
debug!(num_chunks=%parquet_chunks.len(), "Created parquet chunks");
debug!(num_chunks=%chunks_from_parquet.len(), "Created chunks from parquet files");
let mut chunks: Vec<Box<dyn UpdatableQuerierChunk>> =
Vec::with_capacity(parquet_chunks.len() + ingester_partitions.len());
Vec::with_capacity(chunks_from_parquet.len() + ingester_partitions.len());
for chunk in parquet_chunks.into_iter() {
for chunk in chunks_from_parquet.into_iter() {
let chunk = if let Some(tombstones) =
tombstones_by_sequencer.get(&chunk.meta().sequencer_id())
{
let mut delete_predicates = Vec::with_capacity(tombstones.len());
for tombstone in tombstones {
// check conditions that don't need catalog access first to avoid unnecessary catalog load
// check conditions that don't need catalog access first to avoid unnecessary
// catalog load
// Check if tombstone should be excluded based on the ingester response
if tombstone_exclusion
@ -143,28 +143,31 @@ impl Reconciler {
continue;
}
// Check if tombstone even applies to the sequence number range within the parquet file. There
// Check if tombstone even applies to the sequence number range within the
// parquet file. There
// are the following cases here:
//
// 1. Tombstone comes before chunk min sequencer number:
// There is no way the tombstone can affect the chunk.
// 2. Tombstone comes after chunk max sequencer number:
// Tombstone affects whole chunk (it might be marked as processed though, we'll check that
// further down).
// Tombstone affects whole chunk (it might be marked as processed though,
// we'll check that further down).
// 3. Tombstone is in the min-max sequencer number range of the chunk:
// Technically the querier has NO way to determine the rows that are affected by the tombstone
// since we have no row-level sequence numbers. Such a file can be created by two sources -- the
// ingester and the compactor. The ingester must have materialized the tombstone while creating
// the parquet file, so the querier can skip it. The compactor also materialized the tombstones,
// so we can skip it as well. In the compactor case the tombstone will even be marked as
// processed.
// Technically the querier has NO way to determine the rows that are
// affected by the tombstone since we have no row-level sequence numbers.
// Such a file can be created by two sources -- the ingester and the
// compactor. The ingester must have materialized the tombstone while
// creating the parquet file, so the querier can skip it. The compactor also
// materialized the tombstones, so we can skip it as well. In the compactor
// case the tombstone will even be marked as processed.
//
// So the querier only needs to consider the tombstone in case 2.
if tombstone.sequence_number() <= chunk.meta().max_sequence_number() {
continue;
}
// TODO: also consider time ranges (https://github.com/influxdata/influxdb_iox/issues/4086)
// TODO: also consider time ranges
// (https://github.com/influxdata/influxdb_iox/issues/4086)
// check if tombstone is marked as processed
if self
@ -196,7 +199,8 @@ impl Reconciler {
) -> impl Iterator<Item = Box<dyn UpdatableQuerierChunk>> {
// Add ingester chunks to the overall chunk list.
// - filter out chunks that don't have any record batches
// - tombstones don't need to be applied since they were already materialized by the ingester
// - tombstones don't need to be applied since they were already materialized by the
// ingester
ingester_partitions
.into_iter()
.filter(|c| c.has_batches())
@ -287,6 +291,19 @@ impl UpdatableQuerierChunk for QuerierParquetChunk {
}
}
impl UpdatableQuerierChunk for QuerierRBChunk {
fn update_partition_sort_key(
self: Box<Self>,
sort_key: Arc<Option<SortKey>>,
) -> Box<dyn UpdatableQuerierChunk> {
Box::new(self.with_partition_sort_key(sort_key))
}
fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk> {
self as _
}
}
impl UpdatableQuerierChunk for IngesterPartition {
fn update_partition_sort_key(
self: Box<Self>,
@ -304,14 +321,15 @@ impl UpdatableQuerierChunk for IngesterPartition {
///
/// The caller may only use the returned parquet files.
///
/// This will remove files that are part of the catalog but that contain data that the ingester persisted AFTER the
/// querier contacted it. See module-level documentation about the order in which the communication and the information
/// processing should take place.
/// This will remove files that are part of the catalog but that contain data that the ingester
/// persisted AFTER the querier contacted it. See module-level documentation about the order in
/// which the communication and the information processing should take place.
///
/// Note that the querier (and this method) do NOT care about the actual age of the parquet files, since the compactor
/// is free to process files at any given moment (e.g. to combine them or to materialize tombstones). However if the
/// compactor combines files in a way that the querier would need to split it into "desired" data and "too new" data
/// then we will currently bail out with [`ReconcileError`].
/// Note that the querier (and this method) do NOT care about the actual age of the parquet files,
/// since the compactor is free to process files at any given moment (e.g. to combine them or to
/// materialize tombstones). However if the compactor combines files in a way that the querier
/// would need to split it into "desired" data and "too new" data then we will currently bail out
/// with [`ReconcileError`].
fn filter_parquet_files<I, P>(
ingester_partitions: &[I],
parquet_files: Vec<P>,
@ -322,8 +340,8 @@ where
{
// Build partition-based lookup table.
//
// Note that we don't need to take the sequencer ID into account here because each partition is not only bound to a
// table but also to a sequencer.
// Note that we don't need to take the sequencer ID into account here because each partition is
// not only bound to a table but also to a sequencer.
let lookup_table: HashMap<PartitionId, &I> = ingester_partitions
.iter()
.map(|i| (i.partition_id(), i))
@ -368,7 +386,8 @@ where
file_min_seq_num=%file.min_sequence_number().get(),
"partition was not flagged by the ingester as unpersisted"
);
// partition was not flagged by the ingester as "unpersisted", so we can keep the parquet file
// partition was not flagged by the ingester as "unpersisted", so we can keep the
// parquet file
}
result.push(file);
@ -379,9 +398,10 @@ where
/// Generates "exclude" filter for tombstones.
///
/// Since tombstones are sequencer-wide but data persistence is partition-based (which are sub-units of sequencers), we
/// cannot just remove tombstones entirely but need to decide on a per-partition basis. This function generates a lookup
/// table of partition-tombstone tuples that later need to be EXCLUDED/IGNORED when pairing tombstones with chunks.
/// Since tombstones are sequencer-wide but data persistence is partition-based (which are
/// sub-units of sequencers), we cannot just remove tombstones entirely but need to decide on a
/// per-partition basis. This function generates a lookup table of partition-tombstone tuples that
/// later need to be EXCLUDED/IGNORED when pairing tombstones with chunks.
fn tombstone_exclude_list<I, T>(
ingester_partitions: &[I],
tombstones: &[T],
@ -411,7 +431,8 @@ where
// in persisted range => keep
}
} else {
// partition has no persisted data at all => need to exclude tombstone which is too new
// partition has no persisted data at all => need to exclude tombstone which is
// too new
exclude.insert((p.partition_id(), t.id()));
}
}
@ -423,11 +444,10 @@ where
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use data_types::SequenceNumber;
use super::*;
#[test]
fn test_filter_parquet_files_empty() {
let actual =
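
The tombstone case analysis in the reconciler above reduces to a single comparison: only tombstones whose sequence number is greater than the chunk's max sequence number can still affect the chunk. A self-contained sketch of that rule, using hypothetical stand-in types:

// Hypothetical, simplified stand-ins for the chunk and tombstone metadata.
struct ChunkSeqRange {
    max_sequence_number: i64,
}

struct Tombstone {
    sequence_number: i64,
}

// Condenses cases 1-3 from the comment above: only a tombstone strictly newer
// than everything persisted in the file (case 2) still has to be applied.
fn tombstone_applies(chunk: &ChunkSeqRange, tombstone: &Tombstone) -> bool {
    tombstone.sequence_number > chunk.max_sequence_number
}

fn main() {
    let chunk = ChunkSeqRange {
        max_sequence_number: 10,
    };
    // Cases 1 and 3: at or below the chunk's max sequence number, so the
    // tombstone either cannot affect the chunk or was already materialized.
    assert!(!tombstone_applies(&chunk, &Tombstone { sequence_number: 7 }));
    assert!(!tombstone_applies(&chunk, &Tombstone { sequence_number: 10 }));
    // Case 2: newer than anything in the file, so it must still be applied.
    assert!(tombstone_applies(&chunk, &Tombstone { sequence_number: 11 }));
}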

View File

@ -9,7 +9,7 @@ use parquet_file::storage::ParquetStorage;
use schema::{selection::Selection, sort::SortKey, Schema};
use crate::{
cache::CatalogCache, chunk::ParquetChunkAdapter, create_ingester_connection_for_testing,
cache::CatalogCache, chunk::ChunkAdapter, create_ingester_connection_for_testing,
IngesterPartition,
};
@ -23,7 +23,7 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
catalog.metric_registry(),
usize::MAX,
));
let chunk_adapter = Arc::new(ParquetChunkAdapter::new(
let chunk_adapter = Arc::new(ChunkAdapter::new(
catalog_cache,
ParquetStorage::new(catalog.object_store()),
catalog.metric_registry(),