Merge pull request #4704 from influxdata/cn/welcome-back-read-buffer
feat: Start of a read buffer chunk cache
commit f645ec8a42
@@ -3948,6 +3948,7 @@ dependencies = [
"pin-project",
"predicate",
"rand",
"read_buffer",
"schema",
"service_common",
"service_grpc_schema",

@@ -1,30 +1,35 @@
//! Main data structure, see [`Cache`].
use std::{collections::HashMap, hash::Hash, sync::Arc};

use super::{backend::CacheBackend, loader::Loader};
use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use std::{collections::HashMap, hash::Hash, sync::Arc};
use tokio::{
sync::oneshot::{error::RecvError, Sender},
task::JoinHandle,
};

use super::{backend::CacheBackend, loader::Loader};

/// High-level cache implementation.
///
/// # Concurrency
/// Multiple cache requests for different keys can run at the same time. When data is requested for the same key the
/// underlying loader will only be polled once, even when the requests are made while the loader is still running.
///
/// Multiple cache requests for different keys can run at the same time. When data is requested for
/// the same key the underlying loader will only be polled once, even when the requests are made
/// while the loader is still running.
///
/// # Cancellation
/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will still be cached.
///
/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will
/// still be cached.
///
/// # Panic
/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic. The data will NOT be cached.
///
/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic.
/// The data will NOT be cached.
#[derive(Debug)]
pub struct Cache<K, V>
where
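
The concurrency behaviour described in the doc comment above is the request-coalescing pattern: concurrent `get`s for the same key share a single loader execution. Below is a self-contained miniature of that idea using only `futures` and `tokio`; it illustrates the concept, it is not the `cache_system` API, and every name in it is invented.

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use futures::future::{BoxFuture, FutureExt, Shared};

type SharedLoad = Shared<BoxFuture<'static, String>>;

#[derive(Clone, Default)]
struct Coalescer {
    // completed loads are kept forever here; a real cache would evict
    running: Arc<Mutex<HashMap<u8, SharedLoad>>>,
}

impl Coalescer {
    async fn get(&self, k: u8) -> String {
        // grab or create the shared future in a small scope so the lock
        // guard is dropped before we await
        let fut = {
            let mut running = self.running.lock().unwrap();
            running
                .entry(k)
                .or_insert_with(|| {
                    async move {
                        // stand-in for an expensive load
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                        format!("value for {k}")
                    }
                    .boxed()
                    .shared()
                })
                .clone()
        };
        fut.await
    }
}

#[tokio::main]
async fn main() {
    let c = Coalescer::default();
    // two concurrent requests for the same key: the load body runs only once
    let (a, b) = tokio::join!(c.get(1), c.get(1));
    assert_eq!(a, b);
}
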
@@ -57,7 +62,8 @@ where
/// Get value from cache.
pub async fn get(&self, k: K) -> V {
// place state locking into its own scope so it doesn't leak into the generator (async function)
// place state locking into its own scope so it doesn't leak into the generator (async
// function)
let receiver = {
let mut state = self.state.lock();
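
The "place state locking into its own scope" comment above points at a general async-Rust pitfall: a `parking_lot` guard held across an `.await` leaks into the generated future, making it non-`Send` and risking a blocked executor thread. A minimal illustration of the scoping trick; the function and its contents are invented for the example:

use parking_lot::Mutex;

async fn bump_then_yield(counter: &Mutex<u64>) -> u64 {
    // the guard lives only inside this block, so it is dropped before the await below;
    // holding it across `.await` would make this future non-Send
    let snapshot = {
        let mut guard = counter.lock();
        *guard += 1;
        *guard
    };

    tokio::task::yield_now().await;
    snapshot
}
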
@@ -83,7 +89,8 @@ where
let tag = state.tag_counter;
state.tag_counter += 1;

// need to wrap the query into a tokio task so that it doesn't get cancelled when this very request is canceled
// need to wrap the query into a tokio task so that it doesn't get cancelled when
// this very request is cancelled
let state_captured = Arc::clone(&self.state);
let loader = Arc::clone(&self.loader);
let k_captured = k.clone();

@@ -93,7 +100,8 @@ where
let k_for_loader = k_captured.clone();

// execute the loader
// If we panic here then `tx` will be dropped and the receivers will be notified.
// If we panic here then `tx` will be dropped and the receivers will be
// notified.
let v = loader.load(k_for_loader).await;

// remove "running" state and store result

@@ -104,14 +112,16 @@ where
Some(running_query) if running_query.tag == tag => {
state.running_queries.remove(&k_captured);

// this very query is in charge of the key, so store in in the underlying cache
// this very query is in charge of the key, so store in in the
// underlying cache
state.cached_entries.set(k_captured, v.clone());

true
}
_ => {
// This query is actually not really running any longer but got shut down, e.g. due
// to side loading. Do NOT store the generated value in the underlying cache.
// This query is actually not really running any longer but got
// shut down, e.g. due to side loading. Do NOT store the
// generated value in the underlying cache.

false
}

@@ -119,8 +129,9 @@ where
};

if !was_running {
// value was side-loaded, so we cannot populate `v`. Instead block this execution branch and
// wait for `rx_set` to deliver the side-loaded result.
// value was side-loaded, so we cannot populate `v`. Instead block this
// execution branch and wait for `rx_set` to deliver the side-loaded
// result.
loop {
tokio::task::yield_now().await;
}

@@ -134,8 +145,9 @@ where
maybe_v = rx_set.fuse() => {
match maybe_v {
Ok(v) => {
// data get side-loaded via `Cache::set`. In this case, we do NOT modify the state
// because there would be a lock-gap. The `set` function will do that for us instead.
// data get side-loaded via `Cache::set`. In this case, we do
// NOT modify the state because there would be a lock-gap. The
// `set` function will do that for us instead.
v
}
Err(_) => {

@@ -187,9 +199,10 @@ where
// it's OK when the receiver side is gone (likely panicked)
running_query.set.send(v.clone()).ok();

// When we side-load data into the running task, the task does NOT modify the backend, so we have to do
// that. The reason for not letting the task feed the side-loaded data back into `cached_entries` is that we
// would need to drop the state lock here before the task could acquire it, leading to a lock gap.
// When we side-load data into the running task, the task does NOT modify the
// backend, so we have to do that. The reason for not letting the task feed the
// side-loaded data back into `cached_entries` is that we would need to drop the
// state lock here before the task could acquire it, leading to a lock gap.
Some(running_query.join_handle)
} else {
None

@@ -215,9 +228,10 @@ where
{
fn drop(&mut self) {
for (_k, running_query) in self.state.lock().running_queries.drain() {
// It's unlikely that anyone is still using the shared receiver at this point, because Cache::get borrow
// the self. If it is still in use, aborting the task will cancel the contained future which in turn will
// drop the sender of the oneshot channel. The receivers will be notified.
// It's unlikely that anyone is still using the shared receiver at this point, because
// `Cache::get` borrows the `self`. If it is still in use, aborting the task will
// cancel the contained future which in turn will drop the sender of the oneshot
// channel. The receivers will be notified.
running_query.join_handle.abort();
}
}
@@ -226,11 +240,12 @@ where
/// A [`tokio::sync::oneshot::Receiver`] that can be cloned.
///
/// The types are:
/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time the reference to `V`
///   (i.e. the `Arc`) must be cloneable for `Shared`
///
/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time
///   the reference to `V` (i.e. the `Arc`) must be cloneable for `Shared`
/// - `Arc<RecvError>`: Is required because `RecvError` is not `Clone` but `Shared` requires that.
/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<Mutex<V>>, Arc<RecvError>>` results in
///   a kinda messy type and we wanna erase that.
/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<Mutex<V>>,
///   Arc<RecvError>>` results in a kinda messy type and we wanna erase that.
/// - `Shared`: Allow the receiver to be cloned and be awaited from multiple places.
type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>;
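
For orientation, the adapter chain that turns a plain `tokio::sync::oneshot::Receiver<V>` into a `SharedReceiver<V>` can be written as below. This is a sketch of the composition the doc comment describes, using only `futures`, `parking_lot`, and `tokio`; the free function `wrap_receiver` is invented for illustration.

use std::sync::Arc;

use futures::{
    future::{BoxFuture, Shared},
    FutureExt, TryFutureExt,
};
use parking_lot::Mutex;
use tokio::sync::oneshot::{error::RecvError, Receiver};

type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>;

fn wrap_receiver<V: Send + 'static>(rx: Receiver<V>) -> SharedReceiver<V> {
    rx.map_ok(|v| Arc::new(Mutex::new(v))) // clone the Arc instead of requiring V: Clone everywhere
        .map_err(Arc::new) // RecvError is not Clone, but Arc<RecvError> is
        .boxed() // erase the messy combinator type
        .shared() // now it can be cloned and awaited from many places
}
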
@@ -483,14 +498,16 @@ mod tests {
impl TestLoader {
/// Panic when loading value for `k`.
///
/// If this is used together with [`block`](Self::block), the panic will occur AFTER blocking.
/// If this is used together with [`block`](Self::block), the panic will occur AFTER
/// blocking.
fn panic_once(&self, k: u8) {
self.panic.lock().insert(k);
}

/// Block all [`load`](Self::load) requests until [`unblock`](Self::unblock) is called.
///
/// If this is used together with [`panic_once`](Self::panic_once), the panic will occur AFTER blocking.
/// If this is used together with [`panic_once`](Self::panic_once), the panic will occur
/// AFTER blocking.
fn block(&self) {
let mut blocked = self.blocked.lock();
assert!(blocked.is_none());

@@ -523,7 +540,8 @@ mod tests {
async fn load(&self, k: u8) -> String {
self.loaded.lock().push(k);

// need to capture the cloned notify handle, otherwise the lock guard leaks into the generator
// need to capture the cloned notify handle, otherwise the lock guard leaks into the
// generator
let maybe_block = self.blocked.lock().clone();
if let Some(block) = maybe_block {
block.notified().await;
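
Hypothetical test fragment (not from this diff) showing how `panic_once` exercises the panic semantics documented on `Cache`: it assumes a `cache` already built on top of this `TestLoader` and held in an `Arc`, and observes the propagated panic through a spawned task's `JoinError`.

loader.panic_once(1);

let get_task = tokio::spawn({
    let cache = Arc::clone(&cache);
    async move { cache.get(1).await }
});

// the loader panics while computing the value for key 1, so the in-flight `get`
// panics as well and nothing is stored in the cache
assert!(get_task.await.unwrap_err().is_panic());
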
@@ -1218,11 +1218,11 @@ mod tests {
let files1 = files.pop().unwrap();
let files0 = files.pop().unwrap();
let chunk_0 = adapter
.new_querier_chunk_from_file_with_metadata(files0)
.new_querier_parquet_chunk_from_file_with_metadata(files0)
.await
.unwrap();
let chunk_1 = adapter
.new_querier_chunk_from_file_with_metadata(files1)
.new_querier_parquet_chunk_from_file_with_metadata(files1)
.await
.unwrap();
// query the chunks

@@ -1439,11 +1439,11 @@ mod tests {
let files2 = files.pop().unwrap();
let files1 = files.pop().unwrap();
let chunk_0 = adapter
.new_querier_chunk_from_file_with_metadata(files1)
.new_querier_parquet_chunk_from_file_with_metadata(files1)
.await
.unwrap();
let chunk_1 = adapter
.new_querier_chunk_from_file_with_metadata(files2)
.new_querier_parquet_chunk_from_file_with_metadata(files2)
.await
.unwrap();
// query the chunks

@@ -25,6 +25,7 @@ pin-project = "1.0"
predicate = { path = "../predicate" }
iox_query = { path = "../iox_query" }
rand = "0.8.3"
read_buffer = { path = "../read_buffer" }
service_common = { path = "../service_common" }
service_grpc_schema = { path = "../service_grpc_schema" }
schema = { path = "../schema" }

@@ -15,6 +15,7 @@ pub mod parquet_file;
pub mod partition;
pub mod processed_tombstones;
mod ram;
pub mod read_buffer;
pub mod table;
pub mod tombstones;

@@ -0,0 +1,201 @@
//! Cache Parquet file data in Read Buffer chunks.

use super::ram::RamSize;
use backoff::{Backoff, BackoffConfig};
use cache_system::{
    backend::{
        lru::{LruBackend, ResourcePool},
        resource_consumption::FunctionEstimator,
        shared::SharedBackend,
    },
    driver::Cache,
    loader::{metrics::MetricsLoader, FunctionLoader},
};
use data_types::{ParquetFile, ParquetFileId};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use iox_time::TimeProvider;
use read_buffer::RBChunk;
use snafu::{ResultExt, Snafu};
use std::{collections::HashMap, mem, sync::Arc};

const CACHE_ID: &str = "read_buffer";

/// Cache for parquet file data decoded into read buffer chunks
#[derive(Debug)]
pub struct ReadBufferCache {
    cache: Cache<ParquetFileId, Arc<RBChunk>>,

    /// Handle that allows clearing entries for existing cache entries
    _backend: SharedBackend<ParquetFileId, Arc<RBChunk>>,
}

impl ReadBufferCache {
    /// Create a new empty cache.
    pub fn new(
        time_provider: Arc<dyn TimeProvider>,
        metric_registry: &metric::Registry,
        ram_pool: Arc<ResourcePool<RamSize>>,
    ) -> Self {
        let loader = Box::new(FunctionLoader::new(
            move |parquet_file_id: ParquetFileId| {
                let backoff_config = BackoffConfig::default();

                async move {
                    let rb_chunk = Backoff::new(&backoff_config)
                        .retry_all_errors("get read buffer chunk by parquet file ID", || async {
                            let parquet_file = parquet_file_by_id(parquet_file_id);
                            let table_name = parquet_file_table_name(&parquet_file).to_string();
                            let record_batch_stream = record_batches_stream(&parquet_file);
                            read_buffer_chunk_from_stream(table_name, record_batch_stream).await
                        })
                        .await
                        .expect("retry forever");

                    Arc::new(rb_chunk)
                }
            },
        ));

        let loader = Arc::new(MetricsLoader::new(
            loader,
            CACHE_ID,
            Arc::clone(&time_provider),
            metric_registry,
        ));

        // add to memory pool
        let backend = Box::new(LruBackend::new(
            Box::new(HashMap::new()),
            Arc::clone(&ram_pool),
            CACHE_ID,
            Arc::new(FunctionEstimator::new(
                |k: &ParquetFileId, v: &Arc<RBChunk>| {
                    RamSize(mem::size_of_val(k) + mem::size_of_val(v) + v.size())
                },
            )),
        ));

        // get a direct handle so we can clear out entries as needed
        let _backend = SharedBackend::new(backend);

        let cache = Cache::new(loader, Box::new(_backend.clone()));

        Self { cache, _backend }
    }

    /// Get read buffer chunks by Parquet file id
    pub async fn get(&self, parquet_file_id: ParquetFileId) -> Arc<RBChunk> {
        self.cache.get(parquet_file_id).await
    }
}

fn parquet_file_by_id(_parquet_file_id: ParquetFileId) -> ParquetFile {
    unimplemented!()
}

fn parquet_file_table_name(_parquet_file: &ParquetFile) -> &str {
    unimplemented!()
}

fn record_batches_stream(_parquet_file: &ParquetFile) -> SendableRecordBatchStream {
    unimplemented!()
}

#[derive(Debug, Snafu)]
enum RBChunkError {
    #[snafu(display("Error streaming record batches: {}", source))]
    Streaming { source: arrow::error::ArrowError },

    #[snafu(display("Error pushing record batch into chunk: {}", source))]
    Pushing { source: arrow::error::ArrowError },

    #[snafu(display("Read buffer error: {}", source))]
    ReadBuffer { source: read_buffer::Error },
}

async fn read_buffer_chunk_from_stream(
    table_name: String,
    mut stream: SendableRecordBatchStream,
) -> Result<RBChunk, RBChunkError> {
    let schema = stream.schema();

    let mut builder = read_buffer::RBChunkBuilder::new(table_name, schema);

    while let Some(record_batch) = stream.next().await {
        builder
            .push_record_batch(record_batch.context(StreamingSnafu)?)
            .context(PushingSnafu)?;
    }

    builder.build().context(ReadBufferSnafu)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::ram::test_util::test_ram_pool;
    use arrow::record_batch::RecordBatch;
    use arrow_util::assert_batches_eq;
    use datafusion_util::stream_from_batches;
    use iox_tests::util::TestCatalog;
    use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
    use read_buffer::Predicate;
    use schema::selection::Selection;

    fn make_cache(catalog: &TestCatalog) -> ReadBufferCache {
        ReadBufferCache::new(
            catalog.time_provider(),
            &catalog.metric_registry(),
            test_ram_pool(),
        )
    }

    async fn make_catalog() -> Arc<TestCatalog> {
        TestCatalog::new()
    }

    #[tokio::test]
    async fn test_rb_chunks() {
        let catalog = make_catalog().await;
        let _cache = make_cache(&catalog);
    }

    fn lp_to_record_batch(lp: &str) -> RecordBatch {
        let (_table, batch) = lp_to_mutable_batch(lp);

        batch.to_arrow(Selection::All).unwrap()
    }

    #[tokio::test]
    async fn build_read_buffer_chunk_from_stream_of_record_batches() {
        let lines = ["cpu,host=a load=1 11", "cpu,host=a load=2 22"];
        let batches = lines
            .into_iter()
            .map(lp_to_record_batch)
            .map(Arc::new)
            .collect();

        let stream = stream_from_batches(batches);

        let rb = read_buffer_chunk_from_stream("cpu".to_string(), stream)
            .await
            .unwrap();

        let rb_batches: Vec<RecordBatch> = rb
            .read_filter(Predicate::default(), Selection::All, vec![])
            .unwrap()
            .collect();

        let expected = [
            "+------+------+--------------------------------+",
            "| host | load | time |",
            "+------+------+--------------------------------+",
            "| a | 1 | 1970-01-01T00:00:00.000000011Z |",
            "| a | 2 | 1970-01-01T00:00:00.000000022Z |",
            "+------+------+--------------------------------+",
        ];

        assert_batches_eq!(expected, &rb_batches);
    }
}

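A note on the RAM accounting in `ReadBufferCache::new` above: the `FunctionEstimator` passed to `LruBackend::new` charges each cache entry against the shared pool as the sum of three terms. Written out as a standalone helper for clarity (the function name is invented, and the byte counts in the comments are typical 64-bit values, not guarantees):

use std::{mem, sync::Arc};

use data_types::ParquetFileId;
use read_buffer::RBChunk;

fn entry_cost(k: &ParquetFileId, v: &Arc<RBChunk>) -> usize {
    mem::size_of_val(k) // the key held by the map, e.g. 8 bytes for an integer-backed ID
        + mem::size_of_val(v) // the `Arc` pointer stored alongside it, not the chunk behind it
        + v.size() // heap memory of the decoded read buffer chunk; dominates in practice
}
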
@@ -1,4 +1,4 @@
//! Querier Chunk
//! Querier Chunks

use crate::cache::CatalogCache;
use arrow::record_batch::RecordBatch;

@@ -20,7 +20,7 @@ use uuid::Uuid;
mod query_access;

/// Immutable metadata attached to a [`QuerierChunk`].
/// Immutable metadata attached to a [`QuerierParquetChunk`].
#[derive(Debug)]
pub struct ChunkMeta {
/// The ID of the chunk

@@ -80,27 +80,19 @@ impl ChunkMeta {
}
}

/// Determines how the chunk data is currently accessible.
#[derive(Debug)]
pub enum ChunkStorage {
/// Data is currently available via parquet file within the object store.
Parquet {
/// ID of the parquet file if the chunk
parquet_file_id: ParquetFileId,
/// Chunk of the parquet file
chunk: Arc<ParquetChunk>,
},
}

/// Chunk representation for the querier.
/// Chunk representation of Parquet file chunks for the querier.
///
/// These chunks are usually created on-demand. The querier cache system does not really have a notion of chunks (rather
/// it knows about parquet files, local FS caches, ingester data, cached read buffers) but we need to combine all that
/// knowledge into chunk objects because this is what the query engine (DataFusion and InfluxRPC) expect.
/// These chunks are usually created on-demand. The querier cache system does not really have a
/// notion of chunks (rather it knows about parquet files, local FS caches, ingester data, cached
/// read buffers) but we need to combine all that knowledge into chunk objects because this is what
/// the query engine (DataFusion and InfluxRPC) expect.
#[derive(Debug)]
pub struct QuerierChunk {
/// How the data is currently structured / available for query.
storage: ChunkStorage,
pub struct QuerierParquetChunk {
/// ID of the Parquet file of the chunk
parquet_file_id: ParquetFileId,

/// Chunk of the Parquet file
parquet_chunk: Arc<ParquetChunk>,

/// Immutable metadata.
meta: Arc<ChunkMeta>,

@@ -112,19 +104,17 @@ pub struct QuerierChunk {
partition_sort_key: Arc<Option<SortKey>>,
}

impl QuerierChunk {
impl QuerierParquetChunk {
/// Create new parquet-backed chunk (object store data).
pub fn new_parquet(
pub fn new(
parquet_file_id: ParquetFileId,
chunk: Arc<ParquetChunk>,
parquet_chunk: Arc<ParquetChunk>,
meta: Arc<ChunkMeta>,
partition_sort_key: Arc<Option<SortKey>>,
) -> Self {
Self {
storage: ChunkStorage::Parquet {
parquet_file_id,
chunk,
},
parquet_file_id,
parquet_chunk,
meta,
delete_predicates: Vec::new(),
partition_sort_key,

@@ -152,20 +142,14 @@ impl QuerierChunk {
self.meta.as_ref()
}

/// Parquet file ID if this chunk is backed by a parquet file.
pub fn parquet_file_id(&self) -> Option<ParquetFileId> {
match &self.storage {
ChunkStorage::Parquet {
parquet_file_id, ..
} => Some(*parquet_file_id),
}
/// Parquet file ID
pub fn parquet_file_id(&self) -> ParquetFileId {
self.parquet_file_id
}

/// Return time range
pub fn timestamp_min_max(&self) -> Option<TimestampMinMax> {
match &self.storage {
ChunkStorage::Parquet { chunk, .. } => chunk.timestamp_min_max(),
}
self.parquet_chunk.timestamp_min_max()
}

/// Partition sort key

@@ -233,24 +217,24 @@ impl ParquetChunkAdapter {
))
}

/// Create new querier chunk from a catalog record
/// Create new querier Parquet chunk from a catalog record
///
/// Returns `None` if some data required to create this chunk is already gone from the catalog.
pub async fn new_querier_chunk_from_file_with_metadata(
pub async fn new_querier_parquet_chunk_from_file_with_metadata(
&self,
parquet_file_with_metadata: ParquetFileWithMetadata,
) -> Option<QuerierChunk> {
) -> Option<QuerierParquetChunk> {
let decoded_parquet_file = DecodedParquetFile::new(parquet_file_with_metadata);
self.new_querier_chunk(&decoded_parquet_file).await
self.new_querier_parquet_chunk(&decoded_parquet_file).await
}

/// Create new querier chunk.
/// Create new querier Parquet chunk.
///
/// Returns `None` if some data required to create this chunk is already gone from the catalog.
pub async fn new_querier_chunk(
pub async fn new_querier_parquet_chunk(
&self,
decoded_parquet_file: &DecodedParquetFile,
) -> Option<QuerierChunk> {
) -> Option<QuerierParquetChunk> {
let parquet_file = &decoded_parquet_file.parquet_file;
let chunk = Arc::new(self.new_parquet_chunk(decoded_parquet_file).await?);
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));

@@ -285,7 +269,7 @@ impl ParquetChunkAdapter {
max_sequence_number: parquet_file.max_sequence_number,
});

Some(QuerierChunk::new_parquet(
Some(QuerierParquetChunk::new(
parquet_file.id,
chunk,
meta,

@@ -295,7 +279,7 @@ impl ParquetChunkAdapter {
}

/// collect data for the given chunk
pub async fn collect_read_filter(chunk: &QuerierChunk) -> Vec<RecordBatch> {
pub async fn collect_read_filter(chunk: &QuerierParquetChunk) -> Vec<RecordBatch> {
chunk
.read_filter(
IOxSessionContext::default(),

@@ -355,7 +339,7 @@ pub mod tests {

// create chunk
let chunk = adapter
.new_querier_chunk(&DecodedParquetFile::new(parquet_file))
.new_querier_parquet_chunk(&DecodedParquetFile::new(parquet_file))
.await
.unwrap();

@@ -1,4 +1,4 @@
use crate::chunk::{ChunkStorage, QuerierChunk};
use crate::chunk::QuerierParquetChunk;
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
};

@@ -18,17 +18,13 @@ pub enum Error {
},
}

impl QueryChunkMeta for QuerierChunk {
impl QueryChunkMeta for QuerierParquetChunk {
fn summary(&self) -> Option<&TableSummary> {
match &self.storage {
ChunkStorage::Parquet { chunk, .. } => Some(chunk.table_summary().as_ref()),
}
Some(self.parquet_chunk.table_summary().as_ref())
}

fn schema(&self) -> Arc<Schema> {
match &self.storage {
ChunkStorage::Parquet { chunk, .. } => chunk.schema(),
}
self.parquet_chunk.schema()
}

fn partition_sort_key(&self) -> Option<&SortKey> {

@@ -52,7 +48,7 @@ impl QueryChunkMeta for QuerierChunk {
}
}

impl QueryChunk for QuerierChunk {
impl QueryChunk for QuerierParquetChunk {
fn id(&self) -> ChunkId {
self.meta().chunk_id
}

@@ -62,23 +58,19 @@ impl QueryChunk for QuerierChunk {
}

fn may_contain_pk_duplicates(&self) -> bool {
match &self.storage {
ChunkStorage::Parquet { .. } => false,
}
false
}

fn apply_predicate_to_metadata(
&self,
predicate: &predicate::Predicate,
) -> Result<predicate::PredicateMatch, QueryChunkError> {
let pred_result = match &self.storage {
ChunkStorage::Parquet { chunk, .. } => {
if predicate.has_exprs() || chunk.has_timerange(predicate.range.as_ref()) {
PredicateMatch::Unknown
} else {
PredicateMatch::Zero
}
}
let pred_result = if predicate.has_exprs()
|| self.parquet_chunk.has_timerange(predicate.range.as_ref())
{
PredicateMatch::Unknown
} else {
PredicateMatch::Zero
};

Ok(pred_result)

@@ -90,15 +82,11 @@ impl QueryChunk for QuerierChunk {
predicate: &predicate::Predicate,
columns: schema::selection::Selection<'_>,
) -> Result<Option<iox_query::exec::stringset::StringSet>, QueryChunkError> {
match &self.storage {
ChunkStorage::Parquet { chunk, .. } => {
if !predicate.is_empty() {
// if there is anything in the predicate, bail for now and force a full plan
return Ok(None);
}
Ok(chunk.column_names(columns))
}
if !predicate.is_empty() {
// if there is anything in the predicate, bail for now and force a full plan
return Ok(None);
}
Ok(self.parquet_chunk.column_names(columns))
}

fn column_values(

@@ -107,13 +95,9 @@ impl QueryChunk for QuerierChunk {
_column_name: &str,
_predicate: &predicate::Predicate,
) -> Result<Option<iox_query::exec::stringset::StringSet>, QueryChunkError> {
match &self.storage {
ChunkStorage::Parquet { .. } => {
// Since DataFusion can read Parquet, there is no advantage to
// manually implementing this vs just letting DataFusion do its thing
Ok(None)
}
}
// Since DataFusion can read Parquet, there is no advantage to
// manually implementing this vs just letting DataFusion do its thing
Ok(None)
}

fn read_filter(

@@ -134,23 +118,17 @@ impl QueryChunk for QuerierChunk {
pred_with_deleted_exprs.merge_delete_predicates(&delete_predicates);
debug!(?pred_with_deleted_exprs, "Merged negated predicate");

match &self.storage {
ChunkStorage::Parquet { chunk, .. } => {
ctx.set_metadata("predicate", format!("{}", &pred_with_deleted_exprs));
chunk
.read_filter(&pred_with_deleted_exprs, selection)
.context(ParquetFileChunkSnafu {
chunk_id: self.id(),
})
.map_err(|e| Box::new(e) as _)
}
}
ctx.set_metadata("predicate", format!("{}", &pred_with_deleted_exprs));
self.parquet_chunk
.read_filter(&pred_with_deleted_exprs, selection)
.context(ParquetFileChunkSnafu {
chunk_id: self.id(),
})
.map_err(|e| Box::new(e) as _)
}

fn chunk_type(&self) -> &str {
match &self.storage {
ChunkStorage::Parquet { .. } => "parquet",
}
"parquet"
}

fn order(&self) -> ChunkOrder {

@@ -24,7 +24,7 @@ use std::{
};

use crate::{
chunk::{ParquetChunkAdapter, QuerierChunk},
chunk::{ParquetChunkAdapter, QuerierParquetChunk},
tombstone::QuerierTombstone,
IngesterPartition,
};

@@ -115,12 +115,12 @@ impl Reconciler {
"Parquet files after filtering"
);

// convert parquet files and tombstones into QuerierChunks
// convert parquet files and tombstones into QuerierParquetChunks
let mut parquet_chunks = Vec::with_capacity(parquet_files.len());
for parquet_file_with_metadata in parquet_files {
if let Some(chunk) = self
.chunk_adapter
.new_querier_chunk_from_file_with_metadata(parquet_file_with_metadata)
.new_querier_parquet_chunk_from_file_with_metadata(parquet_file_with_metadata)
.await
{
parquet_chunks.push(chunk);

@@ -174,12 +174,7 @@ impl Reconciler {
.chunk_adapter
.catalog_cache()
.processed_tombstones()
.exists(
chunk
.parquet_file_id()
.expect("just created from a parquet file"),
tombstone.tombstone_id(),
)
.exists(chunk.parquet_file_id(), tombstone.tombstone_id())
.await
{
continue;

@@ -282,7 +277,7 @@ trait UpdatableQuerierChunk: QueryChunk {
fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk>;
}

impl UpdatableQuerierChunk for QuerierChunk {
impl UpdatableQuerierChunk for QuerierParquetChunk {
fn update_partition_sort_key(
self: Box<Self>,
sort_key: Arc<Option<SortKey>>,