docs: add additional documentation for sets of test parameters
parent
54e9d38589
commit
957ff79e2f
|
@ -3,25 +3,26 @@
|
||||||
use crate::Store;
|
use crate::Store;
|
||||||
use arrow::{
|
use arrow::{
|
||||||
datatypes::{Schema, SchemaRef},
|
datatypes::{Schema, SchemaRef},
|
||||||
util::pretty, record_batch::{RecordBatch, RecordBatchReader},
|
record_batch::{RecordBatch, RecordBatchReader},
|
||||||
|
util::pretty,
|
||||||
};
|
};
|
||||||
use datafusion::prelude::*;
|
use datafusion::prelude::*;
|
||||||
use datafusion::{
|
use datafusion::{
|
||||||
datasource::TableProvider,
|
datasource::TableProvider,
|
||||||
execution::{
|
execution::{
|
||||||
context::ExecutionContextState,
|
context::ExecutionContextState,
|
||||||
physical_plan::{ExecutionPlan, Partition, common::RecordBatchIterator},
|
physical_plan::{common::RecordBatchIterator, ExecutionPlan, Partition},
|
||||||
},
|
},
|
||||||
logicalplan::{make_logical_plan_node, Expr, LogicalPlan},
|
logicalplan::{make_logical_plan_node, Expr, LogicalPlan},
|
||||||
lp::LogicalPlanNode,
|
lp::LogicalPlanNode,
|
||||||
optimizer::utils,
|
optimizer::utils,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::column;
|
||||||
use std::{
|
use std::{
|
||||||
fmt,
|
fmt,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
};
|
};
|
||||||
use crate::column;
|
|
||||||
|
|
||||||
/// Wrapper to adapt a Store to a DataFusion "TableProvider" --
|
/// Wrapper to adapt a Store to a DataFusion "TableProvider" --
|
||||||
/// eventually we could also implement this directly on Store
|
/// eventually we could also implement this directly on Store
|
||||||
|
@ -118,8 +119,7 @@ impl DeloreanQueryEngine {
|
||||||
fn rewrite_to_segment_scan(&self, plan: &LogicalPlan) -> LogicalPlan {
|
fn rewrite_to_segment_scan(&self, plan: &LogicalPlan) -> LogicalPlan {
|
||||||
if let LogicalPlan::Filter { predicate, input } = plan {
|
if let LogicalPlan::Filter { predicate, input } = plan {
|
||||||
// see if the input is a TableScan
|
// see if the input is a TableScan
|
||||||
if let LogicalPlan::TableScan { .. } = **input
|
if let LogicalPlan::TableScan { .. } = **input {
|
||||||
{
|
|
||||||
return make_logical_plan_node(Box::new(SegmentScan::new(
|
return make_logical_plan_node(Box::new(SegmentScan::new(
|
||||||
self.store.clone(),
|
self.store.clone(),
|
||||||
predicate.clone(),
|
predicate.clone(),
|
||||||
|
@ -159,7 +159,6 @@ impl SegmentScan {
|
||||||
predicate,
|
predicate,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LogicalPlanNode for SegmentScan {
|
impl LogicalPlanNode for SegmentScan {
|
||||||
|
@ -236,7 +235,7 @@ impl LogicalPlanNode for SegmentScan {
|
||||||
let time_range = (1590036110000000, 1590040770000000);
|
let time_range = (1590036110000000, 1590040770000000);
|
||||||
let string_predicate = StringPredicate {
|
let string_predicate = StringPredicate {
|
||||||
col_name: "env".into(),
|
col_name: "env".into(),
|
||||||
value: "prod01-eu-central-1".into()
|
value: "prod01-eu-central-1".into(),
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Arc::new(SegmentScanExec::new(
|
Ok(Arc::new(SegmentScanExec::new(
|
||||||
|
@ -245,10 +244,8 @@ impl LogicalPlanNode for SegmentScan {
|
||||||
string_predicate,
|
string_predicate,
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
struct StringPredicate {
|
struct StringPredicate {
|
||||||
col_name: String,
|
col_name: String,
|
||||||
|
@ -262,13 +259,16 @@ pub struct SegmentScanExec {
|
||||||
|
|
||||||
// Specialized predicates to apply
|
// Specialized predicates to apply
|
||||||
time_range: (i64, i64),
|
time_range: (i64, i64),
|
||||||
string_predicate: StringPredicate
|
string_predicate: StringPredicate,
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SegmentScanExec {
|
impl SegmentScanExec {
|
||||||
fn new(store: Arc<Store>, time_range: (i64, i64), string_predicate: StringPredicate) -> Self {
|
fn new(store: Arc<Store>, time_range: (i64, i64), string_predicate: StringPredicate) -> Self {
|
||||||
SegmentScanExec { store , time_range, string_predicate }
|
SegmentScanExec {
|
||||||
|
store,
|
||||||
|
time_range,
|
||||||
|
string_predicate,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,7 +279,7 @@ impl ExecutionPlan for SegmentScanExec {
|
||||||
|
|
||||||
fn partitions(&self) -> datafusion::error::Result<Vec<Arc<dyn Partition>>> {
|
fn partitions(&self) -> datafusion::error::Result<Vec<Arc<dyn Partition>>> {
|
||||||
let store = self.store.clone();
|
let store = self.store.clone();
|
||||||
Ok(vec![Arc::new(SegmentPartition{
|
Ok(vec![Arc::new(SegmentPartition {
|
||||||
store,
|
store,
|
||||||
time_range: self.time_range,
|
time_range: self.time_range,
|
||||||
string_predicate: self.string_predicate.clone(),
|
string_predicate: self.string_predicate.clone(),
|
||||||
|
@ -292,11 +292,12 @@ struct SegmentPartition {
|
||||||
store: Arc<Store>,
|
store: Arc<Store>,
|
||||||
time_range: (i64, i64),
|
time_range: (i64, i64),
|
||||||
string_predicate: StringPredicate,
|
string_predicate: StringPredicate,
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Partition for SegmentPartition {
|
impl Partition for SegmentPartition {
|
||||||
fn execute(&self) -> datafusion::error::Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
|
fn execute(
|
||||||
|
&self,
|
||||||
|
) -> datafusion::error::Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
|
||||||
let combined_results: Vec<Arc<RecordBatch>> = vec![];
|
let combined_results: Vec<Arc<RecordBatch>> = vec![];
|
||||||
|
|
||||||
let segments = self.store.segments();
|
let segments = self.store.segments();
|
||||||
|
@ -305,7 +306,6 @@ impl Partition for SegmentPartition {
|
||||||
let col_name = &self.string_predicate.col_name;
|
let col_name = &self.string_predicate.col_name;
|
||||||
let scalar = column::Scalar::String(&self.string_predicate.value);
|
let scalar = column::Scalar::String(&self.string_predicate.value);
|
||||||
|
|
||||||
|
|
||||||
// Here
|
// Here
|
||||||
let _columns = segments.read_filter_eq(
|
let _columns = segments.read_filter_eq(
|
||||||
self.time_range,
|
self.time_range,
|
||||||
|
@ -322,12 +322,9 @@ impl Partition for SegmentPartition {
|
||||||
// If we were implementing this for real, we would not convert
|
// If we were implementing this for real, we would not convert
|
||||||
// `columns` into RecordBatches and feed them back out
|
// `columns` into RecordBatches and feed them back out
|
||||||
|
|
||||||
|
|
||||||
Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
|
Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
|
||||||
self.store.schema().clone(),
|
self.store.schema().clone(),
|
||||||
combined_results,
|
combined_results,
|
||||||
))))
|
))))
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue