From e877a64462c219061899958600524920da47d96a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 23 May 2022 13:11:38 -0400
Subject: [PATCH] feat: Add `ParquetFiles` cache and memory size estimation for
 ParquetMetadata (#4661)

* feat: Add `ParquetFiles` cache

* fix: Apply suggestions from code review

Co-authored-by: Marko Mikulicic

* fix: remove commented out debugging println

* refactor: Improve size calculation

* fix: mark `ParquetFileCache::clear` test only

* fix: assert on metric count

Co-authored-by: Marko Mikulicic
---
 data_types/src/lib.rs             |   8 +
 iox_tests/src/util.rs             |   2 +-
 parquet_file/src/chunk.rs         |  17 +-
 parquet_file/src/metadata.rs      |  31 ++-
 querier/src/cache/mod.rs          |   1 +
 querier/src/cache/parquet_file.rs | 449 ++++++++++++++++++++++++++++++
 test_helpers/src/lib.rs           |  25 ++
 7 files changed, 529 insertions(+), 4 deletions(-)
 create mode 100644 querier/src/cache/parquet_file.rs

diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index 5587f09397..d94c86b7d3 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -746,6 +746,14 @@ pub struct ParquetFile {
     pub created_at: Timestamp,
 }
 
+impl ParquetFile {
+    /// Estimate the memory consumption of this object and its contents
+    pub fn size(&self) -> usize {
+        // No additional heap allocations
+        std::mem::size_of_val(self)
+    }
+}
+
 /// Data for a parquet file reference that has been inserted in the catalog, including the
 /// `parquet_metadata` field that can be expensive to fetch.
 #[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index 2ce89c0fb9..ad908171e6 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -440,7 +440,7 @@ impl TestPartition {
         .await
     }
 
-    /// Create a parquet for the partition
+    /// Create a parquet for the partition with the given min/max sequence numbers and min/max time
     pub async fn create_parquet_file_with_min_max(
         self: &Arc<Self>,
         lp: &str,
diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index 9022dba184..c4d0319b3f 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -107,7 +107,7 @@ impl ParquetChunk {
     /// Return the approximate memory size of the chunk, in bytes including the
     /// dictionary, tables, and their rows.
     pub fn size(&self) -> usize {
-        mem::size_of::<Self>()
+        mem::size_of_val(self)
             + self.table_summary.size()
             + mem::size_of_val(&self.schema.as_ref())
             + mem::size_of_val(&self.iox_metadata)
@@ -225,4 +225,19 @@ impl DecodedParquetFile {
             iox_metadata,
         }
     }
+
+    /// Estimate the memory consumption of this object and its contents
+    pub fn size(&self) -> usize {
+        // note: subtract the size of non-Arc'd members as they are
+        // already included in Type::size()
+        mem::size_of_val(self) +
+            self.parquet_file.size() -
+            mem::size_of_val(&self.parquet_file) +
+            self.parquet_metadata.size() +
+            // parquet_metadata is wrapped in Arc so not included in size of self
+            self.decoded_metadata.size() -
+            mem::size_of_val(&self.decoded_metadata) +
+            self.iox_metadata.size() -
+            mem::size_of_val(&self.iox_metadata)
+    }
 }
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index 074eac3e27..e2ed475197 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -111,7 +111,7 @@ use schema::{
     InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME,
 };
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::{convert::TryInto, sync::Arc};
+use std::{convert::TryInto, mem, sync::Arc};
 use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
 use uuid::Uuid;
 
@@ -429,6 +429,24 @@ impl IoxMetadata {
             created_at: Timestamp::new(self.creation_timestamp.timestamp_nanos()),
         }
     }
+
+    /// Estimate the memory consumption of this object and its contents
+    pub fn size(&self) -> usize {
+        // size of this structure, including inlined size + heap sizes
+        let size_without_sortkey_refs = mem::size_of_val(self)
+            + self.namespace_name.as_bytes().len()
+            + self.table_name.as_bytes().len()
+            + self.partition_key.as_bytes().len();
+
+        if let Some(sort_key) = self.sort_key.as_ref() {
+            size_without_sortkey_refs
+                + sort_key.size()
+                // already included in `size_of_val(self)` above so remove to avoid double counting
+                - mem::size_of_val(sort_key)
+        } else {
+            size_without_sortkey_refs
+        }
+    }
 }
 
 /// Parse big-endian UUID from protobuf.
@@ -593,7 +611,8 @@ impl IoxParquetMetaData {
 
     /// In-memory size in bytes, including `self`.
     pub fn size(&self) -> usize {
-        std::mem::size_of::<Self>() + self.data.capacity()
+        assert_eq!(self.data.len(), self.data.capacity(), "data is not trimmed");
+        mem::size_of_val(self) + self.data.capacity()
     }
 }
@@ -686,6 +705,14 @@ impl DecodedIoxParquetMetaData {
 
         Ok(column_summaries)
     }
+
+    /// Estimate the memory consumption of this object and its contents
+    pub fn size(&self) -> usize {
+        // This is likely a wild undercount as it doesn't include
+        // memory pointed to by the `ParquetMetaData` structures.
+        // Feature tracked in arrow-rs: https://github.com/apache/arrow-rs/issues/1729
+        mem::size_of_val(self)
+    }
 }
 
 /// Read IOx statistics from parquet row group metadata.
diff --git a/querier/src/cache/mod.rs b/querier/src/cache/mod.rs
index f103bced8e..c679d04c33 100644
--- a/querier/src/cache/mod.rs
+++ b/querier/src/cache/mod.rs
@@ -11,6 +11,7 @@ use self::{
 };
 
 pub mod namespace;
+pub mod parquet_file;
 pub mod partition;
 pub mod processed_tombstones;
 mod ram;
diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs
new file mode 100644
index 0000000000..a2d454f240
--- /dev/null
+++ b/querier/src/cache/parquet_file.rs
@@ -0,0 +1,449 @@
+//! ParquetFile cache
+
+use backoff::{Backoff, BackoffConfig};
+use cache_system::{
+    backend::{
+        lru::{LruBackend, ResourcePool},
+        resource_consumption::FunctionEstimator,
+        shared::SharedBackend,
+    },
+    driver::Cache,
+    loader::{metrics::MetricsLoader, FunctionLoader},
+};
+use data_types::{ParquetFileWithMetadata, SequenceNumber, TableId};
+use iox_catalog::interface::Catalog;
+use iox_time::TimeProvider;
+use parquet_file::chunk::DecodedParquetFile;
+use snafu::{ResultExt, Snafu};
+use std::{collections::HashMap, mem, sync::Arc};
+
+use super::ram::RamSize;
+
+const CACHE_ID: &str = "parquet_file";
+
+#[derive(Debug, Snafu)]
+#[allow(missing_copy_implementations, missing_docs)]
+pub enum Error {
+    #[snafu(display("CatalogError refreshing parquet file cache: {}", source))]
+    Catalog {
+        source: iox_catalog::interface::Error,
+    },
+}
+
+/// A specialized `Error` for errors (needed to make Backoff happy for some reason)
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(Debug)]
+/// Holds decoded catalog information about a parquet file
+pub struct CachedParquetFiles {
+    /// Parquet catalog information and decoded metadata
+    pub files: Arc<Vec<Arc<DecodedParquetFile>>>,
+}
+
+impl CachedParquetFiles {
+    fn new(parquet_files_with_metadata: Vec<ParquetFileWithMetadata>) -> Self {
+        let files: Vec<_> = parquet_files_with_metadata
+            .into_iter()
+            .map(DecodedParquetFile::new)
+            .map(Arc::new)
+            .collect();
+
+        Self {
+            files: Arc::new(files),
+        }
+    }
+
+    /// Return the underlying files as a new Vec
+    pub fn vec(&self) -> Vec<Arc<DecodedParquetFile>> {
+        self.files.as_ref().clone()
+    }
+
+    /// Estimate the memory consumption of this object and its contents
+    fn size(&self) -> usize {
+        // simplify accounting by ensuring len and capacity of vector are the same
+        assert_eq!(self.files.len(), self.files.capacity());
+
+        // Note: size_of_val is the size of the Arc
+        // https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ae8fee8b4f7f5f013dc01ea1fda165da
+
+        // size of the Arc itself
+        mem::size_of_val(self) +
+            // Vec overhead
+            mem::size_of_val(self.files.as_ref()) +
+            // size of the underlying decoded parquet files
+            self.files.iter().map(|f| f.size()).sum::<usize>()
+    }
+
+    /// Returns the greatest parquet sequence number stored in this cache entry
+    pub(crate) fn max_parquet_sequence_number(&self) -> Option<SequenceNumber> {
+        self.files
+            .iter()
+            .map(|f| f.parquet_file.max_sequence_number)
+            .max()
+    }
+}
+
+/// Cache for parquet file information with metadata.
+///
+/// DOES NOT CACHE the actual parquet bytes from object store
+#[derive(Debug)]
+pub struct ParquetFileCache {
+    cache: Cache<TableId, Arc<CachedParquetFiles>>,
+
+    /// Handle that allows clearing entries for existing cache entries
+    backend: SharedBackend<TableId, Arc<CachedParquetFiles>>,
+}
+
+impl ParquetFileCache {
+    /// Create new empty cache.
+    pub fn new(
+        catalog: Arc<dyn Catalog>,
+        backoff_config: BackoffConfig,
+        time_provider: Arc<dyn TimeProvider>,
+        metric_registry: &metric::Registry,
+        ram_pool: Arc<ResourcePool<RamSize>>,
+    ) -> Self {
+        let loader = Box::new(FunctionLoader::new(move |table_id: TableId| {
+            let catalog = Arc::clone(&catalog);
+            let backoff_config = backoff_config.clone();
+
+            async move {
+                Backoff::new(&backoff_config)
+                    .retry_all_errors("get parquet_files", || async {
+                        // TODO refreshing all parquet files for the
+                        // entire table is likely to be quite wasteful
+                        // for large tables.
+                        //
+                        // Some of this code could be more efficient:
+                        //
+                        // 1. incrementally fetch only NEW parquet
+                        //    files that aren't already in the cache
+                        //
+                        // 2. track time ranges needed for queries and
+                        //    limit files fetched to what is actually
+                        //    needed
+                        let parquet_files_with_metadata: Vec<_> = catalog
+                            .repositories()
+                            .await
+                            .parquet_files()
+                            .list_by_table_not_to_delete_with_metadata(table_id)
+                            .await
+                            .context(CatalogSnafu)?;
+
+                        Ok(Arc::new(CachedParquetFiles::new(
+                            parquet_files_with_metadata,
+                        ))) as std::result::Result<_, Error>
+                    })
+                    .await
+                    .expect("retry forever")
+            }
+        }));
+        let loader = Arc::new(MetricsLoader::new(
+            loader,
+            CACHE_ID,
+            Arc::clone(&time_provider),
+            metric_registry,
+        ));
+
+        // add to memory pool
+        let backend = Box::new(LruBackend::new(
+            Box::new(HashMap::new()),
+            Arc::clone(&ram_pool),
+            CACHE_ID,
+            Arc::new(FunctionEstimator::new(
+                |k: &TableId, v: &Arc<CachedParquetFiles>| {
+                    RamSize(mem::size_of_val(k) + mem::size_of_val(v) + v.size())
+                },
+            )),
+        ));
+
+        // get a direct handle so we can clear out entries as needed
+        let backend = SharedBackend::new(backend);
+
+        let cache = Cache::new(loader, Box::new(backend.clone()));
+
+        Self { cache, backend }
+    }
+
+    /// Get list of cached parquet files, by table id
+    pub async fn get(&self, table_id: TableId) -> Arc<CachedParquetFiles> {
+        self.cache.get(table_id).await
+    }
+
+    /// Mark the entry for table_id as expired (and needs a refresh)
+    #[cfg(test)]
+    pub fn expire(&self, table_id: TableId) {
+        self.backend.remove_if(&table_id, |_| true);
+    }
+
+    /// Clear the parquet file cache if the cache does not contain any
+    /// files that have the specified `max_parquet_sequence_number`.
+    ///
+    /// If `None` is passed, returns false and does not clear the cache.
+    ///
+    /// Returns true if the cache was cleared (it will be refreshed on
+    /// the next call to get).
+    ///
+    /// This API is designed to be called with a response from the
+    /// ingester so there is a single place where the invalidation logic
+    /// is handled. An `Option` is accepted because the ingester may
+    /// or may not have a `max_parquet_sequence_number`.
+    ///
+    /// If a `max_parquet_sequence_number` is supplied that is not in
+    /// our cache, it means the ingester has written new data to the
+    /// catalog and the cache is out of date.
+    pub fn expire_on_newly_persisted_files(
+        &self,
+        table_id: TableId,
+        max_parquet_sequence_number: Option<SequenceNumber>,
+    ) -> bool {
+        if let Some(max_parquet_sequence_number) = max_parquet_sequence_number {
+            // check backend cache to see if the maximum sequence
+            // number desired is less than what we know about
+            self.backend.remove_if(&table_id, |cached_file| {
+                let max_cached = cached_file.max_parquet_sequence_number();
+
+                if let Some(max_cached) = max_cached {
+                    max_cached < max_parquet_sequence_number
+                } else {
+                    false
+                }
+            })
+        } else {
+            false
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+
+    use super::*;
+    use data_types::{ParquetFile, ParquetFileId};
+    use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFile, TestPartition, TestTable};
+    use test_helpers::assert_close;
+
+    use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
+
+    const METRIC_NAME: &str = "parquet_list_by_table_not_to_delete_with_metadata";
+
+    #[tokio::test]
+    async fn test_parquet_chunks() {
+        let (catalog, table, partition) = make_catalog().await;
+        let tfile = partition.create_parquet_file("table1 foo=1 11").await;
+
+        let cache = make_cache(&catalog);
+        let cached_files = cache.get(table.table.id).await.vec();
+
+        assert_eq!(cached_files.len(), 1);
+        let expected_parquet_file = to_file(tfile);
+        assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
+
+        // validate a second request doesn't result in a catalog request
+        assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
+        cache.get(table.table.id).await;
+        assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
+    }
+
+    #[tokio::test]
+    async fn test_multiple_tables() {
+        let catalog = TestCatalog::new();
+        let ns = catalog.create_namespace("ns").await;
+
+        let (table1, partition1) = make_table_and_partition("table1", &ns).await;
+        let (table2, partition2) = make_table_and_partition("table2", &ns).await;
+
+        let tfile1 = partition1.create_parquet_file("table1 foo=1 11").await;
+        let tfile2 = partition2.create_parquet_file("table2 foo=1 11").await;
+
+        let cache = make_cache(&catalog);
+
+        let cached_files = cache.get(table1.table.id).await.vec();
+        assert_eq!(cached_files.len(), 1);
+        let expected_parquet_file = to_file(tfile1);
+        assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
+
+        let cached_files = cache.get(table2.table.id).await.vec();
+        assert_eq!(cached_files.len(), 1);
+        let expected_parquet_file = to_file(tfile2);
+        assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
+    }
+
+    #[tokio::test]
+    async fn test_table_does_not_exist() {
+        let (_catalog, table, partition) = make_catalog().await;
+        partition.create_parquet_file("table1 foo=1 11").await;
+
+        // check in a different catalog where the table doesn't exist (yet)
+        let different_catalog = TestCatalog::new();
+        let cache = make_cache(&different_catalog);
+
+        let cached_files = cache.get(table.table.id).await.vec();
+        assert!(cached_files.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_size_estimation() {
+        let (catalog, table, partition) = make_catalog().await;
+        partition.create_parquet_file("table1 foo=1 11").await;
+        let table_id = table.table.id;
+
+        // expect these sizes to change with the size of the parquet file and
+        // its metadata (as the metadata is thrift encoded and
+        // includes timestamps, etc)
+        let slop_budget = 10;
+
+        let single_file_size = 1066;
+        let two_file_size = 2097;
+        assert!(single_file_size < two_file_size);
+
+        let cache = make_cache(&catalog);
+        let cached_files = cache.get(table_id).await;
+        assert_close!(cached_files.size(), single_file_size, slop_budget);
+
+        // add a second file, and force the cache to find it
+        partition.create_parquet_file("table1 foo=1 11").await;
+        cache.expire(table_id);
+        let cached_files = cache.get(table_id).await;
+        assert_close!(cached_files.size(), two_file_size, slop_budget);
+    }
+
+    #[tokio::test]
+    async fn test_max_persisted_sequence_number() {
+        let (catalog, table, partition) = make_catalog().await;
+        let sequence_number_1 = SequenceNumber::new(1);
+        let sequence_number_2 = SequenceNumber::new(2);
+        let sequence_number_3 = SequenceNumber::new(3);
+        let sequence_number_10 = SequenceNumber::new(10);
+
+        let tfile1_2 = partition
+            .create_parquet_file_with_min_max(
+                "table1 foo=1 11",
+                sequence_number_1.get(),
+                sequence_number_2.get(),
+                0,
+                100,
+            )
+            .await;
+
+        let tfile1_3 = partition
+            .create_parquet_file_with_min_max(
+                "table1 foo=1 11",
+                sequence_number_2.get(),
+                sequence_number_3.get(),
+                0,
+                100,
+            )
+            .await;
+
+        let cache = make_cache(&catalog);
+        let table_id = table.table.id;
+        assert_eq!(
+            cache.get(table_id).await.ids(),
+            ids(&[&tfile1_2, &tfile1_3])
+        );
+
+        // simulate request with sequence number 2
+        // should not expire anything
+        assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
+        cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_2));
+        assert_eq!(
+            cache.get(table_id).await.ids(),
+            ids(&[&tfile1_2, &tfile1_3])
+        );
+        assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
+
+        // simulate request with no sequence number
+        // should not expire anything
+        cache.expire_on_newly_persisted_files(table_id, None);
+        assert_eq!(
+            cache.get(table_id).await.ids(),
+            ids(&[&tfile1_2, &tfile1_3])
+        );
+        assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
+
+        // new file is created, but cache is stale
+        let tfile1_10 = partition
+            .create_parquet_file_with_min_max(
+                "table1 foo=1 11",
+                sequence_number_2.get(),
+                sequence_number_10.get(),
+                0,
+                100,
+            )
+            .await;
+        // cache doesn't have tfile1_10
+        assert_eq!(
+            cache.get(table_id).await.ids(),
+            ids(&[&tfile1_2, &tfile1_3])
+        );
+
+        // new request includes sequence 10 and causes a cache refresh
+        cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_10));
+        // now cache has tfile1_10 (yay!)
+        assert_eq!(
+            cache.get(table_id).await.ids(),
+            ids(&[&tfile1_2, &tfile1_3, &tfile1_10])
+        );
+        assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
+    }
+
+    fn ids(files: &[&TestParquetFile]) -> HashSet<ParquetFileId> {
+        files.iter().map(|f| f.parquet_file.id).collect()
+    }
+
+    /// Extracts parquet ids from various objects
+    trait ParquetIds {
+        fn ids(&self) -> HashSet<ParquetFileId>;
+    }
+
+    impl ParquetIds for &CachedParquetFiles {
+        fn ids(&self) -> HashSet<ParquetFileId> {
+            self.files.iter().map(|f| f.parquet_file.id).collect()
+        }
+    }
+
+    impl ParquetIds for Arc<CachedParquetFiles> {
+        fn ids(&self) -> HashSet<ParquetFileId> {
+            self.as_ref().ids()
+        }
+    }
+
+    async fn make_catalog() -> (Arc<TestCatalog>, Arc<TestTable>, Arc<TestPartition>) {
+        let catalog = TestCatalog::new();
+        let ns = catalog.create_namespace("ns").await;
+
+        let (table, partition) = make_table_and_partition("table1", &ns).await;
+        (catalog, table, partition)
+    }
+
+    async fn make_table_and_partition(
+        table_name: &str,
+        ns: &Arc<TestNamespace>,
+    ) -> (Arc<TestTable>, Arc<TestPartition>) {
+        let table = ns.create_table(table_name).await;
+        let sequencer1 = ns.create_sequencer(1).await;
+
+        let partition = table
+            .with_sequencer(&sequencer1)
+            .create_partition("k")
+            .await;
+
+        (table, partition)
+    }
+
+    fn make_cache(catalog: &TestCatalog) -> ParquetFileCache {
+        ParquetFileCache::new(
+            catalog.catalog(),
+            BackoffConfig::default(),
+            catalog.time_provider(),
+            &catalog.metric_registry(),
+            test_ram_pool(),
+        )
+    }
+
+    fn to_file(tfile: TestParquetFile) -> ParquetFile {
+        let (parquet_file, _meta) = tfile.parquet_file.split_off_metadata();
+        parquet_file
+    }
+}
diff --git a/test_helpers/src/lib.rs b/test_helpers/src/lib.rs
index 81bd676fd6..3d93eccd01 100644
--- a/test_helpers/src/lib.rs
+++ b/test_helpers/src/lib.rs
@@ -166,3 +166,28 @@ macro_rules! assert_error {
         );
     };
 }
+
+#[macro_export]
+/// Assert that `actual` and `expected` values are within `epsilon` of
+/// each other. Used to compare values that may fluctuate from run to run (e.g. because they encode timestamps).
+///
+/// Usage: assert_close!(actual, expected, epsilon);
+macro_rules! assert_close {
+    ($ACTUAL:expr, $EXPECTED:expr, $EPSILON:expr) => {{
+        {
+            let actual = $ACTUAL;
+            let expected = $EXPECTED;
+            let epsilon = $EPSILON;
+            // determine how far apart they actually are
+            let delta = actual.abs_diff(expected);
+            assert!(
+                delta <= epsilon,
+                "{} and {} differ by {}, which is more than allowed {}",
+                actual,
+                expected,
+                delta,
+                epsilon
+            )
+        }
+    }};
+}
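
The `size()` methods added in this patch follow a common accounting convention: `mem::size_of_val(self)` covers the inline fields, heap contents are added explicitly, and any member whose own `size()` already counts its inline footprint is subtracted once so it is not double counted; members behind an `Arc` are the exception, since only the pointer lives inside `self`. Below is a minimal, self-contained sketch of that convention; the `Inner` and `Outer` types are hypothetical stand-ins, not types from this patch:

    use std::{mem, sync::Arc};

    /// Hypothetical value that reports its own size: inline struct plus heap payload.
    struct Inner {
        payload: Vec<u8>,
    }

    impl Inner {
        fn size(&self) -> usize {
            mem::size_of_val(self) + self.payload.capacity()
        }
    }

    /// Hypothetical value holding one inlined member and one Arc'd member.
    struct Outer {
        inline_member: Inner,      // inline: already counted by size_of_val(&outer)
        shared_member: Arc<Inner>, // only the Arc pointer is counted by size_of_val(&outer)
    }

    impl Outer {
        fn size(&self) -> usize {
            mem::size_of_val(self)
                // inlined member: its size() includes its inline size, which
                // size_of_val(self) counted too, so subtract it once
                + self.inline_member.size()
                - mem::size_of_val(&self.inline_member)
                // Arc'd member: the pointee is not part of `self`, so add its full size
                + self.shared_member.size()
        }
    }

    fn main() {
        let outer = Outer {
            inline_member: Inner { payload: vec![0; 64] },
            shared_member: Arc::new(Inner { payload: vec![0; 128] }),
        };
        println!("estimated size: {} bytes", outer.size());
    }

This mirrors why `DecodedParquetFile::size()` subtracts `mem::size_of_val(&self.parquet_file)` but not `mem::size_of_val(&self.parquet_metadata)`: the former is stored inline, while the latter is wrapped in an `Arc`.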