feat: template static router config (#3781)

* feat: template static router config

* chore: lint and improved failure output

* chore: clarify docs

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2022-02-18 10:53:10 +00:00 committed by GitHub
parent f54ef92b77
commit 83cba3d2fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 129 additions and 10 deletions

View File

@ -122,6 +122,19 @@ message QuerySinks {
// A config file for a router server
message RouterConfigFile {
message Instance {
// The name of this router instance
string name = 1;
// The `Router.name` of the template to use
string template = 2;
}
// A list of router configurations
repeated Router routers = 1;
// A list of router templates identified by `Router.name`
repeated Router templates = 2;
// A list of router configurations that instantiate a specific template by name
repeated Instance instances = 3;
}

View File

@ -1,5 +1,6 @@
//! Implementation of command line option for running server
use hashbrown::HashMap;
use std::sync::Arc;
use crate::{
@ -12,6 +13,7 @@ use crate::{
},
},
};
use data_types::router::Router as RouterConfig;
use generated_types::{google::FieldViolation, influxdata::iox::router::v1::RouterConfigFile};
use observability_deps::tracing::warn;
use router::{resolver::RemoteTemplate, server::RouterServer};
@ -35,8 +37,14 @@ pub enum Error {
#[error("error decoding config file: {0}")]
DecodeConfig(#[from] serde_json::Error),
#[error("invalid config for router {0} in config file: {1}")]
#[error("invalid config for router \"{0}\" in config file: {1}")]
InvalidRouterConfig(String, FieldViolation),
#[error("invalid router template \"{0}\" in config file: {1}")]
InvalidRouterTemplate(String, FieldViolation),
#[error("router template \"{template}\" not found for router \"{name}\"")]
TemplateNotFound { name: String, template: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -105,6 +113,35 @@ pub async fn command(config: Config) -> Result<()> {
router_server.update_router(config);
}
let templates = config
.templates
.into_iter()
.map(|router| {
let name = router.name.clone();
match router.try_into() {
Ok(router) => Ok((name, router)),
Err(e) => Err(Error::InvalidRouterTemplate(name, e)),
}
})
.collect::<Result<HashMap<String, RouterConfig>>>()?;
for instance in config.instances {
match templates.get(&instance.template) {
Some(template) => {
router_server.update_router(RouterConfig {
name: instance.name,
..template.clone()
});
}
None => {
return Err(Error::TemplateNotFound {
name: instance.name,
template: instance.template,
})
}
}
}
true
}
None => false,

View File

@ -4,6 +4,7 @@ use crate::{
end_to_end_cases::scenario::rand_name,
};
use assert_cmd::Command;
use generated_types::influxdata::iox::router::v1::{write_sink::Sink, Router};
use predicates::prelude::*;
use std::io::Write;
@ -98,24 +99,66 @@ async fn test_router_static_config() {
let config = r#"
{
"routers": [
"routers":[
{
"name": "foo",
"writeSharder": {
"hashRing": {
"shards": [1]
"name":"foo",
"writeSharder":{
"hashRing":{
"shards":[
1
]
}
},
"writeSinks": {
"1": {
"sinks": [
"writeSinks":{
"1":{
"sinks":[
{
"grpcRemote": 12
"grpcRemote":12
}
]
}
}
}
],
"templates":[
{
"name":"default",
"writeSharder":{
"specificTargets":[
{
"matcher":{
"tableNameRegex":".*"
},
"shard":1
}
]
},
"writeSinks":{
"1":{
"sinks":[
{
"writeBuffer":{
"type":"kafka",
"connection":"kafka-bootstrap:9092",
"creationConfig":{
"nSequencers":1
}
}
}
]
}
}
}
],
"instances": [
{
"name": "instance1",
"template": "default"
},
{
"name": "instance2",
"template": "default"
}
]
}"#;
@ -151,6 +194,32 @@ async fn test_router_static_config() {
.success()
.stdout(predicate::str::contains("\"grpcRemote\": 12"));
// Template instantiated
for instance in ["instance1", "instance2"] {
let output = Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("router")
.arg("get")
.arg(instance)
.arg("--host")
.arg(addr)
.assert()
.success();
let decoded: Router = serde_json::from_slice(&output.get_output().stdout).unwrap();
assert_eq!(decoded.name, instance);
match &decoded.write_sinks.get(&1).unwrap().sinks[0].sink {
Some(Sink::WriteBuffer(x)) => {
assert_eq!(x.creation_config.as_ref().unwrap().n_sequencers, 1);
assert_eq!(x.connection, "kafka-bootstrap:9092");
}
_ => panic!(
"unexpected output: {}",
String::from_utf8_lossy(&output.get_output().stdout)
),
}
}
// Cannot create router
Command::cargo_bin("influxdb_iox")
.unwrap()