Merge pull request #2561 from influxdata/crepererum/arc_delete_predicates

refactor: `Arc<Vec<...>>` => `Vec<Arc<...>>` for del predicates
pull/24376/head
kodiakhq[bot] 2021-09-16 15:26:20 +00:00 committed by GitHub
commit 315cbb8105
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 70 additions and 69 deletions

View File

@ -164,8 +164,13 @@ impl Predicate {
/// Add each range [start, stop] of the delete_predicates into the predicate in
/// the form "time < start OR time > stop" to eliminate that range from the query
pub fn add_delete_ranges(&mut self, delete_predicates: &[Self]) {
pub fn add_delete_ranges<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
for pred in delete_predicates {
let pred = pred.as_ref();
if let Some(range) = pred.range {
let expr = col(TIME_COLUMN_NAME)
.lt(lit(range.start))
@ -182,8 +187,13 @@ impl Predicate {
/// The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means
/// NOT(city != "Boston" AND temp = 70), NOT(state = "NY" AND route != "I90") which means
/// [NOT(city = Boston") OR NOT(temp = 70)], [NOT(state = "NY") OR NOT(route != "I90")]
pub fn add_delete_exprs(&mut self, delete_predicates: &[Self]) {
pub fn add_delete_exprs<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
for pred in delete_predicates {
let pred = pred.as_ref();
let mut expr: Option<Expr> = None;
for exp in &pred.exprs {
match expr {

View File

@ -46,7 +46,7 @@ pub trait QueryChunkMeta: Sized {
fn schema(&self) -> Arc<Schema>;
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &Vec<Predicate>;
fn delete_predicates(&self) -> &[Arc<Predicate>];
}
/// A `Database` is the main trait implemented by the IOx subsystems
@ -137,7 +137,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
&self,
predicate: &Predicate,
selection: Selection<'_>,
delete_predicates: &[Predicate],
delete_predicates: &[Arc<Predicate>],
) -> Result<SendableRecordBatchStream, Self::Error>;
/// Returns true if data of this chunk is sorted
@ -166,10 +166,10 @@ where
self.as_ref().schema()
}
fn delete_predicates(&self) -> &Vec<Predicate> {
let pred: &Vec<Predicate> = self.as_ref().delete_predicates();
fn delete_predicates(&self) -> &[Arc<Predicate>] {
let pred = self.as_ref().delete_predicates();
debug!(?pred, "Delete predicate in QueryChunkMeta");
self.as_ref().delete_predicates()
pred
}
}

View File

@ -175,7 +175,8 @@ pub struct TestChunk {
predicate_match: Option<PredicateMatch>,
/// Copy of delete predicates passed
delete_predicates: Vec<Predicate>,
delete_predicates: Vec<Arc<Predicate>>,
/// Order of this chunk relative to other overlapping chunks.
order: ChunkOrder,
}
@ -823,7 +824,7 @@ impl QueryChunk for TestChunk {
&self,
predicate: &Predicate,
_selection: Selection<'_>,
_delete_predicates: &[Predicate],
_delete_predicates: &[Arc<Predicate>],
) -> Result<SendableRecordBatchStream, Self::Error> {
self.check_error()?;
@ -913,11 +914,11 @@ impl QueryChunkMeta for TestChunk {
}
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &Vec<Predicate> {
let pred: &Vec<Predicate> = &self.delete_predicates;
fn delete_predicates(&self) -> &[Arc<Predicate>] {
let pred = &self.delete_predicates;
debug!(?pred, "Delete predicate in Test Chunk");
&self.delete_predicates
pred
}
}
@ -927,7 +928,7 @@ pub async fn raw_data(chunks: &[Arc<TestChunk>]) -> Vec<RecordBatch> {
for c in chunks {
let pred = Predicate::default();
let selection = Selection::All;
let delete_predicates: Vec<Predicate> = vec![];
let delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut stream = c
.read_filter(&pred, selection, &delete_predicates)
.expect("Error in read_filter");

View File

@ -519,7 +519,7 @@ impl Db {
pub async fn delete(
self: &Arc<Self>,
table_name: &str,
delete_predicate: &Predicate,
delete_predicate: Arc<Predicate>,
) -> Result<()> {
// get all partitions of this table
let table = self
@ -534,7 +534,7 @@ impl Db {
// save the delete predicate in the chunk
let mut chunk = chunk.write();
chunk
.add_delete_predicate(delete_predicate)
.add_delete_predicate(Arc::clone(&delete_predicate))
.context(AddDeletePredicateError)?;
}
}
@ -3780,7 +3780,7 @@ mod tests {
.timestamp_range(0, 15)
.add_expr(expr)
.build();
db.delete("cpu", &pred).await.unwrap();
db.delete("cpu", Arc::new(pred)).await.unwrap();
// When the above delete is issued, the open mub chunk is frozen with the delete predicate added
// Verify there is MUB but no RUB no OS
assert!(!mutable_chunk_ids(&db, partition_key).is_empty());
@ -3913,7 +3913,7 @@ mod tests {
.add_expr(expr1)
.add_expr(expr2)
.build();
db.delete("cpu", &pred).await.unwrap();
db.delete("cpu", Arc::new(pred)).await.unwrap();
// When the above delete is issued, the open mub chunk is frozen with the delete predicate added
// Verify there is MUB but no RUB no OS
assert!(!mutable_chunk_ids(&db, partition_key).is_empty());

View File

@ -80,7 +80,7 @@ pub struct ChunkMetadata {
pub schema: Arc<Schema>,
/// Delete predicates of this chunk
pub delete_predicates: Arc<Vec<Predicate>>,
pub delete_predicates: Vec<Arc<Predicate>>,
}
/// Different memory representations of a frozen chunk.
@ -307,14 +307,14 @@ impl CatalogChunk {
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
metrics: ChunkMetrics,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
order: ChunkOrder,
) -> Self {
let stage = ChunkStage::Frozen {
meta: Arc::new(ChunkMetadata {
table_summary: Arc::new(chunk.table_summary()),
schema,
delete_predicates: Arc::clone(&delete_predicates),
delete_predicates,
}),
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
};
@ -342,7 +342,7 @@ impl CatalogChunk {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
metrics: ChunkMetrics,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
order: ChunkOrder,
) -> Self {
assert_eq!(chunk.table_name(), addr.table_name.as_ref());
@ -469,7 +469,7 @@ impl CatalogChunk {
}
}
pub fn add_delete_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> {
pub fn add_delete_predicate(&mut self, delete_predicate: Arc<Predicate>) -> Result<()> {
debug!(
?delete_predicate,
"Input delete predicate to CatalogChunk add_delete_predicate"
@ -479,24 +479,14 @@ impl CatalogChunk {
// Freeze/close this chunk and add delete_predicate to its frozen one
self.freeze_with_predicate(delete_predicate)?;
}
ChunkStage::Frozen { meta, .. } => {
ChunkStage::Frozen { meta, .. } | ChunkStage::Persisted { meta, .. } => {
// Add the delete_predicate into the chunk's metadata
let mut del_preds: Vec<Predicate> = (*meta.delete_predicates).clone();
del_preds.push(delete_predicate.clone());
let mut del_preds = meta.delete_predicates.clone();
del_preds.push(delete_predicate);
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::clone(&meta.schema),
delete_predicates: Arc::new(del_preds),
});
}
ChunkStage::Persisted { meta, .. } => {
// Add the delete_predicate into the chunk's metadata
let mut del_preds: Vec<Predicate> = (*meta.delete_predicates).clone();
del_preds.push(delete_predicate.clone());
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::clone(&meta.schema),
delete_predicates: Arc::new(del_preds),
delete_predicates: del_preds,
});
}
}
@ -504,22 +494,22 @@ impl CatalogChunk {
Ok(())
}
pub fn delete_predicates(&mut self) -> Arc<Vec<Predicate>> {
pub fn delete_predicates(&mut self) -> &[Arc<Predicate>] {
match &self.stage {
ChunkStage::Open { mb_chunk: _ } => {
// no delete predicate for open chunk
debug!("delete_predicates of Open chunk is empty");
Arc::new(vec![])
&[]
}
ChunkStage::Frozen { meta, .. } => {
let preds = &meta.delete_predicates;
debug!(?preds, "delete_predicates of Frozen chunk");
Arc::clone(&meta.delete_predicates)
preds
}
ChunkStage::Persisted { meta, .. } => {
let preds = &meta.delete_predicates;
debug!(?preds, "delete_predicates of Persisted chunk");
Arc::clone(&meta.delete_predicates)
preds
}
}
}
@ -692,11 +682,14 @@ impl CatalogChunk {
///
/// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage
/// (no-op) and will fail for other stages.
pub fn freeze_with_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> {
self.freeze_with_delete_predicates(vec![delete_predicate.clone()])
pub fn freeze_with_predicate(&mut self, delete_predicate: Arc<Predicate>) -> Result<()> {
self.freeze_with_delete_predicates(vec![delete_predicate])
}
fn freeze_with_delete_predicates(&mut self, delete_predicates: Vec<Predicate>) -> Result<()> {
fn freeze_with_delete_predicates(
&mut self,
delete_predicates: Vec<Arc<Predicate>>,
) -> Result<()> {
match &self.stage {
ChunkStage::Open { mb_chunk, .. } => {
debug!(%self.addr, row_count=mb_chunk.rows(), "freezing chunk");
@ -709,7 +702,7 @@ impl CatalogChunk {
let metadata = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary()),
schema: s.full_schema(),
delete_predicates: Arc::new(delete_predicates),
delete_predicates,
};
self.stage = ChunkStage::Frozen {
@ -793,7 +786,7 @@ impl CatalogChunk {
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema,
delete_predicates: Arc::clone(&meta.delete_predicates),
delete_predicates: meta.delete_predicates.clone(),
});
match &representation {
@ -1168,7 +1161,7 @@ mod tests {
expected_exprs1.push(e);
// Add a delete predicate into a chunk the open chunk = delete simulation for open chunk
chunk.add_delete_predicate(&del_pred1).unwrap();
chunk.add_delete_predicate(Arc::new(del_pred1)).unwrap();
// chunk must be in frozen stage now
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have a delete predicate
@ -1199,7 +1192,7 @@ mod tests {
let mut expected_exprs2 = vec![];
let e = col("cost").not_eq(lit(15));
expected_exprs2.push(e);
chunk.add_delete_predicate(&del_pred2).unwrap();
chunk.add_delete_predicate(Arc::new(del_pred2)).unwrap();
// chunk still must be in frozen stage now
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have 2 delete predicates
@ -1265,7 +1258,7 @@ mod tests {
now,
now,
ChunkMetrics::new_unregistered(),
Arc::new(vec![] as Vec<Predicate>),
vec![],
ChunkOrder::new(6),
)
}

View File

@ -176,7 +176,7 @@ impl Partition {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> (u32, Arc<RwLock<CatalogChunk>>) {
let chunk_id = Self::pick_next(&mut self.next_chunk_id, "Chunk ID Overflow");
@ -231,7 +231,7 @@ impl Partition {
chunk: Arc<parquet_file::chunk::ParquetChunk>,
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name(), self.table_name());
@ -246,7 +246,7 @@ impl Partition {
time_of_first_write,
time_of_last_write,
self.metrics.new_chunk_metrics(),
Arc::clone(&delete_predicates),
delete_predicates,
chunk_order,
)),
);

View File

@ -121,7 +121,7 @@ impl DbChunk {
let meta = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary()),
schema: snapshot.full_schema(),
delete_predicates: Arc::new(vec![]), // open chunk does not have delete predicate
delete_predicates: vec![], // open chunk does not have delete predicate
};
(state, Arc::new(meta))
}
@ -226,7 +226,7 @@ impl DbChunk {
}
pub fn to_rub_negated_predicates(
delete_predicates: &[Predicate],
delete_predicates: &[Arc<Predicate>],
) -> Result<Vec<read_buffer::Predicate>> {
let mut rub_preds: Vec<read_buffer::Predicate> = vec![];
for pred in delete_predicates {
@ -331,7 +331,7 @@ impl QueryChunk for DbChunk {
&self,
predicate: &Predicate,
selection: Selection<'_>,
delete_predicates: &[Predicate],
delete_predicates: &[Arc<Predicate>],
) -> Result<SendableRecordBatchStream, Self::Error> {
// Predicate is not required to be applied for correctness. We only pushed it down
// when possible for performance gain
@ -536,11 +536,11 @@ impl QueryChunkMeta for DbChunk {
}
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &Vec<Predicate> {
let pred: &Vec<Predicate> = &self.meta.delete_predicates;
fn delete_predicates(&self) -> &[Arc<Predicate>] {
let pred = &self.meta.delete_predicates;
debug!(?pred, "Delete predicate in DbChunk");
&self.meta.delete_predicates
pred
}
}

View File

@ -45,7 +45,7 @@ pub(crate) fn compact_chunks(
let mut input_rows = 0;
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut delete_predicates: Vec<Predicate> = vec![];
let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut min_order = ChunkOrder::MAX;
let query_chunks = chunks
.into_iter()
@ -66,8 +66,7 @@ pub(crate) fn compact_chunks(
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
let mut preds = (*chunk.delete_predicates()).clone();
delete_predicates.append(&mut preds);
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
min_order = min_order.min(chunk.order());
@ -119,7 +118,7 @@ pub(crate) fn compact_chunks(
time_of_first_write,
time_of_last_write,
schema,
Arc::new(delete_predicates),
delete_predicates,
min_order,
)
};

View File

@ -52,7 +52,7 @@ pub fn persist_chunks(
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut query_chunks = vec![];
let mut delete_predicates: Vec<Predicate> = vec![];
let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut min_order = ChunkOrder::MAX;
for mut chunk in chunks {
// Sanity-check
@ -72,8 +72,7 @@ pub fn persist_chunks(
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
let mut preds = (*chunk.delete_predicates()).clone();
delete_predicates.append(&mut preds);
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
min_order = min_order.min(chunk.order());
@ -133,7 +132,6 @@ pub fn persist_chunks(
partition_write.force_drop_chunk(id)
}
let del_preds = Arc::new(delete_predicates);
// Upsert remainder to catalog
if let Some(remainder) = remainder {
partition_write.create_rub_chunk(
@ -141,7 +139,7 @@ pub fn persist_chunks(
time_of_first_write,
time_of_last_write,
Arc::clone(&schema),
Arc::clone(&del_preds),
delete_predicates.clone(),
min_order,
);
}
@ -155,7 +153,7 @@ pub fn persist_chunks(
time_of_first_write,
time_of_last_write,
schema,
del_preds,
delete_predicates,
min_order,
);
let to_persist = LockableCatalogChunk {

View File

@ -226,7 +226,7 @@ impl CatalogState for Loader {
.map_err(|e| Box::new(e) as _)
.context(SchemaError { path: info.path })?;
let delete_predicates: Arc<Vec<Predicate>> = Arc::new(vec![]); // NGA todo: After Marco saves delete predicate into the catalog, it will need to get extracted into this variable
let delete_predicates: Vec<Arc<Predicate>> = vec![]; // NGA todo: After Marco saves delete predicate into the catalog, it will need to get extracted into this variable
partition.insert_object_store_only_chunk(
iox_md.chunk_id,
parquet_chunk,

View File

@ -615,7 +615,7 @@ where
del_predicate.exprs.push(expr);
}
db.delete(&table_name, &del_predicate)
db.delete(&table_name, Arc::new(del_predicate))
.await
.map_err(default_db_error_handler)?;
}