fix: Only switch into querier RPC write path if ingester addresses specified
This enables testing of the querier using the old path with the rpc_write feature turned on.pull/24376/head
parent
b85130cb7c
commit
5141cba1db
|
@ -1,14 +1,8 @@
|
|||
//! Querier-related configs.
|
||||
use data_types::{IngesterMapping, ShardIndex};
|
||||
use snafu::Snafu;
|
||||
use std::{collections::HashMap, io, path::PathBuf, sync::Arc};
|
||||
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
use serde::Deserialize;
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
use snafu::ResultExt;
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
use std::fs;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::{collections::HashMap, fs, io, path::PathBuf, sync::Arc};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[allow(missing_docs)]
|
||||
|
@ -25,7 +19,6 @@ pub enum Error {
|
|||
ingesters,
|
||||
shards,
|
||||
))]
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
IgnoreAllRequiresEmptyConfig {
|
||||
ingesters: HashMap<Arc<str>, Arc<IngesterConfig>>,
|
||||
shards: HashMap<ShardIndex, ShardConfig>,
|
||||
|
@ -137,7 +130,6 @@ pub struct QuerierConfig {
|
|||
env = "INFLUXDB_IOX_SHARD_TO_INGESTERS_FILE",
|
||||
action
|
||||
)]
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
pub shard_to_ingesters_file: Option<PathBuf>,
|
||||
|
||||
/// JSON containing a Shard index to ingesters gRPC mapping. For example:
|
||||
|
@ -207,7 +199,6 @@ pub struct QuerierConfig {
|
|||
env = "INFLUXDB_IOX_SHARD_TO_INGESTERS",
|
||||
action
|
||||
)]
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
pub shard_to_ingesters: Option<String>,
|
||||
|
||||
/// gRPC address for the router to talk with the ingesters. For
|
||||
|
@ -307,21 +298,45 @@ impl QuerierConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/// Return the querier config's ingester addresses.
|
||||
/// Return the querier config's ingester addresses. If `--shard-to-ingesters-file` is used to
|
||||
/// specify a JSON file containing shard to ingester address mappings, this returns `Err` if
|
||||
/// there are any problems reading, deserializing, or interpreting the file.
|
||||
|
||||
// When we have switched to using the RPC write path and remove the rpc_write feature, this
|
||||
// method can be changed to be infallible as clap will handle failure to parse the list of
|
||||
// strings.
|
||||
//
|
||||
// For now, to enable turning on the `rpc_write` feature in tests but not necessarily switching
|
||||
// into the RPC write path mode, require *both* the feature flag to be enabled *and*
|
||||
// `--ingester-addresses` to be set in order to switch. If the `rpc_write` feature is enabled
|
||||
// and `--shard-to-ingesters*` are set, use the write buffer path instead.
|
||||
#[cfg(feature = "rpc_write")]
|
||||
pub fn ingester_addresses(&self) -> Result<IngesterAddresses, Error> {
|
||||
if self.ingester_addresses.is_empty() {
|
||||
Ok(IngesterAddresses::None)
|
||||
} else {
|
||||
if let Some(file) = &self.shard_to_ingesters_file {
|
||||
let contents =
|
||||
fs::read_to_string(file).context(ShardToIngesterFileReadingSnafu { file })?;
|
||||
let map = deserialize_shard_ingester_map(&contents)?;
|
||||
if map.is_empty() {
|
||||
Ok(IngesterAddresses::None)
|
||||
} else {
|
||||
Ok(IngesterAddresses::ByShardIndex(map))
|
||||
}
|
||||
} else if let Some(contents) = &self.shard_to_ingesters {
|
||||
let map = deserialize_shard_ingester_map(contents)?;
|
||||
if map.is_empty() {
|
||||
Ok(IngesterAddresses::None)
|
||||
} else {
|
||||
Ok(IngesterAddresses::ByShardIndex(map))
|
||||
}
|
||||
} else if !self.ingester_addresses.is_empty() {
|
||||
Ok(IngesterAddresses::List(
|
||||
self.ingester_addresses
|
||||
.iter()
|
||||
.map(|s| s.as_str().into())
|
||||
.collect(),
|
||||
))
|
||||
} else {
|
||||
Ok(IngesterAddresses::None)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -353,7 +368,6 @@ impl QuerierConfig {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
fn deserialize_shard_ingester_map(
|
||||
contents: &str,
|
||||
) -> Result<HashMap<ShardIndex, IngesterMapping>, Error> {
|
||||
|
@ -439,7 +453,6 @@ pub enum IngesterAddresses {
|
|||
|
||||
#[derive(Debug, Deserialize, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
struct IngestersConfig {
|
||||
#[serde(default)]
|
||||
ignore_all: bool,
|
||||
|
@ -451,7 +464,6 @@ struct IngestersConfig {
|
|||
|
||||
/// Ingester config.
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
pub struct IngesterConfig {
|
||||
addr: Option<Arc<str>>,
|
||||
#[serde(default)]
|
||||
|
@ -460,7 +472,6 @@ pub struct IngesterConfig {
|
|||
|
||||
/// Shard config.
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
pub struct ShardConfig {
|
||||
ingester: Option<Arc<str>>,
|
||||
#[serde(default)]
|
||||
|
@ -468,7 +479,6 @@ pub struct ShardConfig {
|
|||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(not(feature = "rpc_write"))] // These tests won't be relevant after the switch to rpc_write.
|
||||
mod tests {
|
||||
use super::*;
|
||||
use clap::Parser;
|
||||
|
|
|
@ -440,11 +440,9 @@ impl Config {
|
|||
};
|
||||
|
||||
let querier_config = QuerierConfig {
|
||||
num_query_threads: None, // will be ignored
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
num_query_threads: None, // will be ignored
|
||||
shard_to_ingesters_file: None, // will be ignored
|
||||
#[cfg(not(feature = "rpc_write"))]
|
||||
shard_to_ingesters: None, // will be ignored
|
||||
shard_to_ingesters: None, // will be ignored
|
||||
#[cfg(feature = "rpc_write")]
|
||||
ingester_addresses: vec![], // will be ignored
|
||||
ram_pool_metadata_bytes: querier_ram_pool_metadata_bytes,
|
||||
|
|
Loading…
Reference in New Issue