feat: Use new extra loading info to load read buffer chunks into cache

pull/24376/head
Carol (Nichols || Goulding) 2022-05-27 16:25:35 -04:00
parent 054c25de50
commit 9328ba8c45
3 changed files with 134 additions and 30 deletions
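
In short: the read buffer cache stays keyed by parquet file ID, but callers now pass everything the loader needs to build a missing chunk (the decoded parquet file, the table name, and the parquet store) along as "extra" fetch info, so the loader no longer has to look that data up by ID. The stand-in below is not the cache_system API; it is a minimal, self-contained sketch of the pattern with made-up types, just to show why the extra info travels with every `get` call:

    use std::collections::HashMap;
    use std::hash::Hash;

    /// Toy cache: keyed by `K`, values built on a miss by a loader that is
    /// handed caller-supplied `Extra` context (synchronous, no eviction).
    struct ToyCache<K, V, Extra> {
        entries: HashMap<K, V>,
        loader: Box<dyn Fn(K, Extra) -> V>,
    }

    impl<K: Hash + Eq + Copy, V: Clone, Extra> ToyCache<K, V, Extra> {
        fn new(loader: Box<dyn Fn(K, Extra) -> V>) -> Self {
            Self {
                entries: HashMap::new(),
                loader,
            }
        }

        fn get(&mut self, key: K, extra: Extra) -> V {
            if let Some(v) = self.entries.get(&key) {
                return v.clone(); // hit: the extra info is dropped unused
            }
            let v = (self.loader)(key, extra); // miss: the loader consumes it
            self.entries.insert(key, v.clone());
            v
        }
    }

    fn main() {
        // Key: a parquet file ID stand-in; extra: the table name the loader needs.
        let loader: Box<dyn Fn(u64, String) -> String> =
            Box::new(|id, table_name| format!("read buffer chunk for {} (file {})", table_name, id));
        let mut cache = ToyCache::new(loader);

        let first = cache.get(1, "table1".to_string()); // builds the value
        let second = cache.get(1, "table1".to_string()); // served from the cache
        assert_eq!(first, second);
    }

On a hit the extra tuple is simply discarded; on a miss it is exactly what the load function consumes, which mirrors how `ReadBufferCache::get` now threads `(Arc<DecodedParquetFile>, Arc<str>, ParquetStorage)` through to the loader below.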


@@ -207,6 +207,15 @@ impl ParquetStorage {
             Some(Arc::new(AutoAbortJoinHandle::new(handle))),
         ))
     }
+
+    /// Read all data from the parquet file.
+    pub fn read_all(
+        &self,
+        schema: SchemaRef,
+        meta: &IoxMetadata,
+    ) -> Result<SendableRecordBatchStream, ReadError> {
+        self.read_filter(&Predicate::default(), Selection::All, schema, meta)
+    }
 }

 /// Return indices of the schema's fields of the selection columns
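
The new `read_all` is just `read_filter` with an empty predicate and `Selection::All`. A hedged usage sketch, assuming a `ParquetStorage`, an Arrow `SchemaRef`, and the file's `IoxMetadata` are already in hand; the module path for `IoxMetadata` and the boxed error type are assumptions, not taken from this diff:

    use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
    use futures::StreamExt;
    // Assumed module paths; only `storage::ParquetStorage` appears in this diff.
    use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};

    /// Drain the stream returned by `read_all` into a Vec of record batches.
    async fn collect_all_batches(
        store: &ParquetStorage,
        schema: SchemaRef,
        meta: &IoxMetadata,
    ) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
        // `read_all` is synchronous and returns a `SendableRecordBatchStream`;
        // each item of the stream is a `Result<RecordBatch, ArrowError>`.
        let mut stream = store.read_all(schema, meta)?;
        let mut batches = Vec::new();
        while let Some(batch) = stream.next().await {
            batches.push(batch?);
        }
        Ok(batches)
    }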


@@ -11,21 +11,23 @@ use cache_system::{
     driver::Cache,
     loader::{metrics::MetricsLoader, FunctionLoader},
 };
-use data_types::{ParquetFile, ParquetFileId};
+use data_types::ParquetFileId;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::StreamExt;
 use iox_time::TimeProvider;
-use parquet_file::chunk::DecodedParquetFile;
+use parquet_file::{chunk::DecodedParquetFile, storage::ParquetStorage};
 use read_buffer::RBChunk;
 use snafu::{ResultExt, Snafu};
 use std::{collections::HashMap, mem, sync::Arc};

 const CACHE_ID: &str = "read_buffer";

+type ExtraFetchInfo = (Arc<DecodedParquetFile>, Arc<str>, ParquetStorage);
+
 /// Cache for parquet file data decoded into read buffer chunks
 #[derive(Debug)]
 pub struct ReadBufferCache {
-    cache: Cache<ParquetFileId, Arc<RBChunk>, ()>,
+    cache: Cache<ParquetFileId, Arc<RBChunk>, ExtraFetchInfo>,

     /// Handle that allows clearing entries for existing cache entries
     _backend: SharedBackend<ParquetFileId, Arc<RBChunk>>,
@@ -39,16 +41,24 @@ impl ReadBufferCache {
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
         let loader = Box::new(FunctionLoader::new(
-            move |parquet_file_id: ParquetFileId, _extra| {
+            move |_parquet_file_id, (decoded_parquet_file, table_name, store): ExtraFetchInfo| {
                 let backoff_config = BackoffConfig::default();

                 async move {
                     let rb_chunk = Backoff::new(&backoff_config)
-                        .retry_all_errors("get read buffer chunk by parquet file ID", || async {
-                            let parquet_file = parquet_file_by_id(parquet_file_id);
-                            let table_name = parquet_file_table_name(&parquet_file).to_string();
-                            let record_batch_stream = record_batches_stream(&parquet_file);
-                            read_buffer_chunk_from_stream(table_name, record_batch_stream).await
+                        .retry_all_errors("get read buffer chunk from parquet file", || {
+                            let decoded_parquet_file_for_load = Arc::clone(&decoded_parquet_file);
+                            let table_name_for_load = Arc::clone(&table_name);
+                            let store_for_load = store.clone();
+
+                            async {
+                                load_from_parquet_file(
+                                    decoded_parquet_file_for_load,
+                                    table_name_for_load,
+                                    store_for_load,
+                                )
+                                .await
+                            }
                         })
                         .await
                         .expect("retry forever");
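
One detail worth noting in the new loader: `retry_all_errors` is handed a closure that builds a fresh future for every attempt, so each attempt clones the `Arc`s (and the cheap-to-clone `ParquetStorage`) before moving them into the `async` block; otherwise the first attempt would consume the captured values. A minimal, self-contained sketch of that clone-per-attempt shape, using a stand-in `retry` helper rather than the real backoff API:

    use std::future::Future;
    use std::sync::Arc;

    /// Stand-in for a retry helper: calls the factory once per attempt.
    /// (A real implementation would loop with backoff while attempts fail.)
    async fn retry<F, Fut, T, E>(mut make_attempt: F) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        make_attempt().await
    }

    async fn demo(table_name: Arc<str>) -> usize {
        retry(|| {
            // Clone the shared data *before* building the future, so the
            // closure can be invoked again for a later attempt without having
            // already moved its captures.
            let table_name = Arc::clone(&table_name);
            async move { Ok::<_, ()>(table_name.len()) }
        })
        .await
        .expect("retry forever")
    }

    #[tokio::main]
    async fn main() {
        let n = demo(Arc::from("table1")).await;
        assert_eq!(n, "table1".len());
    }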
@@ -86,23 +96,52 @@ impl ReadBufferCache {
     }

     /// Get read buffer chunks from the cache or the Parquet file
-    pub async fn get(&self, decoded_parquet_file: &DecodedParquetFile) -> Arc<RBChunk> {
-        let parquet_file = &decoded_parquet_file.parquet_file;
-
-        self.cache.get(parquet_file.id, ()).await
+    pub async fn get(
+        &self,
+        decoded_parquet_file: Arc<DecodedParquetFile>,
+        table_name: Arc<str>,
+        store: ParquetStorage,
+    ) -> Arc<RBChunk> {
+        self.cache
+            .get(
+                decoded_parquet_file.parquet_file_id(),
+                (decoded_parquet_file, table_name, store),
+            )
+            .await
     }
 }

-fn parquet_file_by_id(_parquet_file_id: ParquetFileId) -> ParquetFile {
-    unimplemented!()
+#[derive(Debug, Snafu)]
+enum LoadError {
+    #[snafu(display("Error reading from storage: {}", source))]
+    ReadingFromStorage {
+        source: parquet_file::storage::ReadError,
+    },
+
+    #[snafu(display("Error building read buffer chunk: {}", source))]
+    BuildingChunk { source: RBChunkError },
 }

-fn parquet_file_table_name(_parquet_file: &ParquetFile) -> &str {
-    unimplemented!()
+async fn load_from_parquet_file(
+    decoded_parquet_file: Arc<DecodedParquetFile>,
+    table_name: Arc<str>,
+    store: ParquetStorage,
+) -> Result<RBChunk, LoadError> {
+    let record_batch_stream =
+        record_batches_stream(decoded_parquet_file, store).context(ReadingFromStorageSnafu)?;
+    read_buffer_chunk_from_stream(table_name, record_batch_stream)
+        .await
+        .context(BuildingChunkSnafu)
 }

-fn record_batches_stream(_parquet_file: &ParquetFile) -> SendableRecordBatchStream {
-    unimplemented!()
+fn record_batches_stream(
+    decoded_parquet_file: Arc<DecodedParquetFile>,
+    store: ParquetStorage,
+) -> Result<SendableRecordBatchStream, parquet_file::storage::ReadError> {
+    let schema = decoded_parquet_file.schema().as_arrow();
+    let iox_metadata = &decoded_parquet_file.iox_metadata;
+
+    store.read_all(schema, iox_metadata)
 }

 #[derive(Debug, Snafu)]
@@ -118,12 +157,12 @@ enum RBChunkError {
 }

 async fn read_buffer_chunk_from_stream(
-    table_name: String,
+    table_name: Arc<str>,
     mut stream: SendableRecordBatchStream,
 ) -> Result<RBChunk, RBChunkError> {
     let schema = stream.schema();

-    let mut builder = read_buffer::RBChunkBuilder::new(table_name, schema);
+    let mut builder = read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema);

     while let Some(record_batch) = stream.next().await {
         builder
@@ -141,7 +180,8 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use arrow_util::assert_batches_eq;
     use datafusion_util::stream_from_batches;
-    use iox_tests::util::TestCatalog;
+    use iox_tests::util::{TestCatalog, TestPartition};
+    use metric::{Attributes, Metric, U64Counter};
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
     use read_buffer::Predicate;
     use schema::selection::Selection;
@@ -154,14 +194,65 @@ mod tests {
         )
     }

-    async fn make_catalog() -> Arc<TestCatalog> {
-        TestCatalog::new()
+    async fn make_catalog() -> (Arc<TestCatalog>, Arc<TestPartition>) {
+        let catalog = TestCatalog::new();
+        let ns = catalog.create_namespace("ns").await;
+        let table = ns.create_table("table1").await;
+        let sequencer1 = ns.create_sequencer(1).await;
+
+        let partition = table
+            .with_sequencer(&sequencer1)
+            .create_partition("k")
+            .await;
+
+        (catalog, partition)
     }

     #[tokio::test]
     async fn test_rb_chunks() {
-        let catalog = make_catalog().await;
-
-        let _cache = make_cache(&catalog);
+        let (catalog, partition) = make_catalog().await;
+
+        let test_parquet_file = partition.create_parquet_file("table1 foo=1 11").await;
+        let parquet_file = test_parquet_file.parquet_file;
+        let decoded = Arc::new(DecodedParquetFile::new(parquet_file));
+        let storage = ParquetStorage::new(Arc::clone(&catalog.object_store));
+
+        let cache = make_cache(&catalog);
+
+        let rb = cache
+            .get(Arc::clone(&decoded), "table1".into(), storage.clone())
+            .await;
+
+        let rb_batches: Vec<RecordBatch> = rb
+            .read_filter(Predicate::default(), Selection::All, vec![])
+            .unwrap()
+            .collect();
+
+        let expected = [
+            "+-----+--------------------------------+",
+            "| foo | time                           |",
+            "+-----+--------------------------------+",
+            "| 1   | 1970-01-01T00:00:00.000000011Z |",
+            "+-----+--------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &rb_batches);
+
+        // This should fetch from the cache
+        let _rb_again = cache.get(decoded, "table1".into(), storage).await;
+
+        let m: Metric<U64Counter> = catalog
+            .metric_registry
+            .get_instrument("cache_load_function_calls")
+            .unwrap();
+        let v = m
+            .get_observer(&Attributes::from(&[("name", "read_buffer")]))
+            .unwrap()
+            .fetch();
+
+        // Load is only called once
+        assert_eq!(v, 1);
     }

     fn lp_to_record_batch(lp: &str) -> RecordBatch {
@@ -181,7 +272,7 @@ mod tests {
         let stream = stream_from_batches(batches);
-        let rb = read_buffer_chunk_from_stream("cpu".to_string(), stream)
+        let rb = read_buffer_chunk_from_stream("cpu".into(), stream)
             .await
             .unwrap();


@@ -257,7 +257,7 @@ pub struct ChunkAdapter {
     /// Cache
     catalog_cache: Arc<CatalogCache>,

-    /// Object store.
+    /// Object store. Wrapper around an Arc; cheap to clone.
     store: ParquetStorage,

     /// Metric registry.
@@ -369,7 +369,7 @@ impl ChunkAdapter {
     /// Create read buffer chunk. May be from the cache, may be from the parquet file.
     pub async fn new_rb_chunk(
         &self,
-        decoded_parquet_file: &DecodedParquetFile,
+        decoded_parquet_file: Arc<DecodedParquetFile>,
     ) -> Option<QuerierRBChunk> {
         let parquet_file_id = decoded_parquet_file.parquet_file_id();
         let schema = decoded_parquet_file.schema();
@@ -383,7 +383,11 @@ impl ChunkAdapter {
         let rb_chunk = self
             .catalog_cache()
             .read_buffer()
-            .get(decoded_parquet_file)
+            .get(
+                Arc::clone(&decoded_parquet_file),
+                Arc::clone(&table_name),
+                self.store.clone(),
+            )
             .await;

         // Somewhat hacky workaround because of implicit chunk orders, use min sequence number and