diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index d3aef40c41..699e14ab2f 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -10,8 +10,8 @@ use arrow::record_batch::RecordBatch; use backoff::{Backoff, BackoffConfig}; use bytes::Bytes; use data_types2::{ - ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, TablePartition, Timestamp, - Tombstone, TombstoneId, + ParquetFile, ParquetFileId, ParquetFileWithMetadata, PartitionId, SequencerId, TableId, + TablePartition, Timestamp, Tombstone, TombstoneId, }; use datafusion::error::DataFusionError; use iox_catalog::interface::{Catalog, Transaction}; @@ -124,6 +124,11 @@ pub enum Error { source: iox_catalog::interface::Error, }, + #[snafu(display("Error while requesting parquet file metadata {}", source))] + ParquetMetadata { + source: iox_catalog::interface::Error, + }, + #[snafu(display("Error converting the parquet stream to bytes: {}", source))] ConvertingToBytes { source: parquet_file::storage::Error, @@ -371,7 +376,7 @@ impl Compactor { .repositories() .await .parquet_files() - .list_by_partition_not_to_delete(partition_id) + .list_by_partition_not_to_delete_with_metadata(partition_id) .await .context(ListParquetFilesSnafu)?; if parquet_files.is_empty() { @@ -483,7 +488,7 @@ impl Compactor { fn group_small_contiguous_groups( mut file_groups: Vec, compaction_max_size_bytes: i64, - ) -> Vec> { + ) -> Vec> { let mut groups = Vec::with_capacity(file_groups.len()); if file_groups.is_empty() { return groups; @@ -550,7 +555,7 @@ impl Compactor { return Ok(compacted); } - // Keep the fist IoxMetadata to reuse same IDs and names + // Save the parquet metadata for the first file to reuse IDs and names let iox_metadata = overlapped_files[0].iox_metadata(); // Collect all unique tombstone @@ -819,7 +824,9 @@ impl Compactor { /// Given a list of parquet files that come from the same Table Partition, group files together /// if their (min_time, max_time) ranges overlap. Does not preserve or guarantee any ordering. - fn overlapped_groups(mut parquet_files: Vec) -> Vec { + fn overlapped_groups( + mut parquet_files: Vec, + ) -> Vec { let mut groups = Vec::with_capacity(parquet_files.len()); // While there are still files not in any group @@ -951,7 +958,7 @@ impl Compactor { async fn add_tombstones_to_groups( &self, - groups: Vec>, + groups: Vec>, ) -> Result> { let mut repo = self.catalog.repositories().await; let tombstone_repo = repo.tombstones(); @@ -1131,7 +1138,9 @@ mod tests { .await .unwrap(); // should have 2 non-deleted level_0 files. 
The original file was marked deleted and not counted - let files = catalog.list_by_table_not_to_delete(table.table.id).await; + let mut files = catalog + .list_by_table_not_to_delete_with_metadata(table.table.id) + .await; assert_eq!(files.len(), 2); // 2 newly created level-1 files as the result of compaction assert_eq!((files[0].id.get(), files[0].compaction_level), (2, 1)); @@ -1158,8 +1167,10 @@ mod tests { catalog.time_provider(), ); // create chunks for 2 files - let chunk_0 = adapter.new_querier_chunk(files[0].clone()).await.unwrap(); - let chunk_1 = adapter.new_querier_chunk(files[1].clone()).await.unwrap(); + let files1 = files.pop().unwrap(); + let files0 = files.pop().unwrap(); + let chunk_0 = adapter.new_querier_chunk(files0).await.unwrap(); + let chunk_1 = adapter.new_querier_chunk(files1).await.unwrap(); // query the chunks // least recent compacted first half (~90%) let batches = collect_read_filter(&chunk_0).await; @@ -1245,7 +1256,7 @@ mod tests { // parquet files // pf1 does not overlap with any and very large ==> will be upgraded to level 1 during compaction - let _pf1 = partition + partition .create_parquet_file_with_min_max_size_and_creation_time( &lp1, 1, @@ -1255,11 +1266,9 @@ mod tests { compactor.config.compaction_max_size_bytes() + 10, 20, ) - .await - .parquet_file - .clone(); + .await; // pf2 overlaps with pf3 ==> compacted and marked to_delete with a timestamp - let _pf2 = partition + partition .create_parquet_file_with_min_max_and_creation_time( &lp2, 4, @@ -1268,11 +1277,9 @@ mod tests { 20000, time.now().timestamp_nanos(), ) - .await - .parquet_file - .clone(); + .await; // pf3 overlaps with pf2 ==> compacted and marked to_delete with a timestamp - let _pf3 = partition + partition .create_parquet_file_with_min_max_and_creation_time( &lp3, 8, @@ -1281,11 +1288,9 @@ mod tests { 25000, time.now().timestamp_nanos(), ) - .await - .parquet_file - .clone(); + .await; // pf4 does not overlap with any but small => will also be compacted with pf2 and pf3 - let _pf4 = partition + partition .create_parquet_file_with_min_max_and_creation_time( &lp4, 18, @@ -1294,9 +1299,7 @@ mod tests { 28000, time.now().timestamp_nanos(), ) - .await - .parquet_file - .clone(); + .await; // should have 4 level-0 files before compacting let count = catalog.count_level_0_files(sequencer.sequencer.id).await; assert_eq!(count, 4); @@ -1339,7 +1342,9 @@ mod tests { .unwrap(); // Should have 3 non-soft-deleted files: pf1 not compacted and stay, and 2 newly created after compacting pf2, pf3, pf4 - let files = catalog.list_by_table_not_to_delete(table.table.id).await; + let mut files = catalog + .list_by_table_not_to_delete_with_metadata(table.table.id) + .await; assert_eq!(files.len(), 3); // pf1 upgraded to level 1 assert_eq!((files[0].id.get(), files[0].compaction_level), (1, 1)); @@ -1370,8 +1375,10 @@ mod tests { catalog.time_provider(), ); // create chunks for 2 files - let chunk_0 = adapter.new_querier_chunk(files[1].clone()).await.unwrap(); - let chunk_1 = adapter.new_querier_chunk(files[2].clone()).await.unwrap(); + let files2 = files.pop().unwrap(); + let files1 = files.pop().unwrap(); + let chunk_0 = adapter.new_querier_chunk(files1).await.unwrap(); + let chunk_1 = adapter.new_querier_chunk(files2).await.unwrap(); // query the chunks // least recent compacted first half (~90%) let batches = collect_read_filter(&chunk_0).await; @@ -1422,8 +1429,7 @@ mod tests { .await .create_parquet_file_with_min_max(&lp, 1, 1, 8000, 20000) .await - .parquet_file - .clone(); + .parquet_file; let 
compactor = Compactor::new( vec![sequencer.sequencer.id], @@ -1516,13 +1522,11 @@ mod tests { let parquet_file1 = partition .create_parquet_file_with_min_max(&lp1, 1, 5, 8000, 20000) .await - .parquet_file - .clone(); + .parquet_file; let parquet_file2 = partition .create_parquet_file_with_min_max(&lp2, 10, 15, 6000, 25000) .await - .parquet_file - .clone(); + .parquet_file; let compactor = Compactor::new( vec![sequencer.sequencer.id], @@ -1618,18 +1622,15 @@ mod tests { let parquet_file1 = partition .create_parquet_file_with_min_max(&lp1, 1, 5, 8000, 20000) .await - .parquet_file - .clone(); + .parquet_file; let parquet_file2 = partition .create_parquet_file_with_min_max(&lp2, 10, 15, 6000, 25000) .await - .parquet_file - .clone(); + .parquet_file; let parquet_file3 = partition .create_parquet_file_with_min_max(&lp3, 20, 25, 6000, 8000) .await - .parquet_file - .clone(); + .parquet_file; let compactor = Compactor::new( vec![sequencer.sequencer.id], @@ -1702,7 +1703,7 @@ mod tests { /// A test utility function to make minimially-viable ParquetFile records with particular /// min/max times. Does not involve the catalog at all. - fn arbitrary_parquet_file(min_time: i64, max_time: i64) -> ParquetFile { + fn arbitrary_parquet_file(min_time: i64, max_time: i64) -> ParquetFileWithMetadata { arbitrary_parquet_file_with_size(min_time, max_time, 100) } @@ -1710,8 +1711,8 @@ mod tests { min_time: i64, max_time: i64, file_size_bytes: i64, - ) -> ParquetFile { - ParquetFile { + ) -> ParquetFileWithMetadata { + ParquetFileWithMetadata { id: ParquetFileId::new(0), sequencer_id: SequencerId::new(0), namespace_id: NamespaceId::new(0), @@ -1759,13 +1760,11 @@ mod tests { let pf1 = partition .create_parquet_file_with_min_max(&lp1, 1, 5, 8000, 20000) .await - .parquet_file - .clone(); + .parquet_file; let pf2 = partition .create_parquet_file_with_min_max(&lp2, 1, 5, 28000, 35000) .await - .parquet_file - .clone(); + .parquet_file; // Build 2 QueryableParquetChunks let pt1 = ParquetFileWithTombstone { @@ -1776,6 +1775,7 @@ mod tests { data: Arc::new(pf2), tombstones: vec![], }; + let pc1 = pt1.to_queryable_parquet_chunk( Arc::clone(&catalog.object_store), table.table.name.clone(), @@ -2203,7 +2203,11 @@ mod tests { ..p1.clone() }; let pf1 = txn.parquet_files().create(p1).await.unwrap(); + let pf1_metadata = txn.parquet_files().parquet_metadata(pf1.id).await.unwrap(); + let pf1 = ParquetFileWithMetadata::new(pf1, pf1_metadata); let pf2 = txn.parquet_files().create(p2).await.unwrap(); + let pf2_metadata = txn.parquet_files().parquet_metadata(pf2.id).await.unwrap(); + let pf2 = ParquetFileWithMetadata::new(pf2, pf2_metadata); let parquet_files = vec![pf1.clone(), pf2.clone()]; let groups = vec![ diff --git a/compactor/src/query.rs b/compactor/src/query.rs index 478ef5a396..fccc911451 100644 --- a/compactor/src/query.rs +++ b/compactor/src/query.rs @@ -4,11 +4,11 @@ use std::sync::Arc; use data_types2::{ tombstones_to_delete_predicates, ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, - SequenceNumber, TableSummary, Tombstone, + SequenceNumber, TableSummary, Timestamp, Tombstone, }; use datafusion::physical_plan::SendableRecordBatchStream; use observability_deps::tracing::trace; -use parquet_file::{chunk::ParquetChunk, metadata::IoxMetadata}; +use parquet_file::chunk::ParquetChunk; use predicate::{Predicate, PredicateMatch}; use query::{ exec::{stringset::StringSet, IOxSessionContext}, @@ -39,9 +39,12 @@ pub type Result = std::result::Result; #[derive(Debug, Clone)] pub struct QueryableParquetChunk { data: Arc, 
// data of the parquet file - iox_metadata: Arc, // metadata of the parquet file delete_predicates: Vec>, // converted from tombstones table_name: String, // needed to build query plan + min_sequence_number: SequenceNumber, + max_sequence_number: SequenceNumber, + min_time: Timestamp, + max_time: Timestamp, } impl QueryableParquetChunk { @@ -49,15 +52,21 @@ impl QueryableParquetChunk { pub fn new( table_name: impl Into, data: Arc, - iox_metadata: Arc, deletes: &[Tombstone], + min_sequence_number: SequenceNumber, + max_sequence_number: SequenceNumber, + min_time: Timestamp, + max_time: Timestamp, ) -> Self { let delete_predicates = tombstones_to_delete_predicates(deletes); Self { data, - iox_metadata, delete_predicates, table_name: table_name.into(), + min_sequence_number, + max_sequence_number, + min_time, + max_time, } } @@ -72,22 +81,22 @@ impl QueryableParquetChunk { /// Return min sequence number pub fn min_sequence_number(&self) -> SequenceNumber { - self.iox_metadata.min_sequence_number + self.min_sequence_number } /// Return max sequence number pub fn max_sequence_number(&self) -> SequenceNumber { - self.iox_metadata.max_sequence_number + self.max_sequence_number } /// Return min time pub fn min_time(&self) -> i64 { - self.iox_metadata.time_of_first_write.timestamp_nanos() + self.min_time.get() } /// Return max time pub fn max_time(&self) -> i64 { - self.iox_metadata.time_of_last_write.timestamp_nanos() + self.max_time.get() } } @@ -121,7 +130,7 @@ impl QueryChunk for QueryableParquetChunk { // Note: parquet_file's id is an uuid which is also the datatype of the ChunkId. However, // it is not safe to use it for sorting chunk fn id(&self) -> ChunkId { - let timestamp_nano = self.iox_metadata.time_of_first_write.timestamp_nanos(); + let timestamp_nano = self.min_time.get(); let timestamp_nano_u128 = u128::try_from(timestamp_nano).expect("Cannot convert timestamp nano to u128 "); @@ -221,7 +230,7 @@ impl QueryChunk for QueryableParquetChunk { // Order of the chunk so they can be deduplicate correctly fn order(&self) -> ChunkOrder { - let seq_num = self.iox_metadata.min_sequence_number.get(); + let seq_num = self.min_sequence_number.get(); let seq_num = u32::try_from(seq_num) .expect("Sequence number should have been converted to chunk order successfully"); ChunkOrder::new(seq_num) diff --git a/compactor/src/utils.rs b/compactor/src/utils.rs index ce38bac724..f05ae37260 100644 --- a/compactor/src/utils.rs +++ b/compactor/src/utils.rs @@ -3,7 +3,7 @@ use crate::query::QueryableParquetChunk; use arrow::record_batch::RecordBatch; use data_types2::{ - ParquetFile, ParquetFileId, ParquetFileParams, Timestamp, Tombstone, TombstoneId, + ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Timestamp, Tombstone, TombstoneId, }; use iox_object_store::IoxObjectStore; use object_store::DynObjectStore; @@ -37,8 +37,8 @@ impl GroupWithTombstones { /// Wrapper of group of parquet files with their min time and total size #[derive(Debug, Clone, PartialEq)] pub struct GroupWithMinTimeAndSize { - /// Parquet files - pub(crate) parquet_files: Vec, + /// Parquet files and their metadata + pub(crate) parquet_files: Vec, /// min time of all parquet_files pub(crate) min_time: Timestamp, @@ -51,7 +51,7 @@ pub struct GroupWithMinTimeAndSize { #[allow(missing_docs)] #[derive(Debug, Clone)] pub struct ParquetFileWithTombstone { - pub(crate) data: Arc, + pub(crate) data: Arc, pub(crate) tombstones: Vec, } @@ -112,8 +112,11 @@ impl ParquetFileWithTombstone { QueryableParquetChunk::new( table_name, 
Arc::new(parquet_chunk), - Arc::new(decoded_parquet_file.iox_metadata), &self.tombstones, + self.data.min_sequence_number, + self.data.max_sequence_number, + self.data.min_time, + self.data.max_time, ) } diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index 33c8721188..e4ba3efebf 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -720,8 +720,44 @@ pub fn tombstones_to_delete_predicates_iter( } /// Data for a parquet file reference that has been inserted in the catalog. -#[derive(Debug, Clone, PartialEq, sqlx::FromRow)] +#[derive(Debug, Clone, Copy, PartialEq, sqlx::FromRow)] pub struct ParquetFile { + /// the id of the file in the catalog + pub id: ParquetFileId, + /// the sequencer that sequenced writes that went into this file + pub sequencer_id: SequencerId, + /// the namespace + pub namespace_id: NamespaceId, + /// the table + pub table_id: TableId, + /// the partition + pub partition_id: PartitionId, + /// the uuid used in the object store path for this file + pub object_store_id: Uuid, + /// the minimum sequence number from a record in this file + pub min_sequence_number: SequenceNumber, + /// the maximum sequence number from a record in this file + pub max_sequence_number: SequenceNumber, + /// the min timestamp of data in this file + pub min_time: Timestamp, + /// the max timestamp of data in this file + pub max_time: Timestamp, + /// When this file was marked for deletion + pub to_delete: Option, + /// file size in bytes + pub file_size_bytes: i64, + /// the number of rows of data in this file + pub row_count: i64, + /// the compaction level of the file + pub compaction_level: i16, + /// the creation time of the parquet file + pub created_at: Timestamp, +} + +/// Data for a parquet file reference that has been inserted in the catalog, including the +/// `parquet_metadata` field that can be expensive to fetch. +#[derive(Debug, Clone, PartialEq, sqlx::FromRow)] +pub struct ParquetFileWithMetadata { /// the id of the file in the catalog pub id: ParquetFileId, /// the sequencer that sequenced writes that went into this file @@ -756,6 +792,93 @@ pub struct ParquetFile { pub created_at: Timestamp, } +impl ParquetFileWithMetadata { + /// Create an instance from an instance of ParquetFile and metadata bytes fetched from the + /// catalog. + pub fn new(parquet_file: ParquetFile, parquet_metadata: Vec) -> Self { + let ParquetFile { + id, + sequencer_id, + namespace_id, + table_id, + partition_id, + object_store_id, + min_sequence_number, + max_sequence_number, + min_time, + max_time, + to_delete, + file_size_bytes, + row_count, + compaction_level, + created_at, + } = parquet_file; + + Self { + id, + sequencer_id, + namespace_id, + table_id, + partition_id, + object_store_id, + min_sequence_number, + max_sequence_number, + min_time, + max_time, + to_delete, + file_size_bytes, + parquet_metadata, + row_count, + compaction_level, + created_at, + } + } + + /// Split the parquet_metadata off, leaving a regular ParquetFile and the bytes to transfer + /// ownership separately. 
+ pub fn split_off_metadata(self) -> (ParquetFile, Vec) { + let Self { + id, + sequencer_id, + namespace_id, + table_id, + partition_id, + object_store_id, + min_sequence_number, + max_sequence_number, + min_time, + max_time, + to_delete, + file_size_bytes, + parquet_metadata, + row_count, + compaction_level, + created_at, + } = self; + + ( + ParquetFile { + id, + sequencer_id, + namespace_id, + table_id, + partition_id, + object_store_id, + min_sequence_number, + max_sequence_number, + min_time, + max_time, + to_delete, + file_size_bytes, + row_count, + compaction_level, + created_at, + }, + parquet_metadata, + ) + } +} + /// Data for a parquet file to be inserted into the catalog. #[derive(Debug, Clone, PartialEq)] pub struct ParquetFileParams { diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 598d9cc67b..db59bafdb3 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -3,10 +3,10 @@ use async_trait::async_trait; use data_types2::{ Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, - NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition, - PartitionId, PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, - Sequencer, SequencerId, Table, TableId, TablePartition, TableSchema, Timestamp, Tombstone, - TombstoneId, + NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, + ParquetFileWithMetadata, Partition, PartitionId, PartitionInfo, ProcessedTombstone, QueryPool, + QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, TableId, TablePartition, + TableSchema, Timestamp, Tombstone, TombstoneId, }; use snafu::{OptionExt, Snafu}; use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc}; @@ -525,6 +525,13 @@ pub trait ParquetFileRepo: Send + Sync { /// [`to_delete`](ParquetFile::to_delete). async fn list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result>; + /// List all parquet files and their metadata within a given table that are NOT marked as + /// [`to_delete`](ParquetFile::to_delete). Fetching metadata can be expensive. + async fn list_by_table_not_to_delete_with_metadata( + &mut self, + table_id: TableId, + ) -> Result>; + /// Delete all parquet files that were marked to be deleted earlier than the specified time. /// Returns the deleted records. async fn delete_old(&mut self, older_than: Timestamp) -> Result>; @@ -543,12 +550,20 @@ pub trait ParquetFileRepo: Send + Sync { max_time: Timestamp, ) -> Result>; - /// List parquet files for a given partition that are NOT marked as [`to_delete`](ParquetFile::to_delete). + /// List parquet files for a given partition that are NOT marked as + /// [`to_delete`](ParquetFile::to_delete). async fn list_by_partition_not_to_delete( &mut self, partition_id: PartitionId, ) -> Result>; + /// List parquet files and their metadata for a given partition that are NOT marked as + /// [`to_delete`](ParquetFile::to_delete). Fetching metadata can be expensive. + async fn list_by_partition_not_to_delete_with_metadata( + &mut self, + partition_id: PartitionId, + ) -> Result>; + /// Update the compaction level of the specified parquet files to level 1. Returns the IDs /// of the files that were successfully updated. 
async fn update_to_level_1( @@ -559,6 +574,9 @@ pub trait ParquetFileRepo: Send + Sync { /// Verify if the parquet file exists by selecting its id async fn exist(&mut self, id: ParquetFileId) -> Result; + /// Fetch the parquet_metadata bytes for the given id. Potentially expensive. + async fn parquet_metadata(&mut self, id: ParquetFileId) -> Result>; + /// Return count async fn count(&mut self) -> Result; @@ -1738,6 +1756,13 @@ pub(crate) mod test_helpers { .await .unwrap(); + let metadata = repos + .parquet_files() + .parquet_metadata(parquet_file.id) + .await + .unwrap(); + assert_eq!(metadata, b"md1".to_vec()); + // verify that trying to create a file with the same UUID throws an error let err = repos .parquet_files() @@ -1770,13 +1795,13 @@ pub(crate) mod test_helpers { .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) .await .unwrap(); - assert_eq!(vec![parquet_file.clone(), other_file.clone()], files); + assert_eq!(vec![parquet_file, other_file], files); let files = repos .parquet_files() .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(150)) .await .unwrap(); - assert_eq!(vec![other_file.clone()], files); + assert_eq!(vec![other_file], files); // verify that to_delete is initially set to null and the file does not get deleted assert!(parquet_file.to_delete.is_none()); @@ -1833,6 +1858,23 @@ pub(crate) mod test_helpers { .unwrap(); assert_eq!(files, vec![other_file]); + // test list_by_table_not_to_delete_with_metadata + let files = repos + .parquet_files() + .list_by_table_not_to_delete_with_metadata(table.id) + .await + .unwrap(); + assert_eq!(files, vec![]); + let files = repos + .parquet_files() + .list_by_table_not_to_delete_with_metadata(other_table.id) + .await + .unwrap(); + assert_eq!( + files, + vec![ParquetFileWithMetadata::new(other_file, b"md1".to_vec())] + ); + // test list_by_namespace_not_to_delete let namespace2 = repos .namespaces() @@ -1890,7 +1932,7 @@ pub(crate) mod test_helpers { .list_by_namespace_not_to_delete(namespace2.id) .await .unwrap(); - assert_eq!(vec![f1.clone(), f2.clone()], files); + assert_eq!(vec![f1, f2], files); let f3_params = ParquetFileParams { object_store_id: Uuid::new_v4(), @@ -1906,7 +1948,7 @@ pub(crate) mod test_helpers { .list_by_namespace_not_to_delete(namespace2.id) .await .unwrap(); - assert_eq!(vec![f1.clone(), f2.clone(), f3.clone()], files); + assert_eq!(vec![f1, f2, f3], files); repos.parquet_files().flag_for_delete(f2.id).await.unwrap(); let files = repos @@ -1914,7 +1956,7 @@ pub(crate) mod test_helpers { .list_by_namespace_not_to_delete(namespace2.id) .await .unwrap(); - assert_eq!(vec![f1.clone(), f3.clone()], files); + assert_eq!(vec![f1, f3], files); let files = repos .parquet_files() @@ -2449,6 +2491,19 @@ pub(crate) mod test_helpers { .await .unwrap(); assert_eq!(files, vec![parquet_file, level1_file]); + + let files = repos + .parquet_files() + .list_by_partition_not_to_delete_with_metadata(partition.id) + .await + .unwrap(); + assert_eq!( + files, + vec![ + ParquetFileWithMetadata::new(parquet_file, b"md1".to_vec()), + ParquetFileWithMetadata::new(level1_file, b"md1".to_vec()), + ] + ); } async fn test_update_to_compaction_level_1(catalog: Arc) { @@ -2519,7 +2574,7 @@ pub(crate) mod test_helpers { let nonexistent_parquet_file_id = ParquetFileId::new(level_0_file.id.get() + 1); // Level 0 parquet files should contain both existing files at this point - let expected = vec![parquet_file.clone(), level_0_file.clone()]; + let expected = vec![parquet_file, level_0_file]; let level_0 = 
repos.parquet_files().level_0(sequencer.id).await.unwrap(); let mut level_0_ids: Vec<_> = level_0.iter().map(|pf| pf.id).collect(); level_0_ids.sort(); diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index e4968a4558..3619aca9d2 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -13,14 +13,17 @@ use crate::{ use async_trait::async_trait; use data_types2::{ Column, ColumnId, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId, - ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo, - ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, - TableId, TablePartition, Timestamp, Tombstone, TombstoneId, + ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId, + PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, + SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, }; use observability_deps::tracing::warn; -use std::fmt::Formatter; -use std::sync::Arc; -use std::{collections::HashSet, convert::TryFrom}; +use std::{ + collections::{BTreeMap, HashSet}, + convert::TryFrom, + fmt::Formatter, + sync::Arc, +}; use time::{SystemProvider, TimeProvider}; use tokio::sync::{Mutex, OwnedMutexGuard}; @@ -60,6 +63,7 @@ struct MemCollections { partitions: Vec, tombstones: Vec, parquet_files: Vec, + parquet_file_metadata: BTreeMap>, processed_tombstones: Vec, } @@ -961,12 +965,16 @@ impl ParquetFileRepo for MemTxn { row_count, to_delete: None, file_size_bytes, - parquet_metadata, compaction_level, created_at, }; + + stage + .parquet_file_metadata + .insert(parquet_file.id, parquet_metadata); + stage.parquet_files.push(parquet_file); - Ok(stage.parquet_files.last().unwrap().clone()) + Ok(*stage.parquet_files.last().unwrap()) } async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> { @@ -1029,6 +1037,31 @@ impl ParquetFileRepo for MemTxn { Ok(parquet_files) } + async fn list_by_table_not_to_delete_with_metadata( + &mut self, + table_id: TableId, + ) -> Result> { + let stage = self.stage(); + + let parquet_files: Vec<_> = stage + .parquet_files + .iter() + .filter(|f| table_id == f.table_id && f.to_delete.is_none()) + .cloned() + .map(|f| { + ParquetFileWithMetadata::new( + f, + stage + .parquet_file_metadata + .get(&f.id) + .cloned() + .unwrap_or_default(), + ) + }) + .collect(); + Ok(parquet_files) + } + async fn delete_old(&mut self, older_than: Timestamp) -> Result> { let stage = self.stage(); @@ -1038,6 +1071,10 @@ impl ParquetFileRepo for MemTxn { stage.parquet_files = keep; + for delete in &delete { + stage.parquet_file_metadata.remove(&delete.id); + } + Ok(delete) } @@ -1092,6 +1129,30 @@ impl ParquetFileRepo for MemTxn { .collect()) } + async fn list_by_partition_not_to_delete_with_metadata( + &mut self, + partition_id: PartitionId, + ) -> Result> { + let stage = self.stage(); + + Ok(stage + .parquet_files + .iter() + .filter(|f| f.partition_id == partition_id && f.to_delete.is_none()) + .cloned() + .map(|f| { + ParquetFileWithMetadata::new( + f, + stage + .parquet_file_metadata + .get(&f.id) + .cloned() + .unwrap_or_default(), + ) + }) + .collect()) + } + async fn update_to_level_1( &mut self, parquet_file_ids: &[ParquetFileId], @@ -1118,6 +1179,16 @@ impl ParquetFileRepo for MemTxn { Ok(stage.parquet_files.iter().any(|f| f.id == id)) } + async fn parquet_metadata(&mut self, id: ParquetFileId) -> Result> { + let stage = self.stage(); + + stage + .parquet_file_metadata 
+ .get(&id) + .cloned() + .ok_or(Error::ParquetRecordNotFound { id }) + } + async fn count(&mut self) -> Result { let stage = self.stage(); diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 6f736fc81d..3dbb048c6c 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -8,9 +8,9 @@ use crate::interface::{ use async_trait::async_trait; use data_types2::{ Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId, - ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo, - ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, - TableId, TablePartition, Timestamp, Tombstone, TombstoneId, + ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId, + PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, + SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, }; use metric::{Metric, U64Histogram, U64HistogramOptions}; use std::{fmt::Debug, sync::Arc}; @@ -269,12 +269,15 @@ decorate!( "parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result>; "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result>; "parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result>; + "parquet_list_by_table_not_to_delete_with_metadata" = list_by_table_not_to_delete_with_metadata(&mut self, table_id: TableId) -> Result>; "parquet_delete_old" = delete_old(&mut self, older_than: Timestamp) -> Result>; "parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result>; + "parquet_list_by_partition_not_to_delete_with_metadata" = list_by_partition_not_to_delete_with_metadata(&mut self, partition_id: PartitionId) -> Result>; "parquet_level_0" = level_0(&mut self, sequencer_id: SequencerId) -> Result>; "parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result>; "parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result>; "parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result; + "parquet_metadata" = parquet_metadata(&mut self, id: ParquetFileId) -> Result>; "parquet_count" = count(&mut self) -> Result; "parquet_count_by_overlaps" = count_by_overlaps(&mut self, table_id: TableId, sequencer_id: SequencerId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result; ] diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 9f59062ee7..189402de88 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -12,9 +12,9 @@ use crate::{ use async_trait::async_trait; use data_types2::{ Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId, - ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo, - ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, - TableId, TablePartition, Timestamp, Tombstone, TombstoneId, + ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId, + PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, + SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, 
TombstoneId, }; use observability_deps::tracing::{info, warn}; use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row}; @@ -1502,9 +1502,13 @@ RETURNING *; sequencer_id: SequencerId, sequence_number: SequenceNumber, ) -> Result> { + // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large + // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" -SELECT * +SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, + min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, + row_count, compaction_level, created_at FROM parquet_file WHERE sequencer_id = $1 AND max_sequence_number > $2 @@ -1522,9 +1526,15 @@ ORDER BY id; &mut self, namespace_id: NamespaceId, ) -> Result> { + // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large + // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" -SELECT parquet_file.* +SELECT parquet_file.id, parquet_file.sequencer_id, parquet_file.namespace_id, + parquet_file.table_id, parquet_file.partition_id, parquet_file.object_store_id, + parquet_file.min_sequence_number, parquet_file.max_sequence_number, parquet_file.min_time, + parquet_file.max_time, parquet_file.to_delete, parquet_file.file_size_bytes, + parquet_file.row_count, parquet_file.compaction_level, parquet_file.created_at FROM parquet_file INNER JOIN table_name on table_name.id = parquet_file.table_id WHERE table_name.namespace_id = $1 @@ -1538,8 +1548,29 @@ WHERE table_name.namespace_id = $1 } async fn list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result> { + // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large + // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" +SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, + min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, + row_count, compaction_level, created_at +FROM parquet_file +WHERE table_id = $1 AND to_delete IS NULL; + "#, + ) + .bind(&table_id) // $1 + .fetch_all(&mut self.inner) + .await + .map_err(|e| Error::SqlxError { source: e }) + } + + async fn list_by_table_not_to_delete_with_metadata( + &mut self, + table_id: TableId, + ) -> Result> { + sqlx::query_as::<_, ParquetFileWithMetadata>( + r#" SELECT * FROM parquet_file WHERE table_id = $1 AND to_delete IS NULL; @@ -1568,10 +1599,14 @@ RETURNING *; async fn level_0(&mut self, sequencer_id: SequencerId) -> Result> { // this intentionally limits the returned files to 10,000 as it is used to make // a decision on the highest priority partitions. If compaction has never been - // run this could end up returning millions of results and taking too long to run + // run this could end up returning millions of results and taking too long to run. + // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large + // `parquet_metadata` column!! 
sqlx::query_as::<_, ParquetFile>( r#" -SELECT * +SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, + min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, + row_count, compaction_level, created_at FROM parquet_file WHERE parquet_file.sequencer_id = $1 AND parquet_file.compaction_level = 0 @@ -1591,9 +1626,13 @@ WHERE parquet_file.sequencer_id = $1 min_time: Timestamp, max_time: Timestamp, ) -> Result> { + // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large + // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" -SELECT * +SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, + min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, + row_count, compaction_level, created_at FROM parquet_file WHERE parquet_file.sequencer_id = $1 AND parquet_file.table_id = $2 @@ -1618,13 +1657,35 @@ WHERE parquet_file.sequencer_id = $1 &mut self, partition_id: PartitionId, ) -> Result> { + // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large + // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" +SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, + min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, + row_count, compaction_level, created_at +FROM parquet_file +WHERE parquet_file.partition_id = $1 + AND parquet_file.to_delete IS NULL; + "#, + ) + .bind(&partition_id) // $1 + .fetch_all(&mut self.inner) + .await + .map_err(|e| Error::SqlxError { source: e }) + } + + async fn list_by_partition_not_to_delete_with_metadata( + &mut self, + partition_id: PartitionId, + ) -> Result> { + sqlx::query_as::<_, ParquetFileWithMetadata>( + r#" SELECT * FROM parquet_file WHERE parquet_file.partition_id = $1 AND parquet_file.to_delete IS NULL; - "#, + "#, ) .bind(&partition_id) // $1 .fetch_all(&mut self.inner) @@ -1668,6 +1729,17 @@ RETURNING id; Ok(read_result.count > 0) } + async fn parquet_metadata(&mut self, id: ParquetFileId) -> Result> { + let read_result = + sqlx::query(r#"SELECT parquet_metadata FROM parquet_file WHERE id = $1;"#) + .bind(&id) // $1 + .fetch_one(&mut self.inner) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + Ok(read_result.get("parquet_metadata")) + } + async fn count(&mut self) -> Result { let read_result = sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM parquet_file;"#) diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 6f72bd3b8c..722445890f 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -6,9 +6,9 @@ use arrow::{ }; use bytes::Bytes; use data_types2::{ - Column, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileParams, - Partition, QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp, - Tombstone, TombstoneId, + Column, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileId, + ParquetFileParams, ParquetFileWithMetadata, Partition, QueryPool, SequenceNumber, Sequencer, + SequencerId, Table, TableId, Timestamp, Tombstone, TombstoneId, }; use iox_catalog::{ interface::{Catalog, INITIAL_COMPACTION_LEVEL}, @@ -189,6 +189,31 @@ impl TestCatalog { .await .unwrap() } + + /// List all non-deleted files with their metadata + pub async fn list_by_table_not_to_delete_with_metadata( + self: &Arc, + table_id: TableId, + ) -> Vec { + self.catalog + .repositories() + .await + 
.parquet_files() + .list_by_table_not_to_delete_with_metadata(table_id) + .await + .unwrap() + } + + /// Get a parquet file's metadata bytes + pub async fn parquet_metadata(&self, parquet_file_id: ParquetFileId) -> Vec { + self.catalog + .repositories() + .await + .parquet_files() + .parquet_metadata(parquet_file_id) + .await + .unwrap() + } } /// A test namespace @@ -373,7 +398,7 @@ pub struct TestPartition { impl TestPartition { /// Create a parquet for the partition - pub async fn create_parquet_file(self: &Arc, lp: &str) -> Arc { + pub async fn create_parquet_file(self: &Arc, lp: &str) -> TestParquetFile { self.create_parquet_file_with_min_max( lp, 1, @@ -392,7 +417,7 @@ impl TestPartition { max_seq: i64, min_time: i64, max_time: i64, - ) -> Arc { + ) -> TestParquetFile { self.create_parquet_file_with_min_max_and_creation_time( lp, min_seq, max_seq, min_time, max_time, 1, ) @@ -408,7 +433,7 @@ impl TestPartition { min_time: i64, max_time: i64, creation_time: i64, - ) -> Arc { + ) -> TestParquetFile { let mut repos = self.catalog.catalog.repositories().await; let (table, batch) = lp_to_mutable_batch(lp); @@ -451,7 +476,7 @@ impl TestPartition { min_time: Timestamp::new(min_time), max_time: Timestamp::new(max_time), file_size_bytes: real_file_size_bytes as i64, - parquet_metadata: parquet_metadata_bin, + parquet_metadata: parquet_metadata_bin.clone(), row_count: row_count as i64, created_at: Timestamp::new(creation_time), compaction_level: INITIAL_COMPACTION_LEVEL, @@ -462,11 +487,13 @@ impl TestPartition { .await .unwrap(); - Arc::new(TestParquetFile { + let parquet_file = ParquetFileWithMetadata::new(parquet_file, parquet_metadata_bin); + + TestParquetFile { catalog: Arc::clone(&self.catalog), namespace: Arc::clone(&self.namespace), parquet_file, - }) + } } /// Create a parquet for the partition with fake sizew for testing @@ -480,7 +507,7 @@ impl TestPartition { max_time: i64, file_size_bytes: i64, creation_time: i64, - ) -> Arc { + ) -> TestParquetFile { let mut repos = self.catalog.catalog.repositories().await; let (table, batch) = lp_to_mutable_batch(lp); @@ -523,7 +550,7 @@ impl TestPartition { min_time: Timestamp::new(min_time), max_time: Timestamp::new(max_time), file_size_bytes, - parquet_metadata: parquet_metadata_bin, + parquet_metadata: parquet_metadata_bin.clone(), row_count: row_count as i64, created_at: Timestamp::new(creation_time), compaction_level: INITIAL_COMPACTION_LEVEL, @@ -534,11 +561,13 @@ impl TestPartition { .await .unwrap(); - Arc::new(TestParquetFile { + let parquet_file = ParquetFileWithMetadata::new(parquet_file, parquet_metadata_bin); + + TestParquetFile { catalog: Arc::clone(&self.catalog), namespace: Arc::clone(&self.namespace), parquet_file, - }) + } } } @@ -590,12 +619,12 @@ async fn create_parquet_file( pub struct TestParquetFile { pub catalog: Arc, pub namespace: Arc, - pub parquet_file: ParquetFile, + pub parquet_file: ParquetFileWithMetadata, } impl TestParquetFile { /// Make the parquet file deletable - pub async fn flag_for_delete(self: &Arc) { + pub async fn flag_for_delete(&self) { let mut repos = self.catalog.catalog.repositories().await; repos @@ -604,6 +633,11 @@ impl TestParquetFile { .await .unwrap() } + + /// When only the ParquetFile is needed without the metadata, use this instead of the field + pub fn parquet_file_no_metadata(self) -> ParquetFile { + self.parquet_file.split_off_metadata().0 + } } /// A catalog test tombstone @@ -616,7 +650,7 @@ pub struct TestTombstone { impl TestTombstone { /// mark the tombstone proccesed - pub async 
fn mark_processed(self: &Arc, parquet_file: &Arc) { + pub async fn mark_processed(self: &Arc, parquet_file: &TestParquetFile) { assert!(Arc::ptr_eq(&self.catalog, &parquet_file.catalog)); assert!(Arc::ptr_eq(&self.namespace, &parquet_file.namespace)); diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index a8228e71ac..d23fd91db5 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -6,7 +6,7 @@ use data_types::{ partition_metadata::{Statistics, TableSummary}, timestamp::{TimestampMinMax, TimestampRange}, }; -use data_types2::ParquetFile; +use data_types2::{ParquetFile, ParquetFileWithMetadata}; use datafusion::physical_plan::SendableRecordBatchStream; use iox_object_store::{IoxObjectStore, ParquetFilePath}; use observability_deps::tracing::*; @@ -280,10 +280,9 @@ pub struct DecodedParquetFile { } impl DecodedParquetFile { - pub fn new(parquet_file: ParquetFile) -> Self { - let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes( - parquet_file.parquet_metadata.clone(), - )); + pub fn new(parquet_file_with_metadata: ParquetFileWithMetadata) -> Self { + let (parquet_file, parquet_metadata) = parquet_file_with_metadata.split_off_metadata(); + let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(parquet_metadata)); let decoded_metadata = parquet_metadata.decode().expect("parquet metadata broken"); let iox_metadata = decoded_metadata .read_iox_metadata_new() diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index 3f7d36b787..29eae254f5 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -3,8 +3,8 @@ use crate::cache::CatalogCache; use arrow::record_batch::RecordBatch; use data_types2::{ - ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId, SequenceNumber, - SequencerId, + ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId, + ParquetFileWithMetadata, SequenceNumber, SequencerId, }; use futures::StreamExt; use iox_catalog::interface::Catalog; @@ -212,8 +212,11 @@ impl ParquetChunkAdapter { /// Create new querier chunk. /// /// Returns `None` if some data required to create this chunk is already gone from the catalog. 
- pub async fn new_querier_chunk(&self, parquet_file: ParquetFile) -> Option { - let decoded_parquet_file = DecodedParquetFile::new(parquet_file); + pub async fn new_querier_chunk( + &self, + parquet_file_with_metadata: ParquetFileWithMetadata, + ) -> Option { + let decoded_parquet_file = DecodedParquetFile::new(parquet_file_with_metadata); let chunk = Arc::new(self.new_parquet_chunk(&decoded_parquet_file).await?); let addr = self @@ -334,8 +337,7 @@ pub mod tests { .await .create_parquet_file(&lp) .await - .parquet_file - .clone(); + .parquet_file; // create chunk let chunk = adapter.new_querier_chunk(parquet_file).await.unwrap(); diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 9a85538272..8671dd0fd0 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -7,7 +7,7 @@ use crate::{ IngesterConnection, }; use backoff::{Backoff, BackoffConfig}; -use data_types2::{ParquetFile, TableId}; +use data_types2::{ParquetFileWithMetadata, TableId}; use observability_deps::tracing::debug; use predicate::Predicate; use query::{provider::ChunkPruner, QueryChunk}; @@ -107,7 +107,7 @@ impl QuerierTable { let parquet_files = txn .parquet_files() - .list_by_table_not_to_delete(self.id) + .list_by_table_not_to_delete_with_metadata(self.id) .await?; let tombstones = txn.tombstones().list_by_table(self.id).await?; @@ -132,8 +132,12 @@ impl QuerierTable { // convert parquet files and tombstones to nicer objects let mut chunks = Vec::with_capacity(parquet_files.len()); - for parquet_file in parquet_files { - if let Some(chunk) = self.chunk_adapter.new_querier_chunk(parquet_file).await { + for parquet_file_with_metadata in parquet_files { + if let Some(chunk) = self + .chunk_adapter + .new_querier_chunk(parquet_file_with_metadata) + .await + { chunks.push(chunk); } } @@ -252,7 +256,10 @@ impl QuerierTable { /// /// Specificially, ensure that the persisted number from all /// chunks is consistent with the parquet files we know about -fn validate_cache(_partitions: &[Arc], _parquet_files: &[ParquetFile]) -> bool { +fn validate_cache( + _partitions: &[Arc], + _parquet_files: &[ParquetFileWithMetadata], +) -> bool { // TODO fill out the validation logic here true }
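
Usage note (not part of the patch): a minimal sketch of the round trip the new `ParquetFileWithMetadata` type is meant to support, assuming only the `new`/`split_off_metadata` API added to `data_types2` above; the helper name and the size computation are illustrative, not code from this PR.

```rust
use data_types2::{ParquetFile, ParquetFileWithMetadata};

/// Hypothetical helper: take the heavyweight record and hand back the plain
/// catalog row plus the size of the thrift-encoded metadata blob.
fn split_and_measure(file_with_metadata: ParquetFileWithMetadata) -> (ParquetFile, usize) {
    // `split_off_metadata` returns the lightweight `ParquetFile` (now `Copy`)
    // and the raw metadata bytes, so each can be moved or dropped independently.
    let (parquet_file, parquet_metadata) = file_with_metadata.split_off_metadata();
    let metadata_len = parquet_metadata.len();

    // Re-attaching is the inverse operation the new catalog list methods perform.
    // Passing `parquet_file` by value here is fine because it is `Copy`.
    let reattached = ParquetFileWithMetadata::new(parquet_file, parquet_metadata);
    drop(reattached);

    (parquet_file, metadata_len)
}
```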
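
A second sketch of the access pattern the narrower queries enable: list the cheap `ParquetFile` rows first and fetch `parquet_metadata` only for the files that actually need it. The function name, the `&dyn Catalog` parameter, and the size-based filter are assumptions for illustration; the repository methods are the ones defined on `ParquetFileRepo` in this diff.

```rust
use data_types2::TableId;
use iox_catalog::interface::{Catalog, Result};

/// Hypothetical caller: fetch metadata bytes only for small, live files of a table.
async fn metadata_for_small_files(
    catalog: &dyn Catalog,
    table_id: TableId,
    max_size_bytes: i64,
) -> Result<Vec<Vec<u8>>> {
    let mut repos = catalog.repositories().await;

    // Cheap: explicit column list, the `parquet_metadata` column is never fetched.
    let files = repos
        .parquet_files()
        .list_by_table_not_to_delete(table_id)
        .await?;

    let mut metadata = Vec::with_capacity(files.len());
    for file in files {
        if file.file_size_bytes <= max_size_bytes {
            // Expensive: pulls the metadata blob, but only for the rows we kept.
            metadata.push(repos.parquet_files().parquet_metadata(file.id).await?);
        }
    }
    Ok(metadata)
}
```

Where every listed file's metadata is needed up front, `list_by_table_not_to_delete_with_metadata` (or the partition-scoped variant) covers that case in one call, which is what the compactor and querier paths in this diff do.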