refactor: remove one buffer copy in the ingester (#4855)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-06-14 13:15:36 -04:00 committed by GitHub
parent e91d00b10c
commit 7d2a5c299f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 46 deletions

View File

@ -202,55 +202,24 @@ async fn prepare_data_to_querier_for_partition(
};
let predicate = request.predicate.clone().unwrap_or_default();
let mut filter_applied_batches = vec![];
if let Some(queryable_batch) = unpersisted_partition_data.persisting {
// ------------------------------------------------
// persisting data
// figure out what batches
let queryable_batch = unpersisted_partition_data
.persisting
.unwrap_or_else(|| QueryableBatch::new(&request.table, vec![], vec![]))
.with_data(unpersisted_partition_data.non_persisted);
let record_batch = run_query(
executor,
Arc::new(queryable_batch),
predicate.clone(),
selection,
)
.await?;
if let Some(record_batch) = record_batch {
if record_batch.num_rows() > 0 {
filter_applied_batches.push(Arc::new(record_batch));
}
}
// No data!
if queryable_batch.data.is_empty() {
return Ok(None);
}
// ------------------------------------------------
// Apply filters on the snapshot batches
if !unpersisted_partition_data.non_persisted.is_empty() {
// Make a Query able batch for all the snapshot
let queryable_batch = QueryableBatch::new(
&request.table,
unpersisted_partition_data.non_persisted,
vec![],
);
let record_batch =
run_query(executor, Arc::new(queryable_batch), predicate, selection).await?;
if let Some(record_batch) = record_batch {
if record_batch.num_rows() > 0 {
filter_applied_batches.push(Arc::new(record_batch));
}
}
}
// ------------------------------------------------
// Combine record batches into one batch and pad null values as needed
// Schema of all record batches after merging
let schema = merge_record_batch_schemas(&filter_applied_batches);
let batch = merge_record_batches(schema.as_arrow(), filter_applied_batches)
.context(ConcatBatchesSnafu)?;
Ok(batch)
run_query(
executor,
Arc::new(queryable_batch),
predicate.clone(),
selection,
)
.await
}
/// Query a given Queryable Batch, applying selection and filters as appropriate

View File

@ -71,6 +71,12 @@ impl QueryableBatch {
}
}
/// Add snapshots to this batch
pub fn with_data(mut self, mut data: Vec<Arc<SnapshotBatch>>) -> Self {
self.data.append(&mut data);
self
}
/// Add more tombstones
pub fn add_tombstones(&mut self, deletes: &[Tombstone]) {
let delete_predicates = tombstones_to_delete_predicates_iter(deletes);