Merge pull request #3028 from influxdata/crepererum/router_mode_get_set_cfg

feat: add basic CRUD operations for router configs
pull/24376/head
kodiakhq[bot] 2021-11-04 14:15:20 +00:00 committed by GitHub
commit 102ca32bec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1040 additions and 61 deletions

View File

@ -11,7 +11,7 @@ use std::hash::{Hash, Hasher};
///
/// e.g. you can use it find the ShardID in vector of ShardIds
/// that is closest to a given hash value.
#[derive(Debug, Eq, PartialEq, Clone, Default)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ConsistentHasher<T>
where
T: Copy + Hash,
@ -53,6 +53,17 @@ where
}
}
impl<T> Default for ConsistentHasher<T>
where
T: Copy + Hash,
{
fn default() -> Self {
Self {
ring: Default::default(),
}
}
}
impl<T> From<ConsistentHasher<T>> for Vec<T>
where
T: Copy + Hash,

View File

@ -19,6 +19,7 @@ pub mod error;
pub mod job;
pub mod names;
pub mod partition_metadata;
pub mod router;
pub mod sequence;
pub mod server_id;
pub mod timestamp;

135
data_types/src/router.rs Normal file
View File

@ -0,0 +1,135 @@
use std::collections::BTreeMap;
use regex::Regex;
use crate::{
consistent_hasher::ConsistentHasher, server_id::ServerId, write_buffer::WriteBufferConnection,
};
#[derive(Debug, Eq, PartialEq, Hash, PartialOrd, Ord, Clone, Copy)]
pub struct ShardId(u32);
impl ShardId {
pub fn new(id: u32) -> Self {
Self(id)
}
pub fn get(&self) -> u32 {
self.0
}
}
/// ShardConfig defines rules for assigning a line/row to an individual
/// host or a group of hosts. A shard
/// is a logical concept, but the usage is meant to split data into
/// mutually exclusive areas. The rough order of organization is:
/// database -> shard -> partition -> chunk. For example, you could shard
/// based on table name and assign to 1 of 10 shards. Within each
/// shard you would have partitions, which would likely be based off time.
/// This makes it possible to horizontally scale out writes.
#[derive(Debug, Eq, PartialEq, Clone, Default)]
pub struct ShardConfig {
/// Each matcher, if any, is evaluated in order.
/// If there is a match, the route will be evaluated to
/// the given targets, otherwise the hash ring will be evaluated.
/// This is useful for overriding the hashring function on some hot spot. For
/// example, if you use the table name as the input to the hash function
/// and your ring has 4 slots. If two tables that are very hot get
/// assigned to the same slot you can override that by putting in a
/// specific matcher to pull that table over to a different node.
pub specific_targets: Vec<MatcherToShard>,
/// An optional default hasher which will route to one in a collection of
/// nodes.
pub hash_ring: Option<HashRing>,
}
/// Maps a matcher with specific shard. If the line/row matches
/// it should be sent to the group.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct MatcherToShard {
pub matcher: Matcher,
pub shard: ShardId,
}
/// HashRing is a rule for creating a hash key for a row and mapping that to
/// an individual node on a ring.
#[derive(Debug, Eq, PartialEq, Clone, Default)]
pub struct HashRing {
/// ring of shard ids
pub shards: ConsistentHasher<ShardId>,
}
/// A matcher is used to match routing rules or subscriptions on a row-by-row
/// (or line) basis.
#[derive(Debug, Clone, Default)]
pub struct Matcher {
/// if provided, match if the table name matches against the regex
pub table_name_regex: Option<Regex>,
}
impl PartialEq for Matcher {
fn eq(&self, other: &Self) -> bool {
// this is kind of janky, but it's only used during tests and should get the job
// done
format!("{:?}", self.table_name_regex) == format!("{:?}", other.table_name_regex)
}
}
impl Eq for Matcher {}
/// Sinks for query requests.
///
/// Queries are sent to one of these sinks and the resulting data is received from it.
///
/// Note that the query results are flowing into the opposite direction (aka a query sink is a result source).
#[derive(Debug, Eq, PartialEq, Clone, Default)]
pub struct QuerySinks {
pub grpc_remotes: Vec<ServerId>,
}
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum WriteSinkVariant {
/// gRPC-based remote, addressed by its server ID.
GrpcRemote(ServerId),
/// Write buffer connection.
WriteBuffer(WriteBufferConnection),
}
/// Sink of write requests aka new data.
///
/// Data is sent to this sink and a status is received from it.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct WriteSink {
pub sink: WriteSinkVariant,
/// If set, errors during writing to this sink are ignored and do NOT lead to an overall failure.
pub ignore_errors: bool,
}
/// Set of write sinks.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct WriteSinkSet {
/// Sinks within the set.
pub sinks: Vec<WriteSink>,
}
/// Router for writes and queries.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct Router {
/// Router name.
///
/// The name corresponds to the database name on the database node.
///
/// The router name is unique for this router node.
pub name: String,
/// Write sharder.
pub write_sharder: ShardConfig,
/// Sinks for write requests.
pub write_sinks: BTreeMap<ShardId, WriteSinkSet>,
/// Sinks for query requests.
pub query_sinks: QuerySinks,
}

