refactor: `Arc<Vec<...>>` => `Vec<Arc<...>>` for del predicates

The motivations are:

1. The API uses a SINGLE predicate and adds that to many chunks. With
   `Arc<Vec<...>>` you gain nothing, with `Vec<Arc<...>>` the predicate
   data is stored only once, with cheap `Arc` handles in many vectors
2. While we currently add predicates blindly to all chunks, we can be way
   smarter in the future and prune out tables, partitions or even single
   chunks (based on statistics). With that, it will be rare that many
   chunks share the exact same set of predicates.
3. It would be nice if we could de-duplicate predicates when writing them
   to the preserved catalog without needing to repeat the pruning
   discussed in point 2. This is way easier to implement when chunks
   exist in `Arc`s.
4. As a side-note: the `Arc<Vec<...>>` wasn't really cloned around but
   instead was created many times. So the new version should be more
   memory efficient out of the box.
pull/24376/head
Marco Neumann 2021-09-16 14:52:57 +02:00
parent ce224bd37f
commit ec943081c7
11 changed files with 70 additions and 69 deletions

View File

@ -164,8 +164,13 @@ impl Predicate {
/// Add each range [start, stop] of the delete_predicates into the predicate in
/// the form "time < start OR time > stop" to eliminate that range from the query
pub fn add_delete_ranges(&mut self, delete_predicates: &[Self]) {
pub fn add_delete_ranges<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
for pred in delete_predicates {
let pred = pred.as_ref();
if let Some(range) = pred.range {
let expr = col(TIME_COLUMN_NAME)
.lt(lit(range.start))
@ -182,8 +187,13 @@ impl Predicate {
/// The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means
/// NOT(city != "Boston" AND temp = 70), NOT(state = "NY" AND route != "I90") which means
/// [NOT(city = Boston") OR NOT(temp = 70)], [NOT(state = "NY") OR NOT(route != "I90")]
pub fn add_delete_exprs(&mut self, delete_predicates: &[Self]) {
pub fn add_delete_exprs<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
for pred in delete_predicates {
let pred = pred.as_ref();
let mut expr: Option<Expr> = None;
for exp in &pred.exprs {
match expr {

View File

@ -46,7 +46,7 @@ pub trait QueryChunkMeta: Sized {
fn schema(&self) -> Arc<Schema>;
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &Vec<Predicate>;
fn delete_predicates(&self) -> &[Arc<Predicate>];
}
/// A `Database` is the main trait implemented by the IOx subsystems
@ -137,7 +137,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
&self,
predicate: &Predicate,
selection: Selection<'_>,
delete_predicates: &[Predicate],
delete_predicates: &[Arc<Predicate>],
) -> Result<SendableRecordBatchStream, Self::Error>;
/// Returns true if data of this chunk is sorted
@ -166,10 +166,10 @@ where
self.as_ref().schema()
}
fn delete_predicates(&self) -> &Vec<Predicate> {
let pred: &Vec<Predicate> = self.as_ref().delete_predicates();
fn delete_predicates(&self) -> &[Arc<Predicate>] {
let pred = self.as_ref().delete_predicates();
debug!(?pred, "Delete predicate in QueryChunkMeta");
self.as_ref().delete_predicates()
pred
}
}

View File

@ -175,7 +175,8 @@ pub struct TestChunk {
predicate_match: Option<PredicateMatch>,
/// Copy of delete predicates passed
delete_predicates: Vec<Predicate>,
delete_predicates: Vec<Arc<Predicate>>,
/// Order of this chunk relative to other overlapping chunks.
order: ChunkOrder,
}
@ -823,7 +824,7 @@ impl QueryChunk for TestChunk {
&self,
predicate: &Predicate,
_selection: Selection<'_>,
_delete_predicates: &[Predicate],
_delete_predicates: &[Arc<Predicate>],
) -> Result<SendableRecordBatchStream, Self::Error> {
self.check_error()?;
@ -913,11 +914,11 @@ impl QueryChunkMeta for TestChunk {
}
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &Vec<Predicate> {
let pred: &Vec<Predicate> = &self.delete_predicates;
fn delete_predicates(&self) -> &[Arc<Predicate>] {
let pred = &self.delete_predicates;
debug!(?pred, "Delete predicate in Test Chunk");
&self.delete_predicates
pred
}
}
@ -927,7 +928,7 @@ pub async fn raw_data(chunks: &[Arc<TestChunk>]) -> Vec<RecordBatch> {
for c in chunks {
let pred = Predicate::default();
let selection = Selection::All;
let delete_predicates: Vec<Predicate> = vec![];
let delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut stream = c
.read_filter(&pred, selection, &delete_predicates)
.expect("Error in read_filter");

View File

@ -519,7 +519,7 @@ impl Db {
pub async fn delete(
self: &Arc<Self>,
table_name: &str,
delete_predicate: &Predicate,
delete_predicate: Arc<Predicate>,
) -> Result<()> {
// get all partitions of this table
let table = self
@ -534,7 +534,7 @@ impl Db {
// save the delete predicate in the chunk
let mut chunk = chunk.write();
chunk
.add_delete_predicate(delete_predicate)
.add_delete_predicate(Arc::clone(&delete_predicate))
.context(AddDeletePredicateError)?;
}
}
@ -3780,7 +3780,7 @@ mod tests {
.timestamp_range(0, 15)
.add_expr(expr)
.build();
db.delete("cpu", &pred).await.unwrap();
db.delete("cpu", Arc::new(pred)).await.unwrap();
// When the above delete is issued, the open mub chunk is frozen with the delete predicate added
// Verify there is MUB but no RUB no OS
assert!(!mutable_chunk_ids(&db, partition_key).is_empty());
@ -3913,7 +3913,7 @@ mod tests {
.add_expr(expr1)
.add_expr(expr2)
.build();
db.delete("cpu", &pred).await.unwrap();
db.delete("cpu", Arc::new(pred)).await.unwrap();
// When the above delete is issued, the open mub chunk is frozen with the delete predicate added
// Verify there is MUB but no RUB no OS
assert!(!mutable_chunk_ids(&db, partition_key).is_empty());

View File

@ -80,7 +80,7 @@ pub struct ChunkMetadata {
pub schema: Arc<Schema>,
/// Delete predicates of this chunk
pub delete_predicates: Arc<Vec<Predicate>>,
pub delete_predicates: Vec<Arc<Predicate>>,
}
/// Different memory representations of a frozen chunk.
@ -307,14 +307,14 @@ impl CatalogChunk {
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
metrics: ChunkMetrics,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
order: ChunkOrder,
) -> Self {
let stage = ChunkStage::Frozen {
meta: Arc::new(ChunkMetadata {
table_summary: Arc::new(chunk.table_summary()),
schema,
delete_predicates: Arc::clone(&delete_predicates),
delete_predicates,
}),
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
};
@ -342,7 +342,7 @@ impl CatalogChunk {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
metrics: ChunkMetrics,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
order: ChunkOrder,
) -> Self {
assert_eq!(chunk.table_name(), addr.table_name.as_ref());
@ -469,7 +469,7 @@ impl CatalogChunk {
}
}
pub fn add_delete_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> {
pub fn add_delete_predicate(&mut self, delete_predicate: Arc<Predicate>) -> Result<()> {
debug!(
?delete_predicate,
"Input delete predicate to CatalogChunk add_delete_predicate"
@ -479,24 +479,14 @@ impl CatalogChunk {
// Freeze/close this chunk and add delete_predicate to its frozen one
self.freeze_with_predicate(delete_predicate)?;
}
ChunkStage::Frozen { meta, .. } => {
ChunkStage::Frozen { meta, .. } | ChunkStage::Persisted { meta, .. } => {
// Add the delete_predicate into the chunk's metadata
let mut del_preds: Vec<Predicate> = (*meta.delete_predicates).clone();
del_preds.push(delete_predicate.clone());
let mut del_preds = meta.delete_predicates.clone();
del_preds.push(delete_predicate);
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::clone(&meta.schema),
delete_predicates: Arc::new(del_preds),
});
}
ChunkStage::Persisted { meta, .. } => {
// Add the delete_predicate into the chunk's metadata
let mut del_preds: Vec<Predicate> = (*meta.delete_predicates).clone();
del_preds.push(delete_predicate.clone());
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::clone(&meta.schema),
delete_predicates: Arc::new(del_preds),
delete_predicates: del_preds,
});
}
}
@ -504,22 +494,22 @@ impl CatalogChunk {
Ok(())
}
pub fn delete_predicates(&mut self) -> Arc<Vec<Predicate>> {
pub fn delete_predicates(&mut self) -> &[Arc<Predicate>] {
match &self.stage {
ChunkStage::Open { mb_chunk: _ } => {
// no delete predicate for open chunk
debug!("delete_predicates of Open chunk is empty");
Arc::new(vec![])
&[]
}
ChunkStage::Frozen { meta, .. } => {
let preds = &meta.delete_predicates;
debug!(?preds, "delete_predicates of Frozen chunk");
Arc::clone(&meta.delete_predicates)
preds
}
ChunkStage::Persisted { meta, .. } => {
let preds = &meta.delete_predicates;
debug!(?preds, "delete_predicates of Persisted chunk");
Arc::clone(&meta.delete_predicates)
preds
}
}
}
@ -692,11 +682,14 @@ impl CatalogChunk {
///
/// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage
/// (no-op) and will fail for other stages.
pub fn freeze_with_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> {
self.freeze_with_delete_predicates(vec![delete_predicate.clone()])
pub fn freeze_with_predicate(&mut self, delete_predicate: Arc<Predicate>) -> Result<()> {
self.freeze_with_delete_predicates(vec![delete_predicate])
}
fn freeze_with_delete_predicates(&mut self, delete_predicates: Vec<Predicate>) -> Result<()> {
fn freeze_with_delete_predicates(
&mut self,
delete_predicates: Vec<Arc<Predicate>>,
) -> Result<()> {
match &self.stage {
ChunkStage::Open { mb_chunk, .. } => {
debug!(%self.addr, row_count=mb_chunk.rows(), "freezing chunk");
@ -709,7 +702,7 @@ impl CatalogChunk {
let metadata = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary()),
schema: s.full_schema(),
delete_predicates: Arc::new(delete_predicates),
delete_predicates,
};
self.stage = ChunkStage::Frozen {
@ -793,7 +786,7 @@ impl CatalogChunk {
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema,
delete_predicates: Arc::clone(&meta.delete_predicates),
delete_predicates: meta.delete_predicates.clone(),
});
match &representation {
@ -1168,7 +1161,7 @@ mod tests {
expected_exprs1.push(e);
// Add a delete predicate into a chunk the open chunk = delete simulation for open chunk
chunk.add_delete_predicate(&del_pred1).unwrap();
chunk.add_delete_predicate(Arc::new(del_pred1)).unwrap();
// chunk must be in frozen stage now
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have a delete predicate
@ -1199,7 +1192,7 @@ mod tests {
let mut expected_exprs2 = vec![];
let e = col("cost").not_eq(lit(15));
expected_exprs2.push(e);
chunk.add_delete_predicate(&del_pred2).unwrap();
chunk.add_delete_predicate(Arc::new(del_pred2)).unwrap();
// chunk still must be in frozen stage now
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have 2 delete predicates
@ -1265,7 +1258,7 @@ mod tests {
now,
now,
ChunkMetrics::new_unregistered(),
Arc::new(vec![] as Vec<Predicate>),
vec![],
ChunkOrder::new(6),
)
}

View File

@ -176,7 +176,7 @@ impl Partition {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> (u32, Arc<RwLock<CatalogChunk>>) {
let chunk_id = Self::pick_next(&mut self.next_chunk_id, "Chunk ID Overflow");
@ -231,7 +231,7 @@ impl Partition {
chunk: Arc<parquet_file::chunk::ParquetChunk>,
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name(), self.table_name());
@ -246,7 +246,7 @@ impl Partition {
time_of_first_write,
time_of_last_write,
self.metrics.new_chunk_metrics(),
Arc::clone(&delete_predicates),
delete_predicates,
chunk_order,
)),
);

View File

@ -121,7 +121,7 @@ impl DbChunk {
let meta = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary()),
schema: snapshot.full_schema(),
delete_predicates: Arc::new(vec![]), // open chunk does not have delete predicate
delete_predicates: vec![], // open chunk does not have delete predicate
};
(state, Arc::new(meta))
}
@ -226,7 +226,7 @@ impl DbChunk {
}
pub fn to_rub_negated_predicates(
delete_predicates: &[Predicate],
delete_predicates: &[Arc<Predicate>],
) -> Result<Vec<read_buffer::Predicate>> {
let mut rub_preds: Vec<read_buffer::Predicate> = vec![];
for pred in delete_predicates {
@ -331,7 +331,7 @@ impl QueryChunk for DbChunk {
&self,
predicate: &Predicate,
selection: Selection<'_>,
delete_predicates: &[Predicate],
delete_predicates: &[Arc<Predicate>],
) -> Result<SendableRecordBatchStream, Self::Error> {
// Predicate is not required to be applied for correctness. We only pushed it down
// when possible for performance gain
@ -536,11 +536,11 @@ impl QueryChunkMeta for DbChunk {
}
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &Vec<Predicate> {
let pred: &Vec<Predicate> = &self.meta.delete_predicates;
fn delete_predicates(&self) -> &[Arc<Predicate>] {
let pred = &self.meta.delete_predicates;
debug!(?pred, "Delete predicate in DbChunk");
&self.meta.delete_predicates
pred
}
}

View File

@ -45,7 +45,7 @@ pub(crate) fn compact_chunks(
let mut input_rows = 0;
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut delete_predicates: Vec<Predicate> = vec![];
let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut min_order = ChunkOrder::MAX;
let query_chunks = chunks
.into_iter()
@ -66,8 +66,7 @@ pub(crate) fn compact_chunks(
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
let mut preds = (*chunk.delete_predicates()).clone();
delete_predicates.append(&mut preds);
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
min_order = min_order.min(chunk.order());
@ -119,7 +118,7 @@ pub(crate) fn compact_chunks(
time_of_first_write,
time_of_last_write,
schema,
Arc::new(delete_predicates),
delete_predicates,
min_order,
)
};

View File

@ -52,7 +52,7 @@ pub fn persist_chunks(
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut query_chunks = vec![];
let mut delete_predicates: Vec<Predicate> = vec![];
let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut min_order = ChunkOrder::MAX;
for mut chunk in chunks {
// Sanity-check
@ -72,8 +72,7 @@ pub fn persist_chunks(
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
let mut preds = (*chunk.delete_predicates()).clone();
delete_predicates.append(&mut preds);
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
min_order = min_order.min(chunk.order());
@ -133,7 +132,6 @@ pub fn persist_chunks(
partition_write.force_drop_chunk(id)
}
let del_preds = Arc::new(delete_predicates);
// Upsert remainder to catalog
if let Some(remainder) = remainder {
partition_write.create_rub_chunk(
@ -141,7 +139,7 @@ pub fn persist_chunks(
time_of_first_write,
time_of_last_write,
Arc::clone(&schema),
Arc::clone(&del_preds),
delete_predicates.clone(),
min_order,
);
}
@ -155,7 +153,7 @@ pub fn persist_chunks(
time_of_first_write,
time_of_last_write,
schema,
del_preds,
delete_predicates,
min_order,
);
let to_persist = LockableCatalogChunk {

View File

@ -226,7 +226,7 @@ impl CatalogState for Loader {
.map_err(|e| Box::new(e) as _)
.context(SchemaError { path: info.path })?;
let delete_predicates: Arc<Vec<Predicate>> = Arc::new(vec![]); // NGA todo: After Marco saves delete predicate into the catalog, it will need to get extracted into this variable
let delete_predicates: Vec<Arc<Predicate>> = vec![]; // NGA todo: After Marco saves delete predicate into the catalog, it will need to get extracted into this variable
partition.insert_object_store_only_chunk(
iox_md.chunk_id,
parquet_chunk,

View File

@ -615,7 +615,7 @@ where
del_predicate.exprs.push(expr);
}
db.delete(&table_name, &del_predicate)
db.delete(&table_name, Arc::new(del_predicate))
.await
.map_err(default_db_error_handler)?;
}