feat: delete fully processed tombstones & integration tests for find_and_compact (#4116)

* feat: remove fully processed tombstones

* test: first few tests

* fix: delete SQL

* fix: test how IN (...) works in PG

* fix: test how IN (?) works in PG

* fix: test how IN (?) works in PG

* fix: dynamically add  IN (?, ?, ...)

* fix: dynamically add  IN (?, ?, ...) & its dynamic values

* fix: add argument directly in the SQL

* test: more tests for catalog read and update functions

* chore: move a subfunction to make it easier to read

* test: first test for find_can_compact but disabled due to bug

* test: integration tests and a bug fix for find_and_compact

* chore: cleanup

* refactor: address review comments

* fix: put the two deletes (processed tombstones and tombstones) in one transaction
Nga Tran 2022-03-28 14:35:54 -04:00 committed by GitHub
parent fb186c6733
commit 80b7e9cce1
12 changed files with 1125 additions and 92 deletions
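
The `IN (...)` / `IN (?)` experiments in the commit trail above end with Postgres array binding: the catalog's `remove` implementation later in this diff deletes with `id = ANY($1)` and binds the whole id list as a single array parameter. Below is a minimal sketch of that pattern; the function name, the use of a bare `PgConnection`, and the raw `i64` ids are illustrative, not part of this commit.

// Sketch of the `= ANY($1)` deletion pattern this commit settles on.
use sqlx::PgConnection;

async fn remove_tombstones(conn: &mut PgConnection, ids: &[i64]) -> Result<(), sqlx::Error> {
    // Delete the referencing processed_tombstone rows first, then the tombstones themselves.
    sqlx::query("DELETE FROM processed_tombstone WHERE tombstone_id = ANY($1);")
        .bind(ids) // bound as a single Postgres int8[] parameter
        .execute(&mut *conn)
        .await?;
    sqlx::query("DELETE FROM tombstone WHERE id = ANY($1);")
        .bind(ids)
        .execute(&mut *conn)
        .await?;
    Ok(())
}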

Cargo.lock generated

@ -756,6 +756,7 @@ dependencies = [
"parking_lot 0.12.0",
"parquet_file",
"predicate",
"querier",
"query",
"schema",
"snafu",


@ -33,4 +33,5 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
arrow_util = { path = "../arrow_util" }
iox_tests = { path = "../iox_tests" }
querier = { path = "../querier"}
test_helpers = { path = "../test_helpers" }


@ -9,7 +9,7 @@ use backoff::{Backoff, BackoffConfig};
use bytes::Bytes;
use data_types2::{
ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, TablePartition, Timestamp,
TombstoneId,
Tombstone, TombstoneId,
};
use datafusion::error::DataFusionError;
use iox_catalog::interface::{Catalog, Transaction};
@ -142,6 +142,16 @@ pub enum Error {
source: iox_catalog::interface::Error,
},
#[snafu(display("Error removing processed tombstones {}", source))]
RemoveProcessedTombstones {
source: iox_catalog::interface::Error,
},
#[snafu(display("Error removing tombstones {}", source))]
RemoveTombstones {
source: iox_catalog::interface::Error,
},
#[snafu(display("Error querying for tombstones for a parquet file {}", source))]
QueryingTombstones {
source: iox_catalog::interface::Error,
@ -285,12 +295,12 @@ impl Compactor {
.add_tombstones_to_groups(overlapped_file_groups)
.await?;
// Compact, persist, and update catalog accordingly for each overlapped file
let mut tombstones: HashSet<TombstoneId> = HashSet::new();
// Compact, persist, and update catalog accordingly for each overlapped file group
let mut tombstones = BTreeMap::new();
let mut upgrade_level_list: Vec<ParquetFileId> = vec![];
for group in groups_with_tombstones {
// keep tombstone ids
tombstones = Self::union_tombstone_ids(tombstones, &group);
tombstones = Self::union_tombstones(tombstones, &group);
// Only one file without tombstones, no need to compact
if group.parquet_files.len() == 1 && group.tombstones.is_empty() {
@ -315,7 +325,7 @@ impl Compactor {
let CompactedData {
data,
meta,
tombstone_ids,
tombstones,
} = split_file;
let file_size_and_md = Backoff::new(&self.backoff_config)
@ -326,12 +336,7 @@ impl Compactor {
.expect("retry forever");
if let Some((file_size, md)) = file_size_and_md {
catalog_update_info.push(CatalogUpdate::new(
meta,
file_size,
md,
tombstone_ids,
));
catalog_update_info.push(CatalogUpdate::new(meta, file_size, md, tombstones));
}
}
let mut txn = self
@ -351,7 +356,7 @@ impl Compactor {
}
// Remove fully processed tombstones
// TODO: #3953 - remove_fully_processed_tombstones(tombstones)
self.remove_fully_processed_tombstones(tombstones).await?;
// Upgrade old level-0 to level 1
self.update_to_level_1(&upgrade_level_list).await?;
@ -376,13 +381,12 @@ impl Compactor {
groups
}
// Extract tombstones id
fn union_tombstone_ids(
mut tombstones: HashSet<TombstoneId>,
fn union_tombstones(
mut tombstones: BTreeMap<TombstoneId, Tombstone>,
group_with_tombstones: &GroupWithTombstones,
) -> HashSet<TombstoneId> {
for id in group_with_tombstones.tombstone_ids() {
tombstones.insert(id);
) -> BTreeMap<TombstoneId, Tombstone> {
for ts in &group_with_tombstones.tombstones {
tombstones.insert(ts.id, (*ts).clone());
}
tombstones
}
@ -409,20 +413,18 @@ impl Compactor {
return Ok(compacted);
}
// Collect all the tombstone IDs. One tombstone might be relevant to multiple parquet
// files in this set, so dedupe here.
let tombstone_ids: HashSet<_> = overlapped_files
.iter()
.flat_map(|f| f.tombstone_ids())
.collect();
// Keep the first IoxMetadata to reuse the same IDs and names
let iox_metadata = overlapped_files[0].iox_metadata();
// Verify if the given files belong to the same partition
// Note: we can ignore this verification if we assume this is a must-have condition
// Collect all unique tombstones
let mut tombstone_map = overlapped_files[0].tombstones();
// Verify if the given files belong to the same partition and collect their tombstones
// One tombstone might be relevant to multiple parquet files in this set, so dedupe here.
if let Some((head, tail)) = overlapped_files.split_first() {
for file in tail {
tombstone_map.append(&mut file.tombstones());
let is_same = file.data.sequencer_id == head.data.sequencer_id
&& file.data.table_id == head.data.table_id
&& file.data.partition_id == head.data.partition_id;
@ -544,7 +546,7 @@ impl Compactor {
sort_key: None,
};
let compacted_data = CompactedData::new(output_batches, meta, tombstone_ids.clone());
let compacted_data = CompactedData::new(output_batches, meta, tombstone_map.clone());
compacted.push(compacted_data);
}
@ -627,11 +629,20 @@ impl Compactor {
.context(UpdateSnafu)?;
// Now that the parquet file is available, create its processed tombstones
for tombstone_id in catalog_update.tombstone_ids {
txn.processed_tombstones()
.create(parquet.id, tombstone_id)
.await
.context(UpdateSnafu)?;
for (_, tombstone) in catalog_update.tombstones {
// Because data may get removed and split during compaction, a few new files
// may no longer overlap with the delete tombstones. Verify whether they
// overlap before adding processed tombstones
if (parquet.min_time <= tombstone.min_time
&& parquet.max_time >= tombstone.min_time)
|| (parquet.min_time > tombstone.min_time
&& parquet.min_time <= tombstone.max_time)
{
txn.processed_tombstones()
.create(parquet.id, tombstone.id)
.await
.context(UpdateSnafu)?;
}
}
}
@ -685,6 +696,87 @@ impl Compactor {
min_time + (max_time - min_time) * SPLIT_PERCENTAGE / 100
}
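
A quick worked example of the split-time formula above, using the file time range from the first integration test further down (8000 to 20000 nanoseconds). The value 90 for `SPLIT_PERCENTAGE` is an assumption inferred from the "~90% / ~10%" comments in the tests, not read from this diff.

// Standalone sketch of the split-point arithmetic; split_percentage = 90 is assumed.
fn split_time(min_time: i64, max_time: i64, split_percentage: i64) -> i64 {
    min_time + (max_time - min_time) * split_percentage / 100
}

fn main() {
    let cut = split_time(8000, 20000, 90);
    assert_eq!(cut, 18_800);
    // Rows at 8000 and 10000 land in the first (older) output file,
    // the row at 20000 lands in the second.
}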
// remove fully processed tombstones
async fn remove_fully_processed_tombstones(
&self,
tombstones: BTreeMap<TombstoneId, Tombstone>,
) -> Result<()> {
// get fully processed ones
let mut to_be_removed = Vec::with_capacity(tombstones.len());
for (ts_id, ts) in tombstones {
if self.fully_processed(ts).await {
to_be_removed.push(ts_id);
}
}
// Remove the tombstones
// Note (todo maybe): if the list of deleted tombstone_ids is long and makes
// this transaction slow, we need to split it into many smaller lists and
// remove each in its own transaction.
let mut txn = self
.catalog
.start_transaction()
.await
.context(TransactionSnafu)?;
txn.tombstones()
.remove(&to_be_removed)
.await
.context(RemoveTombstonesSnafu)?;
txn.commit().await.context(TransactionCommitSnafu)?;
Ok(())
}
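
The note inside `remove_fully_processed_tombstones` above only gestures at batching. A self-contained sketch of that idea follows; the batch size, the helper name, and the `remove_chunk` callback are placeholders, not part of this commit, which still removes everything in one transaction.

// Sketch of removing a long id list in fixed-size chunks, one transaction per chunk.
fn remove_in_batches(ids: &[i64], batch_size: usize, mut remove_chunk: impl FnMut(&[i64])) {
    for chunk in ids.chunks(batch_size.max(1)) {
        // In the real code each call would open, use, and commit its own catalog transaction.
        remove_chunk(chunk);
    }
}

fn main() {
    let ids: Vec<i64> = (1..=7).collect();
    remove_in_batches(&ids, 3, |chunk| println!("would remove {chunk:?} in its own transaction"));
}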
// Return true if the given tombstone is fully processed
async fn fully_processed(&self, tombstone: Tombstone) -> bool {
let mut repos = self.catalog.repositories().await;
// Get the number of non-deleted parquet files with the same table ID & sequencer ID that overlap with the tombstone time range
let count_pf = repos
.parquet_files()
.count_by_overlaps(
tombstone.table_id,
tombstone.sequencer_id,
tombstone.min_time,
tombstone.max_time,
tombstone.sequence_number,
)
.await;
let count_pf = match count_pf {
Ok(count_pf) => count_pf,
_ => {
warn!(
"Error getting parquet file count for table ID {}, sequencer ID {}, min time {:?}, max time {:?}.
Won't be able to verify whether the tombstone is fully processed",
tombstone.table_id, tombstone.sequencer_id, tombstone.min_time, tombstone.max_time
);
return false;
}
};
// Get the number of processed parquet files for this tombstone
let count_pt = repos
.processed_tombstones()
.count_by_tombstone_id(tombstone.id)
.await;
let count_pt = match count_pt {
Ok(count_pt) => count_pt,
_ => {
warn!(
"Error getting processed tombstone count for tombstone ID {}.
Won't be able to verify whether the tombstone is fully processed",
tombstone.id
);
return false;
}
};
// Fully processed if the two counts are the same
count_pf == count_pt
}
async fn add_tombstones_to_groups(
&self,
groups: Vec<Vec<ParquetFile>>,
@ -778,9 +870,331 @@ mod tests {
use futures::{stream, StreamExt, TryStreamExt};
use iox_tests::util::TestCatalog;
use object_store::path::Path;
use querier::{
cache::CatalogCache,
chunk::{collect_read_filter, ParquetChunkAdapter},
};
use query::test::{raw_data, TestChunk};
use time::SystemProvider;
#[tokio::test]
// This is an integration test to verify all pieces are put together correctly
async fn test_find_and_compact() {
let catalog = TestCatalog::new();
let lp = vec![
"table,tag1=WA field_int=1000 8000",
"table,tag1=VT field_int=10 10000",
"table,tag1=UT field_int=70 20000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let sequencer = ns.create_sequencer(1).await;
let table = ns.create_table("table").await;
// One parquet file
table
.with_sequencer(&sequencer)
.create_partition("part")
.await
.create_parquet_file_with_min_max_and_creation_time(
&lp,
1,
1,
8000,
20000,
catalog.time_provider.now().timestamp_nanos(),
)
.await;
// should have 1 level-0 file
let count = catalog.count_level_0_files(sequencer.sequencer.id).await;
assert_eq!(count, 1);
// One overlapped tombstone
let tombstone = table
.with_sequencer(&sequencer)
.create_tombstone(20, 6000, 12000, "tag1=VT")
.await;
// Should have 1 tombstone
let count = catalog.count_tombstones_for_table(table.table.id).await;
assert_eq!(count, 1);
// ------------------------------------------------
// Compact
let compactor = Compactor {
sequencers: vec![sequencer.sequencer.id],
object_store: Arc::clone(&catalog.object_store),
catalog: Arc::clone(&catalog.catalog),
exec: Arc::new(Executor::new(1)),
time_provider: Arc::new(SystemProvider::new()),
backoff_config: BackoffConfig::default(),
};
compactor
.find_and_compact(sequencer.sequencer.id)
.await
.unwrap();
// should have 2 non-deleted level_0 files. The original file was marked deleted and not counted
let files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 2);
// 2 newly created level-0 files as the result of compaction
assert_eq!((files[0].id.get(), files[0].compaction_level), (2, 0));
assert_eq!((files[1].id.get(), files[1].compaction_level), (3, 0));
// processed tombstones were created and deleted inside the find_and_compact function
let count = catalog
.count_processed_tombstones(tombstone.tombstone.id)
.await;
assert_eq!(count, 0);
// the tombstone is fully processed and should have been removed
let count = catalog.count_tombstones_for_table(table.table.id).await;
assert_eq!(count, 0);
// ------------------------------------------------
// Verify the parquet file content
let adapter = ParquetChunkAdapter::new(
Arc::new(CatalogCache::new(
catalog.catalog(),
catalog.time_provider(),
)),
catalog.object_store(),
catalog.metric_registry(),
catalog.time_provider(),
);
// create chunks for 2 files
let chunk_0 = adapter.new_querier_chunk(files[0].clone()).await.unwrap();
let chunk_1 = adapter.new_querier_chunk(files[1].clone()).await.unwrap();
// query the chunks
// least recent compacted first half (~90%)
let batches = collect_read_filter(&chunk_0).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+-----------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-----------------------------+",
"| 1000 | WA | 1970-01-01T00:00:00.000008Z |",
"+-----------+------+-----------------------------+",
],
&batches
);
// most recent compacted second half (~10%)
let batches = collect_read_filter(&chunk_1).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+-----------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-----------------------------+",
"| 70 | UT | 1970-01-01T00:00:00.000020Z |",
"+-----------+------+-----------------------------+",
],
&batches
);
}
// A fairly sophisticated integration test.
// Besides the LP data, every min/max sequence number and min/max time value matters
// for covering the needed combinations of cases in this test function
#[tokio::test]
async fn test_find_and_compact_many_files_many_tombstones() {
let catalog = TestCatalog::new();
// lp1 does not overlap with any
let lp1 = vec![
"table,tag1=WA field_int=1000 10",
"table,tag1=VT field_int=10 20",
]
.join("\n");
// lp2 overlaps with lp3
let lp2 = vec![
"table,tag1=WA field_int=1000 8000", // will be eliminated due to duplicate
"table,tag1=VT field_int=10 10000", // will be deleted by ts2
"table,tag1=UT field_int=70 20000",
]
.join("\n");
// lp3 overlaps with lp2
let lp3 = vec![
"table,tag1=WA field_int=1500 8000", // latest duplicate and kept
"table,tag1=VT field_int=10 6000",
"table,tag1=UT field_int=270 25000",
]
.join("\n");
// lp4 does not overlap with any
let lp4 = vec![
"table,tag2=WA,tag3=10 field_int=1600 28000",
"table,tag2=VT,tag3=20 field_int=20 26000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let sequencer = ns.create_sequencer(1).await;
let table = ns.create_table("table").await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
.await;
let time = Arc::new(SystemProvider::new());
// parquet files
// pf1 does not overlap with any and was created long ago ==> will be upgraded to level 1 during compaction
let _pf1 = partition
.create_parquet_file_with_min_max_and_creation_time(&lp1, 1, 1, 10, 20, 20)
.await
.parquet_file
.clone();
// pf2 overlaps with pf3 ==> compacted and marked to_delete with a timestamp
let _pf2 = partition
.create_parquet_file_with_min_max_and_creation_time(
&lp2,
4,
5,
8000,
20000,
time.now().timestamp_nanos(),
)
.await
.parquet_file
.clone();
// pf3 overlaps with pf2 ==> compacted and marked to_delete with a timestamp
let _pf3 = partition
.create_parquet_file_with_min_max_and_creation_time(
&lp3,
8,
10,
6000,
25000,
time.now().timestamp_nanos(),
)
.await
.parquet_file
.clone();
// pf4 does not overlap with any and was recently created ==> stays level 0
let _pf4 = partition
.create_parquet_file_with_min_max_and_creation_time(
&lp4,
18,
18,
26000,
28000,
time.now().timestamp_nanos(),
)
.await
.parquet_file
.clone();
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(sequencer.sequencer.id).await;
assert_eq!(count, 4);
// create 3 tombstones
// ts1 overlaps with pf2 and pf3 in time but was issued before both, hence it won't be applied
let ts1 = table
.with_sequencer(&sequencer)
.create_tombstone(2, 6000, 21000, "tag1=UT")
.await;
// ts2 overlaps with both pf2 and pf3 in time but was issued before pf3, so it only applies to pf2
let ts2 = table
.with_sequencer(&sequencer)
.create_tombstone(6, 6000, 12000, "tag1=VT")
.await;
// ts3 does not overlap with any files
let ts3 = table
.with_sequencer(&sequencer)
.create_tombstone(22, 1000, 2000, "tag1=VT")
.await;
// should have 3 tombstones
let count = catalog.count_tombstones_for_table(table.table.id).await;
assert_eq!(count, 3);
// should not have any processed tombstones for any tombstones
let count = catalog.count_processed_tombstones(ts1.tombstone.id).await;
assert_eq!(count, 0);
let count = catalog.count_processed_tombstones(ts2.tombstone.id).await;
assert_eq!(count, 0);
let count = catalog.count_processed_tombstones(ts3.tombstone.id).await;
assert_eq!(count, 0);
// ------------------------------------------------
// Compact
let compactor = Compactor {
sequencers: vec![sequencer.sequencer.id],
object_store: Arc::clone(&catalog.object_store),
catalog: Arc::clone(&catalog.catalog),
exec: Arc::new(Executor::new(1)),
time_provider: Arc::new(SystemProvider::new()),
backoff_config: BackoffConfig::default(),
};
compactor
.find_and_compact(sequencer.sequencer.id)
.await
.unwrap();
// Should have 4 non-soft-deleted files: pf1 and pf4 were not compacted and remain, plus 2 files newly created by compacting pf2 with pf3
let files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 4);
// pf1 upgraded to level 1 because it was too old
assert_eq!((files[0].id.get(), files[0].compaction_level), (1, 1));
// pf4 stays level 0 since it is not old enough
assert_eq!((files[1].id.get(), files[1].compaction_level), (4, 0));
// 2 newly created level-0 files as the result of compaction
assert_eq!((files[2].id.get(), files[2].compaction_level), (5, 0));
assert_eq!((files[3].id.get(), files[3].compaction_level), (6, 0));
// should still have ts1 and ts3, which were not involved in the compaction process
// ts2 was removed because it was fully processed
let tss = catalog.list_tombstones_by_table(table.table.id).await;
assert_eq!(tss.len(), 2);
assert_eq!(tss[0].id.get(), ts1.tombstone.id.get());
assert_eq!(tss[1].id.get(), ts3.tombstone.id.get());
// processed tombstones of ts2 were created and deleted inside the find_and_compact function
let count = catalog.count_processed_tombstones(ts2.tombstone.id).await;
assert_eq!(count, 0);
// ------------------------------------------------
// Verify the parquet file content
let adapter = ParquetChunkAdapter::new(
Arc::new(CatalogCache::new(
catalog.catalog(),
catalog.time_provider(),
)),
catalog.object_store(),
catalog.metric_registry(),
catalog.time_provider(),
);
// create chunks for 2 files
let chunk_0 = adapter.new_querier_chunk(files[2].clone()).await.unwrap();
let chunk_1 = adapter.new_querier_chunk(files[3].clone()).await.unwrap();
// query the chunks
// least recent compacted first half (~90%)
let batches = collect_read_filter(&chunk_0).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+-----------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-----------------------------+",
"| 10 | VT | 1970-01-01T00:00:00.000006Z |",
"| 1500 | WA | 1970-01-01T00:00:00.000008Z |",
"| 70 | UT | 1970-01-01T00:00:00.000020Z |",
"+-----------+------+-----------------------------+",
],
&batches
);
// most recent compacted second half (~10%)
let batches = collect_read_filter(&chunk_1).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+-----------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-----------------------------+",
"| 270 | UT | 1970-01-01T00:00:00.000025Z |",
"+-----------+------+-----------------------------+",
],
&batches
);
}
#[tokio::test]
async fn test_compact_one_file() {
let catalog = TestCatalog::new();
@ -834,6 +1248,7 @@ mod tests {
.create_tombstone(20, 6000, 12000, "tag1=VT")
.await;
pf.add_tombstones(vec![tombstone.tombstone.clone()]);
// should have compacted data
let batches = compactor.compact(vec![pf]).await.unwrap();
// 2 sets based on the split rule
@ -1469,7 +1884,10 @@ mod tests {
.await
.unwrap();
let compacted_data = CompactedData::new(data, meta, HashSet::from([t1.id]));
let mut tombstones = BTreeMap::new();
tombstones.insert(t1.id, t1);
let compacted_data = CompactedData::new(data, meta, tombstones);
Compactor::persist(
&compacted_data.meta,
@ -1619,7 +2037,7 @@ mod tests {
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let catalog_updates = vec![CatalogUpdate {
meta: meta.clone(),
tombstone_ids: HashSet::from([t1.id, t2.id]),
tombstones: BTreeMap::from([(t1.id, t1.clone()), (t2.id, t2.clone())]),
parquet_file: parquet.clone(),
}];
compactor
@ -1649,7 +2067,7 @@ mod tests {
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let catalog_updates = vec![CatalogUpdate {
meta: meta.clone(),
tombstone_ids: HashSet::from([t3.id, t1.id]),
tombstones: BTreeMap::from([(t3.id, t3.clone()), (t1.id, t1.clone())]),
parquet_file: parquet.clone(),
}];
compactor
@ -1667,7 +2085,7 @@ mod tests {
// parquet file so that one should now be deleted. Should go through
let catalog_updates = vec![CatalogUpdate {
meta: meta.clone(),
tombstone_ids: HashSet::from([t3.id]),
tombstones: BTreeMap::from([(t3.id, t3.clone())]),
parquet_file: other_parquet.clone(),
}];
compactor
@ -1701,7 +2119,7 @@ mod tests {
t4.id = TombstoneId::new(t4.id.get() + 10);
let catalog_updates = vec![CatalogUpdate {
meta: meta.clone(),
tombstone_ids: HashSet::from([t4.id]),
tombstones: BTreeMap::from([(t4.id, t4.clone())]),
parquet_file: another_parquet.clone(),
}];
compactor


@ -9,7 +9,10 @@ use parquet_file::{
chunk::{new_parquet_chunk, ChunkMetrics, DecodedParquetFile},
metadata::{IoxMetadata, IoxParquetMetaData},
};
use std::{collections::HashSet, sync::Arc};
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use time::TimeProvider;
/// Wrapper of a group of parquet files and their tombstones that overlap in time and should be
@ -63,6 +66,14 @@ impl ParquetFileWithTombstone {
self.data.id
}
/// Return all tombstones in btree map format
pub fn tombstones(&self) -> BTreeMap<TombstoneId, Tombstone> {
self.tombstones
.iter()
.map(|ts| (ts.id, ts.clone()))
.collect()
}
/// Add more tombstones
pub fn add_tombstones(&mut self, tombstones: Vec<Tombstone>) {
self.tombstones.extend(tombstones);
@ -105,7 +116,7 @@ impl ParquetFileWithTombstone {
pub struct CompactedData {
pub(crate) data: Vec<RecordBatch>,
pub(crate) meta: IoxMetadata,
pub(crate) tombstone_ids: HashSet<TombstoneId>,
pub(crate) tombstones: BTreeMap<TombstoneId, Tombstone>,
}
impl CompactedData {
@ -113,12 +124,12 @@ impl CompactedData {
pub fn new(
data: Vec<RecordBatch>,
meta: IoxMetadata,
tombstone_ids: HashSet<TombstoneId>,
tombstones: BTreeMap<TombstoneId, Tombstone>,
) -> Self {
Self {
data,
meta,
tombstone_ids,
tombstones,
}
}
}
@ -127,7 +138,7 @@ impl CompactedData {
#[derive(Debug)]
pub struct CatalogUpdate {
pub(crate) meta: IoxMetadata,
pub(crate) tombstone_ids: HashSet<TombstoneId>,
pub(crate) tombstones: BTreeMap<TombstoneId, Tombstone>,
pub(crate) parquet_file: ParquetFileParams,
}
@ -137,12 +148,12 @@ impl CatalogUpdate {
meta: IoxMetadata,
file_size: usize,
md: IoxParquetMetaData,
tombstone_ids: HashSet<TombstoneId>,
tombstones: BTreeMap<TombstoneId, Tombstone>,
) -> Self {
let parquet_file = meta.to_parquet_file(file_size, &md);
Self {
meta,
tombstone_ids,
tombstones,
parquet_file,
}
}


@ -228,6 +228,12 @@ impl TombstoneId {
}
}
impl std::fmt::Display for TombstoneId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// A sequence number from a `Sequencer` (kafka partition)
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
@ -658,7 +664,7 @@ pub struct PartitionInfo {
}
/// Data object for a tombstone.
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
#[derive(Debug, Clone, PartialEq, PartialOrd, sqlx::FromRow)]
pub struct Tombstone {
/// the id of the tombstone
pub id: TombstoneId,


@ -430,6 +430,9 @@ pub trait TombstoneRepo: Send + Sync {
/// list all tombstones for a given table
async fn list_by_table(&mut self, table_id: TableId) -> Result<Vec<Tombstone>>;
/// get the tombstone with the given id
async fn get_by_id(&mut self, tombstone_id: TombstoneId) -> Result<Option<Tombstone>>;
/// return all tombstones for the sequencer with a sequence number greater than that
/// passed in. This will be used by the ingester on startup to see what tombstones
/// might have to be applied to data that is read from the write buffer.
@ -439,6 +442,9 @@ pub trait TombstoneRepo: Send + Sync {
sequence_number: SequenceNumber,
) -> Result<Vec<Tombstone>>;
/// Remove given tombstones
async fn remove(&mut self, tombstone_ids: &[TombstoneId]) -> Result<()>;
/// Return all tombstones that have:
///
/// - the specified sequencer ID and table ID
@ -513,6 +519,18 @@ pub trait ParquetFileRepo: Send + Sync {
/// Return count
async fn count(&mut self) -> Result<i64>;
/// Return the count of files for the given table ID and sequencer ID that
/// overlap with the given min_time and max_time and have a min sequence number
/// smaller than the given one
async fn count_by_overlaps(
&mut self,
table_id: TableId,
sequencer_id: SequencerId,
min_time: Timestamp,
max_time: Timestamp,
sequence_number: SequenceNumber,
) -> Result<i64>;
}
/// Functions for working with processed tombstone pointers in the catalog
@ -534,6 +552,9 @@ pub trait ProcessedTombstoneRepo: Send + Sync {
/// Return count
async fn count(&mut self) -> Result<i64>;
/// Return count for a given tombstone id
async fn count_by_tombstone_id(&mut self, tombstone_id: TombstoneId) -> Result<i64>;
}
/// Gets the namespace schema including all tables and columns.
@ -596,7 +617,7 @@ pub(crate) mod test_helpers {
use ::test_helpers::{assert_contains, tracing::TracingCapture};
use data_types2::ColumnId;
use metric::{Attributes, Metric, U64Histogram};
use std::{sync::Arc, time::Duration};
use std::{ops::Add, sync::Arc, time::Duration};
pub(crate) async fn test_catalog(catalog: Arc<dyn Catalog>) {
test_setup(Arc::clone(&catalog)).await;
@ -613,6 +634,7 @@ pub(crate) mod test_helpers {
test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
test_processed_tombstones(Arc::clone(&catalog)).await;
test_txn_isolation(Arc::clone(&catalog)).await;
test_txn_drop(Arc::clone(&catalog)).await;
@ -1313,8 +1335,8 @@ pub(crate) mod test_helpers {
other_table.id,
sequencer.id,
SequenceNumber::new(2),
min_time,
max_time,
min_time.add(10),
max_time.add(10),
"bleh",
)
.await
@ -1325,8 +1347,8 @@ pub(crate) mod test_helpers {
table.id,
sequencer.id,
SequenceNumber::new(3),
min_time,
max_time,
min_time.add(10),
max_time.add(10),
"sdf",
)
.await
@ -1341,13 +1363,13 @@ pub(crate) mod test_helpers {
// test list_by_table
let listed = repos.tombstones().list_by_table(table.id).await.unwrap();
assert_eq!(vec![t1, t3], listed);
assert_eq!(vec![t1.clone(), t3.clone()], listed);
let listed = repos
.tombstones()
.list_by_table(other_table.id)
.await
.unwrap();
assert_eq!(vec![t2], listed);
assert_eq!(vec![t2.clone()], listed);
// test list_by_namespace
let namespace2 = repos
@ -1366,8 +1388,8 @@ pub(crate) mod test_helpers {
table2.id,
sequencer.id,
SequenceNumber::new(1),
min_time,
max_time,
min_time.add(10),
max_time.add(10),
"whatevs",
)
.await
@ -1378,8 +1400,8 @@ pub(crate) mod test_helpers {
table2.id,
sequencer.id,
SequenceNumber::new(2),
min_time,
max_time,
min_time.add(10),
max_time.add(10),
"foo",
)
.await
@ -1389,13 +1411,40 @@ pub(crate) mod test_helpers {
.list_by_namespace(namespace2.id)
.await
.unwrap();
assert_eq!(vec![t4, t5], listed);
assert_eq!(vec![t4.clone(), t5.clone()], listed);
let listed = repos
.tombstones()
.list_by_namespace(NamespaceId::new(i32::MAX))
.await
.unwrap();
assert!(listed.is_empty());
// test get_by_id
let ts = repos.tombstones().get_by_id(t1.id).await.unwrap().unwrap();
assert_eq!(ts, t1.clone());
let ts = repos.tombstones().get_by_id(t2.id).await.unwrap().unwrap();
assert_eq!(ts, t2.clone());
let ts = repos
.tombstones()
.get_by_id(TombstoneId::new(
t1.id.get() + t2.id.get() + t3.id.get() + t4.id.get() + t5.id.get(),
)) // an id that does not exist
.await
.unwrap();
assert!(ts.is_none());
// test remove
repos.tombstones().remove(&[t1.id, t3.id]).await.unwrap();
let ts = repos.tombstones().get_by_id(t1.id).await.unwrap();
assert!(ts.is_none()); // no longer there
let ts = repos.tombstones().get_by_id(t3.id).await.unwrap();
assert!(ts.is_none()); // no longer there
let ts = repos.tombstones().get_by_id(t2.id).await.unwrap().unwrap();
assert_eq!(ts, t2.clone()); // still there
let ts = repos.tombstones().get_by_id(t4.id).await.unwrap().unwrap();
assert_eq!(ts, t4.clone()); // still there
let ts = repos.tombstones().get_by_id(t5.id).await.unwrap().unwrap();
assert_eq!(ts, t5.clone()); // still there
}
async fn test_tombstones_by_parquet_file(catalog: Arc<dyn Catalog>) {
@ -1651,9 +1700,6 @@ pub(crate) mod test_helpers {
.await
.unwrap();
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
table_id: partition.table_id,
@ -1661,8 +1707,8 @@ pub(crate) mod test_helpers {
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(140),
min_time,
max_time,
min_time: Timestamp::new(1),
max_time: Timestamp::new(10),
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
@ -1688,6 +1734,8 @@ pub(crate) mod test_helpers {
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(45),
max_sequence_number: SequenceNumber::new(200),
min_time: Timestamp::new(50),
max_time: Timestamp::new(60),
..parquet_file_params.clone()
};
let other_file = repos.parquet_files().create(other_params).await.unwrap();
@ -1767,6 +1815,10 @@ pub(crate) mod test_helpers {
table_id: partition2.table_id,
partition_id: partition2.id,
object_store_id: Uuid::new_v4(),
min_time: Timestamp::new(1),
max_time: Timestamp::new(10),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(10),
..parquet_file_params
};
let f1 = repos
@ -1777,9 +1829,17 @@ pub(crate) mod test_helpers {
let f2_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
min_time: Timestamp::new(50),
max_time: Timestamp::new(60),
min_sequence_number: SequenceNumber::new(11),
max_sequence_number: SequenceNumber::new(11),
..f1_params
};
let f2 = repos.parquet_files().create(f2_params).await.unwrap();
let f2 = repos
.parquet_files()
.create(f2_params.clone())
.await
.unwrap();
let files = repos
.parquet_files()
.list_by_namespace_not_to_delete(namespace2.id)
@ -1787,13 +1847,29 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(vec![f1.clone(), f2.clone()], files);
let f3_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
min_time: Timestamp::new(50),
max_time: Timestamp::new(60),
min_sequence_number: SequenceNumber::new(12),
max_sequence_number: SequenceNumber::new(12),
..f2_params
};
let f3 = repos.parquet_files().create(f3_params).await.unwrap();
let files = repos
.parquet_files()
.list_by_namespace_not_to_delete(namespace2.id)
.await
.unwrap();
assert_eq!(vec![f1.clone(), f2.clone(), f3.clone()], files);
repos.parquet_files().flag_for_delete(f2.id).await.unwrap();
let files = repos
.parquet_files()
.list_by_namespace_not_to_delete(namespace2.id)
.await
.unwrap();
assert_eq!(vec![f1.clone()], files);
assert_eq!(vec![f1.clone(), f3.clone()], files);
let files = repos
.parquet_files()
@ -1801,6 +1877,87 @@ pub(crate) mod test_helpers {
.await
.unwrap();
assert!(files.is_empty());
// test count_by_overlaps
// not time overlap
let count = repos
.parquet_files()
.count_by_overlaps(
partition2.table_id,
sequencer.id,
Timestamp::new(11),
Timestamp::new(20),
SequenceNumber::new(20),
)
.await
.unwrap();
assert_eq!(count, 0);
// overlaps with f1
let count = repos
.parquet_files()
.count_by_overlaps(
partition2.table_id,
sequencer.id,
Timestamp::new(1),
Timestamp::new(10),
SequenceNumber::new(20),
)
.await
.unwrap();
assert_eq!(count, 1);
// overlaps with f1 and f3
// f2 is deleted and should not be counted
let count = repos
.parquet_files()
.count_by_overlaps(
partition2.table_id,
sequencer.id,
Timestamp::new(7),
Timestamp::new(55),
SequenceNumber::new(20),
)
.await
.unwrap();
assert_eq!(count, 2);
// overlaps with f1 and f3 but with a different time range
let count = repos
.parquet_files()
.count_by_overlaps(
partition2.table_id,
sequencer.id,
Timestamp::new(1),
Timestamp::new(100),
SequenceNumber::new(20),
)
.await
.unwrap();
assert_eq!(count, 2);
// overlaps with f3
let count = repos
.parquet_files()
.count_by_overlaps(
partition2.table_id,
sequencer.id,
Timestamp::new(15),
Timestamp::new(100),
SequenceNumber::new(20),
)
.await
.unwrap();
assert_eq!(count, 1);
// no overlaps due to smaller sequence number
let count = repos
.parquet_files()
.count_by_overlaps(
partition2.table_id,
sequencer.id,
Timestamp::new(15),
Timestamp::new(100),
SequenceNumber::new(2),
)
.await
.unwrap();
assert_eq!(count, 0);
}
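
The `count_by_overlaps` cases above all exercise one interval predicate, which appears both in the in-memory catalog and in the Postgres SQL further down in this diff. A standalone sketch of that predicate follows, with the timestamps as plain i64 values and the assertions mirroring the test data above (f1 covers 1..=10, f3 covers 50..=60); the function name is illustrative.

// A parquet file overlaps the tombstone window if it spans the window's lower bound
// or starts strictly inside the window.
fn overlaps(file_min: i64, file_max: i64, ts_min: i64, ts_max: i64) -> bool {
    (file_min <= ts_min && file_max >= ts_min) || (file_min > ts_min && file_min <= ts_max)
}

fn main() {
    assert!(!overlaps(1, 10, 11, 20));  // no time overlap with f1
    assert!(overlaps(1, 10, 1, 10));    // overlaps with f1
    assert!(overlaps(50, 60, 7, 55));   // overlaps with f3
    assert!(overlaps(50, 60, 15, 100)); // f3 starts inside the window
}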
async fn test_parquet_file_compaction_level_0(catalog: Arc<dyn Catalog>) {
@ -2254,6 +2411,184 @@ pub(crate) mod test_helpers {
);
}
async fn test_processed_tombstones(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create(
"namespace_processed_tombstone_test",
"inf",
kafka.id,
pool.id,
)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let sequencer = repos
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let partition = repos
.partitions()
.create_or_get("one", sequencer.id, table.id)
.await
.unwrap();
// parquet files
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(1),
max_sequence_number: SequenceNumber::new(1),
min_time: Timestamp::new(100),
max_time: Timestamp::new(250),
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
created_at: Timestamp::new(1),
};
let p1 = repos
.parquet_files()
.create(parquet_file_params.clone())
.await
.unwrap();
let parquet_file_params_2 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(2),
max_sequence_number: SequenceNumber::new(3),
min_time: Timestamp::new(200),
max_time: Timestamp::new(300),
..parquet_file_params
};
let p2 = repos
.parquet_files()
.create(parquet_file_params_2.clone())
.await
.unwrap();
// tombstones
let t1 = repos
.tombstones()
.create_or_get(
table.id,
sequencer.id,
SequenceNumber::new(10),
Timestamp::new(1),
Timestamp::new(10),
"whatevs",
)
.await
.unwrap();
let t2 = repos
.tombstones()
.create_or_get(
table.id,
sequencer.id,
SequenceNumber::new(11),
Timestamp::new(100),
Timestamp::new(110),
"whatevs",
)
.await
.unwrap();
let t3 = repos
.tombstones()
.create_or_get(
table.id,
sequencer.id,
SequenceNumber::new(12),
Timestamp::new(200),
Timestamp::new(210),
"whatevs",
)
.await
.unwrap();
// processed tombstones
// p1, t2
let _pt1 = repos
.processed_tombstones()
.create(p1.id, t2.id)
.await
.unwrap();
// p1, t3
let _pt2 = repos
.processed_tombstones()
.create(p1.id, t3.id)
.await
.unwrap();
// p2, t3
let _pt3 = repos
.processed_tombstones()
.create(p2.id, t3.id)
.await
.unwrap();
// test exist
let exist = repos
.processed_tombstones()
.exist(p1.id, t1.id)
.await
.unwrap();
assert!(!exist);
let exist = repos
.processed_tombstones()
.exist(p1.id, t2.id)
.await
.unwrap();
assert!(exist);
// test count
let count = repos.processed_tombstones().count().await.unwrap();
assert_eq!(count, 3);
// test count_by_tombstone_id
let count = repos
.processed_tombstones()
.count_by_tombstone_id(t1.id)
.await
.unwrap();
assert_eq!(count, 0);
let count = repos
.processed_tombstones()
.count_by_tombstone_id(t2.id)
.await
.unwrap();
assert_eq!(count, 1);
let count = repos
.processed_tombstones()
.count_by_tombstone_id(t3.id)
.await
.unwrap();
assert_eq!(count, 2);
// test remove
repos.tombstones().remove(&[t3.id]).await.unwrap();
// should still be 1 because t2 was not deleted
let count = repos
.processed_tombstones()
.count_by_tombstone_id(t2.id)
.await
.unwrap();
assert_eq!(count, 1);
// should be 0 because t3 was deleted
let count = repos
.processed_tombstones()
.count_by_tombstone_id(t3.id)
.await
.unwrap();
assert_eq!(count, 0);
}
async fn test_txn_isolation(catalog: Arc<dyn Catalog>) {
let barrier = Arc::new(tokio::sync::Barrier::new(2));


@ -841,6 +841,12 @@ impl TombstoneRepo for MemTxn {
Ok(tombstones)
}
async fn get_by_id(&mut self, id: TombstoneId) -> Result<Option<Tombstone>> {
let stage = self.stage();
Ok(stage.tombstones.iter().find(|t| t.id == id).cloned())
}
async fn list_tombstones_by_sequencer_greater_than(
&mut self,
sequencer_id: SequencerId,
@ -857,6 +863,22 @@ impl TombstoneRepo for MemTxn {
Ok(tombstones)
}
async fn remove(&mut self, tombstone_ids: &[TombstoneId]) -> Result<()> {
let stage = self.stage();
// remove the processed tombstones first
stage
.processed_tombstones
.retain(|pt| !tombstone_ids.iter().any(|id| *id == pt.tombstone_id));
// remove the tombstones
stage
.tombstones
.retain(|ts| !tombstone_ids.iter().any(|id| *id == ts.id));
Ok(())
}
async fn list_tombstones_for_time_range(
&mut self,
sequencer_id: SequencerId,
@ -1065,6 +1087,32 @@ impl ParquetFileRepo for MemTxn {
}
Ok(count_i64.unwrap())
}
async fn count_by_overlaps(
&mut self,
table_id: TableId,
sequencer_id: SequencerId,
min_time: Timestamp,
max_time: Timestamp,
sequence_number: SequenceNumber,
) -> Result<i64> {
let stage = self.stage();
let count = stage
.parquet_files
.iter()
.filter(|f| {
f.sequencer_id == sequencer_id
&& f.table_id == table_id
&& f.min_sequence_number < sequence_number
&& f.to_delete.is_none()
&& ((f.min_time <= min_time && f.max_time >= min_time)
|| (f.min_time > min_time && f.min_time <= max_time))
})
.count();
i64::try_from(count).map_err(|_| Error::InvalidValue { value: count })
}
}
#[async_trait]
@ -1134,6 +1182,18 @@ impl ProcessedTombstoneRepo for MemTxn {
}
Ok(count_i64.unwrap())
}
async fn count_by_tombstone_id(&mut self, tombstone_id: TombstoneId) -> Result<i64> {
let stage = self.stage();
let count = stage
.processed_tombstones
.iter()
.filter(|p| p.tombstone_id == tombstone_id)
.count();
i64::try_from(count).map_err(|_| Error::InvalidValue { value: count })
}
}
#[cfg(test)]


@ -253,7 +253,9 @@ decorate!(
"tombstone_create_or_get" = create_or_get( &mut self, table_id: TableId, sequencer_id: SequencerId, sequence_number: SequenceNumber, min_time: Timestamp, max_time: Timestamp, predicate: &str) -> Result<Tombstone>;
"tombstone_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Tombstone>>;
"tombstone_list_by_table" = list_by_table(&mut self, table_id: TableId) -> Result<Vec<Tombstone>>;
"tombstone_get_by_id" = get_by_id(&mut self, id: TombstoneId) -> Result<Option<Tombstone>>;
"tombstone_list_tombstones_by_sequencer_greater_than" = list_tombstones_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<Tombstone>>;
"tombstone_remove" = remove(&mut self, tombstone_ids: &[TombstoneId]) -> Result<()>;
"tombstone_list_tombstones_for_time_range" = list_tombstones_for_time_range(&mut self, sequencer_id: SequencerId, table_id: TableId, sequence_number: SequenceNumber, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<Tombstone>>;
]
);
@ -271,6 +273,7 @@ decorate!(
"parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result<Vec<ParquetFileId>>;
"parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
"parquet_count" = count(&mut self) -> Result<i64>;
"parquet_count_by_overlaps" = count_by_overlaps(&mut self, table_id: TableId, sequencer_id: SequencerId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;
]
);
@ -280,5 +283,6 @@ decorate!(
"processed_tombstone_create" = create(&mut self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> Result<ProcessedTombstone>;
"processed_tombstone_exist" = exist(&mut self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> Result<bool>;
"processed_tombstone_count" = count(&mut self) -> Result<i64>;
"processed_tombstone_count_by_tombstone_id" = count_by_tombstone_id(&mut self, tombstone_id: TombstoneId) -> Result<i64>;
]
);


@ -1263,6 +1263,27 @@ ORDER BY id;
.map_err(|e| Error::SqlxError { source: e })
}
async fn get_by_id(&mut self, id: TombstoneId) -> Result<Option<Tombstone>> {
let rec = sqlx::query_as::<_, Tombstone>(
r#"
SELECT *
FROM tombstone
WHERE id = $1;
"#,
)
.bind(&id) // $1
.fetch_one(&mut self.inner)
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let tombstone = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(tombstone))
}
async fn list_tombstones_by_sequencer_greater_than(
&mut self,
sequencer_id: SequencerId,
@ -1284,6 +1305,38 @@ ORDER BY id;
.map_err(|e| Error::SqlxError { source: e })
}
async fn remove(&mut self, tombstone_ids: &[TombstoneId]) -> Result<()> {
let ids: Vec<_> = tombstone_ids.iter().map(|t| t.get()).collect();
// Remove processed tombstones first
sqlx::query(
r#"
DELETE
FROM processed_tombstone
WHERE tombstone_id = ANY($1);
"#,
)
.bind(&ids[..]) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
// Remove tombstones
sqlx::query(
r#"
DELETE
FROM tombstone
WHERE id = ANY($1);
"#,
)
.bind(&ids[..]) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(())
}
async fn list_tombstones_for_time_range(
&mut self,
sequencer_id: SequencerId,
@ -1469,8 +1522,8 @@ WHERE parquet_file.sequencer_id = $1
AND parquet_file.partition_id = $3
AND parquet_file.compaction_level = 1
AND parquet_file.to_delete IS NULL
AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $4)
OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $5));
AND ((parquet_file.min_time <= $4 AND parquet_file.max_time >= $4)
OR (parquet_file.min_time > $4 AND parquet_file.min_time <= $5));
"#,
)
.bind(&table_partition.sequencer_id) // $1
@ -1528,6 +1581,38 @@ RETURNING id;
Ok(read_result.count)
}
async fn count_by_overlaps(
&mut self,
table_id: TableId,
sequencer_id: SequencerId,
min_time: Timestamp,
max_time: Timestamp,
sequence_number: SequenceNumber,
) -> Result<i64> {
let read_result = sqlx::query_as::<_, Count>(
r#"
SELECT count(*) as count
FROM parquet_file
WHERE table_id = $1
AND sequencer_id = $2
AND min_sequence_number < $3
AND parquet_file.to_delete IS NULL
AND ((parquet_file.min_time <= $4 AND parquet_file.max_time >= $4)
OR (parquet_file.min_time > $4 AND parquet_file.min_time <= $5));
"#,
)
.bind(&table_id) // $1
.bind(&sequencer_id) // $2
.bind(sequence_number) // $3
.bind(min_time) // $4
.bind(max_time) // $5
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(read_result.count)
}
}
#[async_trait]
@ -1593,6 +1678,18 @@ WHERE parquet_file_id = $1
Ok(read_result.count)
}
async fn count_by_tombstone_id(&mut self, tombstone_id: TombstoneId) -> Result<i64> {
let read_result = sqlx::query_as::<_, Count>(
r#"SELECT count(*) as count FROM processed_tombstone WHERE tombstone_id = $1;"#,
)
.bind(&tombstone_id) // $1
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(read_result.count)
}
}
/// The error code returned by Postgres for a unique constraint violation.


@ -7,7 +7,8 @@ use arrow::{
use bytes::Bytes;
use data_types2::{
ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileParams, Partition,
QueryPool, SequenceNumber, Sequencer, Table, Timestamp, Tombstone,
QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp, Tombstone,
TombstoneId,
};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use iox_object_store::{IoxObjectStore, ParquetFilePath};
@ -109,6 +110,82 @@ impl TestCatalog {
namespace,
})
}
/// return tombstones of a given table
pub async fn list_tombstones_by_table(self: &Arc<Self>, table_id: TableId) -> Vec<Tombstone> {
self.catalog
.repositories()
.await
.tombstones()
.list_by_table(table_id)
.await
.unwrap()
}
/// return number of tombstones of a given table
pub async fn count_tombstones_for_table(self: &Arc<Self>, table_id: TableId) -> usize {
let ts = self
.catalog
.repositories()
.await
.tombstones()
.list_by_table(table_id)
.await
.unwrap();
ts.len()
}
/// return the number of processed tombstones for a given tombstone
pub async fn count_processed_tombstones(self: &Arc<Self>, tombstone_id: TombstoneId) -> i64 {
self.catalog
.repositories()
.await
.processed_tombstones()
.count_by_tombstone_id(tombstone_id)
.await
.unwrap()
}
/// List level 0 files
pub async fn list_level_0_files(
self: &Arc<Self>,
sequencer_id: SequencerId,
) -> Vec<ParquetFile> {
self.catalog
.repositories()
.await
.parquet_files()
.level_0(sequencer_id)
.await
.unwrap()
}
/// Count level 0 files
pub async fn count_level_0_files(self: &Arc<Self>, sequencer_id: SequencerId) -> usize {
let level_0 = self
.catalog
.repositories()
.await
.parquet_files()
.level_0(sequencer_id)
.await
.unwrap();
level_0.len()
}
/// List all non-deleted files for the given table
pub async fn list_by_table_not_to_delete(
self: &Arc<Self>,
table_id: TableId,
) -> Vec<ParquetFile> {
self.catalog
.repositories()
.await
.parquet_files()
.list_by_table_not_to_delete(table_id)
.await
.unwrap()
}
}
/// A test namespace
@ -292,6 +369,22 @@ impl TestPartition {
max_seq: i64,
min_time: i64,
max_time: i64,
) -> Arc<TestParquetFile> {
self.create_parquet_file_with_min_max_and_creation_time(
lp, min_seq, max_seq, min_time, max_time, 1,
)
.await
}
/// Create a parquet file for the partition with the given creation time
pub async fn create_parquet_file_with_min_max_and_creation_time(
self: &Arc<Self>,
lp: &str,
min_seq: i64,
max_seq: i64,
min_time: i64,
max_time: i64,
creation_time: i64,
) -> Arc<TestParquetFile> {
let mut repos = self.catalog.catalog.repositories().await;
@ -335,7 +428,7 @@ impl TestPartition {
file_size_bytes: file_size_bytes as i64,
parquet_metadata: parquet_metadata_bin,
row_count: row_count as i64,
created_at: Timestamp::new(1),
created_at: Timestamp::new(creation_time),
};
let parquet_file = repos
.parquet_files()


@ -1,15 +1,20 @@
//! Querier Chunk
use crate::cache::CatalogCache;
use arrow::record_batch::RecordBatch;
use data_types2::{
ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId, SequenceNumber,
SequencerId,
};
use futures::StreamExt;
use iox_catalog::interface::Catalog;
use iox_object_store::IoxObjectStore;
use object_store::DynObjectStore;
use parquet_file::chunk::{
new_parquet_chunk, ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk,
};
use schema::sort::SortKey;
use query::{exec::IOxSessionContext, QueryChunk};
use schema::{selection::Selection, sort::SortKey};
use std::sync::Arc;
use time::TimeProvider;
use uuid::Uuid;
@ -75,7 +80,9 @@ impl ChunkMeta {
pub enum ChunkStorage {
/// Data is currently available via parquet file within the object store.
Parquet {
/// ID of the parquet file of the chunk
parquet_file_id: ParquetFileId,
/// Chunk of the parquet file
chunk: Arc<ParquetChunk>,
},
}
@ -285,15 +292,30 @@ impl ParquetChunkAdapter {
}
}
/// collect data for the given chunk
pub async fn collect_read_filter(chunk: &QuerierChunk) -> Vec<RecordBatch> {
chunk
.read_filter(
IOxSessionContext::default(),
&Default::default(),
Selection::All,
)
.unwrap()
.collect::<Vec<_>>()
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
#[cfg(test)]
mod tests {
pub mod tests {
use super::*;
use arrow::{datatypes::DataType, record_batch::RecordBatch};
use arrow::datatypes::DataType;
use arrow_util::assert_batches_eq;
use futures::StreamExt;
use iox_tests::util::TestCatalog;
use query::{exec::IOxSessionContext, QueryChunk, QueryChunkMeta};
use schema::{builder::SchemaBuilder, selection::Selection, sort::SortKeyBuilder};
use query::QueryChunkMeta;
use schema::{builder::SchemaBuilder, sort::SortKeyBuilder};
#[tokio::test]
async fn test_create_record() {
@ -370,19 +392,4 @@ mod tests {
&batches
);
}
async fn collect_read_filter(chunk: &QuerierChunk) -> Vec<RecordBatch> {
chunk
.read_filter(
IOxSessionContext::default(),
&Default::default(),
Selection::All,
)
.unwrap()
.collect::<Vec<_>>()
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
}


@ -10,9 +10,9 @@
)]
#![allow(dead_code)]
mod cache;
pub mod cache;
mod cache_system;
mod chunk;
pub mod chunk;
mod database;
/// Flight client to the ingester to request in-memory data.
mod flight;