parent 4b7d02fad1
commit 463d430d43
@@ -10,10 +10,9 @@ use cache_system::{
     cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
     loader::{metrics::MetricsLoader, FunctionLoader},
 };
-use data_types::{ParquetFileWithMetadata, SequenceNumber, TableId};
+use data_types::{ParquetFile, SequenceNumber, TableId};
 use iox_catalog::interface::Catalog;
 use iox_time::TimeProvider;
-use parquet_file::chunk::DecodedParquetFile;
 use snafu::{ResultExt, Snafu};
 use std::{collections::HashMap, mem, sync::Arc};
 
@@ -30,20 +29,16 @@ pub enum Error {
     },
 }
 
+/// Holds catalog information about a parquet file
 #[derive(Debug)]
-/// Holds decoded catalog information about a parquet file
 pub struct CachedParquetFiles {
-    /// Parquet catalog information and decoded metadata
-    pub files: Arc<Vec<Arc<DecodedParquetFile>>>,
+    /// Parquet catalog information
+    pub files: Arc<Vec<Arc<ParquetFile>>>,
 }
 
 impl CachedParquetFiles {
-    fn new(parquet_files_with_metadata: Vec<ParquetFileWithMetadata>) -> Self {
-        let files: Vec<_> = parquet_files_with_metadata
-            .into_iter()
-            .map(DecodedParquetFile::new)
-            .map(Arc::new)
-            .collect();
+    fn new(parquet_files: Vec<ParquetFile>) -> Self {
+        let files: Vec<_> = parquet_files.into_iter().map(Arc::new).collect();
 
         Self {
             files: Arc::new(files),
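
With the metadata decoding gone, the constructor collapses to a single iterator chain, and each file sits behind its own Arc inside an Arc'd Vec, so both the whole list and individual files can be shared cheaply. A minimal self-contained sketch of that double-Arc pattern (the ParquetFile here is a placeholder struct, not the real data_types type):

use std::sync::Arc;

// Placeholder standing in for data_types::ParquetFile.
#[derive(Debug, Clone)]
struct ParquetFile {
    id: i64,
}

#[derive(Debug)]
struct CachedParquetFiles {
    // Outer Arc shares the whole list; inner Arcs share single files.
    files: Arc<Vec<Arc<ParquetFile>>>,
}

impl CachedParquetFiles {
    fn new(parquet_files: Vec<ParquetFile>) -> Self {
        let files: Vec<_> = parquet_files.into_iter().map(Arc::new).collect();
        Self {
            files: Arc::new(files),
        }
    }

    // Returns an owned Vec, but only Arc pointers are copied.
    fn vec(&self) -> Vec<Arc<ParquetFile>> {
        self.files.as_ref().clone()
    }
}

fn main() {
    let cached = CachedParquetFiles::new(vec![ParquetFile { id: 1 }, ParquetFile { id: 2 }]);
    let copy = cached.vec();
    // The copy shares the same file allocations as the cache entry.
    assert!(Arc::ptr_eq(&copy[0], &cached.files[0]));
    println!("{:?}", copy[1]);
}
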
@@ -51,7 +46,7 @@ impl CachedParquetFiles {
     }
 
     /// return the underying files as a new Vec
-    pub fn vec(&self) -> Vec<Arc<DecodedParquetFile>> {
+    pub fn vec(&self) -> Vec<Arc<ParquetFile>> {
         self.files.as_ref().clone()
     }
 
@@ -72,22 +67,19 @@ impl CachedParquetFiles {
         mem::size_of_val(self) +
         // Vec overhead
         mem::size_of_val(self.files.as_ref()) +
-        // size of the underlying decoded parquet files
+        // size of the underlying parquet files
         self.files.iter().map(|f| f.size()).sum::<usize>()
     }
 
     /// Returns the greatest parquet sequence number stored in this cache entry
     pub(crate) fn max_parquet_sequence_number(&self) -> Option<SequenceNumber> {
-        self.files
-            .iter()
-            .map(|f| f.parquet_file.max_sequence_number)
-            .max()
+        self.files.iter().map(|f| f.max_sequence_number).max()
     }
 }
 
 type CacheT = Box<dyn Cache<K = TableId, V = Arc<CachedParquetFiles>, Extra = ()>>;
 
-/// Cache for parquet file information with metadata.
+/// Cache for parquet file information.
 ///
 /// DOES NOT CACHE the actual parquet bytes from object store
 #[derive(Debug)]
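
The size() accounting sums three parts: the struct itself, the Vec container overhead, and whatever each file reports through its own size() method. A hedged sketch of the same pattern (File and its size() are stand-ins, not the real data_types implementation):

use std::{mem, sync::Arc};

struct File {
    bytes: Vec<u8>,
}

impl File {
    // Stand-in for ParquetFile::size(): stack size plus owned heap data.
    fn size(&self) -> usize {
        mem::size_of_val(self) + self.bytes.capacity()
    }
}

struct Cached {
    files: Arc<Vec<Arc<File>>>,
}

impl Cached {
    fn size(&self) -> usize {
        // the struct itself (here just the outer Arc pointer)
        mem::size_of_val(self) +
        // the Vec header (ptr/len/capacity), i.e. container overhead
        mem::size_of_val(self.files.as_ref()) +
        // what each cached file owns
        self.files.iter().map(|f| f.size()).sum::<usize>()
    }
}

fn main() {
    let cached = Cached {
        files: Arc::new(vec![Arc::new(File { bytes: vec![0; 64] })]),
    };
    println!("approximate RAM use: {} bytes", cached.size());
}
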
@@ -126,17 +118,16 @@ impl ParquetFileCache {
                 // 2. track time ranges needed for queries and
                 // limit files fetched to what is actually
                 // needed
-                let parquet_files_with_metadata: Vec<_> = catalog
+                let parquet_files: Vec<_> = catalog
                     .repositories()
                     .await
                     .parquet_files()
-                    .list_by_table_not_to_delete_with_metadata(table_id)
+                    .list_by_table_not_to_delete(table_id)
                     .await
                     .context(CatalogSnafu)?;
 
-                Ok(Arc::new(CachedParquetFiles::new(
-                    parquet_files_with_metadata,
-                ))) as std::result::Result<_, Error>
+                Ok(Arc::new(CachedParquetFiles::new(parquet_files)))
+                    as std::result::Result<_, Error>
             })
             .await
             .expect("retry forever")
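
The simplified return expression keeps the `as std::result::Result<_, Error>` cast. That cast is an identity coercion: it changes nothing at runtime and exists only to pin the closure's otherwise-ambiguous error type for inference. A tiny standalone illustration of the idiom:

fn main() {
    // `Ok(...)` alone leaves the error type parameter unconstrained;
    // the cast pins it, the same way the loader closure pins `Error`.
    let load = || Ok(42_u32) as Result<_, String>;
    assert_eq!(load(), Ok(42));
}
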
@@ -234,11 +225,10 @@ mod tests {
     use super::*;
     use data_types::{ParquetFile, ParquetFileId};
     use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFile, TestPartition, TestTable};
-    use test_helpers::assert_close;
 
     use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
 
-    const METRIC_NAME: &str = "parquet_list_by_table_not_to_delete_with_metadata";
+    const METRIC_NAME: &str = "parquet_list_by_table_not_to_delete";
 
     #[tokio::test]
     async fn test_parquet_chunks() {
@@ -250,7 +240,7 @@ mod tests {
 
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = to_file(tfile);
-        assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
+        assert_eq!(cached_files[0].as_ref(), &expected_parquet_file);
 
         // validate a second request doens't result in a catalog request
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
@@ -274,12 +264,12 @@ mod tests {
         let cached_files = cache.get(table1.table.id).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = to_file(tfile1);
-        assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
+        assert_eq!(cached_files[0].as_ref(), &expected_parquet_file);
 
         let cached_files = cache.get(table2.table.id).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = to_file(tfile2);
-        assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
+        assert_eq!(cached_files[0].as_ref(), &expected_parquet_file);
     }
 
     #[tokio::test]
@@ -301,24 +291,19 @@ mod tests {
         partition.create_parquet_file("table1 foo=1 11").await;
         let table_id = table.table.id;
 
-        // expect these sizes change with sizes of parquet and
-        // its metadata (as the metadata is thrift encoded and
-        // includes timestamps, etc)
-        let slop_budget = 10;
-
-        let single_file_size = 1208;
-        let two_file_size = 2383;
+        let single_file_size = 247;
+        let two_file_size = 462;
         assert!(single_file_size < two_file_size);
 
         let cache = make_cache(&catalog);
         let cached_files = cache.get(table_id).await;
-        assert_close!(cached_files.size(), single_file_size, slop_budget);
+        assert_eq!(cached_files.size(), single_file_size);
 
         // add a second file, and force the cache to find it
         partition.create_parquet_file("table1 foo=1 11").await;
         cache.expire(table_id);
         let cached_files = cache.get(table_id).await;
-        assert_close!(cached_files.size(), two_file_size, slop_budget);
+        assert_eq!(cached_files.size(), two_file_size);
     }
 
     #[tokio::test]
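
With the thrift-encoded metadata (and the timestamps embedded in it) out of the cache entry, entry sizes are deterministic, so the test can pin exact byte counts with assert_eq! and drop the slop budget. For reference, a sketch of what an assert_close!-style tolerance check does; this is a hypothetical macro, not the removed test_helpers::assert_close:

// Hypothetical tolerance assertion illustrating the removed
// assert_close!/slop_budget pattern.
macro_rules! assert_close {
    ($actual:expr, $expected:expr, $slop:expr) => {{
        let (a, e, s) = ($actual as i64, $expected as i64, $slop as i64);
        assert!((a - e).abs() <= s, "{} is not within {} of {}", a, s, e);
    }};
}

fn main() {
    assert_close!(1210, 1208, 10); // passes: within the slop budget
    // assert_close!(1250, 1208, 10); // would panic
}
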
@@ -452,7 +437,7 @@ mod tests {
 
     impl ParquetIds for &CachedParquetFiles {
        fn ids(&self) -> HashSet<ParquetFileId> {
-            self.files.iter().map(|f| f.parquet_file.id).collect()
+            self.files.iter().map(|f| f.id).collect()
        }
    }
 

@@ -106,7 +106,7 @@ impl Reconciler {
         let parquet_files = filter_parquet_files(ingester_partitions, parquet_files.vec())?;
 
         debug!(
-            parquet_ids=?parquet_files.iter().map(|f| f.parquet_file.id).collect::<Vec<_>>(),
+            parquet_ids=?parquet_files.iter().map(|f| f.id).collect::<Vec<_>>(),
             namespace=%self.namespace_name(),
             table_name=%self.table_name(),
             "Parquet files after filtering"
@@ -117,10 +117,7 @@ impl Reconciler {
         for cached_parquet_file in parquet_files {
             if let Some(chunk) = self
                 .chunk_adapter
-                .new_rb_chunk(
-                    Arc::clone(&self.namespace_name),
-                    Arc::new(cached_parquet_file.parquet_file.clone()),
-                )
+                .new_rb_chunk(Arc::clone(&self.namespace_name), cached_parquet_file)
                 .await
             {
                 chunks_from_parquet.push(chunk);
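
new_rb_chunk now receives the cached Arc<ParquetFile> as-is, where the old code had to clone the catalog struct out of the DecodedParquetFile and wrap the clone in a fresh Arc. The difference in a tiny sketch (placeholder types, not the IOx chunk adapter API):

use std::sync::Arc;

#[derive(Debug, Clone)]
struct ParquetFile {
    id: i64,
}

fn new_chunk(file: Arc<ParquetFile>) {
    println!("building chunk for file {}", file.id);
}

fn main() {
    let cached = Arc::new(ParquetFile { id: 7 });

    // Before: deep-clone the struct, then allocate a fresh Arc.
    new_chunk(Arc::new(cached.as_ref().clone()));

    // After: hand over a shared reference; just a refcount bump.
    new_chunk(Arc::clone(&cached));
}
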
@@ -1,8 +1,7 @@
 //! Interface for reconciling Ingester and catalog state
 
 use crate::ingester::IngesterPartition;
-use data_types::{PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId};
-use parquet_file::chunk::DecodedParquetFile;
+use data_types::{ParquetFile, PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId};
 use std::{ops::Deref, sync::Arc};
 
 /// Information about an ingester partition.
@@ -56,24 +55,24 @@ where
 
 /// Information about a parquet file.
 ///
-/// This is mostly the same as [`DecodedParquetFile`] but allows easier mocking.
+/// This is mostly the same as [`ParquetFile`] but allows easier mocking.
 pub trait ParquetFileInfo {
     fn partition_id(&self) -> PartitionId;
     fn min_sequence_number(&self) -> SequenceNumber;
     fn max_sequence_number(&self) -> SequenceNumber;
 }
 
-impl ParquetFileInfo for Arc<DecodedParquetFile> {
+impl ParquetFileInfo for Arc<ParquetFile> {
     fn partition_id(&self) -> PartitionId {
-        self.parquet_file.partition_id
+        self.partition_id
     }
 
     fn min_sequence_number(&self) -> SequenceNumber {
-        self.parquet_file.min_sequence_number
+        self.min_sequence_number
    }
 
     fn max_sequence_number(&self) -> SequenceNumber {
-        self.parquet_file.max_sequence_number
+        self.max_sequence_number
     }
 }
 
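The accessor bodies lose the .parquet_file indirection because the fields now live directly on ParquetFile. Since the doc comment motivates ParquetFileInfo with easier mocking, a hedged sketch of a test double follows (MockFile and the newtype stand-ins are hypothetical, not taken from the IOx test suite):

// Stand-ins for the data_types newtypes named by the trait.
#[derive(Debug, Clone, Copy, PartialEq)]
struct PartitionId(i64);
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
struct SequenceNumber(i64);

trait ParquetFileInfo {
    fn partition_id(&self) -> PartitionId;
    fn min_sequence_number(&self) -> SequenceNumber;
    fn max_sequence_number(&self) -> SequenceNumber;
}

// Hypothetical mock: tests can fabricate sequence number ranges
// without constructing a full catalog ParquetFile.
struct MockFile {
    partition: PartitionId,
    range: (SequenceNumber, SequenceNumber),
}

impl ParquetFileInfo for MockFile {
    fn partition_id(&self) -> PartitionId {
        self.partition
    }
    fn min_sequence_number(&self) -> SequenceNumber {
        self.range.0
    }
    fn max_sequence_number(&self) -> SequenceNumber {
        self.range.1
    }
}

fn main() {
    let f = MockFile {
        partition: PartitionId(1),
        range: (SequenceNumber(10), SequenceNumber(20)),
    };
    assert!(f.min_sequence_number() < f.max_sequence_number());
    println!("partition: {:?}", f.partition_id());
}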