View File

@ -17,6 +17,7 @@ tonic = "0.5"
time = { path = "../time" }
[dev-dependencies]
data_types = { path = "../data_types" }
num_cpus = "1.13.0"
[build-dependencies] # In alphabetical order

View File

@ -13,17 +13,14 @@ import "influxdata/iox/write_buffer/v1/write_buffer.proto";
// # Write Routing
//
// ## Overall Picture
// Data is accepted from all sources, is sharded, and is (according to the sharding) written into the sink sets. There
// may be a prioritization for sources that is "HTTP and gRPC first, and write buffers in declared order".
// Data is accepted from all sources, is sharded, and is (according to the sharding) written into the sink sets.
//
// ```text
// ( HTTP )--+ +------->( sink set 1 )
// | |
// ( gRPC )--+-->( sharder )--> ...
// | |
// ( Write Buffer 1 )--+ +------->( sink set n )
// ... |
// ( Write Buffer n )--+
// |
// +------->( sink set n )
// ```
//
// ## Sharder
@ -71,11 +68,14 @@ import "influxdata/iox/write_buffer/v1/write_buffer.proto";
message Router {
// Router name.
//
// The name is unique for this node.
// The name corresponds to the database name on the database node.
//
// The router name is unique for this router node.
string name = 1;
// Sources of write requests.
WriteSources write_sources = 2;
// write source, current always HTTP and gRPC
reserved 2;
reserved "write_sources";
// Write sharder.
ShardConfig write_sharder = 3;
@ -87,19 +87,6 @@ message Router {
QuerySinks query_sinks = 5;
}
// Sources of write request aka new data.
//
// Data is accepted from these sources and a status is provided back to it.
message WriteSources {
// If set writes via gRPC and HTTP are accepted.
//
// You may want to disable this when incoming data should solely be received via write buffer(s).
bool allow_unsequenced_inputs = 2;
// Write buffer connections.
repeated influxdata.iox.write_buffer.v1.WriteBufferConnection write_buffers = 3;
}
// Sink of write requests aka new data.
//
// Data is sent to this sink and a status is received from it.

View File

@ -6,7 +6,7 @@ import "influxdata/iox/router/v1/router.proto";
service RouterService {
// List configured routers.
rpc ListRouter(ListRouterRequest) returns (ListRouterResponse);
rpc ListRouters(ListRoutersRequest) returns (ListRoutersResponse);
// Update router config (upsert).
rpc UpdateRouter(UpdateRouterRequest) returns (UpdateRouterResponse);
@ -15,9 +15,9 @@ service RouterService {
rpc DeleteRouter(DeleteRouterRequest) returns (DeleteRouterResponse);
}
message ListRouterRequest {}
message ListRoutersRequest {}
message ListRouterResponse {
message ListRoutersResponse {
repeated Router routers = 1;
}

View File

@ -85,6 +85,16 @@ pub mod influxdata {
}
}
pub mod router {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.router.v1.rs"));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.router.v1.serde.rs"
));
}
}
pub mod write {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.write.v1.rs"));
@ -194,19 +204,21 @@ pub use influxdata::platform::storage::*;
pub mod google;
#[cfg(feature = "data_types_conversions")]
#[cfg(any(feature = "data_types_conversions", test))]
pub mod chunk;
#[cfg(feature = "data_types_conversions")]
#[cfg(any(feature = "data_types_conversions", test))]
pub mod database_rules;
#[cfg(feature = "data_types_conversions")]
#[cfg(any(feature = "data_types_conversions", test))]
pub mod database_state;
#[cfg(feature = "data_types_conversions")]
#[cfg(any(feature = "data_types_conversions", test))]
pub mod detailed_database;
#[cfg(feature = "data_types_conversions")]
#[cfg(any(feature = "data_types_conversions", test))]
pub mod job;
#[cfg(feature = "data_types_conversions")]
#[cfg(any(feature = "data_types_conversions", test))]
pub mod router;
#[cfg(any(feature = "data_types_conversions", test))]
pub mod server_config;
#[cfg(feature = "data_types_conversions")]
#[cfg(any(feature = "data_types_conversions", test))]
pub mod write_buffer;
pub use prost::{DecodeError, EncodeError};

View File

@ -0,0 +1,453 @@
use data_types::router::{
HashRing, Matcher, MatcherToShard, QuerySinks, Router, ShardConfig, ShardId, WriteSink,
WriteSinkSet, WriteSinkVariant,
};
use regex::Regex;
use crate::google::{FieldViolation, FieldViolationExt, FromField, FromFieldOpt};
use crate::influxdata::iox::router::v1 as router;
impl From<ShardConfig> for router::ShardConfig {
fn from(shard_config: ShardConfig) -> Self {
Self {
specific_targets: shard_config
.specific_targets
.into_iter()
.map(|i| i.into())
.collect(),
hash_ring: shard_config.hash_ring.map(|i| i.into()),
}
}
}
impl TryFrom<router::ShardConfig> for ShardConfig {
type Error = FieldViolation;
fn try_from(proto: router::ShardConfig) -> Result<Self, Self::Error> {
Ok(Self {
specific_targets: proto
.specific_targets
.into_iter()
.map(|i| i.try_into())
.collect::<Result<_, FieldViolation>>()
.field("specific_targets")?,
hash_ring: proto
.hash_ring
.map(|i| i.try_into())
.map_or(Ok(None), |r| r.map(Some))
.field("hash_ring")?,
})
}
}
impl From<MatcherToShard> for router::MatcherToShard {
fn from(matcher_to_shard: MatcherToShard) -> Self {
Self {
matcher: none_if_default(matcher_to_shard.matcher.into()),
shard: matcher_to_shard.shard.get(),
}
}
}
impl TryFrom<router::MatcherToShard> for MatcherToShard {
type Error = FieldViolation;
fn try_from(proto: router::MatcherToShard) -> Result<Self, Self::Error> {
Ok(Self {
matcher: proto.matcher.unwrap_or_default().scope("matcher")?,
shard: ShardId::new(proto.shard),
})
}
}
impl From<HashRing> for router::HashRing {
fn from(hash_ring: HashRing) -> Self {
let shards: Vec<ShardId> = hash_ring.shards.into();
Self {
shards: shards.into_iter().map(|id| id.get()).collect(),
}
}
}
impl TryFrom<router::HashRing> for HashRing {
type Error = FieldViolation;
fn try_from(proto: router::HashRing) -> Result<Self, Self::Error> {
Ok(Self {
shards: proto
.shards
.into_iter()
.map(ShardId::new)
.collect::<Vec<ShardId>>()
.into(),
})
}
}
impl From<Matcher> for router::Matcher {
fn from(matcher: Matcher) -> Self {
Self {
table_name_regex: matcher
.table_name_regex
.map(|r| r.to_string())
.unwrap_or_default(),
}
}
}
impl TryFrom<router::Matcher> for Matcher {
type Error = FieldViolation;
fn try_from(proto: router::Matcher) -> Result<Self, Self::Error> {
let table_name_regex = match &proto.table_name_regex as &str {
"" => None,
re => Some(Regex::new(re).map_err(|e| FieldViolation {
field: "table_name_regex".to_string(),
description: e.to_string(),
})?),
};
Ok(Self { table_name_regex })
}
}
impl From<QuerySinks> for router::QuerySinks {
fn from(query_sinks: QuerySinks) -> Self {
Self {
grpc_remotes: query_sinks
.grpc_remotes
.into_iter()
.map(|id| id.get_u32())
.collect(),
}
}
}
impl TryFrom<router::QuerySinks> for QuerySinks {
type Error = FieldViolation;
fn try_from(proto: router::QuerySinks) -> Result<Self, Self::Error> {
Ok(Self {
grpc_remotes: proto
.grpc_remotes
.into_iter()
.map(|i| i.try_into())
.collect::<Result<_, data_types::server_id::Error>>()
.field("grpc_remotes")?,
})
}
}
impl From<WriteSinkVariant> for router::write_sink::Sink {
fn from(write_sink_variant: WriteSinkVariant) -> Self {
match write_sink_variant {
WriteSinkVariant::GrpcRemote(server_id) => {
router::write_sink::Sink::GrpcRemote(server_id.get_u32())
}
WriteSinkVariant::WriteBuffer(write_buffer_conn) => {
router::write_sink::Sink::WriteBuffer(write_buffer_conn.into())
}
}
}
}
impl TryFrom<router::write_sink::Sink> for WriteSinkVariant {
type Error = FieldViolation;
fn try_from(proto: router::write_sink::Sink) -> Result<Self, Self::Error> {
match proto {
router::write_sink::Sink::GrpcRemote(server_id) => Ok(WriteSinkVariant::GrpcRemote(
server_id.try_into().field("server_id")?,
)),
router::write_sink::Sink::WriteBuffer(write_buffer_conn) => {
Ok(WriteSinkVariant::WriteBuffer(
write_buffer_conn
.try_into()
.field("write_buffer_connection")?,
))
}
}
}
}
impl From<WriteSink> for router::WriteSink {
fn from(write_sink: WriteSink) -> Self {
Self {
sink: Some(write_sink.sink.into()),
ignore_errors: write_sink.ignore_errors,
}
}
}
impl TryFrom<router::WriteSink> for WriteSink {
type Error = FieldViolation;
fn try_from(proto: router::WriteSink) -> Result<Self, Self::Error> {
Ok(Self {
sink: proto.sink.required("sink")?,
ignore_errors: proto.ignore_errors,
})
}
}
impl From<WriteSinkSet> for router::WriteSinkSet {
fn from(write_sink_set: WriteSinkSet) -> Self {
Self {
sinks: write_sink_set
.sinks
.into_iter()
.map(|sink| sink.into())
.collect(),
}
}
}
impl TryFrom<router::WriteSinkSet> for WriteSinkSet {
type Error = FieldViolation;
fn try_from(proto: router::WriteSinkSet) -> Result<Self, Self::Error> {
Ok(Self {
sinks: proto
.sinks
.into_iter()
.map(|sink| sink.try_into())
.collect::<Result<_, FieldViolation>>()?,
})
}
}
impl From<Router> for router::Router {
fn from(router: Router) -> Self {
Self {
name: router.name,
write_sharder: none_if_default(router.write_sharder.into()),
write_sinks: router
.write_sinks
.into_iter()
.map(|(id, sink_set)| (id.get(), sink_set.into()))
.collect(),
query_sinks: none_if_default(router.query_sinks.into()),
}
}
}
impl TryFrom<router::Router> for Router {
type Error = FieldViolation;
fn try_from(proto: router::Router) -> Result<Self, Self::Error> {
Ok(Self {
name: proto.name,
write_sharder: proto
.write_sharder
.optional("write_sharder")?
.unwrap_or_default(),
write_sinks: proto
.write_sinks
.into_iter()
.map(|(id, sink_set)| Ok((ShardId::new(id), sink_set.try_into()?)))
.collect::<Result<_, FieldViolation>>()?,
query_sinks: proto
.query_sinks
.optional("query_sinks")?
.unwrap_or_default(),
})
}
}
/// Returns none if v matches its default value.
fn none_if_default<T: Default + PartialEq>(v: T) -> Option<T> {
if v == Default::default() {
None
} else {
Some(v)
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, HashMap};
use data_types::{consistent_hasher::ConsistentHasher, server_id::ServerId};
use super::*;
#[test]
fn test_matcher_default() {
let protobuf = router::Matcher {
..Default::default()
};
let matcher: Matcher = protobuf.clone().try_into().unwrap();
let back: router::Matcher = matcher.clone().into();
assert!(matcher.table_name_regex.is_none());
assert_eq!(protobuf.table_name_regex, back.table_name_regex);
}
#[test]
fn test_matcher_regexp() {
let protobuf = router::Matcher {
table_name_regex: "^foo$".into(),
};
let matcher: Matcher = protobuf.clone().try_into().unwrap();
let back: router::Matcher = matcher.clone().into();
assert_eq!(matcher.table_name_regex.unwrap().to_string(), "^foo$");
assert_eq!(protobuf.table_name_regex, back.table_name_regex);
}
#[test]
fn test_matcher_bad_regexp() {
let protobuf = router::Matcher {
table_name_regex: "*".into(),
};
let matcher: Result<Matcher, FieldViolation> = protobuf.try_into();
assert!(matcher.is_err());
assert_eq!(matcher.err().unwrap().field, "table_name_regex");
}
#[test]
fn test_hash_ring_default() {
let protobuf = router::HashRing {
..Default::default()
};
let hash_ring: HashRing = protobuf.clone().try_into().unwrap();
let back: router::HashRing = hash_ring.clone().into();
assert!(hash_ring.shards.is_empty());
assert_eq!(protobuf.shards, back.shards);
}
#[test]
fn test_hash_ring_nodes() {
let protobuf = router::HashRing { shards: vec![1, 2] };
let hash_ring: HashRing = protobuf.try_into().unwrap();
assert_eq!(hash_ring.shards.len(), 2);
assert_eq!(hash_ring.shards.find(1), Some(ShardId::new(2)));
assert_eq!(hash_ring.shards.find(2), Some(ShardId::new(1)));
}
#[test]
fn test_matcher_to_shard_default() {
let protobuf = router::MatcherToShard {
..Default::default()
};
let matcher_to_shard: MatcherToShard = protobuf.clone().try_into().unwrap();
let back: router::MatcherToShard = matcher_to_shard.clone().into();
assert_eq!(
matcher_to_shard.matcher,
Matcher {
..Default::default()
}
);
assert_eq!(protobuf.matcher, back.matcher);
assert_eq!(matcher_to_shard.shard, ShardId::new(0));
assert_eq!(protobuf.shard, back.shard);
}
#[test]
fn test_shard_config_default() {
let protobuf = router::ShardConfig {
..Default::default()
};
let shard_config: ShardConfig = protobuf.clone().try_into().unwrap();
let back: router::ShardConfig = shard_config.clone().into();
assert!(shard_config.specific_targets.is_empty());
assert_eq!(protobuf.specific_targets, back.specific_targets);
assert!(shard_config.hash_ring.is_none());
assert_eq!(protobuf.hash_ring, back.hash_ring);
}
#[test]
fn test_sharder() {
let protobuf = router::ShardConfig {
specific_targets: vec![router::MatcherToShard {
matcher: Some(router::Matcher {
table_name_regex: "pu\\d.$".to_string(),
}),
shard: 1,
}],
hash_ring: Some(router::HashRing {
shards: vec![1, 2, 3, 4],
}),
};
let shard_config: ShardConfig = protobuf.try_into().unwrap();
assert_eq!(
shard_config,
ShardConfig {
specific_targets: vec![MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("pu\\d.$").unwrap()),
},
shard: ShardId::new(1),
}],
hash_ring: Some(HashRing {
shards: ConsistentHasher::new(&[
ShardId::new(1),
ShardId::new(2),
ShardId::new(3),
ShardId::new(4)
])
}),
}
);
}
#[test]
fn test_router() {
let protobuf = router::Router {
name: String::from("my_router"),
write_sharder: None,
write_sinks: HashMap::from([(
13,
router::WriteSinkSet {
sinks: vec![router::WriteSink {
sink: Some(router::write_sink::Sink::GrpcRemote(1)),
ignore_errors: false,
}],
},
)]),
query_sinks: Some(router::QuerySinks {
grpc_remotes: vec![1, 3],
}),
};
let router: Router = protobuf.try_into().unwrap();
assert_eq!(
router,
Router {
name: String::from("my_router"),
write_sharder: ShardConfig::default(),
write_sinks: BTreeMap::from([(
ShardId::new(13),
WriteSinkSet {
sinks: vec![WriteSink {
sink: WriteSinkVariant::GrpcRemote(ServerId::try_from(1).unwrap()),
ignore_errors: false,
},],
},
),]),
query_sinks: QuerySinks {
grpc_remotes: vec![
ServerId::try_from(1).unwrap(),
ServerId::try_from(3).unwrap()
]
}
},
);
}
}

