fix: Accept JSON ingester/shard config as CLI param value or env var value
parent
44bce8e3ec
commit
7965bda42f
|
@ -73,6 +73,25 @@ pub struct QuerierConfig {
|
|||
)]
|
||||
pub sequencer_to_ingester_file: Option<PathBuf>,
|
||||
|
||||
/// JSON containing a Sequencer ID to ingester gRPC mapping. For example:
|
||||
///
|
||||
/// ```json
|
||||
/// {
|
||||
/// "sequencers": {
|
||||
/// "0": { "addr": "http://ingester-1:8082" },
|
||||
/// "1": { "addr": "http://ingester-1:8082" },
|
||||
/// ...
|
||||
/// "25": { "addr": "http://ingester-2:8082" }
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
#[clap(
|
||||
long = "--sequencer-to-ingester",
|
||||
env = "INFLUXDB_IOX_SEQUENCER_TO_INGESTER",
|
||||
action
|
||||
)]
|
||||
pub sequencer_to_ingester: Option<String>,
|
||||
|
||||
/// Size of the RAM cache pool in bytes.
|
||||
#[clap(
|
||||
long = "--ram-pool-bytes",
|
||||
|
@ -110,6 +129,9 @@ impl QuerierConfig {
|
|||
fs::read_to_string(file).context(SequencerToIngesterFileReadingSnafu { file })?;
|
||||
let map = deserialize_sequencer_ingester_map(&contents)?;
|
||||
Ok(IngesterAddresses::BySequencer(map))
|
||||
} else if let Some(contents) = &self.sequencer_to_ingester {
|
||||
let map = deserialize_sequencer_ingester_map(contents)?;
|
||||
Ok(IngesterAddresses::BySequencer(map))
|
||||
} else {
|
||||
let mut current_addresses = BTreeSet::new();
|
||||
Ok(IngesterAddresses::List(
|
||||
|
@ -151,7 +173,7 @@ fn deserialize_sequencer_ingester_map(contents: &str) -> Result<HashMap<i32, Arc
|
|||
}
|
||||
|
||||
/// Either specify a list of ingester addresses or a mapping from sequencer ID to ingester
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum IngesterAddresses {
|
||||
List(Vec<String>),
|
||||
BySequencer(HashMap<i32, Arc<str>>),
|
||||
|
@ -247,6 +269,35 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn supply_json_value() {
|
||||
let actual = QuerierConfig::try_parse_from([
|
||||
"my_binary",
|
||||
"--sequencer-to-ingester",
|
||||
r#"{
|
||||
"sequencers": {
|
||||
"0": { "addr": "http://ingester-1:8082" },
|
||||
"1": { "addr": "http://ingester-1:8082" },
|
||||
"25": { "addr": "http://ingester-2:8082" }
|
||||
}
|
||||
}"#,
|
||||
])
|
||||
.unwrap();
|
||||
|
||||
let expected = IngesterAddresses::BySequencer(
|
||||
[
|
||||
(0, "http://ingester-1:8082"),
|
||||
(1, "http://ingester-1:8082"),
|
||||
(25, "http://ingester-2:8082"),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(seq_id, addr)| (seq_id, Arc::from(addr)))
|
||||
.collect(),
|
||||
);
|
||||
|
||||
assert_eq!(actual.ingester_addresses().unwrap(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn successful_deserialization() {
|
||||
let contents = r#"{
|
||||
|
|
|
@ -401,6 +401,7 @@ impl Config {
|
|||
num_query_threads: None, // will be ignored
|
||||
ingester_addresses: vec![], // will be ignored
|
||||
sequencer_to_ingester_file: None, // will be ignored
|
||||
sequencer_to_ingester: None, // will be ignored
|
||||
ram_pool_bytes: querier_ram_pool_bytes,
|
||||
max_concurrent_queries: querier_max_concurrent_queries,
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue