From d0d59064762514c0ec32020858ece7dcc15714c3 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 19 Dec 2022 15:02:42 -0500
Subject: [PATCH] chore: Update datafusion pin (#6442)

* chore: Update datafusion pin

* refactor: Update iox_query for new apis

* chore: Update some more apis

* chore: Run cargo hakari tasks

Co-authored-by: CircleCI[bot]
---
 Cargo.lock                                    |  16 +--
 Cargo.toml                                    |   4 +-
 datafusion_util/src/lib.rs                    |  15 +--
 iox_query/src/exec/context.rs                 |   6 +-
 iox_query/src/exec/seriesset/converter.rs     | 107 ++++++------
 iox_query/src/frontend/influxrpc.rs           |   3 +-
 .../influx_regex_to_datafusion_regex.rs       |  16 +--
 iox_query/src/logical_optimizer/mod.rs        |   4 +-
 predicate/src/rpc_predicate/rewrite.rs        |   2 +
 query_tests/src/influxrpc/util.rs             |   2 +-
 service_grpc_influxrpc/src/service.rs         |   4 +-
 workspace-hack/Cargo.toml                     |   2 +-
 12 files changed, 76 insertions(+), 105 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 028e5895fc..5d2073910b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1295,7 +1295,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "ahash 0.8.2",
  "arrow",
@@ -1340,7 +1340,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow",
  "chrono",
@@ -1352,7 +1352,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "ahash 0.8.2",
  "arrow",
@@ -1364,7 +1364,7 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow",
  "async-trait",
@@ -1379,7 +1379,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "ahash 0.8.2",
  "arrow",
@@ -1408,7 +1408,7 @@ dependencies = [
 [[package]]
 name = "datafusion-proto"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow",
  "chrono",
@@ -1425,7 +1425,7 @@ dependencies = [
 [[package]]
 name = "datafusion-row"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -1436,7 +1436,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=40e6a67604514124332bf132020a026bbe5b5903#40e6a67604514124332bf132020a026bbe5b5903"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
 dependencies = [
  "arrow-schema",
  "datafusion-common",
diff --git a/Cargo.toml b/Cargo.toml
index 5f39a98196..84fd6c9749 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -115,8 +115,8 @@ license = "MIT OR Apache-2.0"
 
 [workspace.dependencies]
 arrow = { version = "29.0.0" }
 arrow-flight = { version = "29.0.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="40e6a67604514124332bf132020a026bbe5b5903", default-features = false }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="40e6a67604514124332bf132020a026bbe5b5903" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="891a800ebb170fed018e53846eb569f3e0638857", default-features = false }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="891a800ebb170fed018e53846eb569f3e0638857" }
 hashbrown = { version = "0.13.1" }
 parquet = { version = "29.0.0" }
diff --git a/datafusion_util/src/lib.rs b/datafusion_util/src/lib.rs
index 774593cacf..f89bb4bcc5 100644
--- a/datafusion_util/src/lib.rs
+++ b/datafusion_util/src/lib.rs
@@ -23,6 +23,8 @@ use datafusion::arrow::datatypes::DataType;
 use datafusion::common::DataFusionError;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::TaskContext;
+use datafusion::execution::memory_pool::UnboundedMemoryPool;
+use datafusion::logical_expr::expr::Sort;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::common::SizedRecordBatchStream;
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
@@ -49,11 +51,11 @@ pub trait AsExpr {
 
     /// creates a DataFusion SortExpr
     fn as_sort_expr(&self) -> Expr {
-        Expr::Sort {
+        Expr::Sort(Sort {
             expr: Box::new(self.as_expr()),
             asc: true, // Sort ASCENDING
             nulls_first: true,
-        }
+        })
     }
 }
 
@@ -254,18 +256,17 @@ pub fn stream_from_batches(
         return Box::pin(EmptyRecordBatchStream::new(schema));
     }
 
+    // TODO should track this memory properly
+    let dummy_pool = Arc::new(UnboundedMemoryPool::default()) as _;
     let dummy_metrics = ExecutionPlanMetricsSet::new();
-    let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, 0);
+    let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, &dummy_pool, 0);
     let stream = SizedRecordBatchStream::new(batches[0].schema(), batches, mem_metrics);
     Box::pin(stream)
 }
 
 /// Create a SendableRecordBatchStream that sends back no RecordBatches with a specific schema
 pub fn stream_from_schema(schema: SchemaRef) -> SendableRecordBatchStream {
-    let dummy_metrics = ExecutionPlanMetricsSet::new();
-    let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, 0);
-    let stream = SizedRecordBatchStream::new(schema, vec![], mem_metrics);
-    Box::pin(stream)
+    stream_from_batches(schema, vec![])
 }
 
 /// Execute the [ExecutionPlan] with a default [SessionContext] and
diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs
index e4c1592199..bb1e0016bc 100644
--- a/iox_query/src/exec/context.rs
+++ b/iox_query/src/exec/context.rs
@@ -32,8 +32,8 @@ use datafusion::{
     catalog::catalog::CatalogProvider,
     execution::{
         context::{QueryPlanner, SessionState, TaskContext},
+        memory_pool::MemoryPool,
         runtime_env::RuntimeEnv,
-        MemoryManager,
     },
     logical_expr::{LogicalPlan, UserDefinedLogicalNode},
     physical_plan::{
@@ -415,7 +415,7 @@ impl IOxSessionContext {
     pub async fn to_series_and_groups(
         &self,
         series_set_plans: SeriesSetPlans,
-        memory_manager: Arc<MemoryManager>,
+        memory_pool: Arc<dyn MemoryPool>,
     ) -> Result<impl Stream<Item = Result<Either, DataFusionError>>> {
         let SeriesSetPlans {
             mut plans,
@@ -476,7 +476,7 @@ impl IOxSessionContext {
         // If we have group columns, sort the results, and create the
         // appropriate groups
         if let Some(group_columns) = group_columns {
-            let grouper = GroupGenerator::new(group_columns, memory_manager);
+            let grouper = GroupGenerator::new(group_columns, memory_pool);
             Ok(grouper.group(data).await?.boxed())
         } else {
             Ok(data.map_ok(|series| series.into()).boxed())
diff --git a/iox_query/src/exec/seriesset/converter.rs b/iox_query/src/exec/seriesset/converter.rs
index d63546e1b6..4340487bf7 100644
--- a/iox_query/src/exec/seriesset/converter.rs
+++ b/iox_query/src/exec/seriesset/converter.rs
@@ -11,14 +11,11 @@ use arrow::{
 };
 use datafusion::{
     error::DataFusionError,
-    execution::{
-        memory_manager::proxy::{MemoryConsumerProxy, VecAllocExt},
-        MemoryConsumerId, MemoryManager,
-    },
+    execution::memory_pool::{proxy::VecAllocExt, MemoryConsumer, MemoryPool, MemoryReservation},
     physical_plan::SendableRecordBatchStream,
 };
 
-use futures::{future::BoxFuture, ready, FutureExt, Stream, StreamExt};
+use futures::{ready, Stream, StreamExt};
 use predicate::rpc_predicate::{GROUP_KEY_SPECIAL_START, GROUP_KEY_SPECIAL_STOP};
 use snafu::{OptionExt, Snafu};
 use std::{
@@ -502,27 +499,27 @@ impl Stream for SeriesSetConverterStream {
 #[derive(Debug)]
 pub struct GroupGenerator {
     group_columns: Vec<Arc<str>>,
-    memory_manager: Arc<MemoryManager>,
+    memory_pool: Arc<dyn MemoryPool>,
     collector_buffered_size_max: usize,
 }
 
 impl GroupGenerator {
-    pub fn new(group_columns: Vec<Arc<str>>, memory_manager: Arc<MemoryManager>) -> Self {
+    pub fn new(group_columns: Vec<Arc<str>>, memory_pool: Arc<dyn MemoryPool>) -> Self {
         Self::new_with_buffered_size_max(
             group_columns,
-            memory_manager,
+            memory_pool,
             Collector::<()>::DEFAULT_ALLOCATION_BUFFER_SIZE,
         )
     }
 
     fn new_with_buffered_size_max(
         group_columns: Vec<Arc<str>>,
-        memory_manager: Arc<MemoryManager>,
+        memory_pool: Arc<dyn MemoryPool>,
         collector_buffered_size_max: usize,
     ) -> Self {
         Self {
             group_columns,
-            memory_manager,
+            memory_pool,
             collector_buffered_size_max,
         }
     }
@@ -541,7 +538,7 @@ impl GroupGenerator {
         let mut series = Collector::new(
             series,
             self.group_columns,
-            self.memory_manager,
+            self.memory_pool,
             self.collector_buffered_size_max,
         )
         .await?;
@@ -705,7 +702,7 @@ impl SortableSeries {
 }
 
 /// [`Future`] that collects [`Series`] objects into a [`SortableSeries`] vector while registering/checking memory
-/// allocations with a [`MemoryManager`].
+/// allocations with a [`MemoryPool`].
 ///
 /// This avoids unbounded memory growth when merging multiple `Series` in memory
 struct Collector<S> {
@@ -729,27 +726,20 @@ struct Collector<S> {
     /// Buffered but not-yet-registered allocated size.
     ///
     /// We use an additional buffer here because in contrast to the normal DataFusion processing, the input stream is
-    /// NOT batched and we want to avoid costly memory allocations checks with the [`MemoryManager`] for every single element.
+    /// NOT batched and we want to avoid costly memory allocations checks with the [`MemoryPool`] for every single element.
     buffered_size: usize,
 
-    /// Maximum [buffered size](Self::buffered_size).
+    /// Maximum [buffered size](Self::buffered_size). Decreasing this
+    /// value causes allocations to be reported to the [`MemoryPool`]
+    /// more frequently.
     buffered_size_max: usize,
 
-    /// Our memory consumer.
-    ///
-    /// This is optional because for [`MemoryConsumerProxy::alloc`], we need to move this into
-    /// [`mem_proxy_alloc_fut`](Self::mem_proxy_alloc_fut) to avoid self-borrowing.
-    mem_proxy: Option<MemoryConsumerProxy>,
-
-    /// A potential running [`MemoryConsumerProxy::alloc`].
-    ///
-    /// This owns [`mem_proxy`](Self::mem_proxy) to avoid self-borrowing.
-    mem_proxy_alloc_fut:
-        Option<BoxFuture<'static, (MemoryConsumerProxy, Result<(), DataFusionError>)>>,
+    /// Our memory reservation.
+    mem_reservation: MemoryReservation,
 }
 
 impl<S> Collector<S> {
-    /// Maximum [buffered size](Self::buffered_size).
+    /// Default maximum [buffered size](Self::buffered_size) before updating [`MemoryPool`] reservation
     const DEFAULT_ALLOCATION_BUFFER_SIZE: usize = 1024 * 1024;
 }
 
@@ -760,11 +750,11 @@ where
     fn new(
         inner: S,
         group_columns: Vec<Arc<str>>,
-        memory_manager: Arc<MemoryManager>,
+        memory_pool: Arc<dyn MemoryPool>,
         buffered_size_max: usize,
     ) -> Self {
-        let mem_proxy =
-            MemoryConsumerProxy::new("Collector stream", MemoryConsumerId::new(0), memory_manager);
+        let mem_reservation = MemoryConsumer::new("SeriesSet Collector").register(&memory_pool);
+
         Self {
             inner_done: false,
             outer_done: false,
@@ -773,27 +763,16 @@ where
             collected: Vec::with_capacity(0),
             buffered_size: 0,
             buffered_size_max,
-            mem_proxy: Some(mem_proxy),
-            mem_proxy_alloc_fut: None,
+            mem_reservation,
         }
     }
 
-    /// Start a [`MemoryConsumerProxy::alloc`] future.
-    ///
-    /// # Panic
-    /// Panics if a future is already running.
-    fn alloc(&mut self) {
-        assert!(self.mem_proxy_alloc_fut.is_none());
-        let mut mem_proxy =
-            std::mem::take(&mut self.mem_proxy).expect("no mem proxy future running");
+    /// Registers all `self.buffered_size` with the MemoryPool,
+    /// resetting self.buffered_size to zero. Returns an error if new
+    /// memory can not be allocated from the pool.
+    fn alloc(&mut self) -> Result<(), DataFusionError> {
         let bytes = std::mem::take(&mut self.buffered_size);
-        self.mem_proxy_alloc_fut = Some(
-            async move {
-                let res = mem_proxy.alloc(bytes).await;
-                (mem_proxy, res)
-            }
-            .boxed(),
-        );
+        self.mem_reservation.try_grow(bytes)
     }
 }
@@ -808,20 +787,6 @@ where
 
         loop {
             assert!(!this.outer_done);
-
-            // Drive `MemoryConsumerProxy::alloc` to completion.
-            if let Some(fut) = this.mem_proxy_alloc_fut.as_mut() {
-                let (mem_proxy, res) = ready!(fut.poll_unpin(cx));
-                assert!(this.mem_proxy.is_none());
-                this.mem_proxy = Some(mem_proxy);
-                this.mem_proxy_alloc_fut = None;
-                if let Err(e) = res {
-                    // poison this future
-                    this.outer_done = true;
-                    return Poll::Ready(Err(e));
-                }
-            }
-
             // if the underlying stream is drained and the allocation future is ready (see above), we can finalize this future
             if this.inner_done {
                 this.outer_done = true;
@@ -838,7 +803,9 @@ where
 
                     // should we clear our allocation buffer?
                     if this.buffered_size > this.buffered_size_max {
-                        this.alloc();
+                        if let Err(e) = this.alloc() {
+                            return Poll::Ready(Err(e));
+                        }
                         continue;
                     }
                 }
@@ -857,7 +824,9 @@ where
                     // underlying stream drained. now register the final allocation and then we're done
                     this.inner_done = true;
                     if this.buffered_size > 0 {
-                        this.alloc();
+                        if let Err(e) = this.alloc() {
+                            return Poll::Ready(Err(e));
+                        }
                     }
                     continue;
                 }
@@ -878,7 +847,7 @@ mod tests {
     };
     use arrow_util::assert_batches_eq;
    use assert_matches::assert_matches;
-    use datafusion::execution::memory_manager::MemoryManagerConfig;
+    use datafusion::execution::memory_pool::GreedyMemoryPool;
     use datafusion_util::{stream_from_batch, stream_from_batches, stream_from_schema};
     use futures::TryStreamExt;
     use itertools::Itertools;
@@ -1638,9 +1607,9 @@ mod tests {
 
     #[tokio::test]
     async fn test_group_generator_mem_limit() {
-        let memory_manager =
-            MemoryManager::new(MemoryManagerConfig::try_new_limit(1, 1.0).unwrap());
-        let ggen = GroupGenerator::new(vec![Arc::from("g")], memory_manager);
+        let memory_pool = Arc::new(GreedyMemoryPool::new(1)) as _;
+
+        let ggen = GroupGenerator::new(vec![Arc::from("g")], memory_pool);
         let input = futures::stream::iter([Ok(Series {
             tags: vec![Tag {
                 key: Arc::from("g"),
@@ -1660,11 +1629,9 @@ mod tests {
 
     #[tokio::test]
     async fn test_group_generator_no_mem_limit() {
-        let memory_manager =
-            MemoryManager::new(MemoryManagerConfig::try_new_limit(usize::MAX, 1.0).unwrap());
+        let memory_pool = Arc::new(GreedyMemoryPool::new(usize::MAX)) as _;
         // use a generator w/ a low buffered allocation to force multiple `alloc` calls
-        let ggen =
-            GroupGenerator::new_with_buffered_size_max(vec![Arc::from("g")], memory_manager, 1);
+        let ggen = GroupGenerator::new_with_buffered_size_max(vec![Arc::from("g")], memory_pool, 1);
         let input = futures::stream::iter([
             Ok(Series {
                 tags: vec![Tag {
diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs
index 6bbd697608..8596022b82 100644
--- a/iox_query/src/frontend/influxrpc.rs
+++ b/iox_query/src/frontend/influxrpc.rs
@@ -1143,6 +1143,8 @@ impl InfluxRpcPlanner {
             .with_chunks(chunks)
             .build()?;
 
+        let schema = scan_and_filter.schema();
+
         let tags_and_timestamp: Vec<_> = scan_and_filter
             .schema()
             .tags_iter()
@@ -1159,7 +1161,6 @@ impl InfluxRpcPlanner {
             .context(BuildingPlanSnafu)?;
 
         // Select away anything that isn't in the influx data model
-        let schema = scan_and_filter.schema();
         let tags_fields_and_timestamps: Vec<Expr> = schema
             .tags_iter()
             .map(|field| field.name().as_expr())
diff --git a/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs b/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs
index 3d743dce25..0cc586d843 100644
--- a/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs
+++ b/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs
@@ -25,17 +25,17 @@ impl InfluxRegexToDataFusionRegex {
 }
 
 impl OptimizerRule for InfluxRegexToDataFusionRegex {
-    fn optimize(
-        &self,
-        plan: &LogicalPlan,
-        _optimizer_config: &mut OptimizerConfig,
-    ) -> Result<LogicalPlan> {
-        optimize(plan)
-    }
-
     fn name(&self) -> &str {
         "influx_regex_to_datafusion_regex"
     }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> datafusion::error::Result<Option<LogicalPlan>> {
+        optimize(plan).map(Some)
+    }
 }
 
 fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
diff --git a/iox_query/src/logical_optimizer/mod.rs b/iox_query/src/logical_optimizer/mod.rs
index fd7e36030d..fba32fa298 100644
--- a/iox_query/src/logical_optimizer/mod.rs
+++ b/iox_query/src/logical_optimizer/mod.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use datafusion::optimizer::{optimizer::Optimizer, OptimizerConfig};
+use datafusion::optimizer::optimizer::Optimizer;
 
 use self::influx_regex_to_datafusion_regex::InfluxRegexToDataFusionRegex;
 
@@ -10,7 +10,7 @@ mod influx_regex_to_datafusion_regex;
 ///
 /// This is mostly the default optimizer that DataFusion provides but with some additional passes.
 pub fn iox_optimizer() -> Optimizer {
-    let mut opt = Optimizer::new(&OptimizerConfig::default());
+    let mut opt = Optimizer::new();
     opt.rules
         .push(Arc::new(InfluxRegexToDataFusionRegex::new()));
     opt
diff --git a/predicate/src/rpc_predicate/rewrite.rs b/predicate/src/rpc_predicate/rewrite.rs
index a30b287c17..da4814f718 100644
--- a/predicate/src/rpc_predicate/rewrite.rs
+++ b/predicate/src/rpc_predicate/rewrite.rs
@@ -100,7 +100,9 @@ fn is_comparison(op: Operator) -> bool {
         Operator::And => true,
         Operator::Or => true,
         Operator::Like => true,
+        Operator::ILike => true,
         Operator::NotLike => true,
+        Operator::NotILike => true,
         Operator::IsDistinctFrom => true,
         Operator::IsNotDistinctFrom => true,
         Operator::RegexMatch => true,
diff --git a/query_tests/src/influxrpc/util.rs b/query_tests/src/influxrpc/util.rs
index 965cf1f6a0..b6589973f8 100644
--- a/query_tests/src/influxrpc/util.rs
+++ b/query_tests/src/influxrpc/util.rs
@@ -26,7 +26,7 @@ pub async fn run_series_set_plan_maybe_error(
 
     use futures::TryStreamExt;
 
-    ctx.to_series_and_groups(plans, Arc::clone(&ctx.inner().runtime_env().memory_manager))
+    ctx.to_series_and_groups(plans, Arc::clone(&ctx.inner().runtime_env().memory_pool))
         .await?
         .map_ok(|series_or_group| series_or_group.to_string())
         .try_collect()
diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs
index f86cf7abd6..9f2df735fe 100644
--- a/service_grpc_influxrpc/src/service.rs
+++ b/service_grpc_influxrpc/src/service.rs
@@ -1350,7 +1350,7 @@ where
     let series_or_groups = ctx
         .to_series_and_groups(
             series_plan,
-            Arc::clone(&ctx.inner().runtime_env().memory_manager),
+            Arc::clone(&ctx.inner().runtime_env().memory_pool),
         )
         .await
         .context(FilteringSeriesSnafu {
@@ -1418,7 +1418,7 @@ where
     let series_or_groups = ctx
         .to_series_and_groups(
             grouped_series_set_plan,
-            Arc::clone(&ctx.inner().runtime_env().memory_manager),
+            Arc::clone(&ctx.inner().runtime_env().memory_pool),
         )
         .await
         .context(GroupingSeriesSnafu {
diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml
index 3a112c288f..6ec5bc1550 100644
--- a/workspace-hack/Cargo.toml
+++ b/workspace-hack/Cargo.toml
@@ -28,7 +28,7 @@ bytes = { version = "1", features = ["std"] }
 chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "iana-time-zone", "serde", "std", "winapi"] }
 crossbeam-utils = { version = "0.8", features = ["std"] }
 crypto-common = { version = "0.1", default-features = false, features = ["std"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "40e6a67604514124332bf132020a026bbe5b5903", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "891a800ebb170fed018e53846eb569f3e0638857", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
 digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] }
 either = { version = "1", features = ["use_std"] }
 fixedbitset = { version = "0.4", features = ["std"] }
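
Reviewer notes (not part of the patch): the hunks above track three DataFusion API migrations. The sketches below restate each one as a small standalone example written for this review; helper names such as `track_batches` are made up for illustration.

The biggest change is the move from `MemoryManager`/`MemoryConsumerProxy` to the new `memory_pool` module: a consumer registers with a pool once and then grows or shrinks a synchronous `MemoryReservation`, which is what lets converter.rs drop its self-borrowing `BoxFuture` alloc dance. A minimal sketch, assuming the DataFusion rev pinned above:

    use std::sync::Arc;

    use datafusion::error::DataFusionError;
    use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

    /// Hypothetical helper: account for a sequence of allocation sizes
    /// against a fixed-budget pool, failing once the budget is exhausted.
    fn track_batches(sizes: &[usize]) -> Result<(), DataFusionError> {
        // GreedyMemoryPool enforces a hard byte limit; UnboundedMemoryPool
        // (used for the dummy metrics in datafusion_util above) never errors.
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

        // Register a named consumer once, then grow the reservation as data
        // arrives. try_grow is synchronous, unlike the old async
        // MemoryConsumerProxy::alloc, so poll loops no longer need to park
        // the consumer inside a future.
        let mut reservation = MemoryConsumer::new("example").register(&pool);
        for size in sizes {
            reservation.try_grow(*size)?;
        }

        // Dropping the reservation returns all reserved bytes to the pool.
        Ok(())
    }

This mirrors `Collector::alloc` above, which buffers small allocations and calls `try_grow` once per `DEFAULT_ALLOCATION_BUFFER_SIZE` bytes to keep per-element accounting overhead low.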
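
Second, `OptimizerRule::optimize` is replaced by `try_optimize`, which takes `&dyn OptimizerConfig` and returns `Result<Option<LogicalPlan>>` so a rule can signal "no change" with `Ok(None)` instead of cloning the plan. A minimal sketch of the new shape, using a hypothetical `NoopRule` rather than the IOx rule:

    use datafusion::logical_expr::LogicalPlan;
    use datafusion::optimizer::{OptimizerConfig, OptimizerRule};

    struct NoopRule;

    impl OptimizerRule for NoopRule {
        fn name(&self) -> &str {
            "noop"
        }

        fn try_optimize(
            &self,
            _plan: &LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> datafusion::error::Result<Option<LogicalPlan>> {
            // Ok(None) means "plan unchanged"; InfluxRegexToDataFusionRegex
            // instead rewrites the plan and returns Ok(Some(new_plan)).
            Ok(None)
        }
    }

The same release drops the config argument from the optimizer constructor, hence `Optimizer::new()` in iox_query/src/logical_optimizer/mod.rs.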
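
Third, `Expr::Sort` changes from a struct variant to a tuple variant wrapping the new `logical_expr::expr::Sort` struct, which is why `as_sort_expr` gains the extra `Sort { .. }` layer. A minimal sketch, assuming `col` from the DataFusion prelude; `sort_ascending` is an illustrative name, not IOx code:

    use datafusion::logical_expr::expr::Sort;
    use datafusion::prelude::{col, Expr};

    /// Hypothetical helper mirroring as_sort_expr above.
    fn sort_ascending(column: &str) -> Expr {
        // Formerly: Expr::Sort { expr, asc, nulls_first }
        Expr::Sort(Sort {
            expr: Box::new(col(column)),
            asc: true,
            nulls_first: true,
        })
    }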