View File

@ -8,6 +8,7 @@ use crate::influxdb_ioxd::{
use super::RouterServerType;
mod deployment;
mod router;
pub async fn server_grpc(
server_type: Arc<RouterServerType>,
@ -22,6 +23,10 @@ pub async fn server_grpc(
server_type.serving_readiness.clone(),
)
);
add_service!(
builder,
router::make_server(Arc::clone(&server_type.server),)
);
serve_builder!(builder);

View File

@ -0,0 +1,53 @@
use std::sync::Arc;
use generated_types::{google::FromFieldOpt, influxdata::iox::router::v1::*};
use router::server::RouterServer;
use tonic::{Request, Response, Status};
struct RouterService {
server: Arc<RouterServer>,
}
#[tonic::async_trait]
impl router_service_server::RouterService for RouterService {
async fn list_routers(
&self,
_: Request<ListRoutersRequest>,
) -> Result<Response<ListRoutersResponse>, Status> {
Ok(Response::new(ListRoutersResponse {
routers: self
.server
.routers()
.into_iter()
.map(|router| router.config().clone().into())
.collect(),
}))
}
async fn update_router(
&self,
request: Request<UpdateRouterRequest>,
) -> Result<Response<UpdateRouterResponse>, Status> {
use data_types::router::Router as RouterConfig;
let UpdateRouterRequest { router } = request.into_inner();
let cfg: RouterConfig = router.required("router")?;
self.server.update_router(cfg);
Ok(Response::new(UpdateRouterResponse {}))
}
async fn delete_router(
&self,
request: Request<DeleteRouterRequest>,
) -> Result<Response<DeleteRouterResponse>, Status> {
let DeleteRouterRequest { router_name } = request.into_inner();
self.server.delete_router(&router_name);
Ok(Response::new(DeleteRouterResponse {}))
}
}
pub fn make_server(
server: Arc<RouterServer>,
) -> router_service_server::RouterServiceServer<impl router_service_server::RouterService> {
router_service_server::RouterServiceServer::new(RouterService { server })
}

View File

@ -232,6 +232,12 @@ impl ServerFixture {
influxdb_iox_client::remote::Client::new(self.grpc_channel())
}
/// Return a router client suitable for communicating with this
/// server
pub fn router_client(&self) -> influxdb_iox_client::router::Client {
influxdb_iox_client::router::Client::new(self.grpc_channel())
}
/// Return a write client suitable for communicating with this
/// server
pub fn write_client(&self) -> influxdb_iox_client::write::Client {
@ -518,52 +524,57 @@ impl TestServer {
}
let channel = self.grpc_channel().await.expect("gRPC should be running");
let mut management_client = influxdb_iox_client::management::Client::new(channel);
let mut deployment_client = influxdb_iox_client::deployment::Client::new(channel.clone());
if let Ok(id) = management_client.get_server_id().await {
if let Ok(id) = deployment_client.get_server_id().await {
// tell others that this server had some problem
*ready = ServerState::Error;
std::mem::drop(ready);
panic!("Server already has an ID ({}); possibly a stray/orphan server from another test run.", id);
}
// Set the writer id, if requested
// Set the server id, if requested
match initial_config {
InitialConfig::SetWriterId => {
let id = DEFAULT_SERVER_ID;
management_client
deployment_client
.update_server_id(id)
.await
.expect("set ID failed");
println!("Set writer_id to {:?}", id);
// if server ID was set, we can also wait until DBs are loaded
let check_dbs_loaded = async {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
if self.test_config.server_type == ServerType::Database {
// if server ID was set, we can also wait until DBs are loaded
let mut management_client =
influxdb_iox_client::management::Client::new(channel);
let check_dbs_loaded = async {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
while !management_client
.get_server_status()
.await
.unwrap()
.initialized
{
interval.tick().await;
}
};
while !management_client
.get_server_status()
.await
.unwrap()
.initialized
{
interval.tick().await;
}
};
let capped_check = tokio::time::timeout(Duration::from_secs(30), check_dbs_loaded);
let capped_check =
tokio::time::timeout(Duration::from_secs(30), check_dbs_loaded);
match capped_check.await {
Ok(_) => {
println!("Databases loaded");
}
Err(e) => {
// tell others that this server had some problem
*ready = ServerState::Error;
std::mem::drop(ready);
panic!("Server did not load databases in required time: {}", e);
match capped_check.await {
Ok(_) => {
println!("Databases loaded");
}
Err(e) => {
// tell others that this server had some problem
*ready = ServerState::Error;
std::mem::drop(ready);
panic!("Server did not load databases in required time: {}", e);
}
}
}
}

View File

@ -15,6 +15,7 @@ mod persistence;
mod read_api;
mod read_cli;
mod remote_api;
mod router_api;
mod run_cli;
pub mod scenario;
mod sql_cli;

View File

@ -0,0 +1,67 @@
use influxdb_iox_client::router::generated_types::{QuerySinks, Router};
use crate::{
common::server_fixture::{ServerFixture, ServerType},
end_to_end_cases::scenario::rand_name,
};
#[tokio::test]
async fn test_router_crud() {
let server_fixture = ServerFixture::create_shared(ServerType::Router).await;
let mut client = server_fixture.router_client();
let router_name_a = rand_name();
let router_name_b = rand_name();
let (router_name_a, router_name_b) = if router_name_a < router_name_b {
(router_name_a, router_name_b)
} else {
(router_name_b, router_name_a)
};
let cfg_foo_1 = Router {
name: router_name_b.clone(),
write_sharder: Default::default(),
write_sinks: Default::default(),
query_sinks: Default::default(),
};
let cfg_foo_2 = Router {
query_sinks: Some(QuerySinks {
grpc_remotes: vec![1],
}),
..cfg_foo_1.clone()
};
assert_ne!(cfg_foo_1, cfg_foo_2);
let cfg_bar = Router {
name: router_name_a,
write_sharder: Default::default(),
write_sinks: Default::default(),
query_sinks: Default::default(),
};
// no routers
assert_eq!(client.list_routers().await.unwrap().len(), 0);
client.delete_router(&router_name_b).await.unwrap();
// add routers
client.update_router(cfg_foo_1.clone()).await.unwrap();
client.update_router(cfg_bar.clone()).await.unwrap();
let routers = client.list_routers().await.unwrap();
assert_eq!(routers.len(), 2);
assert_eq!(&routers[0], &cfg_bar);
assert_eq!(&routers[1], &cfg_foo_1);
// update router
client.update_router(cfg_foo_2.clone()).await.unwrap();
let routers = client.list_routers().await.unwrap();
assert_eq!(routers.len(), 2);
assert_eq!(&routers[0], &cfg_bar);
assert_eq!(&routers[1], &cfg_foo_2);
// delete routers
client.delete_router(&router_name_b).await.unwrap();
let routers = client.list_routers().await.unwrap();
assert_eq!(routers.len(), 1);
assert_eq!(&routers[0], &cfg_bar);
client.delete_router(&router_name_b).await.unwrap();
}

View File

@ -13,6 +13,9 @@ pub mod management;
/// Client for remote API
pub mod remote;
/// Client for router API
pub mod router;
/// Client for write API
pub mod write;

View File

@ -0,0 +1,110 @@
use thiserror::Error;
use self::generated_types::{router_service_client::RouterServiceClient, *};
use crate::connection::Connection;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::router::v1::*;
}
/// Errors returned by Client::list_routers
#[derive(Debug, Error)]
pub enum ListRoutersError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::update_router
#[derive(Debug, Error)]
pub enum UpdateRouterError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::delete_router
#[derive(Debug, Error)]
pub enum DeleteRouterError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// An IOx Router API client.
///
/// This client wraps the underlying `tonic` generated client with a
/// more ergonomic interface.
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use influxdb_iox_client::{
/// router::Client,
/// connection::Builder,
/// };
///
/// let mut connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .await
/// .unwrap();
///
/// let mut client = Client::new(connection);
///
/// // List routers
/// client
/// .list_routers()
/// .await
/// .expect("listing routers failed");
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: RouterServiceClient<Connection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: RouterServiceClient::new(channel),
}
}
/// List routers.
pub async fn list_routers(&mut self) -> Result<Vec<generated_types::Router>, ListRoutersError> {
let response = self
.inner
.list_routers(ListRoutersRequest {})
.await
.map_err(ListRoutersError::ServerError)?;
Ok(response.into_inner().routers)
}
/// Update router
pub async fn update_router(
&mut self,
config: generated_types::Router,
) -> Result<(), UpdateRouterError> {
self.inner
.update_router(UpdateRouterRequest {
router: Some(config),
})
.await
.map_err(UpdateRouterError::ServerError)?;
Ok(())
}
/// Delete router
pub async fn delete_router(&mut self, router_name: &str) -> Result<(), UpdateRouterError> {
self.inner
.delete_router(DeleteRouterRequest {
router_name: router_name.to_string(),
})
.await
.map_err(UpdateRouterError::ServerError)?;
Ok(())
}
}

View File

@ -8,4 +8,5 @@
clippy::clone_on_ref_ptr
)]
pub mod router;
pub mod server;

45
router/src/router.rs Normal file
View File

@ -0,0 +1,45 @@
use data_types::router::Router as RouterConfig;
/// Router for a single database.
#[derive(Debug)]
pub struct Router {
/// Router config.
config: RouterConfig,
}
impl Router {
/// Create new router from config.
pub fn new(config: RouterConfig) -> Self {
Self { config }
}
/// Router config.
pub fn config(&self) -> &RouterConfig {
&self.config
}
/// Router name.
///
/// This is the same as the database that this router acts for.
pub fn name(&self) -> &str {
&self.config.name
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_getters() {
let cfg = RouterConfig {
name: String::from("my_router"),
write_sharder: Default::default(),
write_sinks: Default::default(),
query_sinks: Default::default(),
};
let router = Router::new(cfg.clone());
assert_eq!(router.config(), &cfg);
assert_eq!(router.name(), "my_router");
}
}

View File

@ -1,11 +1,13 @@
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};
use data_types::server_id::ServerId;
use data_types::{router::Router as RouterConfig, server_id::ServerId};
use metric::Registry as MetricRegistry;
use parking_lot::RwLock;
use snafu::Snafu;
use trace::TraceCollector;
use crate::router::Router;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations)]
pub enum SetServerIdError {
@ -19,6 +21,7 @@ pub struct RouterServer {
server_id: RwLock<Option<ServerId>>,
metric_registry: Arc<MetricRegistry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
routers: RwLock<BTreeMap<String, Arc<Router>>>,
}
impl RouterServer {
@ -29,6 +32,7 @@ impl RouterServer {
server_id: RwLock::new(None),
metric_registry,
trace_collector,
routers: Default::default(),
}
}
@ -55,13 +59,38 @@ impl RouterServer {
}
}
/// Metric registry associated with this server.
pub fn metric_registry(&self) -> &Arc<MetricRegistry> {
&self.metric_registry
}
/// Trace collector associated with this server.
pub fn trace_collector(&self) -> &Option<Arc<dyn TraceCollector>> {
&self.trace_collector
}
/// List all routers, sorted by name,
pub fn routers(&self) -> Vec<Arc<Router>> {
self.routers.read().values().cloned().collect()
}
/// Update or create router.
///
/// Returns `true` if the router already existed.
pub fn update_router(&self, config: RouterConfig) -> bool {
let router = Router::new(config);
self.routers
.write()
.insert(router.name().to_string(), Arc::new(router))
.is_some()
}
/// Delete router.
///
/// Returns `true` if the router existed.
pub fn delete_router(&self, name: &str) -> bool {
self.routers.write().remove(name).is_some()
}
}
pub mod test_utils {
@ -74,6 +103,8 @@ pub mod test_utils {
#[cfg(test)]
mod tests {
use data_types::router::QuerySinks;
use crate::server::test_utils::make_router_server;
use super::*;
@ -99,4 +130,56 @@ mod tests {
let err = server.set_server_id(id42).unwrap_err();
assert!(matches!(err, SetServerIdError::AlreadySet { .. }));
}
#[test]
fn test_router_crud() {
let server = make_router_server();
let cfg_foo_1 = RouterConfig {
name: String::from("foo"),
write_sharder: Default::default(),
write_sinks: Default::default(),
query_sinks: Default::default(),
};
let cfg_foo_2 = RouterConfig {
query_sinks: QuerySinks {
grpc_remotes: vec![ServerId::try_from(1).unwrap()],
},
..cfg_foo_1.clone()
};
assert_ne!(cfg_foo_1, cfg_foo_2);
let cfg_bar = RouterConfig {
name: String::from("bar"),
write_sharder: Default::default(),
write_sinks: Default::default(),
query_sinks: Default::default(),
};
// no routers
assert_eq!(server.routers().len(), 0);
assert!(!server.delete_router("foo"));
// add routers
assert!(!server.update_router(cfg_foo_1.clone()));
assert!(!server.update_router(cfg_bar.clone()));
let routers = server.routers();
assert_eq!(routers.len(), 2);
assert_eq!(routers[0].config(), &cfg_bar);
assert_eq!(routers[1].config(), &cfg_foo_1);
// update router
assert!(server.update_router(cfg_foo_2.clone()));
let routers = server.routers();
assert_eq!(routers.len(), 2);
assert_eq!(routers[0].config(), &cfg_bar);
assert_eq!(routers[1].config(), &cfg_foo_2);
// delete routers
assert!(server.delete_router("foo"));
let routers = server.routers();
assert_eq!(routers.len(), 1);
assert_eq!(routers[0].config(), &cfg_bar);
assert!(!server.delete_router("foo"));
}
}