refactor: DF-driven on-demand mem limit instead of ahead-of-time heuristics (#6313)
* refactor: DF-driven on-demand mem limit instead of ahead-of-time heuristics. Closes #6310.
* refactor: rename and tune default exec mem limits
* fix: ingester2 bits after rebase
parent c3a2ac99aa
commit cd6a8a1a82
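The gist of the change: instead of estimating up front how many bytes a query would scan and rejecting it against a per-table limit, the querier (and the other binaries that run an Executor) now hand DataFusion a bounded memory pool and let allocations fail on demand with "ResourcesExhausted". A minimal sketch of that wiring, assuming the DataFusion version in use here; the bounded_runtime helper name is illustrative and not taken from the diff:

use std::sync::Arc;

use datafusion::execution::{
    disk_manager::DiskManagerConfig,
    runtime_env::{RuntimeConfig, RuntimeEnv},
};

/// Build a DataFusion runtime whose operators share one bounded memory pool.
/// Allocations beyond `mem_pool_bytes` fail with "ResourcesExhausted" instead
/// of growing until the process is OOM-killed.
fn bounded_runtime(mem_pool_bytes: usize) -> datafusion::error::Result<Arc<RuntimeEnv>> {
    let config = RuntimeConfig::new()
        // No spill-to-disk: a query either fits the budget or errors out.
        .with_disk_manager(DiskManagerConfig::Disabled)
        // Hand the whole pool to this runtime (fraction = 1.0).
        .with_memory_limit(mem_pool_bytes, 1.0);
    Ok(Arc::new(RuntimeEnv::new(config)?))
}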
@@ -51,6 +51,18 @@ pub struct QuerierConfig {
)]
pub num_query_threads: Option<usize>,

/// Size of memory pool used during query exec, in bytes.
///
/// If queries attempt to allocate more than this many bytes
/// during execution, they will error with "ResourcesExhausted".
#[clap(
long = "exec-mem-pool-bytes",
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
default_value = "8589934592", // 8GB
action
)]
pub exec_mem_pool_bytes: usize,

/// Path to a JSON file containing a Shard index to ingesters gRPC mapping. For example:
///
/// ```json
@@ -216,19 +228,6 @@ pub struct QuerierConfig {
)]
pub max_concurrent_queries: usize,

/// Maximum bytes to scan for a table in a query (estimated).
///
/// If IOx estimates that it will scan more than this many bytes
/// in a query, the query will error. This protects against potentially unbounded
/// memory growth leading to OOMs in certain pathological queries.
#[clap(
long = "max-table-query-bytes",
env = "INFLUXDB_IOX_MAX_TABLE_QUERY_BYTES",
default_value = "1073741824", // 1 GB
action
)]
pub max_table_query_bytes: usize,

/// After how many ingester query errors should the querier enter circuit breaker mode?
///
/// The querier normally contacts the ingester for any unpersisted data during query planning.
@@ -298,12 +297,6 @@ impl QuerierConfig {
pub fn max_concurrent_queries(&self) -> usize {
self.max_concurrent_queries
}

/// Query will error if it estimated that a single table will provide more
/// than this many bytes.
pub fn max_table_query_bytes(&self) -> usize {
self.max_table_query_bytes
}
}

fn deserialize_shard_ingester_map(
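The same --exec-mem-pool-bytes / INFLUXDB_IOX_EXEC_MEM_POOL_BYTES knob recurs across the other binaries' configs below. A stripped-down, self-contained sketch of how the derive-style flag parses, assuming clap 3 as used by these crates; the ExecConfig struct and main are illustrative only:

use clap::Parser;

#[derive(Debug, Parser)]
struct ExecConfig {
    /// Size of memory pool used during query exec, in bytes.
    #[clap(
        long = "exec-mem-pool-bytes",
        env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
        default_value = "8589934592", // 8GB
        action
    )]
    exec_mem_pool_bytes: usize,
}

fn main() {
    // `--exec-mem-pool-bytes 1073741824` or the env var override the default.
    let config = ExecConfig::parse();
    println!("exec mem pool: {} bytes", config.exec_mem_pool_bytes);
}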
@@ -12,7 +12,7 @@ use parquet_file::storage::{ParquetStorage, StorageId};
use snafu::prelude::*;
use std::{collections::HashMap, sync::Arc};

use crate::process_info::setup_metric_registry;
use crate::process_info::{setup_metric_registry, USIZE_MAX};

mod generate;
@@ -43,6 +43,15 @@ pub enum Command {
action
)]
query_exec_thread_count: usize,

/// Size of memory pool used during query exec, in bytes.
#[clap(
long = "exec-mem-pool-bytes",
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
default_value = &USIZE_MAX[..],
action
)]
exec_mem_pool_bytes: usize,
},

/// Generate Parquet files and catalog entries with different characteristics for the purposes
@@ -68,6 +77,7 @@ pub async fn command(config: Config) -> Result<()> {
catalog_dsn,
compactor_config,
query_exec_thread_count,
exec_mem_pool_bytes,
} => {
let compactor_config = compactor_config.into_compactor_config();
@@ -94,6 +104,7 @@ pub async fn command(config: Config) -> Result<()> {
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
)]),
mem_pool_size: exec_mem_pool_bytes,
}));
let time_provider = Arc::new(SystemProvider::new());

@@ -314,18 +314,14 @@ pub struct Config {
)]
pub querier_max_concurrent_queries: usize,

/// Maximum bytes to scan for a table in a query (estimated).
///
/// If IOx estimates that it will scan more than this many bytes
/// in a query, the query will error. This protects against potentially unbounded
/// memory growth leading to OOMs in certain pathological queries.
/// Size of memory pool used during query exec, in bytes.
#[clap(
long = "querier-max-table-query-bytes",
env = "INFLUXDB_IOX_QUERIER_MAX_TABLE_QUERY_BYTES",
default_value = "1073741824", // 1 GB
long = "exec-mem-pool-bytes",
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
default_value = "8589934592", // 8GB
action
)]
pub querier_max_table_query_bytes: usize,
pub exec_mem_pool_bytes: usize,
}

impl Config {
@@ -350,7 +346,7 @@ impl Config {
querier_ram_pool_metadata_bytes,
querier_ram_pool_data_bytes,
querier_max_concurrent_queries,
querier_max_table_query_bytes,
exec_mem_pool_bytes,
} = self;

let database_directory = object_store_config.database_directory.clone();
@@ -449,7 +445,7 @@ impl Config {
ram_pool_metadata_bytes: querier_ram_pool_metadata_bytes,
ram_pool_data_bytes: querier_ram_pool_data_bytes,
max_concurrent_queries: querier_max_concurrent_queries,
max_table_query_bytes: querier_max_table_query_bytes,
exec_mem_pool_bytes,
ingester_circuit_breaker_threshold: u64::MAX, // never for all-in-one-mode
};
@@ -541,6 +537,7 @@ pub async fn command(config: Config) -> Result<()> {
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
)]),
mem_pool_size: querier_config.exec_mem_pool_bytes,
}));

info!("starting router");

@@ -18,7 +18,7 @@ use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_compactor::create_compactor_server_type;

use crate::process_info::setup_metric_registry;
use crate::process_info::{setup_metric_registry, USIZE_MAX};

use super::main;
@@ -76,6 +76,15 @@ pub struct Config {
action
)]
pub query_exec_thread_count: usize,

/// Size of memory pool used during query exec, in bytes.
#[clap(
long = "exec-mem-pool-bytes",
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
default_value = &USIZE_MAX[..],
action
)]
pub exec_mem_pool_bytes: usize,
}

pub async fn command(config: Config) -> Result<(), Error> {
@@ -107,6 +116,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
)]),
mem_pool_size: config.exec_mem_pool_bytes,
}));
let time_provider = Arc::new(SystemProvider::new());

@@ -16,7 +16,7 @@ use observability_deps::tracing::*;
use std::sync::Arc;
use thiserror::Error;

use crate::process_info::setup_metric_registry;
use crate::process_info::{setup_metric_registry, USIZE_MAX};

use super::main;
@@ -75,6 +75,15 @@ pub struct Config {
action
)]
pub query_exec_thread_count: usize,

/// Size of memory pool used during query exec, in bytes.
#[clap(
long = "exec-mem-pool-bytes",
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
default_value = &USIZE_MAX[..],
action
)]
pub exec_mem_pool_bytes: usize,
}

pub async fn command(config: Config) -> Result<()> {
@@ -98,7 +107,10 @@ pub async fn command(config: Config) -> Result<()> {
&metric_registry,
));

let exec = Arc::new(Executor::new(config.query_exec_thread_count));
let exec = Arc::new(Executor::new(
config.query_exec_thread_count,
config.exec_mem_pool_bytes,
));
let server_type = create_ingester_server_type(
&common_state,
Arc::clone(&metric_registry),

@@ -1,7 +1,7 @@
//! Command line options for running an ingester for a router using the RPC write path to talk to.

use super::main;
use crate::process_info::setup_metric_registry;
use crate::process_info::{setup_metric_registry, USIZE_MAX};
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, ingester2::Ingester2Config, object_store::make_object_store,
run_config::RunConfig,
@@ -70,6 +70,15 @@ pub struct Config {
action
)]
pub exec_thread_count: usize,

/// Size of memory pool used during query exec, in bytes.
#[clap(
long = "exec-mem-pool-bytes",
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
default_value = &USIZE_MAX[..],
action
)]
exec_mem_pool_bytes: usize,
}

pub async fn command(config: Config) -> Result<()> {
@@ -81,7 +90,10 @@ pub async fn command(config: Config) -> Result<()> {
.get_catalog("ingester", Arc::clone(&metric_registry))
.await?;

let exec = Arc::new(Executor::new(config.exec_thread_count));
let exec = Arc::new(Executor::new(
config.exec_thread_count,
config.exec_mem_pool_bytes,
));
let object_store = make_object_store(config.run_config.object_store_config())
.map_err(Error::ObjectStoreParsing)?;

@@ -99,7 +99,10 @@ pub async fn command(config: Config) -> Result<(), Error> {
let ingester_addresses = config.querier_config.ingester_addresses()?;
info!(?ingester_addresses, "using ingester addresses");

let exec = Arc::new(Executor::new(num_threads));
let exec = Arc::new(Executor::new(
num_threads,
config.querier_config.exec_mem_pool_bytes,
));

let server_type = create_querier_server_type(QuerierServerTypeArgs {
common_state: &common_state,

@@ -49,3 +49,10 @@ pub fn setup_metric_registry() -> Arc<metric::Registry> {

registry
}

/// String version of [`usize::MAX`].
pub static USIZE_MAX: Lazy<&'static str> = Lazy::new(|| {
let s = usize::MAX.to_string();
let s: Box<str> = Box::from(s);
Box::leak(s)
});
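The USIZE_MAX helper added above exists because clap's default_value attribute wants a string with a 'static lifetime, while usize::MAX.to_string() is only available at runtime; computing it once and leaking the box satisfies both. A self-contained sketch of the same pattern, assuming once_cell's Lazy, which this module presumably already imports:

use once_cell::sync::Lazy;

/// String version of [`usize::MAX`], leaked exactly once so it can serve as a
/// `&'static str` default value for clap arguments.
pub static USIZE_MAX: Lazy<&'static str> = Lazy::new(|| {
    let s: Box<str> = usize::MAX.to_string().into_boxed_str();
    Box::leak(s)
});

fn main() {
    // On 64-bit targets this prints 18446744073709551615, i.e. effectively
    // "no limit" for the binaries that default to an uncapped pool.
    println!("{}", &USIZE_MAX[..]);
}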
@@ -6,6 +6,7 @@ use std::time::Duration;
use arrow_util::assert_batches_sorted_eq;
use assert_cmd::{assert::Assert, Command};
use futures::FutureExt;
use generated_types::{aggregate::AggregateType, read_group_request::Group};
use predicates::prelude::*;
use test_helpers::assert_contains;
use test_helpers_end_to_end::{
@@ -596,8 +597,7 @@ async fn oom_protection() {
// Set up the cluster ====================================
let router_config = TestConfig::new_router(&database_url);
let ingester_config = TestConfig::new_ingester(&router_config);
let querier_config =
TestConfig::new_querier(&ingester_config).with_querier_max_table_query_bytes(1);
let querier_config = TestConfig::new_querier(&ingester_config).with_querier_mem_pool_bytes(1);
let mut cluster = MiniCluster::new()
.with_router(router_config)
.await
@@ -615,9 +615,12 @@ async fn oom_protection() {
// SQL query
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let sql = format!("select * from {}", table_name);
let sql = format!(
"select tag1, sum(val) as val from {} group by tag1",
table_name
);
let err = try_run_sql(
sql,
&sql,
state.cluster().namespace(),
state.cluster().querier().querier_grpc_connection(),
)
@@ -635,6 +638,14 @@ async fn oom_protection() {
} else {
panic!("Not a gRPC error: {err}");
}

// EXPLAIN should work though
run_sql(
format!("EXPLAIN {sql}"),
state.cluster().namespace(),
state.cluster().querier().querier_grpc_connection(),
)
.await;
}
.boxed()
})),
@@ -643,12 +654,15 @@ async fn oom_protection() {
async move {
let mut storage_client = state.cluster().querier_storage_client();

let read_filter_request = GrpcRequestBuilder::new()
let read_group_request = GrpcRequestBuilder::new()
.source(state.cluster())
.build_read_filter();
.aggregate_type(AggregateType::Sum)
.group(Group::By)
.group_keys(["tag1"])
.build_read_group();

let status = storage_client
.read_filter(read_filter_request)
.read_group(read_group_request)
.await
.unwrap_err();
assert_eq!(

@@ -136,7 +136,7 @@ mod tests {
assert_eq!(expected_pk, pk);

// compact
let exc = Executor::new(1);
let exc = Executor::new_testing();
let CompactedStream { stream, .. } =
compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch)
.await
@@ -173,7 +173,7 @@ mod tests {
assert_eq!(expected_pk, pk);

// compact
let exc = Executor::new(1);
let exc = Executor::new_testing();
let CompactedStream {
stream,
data_sort_key,
@@ -221,7 +221,7 @@ mod tests {
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);

let exc = Executor::new(1);
let exc = Executor::new_testing();

// NO SORT KEY from the catalog here, first persisting batch
let CompactedStream {
@@ -275,7 +275,7 @@ mod tests {
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);

let exc = Executor::new(1);
let exc = Executor::new_testing();

// SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog
// this is NOT what the computed sort key would be based on this data's cardinality
@@ -334,7 +334,7 @@ mod tests {
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);

let exc = Executor::new(1);
let exc = Executor::new_testing();

// SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog
// this is NOT what the computed sort key would be based on this data's cardinality
@@ -397,7 +397,7 @@ mod tests {
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);

let exc = Executor::new(1);
let exc = Executor::new_testing();

// SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog
// this is NOT what the computed sort key would be based on this data's cardinality
@@ -464,7 +464,7 @@ mod tests {
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));

// compact
let exc = Executor::new(1);
let exc = Executor::new_testing();
let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
.await
.unwrap();
@@ -505,7 +505,7 @@ mod tests {
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));

// compact
let exc = Executor::new(1);
let exc = Executor::new_testing();
let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
.await
.unwrap();
@@ -551,7 +551,7 @@ mod tests {
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));

// compact
let exc = Executor::new(1);
let exc = Executor::new_testing();
let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
.await
.unwrap()
@@ -598,7 +598,7 @@ mod tests {
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));

// compact
let exc = Executor::new(1);
let exc = Executor::new_testing();
let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
.await
.unwrap()
@@ -649,7 +649,7 @@ mod tests {
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));

// compact
let exc = Executor::new(1);
let exc = Executor::new_testing();
let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
.await
.unwrap()

@@ -21,6 +21,7 @@ use datafusion::{
self,
execution::{
context::SessionState,
disk_manager::DiskManagerConfig,
runtime_env::{RuntimeConfig, RuntimeEnv},
},
logical_expr::{expr_rewriter::normalize_col, Extension},
@@ -44,6 +45,9 @@ pub struct ExecutorConfig {

/// Object stores
pub object_stores: HashMap<StorageId, Arc<DynObjectStore>>,

/// Memory pool size in bytes.
pub mem_pool_size: usize,
}

#[derive(Debug)]
@@ -86,9 +90,6 @@ impl DedicatedExecutors {

/// Handles executing DataFusion plans, and marshalling the results into rust
/// native structures.
///
/// TODO: Have a resource manager that would limit how many plans are
/// running, based on a policy
#[derive(Debug)]
pub struct Executor {
/// Executors
@@ -114,11 +115,12 @@ pub enum ExecutorType {
impl Executor {
/// Creates a new executor with a two dedicated thread pools, each
/// with num_threads
pub fn new(num_threads: usize) -> Self {
pub fn new(num_threads: usize, mem_pool_size: usize) -> Self {
Self::new_with_config(ExecutorConfig {
num_threads,
target_query_partitions: num_threads,
object_stores: HashMap::default(),
mem_pool_size,
})
}
@@ -134,6 +136,7 @@ impl Executor {
num_threads: 1,
target_query_partitions: 1,
object_stores: HashMap::default(),
mem_pool_size: 1024 * 1024 * 1024, // 1GB
};
let executors = Arc::new(DedicatedExecutors::new_testing());
Self::new_with_config_and_executors(config, executors)
@@ -151,7 +154,9 @@ impl Executor {
) -> Self {
assert_eq!(config.num_threads, executors.num_threads);

let runtime_config = RuntimeConfig::new();
let runtime_config = RuntimeConfig::new()
.with_disk_manager(DiskManagerConfig::Disabled)
.with_memory_limit(config.mem_pool_size, 1.0);

for (id, store) in &config.object_stores {
runtime_config
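With mem_pool_size now part of ExecutorConfig, call sites pass an explicit budget and tests use the canned constructor instead of picking their own numbers. A hedged sketch of the new call sites; the iox_query::exec module path and the 8 GiB figure mirror the diff but are assumptions here:

use std::sync::Arc;

use iox_query::exec::Executor;

/// Production-style construction: thread count plus an explicit memory budget
/// (8 GiB, matching the new `--exec-mem-pool-bytes` default for the querier).
fn make_exec(num_threads: usize) -> Arc<Executor> {
    Arc::new(Executor::new(num_threads, 8 * 1024 * 1024 * 1024))
}

/// Test-style construction: `new_testing()` bakes in a single thread and a
/// 1 GiB pool, which is why `Executor::new(1)` became `Executor::new_testing()`
/// throughout the ingester tests above.
fn make_test_exec() -> Executor {
    Executor::new_testing()
}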
@@ -77,16 +77,6 @@ pub enum Error {

#[snafu(display("Internal error: Can not group chunks '{}'", source,))]
InternalChunkGrouping { source: self::overlap::Error },

#[snafu(display(
"Query would scan at least {} bytes, more than configured maximum {} bytes. Try adjusting your compactor settings or increasing the per query memory limit.",
actual_bytes,
limit_bytes,
))]
TooMuchData {
actual_bytes: usize,
limit_bytes: usize,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;

@@ -89,6 +89,7 @@ impl TestCatalog {
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
)]),
mem_pool_size: 1024 * 1024 * 1024,
},
exec,
));

@@ -202,7 +202,6 @@ pub async fn create_querier_server_type(
args.exec,
ingester_connection,
args.querier_config.max_concurrent_queries(),
args.querier_config.max_table_query_bytes(),
)
.await?,
);

@@ -107,7 +107,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
usize::MAX,
)
.await
.unwrap(),
@@ -143,7 +142,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
usize::MAX,
)
.await
.unwrap(),

@@ -71,9 +71,6 @@ pub struct QuerierDatabase {
/// Sharder to determine which ingesters to query for a particular table and namespace.
sharder: Arc<JumpHash<Arc<ShardIndex>>>,

/// Max combined chunk size for all chunks returned to the query subsystem by a single table.
max_table_query_bytes: usize,

/// Chunk prune metrics.
prune_metrics: Arc<PruneMetrics>,
}
@@ -109,7 +106,6 @@ impl QuerierDatabase {
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
max_concurrent_queries: usize,
max_table_query_bytes: usize,
) -> Result<Self, Error> {
assert!(
max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
@@ -148,7 +144,6 @@ impl QuerierDatabase {
query_log,
query_execution_semaphore,
sharder,
max_table_query_bytes,
prune_metrics,
})
}
@@ -178,7 +173,6 @@ impl QuerierDatabase {
self.ingester_connection.clone(),
Arc::clone(&self.query_log),
Arc::clone(&self.sharder),
self.max_table_query_bytes,
Arc::clone(&self.prune_metrics),
)))
}
@@ -262,7 +256,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1),
usize::MAX,
)
.await
.unwrap();
@@ -287,7 +280,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
usize::MAX,
)
.await,
Error::NoShards
@@ -313,7 +305,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
usize::MAX,
)
.await
.unwrap();
@@ -343,7 +334,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
usize::MAX,
)
.await
.unwrap();

@@ -224,7 +224,6 @@ mod tests {
exec,
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
usize::MAX,
)
.await
.unwrap(),

@@ -59,7 +59,6 @@ impl QuerierNamespace {
ingester_connection: Option<Arc<dyn IngesterConnection>>,
query_log: Arc<QueryLog>,
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
max_table_query_bytes: usize,
prune_metrics: Arc<PruneMetrics>,
) -> Self {
let tables: HashMap<_, _> = ns
@@ -77,7 +76,6 @@ impl QuerierNamespace {
ingester_connection: ingester_connection.clone(),
chunk_adapter: Arc::clone(&chunk_adapter),
exec: Arc::clone(&exec),
max_query_bytes: max_table_query_bytes,
prune_metrics: Arc::clone(&prune_metrics),
}));
@@ -107,7 +105,6 @@ impl QuerierNamespace {
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
max_table_query_bytes: usize,
) -> Self {
let time_provider = catalog_cache.time_provider();
let chunk_adapter = Arc::new(ChunkAdapter::new(catalog_cache, metric_registry));
@@ -122,7 +119,6 @@ impl QuerierNamespace {
ingester_connection,
query_log,
sharder,
max_table_query_bytes,
prune_metrics,
)
}

@@ -196,9 +196,7 @@ impl ExecutionContextProvider for QuerierNamespace {
#[cfg(test)]
mod tests {
use super::*;
use crate::namespace::test_util::{
clear_parquet_cache, querier_namespace, querier_namespace_with_limit,
};
use crate::namespace::test_util::{clear_parquet_cache, querier_namespace};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
use data_types::ColumnType;
@@ -589,53 +587,6 @@ mod tests {
.await;
}

#[tokio::test]
async fn test_chunk_size_limit() {
let catalog = TestCatalog::new();

let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table").await;
let shard = ns.create_shard(1).await;
let partition = table.with_shard(&shard).create_partition("k").await;

table.create_column("time", ColumnType::Time).await;
table.create_column("foo", ColumnType::F64).await;

let builder = TestParquetFileBuilder::default()
.with_line_protocol("table foo=1 11")
.with_max_seq(2)
.with_min_time(11)
.with_max_time(11);
let file1 = partition.create_parquet_file(builder).await;

let builder = TestParquetFileBuilder::default()
.with_line_protocol("table foo=2 22")
.with_max_seq(4)
.with_min_time(22)
.with_max_time(22);
let file2 = partition.create_parquet_file(builder).await;

let total_size =
(file1.parquet_file.file_size_bytes + file2.parquet_file.file_size_bytes) as usize;

// querying right at the total size works (i.e. the limit is INCLUSIVE)
let querier_namespace = Arc::new(querier_namespace_with_limit(&ns, total_size).await);
run_res(&querier_namespace, "SELECT * FROM \"table\"", None)
.await
.unwrap();

// check that limit is enforced
let limit = total_size - 1;
let querier_namespace = Arc::new(querier_namespace_with_limit(&ns, limit).await);
let err = run_res(&querier_namespace, "SELECT * FROM \"table\"", None)
.await
.unwrap_err();
assert_eq!(
err.to_string(),
format!("Cannot build plan: Resources exhausted: Query would scan at least {total_size} bytes, more than configured maximum {limit} bytes. Try adjusting your compactor settings or increasing the per query memory limit."),
);
}

async fn assert_query(
querier_namespace: &Arc<QuerierNamespace>,
sql: &str,

@@ -12,14 +12,6 @@ use tokio::runtime::Handle;

/// Create [`QuerierNamespace`] for testing.
pub async fn querier_namespace(ns: &Arc<TestNamespace>) -> QuerierNamespace {
querier_namespace_with_limit(ns, usize::MAX).await
}

/// Create [`QuerierNamespace`] for testing with chunk limits.
pub async fn querier_namespace_with_limit(
ns: &Arc<TestNamespace>,
max_table_query_bytes: usize,
) -> QuerierNamespace {
let mut repos = ns.catalog.catalog.repositories().await;
let schema = get_schema_by_name(&ns.namespace.name, repos.as_mut())
.await
@@ -57,7 +49,6 @@ pub async fn querier_namespace_with_limit(
ns.catalog.exec(),
Some(create_ingester_connection_for_testing()),
sharder,
max_table_query_bytes,
)
}

@@ -72,12 +72,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;

impl From<Error> for DataFusionError {
fn from(err: Error) -> Self {
match err {
Error::ChunkPruning {
source: err @ provider::Error::TooMuchData { .. },
} => Self::ResourcesExhausted(err.to_string()),
_ => Self::External(Box::new(err) as _),
}
Self::External(Box::new(err) as _)
}
}
@@ -93,7 +88,6 @@ pub struct QuerierTableArgs {
pub ingester_connection: Option<Arc<dyn IngesterConnection>>,
pub chunk_adapter: Arc<ChunkAdapter>,
pub exec: Arc<Executor>,
pub max_query_bytes: usize,
pub prune_metrics: Arc<PruneMetrics>,
}
@@ -133,9 +127,6 @@ pub struct QuerierTable {
/// Executor for queries.
exec: Arc<Executor>,

/// Max combined chunk size for all chunks returned to the query subsystem.
max_query_bytes: usize,

/// Metrics for chunk pruning.
prune_metrics: Arc<PruneMetrics>,
}
@@ -154,7 +145,6 @@ impl QuerierTable {
ingester_connection,
chunk_adapter,
exec,
max_query_bytes,
prune_metrics,
} = args;
@@ -176,7 +166,6 @@ impl QuerierTable {
chunk_adapter,
reconciler,
exec,
max_query_bytes,
prune_metrics,
}
}
@@ -430,10 +419,9 @@ impl QuerierTable {

/// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks)
pub fn chunk_pruner(&self) -> Arc<dyn ChunkPruner> {
Arc::new(QuerierTableChunkPruner::new(
self.max_query_bytes,
Arc::clone(&self.prune_metrics),
))
Arc::new(QuerierTableChunkPruner::new(Arc::clone(
&self.prune_metrics,
)))
}

/// Get partitions from ingesters.

@@ -95,13 +95,12 @@ impl TableProvider for QuerierTable {

#[derive(Debug)]
pub struct QuerierTableChunkPruner {
max_bytes: usize,
metrics: Arc<PruneMetrics>,
}

impl QuerierTableChunkPruner {
pub fn new(max_bytes: usize, metrics: Arc<PruneMetrics>) -> Self {
Self { max_bytes, metrics }
pub fn new(metrics: Arc<PruneMetrics>) -> Self {
Self { metrics }
}
}
@@ -140,17 +139,6 @@ impl ChunkPruner for QuerierTableChunkPruner {
}
};

let estimated_bytes = chunks
.iter()
.map(|chunk| chunk_estimate_size(chunk.as_ref()))
.sum::<usize>();
if estimated_bytes > self.max_bytes {
return Err(ProviderError::TooMuchData {
actual_bytes: estimated_bytes,
limit_bytes: self.max_bytes,
});
}

Ok(chunks)
}
}

@@ -49,7 +49,6 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
ingester_connection: Some(create_ingester_connection_for_testing()),
chunk_adapter,
exec: catalog.exec(),
max_query_bytes: usize::MAX,
prune_metrics: Arc::new(PruneMetrics::new(&catalog.metric_registry())),
})
}

@@ -963,7 +963,6 @@ impl MockIngester {
catalog.exec(),
Some(ingester_connection),
sharder,
usize::MAX,
))
}
}

@@ -299,8 +299,8 @@ impl TestConfig {
}

/// Configure maximum per-table query bytes for the querier.
pub fn with_querier_max_table_query_bytes(self, bytes: usize) -> Self {
self.with_env("INFLUXDB_IOX_MAX_TABLE_QUERY_BYTES", bytes.to_string())
pub fn with_querier_mem_pool_bytes(self, bytes: usize) -> Self {
self.with_env("INFLUXDB_IOX_EXEC_MEM_POOL_BYTES", bytes.to_string())
}

/// Changes the log to JSON for easier parsing.