perf: loading chunks doesn't require sort
parent
b0e01edb86
commit
2a2c5889c3
|
@ -48,6 +48,38 @@ impl ReorgPlanner {
|
|||
Self::default()
|
||||
}
|
||||
|
||||
/// Creates an execution plan for a full scan of a single chunk.
|
||||
/// This plan is primarilty used to load chunks from one storage medium to
|
||||
/// another.
|
||||
pub fn scan_single_chunk_plan<C>(
|
||||
&self,
|
||||
schema: Arc<Schema>,
|
||||
chunk: Arc<C>,
|
||||
) -> Result<LogicalPlan>
|
||||
where
|
||||
C: QueryChunk + 'static,
|
||||
{
|
||||
let table_name = chunk.table_name();
|
||||
// Prepare the plan for the table
|
||||
let mut builder = ProviderBuilder::new(table_name, schema);
|
||||
|
||||
// There are no predicates in these plans, so no need to prune them
|
||||
builder = builder.add_no_op_pruner();
|
||||
builder = builder.add_chunk(Arc::clone(&chunk));
|
||||
|
||||
let provider = builder.build().context(CreatingProvider { table_name })?;
|
||||
|
||||
// Logical plan to scan all columns with no predicates
|
||||
let plan = LogicalPlanBuilder::scan(table_name, Arc::new(provider) as _, None)
|
||||
.context(BuildingPlan)?
|
||||
.build()
|
||||
.context(BuildingPlan)?;
|
||||
|
||||
debug!(%table_name, plan=%plan.display_indent_schema(),
|
||||
"created single chunk scan plan");
|
||||
Ok(plan)
|
||||
}
|
||||
|
||||
/// Creates an execution plan for the COMPACT operations which does the following:
|
||||
///
|
||||
/// 1. Merges chunks together into a single stream
|
||||
|
@ -71,7 +103,7 @@ impl ReorgPlanner {
|
|||
let ScanPlan {
|
||||
plan_builder,
|
||||
provider,
|
||||
} = self.sorted_scan_plan(schema, chunks)?;
|
||||
} = self.scan_plan(schema, chunks, true)?;
|
||||
|
||||
let mut schema = provider.iox_schema();
|
||||
|
||||
|
@ -154,7 +186,7 @@ impl ReorgPlanner {
|
|||
let ScanPlan {
|
||||
plan_builder,
|
||||
provider,
|
||||
} = self.sorted_scan_plan(schema, chunks)?;
|
||||
} = self.scan_plan(schema, chunks, true)?;
|
||||
|
||||
let mut schema = provider.iox_schema();
|
||||
|
||||
|
@ -186,14 +218,15 @@ impl ReorgPlanner {
|
|||
}
|
||||
|
||||
/// Creates a scan plan for the given set of chunks.
|
||||
/// Output data of the scan will be deduplicated and sorted
|
||||
/// on the optimal sort order of the chunks' PK columns (tags and time).
|
||||
/// Output data of the scan will be deduplicated sorted if `sort=true` on
|
||||
/// the optimal sort order of the chunks' PK columns (tags and time).
|
||||
///
|
||||
/// The optimal sort order is computed based on the PK columns cardinality
|
||||
/// that will be best for RLE encoding.
|
||||
///
|
||||
/// Prefer to query::provider::build_scan_plan for the detail of the plan
|
||||
/// Refer to query::provider::build_scan_plan for the detail of the plan
|
||||
///
|
||||
fn sorted_scan_plan<C, I>(&self, schema: Arc<Schema>, chunks: I) -> Result<ScanPlan<C>>
|
||||
fn scan_plan<C, I>(&self, schema: Arc<Schema>, chunks: I, sort: bool) -> Result<ScanPlan<C>>
|
||||
where
|
||||
C: QueryChunk + 'static,
|
||||
I: IntoIterator<Item = Arc<C>>,
|
||||
|
@ -207,9 +240,10 @@ impl ReorgPlanner {
|
|||
|
||||
// Prepare the plan for the table
|
||||
let mut builder = ProviderBuilder::new(table_name, schema);
|
||||
|
||||
// Tell the scan of this provider to sort its output on the chunks' PK
|
||||
builder.ensure_pk_sort();
|
||||
if sort {
|
||||
// Tell the scan of this provider to sort its output on the chunks' PK
|
||||
builder.ensure_pk_sort();
|
||||
}
|
||||
|
||||
// There are no predicates in these plans, so no need to prune them
|
||||
builder = builder.add_no_op_pruner();
|
||||
|
@ -318,6 +352,45 @@ mod test {
|
|||
(Arc::new(schema), vec![chunk1, chunk2])
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scan_plan() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
let (schema, chunks) = get_test_chunks().await;
|
||||
let scan_plan = ReorgPlanner::new()
|
||||
.scan_single_chunk_plan(schema, chunks.into_iter().next().unwrap())
|
||||
.expect("created compact plan");
|
||||
|
||||
let executor = Executor::new(1);
|
||||
let physical_plan = executor
|
||||
.new_context(ExecutorType::Reorg)
|
||||
.prepare_plan(&scan_plan)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// single chunk processed
|
||||
assert_eq!(physical_plan.output_partitioning().partition_count(), 1);
|
||||
|
||||
let batches = datafusion::physical_plan::collect(physical_plan)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// all data from chunk
|
||||
let expected = vec![
|
||||
"+-----------+------------+------+--------------------------------+",
|
||||
"| field_int | field_int2 | tag1 | time |",
|
||||
"+-----------+------------+------+--------------------------------+",
|
||||
"| 1000 | | MT | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 10 | | MT | 1970-01-01T00:00:00.000007Z |",
|
||||
"| 70 | | CT | 1970-01-01T00:00:00.000000100Z |",
|
||||
"| 100 | | AL | 1970-01-01T00:00:00.000000050Z |",
|
||||
"| 5 | | MT | 1970-01-01T00:00:00.000005Z |",
|
||||
"+-----------+------------+------+--------------------------------+",
|
||||
];
|
||||
|
||||
assert_batches_eq!(&expected, &batches);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compact_plan() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
|
|
@ -8,7 +8,7 @@ use lifecycle::LifecycleWriteGuard;
|
|||
use observability_deps::tracing::info;
|
||||
use query::exec::ExecutorType;
|
||||
use query::frontend::reorg::ReorgPlanner;
|
||||
use query::{compute_sort_key, QueryChunkMeta};
|
||||
use query::QueryChunkMeta;
|
||||
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
|
||||
|
||||
use crate::db::lifecycle::collect_rub;
|
||||
|
@ -43,14 +43,8 @@ pub fn load_chunk(
|
|||
let ctx = db.exec.new_context(ExecutorType::Reorg);
|
||||
|
||||
let fut = async move {
|
||||
let key = compute_sort_key(std::iter::once(db_chunk.summary()));
|
||||
|
||||
// Cannot move query_chunks as the sort key borrows the column names
|
||||
let (_, plan) = ReorgPlanner::new().compact_plan(
|
||||
db_chunk.schema(),
|
||||
std::iter::once(Arc::clone(&db_chunk)),
|
||||
key,
|
||||
)?;
|
||||
let plan =
|
||||
ReorgPlanner::new().scan_single_chunk_plan(db_chunk.schema(), Arc::clone(&db_chunk))?;
|
||||
|
||||
let physical_plan = ctx.prepare_plan(&plan).await?;
|
||||
let stream = ctx.execute_stream(physical_plan).await?;
|
||||
|
|
Loading…
Reference in New Issue