refactor: Add sink to routing config

This deprecates the "target" field in the RoutingConfig and replaces it with the "sink"
field, which has a variant that accepts a node group.

This commit is backward compatible in that it will accept existing configs.
The configs will roundtrip to the new format though (i.e. `database get` will render
the sink field).
pull/24376/head
Marko Mikulicic 2021-07-22 11:53:39 +02:00
parent 16a82ba350
commit d58a3ccbc7
No known key found for this signature in database
GPG Key ID: D02A41F91A687DB3
6 changed files with 95 additions and 21 deletions

View File

@ -287,9 +287,9 @@ pub struct StrftimeColumn {
/// A routing config defines the destination where to route all data plane operations
/// for a given database.
#[derive(Debug, Eq, PartialEq, Clone, Default)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct RoutingConfig {
pub target: NodeGroup,
pub sink: Sink,
}
/// ShardId maps to a nodegroup that holds the the shard.

View File

@ -118,5 +118,7 @@ message DatabaseRules {
}
message RoutingConfig {
NodeGroup target = 1;
NodeGroup target = 1 [deprecated = true];
Sink sink = 2;
}

View File

@ -4,7 +4,7 @@ use std::time::Duration;
use thiserror::Error;
use data_types::database_rules::{
DatabaseRules, RoutingConfig, RoutingRules, WriteBufferConnection,
DatabaseRules, RoutingConfig, RoutingRules, Sink, WriteBufferConnection,
};
use data_types::DatabaseName;
@ -101,8 +101,10 @@ impl TryFrom<management::database_rules::RoutingRules> for RoutingRules {
impl From<RoutingConfig> for management::RoutingConfig {
fn from(routing_config: RoutingConfig) -> Self {
#[allow(deprecated)]
Self {
target: Some(routing_config.target.into()),
target: None,
sink: Some(routing_config.sink.into()),
}
}
}
@ -111,8 +113,13 @@ impl TryFrom<management::RoutingConfig> for RoutingConfig {
type Error = FieldViolation;
fn try_from(proto: management::RoutingConfig) -> Result<Self, Self::Error> {
#[allow(deprecated)]
Ok(Self {
target: proto.target.required("target")?,
sink: if proto.target.is_some() {
Sink::Iox(proto.target.required("target")?)
} else {
proto.sink.required("sink")?
},
})
}
}
@ -197,4 +204,62 @@ mod tests {
// These should be none as preserved on non-protobuf DatabaseRules
assert!(back.routing_rules.is_none());
}
#[test]
fn test_routing_rules_conversion() {
let protobuf = management::DatabaseRules {
name: "database".to_string(),
routing_rules: None,
..Default::default()
};
let rules: DatabaseRules = protobuf.try_into().unwrap();
let back: management::DatabaseRules = rules.into();
assert!(back.routing_rules.is_none());
#[allow(deprecated)]
let routing_config = management::RoutingConfig {
target: Some(management::NodeGroup {
nodes: vec![management::node_group::Node { id: 1234 }],
}),
sink: None,
};
let protobuf = management::DatabaseRules {
name: "database".to_string(),
routing_rules: Some(management::database_rules::RoutingRules::RoutingConfig(
routing_config,
)),
..Default::default()
};
let rules: DatabaseRules = protobuf.try_into().unwrap();
let back: management::DatabaseRules = rules.into();
assert!(back.routing_rules.is_some());
#[allow(deprecated)]
let routing_config = management::RoutingConfig {
target: None,
sink: Some(management::Sink {
sink: Some(management::sink::Sink::Iox(management::NodeGroup {
nodes: vec![management::node_group::Node { id: 1234 }],
})),
}),
};
let protobuf = management::DatabaseRules {
name: "database".to_string(),
routing_rules: Some(management::database_rules::RoutingRules::RoutingConfig(
routing_config,
)),
..Default::default()
};
let rules: DatabaseRules = protobuf.try_into().unwrap();
let back: management::DatabaseRules = rules.into();
assert!(back.routing_rules.is_some());
}
}

View File

@ -790,16 +790,15 @@ where
&*rules,
)
.context(LineConversion)?;
Some((routing_config.target.clone(), sharded_entries))
Some((routing_config.sink.clone(), sharded_entries))
} else {
None
}
};
if let Some((target, sharded_entries)) = routing_config_target {
if let Some((sink, sharded_entries)) = routing_config_target {
for i in sharded_entries {
self.write_entry_downstream(&db_name, &target, i.entry)
.await?;
self.write_entry_sink(&db_name, &sink, i.entry).await?;
}
return Ok(());
}
@ -814,7 +813,7 @@ where
let rules = db.rules();
let shard_config = rules.routing_rules.as_ref().map(|cfg| match cfg {
RoutingRules::RoutingConfig(_) => todo!("routing config"),
RoutingRules::RoutingConfig(_) => unreachable!("routing config handled above"),
RoutingRules::ShardConfig(shard_config) => shard_config,
});
@ -853,13 +852,9 @@ where
) -> Result<()> {
match sharded_entry.shard_id {
Some(shard_id) => {
let shard = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
match shard {
Sink::Iox(node_group) => {
self.write_entry_downstream(db_name, node_group, sharded_entry.entry)
.await?
}
}
let sink = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
self.write_entry_sink(db_name, sink, sharded_entry.entry)
.await?
}
None => {
self.write_entry_local(&db_name, db, sharded_entry.entry)
@ -869,6 +864,15 @@ where
Ok(())
}
async fn write_entry_sink(&self, db_name: &str, sink: &Sink, entry: Entry) -> Result<()> {
match sink {
Sink::Iox(node_group) => {
self.write_entry_downstream(db_name, node_group, entry)
.await
}
}
}
async fn write_entry_downstream(
&self,
db_name: &str,

View File

@ -148,7 +148,6 @@ where
.update_db_rules(&db_name, |_orig| Ok(rules))
.await
.map_err(UpdateError::from)?;
Ok(Response::new(UpdateDatabaseResponse {
rules: Some(updated_rules.as_ref().clone().into()),
}))

View File

@ -513,9 +513,13 @@ async fn test_write_routed_no_shard() {
.get_database(db_name)
.await
.expect("cannot get database on router");
#[allow(deprecated)]
let routing_config = RoutingConfig {
target: Some(NodeGroup {
nodes: vec![Node { id: *remote_id }],
target: None,
sink: Some(Sink {
sink: Some(sink::Sink::Iox(NodeGroup {
nodes: vec![Node { id: *remote_id }],
})),
}),
};
router_db_rules.routing_rules = Some(RoutingRules::RoutingConfig(routing_config));