feat: add shard sink indirection (#1447)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
2e5a9bf18e
commit
9320f59de0
|
@ -396,7 +396,13 @@ pub struct ShardConfig {
|
|||
pub ignore_errors: bool,
|
||||
/// Mapping between shard IDs and node groups. Other sharding rules use
|
||||
/// ShardId as targets.
|
||||
pub shards: Arc<HashMap<ShardId, NodeGroup>>,
|
||||
pub shards: Arc<HashMap<ShardId, Shard>>,
|
||||
}
|
||||
|
||||
/// Configuration for a specific IOx shard
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub enum Shard {
|
||||
Iox(NodeGroup),
|
||||
}
|
||||
|
||||
struct LineHasher<'a, 'b, 'c> {
|
||||
|
|
|
@ -33,7 +33,7 @@ message ShardConfig {
|
|||
|
||||
/// Mapping between shard IDs and node groups. Other sharding rules use
|
||||
/// ShardId as targets.
|
||||
map<uint32, NodeGroup> shards = 4;
|
||||
map<uint32, Shard> shards = 4;
|
||||
}
|
||||
|
||||
// Maps a matcher with specific shard. If the line/row matches
|
||||
|
@ -52,6 +52,13 @@ message Matcher {
|
|||
string predicate = 2;
|
||||
}
|
||||
|
||||
// Configuration for a specific shard
|
||||
message Shard {
|
||||
oneof sink {
|
||||
NodeGroup iox = 1;
|
||||
}
|
||||
}
|
||||
|
||||
// A collection of IOx nodes
|
||||
message NodeGroup {
|
||||
message Node {
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
use std::collections::HashMap;
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
use std::sync::Arc;
|
||||
|
||||
use regex::Regex;
|
||||
|
||||
use data_types::database_rules::{HashRing, Matcher, MatcherToShard, NodeGroup, ShardConfig};
|
||||
use data_types::database_rules::{
|
||||
HashRing, Matcher, MatcherToShard, NodeGroup, Shard, ShardConfig,
|
||||
};
|
||||
use data_types::server_id::ServerId;
|
||||
|
||||
use crate::google::FieldViolation;
|
||||
use crate::google::{FieldViolation, FieldViolationExt, FromField};
|
||||
use crate::influxdata::iox::management::v1 as management;
|
||||
|
||||
impl From<ShardConfig> for management::ShardConfig {
|
||||
|
@ -23,7 +24,7 @@ impl From<ShardConfig> for management::ShardConfig {
|
|||
shards: shard_config
|
||||
.shards
|
||||
.iter()
|
||||
.map(|(k, v)| (*k, from_node_group_for_management_node_group(v.clone())))
|
||||
.map(|(k, v)| (*k, v.clone().into()))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
@ -38,20 +39,21 @@ impl TryFrom<management::ShardConfig> for ShardConfig {
|
|||
.specific_targets
|
||||
.into_iter()
|
||||
.map(|i| i.try_into())
|
||||
.collect::<Result<Vec<MatcherToShard>, _>>()?,
|
||||
.collect::<Result<_, FieldViolation>>()
|
||||
.field("specific_targets")?,
|
||||
hash_ring: proto
|
||||
.hash_ring
|
||||
.map(|i| i.try_into())
|
||||
.map_or(Ok(None), |r| r.map(Some))?,
|
||||
.map_or(Ok(None), |r| r.map(Some))
|
||||
.field("hash_ring")?,
|
||||
ignore_errors: proto.ignore_errors,
|
||||
shards: Arc::new(
|
||||
proto
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
try_from_management_node_group_for_node_group(v).map(|ng| (k, ng))
|
||||
})
|
||||
.collect::<Result<HashMap<u32, NodeGroup>, FieldViolation>>()?,
|
||||
.map(|(k, v)| Ok((k, v.try_into()?)))
|
||||
.collect::<Result<_, FieldViolation>>()
|
||||
.field("shards")?,
|
||||
),
|
||||
})
|
||||
}
|
||||
|
@ -80,7 +82,7 @@ impl TryFrom<management::MatcherToShard> for MatcherToShard {
|
|||
|
||||
fn try_from(proto: management::MatcherToShard) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
matcher: proto.matcher.unwrap_or_default().try_into()?,
|
||||
matcher: proto.matcher.unwrap_or_default().scope("matcher")?,
|
||||
shard: proto.shard,
|
||||
})
|
||||
}
|
||||
|
@ -108,29 +110,52 @@ impl TryFrom<management::HashRing> for HashRing {
|
|||
}
|
||||
}
|
||||
|
||||
// cannot (and/or don't know how to) add impl From inside prost generated code
|
||||
fn from_node_group_for_management_node_group(node_group: NodeGroup) -> management::NodeGroup {
|
||||
management::NodeGroup {
|
||||
nodes: node_group
|
||||
.into_iter()
|
||||
.map(|id| management::node_group::Node { id: id.get_u32() })
|
||||
.collect(),
|
||||
impl From<Shard> for management::Shard {
|
||||
fn from(shard: Shard) -> Self {
|
||||
let sink = match shard {
|
||||
Shard::Iox(node_group) => management::shard::Sink::Iox(node_group.into()),
|
||||
};
|
||||
management::Shard { sink: Some(sink) }
|
||||
}
|
||||
}
|
||||
|
||||
fn try_from_management_node_group_for_node_group(
|
||||
proto: management::NodeGroup,
|
||||
) -> Result<NodeGroup, FieldViolation> {
|
||||
proto
|
||||
.nodes
|
||||
.into_iter()
|
||||
.map(|i| {
|
||||
ServerId::try_from(i.id).map_err(|_| FieldViolation {
|
||||
field: "id".to_string(),
|
||||
description: "Node ID must be nonzero".to_string(),
|
||||
})
|
||||
impl TryFrom<management::Shard> for Shard {
|
||||
type Error = FieldViolation;
|
||||
|
||||
fn try_from(proto: management::Shard) -> Result<Self, Self::Error> {
|
||||
let sink = proto.sink.ok_or_else(|| FieldViolation::required(""))?;
|
||||
Ok(match sink {
|
||||
management::shard::Sink::Iox(node_group) => Shard::Iox(node_group.scope("node_group")?),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NodeGroup> for management::NodeGroup {
|
||||
fn from(node_group: NodeGroup) -> Self {
|
||||
Self {
|
||||
nodes: node_group
|
||||
.into_iter()
|
||||
.map(|id| management::node_group::Node { id: id.get_u32() })
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<management::NodeGroup> for NodeGroup {
|
||||
type Error = FieldViolation;
|
||||
|
||||
fn try_from(proto: management::NodeGroup) -> Result<Self, Self::Error> {
|
||||
proto
|
||||
.nodes
|
||||
.into_iter()
|
||||
.map(|i| {
|
||||
ServerId::try_from(i.id).map_err(|_| FieldViolation {
|
||||
field: "id".to_string(),
|
||||
description: "Node ID must be nonzero".to_string(),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Matcher> for management::Matcher {
|
||||
|
@ -312,18 +337,22 @@ mod tests {
|
|||
shards: vec![
|
||||
(
|
||||
1,
|
||||
management::NodeGroup {
|
||||
nodes: vec![
|
||||
management::node_group::Node { id: 10 },
|
||||
management::node_group::Node { id: 11 },
|
||||
management::node_group::Node { id: 12 },
|
||||
],
|
||||
management::Shard {
|
||||
sink: Some(management::shard::Sink::Iox(management::NodeGroup {
|
||||
nodes: vec![
|
||||
management::node_group::Node { id: 10 },
|
||||
management::node_group::Node { id: 11 },
|
||||
management::node_group::Node { id: 12 },
|
||||
],
|
||||
})),
|
||||
},
|
||||
),
|
||||
(
|
||||
2,
|
||||
management::NodeGroup {
|
||||
nodes: vec![management::node_group::Node { id: 20 }],
|
||||
management::Shard {
|
||||
sink: Some(management::shard::Sink::Iox(management::NodeGroup {
|
||||
nodes: vec![management::node_group::Node { id: 20 }],
|
||||
})),
|
||||
},
|
||||
),
|
||||
]
|
||||
|
@ -335,8 +364,12 @@ mod tests {
|
|||
let shard_config: ShardConfig = protobuf.try_into().unwrap();
|
||||
|
||||
assert_eq!(shard_config.shards.len(), 2);
|
||||
assert_eq!(shard_config.shards[&1].len(), 3);
|
||||
assert_eq!(shard_config.shards[&2].len(), 1);
|
||||
assert!(
|
||||
matches!(&shard_config.shards[&1], Shard::Iox(node_group) if node_group.len() == 3)
|
||||
);
|
||||
assert!(
|
||||
matches!(&shard_config.shards[&2], Shard::Iox(node_group) if node_group.len() == 1)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -99,7 +99,7 @@ use crate::{
|
|||
},
|
||||
db::Db,
|
||||
};
|
||||
use data_types::database_rules::{NodeGroup, ShardId};
|
||||
use data_types::database_rules::{NodeGroup, Shard, ShardId};
|
||||
use generated_types::database_rules::{decode_database_rules, encode_database_rules};
|
||||
use influxdb_iox_client::{connection::Builder, write};
|
||||
use rand::seq::SliceRandom;
|
||||
|
@ -565,14 +565,18 @@ impl<M: ConnectionManager> Server<M> {
|
|||
&self,
|
||||
db_name: &str,
|
||||
db: &Db,
|
||||
shards: Arc<HashMap<u32, NodeGroup>>,
|
||||
shards: Arc<HashMap<u32, Shard>>,
|
||||
sharded_entry: ShardedEntry,
|
||||
) -> Result<()> {
|
||||
match sharded_entry.shard_id {
|
||||
Some(shard_id) => {
|
||||
let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
|
||||
self.write_entry_downstream(db_name, node_group, sharded_entry.entry)
|
||||
.await?
|
||||
let shard = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
|
||||
match shard {
|
||||
Shard::Iox(node_group) => {
|
||||
self.write_entry_downstream(db_name, node_group, sharded_entry.entry)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
self.write_entry_local(&db_name, db, sharded_entry.entry)
|
||||
|
@ -1281,7 +1285,7 @@ mod tests {
|
|||
..Default::default()
|
||||
}),
|
||||
shards: Arc::new(
|
||||
vec![(TEST_SHARD_ID, remote_ids.clone())]
|
||||
vec![(TEST_SHARD_ID, Shard::Iox(remote_ids.clone()))]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
),
|
||||
|
|
|
@ -10,7 +10,7 @@ use entry::{
|
|||
test_helpers::{partitioner, sharder},
|
||||
};
|
||||
use generated_types::influxdata::iox::management::v1::{
|
||||
node_group::Node, HashRing, Matcher, MatcherToShard, NodeGroup, ShardConfig,
|
||||
node_group::Node, shard, HashRing, Matcher, MatcherToShard, NodeGroup, Shard, ShardConfig,
|
||||
};
|
||||
use influxdb_line_protocol::parse_lines;
|
||||
use std::collections::HashMap;
|
||||
|
@ -217,26 +217,32 @@ async fn test_write_routed() {
|
|||
shards: vec![
|
||||
(
|
||||
TEST_SHARD_ID_1,
|
||||
NodeGroup {
|
||||
nodes: vec![Node {
|
||||
id: TEST_REMOTE_ID_1,
|
||||
}],
|
||||
Shard {
|
||||
sink: Some(shard::Sink::Iox(NodeGroup {
|
||||
nodes: vec![Node {
|
||||
id: TEST_REMOTE_ID_1,
|
||||
}],
|
||||
})),
|
||||
},
|
||||
),
|
||||
(
|
||||
TEST_SHARD_ID_2,
|
||||
NodeGroup {
|
||||
nodes: vec![Node {
|
||||
id: TEST_REMOTE_ID_2,
|
||||
}],
|
||||
Shard {
|
||||
sink: Some(shard::Sink::Iox(NodeGroup {
|
||||
nodes: vec![Node {
|
||||
id: TEST_REMOTE_ID_2,
|
||||
}],
|
||||
})),
|
||||
},
|
||||
),
|
||||
(
|
||||
TEST_SHARD_ID_3,
|
||||
NodeGroup {
|
||||
nodes: vec![Node {
|
||||
id: TEST_REMOTE_ID_3,
|
||||
}],
|
||||
Shard {
|
||||
sink: Some(shard::Sink::Iox(NodeGroup {
|
||||
nodes: vec![Node {
|
||||
id: TEST_REMOTE_ID_3,
|
||||
}],
|
||||
})),
|
||||
},
|
||||
),
|
||||
]
|
||||
|
@ -381,8 +387,10 @@ async fn test_write_routed_errors() {
|
|||
}],
|
||||
shards: vec![(
|
||||
TEST_SHARD_ID,
|
||||
NodeGroup {
|
||||
nodes: vec![Node { id: TEST_REMOTE_ID }],
|
||||
Shard {
|
||||
sink: Some(shard::Sink::Iox(NodeGroup {
|
||||
nodes: vec![Node { id: TEST_REMOTE_ID }],
|
||||
})),
|
||||
},
|
||||
)]
|
||||
.into_iter()
|
||||
|
|
Loading…
Reference in New Issue