refactor: Make collect_rub create the RBChunk

This gets rid of the need for new_rub_chunk, and will enable creating RBChunks that are guaranteed to have data.

pull/24376/head
parent 0a724878e6
commit d347750366
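In short, collect_rub now owns chunk creation instead of filling a caller-supplied chunk. Call sites (abridged from the hunks below) change from

    let mut rb_chunk = new_rub_chunk(&db, &table_name);
    collect_rub(stream, &mut rb_chunk).await?;

to

    let rb_chunk = collect_rub(stream, &db, &table_name).await?;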
@@ -328,32 +328,35 @@ impl LifecycleChunk for CatalogChunk {
     }
 }
 
-/// Creates a new RUB chunk
-fn new_rub_chunk(db: &Db, table_name: &str) -> read_buffer::RBChunk {
-    // create a new read buffer chunk with memory tracking
+/// Executes a plan and collects the results into a read buffer chunk
+// This is an async function but has been desugared manually because it's hitting
+// https://github.com/rust-lang/rust/issues/63033
+fn collect_rub(
+    stream: SendableRecordBatchStream,
+    db: &Db,
+    table_name: &str,
+) -> impl futures::Future<Output = Result<read_buffer::RBChunk>> {
+    use futures::{future, TryStreamExt};
+
+    let table_name = table_name.to_string();
     let metrics = db
         .metrics_registry
         .register_domain_with_labels("read_buffer", db.metric_labels.clone());
+    let chunk_metrics = read_buffer::ChunkMetrics::new(&metrics);
 
-    read_buffer::RBChunk::new(table_name, read_buffer::ChunkMetrics::new(&metrics))
-}
-
-/// Executes a plan and collects the results into a read buffer chunk
-async fn collect_rub(
-    stream: SendableRecordBatchStream,
-    chunk: &mut read_buffer::RBChunk,
-) -> Result<()> {
-    use futures::{future, TryStreamExt};
-
-    stream
-        .try_filter(|batch| future::ready(batch.num_rows() > 0))
-        .try_for_each(|batch| {
-            chunk.upsert_table(batch);
-            future::ready(Ok(()))
-        })
-        .await?;
-
-    Ok(())
+    async {
+        let mut chunk = read_buffer::RBChunk::new(table_name, chunk_metrics);
+
+        stream
+            .try_filter(|batch| future::ready(batch.num_rows() > 0))
+            .try_for_each(|batch| {
+                chunk.upsert_table(batch);
+                future::ready(Ok(()))
+            })
+            .await?;
+
+        Ok(chunk)
+    }
 }
 
 /// Return the merged schema for the chunks that are being
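The comment in the new collect_rub points at the rustc issue linked above: the equivalent plain `async fn` borrowing `db` and `table_name` trips it, so the function is desugared by hand. All borrow-to-owned conversions happen in a synchronous prologue, and the returned `impl Future` owns everything it captures. A minimal standalone sketch of that pattern, with toy names rather than the IOx API:

    use std::future::Future;

    // Instead of `async fn greet(name: &str) -> String`, clone the
    // borrowed input up front so the returned future no longer borrows
    // from the caller.
    fn greet(name: &str) -> impl Future<Output = String> {
        let name = name.to_string();
        async move { format!("hello, {}", name) }
    }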
@@ -3,7 +3,7 @@
 use super::{error::Result, merge_schemas, LockableCatalogChunk, LockableCatalogPartition};
 use crate::db::{
     catalog::{chunk::CatalogChunk, partition::Partition},
-    lifecycle::{collect_rub, new_rub_chunk},
+    lifecycle::collect_rub,
     DbChunk,
 };
 use data_types::job::Job;
@@ -54,9 +54,6 @@ pub(crate) fn compact_chunks(
     // drop partition lock
     let partition = partition.into_data().partition;
 
-    // create a new read buffer chunk with memory tracking
-    let mut rb_chunk = new_rub_chunk(&db, &table_name);
-
     let ctx = db.exec.new_context(ExecutorType::Reorg);
 
     let fut = async move {
@@ -78,7 +75,7 @@ pub(crate) fn compact_chunks(
 
         let physical_plan = ctx.prepare_plan(&plan)?;
         let stream = ctx.execute(physical_plan).await?;
-        collect_rub(stream, &mut rb_chunk).await?;
+        let rb_chunk = collect_rub(stream, &db, &table_name).await?;
         let rb_row_groups = rb_chunk.row_groups();
 
         let new_chunk = {
@@ -9,7 +9,7 @@ use std::{future::Future, sync::Arc};
 use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
 
 use super::{error::Result, LockableCatalogChunk};
-use crate::db::lifecycle::{collect_rub, new_rub_chunk};
+use crate::db::lifecycle::collect_rub;
 
 /// The implementation for moving a chunk to the read buffer
 ///
@@ -45,7 +45,6 @@ pub fn move_chunk_to_read_buffer(
 
     // Drop locks
     let chunk = guard.into_data().chunk;
-    let mut rb_chunk = new_rub_chunk(db.as_ref(), &table_summary.name);
 
     let ctx = db.exec.new_context(ExecutorType::Reorg);
 
@@ -60,7 +59,7 @@ pub fn move_chunk_to_read_buffer(
 
         let physical_plan = ctx.prepare_plan(&plan)?;
         let stream = ctx.execute(physical_plan).await?;
-        collect_rub(stream, &mut rb_chunk).await?;
+        let rb_chunk = collect_rub(stream, db.as_ref(), &table_summary.name).await?;
 
         // Can drop and re-acquire as lifecycle action prevents concurrent modification
         let mut guard = chunk.write();
@@ -3,7 +3,7 @@
 use super::{LockableCatalogChunk, LockableCatalogPartition, Result};
 use crate::db::{
     catalog::{chunk::CatalogChunk, partition::Partition},
-    lifecycle::{collect_rub, merge_schemas, new_rub_chunk, write::write_chunk_to_object_store},
+    lifecycle::{collect_rub, merge_schemas, write::write_chunk_to_object_store},
     DbChunk,
 };
 use data_types::job::Job;
@@ -58,8 +58,6 @@ pub fn persist_chunks(
 
     // drop partition lock guard
     let partition = partition.into_data().partition;
-    let mut to_persist = new_rub_chunk(db.as_ref(), &table_name);
-    let mut remainder = new_rub_chunk(db.as_ref(), &table_name);
 
     let ctx = db.exec.new_context(ExecutorType::Reorg);
 
@@ -88,9 +86,9 @@ pub fn persist_chunks(
         let to_persist_stream = ctx.execute_partition(Arc::clone(&physical_plan), 0).await?;
         let remainder_stream = ctx.execute_partition(physical_plan, 1).await?;
 
-        futures::future::try_join(
-            collect_rub(to_persist_stream, &mut to_persist),
-            collect_rub(remainder_stream, &mut remainder),
+        let (to_persist, remainder) = futures::future::try_join(
+            collect_rub(to_persist_stream, db.as_ref(), &table_name),
+            collect_rub(remainder_stream, db.as_ref(), &table_name),
         )
         .await?;
 
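persist_chunks now destructures both chunks from futures::future::try_join, which polls the two collect_rub futures concurrently and short-circuits on the first error. A minimal sketch of those semantics, with toy futures rather than the IOx code:

    use futures::future;

    async fn demo() -> Result<(), String> {
        let a = async { Ok::<u32, String>(1) };
        let b = async { Ok::<u32, String>(2) };
        // Resolves to Ok((1, 2)); if either branch fails, try_join
        // yields that error instead.
        let (x, y) = future::try_join(a, b).await?;
        assert_eq!((x, y), (1, 2));
        Ok(())
    }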