chore: Update datafusion pin (#6442)

* chore: Update datafusion pin

* refactor: Update iox_query for new apis

* chore: Update some more apis

* chore: Run cargo hakari tasks

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Andrew Lamb, 2022-12-19 15:02:42 -05:00 (committed by GitHub)
commit d0d5906476, parent 200f4fe9bd
12 changed files with 76 additions and 105 deletions

Cargo.lock (generated)

@@ -1295,7 +1295,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "ahash 0.8.2",
  "arrow",
@@ -1340,7 +1340,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow",
  "chrono",
@@ -1352,7 +1352,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "ahash 0.8.2",
  "arrow",
@@ -1364,7 +1364,7 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow",
  "async-trait",
@@ -1379,7 +1379,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "ahash 0.8.2",
  "arrow",
@@ -1408,7 +1408,7 @@ dependencies = [
 [[package]]
 name = "datafusion-proto"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow",
  "chrono",
@@ -1425,7 +1425,7 @@ dependencies = [
 [[package]]
 name = "datafusion-row"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -1436,7 +1436,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow-schema",
  "datafusion-common",


@@ -115,8 +115,8 @@ license = "MIT OR Apache-2.0"
 [workspace.dependencies]
 arrow = { version = "29.0.0" }
 arrow-flight = { version = "29.0.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="40e6a67604514124332bf132020a026bbe5b5903", default-features = false }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="40e6a67604514124332bf132020a026bbe5b5903" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="891a800ebb170fed018e53846eb569f3e0638857", default-features = false }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="891a800ebb170fed018e53846eb569f3e0638857" }
 hashbrown = { version = "0.13.1" }
 parquet = { version = "29.0.0" }


@@ -23,6 +23,8 @@ use datafusion::arrow::datatypes::DataType;
 use datafusion::common::DataFusionError;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::TaskContext;
+use datafusion::execution::memory_pool::UnboundedMemoryPool;
+use datafusion::logical_expr::expr::Sort;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::common::SizedRecordBatchStream;
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
@@ -49,11 +51,11 @@ pub trait AsExpr {
     /// creates a DataFusion SortExpr
     fn as_sort_expr(&self) -> Expr {
-        Expr::Sort {
+        Expr::Sort(Sort {
             expr: Box::new(self.as_expr()),
             asc: true, // Sort ASCENDING
             nulls_first: true,
-        }
+        })
     }
 }
@@ -254,18 +256,17 @@ pub fn stream_from_batches(
         return Box::pin(EmptyRecordBatchStream::new(schema));
     }
 
+    // TODO should track this memory properly
+    let dummy_pool = Arc::new(UnboundedMemoryPool::default()) as _;
     let dummy_metrics = ExecutionPlanMetricsSet::new();
-    let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, 0);
+    let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, &dummy_pool, 0);
     let stream = SizedRecordBatchStream::new(batches[0].schema(), batches, mem_metrics);
     Box::pin(stream)
 }
 
 /// Create a SendableRecordBatchStream that sends back no RecordBatches with a specific schema
 pub fn stream_from_schema(schema: SchemaRef) -> SendableRecordBatchStream {
-    let dummy_metrics = ExecutionPlanMetricsSet::new();
-    let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, 0);
-    let stream = SizedRecordBatchStream::new(schema, vec![], mem_metrics);
-    Box::pin(stream)
+    stream_from_batches(schema, vec![])
 }
 
 /// Execute the [ExecutionPlan] with a default [SessionContext] and
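
The `Expr::Sort` rewrite above tracks DataFusion's change from a struct variant to a tuple variant wrapping the new `logical_expr::expr::Sort` type. A minimal sketch of the new construction against the pinned rev (`col` is the stock column helper; the function name and column are illustrative):

    use datafusion::logical_expr::{col, expr::Sort, Expr};

    /// Builds `foo ASC NULLS FIRST`, mirroring `as_sort_expr` above.
    fn sort_foo() -> Expr {
        Expr::Sort(Sort {
            expr: Box::new(col("foo")),
            asc: true,         // sort ascending
            nulls_first: true, // NULLs before values
        })
    }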


@@ -32,8 +32,8 @@ use datafusion::{
     catalog::catalog::CatalogProvider,
     execution::{
         context::{QueryPlanner, SessionState, TaskContext},
+        memory_pool::MemoryPool,
         runtime_env::RuntimeEnv,
-        MemoryManager,
     },
     logical_expr::{LogicalPlan, UserDefinedLogicalNode},
     physical_plan::{
@@ -415,7 +415,7 @@ impl IOxSessionContext {
     pub async fn to_series_and_groups(
         &self,
         series_set_plans: SeriesSetPlans,
-        memory_manager: Arc<MemoryManager>,
+        memory_pool: Arc<dyn MemoryPool>,
     ) -> Result<impl Stream<Item = Result<Either>>> {
         let SeriesSetPlans {
             mut plans,
@@ -476,7 +476,7 @@ impl IOxSessionContext {
         // If we have group columns, sort the results, and create the
         // appropriate groups
         if let Some(group_columns) = group_columns {
-            let grouper = GroupGenerator::new(group_columns, memory_manager);
+            let grouper = GroupGenerator::new(group_columns, memory_pool);
             Ok(grouper.group(data).await?.boxed())
         } else {
             Ok(data.map_ok(|series| series.into()).boxed())
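
`to_series_and_groups` now takes an `Arc<dyn MemoryPool>` instead of the removed `Arc<MemoryManager>`; call sites further down clone the pool out of the session's `RuntimeEnv`. A rough sketch of wiring a bounded pool into a `RuntimeEnv` at this rev (`with_memory_pool` is an assumption about the `RuntimeConfig` builder, not something this diff shows):

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool};
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

    fn make_env() -> Result<RuntimeEnv> {
        // Cap all DataFusion memory consumers at 1 GiB.
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024 * 1024));
        // Assumed builder method; the pool is then reachable as
        // `runtime_env().memory_pool`, which is what the call sites below clone.
        RuntimeEnv::new(RuntimeConfig::new().with_memory_pool(pool))
    }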


@@ -11,14 +11,11 @@ use arrow::{
 };
 use datafusion::{
     error::DataFusionError,
-    execution::{
-        memory_manager::proxy::{MemoryConsumerProxy, VecAllocExt},
-        MemoryConsumerId, MemoryManager,
-    },
+    execution::memory_pool::{proxy::VecAllocExt, MemoryConsumer, MemoryPool, MemoryReservation},
     physical_plan::SendableRecordBatchStream,
 };
-use futures::{future::BoxFuture, ready, FutureExt, Stream, StreamExt};
+use futures::{ready, Stream, StreamExt};
 use predicate::rpc_predicate::{GROUP_KEY_SPECIAL_START, GROUP_KEY_SPECIAL_STOP};
 use snafu::{OptionExt, Snafu};
 use std::{
@@ -502,27 +499,27 @@ impl Stream for SeriesSetConverterStream {
 #[derive(Debug)]
 pub struct GroupGenerator {
     group_columns: Vec<Arc<str>>,
-    memory_manager: Arc<MemoryManager>,
+    memory_pool: Arc<dyn MemoryPool>,
     collector_buffered_size_max: usize,
 }
 
 impl GroupGenerator {
-    pub fn new(group_columns: Vec<Arc<str>>, memory_manager: Arc<MemoryManager>) -> Self {
+    pub fn new(group_columns: Vec<Arc<str>>, memory_pool: Arc<dyn MemoryPool>) -> Self {
         Self::new_with_buffered_size_max(
             group_columns,
-            memory_manager,
+            memory_pool,
             Collector::<()>::DEFAULT_ALLOCATION_BUFFER_SIZE,
         )
     }
 
     fn new_with_buffered_size_max(
         group_columns: Vec<Arc<str>>,
-        memory_manager: Arc<MemoryManager>,
+        memory_pool: Arc<dyn MemoryPool>,
         collector_buffered_size_max: usize,
     ) -> Self {
         Self {
             group_columns,
-            memory_manager,
+            memory_pool,
             collector_buffered_size_max,
         }
     }
@@ -541,7 +538,7 @@ impl GroupGenerator {
         let mut series = Collector::new(
             series,
             self.group_columns,
-            self.memory_manager,
+            self.memory_pool,
             self.collector_buffered_size_max,
         )
         .await?;
@@ -705,7 +702,7 @@ impl SortableSeries {
 }
 
 /// [`Future`] that collects [`Series`] objects into a [`SortableSeries`] vector while registering/checking memory
-/// allocations with a [`MemoryManager`].
+/// allocations with a [`MemoryPool`].
 ///
 /// This avoids unbounded memory growth when merging multiple `Series` in memory
 struct Collector<S> {
@@ -729,27 +726,20 @@ struct Collector<S> {
     /// Buffered but not-yet-registered allocated size.
     ///
     /// We use an additional buffer here because in contrast to the normal DataFusion processing, the input stream is
-    /// NOT batched and we want to avoid costly memory allocations checks with the [`MemoryManager`] for every single element.
+    /// NOT batched and we want to avoid costly memory allocations checks with the [`MemoryPool`] for every single element.
     buffered_size: usize,
 
-    /// Maximum [buffered size](Self::buffered_size).
+    /// Maximum [buffered size](Self::buffered_size). Decreasing this
+    /// value causes allocations to be reported to the [`MemoryPool`]
+    /// more frequently.
     buffered_size_max: usize,
 
-    /// Our memory consumer.
-    ///
-    /// This is optional because for [`MemoryConsumerProxy::alloc`], we need to move this into
-    /// [`mem_proxy_alloc_fut`](Self::mem_proxy_alloc_fut) to avoid self-borrowing.
-    mem_proxy: Option<MemoryConsumerProxy>,
-
-    /// A potential running [`MemoryConsumerProxy::alloc`].
-    ///
-    /// This owns [`mem_proxy`](Self::mem_proxy) to avoid self-borrowing.
-    mem_proxy_alloc_fut:
-        Option<BoxFuture<'static, (MemoryConsumerProxy, Result<(), DataFusionError>)>>,
+    /// Our memory reservation.
+    mem_reservation: MemoryReservation,
 }
 
 impl<S> Collector<S> {
-    /// Maximum [buffered size](Self::buffered_size).
+    /// Default maximum [buffered size](Self::buffered_size) before updating [`MemoryPool`] reservation
     const DEFAULT_ALLOCATION_BUFFER_SIZE: usize = 1024 * 1024;
 }
@@ -760,11 +750,11 @@
     fn new(
         inner: S,
         group_columns: Vec<Arc<str>>,
-        memory_manager: Arc<MemoryManager>,
+        memory_pool: Arc<dyn MemoryPool>,
         buffered_size_max: usize,
     ) -> Self {
-        let mem_proxy =
-            MemoryConsumerProxy::new("Collector stream", MemoryConsumerId::new(0), memory_manager);
+        let mem_reservation = MemoryConsumer::new("SeriesSet Collector").register(&memory_pool);
+
         Self {
             inner_done: false,
             outer_done: false,
@@ -773,27 +763,16 @@
             collected: Vec::with_capacity(0),
             buffered_size: 0,
             buffered_size_max,
-            mem_proxy: Some(mem_proxy),
-            mem_proxy_alloc_fut: None,
+            mem_reservation,
         }
     }
 
-    /// Start a [`MemoryConsumerProxy::alloc`] future.
-    ///
-    /// # Panic
-    /// Panics if a future is already running.
-    fn alloc(&mut self) {
-        assert!(self.mem_proxy_alloc_fut.is_none());
-        let mut mem_proxy =
-            std::mem::take(&mut self.mem_proxy).expect("no mem proxy future running");
+    /// Registers all `self.buffered_size` with the MemoryPool,
+    /// resetting self.buffered_size to zero. Returns an error if new
+    /// memory can not be allocated from the pool.
+    fn alloc(&mut self) -> Result<(), DataFusionError> {
         let bytes = std::mem::take(&mut self.buffered_size);
-        self.mem_proxy_alloc_fut = Some(
-            async move {
-                let res = mem_proxy.alloc(bytes).await;
-                (mem_proxy, res)
-            }
-            .boxed(),
-        );
+        self.mem_reservation.try_grow(bytes)
     }
 }
@@ -808,20 +787,6 @@
         loop {
             assert!(!this.outer_done);
 
-            // Drive `MemoryConsumerProxy::alloc` to completion.
-            if let Some(fut) = this.mem_proxy_alloc_fut.as_mut() {
-                let (mem_proxy, res) = ready!(fut.poll_unpin(cx));
-                assert!(this.mem_proxy.is_none());
-                this.mem_proxy = Some(mem_proxy);
-                this.mem_proxy_alloc_fut = None;
-
-                if let Err(e) = res {
-                    // poison this future
-                    this.outer_done = true;
-                    return Poll::Ready(Err(e));
-                }
-            }
-
             // if the underlying stream is drained and the allocation future is ready (see above), we can finalize this future
             if this.inner_done {
                 this.outer_done = true;
@@ -838,7 +803,9 @@
                 // should we clear our allocation buffer?
                 if this.buffered_size > this.buffered_size_max {
-                    this.alloc();
+                    if let Err(e) = this.alloc() {
+                        return Poll::Ready(Err(e));
+                    }
                     continue;
                 }
             }
@@ -857,7 +824,9 @@
                 // underlying stream drained. now register the final allocation and then we're done
                 this.inner_done = true;
                 if this.buffered_size > 0 {
-                    this.alloc();
+                    if let Err(e) = this.alloc() {
+                        return Poll::Ready(Err(e));
+                    }
                 }
                 continue;
             }
@@ -878,7 +847,7 @@ mod tests {
     };
     use arrow_util::assert_batches_eq;
     use assert_matches::assert_matches;
-    use datafusion::execution::memory_manager::MemoryManagerConfig;
+    use datafusion::execution::memory_pool::GreedyMemoryPool;
     use datafusion_util::{stream_from_batch, stream_from_batches, stream_from_schema};
     use futures::TryStreamExt;
     use itertools::Itertools;
@@ -1638,9 +1607,9 @@ mod tests {
     #[tokio::test]
     async fn test_group_generator_mem_limit() {
-        let memory_manager =
-            MemoryManager::new(MemoryManagerConfig::try_new_limit(1, 1.0).unwrap());
-        let ggen = GroupGenerator::new(vec![Arc::from("g")], memory_manager);
+        let memory_pool = Arc::new(GreedyMemoryPool::new(1)) as _;
+        let ggen = GroupGenerator::new(vec![Arc::from("g")], memory_pool);
         let input = futures::stream::iter([Ok(Series {
             tags: vec![Tag {
                 key: Arc::from("g"),
@@ -1660,11 +1629,9 @@ mod tests {
     #[tokio::test]
     async fn test_group_generator_no_mem_limit() {
-        let memory_manager =
-            MemoryManager::new(MemoryManagerConfig::try_new_limit(usize::MAX, 1.0).unwrap());
+        let memory_pool = Arc::new(GreedyMemoryPool::new(usize::MAX)) as _;
         // use a generator w/ a low buffered allocation to force multiple `alloc` calls
-        let ggen =
-            GroupGenerator::new_with_buffered_size_max(vec![Arc::from("g")], memory_manager, 1);
+        let ggen = GroupGenerator::new_with_buffered_size_max(vec![Arc::from("g")], memory_pool, 1);
         let input = futures::stream::iter([
             Ok(Series {
                 tags: vec![Tag {
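
The heart of this file's change: the future-based `MemoryConsumerProxy::alloc`, which had to be driven to completion inside `poll_next` via a self-owning `BoxFuture`, is replaced by DataFusion's synchronous `MemoryReservation::try_grow`. A small sketch of the reservation API as used above (consumer name and byte counts are illustrative):

    use std::sync::Arc;
    use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

    fn main() {
        // A 100-byte budget; the test above uses a 1-byte GreedyMemoryPool.
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
        let mut reservation = MemoryConsumer::new("example consumer").register(&pool);

        // Accounting is now a plain Result, no future to poll.
        assert!(reservation.try_grow(60).is_ok());
        assert!(reservation.try_grow(60).is_err()); // 60 + 60 > 100: rejected

        // Dropping the reservation returns its bytes to the pool.
        drop(reservation);
        assert_eq!(pool.reserved(), 0);
    }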


@@ -1143,6 +1143,8 @@ impl InfluxRpcPlanner {
             .with_chunks(chunks)
             .build()?;
+        let schema = scan_and_filter.schema();
+
         let tags_and_timestamp: Vec<_> = scan_and_filter
             .schema()
             .tags_iter()
@@ -1159,7 +1161,6 @@
             .context(BuildingPlanSnafu)?;
 
         // Select away anything that isn't in the influx data model
-        let schema = scan_and_filter.schema();
         let tags_fields_and_timestamps: Vec<Expr> = schema
             .tags_iter()
             .map(|field| field.name().as_expr())


@@ -25,17 +25,17 @@ impl InfluxRegexToDataFusionRegex {
 }
 
 impl OptimizerRule for InfluxRegexToDataFusionRegex {
-    fn optimize(
-        &self,
-        plan: &LogicalPlan,
-        _optimizer_config: &mut OptimizerConfig,
-    ) -> Result<LogicalPlan, DataFusionError> {
-        optimize(plan)
-    }
-
     fn name(&self) -> &str {
         "influx_regex_to_datafusion_regex"
     }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> datafusion::error::Result<Option<LogicalPlan>> {
+        optimize(plan).map(Some)
+    }
 }
 
 fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
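
`OptimizerRule::optimize` gave way to the fallible `try_optimize`, where returning `Ok(None)` means "no rewrite" without cloning the plan. A minimal sketch of a rule against the new signature (the no-op rule is illustrative, not part of this commit):

    use datafusion::logical_expr::LogicalPlan;
    use datafusion::optimizer::{OptimizerConfig, OptimizerRule};

    /// A rule that never rewrites anything.
    struct NoopRule;

    impl OptimizerRule for NoopRule {
        fn name(&self) -> &str {
            "noop"
        }

        fn try_optimize(
            &self,
            _plan: &LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> datafusion::error::Result<Option<LogicalPlan>> {
            // `None` tells the optimizer driver the plan is unchanged.
            Ok(None)
        }
    }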


@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use datafusion::optimizer::{optimizer::Optimizer, OptimizerConfig};
+use datafusion::optimizer::optimizer::Optimizer;
 
 use self::influx_regex_to_datafusion_regex::InfluxRegexToDataFusionRegex;
@@ -10,7 +10,7 @@ mod influx_regex_to_datafusion_regex;
 ///
 /// This is mostly the default optimizer that DataFusion provides but with some additional passes.
 pub fn iox_optimizer() -> Optimizer {
-    let mut opt = Optimizer::new(&OptimizerConfig::default());
+    let mut opt = Optimizer::new();
     opt.rules
         .push(Arc::new(InfluxRegexToDataFusionRegex::new()));
     opt


@@ -100,7 +100,9 @@ fn is_comparison(op: Operator) -> bool {
         Operator::And => true,
         Operator::Or => true,
         Operator::Like => true,
+        Operator::ILike => true,
         Operator::NotLike => true,
+        Operator::NotILike => true,
         Operator::IsDistinctFrom => true,
         Operator::IsNotDistinctFrom => true,
         Operator::RegexMatch => true,


@@ -26,7 +26,7 @@ pub async fn run_series_set_plan_maybe_error(
     use futures::TryStreamExt;
 
-    ctx.to_series_and_groups(plans, Arc::clone(&ctx.inner().runtime_env().memory_manager))
+    ctx.to_series_and_groups(plans, Arc::clone(&ctx.inner().runtime_env().memory_pool))
         .await?
         .map_ok(|series_or_group| series_or_group.to_string())
         .try_collect()


@@ -1350,7 +1350,7 @@
     let series_or_groups = ctx
         .to_series_and_groups(
             series_plan,
-            Arc::clone(&ctx.inner().runtime_env().memory_manager),
+            Arc::clone(&ctx.inner().runtime_env().memory_pool),
         )
         .await
         .context(FilteringSeriesSnafu {
@@ -1418,7 +1418,7 @@
     let series_or_groups = ctx
         .to_series_and_groups(
             grouped_series_set_plan,
-            Arc::clone(&ctx.inner().runtime_env().memory_manager),
+            Arc::clone(&ctx.inner().runtime_env().memory_pool),
         )
         .await
         .context(GroupingSeriesSnafu {


@@ -28,7 +28,7 @@ bytes = { version = "1", features = ["std"] }
 chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "iana-time-zone", "serde", "std", "winapi"] }
 crossbeam-utils = { version = "0.8", features = ["std"] }
 crypto-common = { version = "0.1", default-features = false, features = ["std"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "40e6a67604514124332bf132020a026bbe5b5903", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "891a800ebb170fed018e53846eb569f3e0638857", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
 digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] }
 either = { version = "1", features = ["use_std"] }
 fixedbitset = { version = "0.4", features = ["std"] }