refactor: do NOT use fake DF context for parquet reading (#5942)

Use the proper top-level DataFusion context and register the object
store there.

Note that we still hide the `ParquetExec` behind an opaque record batch
stream. Fixing that is next on my list.

Helps with #5897.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-10-24 08:20:26 +00:00 committed by GitHub
parent 4ca869fcd9
commit e0062f2d40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 285 additions and 109 deletions

2
Cargo.lock generated
View File

@ -2438,8 +2438,10 @@ dependencies = [
"futures",
"hashbrown",
"itertools",
"object_store",
"observability_deps",
"parking_lot 0.12.1",
"parquet_file",
"predicate",
"query_functions",
"schema",

View File

@ -107,7 +107,6 @@ mod tests {
use arrow_util::assert_batches_sorted_eq;
use backoff::BackoffConfig;
use data_types::{ColumnType, CompactionLevel, ParquetFileId};
use iox_query::exec::Executor;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
use iox_time::{SystemProvider, TimeProvider};
use parquet_file::storage::StorageId;
@ -180,7 +179,7 @@ mod tests {
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
@ -419,7 +418,7 @@ mod tests {
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
@ -644,7 +643,7 @@ mod tests {
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
@ -918,7 +917,7 @@ mod tests {
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
@ -1023,7 +1022,7 @@ mod tests {
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
@ -1163,7 +1162,7 @@ mod tests {
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
@ -1409,7 +1408,7 @@ mod tests {
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,

View File

@ -816,7 +816,7 @@ pub mod tests {
vec![shard.id, another_shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
time_provider,
BackoffConfig::default(),
config,

View File

@ -209,7 +209,6 @@ mod tests {
use crate::{compact::Compactor, handler::CompactorConfig};
use backoff::BackoffConfig;
use data_types::CompactionLevel;
use iox_query::exec::Executor;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable};
use parquet_file::storage::{ParquetStorage, StorageId};
use std::sync::Arc;
@ -500,7 +499,7 @@ mod tests {
vec![shard1.shard.id, shard2.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
time_provider,
BackoffConfig::default(),
config,

View File

@ -440,7 +440,6 @@ pub mod tests {
use arrow_util::assert_batches_sorted_eq;
use backoff::BackoffConfig;
use data_types::{ColumnType, CompactionLevel, ParquetFileId};
use iox_query::exec::Executor;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable};
use iox_time::{SystemProvider, TimeProvider};
use std::{
@ -600,7 +599,7 @@ pub mod tests {
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
time_provider,
BackoffConfig::default(),
config,
@ -928,7 +927,7 @@ pub mod tests {
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
Arc::new(Executor::new(1)),
catalog.exec(),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,

View File

@ -236,7 +236,7 @@ impl QueryChunk for QueryableParquetChunk {
trace!(?selection, "selection");
self.data
.read_filter(predicate, selection)
.read_filter(predicate, selection, ctx.inner())
.context(ReadParquetSnafu)
.map_err(|e| DataFusionError::External(Box::new(e)))
}

View File

@ -3,13 +3,14 @@ use clap_blocks::{
compactor::CompactorOnceConfig,
object_store::{make_object_store, ObjectStoreConfig},
};
use iox_query::exec::Executor;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_time::{SystemProvider, TimeProvider};
use ioxd_compactor::build_compactor_from_config;
use object_store::DynObjectStore;
use object_store_metrics::ObjectStoreMetrics;
use parquet_file::storage::{ParquetStorage, StorageId};
use snafu::prelude::*;
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
mod generate;
@ -82,14 +83,22 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&time_provider),
&*metric_registry,
));
let parquet_store = ParquetStorage::new(object_store, StorageId::from("iox"));
let exec = Arc::new(Executor::new(query_exec_thread_count));
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads: query_exec_thread_count,
target_query_partitions: query_exec_thread_count,
object_stores: HashMap::from([(
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
)]),
}));
let time_provider = Arc::new(SystemProvider::new());
let compactor = build_compactor_from_config(
compactor_config,
catalog,
object_store,
parquet_store,
exec,
time_provider,
metric_registry,

View File

@ -12,7 +12,7 @@ use clap_blocks::{
write_buffer::WriteBufferConfig,
};
use data_types::{IngesterMapping, ShardIndex};
use iox_query::exec::Executor;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_time::{SystemProvider, TimeProvider};
use ioxd_common::{
server_type::{CommonServerState, CommonServerStateError},
@ -24,7 +24,8 @@ use ioxd_querier::{create_querier_server_type, QuerierServerTypeArgs};
use ioxd_router::create_router_server_type;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use std::sync::Arc;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use trace_exporters::TracingConfig;
use trogging::cli::LoggingConfig;
@ -513,7 +514,16 @@ pub async fn command(config: Config) -> Result<()> {
// configured by a command line)
let num_threads = num_cpus::get();
info!(%num_threads, "Creating shared query executor");
let exec = Arc::new(Executor::new(num_threads));
let parquet_store = ParquetStorage::new(Arc::clone(&object_store), StorageId::from("iox"));
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads,
target_query_partitions: num_threads,
object_stores: HashMap::from([(
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
)]),
}));
info!("starting router");
let router = create_router_server_type(
@ -544,7 +554,7 @@ pub async fn command(config: Config) -> Result<()> {
&common_state,
Arc::clone(&metrics),
Arc::clone(&catalog),
Arc::clone(&object_store),
parquet_store,
Arc::clone(&exec),
Arc::clone(&time_provider),
compactor_config,

View File

@ -1,10 +1,12 @@
//! Implementation of command line option for running the compactor
use iox_query::exec::Executor;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_time::{SystemProvider, TimeProvider};
use object_store::DynObjectStore;
use object_store_metrics::ObjectStoreMetrics;
use observability_deps::tracing::*;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
@ -94,14 +96,23 @@ pub async fn command(config: Config) -> Result<(), Error> {
&*metric_registry,
));
let exec = Arc::new(Executor::new(config.query_exec_thread_count));
let parquet_store = ParquetStorage::new(object_store, StorageId::from("iox"));
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads: config.query_exec_thread_count,
target_query_partitions: config.query_exec_thread_count,
object_stores: HashMap::from([(
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
)]),
}));
let time_provider = Arc::new(SystemProvider::new());
let server_type = create_compactor_server_type(
&common_state,
Arc::clone(&metric_registry),
catalog,
object_store,
parquet_store,
exec,
time_provider,
config.compactor_config,

View File

@ -25,8 +25,10 @@ executor = { path = "../executor"}
futures = "0.3"
hashbrown = "0.12"
itertools = "0.10.5"
object_store = "0.5.1"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
parquet_file = { path = "../parquet_file" }
query_functions = { path = "../query_functions"}
schema = { path = "../schema" }
snafu = "0.7"

View File

@ -12,9 +12,11 @@ pub(crate) mod split;
pub mod stringset;
pub use context::{DEFAULT_CATALOG, DEFAULT_SCHEMA};
use executor::DedicatedExecutor;
use object_store::DynObjectStore;
use parquet_file::storage::StorageId;
use trace::span::{SpanExt, SpanRecorder};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use datafusion::{
self,
@ -40,6 +42,39 @@ pub struct ExecutorConfig {
/// Target parallelism for query execution
pub target_query_partitions: usize,
/// Object stores
pub object_stores: HashMap<StorageId, Arc<DynObjectStore>>,
}
#[derive(Debug)]
pub struct DedicatedExecutors {
/// Executor for running user queries
query_exec: DedicatedExecutor,
/// Executor for running system/reorganization tasks such as
/// compact
reorg_exec: DedicatedExecutor,
/// Number of threads per thread pool
num_threads: usize,
}
impl DedicatedExecutors {
pub fn new(num_threads: usize) -> Self {
let query_exec = DedicatedExecutor::new("IOx Query Executor Thread", num_threads);
let reorg_exec = DedicatedExecutor::new("IOx Reorg Executor Thread", num_threads);
Self {
query_exec,
reorg_exec,
num_threads,
}
}
pub fn num_threads(&self) -> usize {
self.num_threads
}
}
/// Handles executing DataFusion plans, and marshalling the results into rust
@ -49,12 +84,8 @@ pub struct ExecutorConfig {
/// running, based on a policy
#[derive(Debug)]
pub struct Executor {
/// Executor for running user queries
query_exec: DedicatedExecutor,
/// Executor for running system/reorganization tasks such as
/// compact
reorg_exec: DedicatedExecutor,
/// Executors
executors: Arc<DedicatedExecutors>,
/// The default configuration options with which to create contexts
config: ExecutorConfig,
@ -68,6 +99,7 @@ pub struct Executor {
pub enum ExecutorType {
/// Run using the pool for queries
Query,
/// Run using the pool for system / reorganization tasks
Reorg,
}
@ -79,19 +111,39 @@ impl Executor {
Self::new_with_config(ExecutorConfig {
num_threads,
target_query_partitions: num_threads,
object_stores: HashMap::default(),
})
}
pub fn new_with_config(config: ExecutorConfig) -> Self {
let query_exec = DedicatedExecutor::new("IOx Query Executor Thread", config.num_threads);
let reorg_exec = DedicatedExecutor::new("IOx Reorg Executor Thread", config.num_threads);
let executors = Arc::new(DedicatedExecutors::new(config.num_threads));
Self::new_with_config_and_executors(config, executors)
}
/// Low-level constructor.
///
/// This is mostly useful if you wanna keep the executors (because they are quiet expensive to create) but need a fresh IOx runtime.
///
/// # Panic
/// Panics if the number of threads in `executors` is different from `config`.
pub fn new_with_config_and_executors(
config: ExecutorConfig,
executors: Arc<DedicatedExecutors>,
) -> Self {
assert_eq!(config.num_threads, executors.num_threads);
let runtime_config = RuntimeConfig::new();
for (id, store) in &config.object_stores {
runtime_config
.object_store_registry
.register_store("iox", id, Arc::clone(store));
}
let runtime = Arc::new(RuntimeEnv::new(runtime_config).expect("creating runtime"));
Self {
query_exec,
reorg_exec,
executors,
config,
runtime,
}
@ -128,15 +180,15 @@ impl Executor {
/// Return the execution pool of the specified type
fn executor(&self, executor_type: ExecutorType) -> &DedicatedExecutor {
match executor_type {
ExecutorType::Query => &self.query_exec,
ExecutorType::Reorg => &self.reorg_exec,
ExecutorType::Query => &self.executors.query_exec,
ExecutorType::Reorg => &self.executors.reorg_exec,
}
}
/// Initializes shutdown.
pub fn shutdown(&self) {
self.query_exec.shutdown();
self.reorg_exec.shutdown();
self.executors.query_exec.shutdown();
self.executors.reorg_exec.shutdown();
}
/// Stops all subsequent task executions, and waits for the worker
@ -146,8 +198,8 @@ impl Executor {
/// executing thread to complete. All other calls to join will
/// complete immediately.
pub async fn join(&self) {
self.query_exec.join().await;
self.reorg_exec.join().await;
self.executors.query_exec.join().await;
self.executors.reorg_exec.join().await;
}
}

View File

@ -15,7 +15,11 @@ use iox_catalog::{
interface::{get_schema_by_id, get_table_schema_by_id, Catalog, PartitionRepo},
mem::MemCatalog,
};
use iox_query::{exec::Executor, provider::RecordBatchDeduplicator, util::arrow_sort_key_exprs};
use iox_query::{
exec::{DedicatedExecutors, Executor, ExecutorConfig},
provider::RecordBatchDeduplicator,
util::arrow_sort_key_exprs,
};
use iox_time::{MockProvider, Time, TimeProvider};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use object_store::{memory::InMemory, DynObjectStore};
@ -32,11 +36,12 @@ use schema::{
sort::{adjust_sort_key_columns, compute_sort_key, SortKey},
Schema,
};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;
/// Global executor used by all test catalogs.
static GLOBAL_EXEC: Lazy<Arc<Executor>> = Lazy::new(|| Arc::new(Executor::new(1)));
static GLOBAL_EXEC: Lazy<Arc<DedicatedExecutors>> =
Lazy::new(|| Arc::new(DedicatedExecutors::new(1)));
/// Catalog for tests
#[derive(Debug)]
@ -45,6 +50,7 @@ pub struct TestCatalog {
pub catalog: Arc<dyn Catalog>,
pub metric_registry: Arc<metric::Registry>,
pub object_store: Arc<DynObjectStore>,
pub parquet_store: ParquetStorage,
pub time_provider: Arc<MockProvider>,
pub exec: Arc<Executor>,
}
@ -52,25 +58,39 @@ pub struct TestCatalog {
impl TestCatalog {
/// Initialize the catalog
///
/// All test catalogs use the same [`Executor`]. Use [`with_exec`](Self::with_exec) if you need a special or
/// All test catalogs use the same [`Executor`]. Use [`with_execs`](Self::with_execs) if you need a special or
/// dedicated executor.
pub fn new() -> Arc<Self> {
let exec = Arc::clone(&GLOBAL_EXEC);
Self::with_exec(exec)
Self::with_execs(exec, 1)
}
/// Initialize with given executor.
pub fn with_exec(exec: Arc<Executor>) -> Arc<Self> {
/// Initialize with given executors.
pub fn with_execs(exec: Arc<DedicatedExecutors>, target_query_partitions: usize) -> Arc<Self> {
let metric_registry = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
let object_store = Arc::new(InMemory::new());
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store) as _, StorageId::from("iox"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp(0, 0)));
let exec = Arc::new(Executor::new_with_config_and_executors(
ExecutorConfig {
num_threads: exec.num_threads(),
target_query_partitions,
object_stores: HashMap::from([(
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
)]),
},
exec,
));
Arc::new(Self {
metric_registry,
catalog,
object_store,
parquet_store,
time_provider,
exec,
})
@ -353,8 +373,6 @@ impl TestTable {
/// Read the record batches from the specified Parquet File associated with this table.
pub async fn read_parquet_file(&self, file: ParquetFile) -> Vec<RecordBatch> {
let storage = ParquetStorage::new(self.catalog.object_store(), StorageId::from("iox"));
// get schema
let table_catalog_schema = self.catalog_schema().await;
let column_id_lookup = table_catalog_schema.column_id_map();
@ -366,9 +384,17 @@ impl TestTable {
.collect();
let schema = table_schema.select_by_names(&selection).unwrap();
let chunk = ParquetChunk::new(Arc::new(file), Arc::new(schema), storage);
let chunk = ParquetChunk::new(
Arc::new(file),
Arc::new(schema),
self.catalog.parquet_store.clone(),
);
let rx = chunk
.read_filter(&Predicate::default(), Selection::All)
.read_filter(
&Predicate::default(),
Selection::All,
&chunk.store().test_df_context(),
)
.unwrap();
datafusion::physical_plan::common::collect(rx)
.await

View File

@ -18,8 +18,7 @@ use ioxd_common::{
setup_builder,
};
use metric::Registry;
use object_store::DynObjectStore;
use parquet_file::storage::{ParquetStorage, StorageId};
use parquet_file::storage::ParquetStorage;
use std::{
fmt::{Debug, Display},
sync::Arc,
@ -135,7 +134,7 @@ pub async fn create_compactor_server_type(
common_state: &CommonServerState,
metric_registry: Arc<metric::Registry>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
parquet_store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
compactor_config: CompactorConfig,
@ -143,7 +142,7 @@ pub async fn create_compactor_server_type(
let compactor = build_compactor_from_config(
compactor_config,
catalog,
object_store,
parquet_store,
exec,
time_provider,
Arc::clone(&metric_registry),
@ -161,7 +160,7 @@ pub async fn create_compactor_server_type(
pub async fn build_compactor_from_config(
compactor_config: CompactorConfig,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
parquet_store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
metric_registry: Arc<Registry>,
@ -197,8 +196,6 @@ pub async fn build_compactor_from_config(
}
txn.commit().await?;
let parquet_store = ParquetStorage::new(object_store, StorageId::from("iox"));
let CompactorConfig {
max_desired_file_size_bytes,
percentage_max_file_size,

View File

@ -2,7 +2,7 @@ use async_trait::async_trait;
use clap_blocks::querier::{IngesterAddresses, QuerierConfig};
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use iox_query::exec::{Executor, ExecutorType};
use iox_time::TimeProvider;
use ioxd_common::{
add_service,
@ -169,6 +169,20 @@ pub async fn create_querier_server_type(
&Handle::current(),
));
// register cached object store with the execution context
let parquet_store = catalog_cache.parquet_store();
let existing = args
.exec
.new_context(ExecutorType::Query)
.inner()
.runtime_env()
.register_object_store(
"iox",
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
);
assert!(existing.is_none());
let ingester_connection = match args.ingester_addresses {
IngesterAddresses::None => None,
IngesterAddresses::ByShardIndex(map) => Some(create_ingester_connections_by_shard(

View File

@ -3,7 +3,7 @@
use crate::{storage::ParquetStorage, ParquetFilePath};
use data_types::{ParquetFile, TimestampMinMax};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::{physical_plan::SendableRecordBatchStream, prelude::SessionContext};
use predicate::Predicate;
use schema::{selection::Selection, Schema};
use std::{collections::BTreeSet, mem, sync::Arc};
@ -33,6 +33,11 @@ impl ParquetChunk {
}
}
/// Store that contains this file.
pub fn store(&self) -> &ParquetStorage {
&self.store
}
/// Return raw parquet file metadata.
pub fn parquet_file(&self) -> &Arc<ParquetFile> {
&self.parquet_file
@ -77,6 +82,7 @@ impl ParquetChunk {
&self,
predicate: &Predicate,
selection: Selection<'_>,
session_ctx: &SessionContext,
) -> Result<SendableRecordBatchStream, crate::storage::ReadError> {
let path: ParquetFilePath = self.parquet_file.as_ref().into();
self.store.read_filter(
@ -85,6 +91,7 @@ impl ParquetChunk {
Arc::clone(&self.schema.as_arrow()),
&path,
self.file_size_bytes(),
session_ctx,
)
}

View File

@ -102,6 +102,18 @@ impl From<&'static str> for StorageId {
}
}
impl AsRef<str> for StorageId {
fn as_ref(&self) -> &str {
self.0
}
}
impl std::fmt::Display for StorageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an
/// underlying [`ObjectStore`].
///
@ -139,6 +151,19 @@ impl ParquetStorage {
self.id
}
/// Fake DataFusion context for testing that contains this store
pub fn test_df_context(&self) -> SessionContext {
// set up "fake" DataFusion session
let object_store = Arc::clone(&self.object_store);
let session_ctx = SessionContext::new();
let task_ctx = Arc::new(TaskContext::from(&session_ctx));
task_ctx
.runtime_env()
.register_object_store("iox", self.id, object_store);
session_ctx
}
/// Push `batches`, a stream of [`RecordBatch`] instances, to object
/// storage.
///
@ -223,19 +248,11 @@ impl ParquetStorage {
schema: SchemaRef,
path: &ParquetFilePath,
file_size: usize,
session_ctx: &SessionContext,
) -> Result<SendableRecordBatchStream, ReadError> {
let path = path.object_store_path();
trace!(path=?path, "fetching parquet data for filtered read");
// set up "fake" DataFusion session (TODO thread the real one
// down so config options set on query context take effect here)
let object_store = Arc::clone(&self.object_store);
let session_ctx = SessionContext::new();
let task_ctx = Arc::new(TaskContext::from(&session_ctx));
task_ctx
.runtime_env()
.register_object_store("iox", "iox", object_store);
// Compute final (output) schema after selection
let schema = Arc::new(
select_schema(selection, &schema)
@ -253,7 +270,8 @@ impl ParquetStorage {
};
let expr = predicate.filter_expr();
let base_config = FileScanConfig {
object_store_url: ObjectStoreUrl::parse("iox://iox/").expect("valid object store URL"),
object_store_url: ObjectStoreUrl::parse(format!("iox://{}/", self.id))
.expect("valid object store URL"),
file_schema: Arc::clone(&schema),
file_groups: vec![vec![PartitionedFile {
object_meta,
@ -272,7 +290,8 @@ impl ParquetStorage {
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&schema),
futures::stream::once(execute_stream(Arc::new(exec), task_ctx)).try_flatten(),
futures::stream::once(execute_stream(Arc::new(exec), session_ctx.task_ctx()))
.try_flatten(),
)))
}
}
@ -593,6 +612,7 @@ mod tests {
expected_schema,
&path,
file_size,
&store.test_df_context(),
)
.expect("should read record batches from object store");
let schema = rx.schema();

View File

@ -1,5 +1,6 @@
//! Caches used by the querier.
use ::object_store::ObjectStore;
use ::parquet_file::storage::{ParquetStorage, StorageId};
use backoff::BackoffConfig;
use cache_system::backend::policy::lru::ResourcePool;
use iox_catalog::interface::Catalog;
@ -248,4 +249,12 @@ impl CatalogCache {
pub(crate) fn object_store(&self) -> &ObjectStoreCache {
&self.object_store_cache
}
/// Parquet store that points to the cached object store.
pub fn parquet_store(&self) -> ParquetStorage {
ParquetStorage::new(
Arc::clone(self.object_store_cache.object_store()),
StorageId::from("iox_cached"),
)
}
}

View File

@ -7,10 +7,7 @@ use data_types::{
PartitionId, SequenceNumber, ShardId, TableSummary, TimestampMinMax,
};
use iox_catalog::interface::Catalog;
use parquet_file::{
chunk::ParquetChunk,
storage::{ParquetStorage, StorageId},
};
use parquet_file::chunk::ParquetChunk;
use schema::{sort::SortKey, Schema};
use std::{collections::HashMap, sync::Arc};
use trace::span::{Span, SpanRecorder};
@ -176,12 +173,6 @@ pub struct ChunkAdapter {
/// Cache
catalog_cache: Arc<CatalogCache>,
/// Object store.
///
/// Internally, `ParquetStorage` wraps the actual store implementation in an `Arc`, so
/// `ParquetStorage` is cheap to clone.
store: ParquetStorage,
/// Metric registry.
metric_registry: Arc<metric::Registry>,
}
@ -189,13 +180,8 @@ pub struct ChunkAdapter {
impl ChunkAdapter {
/// Create new adapter with empty cache.
pub fn new(catalog_cache: Arc<CatalogCache>, metric_registry: Arc<metric::Registry>) -> Self {
let store = ParquetStorage::new(
Arc::clone(catalog_cache.object_store().object_store()),
StorageId::from("iox"),
);
Self {
catalog_cache,
store,
metric_registry,
}
}
@ -235,7 +221,7 @@ impl ChunkAdapter {
let parquet_chunk = Arc::new(ParquetChunk::new(
parquet_file,
parts.schema,
self.store.clone(),
self.catalog_cache.parquet_store(),
));
Some(QuerierChunk::new(
@ -362,7 +348,10 @@ pub mod tests {
use arrow_util::assert_batches_eq;
use data_types::{ColumnType, NamespaceSchema};
use futures::StreamExt;
use iox_query::{exec::IOxSessionContext, QueryChunk, QueryChunkMeta};
use iox_query::{
exec::{ExecutorType, IOxSessionContext},
QueryChunk, QueryChunkMeta,
};
use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFileBuilder};
use metric::{Attributes, Observation, RawReporter};
use schema::{builder::SchemaBuilder, selection::Selection, sort::SortKeyBuilder};
@ -394,7 +383,7 @@ pub mod tests {
let table_summary_1 = chunk.summary().unwrap();
// check if chunk can be queried
assert_content(&chunk).await;
assert_content(&chunk, &test_data).await;
// check state again
assert_eq!(chunk.chunk_type(), "parquet");
@ -410,13 +399,12 @@ pub mod tests {
}
/// collect data for the given chunk
async fn collect_read_filter(chunk: &dyn QueryChunk) -> Vec<RecordBatch> {
async fn collect_read_filter(
chunk: &dyn QueryChunk,
ctx: IOxSessionContext,
) -> Vec<RecordBatch> {
chunk
.read_filter(
IOxSessionContext::with_testing(),
&Default::default(),
Selection::All,
)
.read_filter(ctx, &Default::default(), Selection::All)
.unwrap()
.collect::<Vec<_>>()
.await
@ -539,8 +527,16 @@ pub mod tests {
assert_eq!(actual_sort_key, &expected_sort_key);
}
async fn assert_content(chunk: &QuerierChunk) {
let batches = collect_read_filter(chunk).await;
async fn assert_content(chunk: &QuerierChunk, test_data: &TestData) {
let ctx = test_data.catalog.exec.new_context(ExecutorType::Query);
let parquet_store = test_data.adapter.catalog_cache.parquet_store();
ctx.inner().runtime_env().register_object_store(
"iox",
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
);
let batches = collect_read_filter(chunk, ctx).await;
assert_batches_eq!(
&[

View File

@ -141,7 +141,7 @@ impl QueryChunk for QuerierChunk {
let _span_recorder = span_recorder;
self.parquet_chunk
.read_filter(&pred_with_deleted_exprs, selection)
.read_filter(&pred_with_deleted_exprs, selection, ctx.inner())
.map_err(Box::new)
.context(ParquetFileChunkSnafu { chunk_id })
.map_err(|e| DataFusionError::External(Box::new(e)))

View File

@ -4,6 +4,7 @@ use crate::{
};
use data_types::{ShardIndex, TableId};
use iox_catalog::interface::get_schema_by_name;
use iox_query::exec::ExecutorType;
use iox_tests::util::TestNamespace;
use sharder::JumpHash;
use std::sync::Arc;
@ -33,6 +34,19 @@ pub async fn querier_namespace_with_limit(
&Handle::current(),
));
// add cached store
let parquet_store = catalog_cache.parquet_store();
ns.catalog
.exec()
.new_context(ExecutorType::Query)
.inner()
.runtime_env()
.register_object_store(
"iox",
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
);
let sharder = Arc::new(JumpHash::new((0..1).map(ShardIndex::new).map(Arc::new)));
QuerierNamespace::new_testing(

View File

@ -21,7 +21,7 @@ use ingester::{
querier_handler::{prepare_data_to_querier, FlatIngesterQueryResponse, IngesterQueryResponse},
};
use iox_catalog::interface::get_schema_by_name;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_query::exec::{DedicatedExecutors, ExecutorType};
use iox_tests::util::{TestCatalog, TestNamespace, TestShard};
use itertools::Itertools;
use mutable_batch_lp::LinesConverter;
@ -647,18 +647,14 @@ struct MockIngester {
/// Query-test specific executor with static properties that may be relevant for the query optimizer and therefore may
/// change `EXPLAIN` plans.
static GLOBAL_EXEC: Lazy<Arc<Executor>> = Lazy::new(|| {
Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads: 1,
target_query_partitions: 4,
}))
});
static GLOBAL_EXEC: Lazy<Arc<DedicatedExecutors>> =
Lazy::new(|| Arc::new(DedicatedExecutors::new(1)));
impl MockIngester {
/// Create new empty ingester.
async fn new() -> Self {
let exec = Arc::clone(&GLOBAL_EXEC);
let catalog = TestCatalog::with_exec(exec);
let catalog = TestCatalog::with_execs(exec, 4);
let ns = catalog.create_namespace("test_db").await;
let shard = ns.create_shard(1).await;
@ -910,6 +906,20 @@ impl MockIngester {
self.catalog.object_store(),
&Handle::current(),
));
// patch in parquet store
let parquet_store = catalog_cache.parquet_store();
ns.catalog
.exec()
.new_context(ExecutorType::Query)
.inner()
.runtime_env()
.register_object_store(
"iox",
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
);
let shard_to_ingesters = [(
ShardIndex::new(0),
IngesterMapping::Addr(Arc::from("some_address")),