From bae5e5aee351537988f8d29f54a3106af9e0cbd1 Mon Sep 17 00:00:00 2001
From: Marko Mikulicic
Date: Mon, 24 May 2021 15:35:35 +0200
Subject: [PATCH] feat: Add simpler RoutingConfig

---
 data_types/src/database_rules.rs              |  24 ++-
 .../iox/management/v1/database_rules.proto    |  13 +-
 generated_types/src/database_rules.rs         |  62 +++++-
 generated_types/src/database_rules/shard.rs   |  14 +-
 server/src/lib.rs                             |  44 +++-
 tests/end_to_end_cases/management_api.rs      |  21 +-
 tests/end_to_end_cases/write_api.rs           | 195 +++++++++++++++++-
 7 files changed, 333 insertions(+), 40 deletions(-)

diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs
index 1ad7780acc..32a0611618 100644
--- a/data_types/src/database_rules.rs
+++ b/data_types/src/database_rules.rs
@@ -46,14 +46,25 @@ pub struct DatabaseRules {
     /// Configure how data flows through the system
     pub lifecycle_rules: LifecycleRules,
 
-    /// An optional config to split writes into different "shards". A shard
+    /// An optional config to delegate data plane operations to one or more
+    /// remote servers.
+    pub routing_rules: Option<RoutingRules>,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub enum RoutingRules {
+    /// A routing config defines the target to which all data plane
+    /// operations for a given database are delegated.
+    RoutingConfig(RoutingConfig),
+
+    /// A sharding config splits writes into different "shards". A shard
     /// is a logical concept, but the usage is meant to split data into
     /// mutually exclusive areas. The rough order of organization is:
     /// database -> shard -> partition -> chunk. For example, you could shard
     /// based on table name and assign to 1 of 10 shards. Within each
     /// shard you would have partitions, which would likely be based off time.
     /// This makes it possible to horizontally scale out writes.
-    pub shard_config: Option<ShardConfig>,
+    ShardConfig(ShardConfig),
 }
 
 impl DatabaseRules {
@@ -67,7 +78,7 @@ impl DatabaseRules {
             partition_template: Default::default(),
             write_buffer_config: None,
             lifecycle_rules: Default::default(),
-            shard_config: None,
+            routing_rules: None,
         }
     }
 
@@ -351,6 +362,13 @@ pub struct StrftimeColumn {
     pub format: String,
 }
 
+/// A routing config defines the destination to which all data plane
+/// operations for a given database are routed.
+#[derive(Debug, Eq, PartialEq, Clone, Default)]
+pub struct RoutingConfig {
+    pub target: NodeGroup,
+}
+
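A minimal usage sketch, not part of the patch: with the types above, pointing a
database at a delegation target is a one-field change. `delegate_all_writes`
and `target_nodes` are hypothetical names; `target_nodes` is a `NodeGroup`
naming the remote servers.

    fn delegate_all_writes(rules: &mut DatabaseRules, target_nodes: NodeGroup) {
        // Replace any previously configured sharding with a single
        // delegation target for all data plane operations.
        rules.routing_rules = Some(RoutingRules::RoutingConfig(RoutingConfig {
            target: target_nodes,
        }));
    }

 /// ShardId maps to a nodegroup that holds the shard.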
 pub type ShardId = u32;
 
 pub const NO_SHARD_CONFIG: Option<&ShardConfig> = None;

diff --git a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto
index 48c124b5d1..61795efc54 100644
--- a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto
+++ b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto
@@ -177,6 +177,15 @@ message DatabaseRules {
   // Write Buffer configuration for this database
   WriteBufferConfig write_buffer_config = 6;
 
-  // Shard config
-  ShardConfig shard_config = 8;
+  oneof routing_rules {
+    // Shard config
+    ShardConfig shard_config = 8;
+
+    // Routing config
+    RoutingConfig routing_config = 9;
+  }
+}
+
+message RoutingConfig {
+  NodeGroup target = 1;
 }

diff --git a/generated_types/src/database_rules.rs b/generated_types/src/database_rules.rs
index 69a0542bd3..9425ee79ee 100644
--- a/generated_types/src/database_rules.rs
+++ b/generated_types/src/database_rules.rs
@@ -2,7 +2,9 @@ use std::convert::{TryFrom, TryInto};
 
 use thiserror::Error;
 
-use data_types::database_rules::{ColumnType, ColumnValue, DatabaseRules, Order};
+use data_types::database_rules::{
+    ColumnType, ColumnValue, DatabaseRules, Order, RoutingConfig, RoutingRules,
+};
 use data_types::DatabaseName;
 
 use crate::google::{FieldViolation, FieldViolationExt, FromFieldOpt};
@@ -20,7 +22,7 @@ impl From<DatabaseRules> for management::DatabaseRules {
             partition_template: Some(rules.partition_template.into()),
             write_buffer_config: rules.write_buffer_config.map(Into::into),
             lifecycle_rules: Some(rules.lifecycle_rules.into()),
-            shard_config: rules.shard_config.map(Into::into),
+            routing_rules: rules.routing_rules.map(Into::into),
         }
     }
 }
@@ -43,9 +45,9 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
             .optional("partition_template")?
             .unwrap_or_default();
 
-        let shard_config = proto
-            .shard_config
-            .optional("shard_config")
+        let routing_rules = proto
+            .routing_rules
+            .optional("routing_rules")
             .unwrap_or_default();
 
         Ok(Self {
@@ -53,7 +55,53 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
             partition_template,
             write_buffer_config,
             lifecycle_rules,
-            shard_config,
+            routing_rules,
         })
     }
 }
+
+impl From<RoutingRules> for management::database_rules::RoutingRules {
+    fn from(routing_rules: RoutingRules) -> Self {
+        match routing_rules {
+            RoutingRules::RoutingConfig(cfg) => {
+                management::database_rules::RoutingRules::RoutingConfig(cfg.into())
+            }
+            RoutingRules::ShardConfig(cfg) => {
+                management::database_rules::RoutingRules::ShardConfig(cfg.into())
+            }
+        }
+    }
+}
+
+impl TryFrom<management::database_rules::RoutingRules> for RoutingRules {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::database_rules::RoutingRules) -> Result<Self, Self::Error> {
+        Ok(match proto {
+            management::database_rules::RoutingRules::ShardConfig(cfg) => {
+                RoutingRules::ShardConfig(cfg.try_into()?)
+            }
+            management::database_rules::RoutingRules::RoutingConfig(cfg) => {
+                RoutingRules::RoutingConfig(cfg.try_into()?)
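+            // Illustrative sketch: together, these impls give a lossless
+            // round trip between the native and generated types, assuming
+            // the pre-existing NodeGroup conversions:
+            //
+            //     let native = RoutingRules::RoutingConfig(RoutingConfig::default());
+            //     let proto: management::database_rules::RoutingRules = native.clone().into();
+            //     let back: RoutingRules = proto.try_into().expect("valid proto");
+            //     assert_eq!(native, back);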
+            }
+        })
+    }
+}
+
+impl From<RoutingConfig> for management::RoutingConfig {
+    fn from(routing_config: RoutingConfig) -> Self {
+        Self {
+            target: Some(routing_config.target.into()),
+        }
+    }
+}
+
+impl TryFrom<management::RoutingConfig> for RoutingConfig {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::RoutingConfig) -> Result<Self, Self::Error> {
+        Ok(Self {
+            target: proto.target.required("target")?,
+        })
+    }
+}
@@ -184,6 +232,6 @@ mod tests {
 
         // These should be None, as they are not preserved in the protobuf DatabaseRules
         assert!(back.write_buffer_config.is_none());
-        assert!(back.shard_config.is_none());
+        assert!(back.routing_rules.is_none());
     }
 }

diff --git a/generated_types/src/database_rules/shard.rs b/generated_types/src/database_rules/shard.rs
index 28a7de0d03..0a9b8d4f51 100644
--- a/generated_types/src/database_rules/shard.rs
+++ b/generated_types/src/database_rules/shard.rs
@@ -319,16 +319,22 @@
     fn test_database_rules_shard_config() {
         let protobuf = management::DatabaseRules {
             name: "database".to_string(),
-            shard_config: Some(management::ShardConfig {
-                ..Default::default()
-            }),
+            routing_rules: Some(management::database_rules::RoutingRules::ShardConfig(
+                management::ShardConfig {
+                    ..Default::default()
+                },
+            )),
             ..Default::default()
         };
 
         let rules: DatabaseRules = protobuf.try_into().unwrap();
         let back: management::DatabaseRules = rules.into();
 
-        assert!(back.shard_config.is_some());
+        assert!(back.routing_rules.is_some());
+        assert!(matches!(
+            back.routing_rules,
+            Some(management::database_rules::RoutingRules::ShardConfig(_))
+        ));
     }
 
     #[test]

diff --git a/server/src/lib.rs b/server/src/lib.rs
index 1b18cb2677..1ad36ee919 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -101,7 +101,7 @@ use crate::{
     db::Db,
 };
 use cached::Return;
-use data_types::database_rules::{NodeGroup, Shard, ShardId};
+use data_types::database_rules::{NodeGroup, RoutingRules, Shard, ShardConfig, ShardId};
 use generated_types::database_rules::{decode_database_rules, encode_database_rules};
 use influxdb_iox_client::{connection::Builder, write};
 use rand::seq::SliceRandom;
@@ -587,6 +587,31 @@
             .db(&db_name)
             .context(DatabaseNotFound { db_name: &*db_name })?;
 
+        // We need to split this into two blocks because we cannot hold a lock
+        // across an async call.
+        let routing_config_target = {
+            let rules = db.rules.read();
+            if let Some(RoutingRules::RoutingConfig(routing_config)) = &rules.routing_rules {
+                let sharded_entries = lines_to_sharded_entries(
+                    lines,
+                    default_time,
+                    None as Option<&ShardConfig>,
+                    &*rules,
+                )
+                .context(LineConversion)?;
+                Some((routing_config.target.clone(), sharded_entries))
+            } else {
+                None
+            }
+        };
+
+        if let Some((target, sharded_entries)) = routing_config_target {
+            for i in sharded_entries {
+                self.write_entry_downstream(&db_name, &target, i.entry)
+                    .await?;
+            }
+            return Ok(());
+        }
+
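A generic sketch, not part of the patch, of the locking pattern used above: a
synchronous RwLock guard must not live across an `.await`, so everything the
async code needs is first cloned into owned data inside a short block
(`shared`, `plan`, and `do_async_io` are illustrative names):

    let work = {
        let guard = shared.read();  // sync read lock on the rules
        guard.plan(input)?          // produce owned data while locked
    };                              // guard dropped at the end of the block
    do_async_io(work).await?;       // safe: no lock held across the await

         // Split lines into shards while holding a read lock on the sharding config.
         // Once the lock is released we have a vector of entries, each associated with a
         // shard id, and an Arc to the mapping between shard ids and node
         // groups. This map is atomically swapped every time the sharding
         // config is updated, hence it's safe to use after we release the shard config
         // lock.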
         let (sharded_entries, shards) = {
             let rules = db.rules.read();
-            let shard_config = &rules.shard_config;
+
+            let shard_config = rules.routing_rules.as_ref().map(|cfg| match cfg {
+                RoutingRules::RoutingConfig(_) => todo!("routing config"),
+                RoutingRules::ShardConfig(shard_config) => shard_config,
+            });
 
             let sharded_entries =
-                lines_to_sharded_entries(lines, default_time, shard_config.as_ref(), &*rules)
+                lines_to_sharded_entries(lines, default_time, shard_config, &*rules)
                     .context(LineConversion)?;
 
             let shards = shard_config
@@ -1143,7 +1172,7 @@ mod tests {
             },
             write_buffer_config: None,
             lifecycle_rules: Default::default(),
-            shard_config: None,
+            routing_rules: None,
         };
 
         // Create a database
@@ -1239,7 +1268,7 @@ mod tests {
             },
             write_buffer_config: None,
             lifecycle_rules: Default::default(),
-            shard_config: None,
+            routing_rules: None,
         };
 
         // Create a database
@@ -1462,7 +1491,7 @@
         let db = server.db(&db_name).unwrap();
         {
             let mut rules = db.rules.write();
-            rules.shard_config = Some(ShardConfig {
+            let shard_config = ShardConfig {
                 hash_ring: Some(HashRing {
                     shards: vec![TEST_SHARD_ID].into(),
                     ..Default::default()
                 }),
                 shards: Some(
                     vec![(TEST_SHARD_ID, Shard::Iox(remote_ids.clone()))]
                         .into_iter()
                         .collect(),
                 ),
                 ..Default::default()
-            });
+            };
+            rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config));
         }
 
         let line = "cpu bar=1 10";

diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs
index 7e6b4787f5..9f10ad1bf8 100644
--- a/tests/end_to_end_cases/management_api.rs
+++ b/tests/end_to_end_cases/management_api.rs
@@ -1,6 +1,6 @@
 use generated_types::{
     google::protobuf::{Duration, Empty},
-    influxdata::iox::management::v1::*,
+    influxdata::iox::management::v1::{database_rules::RoutingRules, *},
 };
 use influxdb_iox_client::{management::CreateDatabaseError, operations, write::WriteError};
 
@@ -222,7 +222,7 @@ async fn test_create_get_update_database() {
             }),
             ..Default::default()
         }),
-        shard_config: None,
+        routing_rules: None,
     };
 
     client
@@ -235,12 +235,12 @@
         .await
         .expect("get database failed");
 
-    assert_eq!(response.shard_config, None);
+    assert_eq!(response.routing_rules, None);
 
-    rules.shard_config = Some(ShardConfig {
+    rules.routing_rules = Some(RoutingRules::ShardConfig(ShardConfig {
         ignore_errors: true,
         ..Default::default()
-    });
+    }));
 
     let updated_rules = client
         .update_database(rules.clone())
@@ -254,13 +254,10 @@
         .await
         .expect("get database failed");
 
-    assert_eq!(
-        response
-            .shard_config
-            .expect("shard config missing")
-            .ignore_errors,
-        true
-    );
+    assert!(matches!(
+        response.routing_rules,
+        Some(RoutingRules::ShardConfig(cfg)) if cfg.ignore_errors,
+    ));
 }
 
 #[tokio::test]

diff --git a/tests/end_to_end_cases/write_api.rs b/tests/end_to_end_cases/write_api.rs
index 403efb82be..2c00d1ee95 100644
--- a/tests/end_to_end_cases/write_api.rs
+++ b/tests/end_to_end_cases/write_api.rs
@@ -9,8 +9,10 @@ use entry::{
     lines_to_sharded_entries,
     test_helpers::{partitioner, sharder},
 };
+use generated_types::influxdata::iox::management::v1::database_rules::RoutingRules;
 use generated_types::influxdata::iox::management::v1::{
-    node_group::Node, shard, HashRing, Matcher, MatcherToShard, NodeGroup, Shard, ShardConfig,
+    node_group::Node, shard, HashRing, Matcher, MatcherToShard, NodeGroup, RoutingConfig, Shard,
+    ShardConfig,
 };
 use influxdb_line_protocol::parse_lines;
 use std::collections::HashMap;
@@ -194,7 +196,7 @@ async fn test_write_routed() {
         .get_database(&db_name)
         .await
.expect("cannot get database on router"); - router_db_rules.shard_config = Some(ShardConfig { + let shard_config = ShardConfig { specific_targets: vec![ MatcherToShard { matcher: Some(Matcher { @@ -251,7 +253,9 @@ async fn test_write_routed() { .into_iter() .collect::>(), ..Default::default() - }); + }; + router_db_rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config)); + router_mgmt .update_database(router_db_rules) .await @@ -379,7 +383,7 @@ async fn test_write_routed_errors() { .get_database(&db_name) .await .expect("cannot get database on router"); - router_db_rules.shard_config = Some(ShardConfig { + let shard_config = ShardConfig { specific_targets: vec![MatcherToShard { matcher: Some(Matcher { table_name_regex: "^cpu$".to_string(), @@ -398,7 +402,8 @@ async fn test_write_routed_errors() { .into_iter() .collect::>(), ..Default::default() - }); + }; + router_db_rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config)); router_mgmt .update_database(router_db_rules) .await @@ -425,3 +430,183 @@ async fn test_write_routed_errors() { // TODO(mkm): check connection error and successful communication with a // target that replies with an error... } + +#[tokio::test] +async fn test_write_routed_no_shard() { + const TEST_ROUTER_ID: u32 = 1; + + const TEST_TARGET_ID_1: u32 = 2; + const TEST_TARGET_ID_2: u32 = 3; + const TEST_TARGET_ID_3: u32 = 4; + + const TEST_REMOTE_ID_1: u32 = 2; + const TEST_REMOTE_ID_2: u32 = 3; + const TEST_REMOTE_ID_3: u32 = 4; + + let router = ServerFixture::create_single_use().await; + let mut router_mgmt = router.management_client(); + router_mgmt + .update_server_id(TEST_ROUTER_ID) + .await + .expect("set ID failed"); + + let target_1 = ServerFixture::create_single_use().await; + let mut target_1_mgmt = target_1.management_client(); + target_1_mgmt + .update_server_id(TEST_TARGET_ID_1) + .await + .expect("set ID failed"); + + router_mgmt + .update_remote(TEST_REMOTE_ID_1, target_1.grpc_base()) + .await + .expect("set remote failed"); + + let target_2 = ServerFixture::create_single_use().await; + let mut target_2_mgmt = target_2.management_client(); + target_2_mgmt + .update_server_id(TEST_TARGET_ID_2) + .await + .expect("set ID failed"); + + router_mgmt + .update_remote(TEST_REMOTE_ID_2, target_2.grpc_base()) + .await + .expect("set remote failed"); + + let target_3 = ServerFixture::create_single_use().await; + let mut target_3_mgmt = target_3.management_client(); + target_3_mgmt + .update_server_id(TEST_TARGET_ID_3) + .await + .expect("set ID failed"); + + router_mgmt + .update_remote(TEST_REMOTE_ID_3, target_3.grpc_base()) + .await + .expect("set remote failed"); + + let db_name_1 = rand_name(); + let db_name_2 = rand_name(); + for &db_name in &[&db_name_1, &db_name_2] { + create_readable_database(db_name, router.grpc_channel()).await; + create_readable_database(db_name, target_1.grpc_channel()).await; + create_readable_database(db_name, target_2.grpc_channel()).await; + create_readable_database(db_name, target_3.grpc_channel()).await; + } + + // Set routing rules on the router: + for (db_name, remote_id) in &[ + (db_name_1.clone(), TEST_REMOTE_ID_1), + (db_name_2.clone(), TEST_REMOTE_ID_2), + ] { + let mut router_db_rules = router_mgmt + .get_database(db_name) + .await + .expect("cannot get database on router"); + let routing_config = RoutingConfig { + target: Some(NodeGroup { + nodes: vec![Node { id: *remote_id }], + }), + }; + router_db_rules.routing_rules = Some(RoutingRules::RoutingConfig(routing_config)); + + router_mgmt + 
+            .update_database(router_db_rules)
+            .await
+            .expect("cannot update router db rules");
+    }
+
+    // Write some data
+    let line_1 = "cpu bar=1 100";
+    let line_2 = "disk bar=2 100";
+
+    let mut write_client = router.write_client();
+
+    for (&ref db_name, &ref line) in &[(&db_name_1, line_1), (&db_name_2, line_2)] {
+        let num_lines_written = write_client
+            .write(db_name, line)
+            .await
+            .expect("cannot write");
+        assert_eq!(num_lines_written, 1);
+    }
+
+    // The router will have delegated the writes to the targets by database name:
+    // target 1 will have received only the "cpu" table (db_name_1),
+    // target 2 will have received only the "disk" table (db_name_2).
+    // Target 3 won't get any writes.
+
+    let mut query_results = target_1
+        .flight_client()
+        .perform_query(&db_name_1, "select * from cpu")
+        .await
+        .expect("failed to query target 1");
+
+    let mut batches = Vec::new();
+    while let Some(data) = query_results.next().await.unwrap() {
+        batches.push(data);
+    }
+
+    let expected = vec![
+        "+-----+-------------------------------+",
+        "| bar | time                          |",
+        "+-----+-------------------------------+",
+        "| 1   | 1970-01-01 00:00:00.000000100 |",
+        "+-----+-------------------------------+",
+    ];
+    assert_batches_sorted_eq!(&expected, &batches);
+
+    assert!(target_1
+        .flight_client()
+        .perform_query(&db_name_1, "select * from disk")
+        .await
+        .unwrap_err()
+        .to_string()
+        .contains("Table or CTE with name \\\'disk\\\' not found\""));
+
+    let mut query_results = target_2
+        .flight_client()
+        .perform_query(&db_name_2, "select * from disk")
+        .await
+        .expect("failed to query target 2");
+
+    let mut batches = Vec::new();
+    while let Some(data) = query_results.next().await.unwrap() {
+        batches.push(data);
+    }
+
+    let expected = vec![
+        "+-----+-------------------------------+",
+        "| bar | time                          |",
+        "+-----+-------------------------------+",
+        "| 2   | 1970-01-01 00:00:00.000000100 |",
+        "+-----+-------------------------------+",
+    ];
+    assert_batches_sorted_eq!(&expected, &batches);
+
+    assert!(target_2
+        .flight_client()
+        .perform_query(&db_name_1, "select * from cpu")
+        .await
+        .unwrap_err()
+        .to_string()
+        .contains("Table or CTE with name \\\'cpu\\\' not found\""));
+
+    // Ensure that target_3 didn't get any writes.
+
+    assert!(target_3
+        .flight_client()
+        .perform_query(&db_name_1, "select * from cpu")
+        .await
+        .unwrap_err()
+        .to_string()
+        .contains("Table or CTE with name \\\'cpu\\\' not found\""));
+
+    assert!(target_3
+        .flight_client()
+        .perform_query(&db_name_2, "select * from disk")
+        .await
+        .unwrap_err()
+        .to_string()
+        .contains("Table or CTE with name \\\'disk\\\' not found\""));
+}
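For reference, the client-side steps exercised by the new test distill to the
following sketch (not code from this patch; the database name and server id
are illustrative, and `router_mgmt` is a management client obtained as in the
test fixtures):

    // Fetch the current rules, swap in a RoutingConfig, and push them back.
    let mut rules = router_mgmt.get_database("mydb").await?;
    rules.routing_rules = Some(RoutingRules::RoutingConfig(RoutingConfig {
        target: Some(NodeGroup {
            nodes: vec![Node { id: 2 }],
        }),
    }));
    router_mgmt.update_database(rules).await?;

Once applied, every data plane operation for "mydb" arriving at the router is
forwarded to the target node group as-is; unlike a ShardConfig, no matchers or
hash ring are consulted.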