feat: read from parquet and only parquet (#5879)
* feat: query only from parquet
* Revert "feat: query only from parquet"
This reverts commit 5ce3c3449c0b9c90154c8c6ece4a40a9c083b7ba.
* Revert "revert: disable read buffer usage in querier (#5579) (#5603)"
This reverts commit df5ef875b4.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
parent
235abf65c5
commit
6f931411f3
|
|
@ -14,7 +14,6 @@ use ioxd_common::{
|
|||
};
|
||||
use metric::Registry;
|
||||
use object_store::DynObjectStore;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use querier::{
|
||||
create_ingester_connections_by_shard, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
|
||||
QuerierHandlerImpl, QuerierServer,
|
||||
|
|
@ -182,7 +181,6 @@ pub async fn create_querier_server_type(
|
|||
QuerierDatabase::new(
|
||||
catalog_cache,
|
||||
Arc::clone(&args.metric_registry),
|
||||
ParquetStorage::new(args.object_store),
|
||||
args.exec,
|
||||
ingester_connection,
|
||||
args.querier_config.max_concurrent_queries(),
|
||||
|
|
|
|||
|
|
@ -56,7 +56,6 @@ mod tests {
|
|||
use super::*;
|
||||
use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService;
|
||||
use iox_tests::util::TestCatalog;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use querier::{create_ingester_connection_for_testing, QuerierCatalogCache};
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
|
|
@ -78,7 +77,6 @@ mod tests {
|
|||
QuerierDatabase::new(
|
||||
catalog_cache,
|
||||
catalog.metric_registry(),
|
||||
ParquetStorage::new(catalog.object_store()),
|
||||
catalog.exec(),
|
||||
Some(create_ingester_connection_for_testing()),
|
||||
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
|
||||
|
|
@ -115,7 +113,6 @@ mod tests {
|
|||
QuerierDatabase::new(
|
||||
catalog_cache,
|
||||
catalog.metric_registry(),
|
||||
ParquetStorage::new(catalog.object_store()),
|
||||
catalog.exec(),
|
||||
Some(create_ingester_connection_for_testing()),
|
||||
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
|
||||
|
|
|
|||
|
|
@ -188,6 +188,7 @@ impl From<Arc<RBChunk>> for ChunkStage {
|
|||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub enum QuerierChunkLoadSetting {
|
||||
/// Only stay in parquet mode and never use read buffer data.
|
||||
#[default]
|
||||
ParquetOnly,
|
||||
|
||||
/// Only use read buffer data.
|
||||
|
|
@ -195,12 +196,11 @@ pub enum QuerierChunkLoadSetting {
|
|||
/// This forces the querier to load the read buffer for this chunk.
|
||||
ReadBufferOnly,
|
||||
|
||||
/// Default "on-demand" handling.
|
||||
/// "On-demand" handling.
|
||||
///
|
||||
/// When the chunk is created, it will look up if there is already read buffer data loaded (or loading is already in
|
||||
/// progress) and is that. Otherwise it will parquet. If the actual row data is requested (via
|
||||
/// [`QueryChunk::read_filter`](iox_query::QueryChunk::read_filter)) then it will force-load the read buffer.
|
||||
#[default]
|
||||
OnDemand,
|
||||
}
|
||||
|
||||
|
|
@ -358,10 +358,10 @@ impl ChunkAdapter {
|
|||
/// Create new adapter with empty cache.
|
||||
pub fn new(
|
||||
catalog_cache: Arc<CatalogCache>,
|
||||
store: ParquetStorage,
|
||||
metric_registry: Arc<metric::Registry>,
|
||||
load_settings: HashMap<ParquetFileId, QuerierChunkLoadSetting>,
|
||||
) -> Self {
|
||||
let store = ParquetStorage::new(Arc::clone(catalog_cache.object_store().object_store()));
|
||||
Self {
|
||||
catalog_cache,
|
||||
store,
|
||||
|
|
@ -730,7 +730,6 @@ pub mod tests {
|
|||
catalog.object_store(),
|
||||
&Handle::current(),
|
||||
)),
|
||||
ParquetStorage::new(catalog.object_store()),
|
||||
catalog.metric_registry(),
|
||||
HashMap::from([(parquet_file.id, load_settings)]),
|
||||
);
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ use backoff::{Backoff, BackoffConfig};
|
|||
use data_types::{Namespace, ShardIndex};
|
||||
use iox_catalog::interface::Catalog;
|
||||
use iox_query::exec::Executor;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use service_common::QueryDatabaseProvider;
|
||||
use sharder::JumpHash;
|
||||
use snafu::Snafu;
|
||||
|
|
@ -107,7 +106,6 @@ impl QuerierDatabase {
|
|||
pub async fn new(
|
||||
catalog_cache: Arc<CatalogCache>,
|
||||
metric_registry: Arc<metric::Registry>,
|
||||
store: ParquetStorage,
|
||||
exec: Arc<Executor>,
|
||||
ingester_connection: Option<Arc<dyn IngesterConnection>>,
|
||||
max_concurrent_queries: usize,
|
||||
|
|
@ -124,7 +122,6 @@ impl QuerierDatabase {
|
|||
|
||||
let chunk_adapter = Arc::new(ChunkAdapter::new(
|
||||
Arc::clone(&catalog_cache),
|
||||
store,
|
||||
Arc::clone(&metric_registry),
|
||||
Default::default(),
|
||||
));
|
||||
|
|
@ -263,7 +260,6 @@ mod tests {
|
|||
QuerierDatabase::new(
|
||||
catalog_cache,
|
||||
catalog.metric_registry(),
|
||||
ParquetStorage::new(catalog.object_store()),
|
||||
catalog.exec(),
|
||||
Some(create_ingester_connection_for_testing()),
|
||||
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1),
|
||||
|
|
@ -289,7 +285,6 @@ mod tests {
|
|||
QuerierDatabase::new(
|
||||
catalog_cache,
|
||||
catalog.metric_registry(),
|
||||
ParquetStorage::new(catalog.object_store()),
|
||||
catalog.exec(),
|
||||
Some(create_ingester_connection_for_testing()),
|
||||
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
|
||||
|
|
@ -316,7 +311,6 @@ mod tests {
|
|||
let db = QuerierDatabase::new(
|
||||
catalog_cache,
|
||||
catalog.metric_registry(),
|
||||
ParquetStorage::new(catalog.object_store()),
|
||||
catalog.exec(),
|
||||
Some(create_ingester_connection_for_testing()),
|
||||
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
|
||||
|
|
@ -347,7 +341,6 @@ mod tests {
|
|||
let db = QuerierDatabase::new(
|
||||
catalog_cache,
|
||||
catalog.metric_registry(),
|
||||
ParquetStorage::new(catalog.object_store()),
|
||||
catalog.exec(),
|
||||
Some(create_ingester_connection_for_testing()),
|
||||
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
|
||||
|
|
|
|||
|
|
@ -135,7 +135,6 @@ mod tests {
|
|||
use iox_query::exec::Executor;
|
||||
use iox_time::{MockProvider, Time};
|
||||
use object_store::memory::InMemory;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use std::time::Duration;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
|
|
@ -191,7 +190,6 @@ mod tests {
|
|||
QuerierDatabase::new(
|
||||
catalog_cache,
|
||||
metric_registry,
|
||||
ParquetStorage::new(object_store),
|
||||
exec,
|
||||
Some(create_ingester_connection_for_testing()),
|
||||
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ use crate::{
|
|||
};
|
||||
use data_types::{NamespaceId, ParquetFileId, ShardIndex};
|
||||
use iox_query::exec::Executor;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use sharder::JumpHash;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
|
|
@ -101,7 +100,6 @@ impl QuerierNamespace {
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_testing(
|
||||
catalog_cache: Arc<CatalogCache>,
|
||||
store: ParquetStorage,
|
||||
metric_registry: Arc<metric::Registry>,
|
||||
name: Arc<str>,
|
||||
ns: Arc<CachedNamespace>,
|
||||
|
|
@ -114,7 +112,6 @@ impl QuerierNamespace {
|
|||
let time_provider = catalog_cache.time_provider();
|
||||
let chunk_adapter = Arc::new(ChunkAdapter::new(
|
||||
catalog_cache,
|
||||
store,
|
||||
metric_registry,
|
||||
load_settings,
|
||||
));
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ use crate::{
|
|||
use data_types::{ShardIndex, TableId};
|
||||
use iox_catalog::interface::get_schema_by_name;
|
||||
use iox_tests::util::TestNamespace;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use sharder::JumpHash;
|
||||
use std::sync::Arc;
|
||||
use tokio::runtime::Handle;
|
||||
|
|
@ -38,7 +37,6 @@ pub async fn querier_namespace_with_limit(
|
|||
|
||||
QuerierNamespace::new_testing(
|
||||
catalog_cache,
|
||||
ParquetStorage::new(ns.catalog.object_store()),
|
||||
ns.catalog.metric_registry(),
|
||||
ns.namespace.name.clone().into(),
|
||||
cached_ns,
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ use data_types::{ChunkId, ParquetFileId, SequenceNumber, ShardIndex};
|
|||
use iox_catalog::interface::get_schema_by_name;
|
||||
use iox_tests::util::{TestCatalog, TestPartition, TestShard, TestTable};
|
||||
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use schema::{selection::Selection, sort::SortKey, Schema};
|
||||
use sharder::JumpHash;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
|
@ -29,7 +28,6 @@ pub async fn querier_table(
|
|||
));
|
||||
let chunk_adapter = Arc::new(ChunkAdapter::new(
|
||||
catalog_cache,
|
||||
ParquetStorage::new(catalog.object_store()),
|
||||
catalog.metric_registry(),
|
||||
load_settings,
|
||||
));
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ use iox_tests::util::{TestCatalog, TestNamespace, TestShard};
|
|||
use itertools::Itertools;
|
||||
use mutable_batch_lp::LinesConverter;
|
||||
use once_cell::sync::Lazy;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use querier::{
|
||||
IngesterConnectionImpl, IngesterFlightClient, IngesterFlightClientError,
|
||||
IngesterFlightClientQueryData, QuerierCatalogCache, QuerierChunkLoadSetting, QuerierNamespace,
|
||||
|
|
@ -977,7 +976,6 @@ impl MockIngester {
|
|||
|
||||
Arc::new(QuerierNamespace::new_testing(
|
||||
catalog_cache,
|
||||
ParquetStorage::new(catalog.object_store()),
|
||||
catalog.metric_registry(),
|
||||
ns.namespace.name.clone().into(),
|
||||
Arc::new(schema.as_ref().into()),
|
||||
|
|
|
|||
Loading…
Reference in New Issue