fix: Remove tombstones from querier; they're unused

pull/24376/head
Carol (Nichols || Goulding) 2023-03-01 13:53:39 -05:00
parent b7096bdad4
commit 5f2d82fbc6
GPG Key ID: E907EE5A736F87D4
14 changed files with 30 additions and 1426 deletions

View File

@ -10,18 +10,15 @@ use tokio::runtime::Handle;
use self::{
namespace::NamespaceCache, object_store::ObjectStoreCache, parquet_file::ParquetFileCache,
partition::PartitionCache, processed_tombstones::ProcessedTombstonesCache,
projected_schema::ProjectedSchemaCache, ram::RamSize, tombstones::TombstoneCache,
partition::PartitionCache, projected_schema::ProjectedSchemaCache, ram::RamSize,
};
pub mod namespace;
pub mod object_store;
pub mod parquet_file;
pub mod partition;
pub mod processed_tombstones;
pub mod projected_schema;
mod ram;
pub mod tombstones;
#[cfg(test)]
mod test_util;
@ -38,15 +35,9 @@ pub struct CatalogCache {
/// Namespace cache.
namespace_cache: NamespaceCache,
/// Processed tombstone cache.
processed_tombstones_cache: ProcessedTombstonesCache,
/// Parquet file cache
parquet_file_cache: ParquetFileCache,
/// Tombstone cache.
tombstone_cache: TombstoneCache,
/// Projected schema cache.
projected_schema_cache: ProjectedSchemaCache,
@ -146,14 +137,6 @@ impl CatalogCache {
handle,
testing,
);
let processed_tombstones_cache = ProcessedTombstonesCache::new(
Arc::clone(&catalog),
backoff_config.clone(),
Arc::clone(&time_provider),
&metric_registry,
Arc::clone(&ram_pool_metadata),
testing,
);
let parquet_file_cache = ParquetFileCache::new(
Arc::clone(&catalog),
backoff_config.clone(),
@ -162,14 +145,6 @@ impl CatalogCache {
Arc::clone(&ram_pool_metadata),
testing,
);
let tombstone_cache = TombstoneCache::new(
Arc::clone(&catalog),
backoff_config.clone(),
Arc::clone(&time_provider),
&metric_registry,
Arc::clone(&ram_pool_metadata),
testing,
);
let projected_schema_cache = ProjectedSchemaCache::new(
Arc::clone(&time_provider),
&metric_registry,
@ -189,9 +164,7 @@ impl CatalogCache {
catalog,
partition_cache,
namespace_cache,
processed_tombstones_cache,
parquet_file_cache,
tombstone_cache,
projected_schema_cache,
object_store_cache,
metric_registry,
@ -224,21 +197,11 @@ impl CatalogCache {
&self.partition_cache
}
/// Processed tombstone cache.
pub(crate) fn processed_tombstones(&self) -> &ProcessedTombstonesCache {
&self.processed_tombstones_cache
}
/// Parquet file cache.
pub(crate) fn parquet_file(&self) -> &ParquetFileCache {
&self.parquet_file_cache
}
/// Tombstone cache.
pub(crate) fn tombstone(&self) -> &TombstoneCache {
&self.tombstone_cache
}
/// Projected schema cache.
pub(crate) fn projected_schema(&self) -> &ProjectedSchemaCache {
&self.projected_schema_cache

View File

@ -1,277 +0,0 @@
//! Processed tombstone cache.
use backoff::{Backoff, BackoffConfig};
use cache_system::{
backend::policy::{
lru::{LruPolicy, ResourcePool},
ttl::{TtlPolicy, TtlProvider},
PolicyBackend,
},
cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
loader::{metrics::MetricsLoader, FunctionLoader},
resource_consumption::FunctionEstimator,
};
use data_types::{ParquetFileId, TombstoneId};
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use std::{mem::size_of_val, sync::Arc, time::Duration};
use trace::span::Span;
use super::ram::RamSize;
/// Duration to keep "tombstone is NOT processed yet".
///
/// Marking tombstones as processed is a mere optimization, so we can keep this cache entry for a
/// while.
pub const TTL_NOT_PROCESSED: Duration = Duration::from_secs(100);
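// Illustrative consequence of this policy (see `KeepExistsForever` below): a
// lookup that finds the tombstone NOT yet processed is re-checked after roughly
// 100 seconds, while a positive "processed" result never expires.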
const CACHE_ID: &str = "processed_tombstones";
type CacheT = Box<
dyn Cache<
K = (ParquetFileId, TombstoneId),
V = bool,
GetExtra = ((), Option<Span>),
PeekExtra = ((), Option<Span>),
>,
>;
/// Cache for processed tombstones.
#[derive(Debug)]
pub struct ProcessedTombstonesCache {
cache: CacheT,
}
impl ProcessedTombstonesCache {
/// Create new empty cache.
pub fn new(
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
time_provider: Arc<dyn TimeProvider>,
metric_registry: &metric::Registry,
ram_pool: Arc<ResourcePool<RamSize>>,
testing: bool,
) -> Self {
let loader = FunctionLoader::new(move |(parquet_file_id, tombstone_id), _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
async move {
Backoff::new(&backoff_config)
.retry_all_errors("processed tombstone exists", || async {
catalog
.repositories()
.await
.processed_tombstones()
.exist(parquet_file_id, tombstone_id)
.await
})
.await
.expect("retry forever")
}
});
let loader = Arc::new(MetricsLoader::new(
loader,
CACHE_ID,
Arc::clone(&time_provider),
metric_registry,
testing,
));
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider));
backend.add_policy(TtlPolicy::new(
Arc::new(KeepExistsForever {}),
CACHE_ID,
metric_registry,
));
backend.add_policy(LruPolicy::new(
ram_pool,
CACHE_ID,
Arc::new(FunctionEstimator::new(|k, v| {
RamSize(size_of_val(k) + size_of_val(v))
})),
));
let cache = CacheDriver::new(loader, backend);
let cache = Box::new(CacheWithMetrics::new(
cache,
CACHE_ID,
time_provider,
metric_registry,
));
Self { cache }
}
/// Check if the specified tombstone is marked as "processed" for the given parquet file.
pub async fn exists(
&self,
parquet_file_id: ParquetFileId,
tombstone_id: TombstoneId,
span: Option<Span>,
) -> bool {
self.cache
.get((parquet_file_id, tombstone_id), ((), span))
.await
}
}
#[derive(Debug)]
struct KeepExistsForever;
impl TtlProvider for KeepExistsForever {
type K = (ParquetFileId, TombstoneId);
type V = bool;
fn expires_in(&self, _k: &Self::K, v: &Self::V) -> Option<Duration> {
if *v {
// keep forever
None
} else {
// marking tombstones as processed is a mere optimization, so we can keep this cache entry for a while
Some(TTL_NOT_PROCESSED)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
use data_types::ColumnType;
use iox_tests::{TestCatalog, TestParquetFileBuilder};
const TABLE_LINE_PROTOCOL: &str = "table foo=1 11";
#[tokio::test]
async fn test() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table").await;
table.create_column("foo", ColumnType::F64).await;
table.create_column("time", ColumnType::Time).await;
let shard = ns.create_shard(1).await;
let partition = table.with_shard(&shard).create_partition("k").await;
let builder = TestParquetFileBuilder::default().with_line_protocol(TABLE_LINE_PROTOCOL);
let file1 = partition.create_parquet_file(builder.clone()).await;
let file2 = partition.create_parquet_file(builder).await;
let ts1 = table
.with_shard(&shard)
.create_tombstone(1, 1, 10, "foo=1")
.await;
let ts2 = table
.with_shard(&shard)
.create_tombstone(2, 1, 10, "foo=1")
.await;
ts1.mark_processed(&file1).await;
let cache = ProcessedTombstonesCache::new(
catalog.catalog(),
BackoffConfig::default(),
catalog.time_provider(),
&catalog.metric_registry(),
test_ram_pool(),
true,
);
assert!(
cache
.exists(file1.parquet_file.id, ts1.tombstone.id, None)
.await
);
assert!(
!cache
.exists(file1.parquet_file.id, ts2.tombstone.id, None)
.await
);
assert!(
!cache
.exists(file2.parquet_file.id, ts1.tombstone.id, None)
.await
);
assert!(
!cache
.exists(file2.parquet_file.id, ts2.tombstone.id, None)
.await
);
assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 4);
ts2.mark_processed(&file2).await;
// values are cached for a while
assert!(TTL_NOT_PROCESSED > Duration::from_millis(1));
catalog
.mock_time_provider()
.inc(TTL_NOT_PROCESSED - Duration::from_millis(1));
assert!(
!cache
.exists(file2.parquet_file.id, ts2.tombstone.id, None)
.await
);
assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 4);
catalog.mock_time_provider().inc(Duration::from_millis(1));
assert!(
cache
.exists(file2.parquet_file.id, ts2.tombstone.id, None)
.await
);
assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 5);
// "true" results are cached forever
assert!(
cache
.exists(file1.parquet_file.id, ts1.tombstone.id, None)
.await
);
assert_histogram_metric_count(&catalog.metric_registry, "processed_tombstone_exist", 5);
// cache key has two dimensions
assert!(
cache
.exists(file1.parquet_file.id, ts1.tombstone.id, None)
.await
);
assert!(
!cache
.exists(file1.parquet_file.id, ts2.tombstone.id, None)
.await
);
assert!(
!cache
.exists(file2.parquet_file.id, ts1.tombstone.id, None)
.await
);
assert!(
cache
.exists(file2.parquet_file.id, ts2.tombstone.id, None)
.await
);
ts1.mark_processed(&file2).await;
catalog.mock_time_provider().inc(TTL_NOT_PROCESSED);
assert!(
cache
.exists(file1.parquet_file.id, ts1.tombstone.id, None)
.await
);
assert!(
!cache
.exists(file1.parquet_file.id, ts2.tombstone.id, None)
.await
);
assert!(
cache
.exists(file2.parquet_file.id, ts1.tombstone.id, None)
.await
);
assert!(
cache
.exists(file2.parquet_file.id, ts2.tombstone.id, None)
.await
);
}
}

View File

@ -1,455 +0,0 @@
//! Tombstone cache
use backoff::{Backoff, BackoffConfig};
use cache_system::{
backend::policy::{
lru::{LruPolicy, ResourcePool},
remove_if::{RemoveIfHandle, RemoveIfPolicy},
PolicyBackend,
},
cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
loader::{metrics::MetricsLoader, FunctionLoader},
resource_consumption::FunctionEstimator,
};
use data_types::{SequenceNumber, TableId, Tombstone};
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use snafu::{ResultExt, Snafu};
use std::{mem, sync::Arc};
use trace::span::Span;
use super::ram::RamSize;
const CACHE_ID: &str = "tombstone";
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("CatalogError refreshing tombstone cache: {}", source))]
Catalog {
source: iox_catalog::interface::Error,
},
}
/// Holds decoded catalog information about the tombstones of a table
#[derive(Debug, Clone)]
pub struct CachedTombstones {
/// Tombstones that were cached in the catalog
pub tombstones: Arc<Vec<Arc<Tombstone>>>,
}
impl CachedTombstones {
fn new(tombstones: Vec<Tombstone>) -> Self {
let tombstones: Vec<_> = tombstones.into_iter().map(Arc::new).collect();
Self {
tombstones: Arc::new(tombstones),
}
}
fn size(&self) -> usize {
assert_eq!(self.tombstones.len(), self.tombstones.capacity());
mem::size_of_val(self) +
// size of Vec
mem::size_of_val(&self.tombstones) +
// Size of Arcs in Vec
(self.tombstones.capacity() * mem::size_of::<Arc<Tombstone>>()) +
self.tombstones.iter().map(|t| t.size()).sum::<usize>()
}
/// Return the underlying tombstones
pub fn to_vec(&self) -> Vec<Arc<Tombstone>> {
self.tombstones.iter().map(Arc::clone).collect()
}
/// Returns the greatest tombstone sequence number stored in this cache entry
pub(crate) fn max_tombstone_sequence_number(&self) -> Option<SequenceNumber> {
self.tombstones.iter().map(|f| f.sequence_number).max()
}
}
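// For scale (illustrative; the concrete numbers are pinned by `test_size` below):
// `size()` accounts for the `CachedTombstones` struct itself, the `Vec` of `Arc`
// pointers, and each tombstone's own `size()`; one cached tombstone comes out to
// 101 bytes and two to 186 bytes in that test.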
type CacheT = Box<
dyn Cache<
K = TableId,
V = CachedTombstones,
GetExtra = ((), Option<Span>),
PeekExtra = ((), Option<Span>),
>,
>;
/// Cache for tombstones for a particular table
#[derive(Debug)]
pub struct TombstoneCache {
cache: CacheT,
/// Handle that allows clearing existing cache entries
remove_if_handle: RemoveIfHandle<TableId, CachedTombstones>,
}
impl TombstoneCache {
/// Create new empty cache.
pub fn new(
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
time_provider: Arc<dyn TimeProvider>,
metric_registry: &metric::Registry,
ram_pool: Arc<ResourcePool<RamSize>>,
testing: bool,
) -> Self {
let loader = FunctionLoader::new(move |table_id: TableId, _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
async move {
Backoff::new(&backoff_config)
.retry_all_errors("get tombstones", || async {
let cached_tombstone = CachedTombstones::new(
catalog
.repositories()
.await
.tombstones()
.list_by_table(table_id)
.await
.context(CatalogSnafu)?,
);
Ok(cached_tombstone) as std::result::Result<_, Error>
})
.await
.expect("retry forever")
}
});
let loader = Arc::new(MetricsLoader::new(
loader,
CACHE_ID,
Arc::clone(&time_provider),
metric_registry,
testing,
));
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
let (policy_constructor, remove_if_handle) =
RemoveIfPolicy::create_constructor_and_handle(CACHE_ID, metric_registry);
backend.add_policy(policy_constructor);
backend.add_policy(LruPolicy::new(
Arc::clone(&ram_pool),
CACHE_ID,
Arc::new(FunctionEstimator::new(
|k: &TableId, v: &CachedTombstones| {
RamSize(mem::size_of_val(k) + mem::size_of_val(v) + v.size())
},
)),
));
let cache = CacheDriver::new(loader, backend);
let cache = Box::new(CacheWithMetrics::new(
cache,
CACHE_ID,
time_provider,
metric_registry,
));
Self {
cache,
remove_if_handle,
}
}
/// Get list of cached tombstones, by table id
///
/// # Expiration
/// Clears the cache entry if it doesn't contain any tombstone
/// with the specified `max_tombstone_sequence_number`.
///
/// If `None` is passed, the cache entry is not cleared.
///
/// If the entry is cleared, it will be refreshed on the next call
/// to `get`.
///
/// This API is designed to be called with a response from the
/// ingester so there is a single place where the invalidation logic
/// is handled. An `Option` is accepted because the ingester may or
/// may not have a `max_tombstone_sequence_number`.
///
/// If a `max_tombstone_sequence_number` is supplied that is not in
/// our cache, it means the ingester has written new data to the
/// catalog and the cache is out of date.
pub async fn get(
&self,
table_id: TableId,
max_tombstone_sequence_number: Option<SequenceNumber>,
span: Option<Span>,
) -> CachedTombstones {
self.remove_if_handle
.remove_if_and_get(
&self.cache,
table_id,
|cached_file| {
if let Some(max_tombstone_sequence_number) = max_tombstone_sequence_number {
let max_cached = cached_file.max_tombstone_sequence_number();
if let Some(max_cached) = max_cached {
max_cached < max_tombstone_sequence_number
} else {
// a max sequence number was provided but there were no
// tombstones in the cache, so we need to refresh
true
}
} else {
false
}
},
((), span),
)
.await
}
/// Mark the entry for table_id as expired / needs a refresh
#[cfg(test)]
pub fn expire(&self, table_id: TableId) {
self.remove_if_handle.remove_if(&table_id, |_| true);
}
}
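// A standalone sketch (not part of this change) of the refresh decision that
// `get` passes to `remove_if_and_get` above, with plain `u64` standing in for
// `SequenceNumber` so it reads on its own:
fn needs_refresh(max_cached: Option<u64>, ingester_max: Option<u64>) -> bool {
    match (ingester_max, max_cached) {
        // the ingester knows of a tombstone newer than anything cached => refresh
        (Some(ingester), Some(cached)) => cached < ingester,
        // the ingester reports tombstones but the cached entry holds none => refresh
        (Some(_), None) => true,
        // no sequence number from the ingester => never invalidate here
        (None, _) => false,
    }
}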
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
use data_types::TombstoneId;
use iox_tests::TestCatalog;
use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
const METRIC_NAME: &str = "tombstone_list_by_table";
#[tokio::test]
async fn test_tombstones() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table1 = ns.create_table("table1").await;
let shard1 = ns.create_shard(1).await;
let table_and_shard = table1.with_shard(&shard1);
let table_id = table1.table.id;
let tombstone1 = table_and_shard.create_tombstone(7, 1, 100, "foo=1").await;
let cache = make_cache(&catalog);
let cached_tombstones = cache.get(table_id, None, None).await.to_vec();
assert_eq!(cached_tombstones.len(), 1);
assert_eq!(cached_tombstones[0].as_ref(), &tombstone1.tombstone);
// validate a second request doesn't result in a catalog request
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
cache.get(table_id, None, None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
}
#[tokio::test]
async fn test_multiple_tables() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table1 = ns.create_table("table1").await;
let shard1 = ns.create_shard(1).await;
let table_and_shard1 = table1.with_shard(&shard1);
let table_id1 = table1.table.id;
let tombstone1 = table_and_shard1.create_tombstone(8, 1, 100, "foo=1").await;
let cache = make_cache(&catalog);
let table2 = ns.create_table("table2").await;
let shard2 = ns.create_shard(2).await;
let table_and_shard2 = table2.with_shard(&shard2);
let table_id2 = table2.table.id;
let tombstone2 = table_and_shard2.create_tombstone(8, 1, 100, "foo=1").await;
let cached_tombstones = cache.get(table_id1, None, None).await.to_vec();
assert_eq!(cached_tombstones.len(), 1);
assert_eq!(cached_tombstones[0].as_ref(), &tombstone1.tombstone);
let cached_tombstones = cache.get(table_id2, None, None).await.to_vec();
assert_eq!(cached_tombstones.len(), 1);
assert_eq!(cached_tombstones[0].as_ref(), &tombstone2.tombstone);
}
#[tokio::test]
async fn test_size() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table1 = ns.create_table("table1").await;
let shard1 = ns.create_shard(1).await;
let table_and_shard = table1.with_shard(&shard1);
let table_id = table1.table.id;
let cache = make_cache(&catalog);
let single_tombstone_size = 101;
let two_tombstone_size = 186;
assert!(single_tombstone_size < two_tombstone_size);
// Create tombstone 1
table_and_shard.create_tombstone(7, 1, 100, "foo=1").await;
let cached_tombstones = cache.get(table_id, None, None).await;
assert_eq!(cached_tombstones.to_vec().len(), 1);
assert_eq!(cached_tombstones.size(), single_tombstone_size);
// add a second tombstone and force the cache to find it
table_and_shard.create_tombstone(8, 1, 100, "foo=1").await;
cache.expire(table_id);
let cached_tombstones = cache.get(table_id, None, None).await;
assert_eq!(cached_tombstones.to_vec().len(), 2);
assert_eq!(cached_tombstones.size(), two_tombstone_size);
}
#[tokio::test]
async fn test_non_existent_table() {
let catalog = TestCatalog::new();
let cache = make_cache(&catalog);
let made_up_table = TableId::new(1337);
let cached_tombstones = cache.get(made_up_table, None, None).await.to_vec();
assert!(cached_tombstones.is_empty());
}
#[tokio::test]
async fn test_max_persisted_sequence_number() {
let catalog = TestCatalog::new();
let sequence_number_1 = SequenceNumber::new(1);
let sequence_number_2 = SequenceNumber::new(2);
let sequence_number_10 = SequenceNumber::new(10);
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table1 = ns.create_table("table1").await;
let shard1 = ns.create_shard(1).await;
let table_and_shard = table1.with_shard(&shard1);
let table_id = table1.table.id;
let cache = make_cache(&catalog);
// Create tombstone 1
let tombstone1 = table_and_shard
.create_tombstone(sequence_number_1.get(), 1, 100, "foo=1")
.await
.tombstone
.id;
let tombstone2 = table_and_shard
.create_tombstone(sequence_number_2.get(), 1, 100, "foo=1")
.await
.tombstone
.id;
assert_ids(
&cache.get(table_id, None, None).await,
&[tombstone1, tombstone2],
);
// simulate request with no sequence number
// should not expire anything
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
assert_ids(
&cache.get(table_id, None, None).await,
&[tombstone1, tombstone2],
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// simulate request with sequence number 2
// should not expire anything
assert_ids(
&cache.get(table_id, Some(sequence_number_2), None).await,
&[tombstone1, tombstone2],
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// add a new tombstone (at sequence 10)
let tombstone10 = table_and_shard
.create_tombstone(sequence_number_10.get(), 1, 100, "foo=1")
.await
.tombstone
.id;
// cache is stale; a plain request (no sequence number) does not refresh it
assert_ids(
&cache.get(table_id, None, None).await,
&[tombstone1, tombstone2],
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// new request includes sequence 10 and causes a cache refresh
assert_ids(
&cache.get(table_id, Some(sequence_number_10), None).await,
&[tombstone1, tombstone2, tombstone10],
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
#[tokio::test]
async fn test_expire_empty() {
let catalog = TestCatalog::new();
let sequence_number_1 = SequenceNumber::new(1);
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table1 = ns.create_table("table1").await;
let shard1 = ns.create_shard(1).await;
let table_and_shard = table1.with_shard(&shard1);
let table_id = table1.table.id;
let cache = make_cache(&catalog);
// no tombstones for the table, cached
assert!(cache.get(table_id, None, None).await.tombstones.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// second request should be cached
assert!(cache.get(table_id, None, None).await.tombstones.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// requests with no new known tombstones should not expire the entry; results stay cached
assert!(cache.get(table_id, None, None).await.tombstones.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Create a tombstone
let tombstone1 = table_and_shard
.create_tombstone(sequence_number_1.get(), 1, 100, "foo=1")
.await
.tombstone
.id;
// cache is stale
assert!(cache.get(table_id, None, None).await.tombstones.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Now call `get` with knowledge of the new tombstone, which will cause a cache refresh
assert_ids(
&cache.get(table_id, Some(sequence_number_1), None).await,
&[tombstone1],
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
fn make_cache(catalog: &Arc<TestCatalog>) -> TombstoneCache {
TombstoneCache::new(
catalog.catalog(),
BackoffConfig::default(),
catalog.time_provider(),
&catalog.metric_registry(),
test_ram_pool(),
true,
)
}
/// Assert that the IDs in `cached_tombstone` match those in `ids`
fn assert_ids(cached_tombstone: &CachedTombstones, ids: &[TombstoneId]) {
let cached_ids: HashSet<_> = cached_tombstone.to_vec().iter().map(|t| t.id).collect();
let ids: HashSet<_> = ids.iter().copied().collect();
assert_eq!(cached_ids, ids)
}
}

View File

@ -686,7 +686,6 @@ impl IngesterStreamDecoder {
shard_id,
md.completed_persistence_count,
status.parquet_max_sequence_number.map(SequenceNumber::new),
None,
partition_sort_key,
);
self.current_partition = CurrentPartition::Some(partition);
@ -898,10 +897,6 @@ pub struct IngesterPartition {
/// persisted for this partition
parquet_max_sequence_number: Option<SequenceNumber>,
/// Maximum sequence number of tombstone that the ingester has
/// persisted for this partition
tombstone_max_sequence_number: Option<SequenceNumber>,
/// Partition-wide sort key.
partition_sort_key: Option<Arc<SortKey>>,
@ -918,7 +913,6 @@ impl IngesterPartition {
shard_id: ShardId,
completed_persistence_count: u64,
parquet_max_sequence_number: Option<SequenceNumber>,
tombstone_max_sequence_number: Option<SequenceNumber>,
partition_sort_key: Option<Arc<SortKey>>,
) -> Self {
Self {
@ -927,7 +921,6 @@ impl IngesterPartition {
shard_id,
completed_persistence_count,
parquet_max_sequence_number,
tombstone_max_sequence_number,
partition_sort_key,
chunks: vec![],
}
@ -1015,10 +1008,6 @@ impl IngesterPartition {
self.parquet_max_sequence_number
}
pub(crate) fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.tombstone_max_sequence_number
}
pub(crate) fn chunks(&self) -> &[IngesterChunk] {
&self.chunks
}
@ -1420,7 +1409,6 @@ mod tests {
assert_eq!(p.partition_id.get(), 1);
assert_eq!(p.shard_id.get(), 1);
assert_eq!(p.parquet_max_sequence_number, None);
assert_eq!(p.tombstone_max_sequence_number, None);
assert_eq!(p.chunks.len(), 0);
assert_eq!(p.ingester_uuid.unwrap(), ingester_uuid);
assert_eq!(p.completed_persistence_count, 5);
@ -1635,7 +1623,6 @@ mod tests {
p1.parquet_max_sequence_number,
Some(SequenceNumber::new(11))
);
assert_eq!(p1.tombstone_max_sequence_number, None);
assert_eq!(p1.chunks.len(), 2);
assert_eq!(p1.chunks[0].schema().as_arrow(), schema_1_1);
assert_eq!(p1.chunks[0].batches.len(), 2);
@ -1652,7 +1639,6 @@ mod tests {
p2.parquet_max_sequence_number,
Some(SequenceNumber::new(21))
);
assert_eq!(p2.tombstone_max_sequence_number, None);
assert_eq!(p2.chunks.len(), 1);
assert_eq!(p2.chunks[0].schema().as_arrow(), schema_2_1);
assert_eq!(p2.chunks[0].batches.len(), 1);
@ -1665,7 +1651,6 @@ mod tests {
p3.parquet_max_sequence_number,
Some(SequenceNumber::new(31))
);
assert_eq!(p3.tombstone_max_sequence_number, None);
assert_eq!(p3.chunks.len(), 1);
assert_eq!(p3.chunks[0].schema().as_arrow(), schema_3_1);
assert_eq!(p3.chunks[0].batches.len(), 1);
@ -1776,7 +1761,6 @@ mod tests {
p1.parquet_max_sequence_number,
Some(SequenceNumber::new(11))
);
assert_eq!(p1.tombstone_max_sequence_number, None);
let p2 = &partitions[1];
assert_eq!(p2.ingester_uuid.unwrap(), ingester_uuid1);
@ -1787,7 +1771,6 @@ mod tests {
p2.parquet_max_sequence_number,
Some(SequenceNumber::new(21))
);
assert_eq!(p2.tombstone_max_sequence_number, None);
let p3 = &partitions[2];
assert_eq!(p3.ingester_uuid.unwrap(), ingester_uuid2);
@ -1798,7 +1781,6 @@ mod tests {
p3.parquet_max_sequence_number,
Some(SequenceNumber::new(31))
);
assert_eq!(p3.tombstone_max_sequence_number, None);
}
#[tokio::test]
@ -2059,7 +2041,6 @@ mod tests {
for case in cases {
let parquet_max_sequence_number = None;
let tombstone_max_sequence_number = None;
// Construct a partition and ensure it doesn't error
let ingester_partition = IngesterPartition::new(
Some(ingester_uuid),
@ -2067,7 +2048,6 @@ mod tests {
ShardId::new(1),
0,
parquet_max_sequence_number,
tombstone_max_sequence_number,
None,
)
.try_add_chunk(ChunkId::new(), expected_schema.clone(), vec![case])
@ -2092,14 +2072,12 @@ mod tests {
RecordBatch::try_from_iter(vec![("b", int64_array()), ("time", ts_array())]).unwrap();
let parquet_max_sequence_number = None;
let tombstone_max_sequence_number = None;
let err = IngesterPartition::new(
Some(ingester_uuid),
PartitionId::new(1),
ShardId::new(1),
0,
parquet_max_sequence_number,
tombstone_max_sequence_number,
None,
)
.try_add_chunk(ChunkId::new(), expected_schema, vec![batch])

View File

@ -22,7 +22,6 @@ mod query_log;
mod server;
mod system_tables;
mod table;
mod tombstone;
pub use cache::CatalogCache as QuerierCatalogCache;
pub use database::{Error as QuerierDatabaseError, QuerierDatabase};

View File

@ -21,8 +21,7 @@ mod test_util;
/// # Data Structures & Sync
///
/// Tables and schemas are created when [`QuerierNamespace`] is created because DataFusion does not
/// implement async schema inspection. The actual payload (chunks and tombstones) are only queried
/// on demand.
/// implement async schema inspection. The actual payload (chunks) is only queried on demand.
///
/// Most accesses to the [IOx Catalog](iox_catalog::interface::Catalog) are cached via
/// [`CatalogCache`].
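// A minimal, hypothetical sketch (not the actual `QuerierNamespace` API) of the
// pattern described above: the schema is resolved eagerly at construction time,
// while the chunk payload is only fetched when a query asks for it.
struct LazyTable {
    schema: Vec<String>,           // resolved eagerly (column names)
    chunks: Option<Vec<Vec<f64>>>, // payload, fetched on demand
}

impl LazyTable {
    fn new(schema: Vec<String>) -> Self {
        Self { schema, chunks: None }
    }

    fn chunks(&mut self) -> &[Vec<f64>] {
        // stand-in for the on-demand catalog/ingester fetch
        self.chunks.get_or_insert_with(Vec::new).as_slice()
    }
}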

View File

@ -290,11 +290,9 @@ mod tests {
.with_max_time(11);
partition_cpu_b_1.create_parquet_file(builder).await;
// row `host=d perc=52 13` will be removed by the tombstone
let lp = [
"mem,host=c perc=50 11",
"mem,host=c perc=51 12",
"mem,host=d perc=52 13",
"mem,host=d perc=53 14",
]
.join("\n");
@ -318,19 +316,7 @@ mod tests {
.flag_for_delete()
.await;
// will be pruned by the tombstone
let builder = TestParquetFileBuilder::default()
.with_max_l0_created_at(Time::from_timestamp_nanos(9))
.with_line_protocol("mem,host=d perc=55 1")
.with_max_seq(7)
.with_min_time(1)
.with_max_time(1);
partition_mem_c_1.create_parquet_file(builder).await;
table_mem
.with_shard(&shard1)
.create_tombstone(1000, 1, 13, "host=d")
.await;
table_mem.with_shard(&shard1);
let querier_namespace = Arc::new(querier_namespace(&ns).await);
@ -511,9 +497,8 @@ mod tests {
"###
);
// 3 chunks but 1 (with time = 1) got pruned by the tombstone --> 2 chunks left
// The 2 participating chunks in the plan do not overlap -> no deduplication, no sort. Final sort is for order by
// FilterExec is for the tombstone
// The 2 participating chunks in the plan do not overlap -> no deduplication, no sort. Final
// sort is for order by.
insta::assert_yaml_snapshot!(
format_explain(&querier_namespace, "EXPLAIN SELECT * FROM mem ORDER BY host,time").await,
@r###"
@ -524,9 +509,7 @@ mod tests {
- "| logical_plan | Sort: mem.host ASC NULLS LAST, mem.time ASC NULLS LAST |"
- "| | TableScan: mem projection=[host, perc, time] |"
- "| physical_plan | SortExec: expr=[host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] |"
- "| | CoalesceBatchesExec: target_batch_size=8192 |"
- "| | FilterExec: time@2 < 1 OR time@2 > 13 OR NOT host@0 = CAST(d AS Dictionary(Int32, Utf8)) |"
- "| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet, 1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=time@2 < 1 OR time@2 > 13 OR NOT host@0 = CAST(d AS Dictionary(Int32, Utf8)), projection=[host, perc, time] |"
- "| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, perc, time] |"
- "| | |"
- "----------"
"###

View File

@ -222,11 +222,9 @@ impl ChunkAdapter {
let order = ChunkOrder::new(parquet_file.max_l0_created_at.get());
let meta = Arc::new(QuerierParquetChunkMeta {
parquet_file_id: parquet_file.id,
chunk_id,
order,
sort_key: Some(sort_key),
shard_id: parquet_file.shard_id,
partition_id: parquet_file.partition_id,
max_sequence_number: parquet_file.max_sequence_number,
compaction_level: parquet_file.compaction_level,

View File

@ -1,8 +1,8 @@
//! Querier Chunks
use data_types::{
ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, ParquetFileId, PartitionId,
SequenceNumber, ShardId, TableSummary,
ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId, SequenceNumber,
TableSummary,
};
use iox_query::util::create_basic_summary;
use parquet_file::chunk::ParquetChunk;
@ -17,9 +17,6 @@ pub use creation::ChunkAdapter;
/// Immutable metadata attached to a [`QuerierParquetChunk`].
#[derive(Debug)]
pub struct QuerierParquetChunkMeta {
/// ID of the Parquet file of the chunk
parquet_file_id: ParquetFileId,
/// The ID of the chunk
chunk_id: ChunkId,
@ -29,9 +26,6 @@ pub struct QuerierParquetChunkMeta {
/// Sort key.
sort_key: Option<SortKey>,
/// Shard that created the data within this chunk.
shard_id: ShardId,
/// Partition ID.
partition_id: PartitionId,
@ -43,11 +37,6 @@ pub struct QuerierParquetChunkMeta {
}
impl QuerierParquetChunkMeta {
/// ID of the Parquet file of the chunk
pub fn parquet_file_id(&self) -> ParquetFileId {
self.parquet_file_id
}
/// Chunk order.
pub fn order(&self) -> ChunkOrder {
self.order
@ -58,11 +47,6 @@ impl QuerierParquetChunkMeta {
self.sort_key.as_ref()
}
/// Shard that created the data within this chunk.
pub fn shard_id(&self) -> ShardId {
self.shard_id
}
/// Partition ID.
pub fn partition_id(&self) -> PartitionId {
self.partition_id

View File

@ -142,8 +142,6 @@ impl QuerierTable {
}
/// Query all chunks within this table.
///
/// This currently contains all parquet files linked to their unprocessed tombstones.
pub async fn chunks(
&self,
predicate: &Predicate,
@ -211,7 +209,7 @@ impl QuerierTable {
// ask ingesters for data, also optimistically fetching catalog
// contents at the same time to pre-warm cache
let (partitions, _parquet_files, _tombstones) = join!(
let (partitions, _parquet_files) = join!(
self.ingester_partitions(
&predicate,
span_recorder.child_span("ingester partitions"),
@ -222,21 +220,11 @@ impl QuerierTable {
None,
span_recorder.child_span("cache GET parquet_file (pre-warm")
),
catalog_cache.tombstone().get(
self.id(),
None,
span_recorder.child_span("cache GET tombstone (pre-warm)")
),
);
// handle errors / cache refresh
let partitions = partitions?;
let max_tombstone_sequence_number = partitions
.iter()
.flat_map(|p| p.tombstone_max_sequence_number())
.max();
// Determine number of persisted parquet files per ingester UUID seen in the ingester query
// responses for cache invalidation. If `persisted_file_counts_by_ingester_uuid` is empty,
// then there are no results from the ingesters.
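// (Illustrative: if an ingester previously reported 4 persisted files for this
// table and now reports 5, the changed count signals that the cached parquet
// file list may be stale and should be refreshed.)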
@ -255,20 +243,16 @@ impl QuerierTable {
);
// Now fetch the actual contents of the catalog we need
// NB: Pass max parquet/tombstone sequence numbers to `get`
// to ensure cache is refreshed if we learned about new files/tombstones.
let (parquet_files, tombstones) = join!(
catalog_cache.parquet_file().get(
// NB: Pass max parquet sequence numbers to `get`
// to ensure cache is refreshed if we learned about new files.
let parquet_files = catalog_cache
.parquet_file()
.get(
self.id(),
Some(persisted_file_counts_by_ingester_uuid),
span_recorder.child_span("cache GET parquet_file"),
),
catalog_cache.tombstone().get(
self.id(),
max_tombstone_sequence_number,
span_recorder.child_span("cache GET tombstone")
)
);
.await;
let columns: HashSet<ColumnId> = parquet_files
.files
@ -294,7 +278,6 @@ impl QuerierTable {
let reconciler = Reconciler::new(
Arc::clone(&self.table_name),
Arc::clone(&self.namespace_name),
Arc::clone(self.chunk_adapter.catalog_cache()),
);
// create parquet files
@ -312,7 +295,6 @@ impl QuerierTable {
let chunks = reconciler
.reconcile(
partitions,
tombstones.to_vec(),
retention_delete_pred,
parquet_files,
span_recorder.child_span("reconcile"),
@ -434,15 +416,6 @@ impl QuerierTable {
.parquet_file()
.expire(self.table_id)
}
/// clear the tombstone cache
#[cfg(test)]
fn clear_tombstone_cache(&self) {
self.chunk_adapter
.catalog_cache()
.tombstone()
.expire(self.table_id)
}
}
// Given metadata from a list of ingester request [`PartitionData`]s, sum the total completed
@ -576,7 +549,6 @@ mod tests {
// As we have now made new parquet files, force a cache refresh
querier_table.inner().clear_parquet_cache();
querier_table.inner().clear_tombstone_cache();
// Invoke chunks that will prune chunks fully outside retention C3
let mut chunks = querier_table.chunks().await.unwrap();
@ -687,26 +659,13 @@ mod tests {
file111.flag_for_delete().await;
let tombstone1 = table1
.with_shard(&shard1)
.create_tombstone(7, 1, 100, "foo=1")
.await;
tombstone1.mark_processed(&file112).await;
let tombstone2 = table1
.with_shard(&shard1)
.create_tombstone(8, 1, 100, "foo=1")
.await;
tombstone2.mark_processed(&file112).await;
// As we have now made new parquet files, force a cache refresh
querier_table.inner().clear_parquet_cache();
querier_table.inner().clear_tombstone_cache();
// now we have some files
// this contains all files except for:
// - file111: marked for delete
// - file221: wrong table
// - file123: filtered by predicate
let pred = Predicate::new().with_range(0, 100);
let mut chunks = querier_table.chunks_with_predicate(&pred).await.unwrap();
chunks.sort_by_key(|c| c.id());
@ -737,20 +696,6 @@ mod tests {
chunks[5].id(),
ChunkId::new_test(file122.parquet_file.id.get() as u128),
);
// check delete predicates
// file112: marked as processed
assert_eq!(chunks[0].delete_predicates().len(), 0);
// file113: has delete predicate
assert_eq!(chunks[1].delete_predicates().len(), 2);
// file114: predicates are directly within the chunk range => assume they are materialized
assert_eq!(chunks[2].delete_predicates().len(), 0);
// file115: came after in sequence
assert_eq!(chunks[3].delete_predicates().len(), 0);
// file121: wrong shard
assert_eq!(chunks[4].delete_predicates().len(), 0);
// file122: wrong shard
assert_eq!(chunks[5].delete_predicates().len(), 0);
}
#[tokio::test]
@ -877,109 +822,6 @@ mod tests {
assert_eq!(chunks.len(), 3);
}
#[tokio::test]
async fn test_tombstone_cache_refresh() {
maybe_start_logging();
let catalog = TestCatalog::new();
// infinite retention
let ns = catalog.create_namespace_with_retention("ns", None).await;
let table = ns.create_table("table1").await;
let shard = ns.create_shard(1).await;
let partition = table.with_shard(&shard).create_partition("k").await;
let schema = make_schema(&table).await;
// Expect 1 chunk with one delete predicate
let querier_table = TestQuerierTable::new(&catalog, &table).await;
let builder =
IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]);
// parquet file with max sequence number 1
let pf_builder = TestParquetFileBuilder::default()
.with_line_protocol("table1 foo=1 11")
.with_max_seq(1);
partition.create_parquet_file(pf_builder).await;
// tombstone with max sequence number 2
table
.with_shard(&shard)
.create_tombstone(2, 1, 100, "foo=1")
.await;
let max_parquet_sequence_number = Some(SequenceNumber::new(1));
let max_tombstone_sequence_number = Some(SequenceNumber::new(2));
let ingester_partition =
builder.build(max_parquet_sequence_number, max_tombstone_sequence_number);
let querier_table = querier_table.with_ingester_partition(ingester_partition);
let deletes = num_deletes(querier_table.chunks().await.unwrap());
assert_eq!(&deletes, &[1, 0]);
// Now, make a second tombstone with max sequence number 3
table
.with_shard(&shard)
.create_tombstone(3, 1, 100, "foo=1")
.await;
// With the same ingester response, still expect 1 delete
// (because cache is not cleared)
let deletes = num_deletes(querier_table.chunks().await.unwrap());
assert_eq!(&deletes, &[1, 0]);
// update the ingester response to return a new max delete sequence number
let max_tombstone_sequence_number = Some(SequenceNumber::new(3));
let ingester_partition =
builder.build(max_parquet_sequence_number, max_tombstone_sequence_number);
let querier_table = querier_table
.clear_ingester_partitions()
.with_ingester_partition(ingester_partition);
// second tombstone should be found
let deletes = num_deletes(querier_table.chunks().await.unwrap());
assert_eq!(&deletes, &[2, 0]);
}
#[tokio::test]
async fn test_tombstone_cache_refresh_with_retention() {
maybe_start_logging();
let catalog = TestCatalog::new();
// 1-hour retention
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table1").await;
let shard = ns.create_shard(1).await;
let partition = table.with_shard(&shard).create_partition("k").await;
let schema = make_schema(&table).await;
// Expect 1 chunk with one delete predicate
let querier_table = TestQuerierTable::new(&catalog, &table).await;
let builder =
IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]);
// parquet file with max sequence number 1
let pf_builder = TestParquetFileBuilder::default()
.with_line_protocol("table1 foo=1 11")
.with_max_seq(1);
partition.create_parquet_file(pf_builder).await;
// tombstone with max sequence number 2
table
.with_shard(&shard)
.create_tombstone(2, 1, 100, "foo=1")
.await;
let max_parquet_sequence_number = Some(SequenceNumber::new(1));
let max_tombstone_sequence_number = Some(SequenceNumber::new(2));
let ingester_partition =
builder.build(max_parquet_sequence_number, max_tombstone_sequence_number);
let querier_table = querier_table.with_ingester_partition(ingester_partition);
let deletes = num_deletes(querier_table.chunks().await.unwrap());
// For parquet: There must be two delete predicates: one from the tombstone, one from retention
// For ingester: one delete predicate from retention
assert_eq!(&deletes, &[2, 1]);
}
/// Adds a "foo" column to the table and returns the created schema
async fn make_schema(table: &Arc<TestTable>) -> Schema {
table.create_column("foo", ColumnType::F64).await;
@ -1086,13 +928,4 @@ mod tests {
self.querier_table.chunks(pred, span, projection).await
}
}
/// returns the number of deletes in each chunk
fn num_deletes(mut chunks: Vec<Arc<dyn QueryChunk>>) -> Vec<usize> {
chunks.sort_by_key(|c| c.id());
chunks
.iter()
.map(|chunk| chunk.delete_predicates().len())
.collect()
}
}

View File

@ -2,23 +2,18 @@
mod interface;
use data_types::{DeletePredicate, PartitionId, ShardId, Tombstone, TombstoneId};
use data_types::{DeletePredicate, PartitionId};
use iox_query::QueryChunk;
use observability_deps::tracing::debug;
use schema::sort::SortKey;
use snafu::Snafu;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use trace::span::{Span, SpanRecorder};
use crate::{
cache::CatalogCache, ingester::IngesterChunk, parquet::QuerierParquetChunk,
tombstone::QuerierTombstone, IngesterPartition,
};
use self::interface::{IngesterPartitionInfo, TombstoneInfo};
use crate::{ingester::IngesterChunk, parquet::QuerierParquetChunk, IngesterPartition};
#[derive(Snafu, Debug)]
#[allow(missing_copy_implementations)]
@ -32,28 +27,21 @@ pub enum ReconcileError {
pub struct Reconciler {
table_name: Arc<str>,
namespace_name: Arc<str>,
catalog_cache: Arc<CatalogCache>,
}
impl Reconciler {
pub(crate) fn new(
table_name: Arc<str>,
namespace_name: Arc<str>,
catalog_cache: Arc<CatalogCache>,
) -> Self {
pub(crate) fn new(table_name: Arc<str>, namespace_name: Arc<str>) -> Self {
Self {
table_name,
namespace_name,
catalog_cache,
}
}
/// Reconciles ingester state (ingester_partitions) and catalog state (parquet_files and
/// tombstones), producing a list of chunks to query
/// Reconciles ingester state (ingester_partitions) and catalog state (parquet_files),
/// producing a list of chunks to query
pub(crate) async fn reconcile(
&self,
ingester_partitions: Vec<IngesterPartition>,
tombstones: Vec<Arc<Tombstone>>,
retention_delete_pred: Option<Arc<DeletePredicate>>,
parquet_files: Vec<QuerierParquetChunk>,
span: Option<Span>,
@ -62,7 +50,6 @@ impl Reconciler {
let mut chunks = self
.build_chunks_from_parquet(
&ingester_partitions,
tombstones,
retention_delete_pred.clone(),
parquet_files,
span_recorder.child_span("build_chunks_from_parquet"),
@ -84,46 +71,17 @@ impl Reconciler {
async fn build_chunks_from_parquet(
&self,
ingester_partitions: &[IngesterPartition],
tombstones: Vec<Arc<Tombstone>>,
retention_delete_pred: Option<Arc<DeletePredicate>>,
parquet_files: Vec<QuerierParquetChunk>,
span: Option<Span>,
_span: Option<Span>,
) -> Result<Vec<Box<dyn UpdatableQuerierChunk>>, ReconcileError> {
let span_recorder = SpanRecorder::new(span);
debug!(
namespace=%self.namespace_name(),
table_name=%self.table_name(),
?tombstones,
num_parquet_files=parquet_files.len(),
"Reconciling "
);
let tombstone_exclusion = tombstone_exclude_list(ingester_partitions, &tombstones);
let querier_tombstones: Vec<_> =
tombstones.into_iter().map(QuerierTombstone::from).collect();
// match chunks and tombstones
let mut tombstones_by_shard: HashMap<ShardId, Vec<QuerierTombstone>> = HashMap::new();
for tombstone in querier_tombstones {
tombstones_by_shard
.entry(tombstone.shard_id())
.or_default()
.push(tombstone);
}
debug!(
namespace=%self.namespace_name(),
table_name=%self.table_name(),
n=parquet_files.len(),
parquet_ids=?parquet_files
.iter()
.map(|f| f.meta().parquet_file_id().get())
.collect::<Vec<_>>(),
"Parquet files after filtering"
);
debug!(num_chunks=%parquet_files.len(), "Created chunks from parquet files");
let mut chunks: Vec<Box<dyn UpdatableQuerierChunk>> =
@ -131,70 +89,7 @@ impl Reconciler {
let retention_expr_len = usize::from(retention_delete_pred.is_some());
for chunk in parquet_files.into_iter() {
let tombstones = tombstones_by_shard.get(&chunk.meta().shard_id());
let tombstones_len = if let Some(tombstones) = tombstones {
tombstones.len()
} else {
0
};
let mut delete_predicates = Vec::with_capacity(tombstones_len + retention_expr_len);
if let Some(tombstones) = tombstones {
for tombstone in tombstones {
// check conditions that don't need catalog access first to avoid unnecessary
// catalog load
// Check if tombstone should be excluded based on the ingester response
if tombstone_exclusion
.contains(&(chunk.meta().partition_id(), tombstone.tombstone_id()))
{
continue;
}
// Check if tombstone even applies to the sequence number range within the
// parquet file. There
// are the following cases here:
//
// 1. Tombstone comes before chunk min sequence number:
// There is no way the tombstone can affect the chunk.
// 2. Tombstone comes after chunk max sequence number:
// Tombstone affects whole chunk (it might be marked as processed though,
// we'll check that further down).
// 3. Tombstone is in the min-max sequence number range of the chunk:
// Technically the querier has NO way to determine the rows that are
// affected by the tombstone since we have no row-level sequence numbers.
// Such a file can be created by two sources -- the ingester and the
// compactor. The ingester must have materialized the tombstone while
// creating the parquet file, so the querier can skip it. The compactor also
// materialized the tombstones, so we can skip it as well. In the compactor
// case the tombstone will even be marked as processed.
//
// So the querier only needs to consider the tombstone in case 2.
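// Worked example (illustrative): for a chunk covering sequence numbers 5..=10,
// a tombstone at sequence 3 (case 1) or 7 (case 3) is skipped by the check
// below, while a tombstone at sequence 11 (case 2) is applied unless it has
// been marked as processed (checked further down).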
if tombstone.sequence_number() <= chunk.meta().max_sequence_number() {
continue;
}
// TODO: also consider time ranges
// (https://github.com/influxdata/influxdb_iox/issues/4086)
// check if tombstone is marked as processed
if self
.catalog_cache
.processed_tombstones()
.exists(
chunk.meta().parquet_file_id(),
tombstone.tombstone_id(),
span_recorder.child_span("cache GET exists processed_tombstone"),
)
.await
{
continue;
}
delete_predicates.push(Arc::clone(tombstone.delete_predicate()));
}
}
let mut delete_predicates = Vec::with_capacity(retention_expr_len);
if let Some(retention_delete_pred) = retention_delete_pred.clone() {
delete_predicates.push(retention_delete_pred);
@ -215,8 +110,6 @@ impl Reconciler {
) -> impl Iterator<Item = Box<dyn UpdatableQuerierChunk>> {
// Add ingester chunks to the overall chunk list.
// - filter out chunks that don't have any record batches
// - tombstones don't need to be applied since they were already materialized by the
// ingester
ingester_partitions
.into_iter()
.flat_map(move |c| {
@ -321,169 +214,19 @@ impl UpdatableQuerierChunk for IngesterChunk {
}
}
/// Generates "exclude" filter for tombstones.
///
/// Since tombstones are shard-wide but data persistence is partition-based (partitions are
/// sub-units of shards), we cannot just remove tombstones entirely but need to decide on a
/// per-partition basis. This function generates a lookup table of partition-tombstone tuples that
/// later need to be EXCLUDED/IGNORED when pairing tombstones with chunks.
fn tombstone_exclude_list<I, T>(
ingester_partitions: &[I],
tombstones: &[T],
) -> HashSet<(PartitionId, TombstoneId)>
where
I: IngesterPartitionInfo,
T: TombstoneInfo,
{
// Build shard-based lookup table.
let mut lookup_table: HashMap<ShardId, Vec<&I>> = HashMap::default();
for partition in ingester_partitions {
lookup_table
.entry(partition.shard_id())
.or_default()
.push(partition);
}
let mut exclude = HashSet::new();
for t in tombstones {
if let Some(partitions) = lookup_table.get(&t.shard_id()) {
for p in partitions {
if let Some(persisted_max) = p.tombstone_max_sequence_number() {
if t.sequence_number() > persisted_max {
// newer than persisted => exclude
exclude.insert((p.partition_id(), t.id()));
} else {
// in persisted range => keep
}
} else {
// partition has no persisted data at all => need to exclude tombstone which is
// too new
exclude.insert((p.partition_id(), t.id()));
}
}
}
}
exclude
}
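// Example (mirrored by the tests below): a partition on shard 1 whose ingester
// reports a persisted tombstone max of 10 keeps tombstones up to sequence 10
// but excludes one at sequence 11, while a partition with no persisted
// tombstones at all excludes every tombstone on its shard.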
#[cfg(test)]
mod tests {
use super::{interface::ParquetFileInfo, *};
use data_types::{CompactionLevel, SequenceNumber};
#[test]
fn test_filter_tombstones_empty() {
let actual =
tombstone_exclude_list::<MockIngesterPartitionInfo, MockTombstoneInfo>(&[], &[]);
assert!(actual.is_empty());
}
#[test]
fn test_filter_tombstones_many() {
let ingester_partitions = &[
MockIngesterPartitionInfo {
partition_id: PartitionId::new(1),
shard_id: ShardId::new(1),
parquet_max_sequence_number: None,
tombstone_max_sequence_number: Some(SequenceNumber::new(10)),
},
MockIngesterPartitionInfo {
partition_id: PartitionId::new(2),
shard_id: ShardId::new(1),
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
},
MockIngesterPartitionInfo {
partition_id: PartitionId::new(3),
shard_id: ShardId::new(1),
parquet_max_sequence_number: None,
tombstone_max_sequence_number: Some(SequenceNumber::new(3)),
},
MockIngesterPartitionInfo {
partition_id: PartitionId::new(4),
shard_id: ShardId::new(2),
parquet_max_sequence_number: None,
tombstone_max_sequence_number: Some(SequenceNumber::new(7)),
},
];
let tombstones = &[
MockTombstoneInfo {
id: TombstoneId::new(1),
shard_id: ShardId::new(1),
sequence_number: SequenceNumber::new(2),
},
MockTombstoneInfo {
id: TombstoneId::new(2),
shard_id: ShardId::new(1),
sequence_number: SequenceNumber::new(3),
},
MockTombstoneInfo {
id: TombstoneId::new(3),
shard_id: ShardId::new(1),
sequence_number: SequenceNumber::new(4),
},
MockTombstoneInfo {
id: TombstoneId::new(4),
shard_id: ShardId::new(1),
sequence_number: SequenceNumber::new(9),
},
MockTombstoneInfo {
id: TombstoneId::new(5),
shard_id: ShardId::new(1),
sequence_number: SequenceNumber::new(10),
},
MockTombstoneInfo {
id: TombstoneId::new(6),
shard_id: ShardId::new(1),
sequence_number: SequenceNumber::new(11),
},
MockTombstoneInfo {
id: TombstoneId::new(7),
shard_id: ShardId::new(2),
sequence_number: SequenceNumber::new(6),
},
MockTombstoneInfo {
id: TombstoneId::new(8),
shard_id: ShardId::new(2),
sequence_number: SequenceNumber::new(7),
},
MockTombstoneInfo {
id: TombstoneId::new(9),
shard_id: ShardId::new(2),
sequence_number: SequenceNumber::new(8),
},
MockTombstoneInfo {
id: TombstoneId::new(10),
shard_id: ShardId::new(3),
sequence_number: SequenceNumber::new(10),
},
];
let actual = tombstone_exclude_list(ingester_partitions, tombstones);
let expected = HashSet::from([
(PartitionId::new(1), TombstoneId::new(6)),
(PartitionId::new(2), TombstoneId::new(1)),
(PartitionId::new(2), TombstoneId::new(2)),
(PartitionId::new(2), TombstoneId::new(3)),
(PartitionId::new(2), TombstoneId::new(4)),
(PartitionId::new(2), TombstoneId::new(5)),
(PartitionId::new(2), TombstoneId::new(6)),
(PartitionId::new(3), TombstoneId::new(3)),
(PartitionId::new(3), TombstoneId::new(4)),
(PartitionId::new(3), TombstoneId::new(5)),
(PartitionId::new(3), TombstoneId::new(6)),
(PartitionId::new(4), TombstoneId::new(9)),
]);
assert_eq!(actual, expected);
}
use super::{
interface::{IngesterPartitionInfo, ParquetFileInfo},
*,
};
use data_types::{CompactionLevel, SequenceNumber, ShardId};
#[derive(Debug)]
struct MockIngesterPartitionInfo {
partition_id: PartitionId,
shard_id: ShardId,
parquet_max_sequence_number: Option<SequenceNumber>,
tombstone_max_sequence_number: Option<SequenceNumber>,
}
impl IngesterPartitionInfo for MockIngesterPartitionInfo {
@ -498,10 +241,6 @@ mod tests {
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.parquet_max_sequence_number
}
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.tombstone_max_sequence_number
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
@ -524,25 +263,4 @@ mod tests {
self.compaction_level
}
}
#[derive(Debug)]
struct MockTombstoneInfo {
id: TombstoneId,
shard_id: ShardId,
sequence_number: SequenceNumber,
}
impl TombstoneInfo for MockTombstoneInfo {
fn id(&self) -> TombstoneId {
self.id
}
fn shard_id(&self) -> ShardId {
self.shard_id
}
fn sequence_number(&self) -> SequenceNumber {
self.sequence_number
}
}
}

View File

@ -1,9 +1,7 @@
//! Interface for reconciling Ingester and catalog state
use crate::{ingester::IngesterPartition, parquet::QuerierParquetChunk};
use data_types::{
CompactionLevel, ParquetFile, PartitionId, SequenceNumber, ShardId, Tombstone, TombstoneId,
};
use data_types::{CompactionLevel, ParquetFile, PartitionId, SequenceNumber, ShardId};
use std::{ops::Deref, sync::Arc};
/// Information about an ingester partition.
@ -13,7 +11,6 @@ pub trait IngesterPartitionInfo {
fn partition_id(&self) -> PartitionId;
fn shard_id(&self) -> ShardId;
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber>;
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber>;
}
impl IngesterPartitionInfo for IngesterPartition {
@ -28,10 +25,6 @@ impl IngesterPartitionInfo for IngesterPartition {
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().parquet_max_sequence_number()
}
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().tombstone_max_sequence_number()
}
}
impl<T> IngesterPartitionInfo for Arc<T>
@ -49,10 +42,6 @@ where
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().parquet_max_sequence_number()
}
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().tombstone_max_sequence_number()
}
}
/// Information about a parquet file.
@ -91,40 +80,3 @@ impl ParquetFileInfo for QuerierParquetChunk {
self.meta().compaction_level()
}
}
/// Information about a tombstone.
///
/// This is mostly the same as [`Tombstone`] but allows easier mocking.
pub trait TombstoneInfo {
fn id(&self) -> TombstoneId;
fn shard_id(&self) -> ShardId;
fn sequence_number(&self) -> SequenceNumber;
}
impl TombstoneInfo for Tombstone {
fn id(&self) -> TombstoneId {
self.id
}
fn shard_id(&self) -> ShardId {
self.shard_id
}
fn sequence_number(&self) -> SequenceNumber {
self.sequence_number
}
}
impl TombstoneInfo for Arc<Tombstone> {
fn id(&self) -> TombstoneId {
self.id
}
fn shard_id(&self) -> ShardId {
self.shard_id
}
fn sequence_number(&self) -> SequenceNumber {
self.sequence_number
}
}

View File

@ -102,16 +102,13 @@ impl IngesterPartitionBuilder {
&self,
parquet_max_sequence_number: Option<SequenceNumber>,
) -> IngesterPartition {
let tombstone_max_sequence_number = None;
self.build(parquet_max_sequence_number, tombstone_max_sequence_number)
self.build(parquet_max_sequence_number)
}
/// Create an ingester partition with the specified field values
pub(crate) fn build(
&self,
parquet_max_sequence_number: Option<SequenceNumber>,
tombstone_max_sequence_number: Option<SequenceNumber>,
) -> IngesterPartition {
let data = self.lp.iter().map(|lp| lp_to_record_batch(lp)).collect();
@ -121,7 +118,6 @@ impl IngesterPartitionBuilder {
self.shard.shard.id,
0,
parquet_max_sequence_number,
tombstone_max_sequence_number,
self.partition_sort_key.clone(),
)
.try_add_chunk(

View File

@ -1,67 +0,0 @@
use data_types::{DeletePredicate, SequenceNumber, ShardId, Tombstone, TombstoneId};
use predicate::delete_predicate::parse_delete_predicate;
use std::sync::Arc;
/// Tombstone as it is handled by the querier.
#[derive(Debug, Clone)]
pub struct QuerierTombstone {
/// Delete predicate associated with this tombstone.
delete_predicate: Arc<DeletePredicate>,
/// Shard that this tombstone affects.
shard_id: ShardId,
/// The sequence number assigned to the tombstone from the shard.
sequence_number: SequenceNumber,
/// Tombstone ID.
tombstone_id: TombstoneId,
}
impl QuerierTombstone {
/// Delete predicate associated with this tombstone.
pub fn delete_predicate(&self) -> &Arc<DeletePredicate> {
&self.delete_predicate
}
/// Shard that this tombstone affects.
pub fn shard_id(&self) -> ShardId {
self.shard_id
}
/// The sequence number assigned to the tombstone from the shard.
pub fn sequence_number(&self) -> SequenceNumber {
self.sequence_number
}
/// Tombstone ID.
pub fn tombstone_id(&self) -> TombstoneId {
self.tombstone_id
}
}
impl From<&Tombstone> for QuerierTombstone {
fn from(tombstone: &Tombstone) -> Self {
let delete_predicate = Arc::new(
parse_delete_predicate(
&tombstone.min_time.get().to_string(),
&tombstone.max_time.get().to_string(),
&tombstone.serialized_predicate,
)
.expect("broken delete predicate"),
);
Self {
delete_predicate,
shard_id: tombstone.shard_id,
sequence_number: tombstone.sequence_number,
tombstone_id: tombstone.id,
}
}
}
impl From<Arc<Tombstone>> for QuerierTombstone {
fn from(tombstone: Arc<Tombstone>) -> Self {
tombstone.as_ref().into()
}
}