feat: Add simpler RoutingConfig

pull/24376/head
Marko Mikulicic 2021-05-24 15:35:35 +02:00
parent ba83ebe35c
commit bae5e5aee3
7 changed files with 333 additions and 40 deletions

View File

@@ -46,14 +46,25 @@ pub struct DatabaseRules {
     /// Configure how data flows through the system
     pub lifecycle_rules: LifecycleRules,

-    /// An optional config to split writes into different "shards". A shard
+    /// An optional config to delegate data plane operations to one or more
+    /// remote servers.
+    pub routing_rules: Option<RoutingRules>,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub enum RoutingRules {
+    /// A routing config defines the target to which all data plane
+    /// operations for a given database are delegated.
+    RoutingConfig(RoutingConfig),
+
+    /// A sharding config splits writes into different "shards". A shard
     /// is a logical concept, but the usage is meant to split data into
     /// mutually exclusive areas. The rough order of organization is:
     /// database -> shard -> partition -> chunk. For example, you could shard
     /// based on table name and assign to 1 of 10 shards. Within each
     /// shard you would have partitions, which would likely be based on time.
     /// This makes it possible to horizontally scale out writes.
-    pub shard_config: Option<ShardConfig>,
+    ShardConfig(ShardConfig),
 }
@@ -67,7 +78,7 @@ impl DatabaseRules {
             partition_template: Default::default(),
             write_buffer_config: None,
             lifecycle_rules: Default::default(),
-            shard_config: None,
+            routing_rules: None,
         }
     }
@@ -351,6 +362,13 @@ pub struct StrftimeColumn {
     pub format: String,
 }

+/// A routing config defines the destination to which all data plane
+/// operations for a given database are routed.
+#[derive(Debug, Eq, PartialEq, Clone, Default)]
+pub struct RoutingConfig {
+    pub target: NodeGroup,
+}
+
 /// ShardId maps to a nodegroup that holds the shard.
 pub type ShardId = u32;

 pub const NO_SHARD_CONFIG: Option<&ShardConfig> = None;
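
The enum above replaces the single optional `shard_config` field: a database now carries at most one routing rule, either the full sharding config or the simpler delegate-everything config. A self-contained sketch of how a caller dispatches on the two variants, using stand-in types rather than the real `data_types` definitions:

// Stand-ins that mirror the shapes in this diff; not the real crate types.
type NodeGroup = Vec<u32>; // assumption: a node group is a list of server IDs

#[derive(Debug, Default)]
struct ShardConfig; // details elided

#[derive(Debug, Default)]
struct RoutingConfig {
    target: NodeGroup,
}

#[derive(Debug)]
enum RoutingRules {
    RoutingConfig(RoutingConfig),
    ShardConfig(ShardConfig),
}

fn main() {
    // Delegate all data plane operations for this database to server 2.
    let rules = Some(RoutingRules::RoutingConfig(RoutingConfig { target: vec![2] }));

    // The write path later in this commit dispatches on the variant like this.
    match &rules {
        Some(RoutingRules::RoutingConfig(cfg)) => println!("delegate to {:?}", cfg.target),
        Some(RoutingRules::ShardConfig(_)) => println!("shard writes locally"),
        None => println!("no routing rules: handle writes locally"),
    }
}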

View File

@@ -177,6 +177,15 @@ message DatabaseRules {
   // Write Buffer configuration for this database
   WriteBufferConfig write_buffer_config = 6;

-  // Shard config
-  ShardConfig shard_config = 8;
+  oneof routing_rules {
+    // Shard config
+    ShardConfig shard_config = 8;
+
+    // Routing config
+    RoutingConfig routing_config = 9;
+  }
+}
+
+message RoutingConfig {
+  NodeGroup target = 1;
 }
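
On the Rust side, prost turns a proto `oneof` into a nested enum wrapped in an `Option` on the parent message, which is why the conversion code in the next file matches on `management::database_rules::RoutingRules`. A hand-written approximation of the shape the generated code takes (not the actual prost output):

// Approximation of the prost-generated types for the messages above.
pub struct NodeGroup { /* node list elided */ }
pub struct ShardConfig { /* fields elided */ }

pub struct RoutingConfig {
    // Message fields are optional in proto3, so prost wraps them in Option.
    pub target: Option<NodeGroup>,
}

pub mod database_rules {
    // The oneof becomes an enum named after it, nested in a module named
    // after the containing message.
    pub enum RoutingRules {
        ShardConfig(super::ShardConfig),
        RoutingConfig(super::RoutingConfig),
    }
}

pub struct DatabaseRules {
    // ...other fields elided...
    // Exactly one (or neither) of the oneof arms may be set:
    pub routing_rules: Option<database_rules::RoutingRules>,
}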

View File

@@ -2,7 +2,9 @@ use std::convert::{TryFrom, TryInto};
 use thiserror::Error;

-use data_types::database_rules::{ColumnType, ColumnValue, DatabaseRules, Order};
+use data_types::database_rules::{
+    ColumnType, ColumnValue, DatabaseRules, Order, RoutingConfig, RoutingRules,
+};
 use data_types::DatabaseName;

 use crate::google::{FieldViolation, FieldViolationExt, FromFieldOpt};
@@ -20,7 +22,7 @@ impl From<DatabaseRules> for management::DatabaseRules {
             partition_template: Some(rules.partition_template.into()),
             write_buffer_config: rules.write_buffer_config.map(Into::into),
             lifecycle_rules: Some(rules.lifecycle_rules.into()),
-            shard_config: rules.shard_config.map(Into::into),
+            routing_rules: rules.routing_rules.map(Into::into),
         }
     }
 }
@@ -43,9 +45,9 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
             .optional("partition_template")?
             .unwrap_or_default();

-        let shard_config = proto
-            .shard_config
-            .optional("shard_config")
+        let routing_rules = proto
+            .routing_rules
+            .optional("routing_rules")
             .unwrap_or_default();

         Ok(Self {
@@ -53,7 +55,53 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
             partition_template,
             write_buffer_config,
             lifecycle_rules,
-            shard_config,
+            routing_rules,
+        })
+    }
+}
+
+impl From<RoutingRules> for management::database_rules::RoutingRules {
+    fn from(routing_rules: RoutingRules) -> Self {
+        match routing_rules {
+            RoutingRules::RoutingConfig(cfg) => {
+                management::database_rules::RoutingRules::RoutingConfig(cfg.into())
+            }
+            RoutingRules::ShardConfig(cfg) => {
+                management::database_rules::RoutingRules::ShardConfig(cfg.into())
+            }
+        }
+    }
+}
+
+impl TryFrom<management::database_rules::RoutingRules> for RoutingRules {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::database_rules::RoutingRules) -> Result<Self, Self::Error> {
+        Ok(match proto {
+            management::database_rules::RoutingRules::ShardConfig(cfg) => {
+                RoutingRules::ShardConfig(cfg.try_into()?)
+            }
+            management::database_rules::RoutingRules::RoutingConfig(cfg) => {
+                RoutingRules::RoutingConfig(cfg.try_into()?)
+            }
+        })
+    }
+}
+
+impl From<RoutingConfig> for management::RoutingConfig {
+    fn from(routing_config: RoutingConfig) -> Self {
+        Self {
+            target: Some(routing_config.target.into()),
+        }
+    }
+}
+
+impl TryFrom<management::RoutingConfig> for RoutingConfig {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::RoutingConfig) -> Result<Self, Self::Error> {
+        Ok(Self {
+            target: proto.target.required("target")?,
         })
     }
 }
@@ -184,6 +232,6 @@ mod tests {

         // These should be none as preserved on non-protobuf DatabaseRules
         assert!(back.write_buffer_config.is_none());
-        assert!(back.shard_config.is_none());
+        assert!(back.routing_rules.is_none());
     }
 }
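
The asymmetry in these impls is deliberate: domain to proto is infallible (`From`), while proto to domain is fallible (`TryFrom`) because proto3 message fields are always optional and the domain's `target` is required. A standalone sketch of that pattern, with stand-in types in place of the real domain and generated ones:

use std::convert::TryFrom;

// Stand-ins; the real types come from data_types and the prost-generated
// management module.
#[derive(Debug, Clone, PartialEq)]
struct Domain(u32);
#[derive(Debug, Clone, PartialEq)]
struct Proto(Option<u32>);

impl From<Domain> for Proto {
    fn from(d: Domain) -> Self {
        Proto(Some(d.0)) // infallible: the domain value is always present
    }
}

impl TryFrom<Proto> for Domain {
    type Error = String;
    fn try_from(p: Proto) -> Result<Self, Self::Error> {
        // Fallible: the proto field may be absent, which is what
        // FieldViolation reports via .required("target") in the diff.
        p.0.map(Domain).ok_or_else(|| "field 'target' is required".to_string())
    }
}

fn main() {
    let d = Domain(42);
    let round_tripped = Domain::try_from(Proto::from(d.clone())).unwrap();
    assert_eq!(d, round_tripped);

    // A proto missing its required field fails the conversion.
    assert!(Domain::try_from(Proto(None)).is_err());
}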

View File

@@ -319,16 +319,22 @@ mod tests {
     fn test_database_rules_shard_config() {
         let protobuf = management::DatabaseRules {
             name: "database".to_string(),
-            shard_config: Some(management::ShardConfig {
-                ..Default::default()
-            }),
+            routing_rules: Some(management::database_rules::RoutingRules::ShardConfig(
+                management::ShardConfig {
+                    ..Default::default()
+                },
+            )),
             ..Default::default()
         };

         let rules: DatabaseRules = protobuf.try_into().unwrap();
         let back: management::DatabaseRules = rules.into();

-        assert!(back.shard_config.is_some());
+        assert!(back.routing_rules.is_some());
+        assert!(matches!(
+            back.routing_rules,
+            Some(management::database_rules::RoutingRules::ShardConfig(_))
+        ));
     }

     #[test]

View File

@@ -101,7 +101,7 @@ use crate::{
     db::Db,
 };
 use cached::Return;
-use data_types::database_rules::{NodeGroup, Shard, ShardId};
+use data_types::database_rules::{NodeGroup, RoutingRules, Shard, ShardConfig, ShardId};
 use generated_types::database_rules::{decode_database_rules, encode_database_rules};
 use influxdb_iox_client::{connection::Builder, write};
 use rand::seq::SliceRandom;
@@ -587,6 +587,31 @@ impl<M: ConnectionManager> Server<M> {
             .db(&db_name)
             .context(DatabaseNotFound { db_name: &*db_name })?;

+        // We need to split this into two blocks because we cannot hold a lock across an async call.
+        let routing_config_target = {
+            let rules = db.rules.read();
+            if let Some(RoutingRules::RoutingConfig(routing_config)) = &rules.routing_rules {
+                let sharded_entries = lines_to_sharded_entries(
+                    lines,
+                    default_time,
+                    None as Option<&ShardConfig>,
+                    &*rules,
+                )
+                .context(LineConversion)?;
+                Some((routing_config.target.clone(), sharded_entries))
+            } else {
+                None
+            }
+        };
+
+        if let Some((target, sharded_entries)) = routing_config_target {
+            for i in sharded_entries {
+                self.write_entry_downstream(&db_name, &target, i.entry)
+                    .await?;
+            }
+            return Ok(());
+        }
+
         // Split lines into shards while holding a read lock on the sharding config.
         // Once the lock is released we have a vector of entries, each associated with a
         // shard id, and an Arc to the mapping between shard ids and node
@@ -595,10 +620,14 @@
         // lock.
         let (sharded_entries, shards) = {
             let rules = db.rules.read();
-            let shard_config = &rules.shard_config;
+
+            let shard_config = rules.routing_rules.as_ref().map(|cfg| match cfg {
+                RoutingRules::RoutingConfig(_) => todo!("routing config"),
+                RoutingRules::ShardConfig(shard_config) => shard_config,
+            });

             let sharded_entries =
-                lines_to_sharded_entries(lines, default_time, shard_config.as_ref(), &*rules)
+                lines_to_sharded_entries(lines, default_time, shard_config, &*rules)
                     .context(LineConversion)?;

             let shards = shard_config
@@ -1143,7 +1172,7 @@ mod tests {
             },
             write_buffer_config: None,
             lifecycle_rules: Default::default(),
-            shard_config: None,
+            routing_rules: None,
         };

         // Create a database
@@ -1239,7 +1268,7 @@ mod tests {
             },
             write_buffer_config: None,
             lifecycle_rules: Default::default(),
-            shard_config: None,
+            routing_rules: None,
         };

         // Create a database
@@ -1462,7 +1491,7 @@ mod tests {
         let db = server.db(&db_name).unwrap();
         {
             let mut rules = db.rules.write();
-            rules.shard_config = Some(ShardConfig {
+            let shard_config = ShardConfig {
                 hash_ring: Some(HashRing {
                     shards: vec![TEST_SHARD_ID].into(),
                     ..Default::default()
@@ -1473,7 +1502,8 @@ mod tests {
                     .collect(),
                 ),
                 ..Default::default()
-            });
+            };
+            rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config));
         }

         let line = "cpu bar=1 10";
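
The two-block structure in the write path above is the standard way to combine a synchronous `RwLock` (as `db.rules` uses) with async code: take the guard, clone out what the async steps need, drop the guard, then await. A minimal self-contained sketch of the same pattern with stand-in types (`send_downstream` stands in for `write_entry_downstream`; run it under any async executor):

use std::sync::RwLock;

struct Rules {
    // Stand-in for the routing target: the node group to delegate writes to.
    target: Option<u32>,
}

async fn send_downstream(target: u32, entry: &str) {
    // Stand-in for write_entry_downstream: an RPC to a remote server.
    println!("sending {:?} to node group {}", entry, target);
}

async fn write_lines(rules: &RwLock<Rules>, entries: Vec<String>) {
    // Block 1: everything that needs the lock, with the guard dropped at the
    // closing brace. Holding a sync guard across .await would make this
    // future non-Send and risks deadlocking the executor.
    let plan = {
        let rules = rules.read().unwrap();
        rules.target.map(|target| (target, entries))
    };

    // Block 2: the async work runs with no lock held.
    if let Some((target, entries)) = plan {
        for entry in &entries {
            send_downstream(target, entry).await;
        }
    }
}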

View File

@@ -1,6 +1,6 @@
 use generated_types::{
     google::protobuf::{Duration, Empty},
-    influxdata::iox::management::v1::*,
+    influxdata::iox::management::v1::{database_rules::RoutingRules, *},
 };
 use influxdb_iox_client::{management::CreateDatabaseError, operations, write::WriteError};
@@ -222,7 +222,7 @@
             }),
             ..Default::default()
         }),
-        shard_config: None,
+        routing_rules: None,
     };

     client
@@ -235,12 +235,12 @@
         .await
         .expect("get database failed");

-    assert_eq!(response.shard_config, None);
+    assert_eq!(response.routing_rules, None);

-    rules.shard_config = Some(ShardConfig {
+    rules.routing_rules = Some(RoutingRules::ShardConfig(ShardConfig {
         ignore_errors: true,
         ..Default::default()
-    });
+    }));

     let updated_rules = client
         .update_database(rules.clone())
@@ -254,13 +254,10 @@
         .await
         .expect("get database failed");

-    assert_eq!(
-        response
-            .shard_config
-            .expect("shard config missing")
-            .ignore_errors,
-        true
-    );
+    assert!(matches!(
+        response.routing_rules,
+        Some(RoutingRules::ShardConfig(cfg)) if cfg.ignore_errors,
+    ));
 }

 #[tokio::test]
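
The rewritten assertion uses `matches!` with a pattern guard: one expression checks both that the oneof holds the `ShardConfig` variant and that `ignore_errors` survived the update round trip. The same idiom in isolation, on a stand-in enum:

#[derive(Debug)]
enum Rules {
    ShardConfig { ignore_errors: bool },
    RoutingConfig,
}

fn main() {
    let response = Some(Rules::ShardConfig { ignore_errors: true });

    // True only if the variant matches AND the guard expression holds.
    assert!(matches!(
        response,
        Some(Rules::ShardConfig { ignore_errors }) if ignore_errors
    ));
}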

View File

@@ -9,8 +9,10 @@ use entry::{
     lines_to_sharded_entries,
     test_helpers::{partitioner, sharder},
 };
+use generated_types::influxdata::iox::management::v1::database_rules::RoutingRules;
 use generated_types::influxdata::iox::management::v1::{
-    node_group::Node, shard, HashRing, Matcher, MatcherToShard, NodeGroup, Shard, ShardConfig,
+    node_group::Node, shard, HashRing, Matcher, MatcherToShard, NodeGroup, RoutingConfig, Shard,
+    ShardConfig,
 };
 use influxdb_line_protocol::parse_lines;
 use std::collections::HashMap;
@@ -194,7 +196,7 @@ async fn test_write_routed() {
         .get_database(&db_name)
         .await
         .expect("cannot get database on router");
-    router_db_rules.shard_config = Some(ShardConfig {
+    let shard_config = ShardConfig {
         specific_targets: vec![
             MatcherToShard {
                 matcher: Some(Matcher {
@@ -251,7 +253,9 @@
             .into_iter()
             .collect::<HashMap<_, _>>(),
         ..Default::default()
-    });
+    };
+    router_db_rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config));
+
     router_mgmt
         .update_database(router_db_rules)
         .await
@@ -379,7 +383,7 @@ async fn test_write_routed_errors() {
         .get_database(&db_name)
         .await
         .expect("cannot get database on router");
-    router_db_rules.shard_config = Some(ShardConfig {
+    let shard_config = ShardConfig {
         specific_targets: vec![MatcherToShard {
             matcher: Some(Matcher {
                 table_name_regex: "^cpu$".to_string(),
@@ -398,7 +402,8 @@
             .into_iter()
             .collect::<HashMap<_, _>>(),
         ..Default::default()
-    });
+    };
+    router_db_rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config));
     router_mgmt
         .update_database(router_db_rules)
         .await
@@ -425,3 +430,183 @@
     // TODO(mkm): check connection error and successful communication with a
     // target that replies with an error...
 }
+
+#[tokio::test]
+async fn test_write_routed_no_shard() {
+    const TEST_ROUTER_ID: u32 = 1;
+
+    const TEST_TARGET_ID_1: u32 = 2;
+    const TEST_TARGET_ID_2: u32 = 3;
+    const TEST_TARGET_ID_3: u32 = 4;
+
+    const TEST_REMOTE_ID_1: u32 = 2;
+    const TEST_REMOTE_ID_2: u32 = 3;
+    const TEST_REMOTE_ID_3: u32 = 4;
+
+    let router = ServerFixture::create_single_use().await;
+    let mut router_mgmt = router.management_client();
+    router_mgmt
+        .update_server_id(TEST_ROUTER_ID)
+        .await
+        .expect("set ID failed");
+
+    let target_1 = ServerFixture::create_single_use().await;
+    let mut target_1_mgmt = target_1.management_client();
+    target_1_mgmt
+        .update_server_id(TEST_TARGET_ID_1)
+        .await
+        .expect("set ID failed");
+
+    router_mgmt
+        .update_remote(TEST_REMOTE_ID_1, target_1.grpc_base())
+        .await
+        .expect("set remote failed");
+
+    let target_2 = ServerFixture::create_single_use().await;
+    let mut target_2_mgmt = target_2.management_client();
+    target_2_mgmt
+        .update_server_id(TEST_TARGET_ID_2)
+        .await
+        .expect("set ID failed");
+
+    router_mgmt
+        .update_remote(TEST_REMOTE_ID_2, target_2.grpc_base())
+        .await
+        .expect("set remote failed");
+
+    let target_3 = ServerFixture::create_single_use().await;
+    let mut target_3_mgmt = target_3.management_client();
+    target_3_mgmt
+        .update_server_id(TEST_TARGET_ID_3)
+        .await
+        .expect("set ID failed");
+
+    router_mgmt
+        .update_remote(TEST_REMOTE_ID_3, target_3.grpc_base())
+        .await
+        .expect("set remote failed");
+
+    let db_name_1 = rand_name();
+    let db_name_2 = rand_name();
+    for &db_name in &[&db_name_1, &db_name_2] {
+        create_readable_database(db_name, router.grpc_channel()).await;
+        create_readable_database(db_name, target_1.grpc_channel()).await;
+        create_readable_database(db_name, target_2.grpc_channel()).await;
+        create_readable_database(db_name, target_3.grpc_channel()).await;
+    }
+
+    // Set routing rules on the router:
+    for (db_name, remote_id) in &[
+        (db_name_1.clone(), TEST_REMOTE_ID_1),
+        (db_name_2.clone(), TEST_REMOTE_ID_2),
+    ] {
+        let mut router_db_rules = router_mgmt
+            .get_database(db_name)
+            .await
+            .expect("cannot get database on router");
+        let routing_config = RoutingConfig {
+            target: Some(NodeGroup {
+                nodes: vec![Node { id: *remote_id }],
+            }),
+        };
+        router_db_rules.routing_rules = Some(RoutingRules::RoutingConfig(routing_config));
+
+        router_mgmt
+            .update_database(router_db_rules)
+            .await
+            .expect("cannot update router db rules");
+    }
+
+    // Write some data
+    let line_1 = "cpu bar=1 100";
+    let line_2 = "disk bar=2 100";
+    let mut write_client = router.write_client();
+    for (&ref db_name, &ref line) in &[(&db_name_1, line_1), (&db_name_2, line_2)] {
+        let num_lines_written = write_client
+            .write(db_name, line)
+            .await
+            .expect("cannot write");
+        assert_eq!(num_lines_written, 1);
+    }
+
+    // The router will have split the write request by database name.
+    // Target 1 will have received only the "cpu" table.
+    // Target 2 will have received only the "disk" table.
+    // Target 3 won't get any writes.
+
+    let mut query_results = target_1
+        .flight_client()
+        .perform_query(&db_name_1, "select * from cpu")
+        .await
+        .expect("failed to query target 1");
+
+    let mut batches = Vec::new();
+    while let Some(data) = query_results.next().await.unwrap() {
+        batches.push(data);
+    }
+
+    let expected = vec![
+        "+-----+-------------------------------+",
+        "| bar | time                          |",
+        "+-----+-------------------------------+",
+        "| 1   | 1970-01-01 00:00:00.000000100 |",
+        "+-----+-------------------------------+",
+    ];
+    assert_batches_sorted_eq!(&expected, &batches);
+
+    assert!(target_1
+        .flight_client()
+        .perform_query(&db_name_1, "select * from disk")
+        .await
+        .unwrap_err()
+        .to_string()
+        .contains("Table or CTE with name \\\'disk\\\' not found\""));
+
+    let mut query_results = target_2
+        .flight_client()
+        .perform_query(&db_name_2, "select * from disk")
+        .await
+        .expect("failed to query target 2");
+
+    let mut batches = Vec::new();
+    while let Some(data) = query_results.next().await.unwrap() {
+        batches.push(data);
+    }
+
+    let expected = vec![
+        "+-----+-------------------------------+",
+        "| bar | time                          |",
+        "+-----+-------------------------------+",
+        "| 2   | 1970-01-01 00:00:00.000000100 |",
+        "+-----+-------------------------------+",
+    ];
+    assert_batches_sorted_eq!(&expected, &batches);
+
+    assert!(target_2
+        .flight_client()
+        .perform_query(&db_name_1, "select * from cpu")
+        .await
+        .unwrap_err()
+        .to_string()
+        .contains("Table or CTE with name \\\'cpu\\\' not found\""));
+
+    // Ensure that target_3 didn't get any writes.
+    assert!(target_3
+        .flight_client()
+        .perform_query(&db_name_1, "select * from cpu")
+        .await
+        .unwrap_err()
+        .to_string()
+        .contains("Table or CTE with name \\\'cpu\\\' not found\""));
+
+    assert!(target_3
+        .flight_client()
+        .perform_query(&db_name_2, "select * from disk")
+        .await
+        .unwrap_err()
+        .to_string()
+        .contains("Table or CTE with name \\\'disk\\\' not found\""));
+}
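
What the new test pins down is that a plain `RoutingConfig` is a per-database lookup: every write for a database goes, unsplit, to that database's single target node group, and servers outside the table see nothing. A stand-in sketch of that decision logic:

use std::collections::HashMap;

// Stand-in routing table: database name -> node group (remote server IDs).
fn route<'a>(table: &'a HashMap<&str, Vec<u32>>, db: &str) -> Option<&'a Vec<u32>> {
    table.get(db)
}

fn main() {
    let mut table = HashMap::new();
    table.insert("db_1", vec![2]); // TEST_REMOTE_ID_1
    table.insert("db_2", vec![3]); // TEST_REMOTE_ID_2

    assert_eq!(route(&table, "db_1"), Some(&vec![2]));
    assert_eq!(route(&table, "db_2"), Some(&vec![3]));
    // A database with no routing rules is handled locally, not delegated.
    assert_eq!(route(&table, "db_3"), None);
}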