From 5fd3ffc17f29013c631ccdb6817fcf9d0ca230e9 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 26 May 2022 11:35:49 -0400 Subject: [PATCH 1/6] refactor: Rename ParquetChunkAdapter to only ChunkAdapter It might be creating chunks of different kinds other than ParquetChunks. --- compactor/src/compact.rs | 6 +- querier/src/chunk/mod.rs | 6 +- querier/src/database.rs | 6 +- querier/src/namespace/mod.rs | 8 +-- querier/src/table/mod.rs | 6 +- querier/src/table/state_reconciler.rs | 91 ++++++++++++++------------- querier/src/table/test_util.rs | 4 +- 7 files changed, 67 insertions(+), 60 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index aa17c66567..5f4193d9d2 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -1097,7 +1097,7 @@ mod tests { use iox_time::SystemProvider; use querier::{ cache::CatalogCache, - chunk::{collect_read_filter, ParquetChunkAdapter}, + chunk::{collect_read_filter, ChunkAdapter}, }; use std::sync::atomic::{AtomicI64, Ordering}; @@ -1203,7 +1203,7 @@ mod tests { // ------------------------------------------------ // Verify the parquet file content - let adapter = ParquetChunkAdapter::new( + let adapter = ChunkAdapter::new( Arc::new(CatalogCache::new( catalog.catalog(), catalog.time_provider(), @@ -1424,7 +1424,7 @@ mod tests { // ------------------------------------------------ // Verify the parquet file content - let adapter = ParquetChunkAdapter::new( + let adapter = ChunkAdapter::new( Arc::new(CatalogCache::new( catalog.catalog(), catalog.time_provider(), diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index 7c06514af2..080ebbaf9c 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -160,7 +160,7 @@ impl QuerierParquetChunk { /// Adapter that can create chunks. #[derive(Debug)] -pub struct ParquetChunkAdapter { +pub struct ChunkAdapter { /// Cache catalog_cache: Arc, @@ -175,7 +175,7 @@ pub struct ParquetChunkAdapter { time_provider: Arc, } -impl ParquetChunkAdapter { +impl ChunkAdapter { /// Create new adapter with empty cache. pub fn new( catalog_cache: Arc, @@ -307,7 +307,7 @@ pub mod tests { async fn test_create_record() { let catalog = TestCatalog::new(); - let adapter = ParquetChunkAdapter::new( + let adapter = ChunkAdapter::new( Arc::new(CatalogCache::new( catalog.catalog(), catalog.time_provider(), diff --git a/querier/src/database.rs b/querier/src/database.rs index 5d8417832c..08bb75b39d 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -1,7 +1,7 @@ //! Database for the querier that contains all namespaces. use crate::{ - cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection, + cache::CatalogCache, chunk::ChunkAdapter, ingester::IngesterConnection, namespace::QuerierNamespace, query_log::QueryLog, }; use async_trait::async_trait; @@ -29,7 +29,7 @@ pub struct QuerierDatabase { catalog_cache: Arc, /// Adapter to create chunks. - chunk_adapter: Arc, + chunk_adapter: Arc, /// Metric registry #[allow(dead_code)] @@ -63,7 +63,7 @@ impl QuerierDatabase { exec: Arc, ingester_connection: Arc, ) -> Self { - let chunk_adapter = Arc::new(ParquetChunkAdapter::new( + let chunk_adapter = Arc::new(ChunkAdapter::new( Arc::clone(&catalog_cache), store, Arc::clone(&metric_registry), diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index 61e250dd80..d0b2bfa672 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -1,8 +1,8 @@ //! Namespace within the whole database. 
use crate::{ - cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection, - query_log::QueryLog, table::QuerierTable, + cache::CatalogCache, chunk::ChunkAdapter, ingester::IngesterConnection, query_log::QueryLog, + table::QuerierTable, }; use data_types::{NamespaceId, NamespaceSchema}; use iox_query::exec::Executor; @@ -46,7 +46,7 @@ pub struct QuerierNamespace { impl QuerierNamespace { /// Create new namespace for given schema. pub fn new( - chunk_adapter: Arc, + chunk_adapter: Arc, schema: Arc, name: Arc, exec: Arc, @@ -98,7 +98,7 @@ impl QuerierNamespace { ingester_connection: Arc, ) -> Self { let time_provider = catalog_cache.time_provider(); - let chunk_adapter = Arc::new(ParquetChunkAdapter::new( + let chunk_adapter = Arc::new(ChunkAdapter::new( catalog_cache, store, metric_registry, diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index ef8758fc21..d901f061bc 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -1,7 +1,7 @@ use self::query_access::QuerierTableChunkPruner; use self::state_reconciler::Reconciler; use crate::{ - chunk::ParquetChunkAdapter, + chunk::ChunkAdapter, ingester::{self, IngesterPartition}, IngesterConnection, }; @@ -68,7 +68,7 @@ pub struct QuerierTable { ingester_connection: Arc, /// Interface to create chunks for this table. - chunk_adapter: Arc, + chunk_adapter: Arc, /// Handle reconciling ingester and catalog data reconciler: Reconciler, @@ -82,7 +82,7 @@ impl QuerierTable { table_name: Arc, schema: Arc, ingester_connection: Arc, - chunk_adapter: Arc, + chunk_adapter: Arc, ) -> Self { let reconciler = Reconciler::new( Arc::clone(&table_name), diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs index f767d2e31a..33e549adca 100644 --- a/querier/src/table/state_reconciler.rs +++ b/querier/src/table/state_reconciler.rs @@ -14,7 +14,7 @@ use std::{ use crate::{ cache::parquet_file::CachedParquetFiles, - chunk::{ParquetChunkAdapter, QuerierParquetChunk}, + chunk::{ChunkAdapter, QuerierParquetChunk}, tombstone::QuerierTombstone, IngesterPartition, }; @@ -32,14 +32,14 @@ pub enum ReconcileError { pub struct Reconciler { table_name: Arc, namespace_name: Arc, - chunk_adapter: Arc, + chunk_adapter: Arc, } impl Reconciler { pub(crate) fn new( table_name: Arc, namespace_name: Arc, - chunk_adapter: Arc, + chunk_adapter: Arc, ) -> Self { Self { table_name, @@ -48,9 +48,8 @@ impl Reconciler { } } - /// Reconciles ingester state (ingester_partitions) and catalog - /// state (parquet_files and tombstones), producing a list of - /// chunks to query + /// Reconciles ingester state (ingester_partitions) and catalog state (parquet_files and + /// tombstones), producing a list of chunks to query pub(crate) async fn reconcile( &self, ingester_partitions: Vec, @@ -58,7 +57,7 @@ impl Reconciler { parquet_files: Arc, ) -> Result>, ReconcileError> { let mut chunks = self - .build_parquet_chunks(&ingester_partitions, tombstones, parquet_files) + .build_chunks_from_parquet(&ingester_partitions, tombstones, parquet_files) .await?; chunks.extend(self.build_ingester_chunks(ingester_partitions)); debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation"); @@ -73,7 +72,7 @@ impl Reconciler { Ok(chunks) } - async fn build_parquet_chunks( + async fn build_chunks_from_parquet( &self, ingester_partitions: &[IngesterPartition], tombstones: Vec>, @@ -112,29 +111,30 @@ impl Reconciler { "Parquet files after filtering" ); - // convert parquet files and tombstones into QuerierParquetChunks - 
let mut parquet_chunks = Vec::with_capacity(parquet_files.len()); + // convert parquet files and tombstones into chunks + let mut chunks_from_parquet = Vec::with_capacity(parquet_files.len()); for cached_parquet_file in parquet_files { if let Some(chunk) = self .chunk_adapter .new_querier_parquet_chunk(&cached_parquet_file) .await { - parquet_chunks.push(chunk); + chunks_from_parquet.push(chunk); } } - debug!(num_chunks=%parquet_chunks.len(), "Created parquet chunks"); + debug!(num_chunks=%chunks_from_parquet.len(), "Created chunks from parquet files"); let mut chunks: Vec> = - Vec::with_capacity(parquet_chunks.len() + ingester_partitions.len()); + Vec::with_capacity(chunks_from_parquet.len() + ingester_partitions.len()); - for chunk in parquet_chunks.into_iter() { + for chunk in chunks_from_parquet.into_iter() { let chunk = if let Some(tombstones) = tombstones_by_sequencer.get(&chunk.meta().sequencer_id()) { let mut delete_predicates = Vec::with_capacity(tombstones.len()); for tombstone in tombstones { - // check conditions that don't need catalog access first to avoid unnecessary catalog load + // check conditions that don't need catalog access first to avoid unnecessary + // catalog load // Check if tombstone should be excluded based on the ingester response if tombstone_exclusion @@ -143,28 +143,31 @@ impl Reconciler { continue; } - // Check if tombstone even applies to the sequence number range within the parquet file. There + // Check if tombstone even applies to the sequence number range within the + // parquet file. There // are the following cases here: // // 1. Tombstone comes before chunk min sequencer number: // There is no way the tombstone can affect the chunk. // 2. Tombstone comes after chunk max sequencer number: - // Tombstone affects whole chunk (it might be marked as processed though, we'll check that - // further down). + // Tombstone affects whole chunk (it might be marked as processed though, + // we'll check that further down). // 3. Tombstone is in the min-max sequencer number range of the chunk: - // Technically the querier has NO way to determine the rows that are affected by the tombstone - // since we have no row-level sequence numbers. Such a file can be created by two sources -- the - // ingester and the compactor. The ingester must have materialized the tombstone while creating - // the parquet file, so the querier can skip it. The compactor also materialized the tombstones, - // so we can skip it as well. In the compactor case the tombstone will even be marked as - // processed. + // Technically the querier has NO way to determine the rows that are + // affected by the tombstone since we have no row-level sequence numbers. + // Such a file can be created by two sources -- the ingester and the + // compactor. The ingester must have materialized the tombstone while + // creating the parquet file, so the querier can skip it. The compactor also + // materialized the tombstones, so we can skip it as well. In the compactor + // case the tombstone will even be marked as processed. // // So the querier only needs to consider the tombstone in case 2. 
if tombstone.sequence_number() <= chunk.meta().max_sequence_number() { continue; } - // TODO: also consider time ranges (https://github.com/influxdata/influxdb_iox/issues/4086) + // TODO: also consider time ranges + // (https://github.com/influxdata/influxdb_iox/issues/4086) // check if tombstone is marked as processed if self @@ -196,7 +199,8 @@ impl Reconciler { ) -> impl Iterator> { // Add ingester chunks to the overall chunk list. // - filter out chunks that don't have any record batches - // - tombstones don't need to be applied since they were already materialized by the ingester + // - tombstones don't need to be applied since they were already materialized by the + // ingester ingester_partitions .into_iter() .filter(|c| c.has_batches()) @@ -304,14 +308,15 @@ impl UpdatableQuerierChunk for IngesterPartition { /// /// The caller may only use the returned parquet files. /// -/// This will remove files that are part of the catalog but that contain data that the ingester persisted AFTER the -/// querier contacted it. See module-level documentation about the order in which the communication and the information -/// processing should take place. +/// This will remove files that are part of the catalog but that contain data that the ingester +/// persisted AFTER the querier contacted it. See module-level documentation about the order in +/// which the communication and the information processing should take place. /// -/// Note that the querier (and this method) do NOT care about the actual age of the parquet files, since the compactor -/// is free to to process files at any given moment (e.g. to combine them or to materialize tombstones). However if the -/// compactor combines files in a way that the querier would need to split it into "desired" data and "too new" data -/// then we will currently bail out with [`ReconcileError`]. +/// Note that the querier (and this method) do NOT care about the actual age of the parquet files, +/// since the compactor is free to to process files at any given moment (e.g. to combine them or to +/// materialize tombstones). However if the compactor combines files in a way that the querier +/// would need to split it into "desired" data and "too new" data then we will currently bail out +/// with [`ReconcileError`]. fn filter_parquet_files( ingester_partitions: &[I], parquet_files: Vec
<T>
, @@ -322,8 +327,8 @@ where { // Build partition-based lookup table. // - // Note that we don't need to take the sequencer ID into account here because each partition is not only bound to a - // table but also to a sequencer. + // Note that we don't need to take the sequencer ID into account here because each partition is + // not only bound to a table but also to a sequencer. let lookup_table: HashMap = ingester_partitions .iter() .map(|i| (i.partition_id(), i)) @@ -368,7 +373,8 @@ where file_min_seq_num=%file.min_sequence_number().get(), "partition was not flagged by the ingester as unpersisted" ); - // partition was not flagged by the ingester as "unpersisted", so we can keep the parquet file + // partition was not flagged by the ingester as "unpersisted", so we can keep the + // parquet file } result.push(file); @@ -379,9 +385,10 @@ where /// Generates "exclude" filter for tombstones. /// -/// Since tombstones are sequencer-wide but data persistence is partition-based (which are sub-units of sequencers), we -/// cannot just remove tombstones entirely but need to decide on a per-partition basis. This function generates a lookup -/// table of partition-tombstone tuples that later need to be EXCLUDED/IGNORED when pairing tombstones with chunks. +/// Since tombstones are sequencer-wide but data persistence is partition-based (which are +/// sub-units of sequencers), we cannot just remove tombstones entirely but need to decide on a +/// per-partition basis. This function generates a lookup table of partition-tombstone tuples that +/// later need to be EXCLUDED/IGNORED when pairing tombstones with chunks. fn tombstone_exclude_list( ingester_partitions: &[I], tombstones: &[T], @@ -411,7 +418,8 @@ where // in persisted range => keep } } else { - // partition has no persisted data at all => need to exclude tombstone which is too new + // partition has no persisted data at all => need to exclude tombstone which is + // too new exclude.insert((p.partition_id(), t.id())); } } @@ -423,11 +431,10 @@ where #[cfg(test)] mod tests { + use super::*; use assert_matches::assert_matches; use data_types::SequenceNumber; - use super::*; - #[test] fn test_filter_parquet_files_empty() { let actual = diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs index aaf5ec2b63..c8be1db0eb 100644 --- a/querier/src/table/test_util.rs +++ b/querier/src/table/test_util.rs @@ -9,7 +9,7 @@ use parquet_file::storage::ParquetStorage; use schema::{selection::Selection, sort::SortKey, Schema}; use crate::{ - cache::CatalogCache, chunk::ParquetChunkAdapter, create_ingester_connection_for_testing, + cache::CatalogCache, chunk::ChunkAdapter, create_ingester_connection_for_testing, IngesterPartition, }; @@ -23,7 +23,7 @@ pub async fn querier_table(catalog: &Arc, table: &Arc) - catalog.metric_registry(), usize::MAX, )); - let chunk_adapter = Arc::new(ParquetChunkAdapter::new( + let chunk_adapter = Arc::new(ChunkAdapter::new( catalog_cache, ParquetStorage::new(catalog.object_store()), catalog.metric_registry(), From b2905650aa1c883dc69b4ede1355de42d30e7d79 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 26 May 2022 12:48:53 -0400 Subject: [PATCH 2/6] refactor: Extract extract_range to be a method on TableSummary So that other kinds of chunks can use this code too. 
--- data_types/src/lib.rs | 21 ++++++++++++++++++++- parquet_file/src/chunk.rs | 18 +++--------------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 435a4e7b5e..b21e0539a4 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -13,7 +13,10 @@ use influxdb_line_protocol::FieldValue; use observability_deps::tracing::warn; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; -use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema}; +use schema::{ + builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema, + TIME_COLUMN_NAME, +}; use snafu::{ResultExt, Snafu}; use std::{ borrow::{Borrow, Cow}, @@ -1923,6 +1926,22 @@ impl TableSummary { size + mem::size_of::() // Add size of this struct that points to // table and ColumnSummary } + + /// Extracts min/max values of the timestamp column, if possible + pub fn time_range(&self) -> Option { + self.column(TIME_COLUMN_NAME).and_then(|c| { + if let Statistics::I64(StatValues { + min: Some(min), + max: Some(max), + .. + }) = &c.stats + { + Some(TimestampMinMax::new(*min, *max)) + } else { + None + } + }) + } } /// Kafka partition ID plus offset diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index c4d0319b3f..f96c8c758b 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -6,11 +6,11 @@ use crate::{ storage::ParquetStorage, }; use data_types::{ - ParquetFile, ParquetFileWithMetadata, Statistics, TableSummary, TimestampMinMax, TimestampRange, + ParquetFile, ParquetFileWithMetadata, TableSummary, TimestampMinMax, TimestampRange, }; use datafusion::physical_plan::SendableRecordBatchStream; use predicate::Predicate; -use schema::{selection::Selection, Schema, TIME_COLUMN_NAME}; +use schema::{selection::Selection, Schema}; use std::{collections::BTreeSet, mem, sync::Arc}; #[derive(Debug)] @@ -83,7 +83,7 @@ impl ParquetChunk { let columns = decoded.read_statistics(&schema).unwrap(); let table_summary = TableSummary { columns }; let rows = decoded.row_count(); - let timestamp_min_max = extract_range(&table_summary); + let timestamp_min_max = table_summary.time_range(); let file_size_bytes = decoded_parquet_file.parquet_file.file_size_bytes as usize; Self { @@ -186,18 +186,6 @@ impl ParquetChunk { } } -/// Extracts min/max values of the timestamp column, from the TableSummary, if possible -fn extract_range(table_summary: &TableSummary) -> Option { - table_summary.column(TIME_COLUMN_NAME).and_then(|c| { - if let Statistics::I64(s) = &c.stats { - if let (Some(min), Some(max)) = (s.min, s.max) { - return Some(TimestampMinMax::new(min, max)); - } - } - None - }) -} - /// Parquet file with decoded metadata. #[derive(Debug)] #[allow(missing_docs)] From 2cb351cd0d8f9159c7ff8e0ef0e41b483212444a Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 26 May 2022 11:58:03 -0400 Subject: [PATCH 3/6] feat: Make a QuerierRBChunk wrapper to handle traits and extra data This brings back a bunch of code from OG from read buffer backed DbChunks. 
--- querier/src/cache/mod.rs | 9 +- querier/src/cache/read_buffer.rs | 7 +- querier/src/chunk/mod.rs | 160 +++++++++++- querier/src/chunk/query_access.rs | 356 +++++++++++++++++++++++++- querier/src/table/state_reconciler.rs | 15 +- 5 files changed, 536 insertions(+), 11 deletions(-) diff --git a/querier/src/cache/mod.rs b/querier/src/cache/mod.rs index d0509ea98e..52259adcbc 100644 --- a/querier/src/cache/mod.rs +++ b/querier/src/cache/mod.rs @@ -7,8 +7,8 @@ use std::sync::Arc; use self::{ namespace::NamespaceCache, parquet_file::ParquetFileCache, partition::PartitionCache, - processed_tombstones::ProcessedTombstonesCache, ram::RamSize, table::TableCache, - tombstones::TombstoneCache, + processed_tombstones::ProcessedTombstonesCache, ram::RamSize, read_buffer::ReadBufferCache, + table::TableCache, tombstones::TombstoneCache, }; pub mod namespace; @@ -161,4 +161,9 @@ impl CatalogCache { pub(crate) fn tombstone(&self) -> &TombstoneCache { &self.tombstone_cache } + + /// Read buffer chunk cache. + pub(crate) fn read_buffer(&self) -> &ReadBufferCache { + unimplemented!("Deliberately not hooking up this cache yet"); + } } diff --git a/querier/src/cache/read_buffer.rs b/querier/src/cache/read_buffer.rs index 8d14e419ea..900fc5086b 100644 --- a/querier/src/cache/read_buffer.rs +++ b/querier/src/cache/read_buffer.rs @@ -15,6 +15,7 @@ use data_types::{ParquetFile, ParquetFileId}; use datafusion::physical_plan::SendableRecordBatchStream; use futures::StreamExt; use iox_time::TimeProvider; +use parquet_file::chunk::DecodedParquetFile; use read_buffer::RBChunk; use snafu::{ResultExt, Snafu}; use std::{collections::HashMap, mem, sync::Arc}; @@ -85,8 +86,10 @@ impl ReadBufferCache { } /// Get read buffer chunks by Parquet file id - pub async fn get(&self, parquet_file_id: ParquetFileId) -> Arc { - self.cache.get(parquet_file_id).await + pub async fn get(&self, decoded_parquet_file: &DecodedParquetFile) -> Arc { + let parquet_file = &decoded_parquet_file.parquet_file; + + self.cache.get(parquet_file.id).await } } diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index 080ebbaf9c..74427d382e 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -4,7 +4,7 @@ use crate::cache::CatalogCache; use arrow::record_batch::RecordBatch; use data_types::{ ChunkId, ChunkOrder, DeletePredicate, ParquetFileId, ParquetFileWithMetadata, PartitionId, - SequenceNumber, SequencerId, TimestampMinMax, + SequenceNumber, SequencerId, TableSummary, TimestampMinMax, TimestampRange, }; use futures::StreamExt; use iox_catalog::interface::Catalog; @@ -14,7 +14,8 @@ use parquet_file::{ chunk::{ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk}, storage::ParquetStorage, }; -use schema::{selection::Selection, sort::SortKey}; +use read_buffer::RBChunk; +use schema::{selection::Selection, sort::SortKey, Schema}; use std::sync::Arc; use uuid::Uuid; @@ -80,6 +81,98 @@ impl ChunkMeta { } } +/// Chunk representation of `read_buffer::RBChunk`s for the querier. 
+#[derive(Debug)] +pub struct QuerierRBChunk { + /// ID of the Parquet file of the chunk + parquet_file_id: ParquetFileId, + + /// Underlying read buffer chunk + rb_chunk: Arc, + + /// Table summary + table_summary: TableSummary, + + /// min/max time range of this table (extracted from TableSummary), if known + timestamp_min_max: Option, + + /// Immutable chunk metadata + meta: Arc, + + /// Schema of the chunk + schema: Arc, + + /// Delete predicates to be combined with the chunk + delete_predicates: Vec>, + + /// Partition sort key (how does the read buffer use this?) + partition_sort_key: Arc>, +} + +impl QuerierRBChunk { + /// Create new read-buffer-backed chunk + pub fn new( + parquet_file_id: ParquetFileId, + rb_chunk: Arc, + meta: Arc, + schema: Arc, + partition_sort_key: Arc>, + ) -> Self { + let table_summary = rb_chunk.table_summary(); + let timestamp_min_max = table_summary.time_range(); + + Self { + parquet_file_id, + rb_chunk, + table_summary, + timestamp_min_max, + meta, + schema, + delete_predicates: Vec::new(), + partition_sort_key, + } + } + + /// Set delete predicates of the given chunk. + pub fn with_delete_predicates(self, delete_predicates: Vec>) -> Self { + Self { + delete_predicates, + ..self + } + } + + /// Get metadata attached to the given chunk. + pub fn meta(&self) -> &ChunkMeta { + self.meta.as_ref() + } + + /// Parquet file ID + pub fn parquet_file_id(&self) -> ParquetFileId { + self.parquet_file_id + } + + /// Set partition sort key + pub fn with_partition_sort_key(self, partition_sort_key: Arc>) -> Self { + Self { + partition_sort_key, + ..self + } + } + + /// Return true if this chunk contains values within the time range, or if the range is `None`. + pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool { + match (self.timestamp_min_max, timestamp_range) { + (Some(timestamp_min_max), Some(timestamp_range)) => { + timestamp_min_max.overlaps(*timestamp_range) + } + // If this chunk doesn't have a time column it can't match + (None, Some(_)) => false, + // If there no range specified, + (_, None) => true, + } + } +} + /// Chunk representation of Parquet file chunks for the querier. /// /// These chunks are usually created on-demand. The querier cache system does not really have a @@ -276,6 +369,69 @@ impl ChunkAdapter { partition_sort_key, )) } + + /// Create new cached chunk. + /// + /// Returns `None` if some data required to create this chunk is already gone from the catalog. + pub async fn new_chunk( + &self, + decoded_parquet_file: &DecodedParquetFile, + ) -> Option { + let parquet_file = &decoded_parquet_file.parquet_file; + + let rb_chunk = self + .catalog_cache() + .read_buffer() + .get(decoded_parquet_file) + .await; + + let decoded = decoded_parquet_file + .parquet_metadata + .as_ref() + .decode() + .unwrap(); + let schema = decoded.read_schema().unwrap(); + + let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _)); + let table_name = self + .catalog_cache + .table() + .name(parquet_file.table_id) + .await?; + + let iox_metadata = &decoded_parquet_file.iox_metadata; + + // Somewhat hacky workaround because of implicit chunk orders, use min sequence number and + // hope it doesn't overflow u32. Order is non-zero, se we need to add 1. 
+ let order = ChunkOrder::new(1 + iox_metadata.min_sequence_number.get() as u32) + .expect("cannot be zero"); + + // Read partition sort key + let partition_sort_key = self + .catalog_cache() + .partition() + .sort_key(iox_metadata.partition_id) + .await; + + let meta = Arc::new(ChunkMeta { + chunk_id, + table_name, + order, + sort_key: iox_metadata.sort_key.clone(), + sequencer_id: iox_metadata.sequencer_id, + partition_id: iox_metadata.partition_id, + min_sequence_number: parquet_file.min_sequence_number, + max_sequence_number: parquet_file.max_sequence_number, + }); + + Some(QuerierRBChunk::new( + parquet_file.id, + rb_chunk, + meta, + schema, + partition_sort_key, + )) + } } /// collect data for the given chunk diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs index edda004956..77b6d41fc3 100644 --- a/querier/src/chunk/query_access.rs +++ b/querier/src/chunk/query_access.rs @@ -1,21 +1,44 @@ -use crate::chunk::QuerierParquetChunk; +use crate::chunk::{QuerierParquetChunk, QuerierRBChunk}; +use arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch}; use data_types::{ ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax, }; +use datafusion::physical_plan::RecordBatchStream; use iox_query::{QueryChunk, QueryChunkError, QueryChunkMeta}; use observability_deps::tracing::debug; use predicate::PredicateMatch; +use read_buffer::ReadFilterResults; use schema::{sort::SortKey, Schema}; -use snafu::{ResultExt, Snafu}; -use std::sync::Arc; +use snafu::{OptionExt, ResultExt, Snafu}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, + task::{Context, Poll}, +}; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))] - ParquetFileChunkError { + ParquetFileChunk { source: parquet_file::storage::ReadError, chunk_id: ChunkId, }, + + #[snafu(display("Read Buffer Error in chunk {}: {}", chunk_id, source))] + RBChunk { + source: read_buffer::Error, + chunk_id: ChunkId, + }, + + #[snafu(display( + "Could not find column name '{}' in read buffer column_values results for chunk {}", + column_name, + chunk_id, + ))] + ColumnNameNotFound { + column_name: String, + chunk_id: ChunkId, + }, } impl QueryChunkMeta for QuerierParquetChunk { @@ -135,3 +158,328 @@ impl QueryChunk for QuerierParquetChunk { self.meta().order() } } + +impl QueryChunkMeta for QuerierRBChunk { + fn summary(&self) -> Option<&TableSummary> { + Some(&self.table_summary) + } + + fn schema(&self) -> Arc { + Arc::clone(&self.schema) + } + + fn partition_sort_key(&self) -> Option<&SortKey> { + self.partition_sort_key.as_ref().as_ref() + } + + fn partition_id(&self) -> Option { + Some(self.meta.partition_id()) + } + + fn sort_key(&self) -> Option<&SortKey> { + self.meta().sort_key() + } + + fn delete_predicates(&self) -> &[Arc] { + &self.delete_predicates + } + + fn timestamp_min_max(&self) -> Option { + self.timestamp_min_max + } +} + +impl QueryChunk for QuerierRBChunk { + fn id(&self) -> ChunkId { + self.meta().chunk_id + } + + fn table_name(&self) -> &str { + self.meta().table_name.as_ref() + } + + fn may_contain_pk_duplicates(&self) -> bool { + false + } + + fn apply_predicate_to_metadata( + &self, + predicate: &predicate::Predicate, + ) -> Result { + let pred_result = if predicate.has_exprs() || self.has_timerange(predicate.range.as_ref()) { + PredicateMatch::Unknown + } else { + PredicateMatch::Zero + }; + + Ok(pred_result) + } + + fn column_names( + &self, + mut ctx: 
iox_query::exec::IOxSessionContext, + predicate: &predicate::Predicate, + columns: schema::selection::Selection<'_>, + ) -> Result, QueryChunkError> { + ctx.set_metadata("storage", "read_buffer"); + ctx.set_metadata("projection", format!("{}", columns)); + ctx.set_metadata("predicate", format!("{}", &predicate)); + + let rb_predicate = match to_read_buffer_predicate(predicate) { + Ok(rb_predicate) => rb_predicate, + Err(e) => { + debug!( + ?predicate, + %e, + "read buffer predicate not supported for column_names, falling back" + ); + return Ok(None); + } + }; + ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate)); + + // TODO(edd): wire up delete predicates to be pushed down to + // the read buffer. + + let names = self + .rb_chunk + .column_names(rb_predicate, vec![], columns, BTreeSet::new()) + .context(RBChunkSnafu { + chunk_id: self.id(), + })?; + ctx.set_metadata("output_values", names.len() as i64); + + Ok(Some(names)) + } + + fn column_values( + &self, + mut ctx: iox_query::exec::IOxSessionContext, + column_name: &str, + predicate: &predicate::Predicate, + ) -> Result, QueryChunkError> { + ctx.set_metadata("storage", "read_buffer"); + ctx.set_metadata("column_name", column_name.to_string()); + ctx.set_metadata("predicate", format!("{}", &predicate)); + + let rb_predicate = match to_read_buffer_predicate(predicate) { + Ok(rb_predicate) => rb_predicate, + Err(e) => { + debug!( + ?predicate, + %e, + "read buffer predicate not supported for column_values, falling back" + ); + return Ok(None); + } + }; + ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate)); + + let mut values = self.rb_chunk.column_values( + rb_predicate, + schema::selection::Selection::Some(&[column_name]), + BTreeMap::new(), + )?; + + // The InfluxRPC frontend only supports getting column values + // for one column at a time (this is a restriction on the Influx + // Read gRPC API too). However, the Read Buffer support multiple + // columns and will return a map - we just need to pull the + // column out to get the set of values. + let values = values + .remove(column_name) + .context(ColumnNameNotFoundSnafu { + chunk_id: self.id(), + column_name, + })?; + ctx.set_metadata("output_values", values.len() as i64); + + Ok(Some(values)) + } + + fn read_filter( + &self, + mut ctx: iox_query::exec::IOxSessionContext, + predicate: &predicate::Predicate, + selection: schema::selection::Selection<'_>, + ) -> Result { + let delete_predicates: Vec<_> = self + .delete_predicates() + .iter() + .map(|pred| Arc::new(pred.as_ref().clone().into())) + .collect(); + ctx.set_metadata("delete_predicates", delete_predicates.len() as i64); + + // merge the negated delete predicates into the select predicate + let mut pred_with_deleted_exprs = predicate.clone(); + pred_with_deleted_exprs.merge_delete_predicates(&delete_predicates); + debug!(?pred_with_deleted_exprs, "Merged negated predicate"); + + ctx.set_metadata("predicate", format!("{}", &pred_with_deleted_exprs)); + ctx.set_metadata("storage", "read_buffer"); + ctx.set_metadata("projection", format!("{}", selection)); + + // Only apply pushdownable predicates + let rb_predicate = self + .rb_chunk + // A predicate unsupported by the Read Buffer or against this chunk's schema is + // replaced with a default empty predicate. 
+ .validate_predicate(to_read_buffer_predicate(predicate).unwrap_or_default()) + .unwrap_or_default(); + debug!(?rb_predicate, "RB predicate"); + ctx.set_metadata("predicate", format!("{}", &rb_predicate)); + + // combine all delete expressions to RB's negated ones + let negated_delete_exprs = to_read_buffer_negated_predicates(&delete_predicates)? + .into_iter() + // Any delete predicates unsupported by the Read Buffer will be elided. + .filter_map(|p| self.rb_chunk.validate_predicate(p).ok()) + .collect::>(); + + debug!(?negated_delete_exprs, "Negated Predicate pushed down to RB"); + + let read_results = self + .rb_chunk + .read_filter(rb_predicate, selection, negated_delete_exprs) + .context(RBChunkSnafu { + chunk_id: self.id(), + })?; + let schema = self + .rb_chunk + .read_filter_table_schema(selection) + .context(RBChunkSnafu { + chunk_id: self.id(), + })?; + + Ok(Box::pin(ReadFilterResultsStream::new( + ctx, + read_results, + schema.into(), + ))) + } + + fn chunk_type(&self) -> &str { + "read_buffer" + } + + fn order(&self) -> ChunkOrder { + self.meta().order() + } +} + +#[derive(Debug)] +struct ReadBufferPredicateConversionError { + msg: String, + predicate: predicate::Predicate, +} + +impl std::fmt::Display for ReadBufferPredicateConversionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Error translating predicate: {}, {:#?}", + self.msg, self.predicate + ) + } +} + +impl std::error::Error for ReadBufferPredicateConversionError {} + +/// Converts a [`predicate::Predicate`] into [`read_buffer::Predicate`], suitable for evaluating on +/// the ReadBuffer. +/// +/// NOTE: a valid Read Buffer predicate is not guaranteed to be applicable to an arbitrary Read +/// Buffer chunk, because the applicability of a predicate depends on the schema of the chunk. +/// +/// Callers should validate predicates against chunks they are to be executed against using +/// `read_buffer::Chunk::validate_predicate` +fn to_read_buffer_predicate( + predicate: &predicate::Predicate, +) -> Result { + // Try to convert non-time column expressions into binary expressions that are compatible with + // the read buffer. + match predicate + .exprs + .iter() + .map(read_buffer::BinaryExpr::try_from) + .collect::, _>>() + { + Ok(exprs) => { + // Construct a `ReadBuffer` predicate with or without InfluxDB-specific expressions on + // the time column. + Ok(match predicate.range { + Some(range) => { + read_buffer::Predicate::with_time_range(&exprs, range.start(), range.end()) + } + None => read_buffer::Predicate::new(exprs), + }) + } + Err(e) => Err(ReadBufferPredicateConversionError { + msg: e, + predicate: predicate.clone(), + }), + } +} + +/// NOTE: valid Read Buffer predicates are not guaranteed to be applicable to an arbitrary Read +/// Buffer chunk, because the applicability of a predicate depends on the schema of the chunk. 
+/// Callers should validate predicates against chunks they are to be executed against using +/// `read_buffer::Chunk::validate_predicate` +fn to_read_buffer_negated_predicates( + delete_predicates: &[Arc], +) -> Result, ReadBufferPredicateConversionError> { + let mut rb_preds: Vec = vec![]; + for pred in delete_predicates { + let rb_pred = to_read_buffer_predicate(pred)?; + rb_preds.push(rb_pred); + } + + debug!(?rb_preds, "read buffer delete predicates"); + Ok(rb_preds) +} + +/// Adapter which will take a ReadFilterResults and make it an async stream +pub struct ReadFilterResultsStream { + read_results: ReadFilterResults, + schema: SchemaRef, + ctx: iox_query::exec::IOxSessionContext, +} + +impl ReadFilterResultsStream { + pub fn new( + ctx: iox_query::exec::IOxSessionContext, + read_results: ReadFilterResults, + schema: SchemaRef, + ) -> Self { + Self { + ctx, + read_results, + schema, + } + } +} + +impl RecordBatchStream for ReadFilterResultsStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +impl futures::Stream for ReadFilterResultsStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + let mut ctx = self.ctx.child_ctx("next_row_group"); + let rb = self.read_results.next(); + if let Some(rb) = &rb { + ctx.set_metadata("output_rows", rb.num_rows() as i64); + } + + Poll::Ready(Ok(rb).transpose()) + } + + // TODO is there a useful size_hint to pass? +} diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs index 33e549adca..5bdcf359c4 100644 --- a/querier/src/table/state_reconciler.rs +++ b/querier/src/table/state_reconciler.rs @@ -14,7 +14,7 @@ use std::{ use crate::{ cache::parquet_file::CachedParquetFiles, - chunk::{ChunkAdapter, QuerierParquetChunk}, + chunk::{ChunkAdapter, QuerierParquetChunk, QuerierRBChunk}, tombstone::QuerierTombstone, IngesterPartition, }; @@ -291,6 +291,19 @@ impl UpdatableQuerierChunk for QuerierParquetChunk { } } +impl UpdatableQuerierChunk for QuerierRBChunk { + fn update_partition_sort_key( + self: Box, + sort_key: Arc>, + ) -> Box { + Box::new(self.with_partition_sort_key(sort_key)) + } + + fn upcast_to_querier_chunk(self: Box) -> Box { + self as _ + } +} + impl UpdatableQuerierChunk for IngesterPartition { fn update_partition_sort_key( self: Box, From 5232594aab9af9eb1e1acdf2792d5e981328ed2a Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" <193874+carols10cents@users.noreply.github.com> Date: Fri, 27 May 2022 13:04:13 -0400 Subject: [PATCH 4/6] docs: Fix grammar in a comment Co-authored-by: Andrew Lamb --- querier/src/chunk/query_access.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs index 77b6d41fc3..0422ac0d9c 100644 --- a/querier/src/chunk/query_access.rs +++ b/querier/src/chunk/query_access.rs @@ -283,7 +283,7 @@ impl QueryChunk for QuerierRBChunk { // The InfluxRPC frontend only supports getting column values // for one column at a time (this is a restriction on the Influx - // Read gRPC API too). However, the Read Buffer support multiple + // Read gRPC API too). However, the Read Buffer supports multiple // columns and will return a map - we just need to pull the // column out to get the set of values. 
         let values = values

From f0b4d71f47d662a9bde823dc7f705f1caa79d308 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Fri, 27 May 2022 12:57:50 -0400
Subject: [PATCH 5/6] docs: Update comment to reflect new implementation

---
 querier/src/cache/read_buffer.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/querier/src/cache/read_buffer.rs b/querier/src/cache/read_buffer.rs
index 900fc5086b..25db8cdefa 100644
--- a/querier/src/cache/read_buffer.rs
+++ b/querier/src/cache/read_buffer.rs
@@ -85,7 +85,7 @@ impl ReadBufferCache {
         Self { cache, _backend }
     }
 
-    /// Get read buffer chunks by Parquet file id
+    /// Get read buffer chunks from the cache or the Parquet file
     pub async fn get(&self, decoded_parquet_file: &DecodedParquetFile) -> Arc<RBChunk> {
         let parquet_file = &decoded_parquet_file.parquet_file;
 

From 55cd8d15bea04d208a2d55e6203d7b62888854eb Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Fri, 27 May 2022 12:59:47 -0400
Subject: [PATCH 6/6] fix: Update method name to specify the kind of chunk it makes

---
 querier/src/chunk/mod.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs
index 74427d382e..7cf7f0ea4c 100644
--- a/querier/src/chunk/mod.rs
+++ b/querier/src/chunk/mod.rs
@@ -370,10 +370,8 @@ impl ChunkAdapter {
         ))
     }
 
-    /// Create new cached chunk.
-    ///
-    /// Returns `None` if some data required to create this chunk is already gone from the catalog.
-    pub async fn new_chunk(
+    /// Create read buffer chunk. May be from the cache, may be from the parquet file.
+    pub async fn new_rb_chunk(
         &self,
         decoded_parquet_file: &DecodedParquetFile,
     ) -> Option<QuerierRBChunk> {
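Taken together, the cache introduced in patch 3 and the doc comment fixed in patch 5 describe a get-or-build cache: `ReadBufferCache::get` returns the cached read buffer chunk for a parquet file when present and otherwise builds it from the parquet data. Below is a minimal synchronous sketch of that shape; `ReadBufferCacheSketch`, `Chunk`, and the `u64` key are stand-ins for the real async, size-limited cache, `RBChunk`, and `ParquetFileId`.

    use std::collections::HashMap;
    use std::sync::Arc;

    /// Stand-in for the read buffer chunk type.
    struct Chunk;

    /// Keyed get-or-build cache: one shared chunk per parquet file id.
    struct ReadBufferCacheSketch {
        chunks: HashMap<u64, Arc<Chunk>>,
    }

    impl ReadBufferCacheSketch {
        fn new() -> Self {
            Self {
                chunks: HashMap::new(),
            }
        }

        /// Return the cached chunk for `file_id`, building it only on a miss.
        fn get(&mut self, file_id: u64, build: impl FnOnce() -> Chunk) -> Arc<Chunk> {
            Arc::clone(
                self.chunks
                    .entry(file_id)
                    .or_insert_with(|| Arc::new(build())),
            )
        }
    }

    fn main() {
        let mut cache = ReadBufferCacheSketch::new();
        let first = cache.get(42, || Chunk);
        let second = cache.get(42, || panic!("already cached, so this closure never runs"));
        assert!(Arc::ptr_eq(&first, &second));
    }

Keying by parquet file id is sound because a parquet file is never modified after it is written, so a cached conversion cannot go stale; eviction then becomes purely a memory-budget concern.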