diff --git a/Cargo.lock b/Cargo.lock index b1d9aa95d1..2160a0f034 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -756,6 +756,7 @@ dependencies = [ "parking_lot 0.12.0", "parquet_file", "predicate", + "querier", "query", "schema", "snafu", diff --git a/compactor/Cargo.toml b/compactor/Cargo.toml index 40bc48c1b6..e0cb48a307 100644 --- a/compactor/Cargo.toml +++ b/compactor/Cargo.toml @@ -33,4 +33,5 @@ workspace-hack = { path = "../workspace-hack"} [dev-dependencies] arrow_util = { path = "../arrow_util" } iox_tests = { path = "../iox_tests" } +querier = { path = "../querier"} test_helpers = { path = "../test_helpers" } diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index d9caeb8ca2..3070079171 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -9,7 +9,7 @@ use backoff::{Backoff, BackoffConfig}; use bytes::Bytes; use data_types2::{ ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, TablePartition, Timestamp, - TombstoneId, + Tombstone, TombstoneId, }; use datafusion::error::DataFusionError; use iox_catalog::interface::{Catalog, Transaction}; @@ -142,6 +142,16 @@ pub enum Error { source: iox_catalog::interface::Error, }, + #[snafu(display("Error removing processed tombstones {}", source))] + RemoveProcessedTombstones { + source: iox_catalog::interface::Error, + }, + + #[snafu(display("Error removing tombstones {}", source))] + RemoveTombstones { + source: iox_catalog::interface::Error, + }, + #[snafu(display("Error querying for tombstones for a parquet file {}", source))] QueryingTombstones { source: iox_catalog::interface::Error, @@ -285,12 +295,12 @@ impl Compactor { .add_tombstones_to_groups(overlapped_file_groups) .await?; - // Compact, persist,and update catalog accordingly for each overlaped file - let mut tombstones: HashSet = HashSet::new(); + // Compact, persist, and update catalog accordingly for each overlaped file group + let mut tombstones = BTreeMap::new(); let mut upgrade_level_list: Vec = vec![]; for group in groups_with_tombstones { // keep tombstone ids - tombstones = Self::union_tombstone_ids(tombstones, &group); + tombstones = Self::union_tombstones(tombstones, &group); // Only one file without tombstones, no need to compact if group.parquet_files.len() == 1 && group.tombstones.is_empty() { @@ -315,7 +325,7 @@ impl Compactor { let CompactedData { data, meta, - tombstone_ids, + tombstones, } = split_file; let file_size_and_md = Backoff::new(&self.backoff_config) @@ -326,12 +336,7 @@ impl Compactor { .expect("retry forever"); if let Some((file_size, md)) = file_size_and_md { - catalog_update_info.push(CatalogUpdate::new( - meta, - file_size, - md, - tombstone_ids, - )); + catalog_update_info.push(CatalogUpdate::new(meta, file_size, md, tombstones)); } } let mut txn = self @@ -351,7 +356,7 @@ impl Compactor { } // Remove fully processed tombstones - // TODO: #3953 - remove_fully_processed_tombstones(tombstones) + self.remove_fully_processed_tombstones(tombstones).await?; // Upgrade old level-0 to level 1 self.update_to_level_1(&upgrade_level_list).await?; @@ -376,13 +381,12 @@ impl Compactor { groups } - // Extract tombstones id - fn union_tombstone_ids( - mut tombstones: HashSet, + fn union_tombstones( + mut tombstones: BTreeMap, group_with_tombstones: &GroupWithTombstones, - ) -> HashSet { - for id in group_with_tombstones.tombstone_ids() { - tombstones.insert(id); + ) -> BTreeMap { + for ts in &group_with_tombstones.tombstones { + tombstones.insert(ts.id, (*ts).clone()); } tombstones } @@ -409,20 +413,18 @@ impl Compactor { 
return Ok(compacted); } - // Collect all the tombstone IDs. One tombstone might be relevant to multiple parquet - // files in this set, so dedupe here. - let tombstone_ids: HashSet<_> = overlapped_files - .iter() - .flat_map(|f| f.tombstone_ids()) - .collect(); - // Keep the fist IoxMetadata to reuse same IDs and names let iox_metadata = overlapped_files[0].iox_metadata(); - // Verify if the given files belong to the same partition - // Note: we can ignore this verification if we assume this is a must-have condition + // Collect all unique tombstones + let mut tombstone_map = overlapped_files[0].tombstones(); + + // Verify if the given files belong to the same partition and collect their tombstones + // One tombstone might be relevant to multiple parquet files in this set, so dedupe here. if let Some((head, tail)) = overlapped_files.split_first() { for file in tail { + tombstone_map.append(&mut file.tombstones()); + let is_same = file.data.sequencer_id == head.data.sequencer_id && file.data.table_id == head.data.table_id && file.data.partition_id == head.data.partition_id; @@ -544,7 +546,7 @@ impl Compactor { sort_key: None, }; - let compacted_data = CompactedData::new(output_batches, meta, tombstone_ids.clone()); + let compacted_data = CompactedData::new(output_batches, meta, tombstone_map.clone()); compacted.push(compacted_data); } @@ -627,11 +629,20 @@ impl Compactor { .context(UpdateSnafu)?; // Now that the parquet file is available, create its processed tombstones - for tombstone_id in catalog_update.tombstone_ids { - txn.processed_tombstones() - .create(parquet.id, tombstone_id) - .await - .context(UpdateSnafu)?; + for (_, tombstone) in catalog_update.tombstones { + // Because data may get removed and split during compaction, a few new files + // may no longer overlap with the delete tombstones. Need to verify whether + // they overlap before adding processed tombstones + if (parquet.min_time <= tombstone.min_time + && parquet.max_time >= tombstone.min_time) + || (parquet.min_time > tombstone.min_time + && parquet.min_time <= tombstone.max_time) + { + txn.processed_tombstones() + .create(parquet.id, tombstone.id) + .await + .context(UpdateSnafu)?; + } } } @@ -685,6 +696,87 @@ impl Compactor { min_time + (max_time - min_time) * SPLIT_PERCENTAGE / 100 } + // remove fully processed tombstones + async fn remove_fully_processed_tombstones( + &self, + tombstones: BTreeMap, + ) -> Result<()> { + // get the fully processed ones + let mut to_be_removed = Vec::with_capacity(tombstones.len()); + for (ts_id, ts) in tombstones { + if self.fully_processed(ts).await { + to_be_removed.push(ts_id); + } + } + + // Remove the tombstones + // Note (todo maybe): if the list of deleted tombstone_ids is long and + // makes this transaction slow, we need to split them into many smaller lists + // and each will be removed in its own transaction.
+ let mut txn = self + .catalog + .start_transaction() + .await + .context(TransactionSnafu)?; + + txn.tombstones() + .remove(&to_be_removed) + .await + .context(RemoveTombstonesSnafu)?; + + txn.commit().await.context(TransactionCommitSnafu)?; + + Ok(()) + } + + // Return true if the given tombstone is fully processed + async fn fully_processed(&self, tombstone: Tombstone) -> bool { + let mut repos = self.catalog.repositories().await; + + // Get the number of non-deleted parquet files of the same tableId & sequencerId that overlap with the tombstone time range + let count_pf = repos + .parquet_files() + .count_by_overlaps( + tombstone.table_id, + tombstone.sequencer_id, + tombstone.min_time, + tombstone.max_time, + tombstone.sequence_number, + ) + .await; + let count_pf = match count_pf { + Ok(count_pf) => count_pf, + _ => { + warn!( + "Error getting parquet file count for table ID {}, sequencer ID {}, min time {:?}, max time {:?}. + Won't be able to verify whether its tombstone is fully processed", + tombstone.table_id, tombstone.sequencer_id, tombstone.min_time, tombstone.max_time + ); + return false; + } + }; + + // Get the number of processed parquet files for this tombstone + let count_pt = repos + .processed_tombstones() + .count_by_tombstone_id(tombstone.id) + .await; + let count_pt = match count_pt { + Ok(count_pt) => count_pt, + _ => { + warn!( + "Error getting processed tombstone count for tombstone ID {}. + Won't be able to verify whether the tombstone is fully processed", + tombstone.id + ); + return false; + } + }; + + // Fully processed if the two counts are the same + count_pf == count_pt + } + + async fn add_tombstones_to_groups( + &self, + groups: Vec>, + ) -> Result> { @@ -778,9 +870,331 @@ mod tests { use futures::{stream, StreamExt, TryStreamExt}; use iox_tests::util::TestCatalog; use object_store::path::Path; + use querier::{ + cache::CatalogCache, + chunk::{collect_read_filter, ParquetChunkAdapter}, + }; use query::test::{raw_data, TestChunk}; use time::SystemProvider; + #[tokio::test] + // This is an integration test to verify all pieces are put together correctly + async fn test_find_and_compact() { + let catalog = TestCatalog::new(); + + let lp = vec![ + "table,tag1=WA field_int=1000 8000", + "table,tag1=VT field_int=10 10000", + "table,tag1=UT field_int=70 20000", + ] + .join("\n"); + let ns = catalog.create_namespace("ns").await; + let sequencer = ns.create_sequencer(1).await; + let table = ns.create_table("table").await; + + // One parquet file + table + .with_sequencer(&sequencer) + .create_partition("part") + .await + .create_parquet_file_with_min_max_and_creation_time( + &lp, + 1, + 1, + 8000, + 20000, + catalog.time_provider.now().timestamp_nanos(), + ) + .await; + // should have 1 level-0 file + let count = catalog.count_level_0_files(sequencer.sequencer.id).await; + assert_eq!(count, 1); + + // One overlapped tombstone + let tombstone = table + .with_sequencer(&sequencer) + .create_tombstone(20, 6000, 12000, "tag1=VT") + .await; + // Should have 1 tombstone + let count = catalog.count_tombstones_for_table(table.table.id).await; + assert_eq!(count, 1); + + // ------------------------------------------------ + // Compact + let compactor = Compactor { + sequencers: vec![sequencer.sequencer.id], + object_store: Arc::clone(&catalog.object_store), + catalog: Arc::clone(&catalog.catalog), + exec: Arc::new(Executor::new(1)), + time_provider: Arc::new(SystemProvider::new()), + backoff_config: BackoffConfig::default(), + }; + + compactor + .find_and_compact(sequencer.sequencer.id) + .await + .unwrap(); +
// should have 2 non-deleted level_0 files. The original file was marked deleted and not counted + let files = catalog.list_by_table_not_to_delete(table.table.id).await; + assert_eq!(files.len(), 2); + // 2 newly created level-0 files as the result of compaction + assert_eq!((files[0].id.get(), files[0].compaction_level), (2, 0)); + assert_eq!((files[1].id.get(), files[1].compaction_level), (3, 0)); + + // processed tombstones created and deleted inside find_and_compact function + let count = catalog + .count_processed_tombstones(tombstone.tombstone.id) + .await; + assert_eq!(count, 0); + // the tombstone is fully processed and should have been removed + let count = catalog.count_tombstones_for_table(table.table.id).await; + assert_eq!(count, 0); + + // ------------------------------------------------ + // Verify the parquet file content + let adapter = ParquetChunkAdapter::new( + Arc::new(CatalogCache::new( + catalog.catalog(), + catalog.time_provider(), + )), + catalog.object_store(), + catalog.metric_registry(), + catalog.time_provider(), + ); + // create chunks for 2 files + let chunk_0 = adapter.new_querier_chunk(files[0].clone()).await.unwrap(); + let chunk_1 = adapter.new_querier_chunk(files[1].clone()).await.unwrap(); + // query the chunks + // least recent compacted first half (~90%) + let batches = collect_read_filter(&chunk_0).await; + assert_batches_sorted_eq!( + &[ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", + "+-----------+------+-----------------------------+", + ], + &batches + ); + // most recent compacted second half (~10%) + let batches = collect_read_filter(&chunk_1).await; + assert_batches_sorted_eq!( + &[ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 70 | UT | 1970-01-01T00:00:00.000020Z |", + "+-----------+------+-----------------------------+", + ], + &batches + ); + } + + // A quite sophisticated integration test + // Besides the lp data, the min/max sequence numbers and min/max times of every file are important + // to cover the needed combination of cases in this test function + #[tokio::test] + async fn test_find_and_compact_many_files_many_tombstones() { + let catalog = TestCatalog::new(); + + // lp1 does not overlap with any + let lp1 = vec![ + "table,tag1=WA field_int=1000 10", + "table,tag1=VT field_int=10 20", + ] + .join("\n"); + + // lp2 overlaps with lp3 + let lp2 = vec![ + "table,tag1=WA field_int=1000 8000", // will be eliminated due to duplicate + "table,tag1=VT field_int=10 10000", // will be deleted by ts2 + "table,tag1=UT field_int=70 20000", + ] + .join("\n"); + + // lp3 overlaps with lp2 + let lp3 = vec![ + "table,tag1=WA field_int=1500 8000", // latest duplicate and kept + "table,tag1=VT field_int=10 6000", + "table,tag1=UT field_int=270 25000", + ] + .join("\n"); + + // lp4 does not overlap with any + let lp4 = vec![ + "table,tag2=WA,tag3=10 field_int=1600 28000", + "table,tag2=VT,tag3=20 field_int=20 26000", + ] + .join("\n"); + + let ns = catalog.create_namespace("ns").await; + let sequencer = ns.create_sequencer(1).await; + let table = ns.create_table("table").await; + let partition = table + .with_sequencer(&sequencer) + .create_partition("part") + .await; + + let time = Arc::new(SystemProvider::new()); + + // parquet files + // pf1 does not overlap with any and was created long ago ==> will be upgraded
to level 1 during compaction + let _pf1 = partition + .create_parquet_file_with_min_max_and_creation_time(&lp1, 1, 1, 10, 20, 20) + .await + .parquet_file + .clone(); + // pf2 overlaps with pf3 ==> compacted and marked to_delete with a timestamp + let _pf2 = partition + .create_parquet_file_with_min_max_and_creation_time( + &lp2, + 4, + 5, + 8000, + 20000, + time.now().timestamp_nanos(), + ) + .await + .parquet_file + .clone(); + // pf3 overlaps with pf2 ==> compacted and marked to_delete with a timestamp + let _pf3 = partition + .create_parquet_file_with_min_max_and_creation_time( + &lp3, + 8, + 10, + 6000, + 25000, + time.now().timestamp_nanos(), + ) + .await + .parquet_file + .clone(); + // pf4 does not overlap with any and is recently created ==> stays level 0 + let _pf4 = partition + .create_parquet_file_with_min_max_and_creation_time( + &lp4, + 18, + 18, + 26000, + 28000, + time.now().timestamp_nanos(), + ) + .await + .parquet_file + .clone(); + // should have 4 level-0 files before compacting + let count = catalog.count_level_0_files(sequencer.sequencer.id).await; + assert_eq!(count, 4); + + // create 3 tombstones + // ts1 overlaps with pf2 and pf3 in time but was issued before them, hence it won't be used + let ts1 = table + .with_sequencer(&sequencer) + .create_tombstone(2, 6000, 21000, "tag1=UT") + .await; + // ts2 overlaps with both pf2 and pf3 but was issued before pf3, so it is only applied to pf2 + let ts2 = table + .with_sequencer(&sequencer) + .create_tombstone(6, 6000, 12000, "tag1=VT") + .await; + // ts3 does not overlap with any files + let ts3 = table + .with_sequencer(&sequencer) + .create_tombstone(22, 1000, 2000, "tag1=VT") + .await; + // should have 3 tombstones + let count = catalog.count_tombstones_for_table(table.table.id).await; + assert_eq!(count, 3); + // should not have any processed tombstones for any tombstones + let count = catalog.count_processed_tombstones(ts1.tombstone.id).await; + assert_eq!(count, 0); + let count = catalog.count_processed_tombstones(ts2.tombstone.id).await; + assert_eq!(count, 0); + let count = catalog.count_processed_tombstones(ts3.tombstone.id).await; + assert_eq!(count, 0); + + // ------------------------------------------------ + // Compact + let compactor = Compactor { + sequencers: vec![sequencer.sequencer.id], + object_store: Arc::clone(&catalog.object_store), + catalog: Arc::clone(&catalog.catalog), + exec: Arc::new(Executor::new(1)), + time_provider: Arc::new(SystemProvider::new()), + backoff_config: BackoffConfig::default(), + }; + compactor + .find_and_compact(sequencer.sequencer.id) + .await + .unwrap(); + + // Should have 4 non-soft-deleted files: pf1 and pf4 were not compacted and stay, plus 2 newly created by compacting pf2 with pf3 + let files = catalog.list_by_table_not_to_delete(table.table.id).await; + assert_eq!(files.len(), 4); + // pf1 upgraded to level 1 because it was too old + assert_eq!((files[0].id.get(), files[0].compaction_level), (1, 1)); + // pf4 stays level 0 since it is not old enough + assert_eq!((files[1].id.get(), files[1].compaction_level), (4, 0)); + // 2 newly created level-0 files as the result of compaction + assert_eq!((files[2].id.get(), files[2].compaction_level), (5, 0)); + assert_eq!((files[3].id.get(), files[3].compaction_level), (6, 0)); + + // should still have ts1 and ts3, which were not involved in the compaction process + // ts2 was removed because it was fully processed + let tss = catalog.list_tombstones_by_table(table.table.id).await; + assert_eq!(tss.len(), 2); + assert_eq!(tss[0].id.get(), ts1.tombstone.id.get()); +
assert_eq!(tss[1].id.get(), ts3.tombstone.id.get()); + + // processed tombstones of ts2 was created and deleted inside find_and_compact function + let count = catalog.count_processed_tombstones(ts2.tombstone.id).await; + assert_eq!(count, 0); + + // ------------------------------------------------ + // Verify the parquet file content + let adapter = ParquetChunkAdapter::new( + Arc::new(CatalogCache::new( + catalog.catalog(), + catalog.time_provider(), + )), + catalog.object_store(), + catalog.metric_registry(), + catalog.time_provider(), + ); + // create chunks for 2 files + let chunk_0 = adapter.new_querier_chunk(files[2].clone()).await.unwrap(); + let chunk_1 = adapter.new_querier_chunk(files[3].clone()).await.unwrap(); + // query the chunks + // least recent compacted first half (~90%) + let batches = collect_read_filter(&chunk_0).await; + assert_batches_sorted_eq!( + &[ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 10 | VT | 1970-01-01T00:00:00.000006Z |", + "| 1500 | WA | 1970-01-01T00:00:00.000008Z |", + "| 70 | UT | 1970-01-01T00:00:00.000020Z |", + "+-----------+------+-----------------------------+", + ], + &batches + ); + // most recent compacted second half (~10%) + let batches = collect_read_filter(&chunk_1).await; + assert_batches_sorted_eq!( + &[ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 270 | UT | 1970-01-01T00:00:00.000025Z |", + "+-----------+------+-----------------------------+", + ], + &batches + ); + } + #[tokio::test] async fn test_compact_one_file() { let catalog = TestCatalog::new(); @@ -834,6 +1248,7 @@ mod tests { .create_tombstone(20, 6000, 12000, "tag1=VT") .await; pf.add_tombstones(vec![tombstone.tombstone.clone()]); + // should have compacted data let batches = compactor.compact(vec![pf]).await.unwrap(); // 2 sets based on the split rule @@ -1469,7 +1884,10 @@ mod tests { .await .unwrap(); - let compacted_data = CompactedData::new(data, meta, HashSet::from([t1.id])); + let mut tombstones = BTreeMap::new(); + tombstones.insert(t1.id, t1); + + let compacted_data = CompactedData::new(data, meta, tombstones); Compactor::persist( &compacted_data.meta, @@ -1619,7 +2037,7 @@ mod tests { let mut txn = catalog.catalog.start_transaction().await.unwrap(); let catalog_updates = vec![CatalogUpdate { meta: meta.clone(), - tombstone_ids: HashSet::from([t1.id, t2.id]), + tombstones: BTreeMap::from([(t1.id, t1.clone()), (t2.id, t2.clone())]), parquet_file: parquet.clone(), }]; compactor @@ -1649,7 +2067,7 @@ mod tests { let mut txn = catalog.catalog.start_transaction().await.unwrap(); let catalog_updates = vec![CatalogUpdate { meta: meta.clone(), - tombstone_ids: HashSet::from([t3.id, t1.id]), + tombstones: BTreeMap::from([(t3.id, t3.clone()), (t1.id, t1.clone())]), parquet_file: parquet.clone(), }]; compactor @@ -1667,7 +2085,7 @@ mod tests { // parquet file so that one should now be deleted. 
Should go through let catalog_updates = vec![CatalogUpdate { meta: meta.clone(), - tombstone_ids: HashSet::from([t3.id]), + tombstones: BTreeMap::from([(t3.id, t3.clone())]), parquet_file: other_parquet.clone(), }]; compactor @@ -1701,7 +2119,7 @@ mod tests { t4.id = TombstoneId::new(t4.id.get() + 10); let catalog_updates = vec![CatalogUpdate { meta: meta.clone(), - tombstone_ids: HashSet::from([t4.id]), + tombstones: BTreeMap::from([(t4.id, t4.clone())]), parquet_file: another_parquet.clone(), }]; compactor diff --git a/compactor/src/utils.rs b/compactor/src/utils.rs index 366dfddb62..972735ff22 100644 --- a/compactor/src/utils.rs +++ b/compactor/src/utils.rs @@ -9,7 +9,10 @@ use parquet_file::{ chunk::{new_parquet_chunk, ChunkMetrics, DecodedParquetFile}, metadata::{IoxMetadata, IoxParquetMetaData}, }; -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; use time::TimeProvider; /// Wrapper of a group of parquet files and their tombstones that overlap in time and should be @@ -63,6 +66,14 @@ impl ParquetFileWithTombstone { self.data.id } + /// Return all tombstones in btree map format + pub fn tombstones(&self) -> BTreeMap { + self.tombstones + .iter() + .map(|ts| (ts.id, ts.clone())) + .collect() + } + /// Add more tombstones pub fn add_tombstones(&mut self, tombstones: Vec) { self.tombstones.extend(tombstones); @@ -105,7 +116,7 @@ impl ParquetFileWithTombstone { pub struct CompactedData { pub(crate) data: Vec, pub(crate) meta: IoxMetadata, - pub(crate) tombstone_ids: HashSet, + pub(crate) tombstones: BTreeMap, } impl CompactedData { @@ -113,12 +124,12 @@ impl CompactedData { pub fn new( data: Vec, meta: IoxMetadata, - tombstone_ids: HashSet, + tombstones: BTreeMap, ) -> Self { Self { data, meta, - tombstone_ids, + tombstones, } } } @@ -127,7 +138,7 @@ impl CompactedData { #[derive(Debug)] pub struct CatalogUpdate { pub(crate) meta: IoxMetadata, - pub(crate) tombstone_ids: HashSet, + pub(crate) tombstones: BTreeMap, pub(crate) parquet_file: ParquetFileParams, } @@ -137,12 +148,12 @@ impl CatalogUpdate { meta: IoxMetadata, file_size: usize, md: IoxParquetMetaData, - tombstone_ids: HashSet, + tombstones: BTreeMap, ) -> Self { let parquet_file = meta.to_parquet_file(file_size, &md); Self { meta, - tombstone_ids, + tombstones, parquet_file, } } diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index b3beb7de79..dbf256ef37 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -228,6 +228,12 @@ impl TombstoneId { } } +impl std::fmt::Display for TombstoneId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + /// A sequence number from a `Sequencer` (kafka partition) #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] #[sqlx(transparent)] @@ -658,7 +664,7 @@ pub struct PartitionInfo { } /// Data object for a tombstone. 
-#[derive(Debug, Clone, PartialEq, sqlx::FromRow)] +#[derive(Debug, Clone, PartialEq, PartialOrd, sqlx::FromRow)] pub struct Tombstone { /// the id of the tombstone pub id: TombstoneId, diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index fb3a2afe1e..89cf6cfe4c 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -430,6 +430,9 @@ pub trait TombstoneRepo: Send + Sync { /// list all tombstones for a given table async fn list_by_table(&mut self, table_id: TableId) -> Result>; + /// get the tombstone with the given id + async fn get_by_id(&mut self, tombstone_id: TombstoneId) -> Result>; + /// return all tombstones for the sequencer with a sequence number greater than that /// passed in. This will be used by the ingester on startup to see what tombstones /// might have to be applied to data that is read from the write buffer. @@ -439,6 +442,9 @@ sequence_number: SequenceNumber, ) -> Result>; + /// Remove the given tombstones + async fn remove(&mut self, tombstone_ids: &[TombstoneId]) -> Result<()>; + /// Return all tombstones that have: /// /// - the specified sequencer ID and table ID @@ -513,6 +519,18 @@ pub trait ParquetFileRepo: Send + Sync { /// Return count async fn count(&mut self) -> Result; + + /// Return the count of files of the given tableId and sequencerId that + /// overlap with the given min_time and max_time and have a sequence number + /// smaller than the given one + async fn count_by_overlaps( + &mut self, + table_id: TableId, + sequencer_id: SequencerId, + min_time: Timestamp, + max_time: Timestamp, + sequence_number: SequenceNumber, + ) -> Result; } /// Functions for working with processed tombstone pointers in the catalog @@ -534,6 +552,9 @@ pub trait ProcessedTombstoneRepo: Send + Sync { /// Return count async fn count(&mut self) -> Result; + + /// Return count for a given tombstone id + async fn count_by_tombstone_id(&mut self, tombstone_id: TombstoneId) -> Result; } /// Gets the namespace schema including all tables and columns.
@@ -596,7 +617,7 @@ pub(crate) mod test_helpers { use ::test_helpers::{assert_contains, tracing::TracingCapture}; use data_types2::ColumnId; use metric::{Attributes, Metric, U64Histogram}; - use std::{sync::Arc, time::Duration}; + use std::{ops::Add, sync::Arc, time::Duration}; pub(crate) async fn test_catalog(catalog: Arc) { test_setup(Arc::clone(&catalog)).await; @@ -613,6 +634,7 @@ pub(crate) mod test_helpers { test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await; test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await; test_update_to_compaction_level_1(Arc::clone(&catalog)).await; + test_processed_tombstones(Arc::clone(&catalog)).await; test_txn_isolation(Arc::clone(&catalog)).await; test_txn_drop(Arc::clone(&catalog)).await; @@ -1313,8 +1335,8 @@ pub(crate) mod test_helpers { other_table.id, sequencer.id, SequenceNumber::new(2), - min_time, - max_time, + min_time.add(10), + max_time.add(10), "bleh", ) .await @@ -1325,8 +1347,8 @@ pub(crate) mod test_helpers { table.id, sequencer.id, SequenceNumber::new(3), - min_time, - max_time, + min_time.add(10), + max_time.add(10), "sdf", ) .await @@ -1341,13 +1363,13 @@ pub(crate) mod test_helpers { // test list_by_table let listed = repos.tombstones().list_by_table(table.id).await.unwrap(); - assert_eq!(vec![t1, t3], listed); + assert_eq!(vec![t1.clone(), t3.clone()], listed); let listed = repos .tombstones() .list_by_table(other_table.id) .await .unwrap(); - assert_eq!(vec![t2], listed); + assert_eq!(vec![t2.clone()], listed); // test list_by_namespace let namespace2 = repos @@ -1366,8 +1388,8 @@ pub(crate) mod test_helpers { table2.id, sequencer.id, SequenceNumber::new(1), - min_time, - max_time, + min_time.add(10), + max_time.add(10), "whatevs", ) .await @@ -1378,8 +1400,8 @@ pub(crate) mod test_helpers { table2.id, sequencer.id, SequenceNumber::new(2), - min_time, - max_time, + min_time.add(10), + max_time.add(10), "foo", ) .await @@ -1389,13 +1411,40 @@ pub(crate) mod test_helpers { .list_by_namespace(namespace2.id) .await .unwrap(); - assert_eq!(vec![t4, t5], listed); + assert_eq!(vec![t4.clone(), t5.clone()], listed); let listed = repos .tombstones() .list_by_namespace(NamespaceId::new(i32::MAX)) .await .unwrap(); assert!(listed.is_empty()); + + // test get_by_id + let ts = repos.tombstones().get_by_id(t1.id).await.unwrap().unwrap(); + assert_eq!(ts, t1.clone()); + let ts = repos.tombstones().get_by_id(t2.id).await.unwrap().unwrap(); + assert_eq!(ts, t2.clone()); + let ts = repos + .tombstones() + .get_by_id(TombstoneId::new( + t1.id.get() + t2.id.get() + t3.id.get() + t4.id.get() + t5.id.get(), + )) // not exist id + .await + .unwrap(); + assert!(ts.is_none()); + + // test remove + repos.tombstones().remove(&[t1.id, t3.id]).await.unwrap(); + let ts = repos.tombstones().get_by_id(t1.id).await.unwrap(); + assert!(ts.is_none()); // no longer there + let ts = repos.tombstones().get_by_id(t3.id).await.unwrap(); + assert!(ts.is_none()); // no longer there + let ts = repos.tombstones().get_by_id(t2.id).await.unwrap().unwrap(); + assert_eq!(ts, t2.clone()); // still there + let ts = repos.tombstones().get_by_id(t4.id).await.unwrap().unwrap(); + assert_eq!(ts, t4.clone()); // still there + let ts = repos.tombstones().get_by_id(t5.id).await.unwrap().unwrap(); + assert_eq!(ts, t5.clone()); // still there } async fn test_tombstones_by_parquet_file(catalog: Arc) { @@ -1651,9 +1700,6 @@ pub(crate) mod test_helpers { .await .unwrap(); - let min_time = Timestamp::new(1); - let max_time = Timestamp::new(10); - let parquet_file_params 
= ParquetFileParams { sequencer_id: sequencer.id, table_id: partition.table_id, @@ -1661,8 +1707,8 @@ pub(crate) mod test_helpers { object_store_id: Uuid::new_v4(), min_sequence_number: SequenceNumber::new(10), max_sequence_number: SequenceNumber::new(140), - min_time, - max_time, + min_time: Timestamp::new(1), + max_time: Timestamp::new(10), file_size_bytes: 1337, parquet_metadata: b"md1".to_vec(), row_count: 0, @@ -1688,6 +1734,8 @@ pub(crate) mod test_helpers { object_store_id: Uuid::new_v4(), min_sequence_number: SequenceNumber::new(45), max_sequence_number: SequenceNumber::new(200), + min_time: Timestamp::new(50), + max_time: Timestamp::new(60), ..parquet_file_params.clone() }; let other_file = repos.parquet_files().create(other_params).await.unwrap(); @@ -1767,6 +1815,10 @@ pub(crate) mod test_helpers { table_id: partition2.table_id, partition_id: partition2.id, object_store_id: Uuid::new_v4(), + min_time: Timestamp::new(1), + max_time: Timestamp::new(10), + min_sequence_number: SequenceNumber::new(10), + max_sequence_number: SequenceNumber::new(10), ..parquet_file_params }; let f1 = repos @@ -1777,9 +1829,17 @@ pub(crate) mod test_helpers { let f2_params = ParquetFileParams { object_store_id: Uuid::new_v4(), + min_time: Timestamp::new(50), + max_time: Timestamp::new(60), + min_sequence_number: SequenceNumber::new(11), + max_sequence_number: SequenceNumber::new(11), ..f1_params }; - let f2 = repos.parquet_files().create(f2_params).await.unwrap(); + let f2 = repos + .parquet_files() + .create(f2_params.clone()) + .await + .unwrap(); let files = repos .parquet_files() .list_by_namespace_not_to_delete(namespace2.id) @@ -1787,13 +1847,29 @@ pub(crate) mod test_helpers { .unwrap(); assert_eq!(vec![f1.clone(), f2.clone()], files); + let f3_params = ParquetFileParams { + object_store_id: Uuid::new_v4(), + min_time: Timestamp::new(50), + max_time: Timestamp::new(60), + min_sequence_number: SequenceNumber::new(12), + max_sequence_number: SequenceNumber::new(12), + ..f2_params + }; + let f3 = repos.parquet_files().create(f3_params).await.unwrap(); + let files = repos + .parquet_files() + .list_by_namespace_not_to_delete(namespace2.id) + .await + .unwrap(); + assert_eq!(vec![f1.clone(), f2.clone(), f3.clone()], files); + repos.parquet_files().flag_for_delete(f2.id).await.unwrap(); let files = repos .parquet_files() .list_by_namespace_not_to_delete(namespace2.id) .await .unwrap(); - assert_eq!(vec![f1.clone()], files); + assert_eq!(vec![f1.clone(), f3.clone()], files); let files = repos .parquet_files() @@ -1801,6 +1877,87 @@ pub(crate) mod test_helpers { .await .unwrap(); assert!(files.is_empty()); + + // test count_by_overlaps + // not time overlap + let count = repos + .parquet_files() + .count_by_overlaps( + partition2.table_id, + sequencer.id, + Timestamp::new(11), + Timestamp::new(20), + SequenceNumber::new(20), + ) + .await + .unwrap(); + assert_eq!(count, 0); + // overlaps with f1 + let count = repos + .parquet_files() + .count_by_overlaps( + partition2.table_id, + sequencer.id, + Timestamp::new(1), + Timestamp::new(10), + SequenceNumber::new(20), + ) + .await + .unwrap(); + assert_eq!(count, 1); + // overlaps with f1 and f3 + // f2 is deleted and should not be counted + let count = repos + .parquet_files() + .count_by_overlaps( + partition2.table_id, + sequencer.id, + Timestamp::new(7), + Timestamp::new(55), + SequenceNumber::new(20), + ) + .await + .unwrap(); + assert_eq!(count, 2); + // overlaps with f1 and f3 but on different time range + let count = repos + .parquet_files() + 
.count_by_overlaps( + partition2.table_id, + sequencer.id, + Timestamp::new(1), + Timestamp::new(100), + SequenceNumber::new(20), + ) + .await + .unwrap(); + assert_eq!(count, 2); + // overlaps with f3 + let count = repos + .parquet_files() + .count_by_overlaps( + partition2.table_id, + sequencer.id, + Timestamp::new(15), + Timestamp::new(100), + SequenceNumber::new(20), + ) + .await + .unwrap(); + assert_eq!(count, 1); + // no overlaps due to smaller sequnce number + let count = repos + .parquet_files() + .count_by_overlaps( + partition2.table_id, + sequencer.id, + Timestamp::new(15), + Timestamp::new(100), + SequenceNumber::new(2), + ) + .await + .unwrap(); + assert_eq!(count, 0); } async fn test_parquet_file_compaction_level_0(catalog: Arc) { @@ -2254,6 +2411,184 @@ pub(crate) mod test_helpers { ); } + async fn test_processed_tombstones(catalog: Arc) { + let mut repos = catalog.repositories().await; + let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = repos.query_pools().create_or_get("foo").await.unwrap(); + let namespace = repos + .namespaces() + .create( + "namespace_processed_tombstone_test", + "inf", + kafka.id, + pool.id, + ) + .await + .unwrap(); + let table = repos + .tables() + .create_or_get("test_table", namespace.id) + .await + .unwrap(); + let sequencer = repos + .sequencers() + .create_or_get(&kafka, KafkaPartition::new(1)) + .await + .unwrap(); + let partition = repos + .partitions() + .create_or_get("one", sequencer.id, table.id) + .await + .unwrap(); + + // parquet files + let parquet_file_params = ParquetFileParams { + sequencer_id: sequencer.id, + table_id: partition.table_id, + partition_id: partition.id, + object_store_id: Uuid::new_v4(), + min_sequence_number: SequenceNumber::new(1), + max_sequence_number: SequenceNumber::new(1), + min_time: Timestamp::new(100), + max_time: Timestamp::new(250), + file_size_bytes: 1337, + parquet_metadata: b"md1".to_vec(), + row_count: 0, + created_at: Timestamp::new(1), + }; + let p1 = repos + .parquet_files() + .create(parquet_file_params.clone()) + .await + .unwrap(); + let parquet_file_params_2 = ParquetFileParams { + object_store_id: Uuid::new_v4(), + min_sequence_number: SequenceNumber::new(2), + max_sequence_number: SequenceNumber::new(3), + min_time: Timestamp::new(200), + max_time: Timestamp::new(300), + ..parquet_file_params + }; + let p2 = repos + .parquet_files() + .create(parquet_file_params_2.clone()) + .await + .unwrap(); + + // tombstones + let t1 = repos + .tombstones() + .create_or_get( + table.id, + sequencer.id, + SequenceNumber::new(10), + Timestamp::new(1), + Timestamp::new(10), + "whatevs", + ) + .await + .unwrap(); + let t2 = repos + .tombstones() + .create_or_get( + table.id, + sequencer.id, + SequenceNumber::new(11), + Timestamp::new(100), + Timestamp::new(110), + "whatevs", + ) + .await + .unwrap(); + let t3 = repos + .tombstones() + .create_or_get( + table.id, + sequencer.id, + SequenceNumber::new(12), + Timestamp::new(200), + Timestamp::new(210), + "whatevs", + ) + .await + .unwrap(); + + // processed tombstones + // p1, t2 + let _pt1 = repos + .processed_tombstones() + .create(p1.id, t2.id) + .await + .unwrap(); + // p1, t3 + let _pt2 = repos + .processed_tombstones() + .create(p1.id, t3.id) + .await + .unwrap(); + // p2, t3 + let _pt3 = repos + .processed_tombstones() + .create(p2.id, t3.id) + .await + .unwrap(); + + // test exist + let exist = repos + .processed_tombstones() + .exist(p1.id, t1.id) + .await + .unwrap(); + assert!(!exist); + let exist = repos + 
.processed_tombstones() + .exist(p1.id, t2.id) + .await + .unwrap(); + assert!(exist); + + // test count + let count = repos.processed_tombstones().count().await.unwrap(); + assert_eq!(count, 3); + + // test count_by_tombstone_id + let count = repos + .processed_tombstones() + .count_by_tombstone_id(t1.id) + .await + .unwrap(); + assert_eq!(count, 0); + let count = repos + .processed_tombstones() + .count_by_tombstone_id(t2.id) + .await + .unwrap(); + assert_eq!(count, 1); + let count = repos + .processed_tombstones() + .count_by_tombstone_id(t3.id) + .await + .unwrap(); + assert_eq!(count, 2); + + // test remove + repos.tombstones().remove(&[t3.id]).await.unwrap(); + // should still be 1 because t2 was not deleted + let count = repos + .processed_tombstones() + .count_by_tombstone_id(t2.id) + .await + .unwrap(); + assert_eq!(count, 1); + // should be 0 because t3 was deleted + let count = repos + .processed_tombstones() + .count_by_tombstone_id(t3.id) + .await + .unwrap(); + assert_eq!(count, 0); + } + async fn test_txn_isolation(catalog: Arc) { let barrier = Arc::new(tokio::sync::Barrier::new(2)); diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 838bd92d55..7d4ef957fa 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -841,6 +841,12 @@ impl TombstoneRepo for MemTxn { Ok(tombstones) } + async fn get_by_id(&mut self, id: TombstoneId) -> Result> { + let stage = self.stage(); + + Ok(stage.tombstones.iter().find(|t| t.id == id).cloned()) + } + async fn list_tombstones_by_sequencer_greater_than( &mut self, sequencer_id: SequencerId, @@ -857,6 +863,22 @@ impl TombstoneRepo for MemTxn { Ok(tombstones) } + async fn remove(&mut self, tombstone_ids: &[TombstoneId]) -> Result<()> { + let stage = self.stage(); + + // remove the processed tombstones first + stage + .processed_tombstones + .retain(|pt| !tombstone_ids.iter().any(|id| *id == pt.tombstone_id)); + + // remove the tombstones + stage + .tombstones + .retain(|ts| !tombstone_ids.iter().any(|id| *id == ts.id)); + + Ok(()) + } + async fn list_tombstones_for_time_range( &mut self, sequencer_id: SequencerId, @@ -1065,6 +1087,32 @@ impl ParquetFileRepo for MemTxn { } Ok(count_i64.unwrap()) } + + async fn count_by_overlaps( + &mut self, + table_id: TableId, + sequencer_id: SequencerId, + min_time: Timestamp, + max_time: Timestamp, + sequence_number: SequenceNumber, + ) -> Result { + let stage = self.stage(); + + let count = stage + .parquet_files + .iter() + .filter(|f| { + f.sequencer_id == sequencer_id + && f.table_id == table_id + && f.min_sequence_number < sequence_number + && f.to_delete.is_none() + && ((f.min_time <= min_time && f.max_time >= min_time) + || (f.min_time > min_time && f.min_time <= max_time)) + }) + .count(); + + i64::try_from(count).map_err(|_| Error::InvalidValue { value: count }) + } } #[async_trait] @@ -1134,6 +1182,18 @@ impl ProcessedTombstoneRepo for MemTxn { } Ok(count_i64.unwrap()) } + + async fn count_by_tombstone_id(&mut self, tombstone_id: TombstoneId) -> Result { + let stage = self.stage(); + + let count = stage + .processed_tombstones + .iter() + .filter(|p| p.tombstone_id == tombstone_id) + .count(); + + i64::try_from(count).map_err(|_| Error::InvalidValue { value: count }) + } } #[cfg(test)] diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 1ff74ecc53..5d929c4cd2 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -253,7 +253,9 @@ decorate!( "tombstone_create_or_get" = create_or_get( &mut self, table_id: TableId, sequencer_id: 
SequencerId, sequence_number: SequenceNumber, min_time: Timestamp, max_time: Timestamp, predicate: &str) -> Result; "tombstone_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result>; "tombstone_list_by_table" = list_by_table(&mut self, table_id: TableId) -> Result>; + "tombstone_get_by_id" = get_by_id(&mut self, id: TombstoneId) -> Result>; "tombstone_list_tombstones_by_sequencer_greater_than" = list_tombstones_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result>; + "tombstone_remove" = remove(&mut self, tombstone_ids: &[TombstoneId]) -> Result<()>; "tombstone_list_tombstones_for_time_range" = list_tombstones_for_time_range(&mut self, sequencer_id: SequencerId, table_id: TableId, sequence_number: SequenceNumber, min_time: Timestamp, max_time: Timestamp) -> Result>; ] ); @@ -271,6 +273,7 @@ decorate!( "parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result>; "parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result; "parquet_count" = count(&mut self) -> Result; + "parquet_count_by_overlaps" = count_by_overlaps(&mut self, table_id: TableId, sequencer_id: SequencerId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result; ] ); @@ -280,5 +283,6 @@ decorate!( "processed_tombstone_create" = create(&mut self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> Result; "processed_tombstone_exist" = exist(&mut self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> Result; "processed_tombstone_count" = count(&mut self) -> Result; + "processed_tombstone_count_by_tombstone_id" = count_by_tombstone_id(&mut self, tombstone_id: TombstoneId) -> Result; ] ); diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 15af36908a..976ac025c7 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1263,6 +1263,27 @@ ORDER BY id; .map_err(|e| Error::SqlxError { source: e }) } + async fn get_by_id(&mut self, id: TombstoneId) -> Result> { + let rec = sqlx::query_as::<_, Tombstone>( + r#" +SELECT * +FROM tombstone +WHERE id = $1; + "#, + ) + .bind(&id) // $1 + .fetch_one(&mut self.inner) + .await; + + if let Err(sqlx::Error::RowNotFound) = rec { + return Ok(None); + } + + let tombstone = rec.map_err(|e| Error::SqlxError { source: e })?; + + Ok(Some(tombstone)) + } + async fn list_tombstones_by_sequencer_greater_than( &mut self, sequencer_id: SequencerId, @@ -1284,6 +1305,38 @@ ORDER BY id; .map_err(|e| Error::SqlxError { source: e }) } + async fn remove(&mut self, tombstone_ids: &[TombstoneId]) -> Result<()> { + let ids: Vec<_> = tombstone_ids.iter().map(|t| t.get()).collect(); + + // Remove processed tombstones first + sqlx::query( + r#" +DELETE +FROM processed_tombstone +WHERE tombstone_id = ANY($1); + "#, + ) + .bind(&ids[..]) // $1 + .fetch_all(&mut self.inner) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + // Remove tombstones + sqlx::query( + r#" +DELETE +FROM tombstone +WHERE id = ANY($1); + "#, + ) + .bind(&ids[..]) // $1 + .fetch_all(&mut self.inner) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + Ok(()) + } + async fn list_tombstones_for_time_range( &mut self, sequencer_id: SequencerId, @@ -1469,8 +1522,8 @@ WHERE parquet_file.sequencer_id = $1 AND parquet_file.partition_id = $3 AND parquet_file.compaction_level = 1 AND parquet_file.to_delete IS NULL - AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $4) - OR 
(parquet_file.min_time > $5 AND parquet_file.min_time <= $5)); + AND ((parquet_file.min_time <= $4 AND parquet_file.max_time >= $4) + OR (parquet_file.min_time > $4 AND parquet_file.min_time <= $5)); "#, ) .bind(&table_partition.sequencer_id) // $1 @@ -1528,6 +1581,38 @@ RETURNING id; Ok(read_result.count) } + + async fn count_by_overlaps( + &mut self, + table_id: TableId, + sequencer_id: SequencerId, + min_time: Timestamp, + max_time: Timestamp, + sequence_number: SequenceNumber, + ) -> Result { + let read_result = sqlx::query_as::<_, Count>( + r#" +SELECT count(*) as count +FROM parquet_file +WHERE table_id = $1 + AND sequencer_id = $2 + AND min_sequence_number < $3 + AND parquet_file.to_delete IS NULL + AND ((parquet_file.min_time <= $4 AND parquet_file.max_time >= $4) + OR (parquet_file.min_time > $4 AND parquet_file.min_time <= $5)); + "#, + ) + .bind(&table_id) // $1 + .bind(&sequencer_id) // $2 + .bind(sequence_number) // $3 + .bind(min_time) // $4 + .bind(max_time) // $5 + .fetch_one(&mut self.inner) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + Ok(read_result.count) + } } #[async_trait] @@ -1593,6 +1678,18 @@ WHERE parquet_file_id = $1 Ok(read_result.count) } + + async fn count_by_tombstone_id(&mut self, tombstone_id: TombstoneId) -> Result { + let read_result = sqlx::query_as::<_, Count>( + r#"SELECT count(*) as count FROM processed_tombstone WHERE tombstone_id = $1;"#, + ) + .bind(&tombstone_id) // $1 + .fetch_one(&mut self.inner) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + Ok(read_result.count) + } } /// The error code returned by Postgres for a unique constraint violation. diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 1db51ee440..c90ff37a72 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -7,7 +7,8 @@ use arrow::{ use bytes::Bytes; use data_types2::{ ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileParams, Partition, - QueryPool, SequenceNumber, Sequencer, Table, Timestamp, Tombstone, + QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp, Tombstone, + TombstoneId, }; use iox_catalog::{interface::Catalog, mem::MemCatalog}; use iox_object_store::{IoxObjectStore, ParquetFilePath}; @@ -109,6 +110,82 @@ impl TestCatalog { namespace, }) } + + /// return tombstones of a given table + pub async fn list_tombstones_by_table(self: &Arc, table_id: TableId) -> Vec { + self.catalog + .repositories() + .await + .tombstones() + .list_by_table(table_id) + .await + .unwrap() + } + + /// return number of tombstones of a given table + pub async fn count_tombstones_for_table(self: &Arc, table_id: TableId) -> usize { + let ts = self + .catalog + .repositories() + .await + .tombstones() + .list_by_table(table_id) + .await + .unwrap(); + ts.len() + } + + /// return number of processed tombstones of a tombstones + pub async fn count_processed_tombstones(self: &Arc, tombstone_id: TombstoneId) -> i64 { + self.catalog + .repositories() + .await + .processed_tombstones() + .count_by_tombstone_id(tombstone_id) + .await + .unwrap() + } + + /// List level 0 files + pub async fn list_level_0_files( + self: &Arc, + sequencer_id: SequencerId, + ) -> Vec { + self.catalog + .repositories() + .await + .parquet_files() + .level_0(sequencer_id) + .await + .unwrap() + } + + /// Count level 0 files + pub async fn count_level_0_files(self: &Arc, sequencer_id: SequencerId) -> usize { + let level_0 = self + .catalog + .repositories() + .await + .parquet_files() + .level_0(sequencer_id) + .await + 
.unwrap(); + level_0.len() + } + + /// List all non-deleted files + pub async fn list_by_table_not_to_delete( + self: &Arc, + table_id: TableId, + ) -> Vec { + self.catalog + .repositories() + .await + .parquet_files() + .list_by_table_not_to_delete(table_id) + .await + .unwrap() + } } /// A test namespace @@ -292,6 +369,22 @@ impl TestPartition { max_seq: i64, min_time: i64, max_time: i64, + ) -> Arc { + self.create_parquet_file_with_min_max_and_creation_time( + lp, min_seq, max_seq, min_time, max_time, 1, + ) + .await + } + + /// Create a parquet for the partition + pub async fn create_parquet_file_with_min_max_and_creation_time( + self: &Arc, + lp: &str, + min_seq: i64, + max_seq: i64, + min_time: i64, + max_time: i64, + creation_time: i64, ) -> Arc { let mut repos = self.catalog.catalog.repositories().await; @@ -335,7 +428,7 @@ impl TestPartition { file_size_bytes: file_size_bytes as i64, parquet_metadata: parquet_metadata_bin, row_count: row_count as i64, - created_at: Timestamp::new(1), + created_at: Timestamp::new(creation_time), }; let parquet_file = repos .parquet_files() diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index d87ae54f81..fd90379f65 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -1,15 +1,20 @@ +//! Querier Chunk + use crate::cache::CatalogCache; +use arrow::record_batch::RecordBatch; use data_types2::{ ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId, SequenceNumber, SequencerId, }; +use futures::StreamExt; use iox_catalog::interface::Catalog; use iox_object_store::IoxObjectStore; use object_store::DynObjectStore; use parquet_file::chunk::{ new_parquet_chunk, ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk, }; -use schema::sort::SortKey; +use query::{exec::IOxSessionContext, QueryChunk}; +use schema::{selection::Selection, sort::SortKey}; use std::sync::Arc; use time::TimeProvider; use uuid::Uuid; @@ -75,7 +80,9 @@ impl ChunkMeta { pub enum ChunkStorage { /// Data is currently available via parquet file within the object store. 
Parquet { + /// ID of the parquet file if the chunk parquet_file_id: ParquetFileId, + /// Chunk of the parquet file chunk: Arc, }, } @@ -285,15 +292,30 @@ impl ParquetChunkAdapter { } } +/// collect data for the given chunk +pub async fn collect_read_filter(chunk: &QuerierChunk) -> Vec { + chunk + .read_filter( + IOxSessionContext::default(), + &Default::default(), + Selection::All, + ) + .unwrap() + .collect::>() + .await + .into_iter() + .map(Result::unwrap) + .collect() +} + #[cfg(test)] -mod tests { +pub mod tests { use super::*; - use arrow::{datatypes::DataType, record_batch::RecordBatch}; + use arrow::datatypes::DataType; use arrow_util::assert_batches_eq; - use futures::StreamExt; use iox_tests::util::TestCatalog; - use query::{exec::IOxSessionContext, QueryChunk, QueryChunkMeta}; - use schema::{builder::SchemaBuilder, selection::Selection, sort::SortKeyBuilder}; + use query::QueryChunkMeta; + use schema::{builder::SchemaBuilder, sort::SortKeyBuilder}; #[tokio::test] async fn test_create_record() { @@ -370,19 +392,4 @@ mod tests { &batches ); } - - async fn collect_read_filter(chunk: &QuerierChunk) -> Vec { - chunk - .read_filter( - IOxSessionContext::default(), - &Default::default(), - Selection::All, - ) - .unwrap() - .collect::>() - .await - .into_iter() - .map(Result::unwrap) - .collect() - } } diff --git a/querier/src/lib.rs b/querier/src/lib.rs index 8e2c05eb01..d6d6e866fd 100644 --- a/querier/src/lib.rs +++ b/querier/src/lib.rs @@ -10,9 +10,9 @@ )] #![allow(dead_code)] -mod cache; +pub mod cache; mod cache_system; -mod chunk; +pub mod chunk; mod database; /// Flight client to the ingester to request in-memory data. mod flight;
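Note on the time-overlap predicate (editorial sketch, not part of the patch): `update_catalog`, the in-memory `count_by_overlaps`, and the corrected level-1 SQL above all use the same two-arm overlap test between a parquet file's [min_time, max_time] range and a tombstone's range. Assuming well-formed ranges (min <= max on both sides), that form is equivalent to the usual closed-interval intersection check. The helper names below are illustrative only:

// Two-arm form as written in the patch: either the file starts at or before the
// tombstone's min_time and reaches into it, or the file starts inside the tombstone range.
fn overlaps_patch_form(f_min: i64, f_max: i64, t_min: i64, t_max: i64) -> bool {
    (f_min <= t_min && f_max >= t_min) || (f_min > t_min && f_min <= t_max)
}

// Standard closed-interval intersection test.
fn overlaps_interval_form(f_min: i64, f_max: i64, t_min: i64, t_max: i64) -> bool {
    f_min <= t_max && f_max >= t_min
}

#[test]
fn overlap_forms_agree_on_well_formed_ranges() {
    // Exhaustively compare both forms over small well-formed ranges.
    for f_min in -3i64..4 {
        for f_max in f_min..4 {
            for t_min in -3i64..4 {
                for t_max in t_min..4 {
                    assert_eq!(
                        overlaps_patch_form(f_min, f_max, t_min, t_max),
                        overlaps_interval_form(f_min, f_max, t_min, t_max),
                    );
                }
            }
        }
    }
}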