Merge pull request #2730 from influxdata/crepererum/in_mem_expr_part5

refactor: delete predicate clean-ups
kodiakhq[bot] 2021-10-05 16:30:32 +00:00 committed by GitHub
commit ad60d73657
6 changed files with 60 additions and 70 deletions

View File

@@ -2,7 +2,7 @@
 /// predicates are so common and critical to performance of timeseries
 /// databases in general, and IOx in particular, that they are handled
 /// specially
-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Copy, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Copy, Debug, Hash)]
 pub struct TimestampRange {
     /// Start defines the inclusive lower bound.
     pub start: i64,
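
The newly derived `Hash` is what allows `TimestampRange` (and, further down, `DeletePredicate` and `ChunkAddrWithoutDatabase`) to serve as `HashMap` keys and `HashSet` elements. A minimal standalone sketch of the pattern, using a simplified stand-in type rather than the full IOx definition:

```rust
use std::collections::HashMap;

// The derived Hash agrees with the derived Eq: equal values hash
// identically, so they collapse into a single map entry.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct TimestampRange {
    start: i64, // inclusive
    end: i64,   // exclusive
}

fn main() {
    let mut hits: HashMap<TimestampRange, u32> = HashMap::new();
    let range = TimestampRange { start: 0, end: 100 };
    *hits.entry(range.clone()).or_insert(0) += 1;
    *hits.entry(range).or_insert(0) += 1; // same value, same entry
    assert_eq!(hits.len(), 1);
}
```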

View File

@@ -25,7 +25,7 @@ use snafu::{OptionExt, ResultExt, Snafu};
 use std::{
     collections::{
         hash_map::Entry::{Occupied, Vacant},
-        HashMap,
+        HashMap, HashSet,
     },
     fmt::Debug,
     sync::Arc,
@@ -860,7 +860,7 @@ impl<'c> TransactionHandle<'c> {
         &mut self,
         predicate: &DeletePredicate,
         chunks: &[ChunkAddrWithoutDatabase],
-    ) -> Result<()> {
+    ) {
         self.transaction
             .as_mut()
             .expect("transaction handle w/o transaction?!")
@@ -877,8 +877,6 @@ impl<'c> TransactionHandle<'c> {
                         .collect(),
                 },
             ));
-
-        Ok(())
     }
 }
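
With the `Result<()>` return gone, `delete_predicate` is now honest about being infallible: it only appends an action to the in-memory transaction, so nothing can fail before `commit`. A rough sketch of the shape of this change, with toy types rather than the IOx API:

```rust
// Recording an action only pushes onto an in-memory list, so the
// old `-> Result<()>` (which always ended in `Ok(())`) was noise.
struct Transaction {
    actions: Vec<String>,
}

impl Transaction {
    // before: fn delete_predicate(&mut self, ...) -> Result<(), Error>
    // after:  infallible, so every caller loses a `?` or `.unwrap()`.
    fn delete_predicate(&mut self, predicate: &str) {
        self.actions.push(format!("delete: {}", predicate));
    }
}
```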
@@ -987,11 +985,25 @@ impl<'c> CheckpointHandle<'c> {
     }

     fn create_actions_for_delete_predicates(
-        delete_predicates: Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>,
+        delete_predicates: HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>>,
     ) -> Result<Vec<proto::transaction::Action>, Error> {
+        // sort by key (= predicate) for deterministic output
+        let delete_predicates = {
+            let mut tmp: Vec<_> = delete_predicates.into_iter().collect();
+            tmp.sort_by_key(|(predicate, _chunks)| Arc::clone(predicate));
+            tmp
+        };
+
         delete_predicates
             .into_iter()
             .map(|(predicate, chunks)| {
+                // sort chunks for deterministic output
+                let chunks = {
+                    let mut tmp: Vec<_> = chunks.into_iter().collect();
+                    tmp.sort();
+                    tmp
+                };
+
                 let action = proto::DeletePredicate {
                     predicate: Some(predicate::serialize::serialize(&predicate)),
                     chunks: chunks
@@ -1719,12 +1731,12 @@ mod tests {
         // create two predicates
         let predicate_1 = create_delete_predicate(42);
         let chunks_1 = vec![chunk_addrs[0].clone().into()];
-        t.delete_predicate(&predicate_1, &chunks_1).unwrap();
+        t.delete_predicate(&predicate_1, &chunks_1);
         state.delete_predicate(predicate_1, chunks_1);

         let predicate_2 = create_delete_predicate(1337);
         let chunks_2 = vec![chunk_addrs[0].clone().into(), chunk_addrs[1].clone().into()];
-        t.delete_predicate(&predicate_2, &chunks_2).unwrap();
+        t.delete_predicate(&predicate_2, &chunks_2);
         state.delete_predicate(predicate_2, chunks_2);

         t.commit().await.unwrap();
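
Because `create_actions_for_delete_predicates` now receives an unordered `HashMap`, determinism moves into the serializer itself: keys and chunk sets are sorted just before encoding. The same collect-and-sort pattern in isolation, with `String` standing in for the predicate and chunk-address types:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

// HashMap/HashSet iteration order is unspecified, so anything that ends
// up in a transaction file must be sorted to stay byte-identical.
fn to_sorted(
    predicates: HashMap<Arc<String>, HashSet<String>>,
) -> Vec<(Arc<String>, Vec<String>)> {
    let mut sorted: Vec<_> = predicates
        .into_iter()
        .map(|(predicate, chunks)| {
            let mut chunks: Vec<_> = chunks.into_iter().collect();
            chunks.sort(); // deterministic chunk order
            (predicate, chunks)
        })
        .collect();
    // deterministic key order; cloning the Arc as the sort key mirrors the hunk above
    sorted.sort_by_key(|(predicate, _chunks)| Arc::clone(predicate));
    sorted
}
```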

View File

@@ -1,5 +1,8 @@
 //! Abstract interfaces to make different users work with the preserved catalog.
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};

 use data_types::chunk_metadata::{ChunkAddr, ChunkId};
 use iox_object_store::{IoxObjectStore, ParquetFilePath};
@@ -22,7 +25,7 @@ pub struct CatalogParquetInfo {
 }

 /// Same as [ChunkAddr] but w/o the database part.
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct ChunkAddrWithoutDatabase {
     pub table_name: Arc<str>,
     pub partition_key: Arc<str>,
@@ -130,17 +133,15 @@ pub trait CatalogState {
 /// they refer.
 #[derive(Debug)]
 pub struct CheckpointData {
-    /// List of all Parquet files that are currently (i.e. by the current version) tracked by the
-    /// catalog.
+    /// Maps all Parquet file paths that are currently (i.e. by the current version) tracked by the
+    /// catalog to the associated metadata.
     ///
     /// If a file was once added but later removed it MUST NOT appear in the result.
     pub files: HashMap<ParquetFilePath, CatalogParquetInfo>,

-    /// List of active delete predicates together with their chunks (by table name, partition key, and chunk ID).
+    /// Maps active delete predicates to their chunks (by table name, partition key, and chunk ID).
     ///
     /// This must only contain chunks that are still present in the catalog. Predicates that do not have any chunks
     /// attached should be left out.
-    ///
-    /// The vector itself must be sorted by [`DeletePredicate`]. The chunks list must also be sorted.
-    pub delete_predicates: Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>,
+    pub delete_predicates: HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>>,
 }
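
The doc comment's invariant, that predicates with an empty chunk set must not appear, is cheap to maintain with the new map shape. A hedged sketch of how a caller might prune after chunks are dropped (hypothetical helper with stand-in types, not part of this commit):

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

type Predicate = String;           // stand-in for DeletePredicate
type ChunkAddr = (String, String); // stand-in for ChunkAddrWithoutDatabase

// Remove a dropped chunk from every predicate's chunk set, then evict
// predicates that no longer reference any chunk, as the invariant demands.
fn forget_chunk(
    predicates: &mut HashMap<Arc<Predicate>, HashSet<ChunkAddr>>,
    dropped: &ChunkAddr,
) {
    for chunks in predicates.values_mut() {
        chunks.remove(dropped);
    }
    predicates.retain(|_predicate, chunks| !chunks.is_empty());
}
```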

View File

@@ -1,7 +1,7 @@
 use std::{
     collections::{
         hash_map::Entry::{Occupied, Vacant},
-        HashMap,
+        HashMap, HashSet,
     },
     fmt::Debug,
     sync::Arc,
@@ -77,42 +77,34 @@ impl TestCatalogState {
     }

     /// Return all predicates in this catalog.
-    pub fn delete_predicates(&self) -> Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)> {
-        let mut predicates: HashMap<usize, (Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)> =
+    pub fn delete_predicates(
+        &self,
+    ) -> HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>> {
+        let mut predicates: HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>> =
             Default::default();

         for (table_name, table) in &self.tables {
             for (partition_key, partition) in &table.partitions {
                 for (chunk_id, chunk) in &partition.chunks {
                     for predicate in &chunk.delete_predicates {
-                        let predicate_ref: &DeletePredicate = predicate.as_ref();
-                        let addr = (predicate_ref as *const DeletePredicate) as usize;
                         let pred_chunk_closure = || ChunkAddrWithoutDatabase {
                             table_name: Arc::clone(table_name),
                             partition_key: Arc::clone(partition_key),
                             chunk_id: *chunk_id,
                         };
                         predicates
-                            .entry(addr)
-                            .and_modify(|(_predicate, v)| v.push(pred_chunk_closure()))
-                            .or_insert_with(|| (Arc::clone(predicate), vec![pred_chunk_closure()]));
+                            .entry(Arc::clone(predicate))
+                            .and_modify(|chunks| {
+                                chunks.insert(pred_chunk_closure());
+                            })
+                            .or_insert_with(|| {
+                                IntoIterator::into_iter([pred_chunk_closure()]).collect()
+                            });
                     }
                 }
             }
         }

-        let mut predicates: Vec<_> = predicates
-            .into_values()
-            .map(|(predicate, mut chunks)| {
-                chunks.sort();
-                (predicate, chunks)
-            })
-            .collect();
-        predicates.sort_by(|(predicate_a, _chunks_a), (predicate_b, _chunks_b)| {
-            predicate_a
-                .partial_cmp(predicate_b)
-                .unwrap_or(std::cmp::Ordering::Equal)
-        });
-
         predicates
     }
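
The `entry` chain above has a shorter equivalent: because `HashSet` implements `Default`, `or_default()` can replace the `and_modify`/`or_insert_with` pair (the `IntoIterator::into_iter([..])` form was the idiomatic pre-2021-edition way to iterate an array by value). A minimal sketch with stand-in key and value types:

```rust
use std::collections::{HashMap, HashSet};

// or_default() inserts an empty HashSet on the first sighting of a key;
// insert() then adds the chunk in both the new-key and existing-key cases.
fn record(map: &mut HashMap<String, HashSet<u32>>, predicate: &str, chunk: u32) {
    map.entry(predicate.to_string()).or_default().insert(chunk);
}
```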
@@ -259,8 +251,8 @@
     // The expected state of the catalog
     let mut expected_files: HashMap<ChunkId, (ParquetFilePath, Arc<IoxParquetMetaData>)> =
         HashMap::new();
-    let mut expected_predicates: Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)> =
-        vec![];
+    let mut expected_predicates: HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>> =
+        HashMap::new();
     assert_checkpoint(&state, &f, &expected_files, &expected_predicates);

     // add files
@@ -528,13 +520,13 @@
     let predicate_1 = create_delete_predicate(1);
     let chunks_1 = vec![chunk_addr_1.clone().into()];
     state.delete_predicate(Arc::clone(&predicate_1), chunks_1.clone());
-    expected_predicates.push((predicate_1, chunks_1));
+    expected_predicates.insert(predicate_1, chunks_1.into_iter().collect());

     // second predicate uses both chunks (but not the older chunks)
     let predicate_2 = create_delete_predicate(2);
     let chunks_2 = vec![chunk_addr_1.into(), chunk_addr_2.into()];
     state.delete_predicate(Arc::clone(&predicate_2), chunks_2.clone());
-    expected_predicates.push((predicate_2, chunks_2));
+    expected_predicates.insert(predicate_2, chunks_2.into_iter().collect());

     // chunks created afterwards are unaffected
     let chunk_addr_3 = chunk_addr(10);
@@ -566,7 +558,7 @@ where
     expected_predicates = expected_predicates
         .into_iter()
         .filter_map(|(predicate, chunks)| {
-            let chunks: Vec<_> = chunks
+            let chunks: HashSet<_> = chunks
                 .into_iter()
                 .filter(|addr| addr.chunk_id != ChunkId::new(8))
                 .collect();
@@ -595,7 +587,7 @@ fn assert_checkpoint<S, F>(
     state: &S,
     f: &F,
     expected_files: &HashMap<ChunkId, (ParquetFilePath, Arc<IoxParquetMetaData>)>,
-    expected_predicates: &[(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)],
+    expected_predicates: &HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>>,
 ) where
     F: Fn(&S) -> CheckpointData,
 {
@@ -625,7 +617,7 @@
         assert_eq!(stats_actual, stats_expected);
     }

-    assert_eq!(data.delete_predicates, expected_predicates);
+    assert_eq!(&data.delete_predicates, expected_predicates);
 }

 /// Get a sorted list of keys from an iterator.
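
Note that the assertion can now compare the two maps directly: `PartialEq` for `HashMap` and `HashSet` is structural and ignores iteration order, which is why the tests no longer pre-sort anything. For illustration:

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    let mut a: HashMap<&str, HashSet<u32>> = HashMap::new();
    a.entry("predicate_1").or_default().extend([1, 2]);

    let mut b: HashMap<&str, HashSet<u32>> = HashMap::new();
    b.entry("predicate_1").or_default().extend([2, 1]);

    // Same contents, possibly different internal order: still equal.
    assert_eq!(a, b);
}
```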

View File

@@ -68,7 +68,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Represents a parsed delete predicate for evaluation by the InfluxDB IOx
 /// query engine.
-#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct DeletePredicate {
     /// Only rows within this range are included in
     /// results. Other rows are excluded.
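
Deriving `Hash` on `DeletePredicate` is the piece that retires the old pointer-address keys: `Arc<T>` forwards `Hash` and `Eq` to the pointee, so two separately allocated but equal predicates land in one map entry, whereas `(predicate_ref as *const DeletePredicate) as usize` treated them as distinct. A standalone sketch with a toy `Pred` type:

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(PartialEq, Eq, Hash, Debug)]
struct Pred {
    table: String,
    start: i64,
    end: i64,
}

fn main() {
    // Two distinct allocations holding equal values...
    let a = Arc::new(Pred { table: "cpu".into(), start: 0, end: 10 });
    let b = Arc::new(Pred { table: "cpu".into(), start: 0, end: 10 });

    let mut map: HashMap<Arc<Pred>, u32> = HashMap::new();
    *map.entry(a).or_insert(0) += 1;
    *map.entry(b).or_insert(0) += 1;

    // ...collapse to one entry, because Arc<Pred> hashes and compares via Pred.
    // Keying by pointer address would have produced two entries here.
    assert_eq!(map.len(), 1);
}
```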

View File

@@ -3,7 +3,7 @@
 use std::{
     any::Any,
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     num::NonZeroUsize,
     sync::{
         atomic::{AtomicUsize, Ordering},
@@ -584,9 +584,7 @@ impl Db {
         if !affected_persisted_chunks.is_empty() {
             let mut transaction = self.preserved_catalog.open_transaction().await;
-            transaction
-                .delete_predicate(&delete_predicate, &affected_persisted_chunks)
-                .context(CommitDeletePredicateError)?;
+            transaction.delete_predicate(&delete_predicate, &affected_persisted_chunks);
             transaction
                 .commit()
                 .await
@@ -1249,10 +1247,8 @@ impl CatalogProvider for Db {
 pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData {
     let mut files = HashMap::new();
-    let mut delete_predicates: HashMap<
-        usize,
-        (Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>),
-    > = Default::default();
+    let mut delete_predicates: HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>> =
+        Default::default();

     for chunk in catalog.chunks() {
         let guard = chunk.read();
@@ -1276,29 +1272,18 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData
             || guard.is_in_lifecycle(ChunkLifecycleAction::Persisting)
         {
             for predicate in guard.delete_predicates() {
-                let predicate_ref: &DeletePredicate = predicate.as_ref();
-                let addr = (predicate_ref as *const DeletePredicate) as usize;
                 delete_predicates
-                    .entry(addr)
-                    .and_modify(|(_predicate, v)| v.push(guard.addr().clone().into()))
-                    .or_insert_with(|| (Arc::clone(predicate), vec![guard.addr().clone().into()]));
+                    .entry(Arc::clone(predicate))
+                    .and_modify(|chunks| {
+                        chunks.insert(guard.addr().clone().into());
+                    })
+                    .or_insert_with(|| {
+                        IntoIterator::into_iter([guard.addr().clone().into()]).collect()
+                    });
             }
         }
     }

-    let mut delete_predicates: Vec<_> = delete_predicates
-        .into_values()
-        .map(|(predicate, mut chunks)| {
-            chunks.sort();
-            (predicate, chunks)
-        })
-        .collect();
-    delete_predicates.sort_by(|(predicate_a, _chunks_a), (predicate_b, _chunks_b)| {
-        predicate_a
-            .partial_cmp(predicate_b)
-            .unwrap_or(std::cmp::Ordering::Equal)
-    });
-
     CheckpointData {
         files,
         delete_predicates,