diff --git a/datafusion_util/src/config.rs b/datafusion_util/src/config.rs
new file mode 100644
index 0000000000..b98ad9b4df
--- /dev/null
+++ b/datafusion_util/src/config.rs
@@ -0,0 +1,24 @@
+use datafusion::{config::OPT_COALESCE_TARGET_BATCH_SIZE, prelude::SessionConfig};
+
+// The default catalog name - this impacts what SQL queries use if not specified
+pub const DEFAULT_CATALOG: &str = "public";
+// The default schema name - this impacts what SQL queries use if not specified
+pub const DEFAULT_SCHEMA: &str = "iox";
+
+/// The maximum number of rows that DataFusion should create in each RecordBatch
+pub const BATCH_SIZE: usize = 8 * 1024;
+
+const COALESCE_BATCH_SIZE: usize = BATCH_SIZE / 2;
+
+/// Return a SessionConfig object configured for IOx
+pub fn iox_session_config() -> SessionConfig {
+    SessionConfig::new()
+        .with_batch_size(BATCH_SIZE)
+        .set_u64(
+            OPT_COALESCE_TARGET_BATCH_SIZE,
+            COALESCE_BATCH_SIZE.try_into().unwrap(),
+        )
+        .create_default_catalog_and_schema(true)
+        .with_information_schema(true)
+        .with_default_catalog_and_schema(DEFAULT_CATALOG, DEFAULT_SCHEMA)
+}
diff --git a/datafusion_util/src/lib.rs b/datafusion_util/src/lib.rs
index 873afc4dcf..774593cacf 100644
--- a/datafusion_util/src/lib.rs
+++ b/datafusion_util/src/lib.rs
@@ -10,6 +10,7 @@
 //! [datafusion_optimizer::utils](https://docs.rs/datafusion-optimizer/13.0.0/datafusion_optimizer/utils/index.html)
 //! for expression manipulation functions.
 
+pub mod config;
 pub mod sender;
 pub mod watch;
 
diff --git a/iox_query/src/exec.rs b/iox_query/src/exec.rs
index 9a7cdbc653..a184fb3fc4 100644
--- a/iox_query/src/exec.rs
+++ b/iox_query/src/exec.rs
@@ -10,7 +10,6 @@ mod schema_pivot;
 pub mod seriesset;
 pub(crate) mod split;
 pub mod stringset;
-pub use context::{DEFAULT_CATALOG, DEFAULT_SCHEMA};
 use executor::DedicatedExecutor;
 use object_store::DynObjectStore;
 use parquet_file::storage::StorageId;
diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs
index e55d6fc4f3..40719bd667 100644
--- a/iox_query/src/exec/context.rs
+++ b/iox_query/src/exec/context.rs
@@ -27,7 +27,6 @@ use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use datafusion::{
     catalog::catalog::CatalogProvider,
-    config::OPT_COALESCE_TARGET_BATCH_SIZE,
     execution::{
         context::{QueryPlanner, SessionState, TaskContext},
         runtime_env::RuntimeEnv,
@@ -41,10 +40,10 @@ use datafusion::{
     },
     prelude::*,
 };
+use datafusion_util::config::{iox_session_config, DEFAULT_CATALOG};
 use executor::DedicatedExecutor;
 use futures::TryStreamExt;
 use observability_deps::tracing::debug;
-use parquet_file::serialize::ROW_GROUP_WRITE_SIZE;
 use query_functions::selectors::register_selector_aggregates;
 use std::{convert::TryInto, fmt, sync::Arc};
 use trace::{
@@ -55,11 +54,6 @@ use trace::{
 // Reuse DataFusion error and Result types for this module
 pub use datafusion::error::{DataFusionError as Error, Result};
 
-// The default catalog name - this impacts what SQL queries use if not specified
-pub const DEFAULT_CATALOG: &str = "public";
-// The default schema name - this impacts what SQL queries use if not specified
-pub const DEFAULT_SCHEMA: &str = "iox";
-
 /// This structure implements the DataFusion notion of "query planner"
 /// and is needed to create plans with the IOx extension nodes.
struct IOxQueryPlanner {} @@ -175,26 +169,9 @@ impl fmt::Debug for IOxSessionConfig { } } -const BATCH_SIZE: usize = 8 * 1024; -const COALESCE_BATCH_SIZE: usize = BATCH_SIZE / 2; - -// ensure read and write work well together -// Skip clippy due to . -#[allow(clippy::assertions_on_constants)] -const _: () = assert!(ROW_GROUP_WRITE_SIZE % BATCH_SIZE == 0); - impl IOxSessionConfig { pub(super) fn new(exec: DedicatedExecutor, runtime: Arc) -> Self { - let session_config = SessionConfig::new() - .with_batch_size(BATCH_SIZE) - // TODO add function in SessionCofig - .set_u64( - OPT_COALESCE_TARGET_BATCH_SIZE, - COALESCE_BATCH_SIZE.try_into().unwrap(), - ) - .create_default_catalog_and_schema(true) - .with_information_schema(true) - .with_default_catalog_and_schema(DEFAULT_CATALOG, DEFAULT_SCHEMA); + let session_config = iox_session_config(); Self { exec, diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index 0187da9bed..94db6974ae 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -32,7 +32,6 @@ pub mod pruning; pub mod statistics; pub mod util; -pub use exec::context::{DEFAULT_CATALOG, DEFAULT_SCHEMA}; pub use frontend::common::ScanPlanBuilder; pub use query_functions::group_by::{Aggregate, WindowDuration}; diff --git a/parquet_file/src/serialize.rs b/parquet_file/src/serialize.rs index 825164c899..b393aff086 100644 --- a/parquet_file/src/serialize.rs +++ b/parquet_file/src/serialize.rs @@ -6,6 +6,7 @@ use std::{io::Write, sync::Arc}; use arrow::error::ArrowError; use datafusion::physical_plan::SendableRecordBatchStream; +use datafusion_util::config::BATCH_SIZE; use futures::{pin_mut, TryStreamExt}; use observability_deps::tracing::{debug, trace, warn}; use parquet::{ @@ -21,6 +22,11 @@ use crate::metadata::{IoxMetadata, METADATA_KEY}; /// Parquet row group write size pub const ROW_GROUP_WRITE_SIZE: usize = 1024 * 1024; +/// ensure read and write work well together +/// Skip clippy due to . 
+#[allow(clippy::assertions_on_constants)]
+const _: () = assert!(ROW_GROUP_WRITE_SIZE % BATCH_SIZE == 0);
+
 /// [`RecordBatch`] to Parquet serialisation errors.
 ///
 /// [`RecordBatch`]: arrow::record_batch::RecordBatch
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index bcfc75c2bc..f4f3b7c6b9 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -19,6 +19,7 @@ use datafusion::{
     },
     prelude::SessionContext,
 };
+use datafusion_util::config::iox_session_config;
 use futures::TryStreamExt;
 use object_store::{DynObjectStore, ObjectMeta};
 use observability_deps::tracing::*;
@@ -147,7 +148,7 @@ impl ParquetStorage {
     pub fn test_df_context(&self) -> SessionContext {
         // set up "fake" DataFusion session
         let object_store = Arc::clone(&self.object_store);
-        let session_ctx = SessionContext::new();
+        let session_ctx = SessionContext::with_config(iox_session_config());
         let task_ctx = Arc::new(TaskContext::from(&session_ctx));
         task_ctx
             .runtime_env()
diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs
index da01bdcd61..d77c565d24 100644
--- a/querier/src/namespace/query_access.rs
+++ b/querier/src/namespace/query_access.rs
@@ -13,9 +13,10 @@ use datafusion::{
     datasource::TableProvider,
     error::DataFusionError,
 };
+use datafusion_util::config::DEFAULT_SCHEMA;
 use iox_query::{
     exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext},
-    QueryChunk, QueryCompletedToken, QueryDatabase, QueryText, DEFAULT_SCHEMA,
+    QueryChunk, QueryCompletedToken, QueryDatabase, QueryText,
 };
 use observability_deps::tracing::{debug, trace};
 use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};