feat: Add `ParquetFiles` cache and memory size estimation for ParquetMetadata (#4661)

* feat: Add `ParquetFiles` cache

* fix: Apply suggestions from code review

Co-authored-by: Marko Mikulicic <mkm@influxdata.com>

* fix: remove commented out debugging println

* refactor: Improve size calculation

* fix: mark `ParquetFileCache::clear` test only

* fix: assert on metric count

Co-authored-by: Marko Mikulicic <mkm@influxdata.com>
Andrew Lamb 2022-05-23 13:11:38 -04:00 committed by GitHub
parent 5239417925
commit e877a64462
7 changed files with 529 additions and 4 deletions


@@ -746,6 +746,14 @@ pub struct ParquetFile {
pub created_at: Timestamp,
}
impl ParquetFile {
/// Estimate the memory consumption of this object and its contents
pub fn size(&self) -> usize {
// No additional heap allocations
std::mem::size_of_val(self)
}
}
/// Data for a parquet file reference that has been inserted in the catalog, including the
/// `parquet_metadata` field that can be expensive to fetch.
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]


@@ -440,7 +440,7 @@ impl TestPartition {
.await
}
- /// Create a parquet for the partition
+ /// Create a parquet for the partition with the given min/max sequence numbers and min/max time
pub async fn create_parquet_file_with_min_max(
self: &Arc<Self>,
lp: &str,


@@ -107,7 +107,7 @@ impl ParquetChunk {
/// Return the approximate memory size of the chunk, in bytes including the
/// dictionary, tables, and their rows.
pub fn size(&self) -> usize {
- mem::size_of::<Self>()
+ mem::size_of_val(self)
+ self.table_summary.size()
+ mem::size_of_val(&self.schema.as_ref())
+ mem::size_of_val(&self.iox_metadata)
@@ -225,4 +225,19 @@ impl DecodedParquetFile {
iox_metadata,
}
}
/// Estimate the memory consumption of this object and its contents
pub fn size(&self) -> usize {
// note: subtract the inline size of non-Arc'd members, since those bytes
// are already counted by `size_of_val(self)` above
mem::size_of_val(self) +
self.parquet_file.size() -
mem::size_of_val(&self.parquet_file) +
self.parquet_metadata.size() +
// parquet_metadata is wrapped in Arc so not included in size of self
self.decoded_metadata.size()
- mem::size_of_val(&self.decoded_metadata)
+ self.iox_metadata.size()
- mem::size_of_val(&self.iox_metadata)
}
}
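
A standalone sketch of the accounting pattern used by `DecodedParquetFile::size` above, with hypothetical `Owner`/`Member` types: add a member's deep `size()`, then subtract its inline size so bytes already counted by `size_of_val(self)` are not counted twice.

use std::mem;

// Hypothetical member type whose `size()` already counts its own inline
// bytes plus its heap allocation.
struct Member {
    data: Vec<u8>,
}

impl Member {
    fn size(&self) -> usize {
        mem::size_of_val(self) + self.data.capacity()
    }
}

// The member is stored inline, so `size_of_val(&owner)` already counts its
// inline bytes once.
struct Owner {
    member: Member,
}

impl Owner {
    /// Add the member's deep size, then subtract its inline size so the bytes
    /// embedded in `Owner` are not counted twice.
    fn size(&self) -> usize {
        mem::size_of_val(self) + self.member.size() - mem::size_of_val(&self.member)
    }
}

fn main() {
    let owner = Owner { member: Member { data: vec![0u8; 128] } };
    // inline size of `Owner` + 128 heap bytes, with no double counting
    assert_eq!(owner.size(), mem::size_of::<Owner>() + 128);
}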


@@ -111,7 +111,7 @@ use schema::{
InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME,
};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
- use std::{convert::TryInto, sync::Arc};
+ use std::{convert::TryInto, mem, sync::Arc};
use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
use uuid::Uuid;
@@ -429,6 +429,24 @@ impl IoxMetadata {
created_at: Timestamp::new(self.creation_timestamp.timestamp_nanos()),
}
}
/// Estimate the memory consumption of this object and its contents
pub fn size(&self) -> usize {
// size of this structure, including inlined size + heap sizes
let size_without_sortkey_refs = mem::size_of_val(self)
+ self.namespace_name.as_bytes().len()
+ self.table_name.as_bytes().len()
+ self.partition_key.as_bytes().len();
if let Some(sort_key) = self.sort_key.as_ref() {
size_without_sortkey_refs +
sort_key.size()
// already included in `size_of_val(self)` above so remove to avoid double counting
- mem::size_of_val(sort_key)
} else {
size_without_sortkey_refs
}
}
}
/// Parse big-endian UUID from protobuf.
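
The same idea for string-bearing metadata, sketched with a hypothetical `Meta` type (not the real `IoxMetadata` field list): inline struct size plus the heap bytes behind each string-like field.

use std::{mem, sync::Arc};

// Hypothetical metadata-like struct mixing inline fields and heap-backed
// strings.
struct Meta {
    namespace_name: Arc<str>,
    table_name: String,
    row_count: u64,
}

impl Meta {
    /// Inline struct size plus the heap bytes behind each string-like field.
    fn size(&self) -> usize {
        mem::size_of_val(self)
            + self.namespace_name.len()   // bytes behind the `Arc<str>`
            + self.table_name.capacity()  // heap buffer owned by the `String`
    }
}

fn main() {
    let meta = Meta {
        namespace_name: Arc::from("ns"),
        table_name: String::from("table1"),
        row_count: 0,
    };
    // inline size + 2 bytes ("ns") + 6 bytes ("table1")
    assert_eq!(meta.size(), mem::size_of::<Meta>() + 2 + 6);
    println!("{} rows, ~{} bytes", meta.row_count, meta.size());
}
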
@@ -593,7 +611,8 @@ impl IoxParquetMetaData {
/// In-memory size in bytes, including `self`.
pub fn size(&self) -> usize {
- std::mem::size_of::<Self>() + self.data.capacity()
+ assert_eq!(self.data.len(), self.data.capacity(), "data is not trimmed");
+ mem::size_of_val(self) + self.data.capacity()
}
}
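
The `len == capacity` assertion added above guards the accuracy of the `capacity()`-based accounting; a small sketch of why:

use std::mem;

fn main() {
    // A Vec grown incrementally usually carries spare capacity, so an
    // estimate based on `capacity()` can over-report the bytes actually held.
    let mut data: Vec<u8> = Vec::new();
    for b in 0..100u8 {
        data.push(b);
    }
    assert!(data.capacity() >= data.len());

    // Trimming the buffer (e.g. `shrink_to_fit`) brings capacity back in line
    // with the length, which is what the `len == capacity` assertion guards.
    data.shrink_to_fit();

    // With a trimmed buffer, inline size + capacity is a faithful estimate.
    let estimate = mem::size_of_val(&data) + data.capacity();
    println!("~{} bytes accounted for {} stored bytes", estimate, data.len());
}
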
@@ -686,6 +705,14 @@ impl DecodedIoxParquetMetaData {
Ok(column_summaries)
}
/// Estimate the memory consumption of this object and its contents
pub fn size(&self) -> usize {
// This is likely a wild undercount as it doesn't include
// memory pointed to by the `ParquetMetaData` structures.
// Feature tracked in arrow-rs: https://github.com/apache/arrow-rs/issues/1729
mem::size_of_val(self)
}
}
/// Read IOx statistics from parquet row group metadata.


@@ -11,6 +11,7 @@ use self::{
};
pub mod namespace;
pub mod parquet_file;
pub mod partition;
pub mod processed_tombstones;
mod ram;

querier/src/cache/parquet_file.rs (new file)

@@ -0,0 +1,449 @@
//! ParquetFile cache
use backoff::{Backoff, BackoffConfig};
use cache_system::{
backend::{
lru::{LruBackend, ResourcePool},
resource_consumption::FunctionEstimator,
shared::SharedBackend,
},
driver::Cache,
loader::{metrics::MetricsLoader, FunctionLoader},
};
use data_types::{ParquetFileWithMetadata, SequenceNumber, TableId};
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use parquet_file::chunk::DecodedParquetFile;
use snafu::{ResultExt, Snafu};
use std::{collections::HashMap, mem, sync::Arc};
use super::ram::RamSize;
const CACHE_ID: &str = "parquet_file";
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("CatalogError refreshing parquet file cache: {}", source))]
Catalog {
source: iox_catalog::interface::Error,
},
}
/// A specialized `Result` for cache errors (needed to make `Backoff` happy for some reason)
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
/// Holds decoded catalog information about a parquet file
pub struct CachedParquetFiles {
/// Parquet catalog information and decoded metadata
pub files: Arc<Vec<Arc<DecodedParquetFile>>>,
}
impl CachedParquetFiles {
fn new(parquet_files_with_metadata: Vec<ParquetFileWithMetadata>) -> Self {
let files: Vec<_> = parquet_files_with_metadata
.into_iter()
.map(DecodedParquetFile::new)
.map(Arc::new)
.collect();
Self {
files: Arc::new(files),
}
}
/// Return the underlying files as a new `Vec`
pub fn vec(&self) -> Vec<Arc<DecodedParquetFile>> {
self.files.as_ref().clone()
}
/// Estimate the memory consumption of this object and its contents
fn size(&self) -> usize {
// simplify accounting by ensuring len and capacity of vector are the same
assert_eq!(self.files.len(), self.files.capacity());
// Note: `size_of_val(self)` measures only the `Arc` handle, not its pointee
// https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ae8fee8b4f7f5f013dc01ea1fda165da
// size of the Arc itself
mem::size_of_val(self) +
// Vec overhead
mem::size_of_val(self.files.as_ref()) +
// size of the underlying decoded parquet files
self.files.iter().map(|f| f.size()).sum::<usize>()
}
/// Returns the greatest parquet sequence number stored in this cache entry
pub(crate) fn max_parquet_sequence_number(&self) -> Option<SequenceNumber> {
self.files
.iter()
.map(|f| f.parquet_file.max_sequence_number)
.max()
}
}
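
For context, a sketch with a hypothetical `Decoded` stand-in of why `vec()` above is cheap: cloning a `Vec<Arc<T>>` copies only the `Arc` handles and bumps reference counts; the decoded files themselves are shared, not duplicated.

use std::sync::Arc;

// Stand-in for a decoded parquet file; the real type is comparatively large.
struct Decoded {
    bytes: Vec<u8>,
}

fn main() {
    let files: Arc<Vec<Arc<Decoded>>> = Arc::new(vec![
        Arc::new(Decoded { bytes: vec![0; 1024] }),
        Arc::new(Decoded { bytes: vec![0; 2048] }),
    ]);

    // Equivalent of `CachedParquetFiles::vec()`: clone the inner Vec.
    let snapshot: Vec<Arc<Decoded>> = files.as_ref().clone();

    // Only the Arc handles were copied; the payloads are shared.
    assert_eq!(Arc::strong_count(&files[0]), 2);
    let total: usize = snapshot.iter().map(|f| f.bytes.len()).sum();
    assert_eq!(total, 3072);
}
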
/// Cache for parquet file information with metadata.
///
/// DOES NOT CACHE the actual parquet bytes from object store
#[derive(Debug)]
pub struct ParquetFileCache {
cache: Cache<TableId, Arc<CachedParquetFiles>>,
/// Handle that allows clearing existing cache entries
backend: SharedBackend<TableId, Arc<CachedParquetFiles>>,
}
impl ParquetFileCache {
/// Create new empty cache.
pub fn new(
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
time_provider: Arc<dyn TimeProvider>,
metric_registry: &metric::Registry,
ram_pool: Arc<ResourcePool<RamSize>>,
) -> Self {
let loader = Box::new(FunctionLoader::new(move |table_id: TableId| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
async move {
Backoff::new(&backoff_config)
.retry_all_errors("get parquet_files", || async {
// TODO refreshing all parquet files for the
// entire table is likely to be quite wasteful
// for large tables.
//
// Some ways this code could be more efficient:
//
// 1. incrementally fetch only NEW parquet
// files that aren't already in the cache
//
// 2. track time ranges needed for queries and
// limit files fetched to what is actually
// needed
let parquet_files_with_metadata: Vec<_> = catalog
.repositories()
.await
.parquet_files()
.list_by_table_not_to_delete_with_metadata(table_id)
.await
.context(CatalogSnafu)?;
Ok(Arc::new(CachedParquetFiles::new(
parquet_files_with_metadata,
))) as std::result::Result<_, Error>
})
.await
.expect("retry forever")
}
}));
let loader = Arc::new(MetricsLoader::new(
loader,
CACHE_ID,
Arc::clone(&time_provider),
metric_registry,
));
// add to memory pool
let backend = Box::new(LruBackend::new(
Box::new(HashMap::new()),
Arc::clone(&ram_pool),
CACHE_ID,
Arc::new(FunctionEstimator::new(
|k: &TableId, v: &Arc<CachedParquetFiles>| {
RamSize(mem::size_of_val(k) + mem::size_of_val(v) + v.size())
},
)),
));
// get a direct handle so we can clear out entries as needed
let backend = SharedBackend::new(backend);
let cache = Cache::new(loader, Box::new(backend.clone()));
Self { cache, backend }
}
/// Get list of cached parquet files, by table id
pub async fn get(&self, table_id: TableId) -> Arc<CachedParquetFiles> {
self.cache.get(table_id).await
}
/// Mark the entry for `table_id` as expired (so it will be refreshed on the next access)
#[cfg(test)]
pub fn expire(&self, table_id: TableId) {
self.backend.remove_if(&table_id, |_| true);
}
/// Clear the parquet file cache if the cache does not contain any
/// files that are at least as new as the specified `max_parquet_sequence_number`.
///
/// If `None` is passed, returns false and does not clear the cache.
///
/// Returns true if the cache was cleared (it will be refreshed on
/// the next call to get).
///
/// This API is designed to be called with a response from the
/// ingester so there is a single place where the invalidation logic
/// is handled. An `Option` is accepted because the ingester may
/// or may not have a `max_parquet_sequence_number`.
///
/// If a `max_parquet_sequence_number` is supplied that is not in
/// our cache, it means the ingester has written new data to the
/// catalog and the cache is out of date.
pub fn expire_on_newly_persisted_files(
&self,
table_id: TableId,
max_parquet_sequence_number: Option<SequenceNumber>,
) -> bool {
if let Some(max_parquet_sequence_number) = max_parquet_sequence_number {
// check the backend cache to see if the newest sequence number
// we have cached is older than the one reported by the ingester
self.backend.remove_if(&table_id, |cached_file| {
let max_cached = cached_file.max_parquet_sequence_number();
if let Some(max_cached) = max_cached {
max_cached < max_parquet_sequence_number
} else {
false
}
})
} else {
false
}
}
}
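
The expiry rule implemented by `expire_on_newly_persisted_files` can be restated as a small standalone predicate; a hedged sketch, with plain `i64`s standing in for `SequenceNumber`:

/// Standalone version of the decision made inside
/// `expire_on_newly_persisted_files`: expire the cached entry only when the
/// ingester reports a persisted sequence number newer than anything cached.
fn should_expire(max_cached: Option<i64>, max_persisted: Option<i64>) -> bool {
    match (max_persisted, max_cached) {
        // Ingester knows of a file newer than the newest cached one: refresh.
        (Some(persisted), Some(cached)) => cached < persisted,
        // Ingester persisted something but the cached entry holds no files;
        // the code above keeps the entry in this case.
        (Some(_), None) => false,
        // No sequence number from the ingester: nothing to compare, keep it.
        (None, _) => false,
    }
}

fn main() {
    assert!(!should_expire(Some(3), Some(2))); // cache already has seq 3
    assert!(should_expire(Some(2), Some(10))); // seq 10 persisted, cache stale
    assert!(!should_expire(Some(3), None));    // no info from the ingester
}
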
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
use data_types::{ParquetFile, ParquetFileId};
use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFile, TestPartition, TestTable};
use test_helpers::assert_close;
use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
const METRIC_NAME: &str = "parquet_list_by_table_not_to_delete_with_metadata";
#[tokio::test]
async fn test_parquet_chunks() {
let (catalog, table, partition) = make_catalog().await;
let tfile = partition.create_parquet_file("table1 foo=1 11").await;
let cache = make_cache(&catalog);
let cached_files = cache.get(table.table.id).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = to_file(tfile);
assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
// validate that a second request doesn't result in a catalog request
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
cache.get(table.table.id).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
}
#[tokio::test]
async fn test_multiple_tables() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let (table1, partition1) = make_table_and_partition("table1", &ns).await;
let (table2, partition2) = make_table_and_partition("table2", &ns).await;
let tfile1 = partition1.create_parquet_file("table1 foo=1 11").await;
let tfile2 = partition2.create_parquet_file("table2 foo=1 11").await;
let cache = make_cache(&catalog);
let cached_files = cache.get(table1.table.id).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = to_file(tfile1);
assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
let cached_files = cache.get(table2.table.id).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = to_file(tfile2);
assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
}
#[tokio::test]
async fn test_table_does_not_exist() {
let (_catalog, table, partition) = make_catalog().await;
partition.create_parquet_file("table1 foo=1 11").await;
// check in a different catalog where the table doesn't exist (yet)
let different_catalog = TestCatalog::new();
let cache = make_cache(&different_catalog);
let cached_files = cache.get(table.table.id).await.vec();
assert!(cached_files.is_empty());
}
#[tokio::test]
async fn test_size_estimation() {
let (catalog, table, partition) = make_catalog().await;
partition.create_parquet_file("table1 foo=1 11").await;
let table_id = table.table.id;
// expect these sizes to change with the size of the parquet data and
// its metadata (as the metadata is thrift encoded and
// includes timestamps, etc)
let slop_budget = 10;
let single_file_size = 1066;
let two_file_size = 2097;
assert!(single_file_size < two_file_size);
let cache = make_cache(&catalog);
let cached_files = cache.get(table_id).await;
assert_close!(cached_files.size(), single_file_size, slop_budget);
// add a second file, and force the cache to find it
partition.create_parquet_file("table1 foo=1 11").await;
cache.expire(table_id);
let cached_files = cache.get(table_id).await;
assert_close!(cached_files.size(), two_file_size, slop_budget);
}
#[tokio::test]
async fn test_max_persisted_sequence_number() {
let (catalog, table, partition) = make_catalog().await;
let sequence_number_1 = SequenceNumber::new(1);
let sequence_number_2 = SequenceNumber::new(2);
let sequence_number_3 = SequenceNumber::new(3);
let sequence_number_10 = SequenceNumber::new(10);
let tfile1_2 = partition
.create_parquet_file_with_min_max(
"table1 foo=1 11",
sequence_number_1.get(),
sequence_number_2.get(),
0,
100,
)
.await;
let tfile1_3 = partition
.create_parquet_file_with_min_max(
"table1 foo=1 11",
sequence_number_2.get(),
sequence_number_3.get(),
0,
100,
)
.await;
let cache = make_cache(&catalog);
let table_id = table.table.id;
assert_eq!(
cache.get(table_id).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
// simulate request with sequence number 2
// should not expire anything
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_2));
assert_eq!(
cache.get(table_id).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// simulate request with no sequence number
// should not expire anything
cache.expire_on_newly_persisted_files(table_id, None);
assert_eq!(
cache.get(table_id).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// new file is created, but cache is stale
let tfile1_10 = partition
.create_parquet_file_with_min_max(
"table1 foo=1 11",
sequence_number_2.get(),
sequence_number_10.get(),
0,
100,
)
.await;
// cache doesn't have tfile1_10
assert_eq!(
cache.get(table_id).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
// new request includes sequence 10 and causes a cache refresh
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_10));
// now cache has tfile1_10 (yay!)
assert_eq!(
cache.get(table_id).await.ids(),
ids(&[&tfile1_2, &tfile1_3, &tfile1_10])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
fn ids(files: &[&TestParquetFile]) -> HashSet<ParquetFileId> {
files.iter().map(|f| f.parquet_file.id).collect()
}
/// Extracts parquet ids from various objects
trait ParquetIds {
fn ids(&self) -> HashSet<ParquetFileId>;
}
impl ParquetIds for &CachedParquetFiles {
fn ids(&self) -> HashSet<ParquetFileId> {
self.files.iter().map(|f| f.parquet_file.id).collect()
}
}
impl ParquetIds for Arc<CachedParquetFiles> {
fn ids(&self) -> HashSet<ParquetFileId> {
self.as_ref().ids()
}
}
async fn make_catalog() -> (Arc<TestCatalog>, Arc<TestTable>, Arc<TestPartition>) {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let (table, partition) = make_table_and_partition("table1", &ns).await;
(catalog, table, partition)
}
async fn make_table_and_partition(
table_name: &str,
ns: &Arc<TestNamespace>,
) -> (Arc<TestTable>, Arc<TestPartition>) {
let table = ns.create_table(table_name).await;
let sequencer1 = ns.create_sequencer(1).await;
let partition = table
.with_sequencer(&sequencer1)
.create_partition("k")
.await;
(table, partition)
}
fn make_cache(catalog: &TestCatalog) -> ParquetFileCache {
ParquetFileCache::new(
catalog.catalog(),
BackoffConfig::default(),
catalog.time_provider(),
&catalog.metric_registry(),
test_ram_pool(),
)
}
fn to_file(tfile: TestParquetFile) -> ParquetFile {
let (parquet_file, _meta) = tfile.parquet_file.split_off_metadata();
parquet_file
}
}
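
The per-entry RAM charge registered with the LRU pool in `ParquetFileCache::new` follows the shape sketched below, using hypothetical stand-ins for the real `TableId` and `CachedParquetFiles` types: the key, the `Arc` handle, and the value's deep size are all charged to the shared pool.

use std::{mem, sync::Arc};

// Hypothetical stand-ins for the real key and value types.
struct TableId(i64);

struct CachedFiles {
    heap_bytes: usize,
}

impl CachedFiles {
    fn size(&self) -> usize {
        mem::size_of_val(self) + self.heap_bytes
    }
}

/// Same shape as the estimator closure handed to `FunctionEstimator::new`
/// above: charge the key, the `Arc` handle, and the value's deep size.
fn ram_cost(k: &TableId, v: &Arc<CachedFiles>) -> usize {
    mem::size_of_val(k) + mem::size_of_val(v) + v.size()
}

fn main() {
    let k = TableId(42);
    let v = Arc::new(CachedFiles { heap_bytes: 4096 });
    println!("entry for table {} costs ~{} bytes", k.0, ram_cost(&k, &v));
}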


@@ -166,3 +166,28 @@ macro_rules! assert_error {
);
};
}
#[macro_export]
/// Assert that `actual` and `expected` values are within `epsilon` of
/// each other. Used to compare values that may fluctuate from run to run (e.g. because they encode timestamps)
///
/// Usage: assert_close!(actual, expected, epsilon);
macro_rules! assert_close {
($ACTUAL:expr, $EXPECTED:expr, $EPSILON:expr) => {{
{
let actual = $ACTUAL;
let expected = $EXPECTED;
let epsilon = $EPSILON;
// determine how far apart they actually are
let delta = actual.abs_diff(expected);
assert!(
delta <= epsilon,
"{} and {} differ by {}, which is more than allowed {}",
actual,
expected,
delta,
epsilon
)
}
}};
}
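
For illustration, a hedged usage sketch of the macro with made-up values; it assumes `use test_helpers::assert_close;` is in scope, matching the import in the cache tests above:

#[test]
fn sizes_are_close_enough() {
    // Values that drift slightly from run to run (e.g. because they encode
    // timestamps) are compared with a tolerance instead of exact equality.
    let measured: usize = 1066;
    let expected: usize = 1060;
    assert_close!(measured, expected, 10); // |1066 - 1060| = 6 <= 10

    // This would panic: 2097 differs from 1066 by far more than 10.
    // assert_close!(measured, 2097, 10);
}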