Merge pull request #3332 from influxdata/crepererum/issue3331
feat: add router CLI, and GetRouter gRPC requestpull/24376/head
commit
435aeac91d
|
@ -5,6 +5,9 @@ option go_package = "github.com/influxdata/iox/router/v1";
|
||||||
import "influxdata/iox/router/v1/router.proto";
|
import "influxdata/iox/router/v1/router.proto";
|
||||||
|
|
||||||
service RouterService {
|
service RouterService {
|
||||||
|
// Get router.
|
||||||
|
rpc GetRouter(GetRouterRequest) returns (GetRouterResponse);
|
||||||
|
|
||||||
// List configured routers.
|
// List configured routers.
|
||||||
rpc ListRouters(ListRoutersRequest) returns (ListRoutersResponse);
|
rpc ListRouters(ListRoutersRequest) returns (ListRoutersResponse);
|
||||||
|
|
||||||
|
@ -15,6 +18,14 @@ service RouterService {
|
||||||
rpc DeleteRouter(DeleteRouterRequest) returns (DeleteRouterResponse);
|
rpc DeleteRouter(DeleteRouterRequest) returns (DeleteRouterResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message GetRouterRequest {
|
||||||
|
string router_name = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetRouterResponse {
|
||||||
|
Router router = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message ListRoutersRequest {}
|
message ListRoutersRequest {}
|
||||||
|
|
||||||
message ListRoutersResponse {
|
message ListRoutersResponse {
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
//! This module implements the `router` CLI command
|
||||||
|
|
||||||
|
use influxdb_iox_client::{
|
||||||
|
connection::Connection,
|
||||||
|
router::{self, generated_types::Router as RouterConfig},
|
||||||
|
};
|
||||||
|
use structopt::StructOpt;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
#[allow(clippy::enum_variant_names)]
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("JSON Serialization error: {0}")]
|
||||||
|
Serde(#[from] serde_json::Error),
|
||||||
|
|
||||||
|
#[error("Client error: {0}")]
|
||||||
|
ClientError(#[from] influxdb_iox_client::error::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
|
||||||
|
/// Manage IOx databases
|
||||||
|
#[derive(Debug, StructOpt)]
|
||||||
|
pub struct Config {
|
||||||
|
#[structopt(subcommand)]
|
||||||
|
command: Command,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new router or update an existing one.
|
||||||
|
#[derive(Debug, StructOpt)]
|
||||||
|
struct CreateOrUpdate {
|
||||||
|
/// The name of the router
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return configuration of specific router
|
||||||
|
#[derive(Debug, StructOpt)]
|
||||||
|
struct Get {
|
||||||
|
/// The name of the router
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete specific router
|
||||||
|
#[derive(Debug, StructOpt)]
|
||||||
|
struct Delete {
|
||||||
|
/// The name of the router
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// All possible subcommands for router
|
||||||
|
#[derive(Debug, StructOpt)]
|
||||||
|
enum Command {
|
||||||
|
CreateOrUpdate(CreateOrUpdate),
|
||||||
|
|
||||||
|
/// List routers
|
||||||
|
List,
|
||||||
|
|
||||||
|
Get(Get),
|
||||||
|
|
||||||
|
Delete(Delete),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||||
|
match config.command {
|
||||||
|
Command::CreateOrUpdate(command) => {
|
||||||
|
let mut client = router::Client::new(connection);
|
||||||
|
let config = RouterConfig {
|
||||||
|
name: command.name.clone(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
client.update_router(config).await?;
|
||||||
|
|
||||||
|
println!("Created/Updated router {}", command.name);
|
||||||
|
}
|
||||||
|
Command::List => {
|
||||||
|
let mut client = router::Client::new(connection);
|
||||||
|
let routers = client.list_routers().await?;
|
||||||
|
for router in routers {
|
||||||
|
println!("{}", router.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Command::Get(get) => {
|
||||||
|
let Get { name } = get;
|
||||||
|
let mut client = router::Client::new(connection);
|
||||||
|
let router = client.get_router(&name).await?;
|
||||||
|
println!("{}", serde_json::to_string_pretty(&router)?);
|
||||||
|
}
|
||||||
|
Command::Delete(delete) => {
|
||||||
|
let Delete { name } = delete;
|
||||||
|
let mut client = router::Client::new(connection);
|
||||||
|
client.delete_router(&name).await?;
|
||||||
|
|
||||||
|
println!("Deleted router {}", name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -1,6 +1,9 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use generated_types::{google::FromOptionalField, influxdata::iox::router::v1::*};
|
use generated_types::{
|
||||||
|
google::{FromOptionalField, NotFound, ResourceType},
|
||||||
|
influxdata::iox::router::v1::*,
|
||||||
|
};
|
||||||
use router::server::RouterServer;
|
use router::server::RouterServer;
|
||||||
use tonic::{Request, Response, Status};
|
use tonic::{Request, Response, Status};
|
||||||
|
|
||||||
|
@ -10,6 +13,20 @@ struct RouterService {
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl router_service_server::RouterService for RouterService {
|
impl router_service_server::RouterService for RouterService {
|
||||||
|
async fn get_router(
|
||||||
|
&self,
|
||||||
|
request: Request<GetRouterRequest>,
|
||||||
|
) -> Result<Response<GetRouterResponse>, Status> {
|
||||||
|
let GetRouterRequest { router_name } = request.into_inner();
|
||||||
|
let router = self
|
||||||
|
.server
|
||||||
|
.router(&router_name)
|
||||||
|
.ok_or_else(|| NotFound::new(ResourceType::Router, router_name))?;
|
||||||
|
Ok(Response::new(GetRouterResponse {
|
||||||
|
router: Some(router.config().clone().into()),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
async fn list_routers(
|
async fn list_routers(
|
||||||
&self,
|
&self,
|
||||||
_: Request<ListRoutersRequest>,
|
_: Request<ListRoutersRequest>,
|
||||||
|
|
|
@ -22,6 +22,7 @@ mod commands {
|
||||||
pub mod database;
|
pub mod database;
|
||||||
pub mod debug;
|
pub mod debug;
|
||||||
pub mod operations;
|
pub mod operations;
|
||||||
|
pub mod router;
|
||||||
pub mod run;
|
pub mod run;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
pub mod server_remote;
|
pub mod server_remote;
|
||||||
|
@ -147,6 +148,7 @@ enum Command {
|
||||||
Database(commands::database::Config),
|
Database(commands::database::Config),
|
||||||
// Clippy recommended boxing this variant because it's much larger than the others
|
// Clippy recommended boxing this variant because it's much larger than the others
|
||||||
Run(Box<commands::run::Config>),
|
Run(Box<commands::run::Config>),
|
||||||
|
Router(commands::router::Config),
|
||||||
Server(commands::server::Config),
|
Server(commands::server::Config),
|
||||||
Operation(commands::operations::Config),
|
Operation(commands::operations::Config),
|
||||||
Sql(commands::sql::Config),
|
Sql(commands::sql::Config),
|
||||||
|
@ -216,6 +218,14 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Command::Router(config) => {
|
||||||
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
|
let connection = connection().await;
|
||||||
|
if let Err(e) = commands::router::command(connection, config).await {
|
||||||
|
eprintln!("{}", e);
|
||||||
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
|
}
|
||||||
|
}
|
||||||
Command::Run(config) => {
|
Command::Run(config) => {
|
||||||
let _tracing_guard =
|
let _tracing_guard =
|
||||||
handle_init_logs(init_logs_and_tracing(log_verbose_count, &config));
|
handle_init_logs(init_logs_and_tracing(log_verbose_count, &config));
|
||||||
|
|
|
@ -19,6 +19,7 @@ mod read_cli;
|
||||||
mod remote_api;
|
mod remote_api;
|
||||||
mod remote_cli;
|
mod remote_cli;
|
||||||
mod router_api;
|
mod router_api;
|
||||||
|
mod router_cli;
|
||||||
mod run_cli;
|
mod run_cli;
|
||||||
pub mod scenario;
|
pub mod scenario;
|
||||||
mod sql_cli;
|
mod sql_cli;
|
||||||
|
|
|
@ -45,6 +45,7 @@ async fn test_router_crud() {
|
||||||
|
|
||||||
// no routers
|
// no routers
|
||||||
assert_eq!(client.list_routers().await.unwrap().len(), 0);
|
assert_eq!(client.list_routers().await.unwrap().len(), 0);
|
||||||
|
assert_error!(client.get_router(&cfg_foo_1.name).await, Error::NotFound(_));
|
||||||
client.delete_router(&router_name_b).await.unwrap();
|
client.delete_router(&router_name_b).await.unwrap();
|
||||||
|
|
||||||
// add routers
|
// add routers
|
||||||
|
@ -54,6 +55,8 @@ async fn test_router_crud() {
|
||||||
assert_eq!(routers.len(), 2);
|
assert_eq!(routers.len(), 2);
|
||||||
assert_eq!(&routers[0], &cfg_bar);
|
assert_eq!(&routers[0], &cfg_bar);
|
||||||
assert_eq!(&routers[1], &cfg_foo_1);
|
assert_eq!(&routers[1], &cfg_foo_1);
|
||||||
|
assert_eq!(client.get_router(&cfg_bar.name).await.unwrap(), cfg_bar);
|
||||||
|
assert_eq!(client.get_router(&cfg_foo_1.name).await.unwrap(), cfg_foo_1);
|
||||||
|
|
||||||
// update router
|
// update router
|
||||||
client.update_router(cfg_foo_2.clone()).await.unwrap();
|
client.update_router(cfg_foo_2.clone()).await.unwrap();
|
||||||
|
@ -61,12 +64,18 @@ async fn test_router_crud() {
|
||||||
assert_eq!(routers.len(), 2);
|
assert_eq!(routers.len(), 2);
|
||||||
assert_eq!(&routers[0], &cfg_bar);
|
assert_eq!(&routers[0], &cfg_bar);
|
||||||
assert_eq!(&routers[1], &cfg_foo_2);
|
assert_eq!(&routers[1], &cfg_foo_2);
|
||||||
|
assert_eq!(client.get_router(&cfg_bar.name).await.unwrap(), cfg_bar);
|
||||||
|
assert_eq!(client.get_router(&cfg_foo_2.name).await.unwrap(), cfg_foo_2);
|
||||||
|
|
||||||
// delete routers
|
// delete routers
|
||||||
client.delete_router(&router_name_b).await.unwrap();
|
client.delete_router(&router_name_b).await.unwrap();
|
||||||
let routers = client.list_routers().await.unwrap();
|
let routers = client.list_routers().await.unwrap();
|
||||||
assert_eq!(routers.len(), 1);
|
assert_eq!(routers.len(), 1);
|
||||||
assert_eq!(&routers[0], &cfg_bar);
|
assert_eq!(&routers[0], &cfg_bar);
|
||||||
|
assert_eq!(client.get_router(&cfg_bar.name).await.unwrap(), cfg_bar);
|
||||||
|
assert_error!(client.get_router(&cfg_foo_2.name).await, Error::NotFound(_));
|
||||||
|
|
||||||
|
// deleting router a second time is a no-op
|
||||||
client.delete_router(&router_name_b).await.unwrap();
|
client.delete_router(&router_name_b).await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
use crate::{
|
||||||
|
common::server_fixture::{ServerFixture, ServerType},
|
||||||
|
end_to_end_cases::scenario::rand_name,
|
||||||
|
};
|
||||||
|
use assert_cmd::Command;
|
||||||
|
use predicates::prelude::*;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_router_crud() {
|
||||||
|
let server_fixture = ServerFixture::create_shared(ServerType::Router).await;
|
||||||
|
let addr = server_fixture.grpc_base();
|
||||||
|
let router_name = rand_name();
|
||||||
|
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("router")
|
||||||
|
.arg("get")
|
||||||
|
.arg(&router_name)
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.failure()
|
||||||
|
.stderr(predicate::str::contains(format!(
|
||||||
|
"Resource router/{} not found",
|
||||||
|
router_name,
|
||||||
|
)));
|
||||||
|
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("router")
|
||||||
|
.arg("create-or-update")
|
||||||
|
.arg(&router_name)
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.stdout(predicate::str::contains(format!(
|
||||||
|
"Created/Updated router {}",
|
||||||
|
router_name
|
||||||
|
)));
|
||||||
|
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("router")
|
||||||
|
.arg("list")
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.stdout(predicate::str::contains(&router_name));
|
||||||
|
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("router")
|
||||||
|
.arg("get")
|
||||||
|
.arg(&router_name)
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.stdout(
|
||||||
|
predicate::str::contains(&router_name).and(predicate::str::contains(format!(
|
||||||
|
r#""name": "{}"#,
|
||||||
|
&router_name
|
||||||
|
))), // validate the defaults have been set reasonably
|
||||||
|
);
|
||||||
|
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("router")
|
||||||
|
.arg("delete")
|
||||||
|
.arg(&router_name)
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.stdout(predicate::str::contains(format!(
|
||||||
|
"Deleted router {}",
|
||||||
|
router_name
|
||||||
|
)));
|
||||||
|
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("router")
|
||||||
|
.arg("list")
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.stdout(predicate::str::contains(&router_name).not());
|
||||||
|
}
|
|
@ -1,3 +1,5 @@
|
||||||
|
use ::generated_types::google::OptionalField;
|
||||||
|
|
||||||
use self::generated_types::{router_service_client::RouterServiceClient, *};
|
use self::generated_types::{router_service_client::RouterServiceClient, *};
|
||||||
|
|
||||||
use crate::connection::Connection;
|
use crate::connection::Connection;
|
||||||
|
@ -49,6 +51,21 @@ impl Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get router
|
||||||
|
pub async fn get_router(
|
||||||
|
&mut self,
|
||||||
|
router_name: &str,
|
||||||
|
) -> Result<generated_types::Router, Error> {
|
||||||
|
let response = self
|
||||||
|
.inner
|
||||||
|
.get_router(GetRouterRequest {
|
||||||
|
router_name: router_name.to_string(),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(response.into_inner().router.unwrap_field("router")?)
|
||||||
|
}
|
||||||
|
|
||||||
/// List routers.
|
/// List routers.
|
||||||
pub async fn list_routers(&mut self) -> Result<Vec<generated_types::Router>, Error> {
|
pub async fn list_routers(&mut self) -> Result<Vec<generated_types::Router>, Error> {
|
||||||
let response = self.inner.list_routers(ListRoutersRequest {}).await?;
|
let response = self.inner.list_routers(ListRoutersRequest {}).await?;
|
||||||
|
|
Loading…
Reference in New Issue