Merge pull request #2100 from influxdata/sinks

refactor: Generalize routing sinks
pull/24376/head
kodiakhq[bot] 2021-07-26 09:28:19 +00:00 committed by GitHub
commit f361e344c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 190 additions and 75 deletions

View File

@ -287,9 +287,9 @@ pub struct StrftimeColumn {
/// A routing config defines the destination where to route all data plane operations /// A routing config defines the destination where to route all data plane operations
/// for a given database. /// for a given database.
#[derive(Debug, Eq, PartialEq, Clone, Default)] #[derive(Debug, Eq, PartialEq, Clone)]
pub struct RoutingConfig { pub struct RoutingConfig {
pub target: NodeGroup, pub sink: Sink,
} }
/// ShardId maps to a nodegroup that holds the shard. /// ShardId maps to a nodegroup that holds the shard.
@ -329,13 +329,14 @@ pub struct ShardConfig {
pub ignore_errors: bool, pub ignore_errors: bool,
/// Mapping between shard IDs and node groups. Other sharding rules use /// Mapping between shard IDs and node groups. Other sharding rules use
/// ShardId as targets. /// ShardId as targets.
pub shards: Arc<HashMap<ShardId, Shard>>, pub shards: Arc<HashMap<ShardId, Sink>>,
} }
/// Configuration for a specific IOx shard /// Configuration for a specific IOx sink
#[derive(Debug, Eq, PartialEq, Clone)] #[derive(Debug, Eq, PartialEq, Clone)]
pub enum Shard { pub enum Sink {
Iox(NodeGroup), Iox(NodeGroup),
Kafka(KafkaProducer),
} }
struct LineHasher<'a, 'b, 'c> { struct LineHasher<'a, 'b, 'c> {
@ -391,6 +392,9 @@ pub struct MatcherToShard {
/// A collection of IOx nodes /// A collection of IOx nodes
pub type NodeGroup = Vec<ServerId>; pub type NodeGroup = Vec<ServerId>;
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct KafkaProducer {}
/// HashRing is a rule for creating a hash key for a row and mapping that to /// HashRing is a rule for creating a hash key for a row and mapping that to
/// an individual node on a ring. /// an individual node on a ring.
#[derive(Debug, Eq, PartialEq, Clone, Default)] #[derive(Debug, Eq, PartialEq, Clone, Default)]

View File

@ -118,5 +118,7 @@ message DatabaseRules {
} }
message RoutingConfig { message RoutingConfig {
NodeGroup target = 1; NodeGroup target = 1 [deprecated = true];
Sink sink = 2;
} }

View File

@ -33,7 +33,7 @@ message ShardConfig {
/// Mapping between shard IDs and node groups. Other sharding rules use /// Mapping between shard IDs and node groups. Other sharding rules use
/// ShardId as targets. /// ShardId as targets.
map<uint32, Shard> shards = 4; map<uint32, Sink> shards = 4;
} }
// Maps a matcher with specific shard. If the line/row matches // Maps a matcher with specific shard. If the line/row matches
@ -52,10 +52,11 @@ message Matcher {
string predicate = 2; string predicate = 2;
} }
// Configuration for a specific shard // Configuration for a specific sink
message Shard { message Sink {
oneof sink { oneof sink {
NodeGroup iox = 1; NodeGroup iox = 1;
KafkaProducer kafka = 2;
} }
} }
@ -67,6 +68,11 @@ message NodeGroup {
repeated Node nodes = 1; repeated Node nodes = 1;
} }
// Kafka producer configuration
message KafkaProducer {
}
// HashRing is a rule for creating a hash key for a row and mapping that to // HashRing is a rule for creating a hash key for a row and mapping that to
// an individual node on a ring. // an individual node on a ring.
message HashRing { message HashRing {

View File

@ -4,7 +4,7 @@ use std::time::Duration;
use thiserror::Error; use thiserror::Error;
use data_types::database_rules::{ use data_types::database_rules::{
DatabaseRules, RoutingConfig, RoutingRules, WriteBufferConnection, DatabaseRules, RoutingConfig, RoutingRules, Sink, WriteBufferConnection,
}; };
use data_types::DatabaseName; use data_types::DatabaseName;
@ -14,6 +14,7 @@ use crate::influxdata::iox::management::v1 as management;
mod lifecycle; mod lifecycle;
mod partition; mod partition;
mod shard; mod shard;
mod sink;
impl From<DatabaseRules> for management::DatabaseRules { impl From<DatabaseRules> for management::DatabaseRules {
fn from(rules: DatabaseRules) -> Self { fn from(rules: DatabaseRules) -> Self {
@ -100,8 +101,10 @@ impl TryFrom<management::database_rules::RoutingRules> for RoutingRules {
impl From<RoutingConfig> for management::RoutingConfig { impl From<RoutingConfig> for management::RoutingConfig {
fn from(routing_config: RoutingConfig) -> Self { fn from(routing_config: RoutingConfig) -> Self {
#[allow(deprecated)]
Self { Self {
target: Some(routing_config.target.into()), target: None,
sink: Some(routing_config.sink.into()),
} }
} }
} }
@ -110,8 +113,13 @@ impl TryFrom<management::RoutingConfig> for RoutingConfig {
type Error = FieldViolation; type Error = FieldViolation;
fn try_from(proto: management::RoutingConfig) -> Result<Self, Self::Error> { fn try_from(proto: management::RoutingConfig) -> Result<Self, Self::Error> {
#[allow(deprecated)]
Ok(Self { Ok(Self {
target: proto.target.required("target")?, sink: if proto.target.is_some() {
Sink::Iox(proto.target.required("target")?)
} else {
proto.sink.required("sink")?
},
}) })
} }
} }
@ -196,4 +204,72 @@ mod tests {
// These should be none as preserved on non-protobuf DatabaseRules // These should be none as preserved on non-protobuf DatabaseRules
assert!(back.routing_rules.is_none()); assert!(back.routing_rules.is_none());
} }
#[test]
fn test_routing_rules_conversion() {
let protobuf = management::DatabaseRules {
name: "database".to_string(),
routing_rules: None,
..Default::default()
};
let rules: DatabaseRules = protobuf.try_into().unwrap();
let back: management::DatabaseRules = rules.into();
assert!(back.routing_rules.is_none());
#[allow(deprecated)]
let routing_config_sink = management::RoutingConfig {
target: None,
sink: Some(management::Sink {
sink: Some(management::sink::Sink::Iox(management::NodeGroup {
nodes: vec![management::node_group::Node { id: 1234 }],
})),
}),
};
let protobuf = management::DatabaseRules {
name: "database".to_string(),
routing_rules: Some(management::database_rules::RoutingRules::RoutingConfig(
routing_config_sink.clone(),
)),
..Default::default()
};
let rules: DatabaseRules = protobuf.try_into().unwrap();
let back: management::DatabaseRules = rules.into();
assert_eq!(
back.routing_rules,
Some(management::database_rules::RoutingRules::RoutingConfig(
routing_config_sink.clone()
))
);
#[allow(deprecated)]
let routing_config_target = management::RoutingConfig {
target: Some(management::NodeGroup {
nodes: vec![management::node_group::Node { id: 1234 }],
}),
sink: None,
};
let protobuf = management::DatabaseRules {
name: "database".to_string(),
routing_rules: Some(management::database_rules::RoutingRules::RoutingConfig(
routing_config_target,
)),
..Default::default()
};
let rules: DatabaseRules = protobuf.try_into().unwrap();
let back: management::DatabaseRules = rules.into();
assert_eq!(
back.routing_rules,
Some(management::database_rules::RoutingRules::RoutingConfig(
routing_config_sink
))
);
}
} }

View File

@ -3,9 +3,7 @@ use std::sync::Arc;
use regex::Regex; use regex::Regex;
use data_types::database_rules::{ use data_types::database_rules::{HashRing, Matcher, MatcherToShard, NodeGroup, ShardConfig};
HashRing, Matcher, MatcherToShard, NodeGroup, Shard, ShardConfig,
};
use data_types::server_id::ServerId; use data_types::server_id::ServerId;
use crate::google::{FieldViolation, FieldViolationExt, FromField}; use crate::google::{FieldViolation, FieldViolationExt, FromField};
@ -110,26 +108,6 @@ impl TryFrom<management::HashRing> for HashRing {
} }
} }
impl From<Shard> for management::Shard {
fn from(shard: Shard) -> Self {
let sink = match shard {
Shard::Iox(node_group) => management::shard::Sink::Iox(node_group.into()),
};
management::Shard { sink: Some(sink) }
}
}
impl TryFrom<management::Shard> for Shard {
type Error = FieldViolation;
fn try_from(proto: management::Shard) -> Result<Self, Self::Error> {
let sink = proto.sink.ok_or_else(|| FieldViolation::required(""))?;
Ok(match sink {
management::shard::Sink::Iox(node_group) => Shard::Iox(node_group.scope("node_group")?),
})
}
}
impl From<NodeGroup> for management::NodeGroup { impl From<NodeGroup> for management::NodeGroup {
fn from(node_group: NodeGroup) -> Self { fn from(node_group: NodeGroup) -> Self {
Self { Self {
@ -195,9 +173,10 @@ impl TryFrom<management::Matcher> for Matcher {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*;
use data_types::consistent_hasher::ConsistentHasher; use data_types::consistent_hasher::ConsistentHasher;
use data_types::database_rules::DatabaseRules; use data_types::database_rules::{DatabaseRules, Sink};
use super::*;
#[test] #[test]
fn test_matcher_default() { fn test_matcher_default() {
@ -343,8 +322,8 @@ mod tests {
shards: vec![ shards: vec![
( (
1, 1,
management::Shard { management::Sink {
sink: Some(management::shard::Sink::Iox(management::NodeGroup { sink: Some(management::sink::Sink::Iox(management::NodeGroup {
nodes: vec![ nodes: vec![
management::node_group::Node { id: 10 }, management::node_group::Node { id: 10 },
management::node_group::Node { id: 11 }, management::node_group::Node { id: 11 },
@ -355,8 +334,8 @@ mod tests {
), ),
( (
2, 2,
management::Shard { management::Sink {
sink: Some(management::shard::Sink::Iox(management::NodeGroup { sink: Some(management::sink::Sink::Iox(management::NodeGroup {
nodes: vec![management::node_group::Node { id: 20 }], nodes: vec![management::node_group::Node { id: 20 }],
})), })),
}, },
@ -370,12 +349,8 @@ mod tests {
let shard_config: ShardConfig = protobuf.try_into().unwrap(); let shard_config: ShardConfig = protobuf.try_into().unwrap();
assert_eq!(shard_config.shards.len(), 2); assert_eq!(shard_config.shards.len(), 2);
assert!( assert!(matches!(&shard_config.shards[&1], Sink::Iox(node_group) if node_group.len() == 3));
matches!(&shard_config.shards[&1], Shard::Iox(node_group) if node_group.len() == 3) assert!(matches!(&shard_config.shards[&2], Sink::Iox(node_group) if node_group.len() == 1));
);
assert!(
matches!(&shard_config.shards[&2], Shard::Iox(node_group) if node_group.len() == 1)
);
} }
#[test] #[test]

View File

@ -0,0 +1,42 @@
use std::convert::TryFrom;
use data_types::database_rules::{KafkaProducer, Sink};
use crate::google::{FieldViolation, FromField};
use crate::influxdata::iox::management::v1 as management;
impl From<Sink> for management::Sink {
fn from(shard: Sink) -> Self {
let sink = match shard {
Sink::Iox(node_group) => management::sink::Sink::Iox(node_group.into()),
Sink::Kafka(kafka) => management::sink::Sink::Kafka(kafka.into()),
};
management::Sink { sink: Some(sink) }
}
}
impl TryFrom<management::Sink> for Sink {
type Error = FieldViolation;
fn try_from(proto: management::Sink) -> Result<Self, Self::Error> {
let sink = proto.sink.ok_or_else(|| FieldViolation::required(""))?;
Ok(match sink {
management::sink::Sink::Iox(node_group) => Sink::Iox(node_group.scope("node_group")?),
management::sink::Sink::Kafka(kafka) => Sink::Kafka(kafka.scope("kafka")?),
})
}
}
impl From<KafkaProducer> for management::KafkaProducer {
fn from(_: KafkaProducer) -> Self {
Self {}
}
}
impl TryFrom<management::KafkaProducer> for KafkaProducer {
type Error = FieldViolation;
fn try_from(_: management::KafkaProducer) -> Result<Self, Self::Error> {
Ok(Self {})
}
}

View File

@ -80,7 +80,7 @@ use snafu::{OptionExt, ResultExt, Snafu};
use data_types::{ use data_types::{
database_rules::{ database_rules::{
DatabaseRules, NodeGroup, RoutingRules, Shard, ShardConfig, ShardId, WriteBufferConnection, DatabaseRules, NodeGroup, RoutingRules, ShardConfig, ShardId, Sink, WriteBufferConnection,
}, },
database_state::DatabaseStateCode, database_state::DatabaseStateCode,
job::Job, job::Job,
@ -790,16 +790,15 @@ where
&*rules, &*rules,
) )
.context(LineConversion)?; .context(LineConversion)?;
Some((routing_config.target.clone(), sharded_entries)) Some((routing_config.sink.clone(), sharded_entries))
} else { } else {
None None
} }
}; };
if let Some((target, sharded_entries)) = routing_config_target { if let Some((sink, sharded_entries)) = routing_config_target {
for i in sharded_entries { for i in sharded_entries {
self.write_entry_downstream(&db_name, &target, i.entry) self.write_entry_sink(&db_name, &sink, i.entry).await?;
.await?;
} }
return Ok(()); return Ok(());
} }
@ -814,7 +813,7 @@ where
let rules = db.rules(); let rules = db.rules();
let shard_config = rules.routing_rules.as_ref().map(|cfg| match cfg { let shard_config = rules.routing_rules.as_ref().map(|cfg| match cfg {
RoutingRules::RoutingConfig(_) => todo!("routing config"), RoutingRules::RoutingConfig(_) => unreachable!("routing config handled above"),
RoutingRules::ShardConfig(shard_config) => shard_config, RoutingRules::ShardConfig(shard_config) => shard_config,
}); });
@ -848,19 +847,15 @@ where
&self, &self,
db_name: &str, db_name: &str,
db: &Db, db: &Db,
shards: Arc<HashMap<u32, Shard>>, shards: Arc<HashMap<u32, Sink>>,
sharded_entry: ShardedEntry, sharded_entry: ShardedEntry,
) -> Result<()> { ) -> Result<()> {
match sharded_entry.shard_id { match sharded_entry.shard_id {
Some(shard_id) => { Some(shard_id) => {
let shard = shards.get(&shard_id).context(ShardNotFound { shard_id })?; let sink = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
match shard { self.write_entry_sink(db_name, sink, sharded_entry.entry)
Shard::Iox(node_group) => {
self.write_entry_downstream(db_name, node_group, sharded_entry.entry)
.await? .await?
} }
}
}
None => { None => {
self.write_entry_local(&db_name, db, sharded_entry.entry) self.write_entry_local(&db_name, db, sharded_entry.entry)
.await? .await?
@ -869,6 +864,18 @@ where
Ok(()) Ok(())
} }
async fn write_entry_sink(&self, db_name: &str, sink: &Sink, entry: Entry) -> Result<()> {
match sink {
Sink::Iox(node_group) => {
self.write_entry_downstream(db_name, node_group, entry)
.await
}
Sink::Kafka(_) => {
todo!("write to write buffer")
}
}
}
async fn write_entry_downstream( async fn write_entry_downstream(
&self, &self,
db_name: &str, db_name: &str,
@ -1741,7 +1748,7 @@ mod tests {
..Default::default() ..Default::default()
}), }),
shards: Arc::new( shards: Arc::new(
vec![(TEST_SHARD_ID, Shard::Iox(remote_ids.clone()))] vec![(TEST_SHARD_ID, Sink::Iox(remote_ids.clone()))]
.into_iter() .into_iter()
.collect(), .collect(),
), ),

View File

@ -148,7 +148,6 @@ where
.update_db_rules(&db_name, |_orig| Ok(rules)) .update_db_rules(&db_name, |_orig| Ok(rules))
.await .await
.map_err(UpdateError::from)?; .map_err(UpdateError::from)?;
Ok(Response::new(UpdateDatabaseResponse { Ok(Response::new(UpdateDatabaseResponse {
rules: Some(updated_rules.as_ref().clone().into()), rules: Some(updated_rules.as_ref().clone().into()),
})) }))

View File

@ -11,8 +11,8 @@ use entry::{
}; };
use generated_types::influxdata::iox::management::v1::database_rules::RoutingRules; use generated_types::influxdata::iox::management::v1::database_rules::RoutingRules;
use generated_types::influxdata::iox::management::v1::{ use generated_types::influxdata::iox::management::v1::{
node_group::Node, shard, HashRing, Matcher, MatcherToShard, NodeGroup, RoutingConfig, Shard, node_group::Node, sink, HashRing, Matcher, MatcherToShard, NodeGroup, RoutingConfig,
ShardConfig, ShardConfig, Sink,
}; };
use influxdb_line_protocol::parse_lines; use influxdb_line_protocol::parse_lines;
use std::collections::HashMap; use std::collections::HashMap;
@ -225,8 +225,8 @@ async fn test_write_routed() {
shards: vec![ shards: vec![
( (
TEST_SHARD_ID_1, TEST_SHARD_ID_1,
Shard { Sink {
sink: Some(shard::Sink::Iox(NodeGroup { sink: Some(sink::Sink::Iox(NodeGroup {
nodes: vec![Node { nodes: vec![Node {
id: TEST_REMOTE_ID_1, id: TEST_REMOTE_ID_1,
}], }],
@ -235,8 +235,8 @@ async fn test_write_routed() {
), ),
( (
TEST_SHARD_ID_2, TEST_SHARD_ID_2,
Shard { Sink {
sink: Some(shard::Sink::Iox(NodeGroup { sink: Some(sink::Sink::Iox(NodeGroup {
nodes: vec![Node { nodes: vec![Node {
id: TEST_REMOTE_ID_2, id: TEST_REMOTE_ID_2,
}], }],
@ -245,8 +245,8 @@ async fn test_write_routed() {
), ),
( (
TEST_SHARD_ID_3, TEST_SHARD_ID_3,
Shard { Sink {
sink: Some(shard::Sink::Iox(NodeGroup { sink: Some(sink::Sink::Iox(NodeGroup {
nodes: vec![Node { nodes: vec![Node {
id: TEST_REMOTE_ID_3, id: TEST_REMOTE_ID_3,
}], }],
@ -398,8 +398,8 @@ async fn test_write_routed_errors() {
}], }],
shards: vec![( shards: vec![(
TEST_SHARD_ID, TEST_SHARD_ID,
Shard { Sink {
sink: Some(shard::Sink::Iox(NodeGroup { sink: Some(sink::Sink::Iox(NodeGroup {
nodes: vec![Node { id: TEST_REMOTE_ID }], nodes: vec![Node { id: TEST_REMOTE_ID }],
})), })),
}, },
@ -513,9 +513,13 @@ async fn test_write_routed_no_shard() {
.get_database(db_name) .get_database(db_name)
.await .await
.expect("cannot get database on router"); .expect("cannot get database on router");
#[allow(deprecated)]
let routing_config = RoutingConfig { let routing_config = RoutingConfig {
target: Some(NodeGroup { target: None,
sink: Some(Sink {
sink: Some(sink::Sink::Iox(NodeGroup {
nodes: vec![Node { id: *remote_id }], nodes: vec![Node { id: *remote_id }],
})),
}), }),
}; };
router_db_rules.routing_rules = Some(RoutingRules::RoutingConfig(routing_config)); router_db_rules.routing_rules = Some(RoutingRules::RoutingConfig(routing_config));