feat: Implement sharding logic

pull/24376/head
Marko Mikulicic 2021-04-15 16:44:41 +02:00 committed by kodiakhq[bot]
parent 664d7103ca
commit 8bfcc1782d
4 changed files with 342 additions and 8 deletions

View File

@ -0,0 +1,167 @@
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
/// A ConsistentHasher implements a simple consistent hashing mechanism
/// that maps a point to the nearest "node" N.
///
/// It has the property that the addition or removal of one node in the ring
/// in the worst case only changes the mapping of points that were assigned
/// to the node adjacent to the node that gets inserted/removed (on average half
/// of them).
///
/// e.g. you can use it find the ShardID in vector of ShardIds
/// that is closest to a given hash value.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ConsistentHasher<T>
where
T: Copy + Hash,
{
ring: Vec<(u64, T)>,
}
impl<T> ConsistentHasher<T>
where
T: Copy + Hash,
{
pub fn new(nodes: &[T]) -> Self {
let mut ring: Vec<_> = nodes.iter().map(|node| (Self::hash(node), *node)).collect();
ring.sort_by_key(|(hash, _)| *hash);
Self { ring }
}
pub fn find<H: Hash>(&self, point: H) -> Option<T> {
let point_hash = Self::hash(point);
self.ring
.iter()
.find(|(node_hash, _)| node_hash > &point_hash)
.or_else(|| self.ring.first())
.map(|(_, node)| *node)
}
pub fn is_empty(&self) -> bool {
self.ring.is_empty()
}
pub fn len(&self) -> usize {
self.ring.len()
}
fn hash<H: Hash>(h: H) -> u64 {
let mut hasher = DefaultHasher::new();
h.hash(&mut hasher);
hasher.finish()
}
}
impl<T> From<ConsistentHasher<T>> for Vec<T>
where
T: Copy + Hash,
{
fn from(hasher: ConsistentHasher<T>) -> Self {
hasher.ring.into_iter().map(|(_, node)| node).collect()
}
}
impl<T> From<Vec<T>> for ConsistentHasher<T>
where
T: Copy + Hash,
{
fn from(vec: Vec<T>) -> Self {
Self::new(&vec)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_roundtrip() {}
#[test]
fn test_consistent_hasher() {
let ch = ConsistentHasher::new(&[10, 20, 30, 40]);
// test points found with:
/*
for needle in (10..=40).step_by(10) {
let mut found = 0;
for point in 0..100 {
if ch.find(point) == Some(needle) {
found += 1;
println!(r#"assert_eq!(ch.find({}), Some({}));"#, point, needle);
}
if found >= 16 {
break;
}
}
println!();
}
*/
assert_eq!(ch.find(1), Some(10));
assert_eq!(ch.find(6), Some(10));
assert_eq!(ch.find(16), Some(10));
assert_eq!(ch.find(25), Some(10));
assert_eq!(ch.find(8), Some(20));
assert_eq!(ch.find(9), Some(20));
assert_eq!(ch.find(11), Some(20));
assert_eq!(ch.find(13), Some(20));
assert_eq!(ch.find(3), Some(30));
assert_eq!(ch.find(12), Some(30));
assert_eq!(ch.find(15), Some(30));
assert_eq!(ch.find(20), Some(30));
assert_eq!(ch.find(7), Some(40));
assert_eq!(ch.find(10), Some(40));
assert_eq!(ch.find(14), Some(40));
assert_eq!(ch.find(18), Some(40));
let ch = ConsistentHasher::new(&[10, 20, 30, 40, 50]);
assert_eq!(ch.find(1), Some(10));
assert_eq!(ch.find(6), Some(10));
assert_eq!(ch.find(16), Some(10));
assert_eq!(ch.find(25), Some(10));
assert_eq!(ch.find(8), Some(20));
assert_eq!(ch.find(9), Some(20));
assert_eq!(ch.find(11), Some(50)); // <-- moved to node 50
assert_eq!(ch.find(13), Some(50)); // <-- moved to node 50
assert_eq!(ch.find(3), Some(30));
assert_eq!(ch.find(12), Some(30));
assert_eq!(ch.find(15), Some(30));
assert_eq!(ch.find(20), Some(30));
assert_eq!(ch.find(7), Some(40));
assert_eq!(ch.find(10), Some(40));
assert_eq!(ch.find(14), Some(40));
assert_eq!(ch.find(18), Some(40));
let ch = ConsistentHasher::new(&[10, 20, 30]);
assert_eq!(ch.find(1), Some(10));
assert_eq!(ch.find(6), Some(10));
assert_eq!(ch.find(16), Some(10));
assert_eq!(ch.find(25), Some(10));
assert_eq!(ch.find(8), Some(20));
assert_eq!(ch.find(9), Some(20));
assert_eq!(ch.find(11), Some(20));
assert_eq!(ch.find(13), Some(20));
assert_eq!(ch.find(3), Some(30));
assert_eq!(ch.find(12), Some(30));
assert_eq!(ch.find(15), Some(30));
assert_eq!(ch.find(20), Some(30));
// all points that used to map to shard 40 go to shard 20
assert_eq!(ch.find(7), Some(20));
assert_eq!(ch.find(10), Some(20));
assert_eq!(ch.find(14), Some(20));
assert_eq!(ch.find(18), Some(20));
}
}

View File

@ -2,7 +2,7 @@ use std::convert::{TryFrom, TryInto};
use chrono::{DateTime, TimeZone, Utc};
use regex::Regex;
use snafu::Snafu;
use snafu::{OptionExt, Snafu};
use generated_types::google::protobuf::Empty;
use generated_types::{
@ -11,9 +11,11 @@ use generated_types::{
};
use influxdb_line_protocol::ParsedLine;
use crate::consistent_hasher::ConsistentHasher;
use crate::field_validation::{FromField, FromFieldOpt, FromFieldString, FromFieldVec};
use crate::DatabaseName;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::num::{NonZeroU32, NonZeroUsize};
use std::sync::Arc;
@ -33,6 +35,12 @@ pub enum Error {
#[snafu(context(false))]
FieldViolation { source: FieldViolation },
#[snafu(display("No sharding rule matches line: {}", line))]
NoShardingRuleMatches { line: String },
#[snafu(display("No shards defined"))]
NoShardsDefined,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -787,9 +795,45 @@ pub struct ShardConfig {
pub shards: Arc<HashMap<ShardId, NodeGroup>>,
}
struct LineHasher<'a, 'b, 'c> {
line: &'a ParsedLine<'c>,
hash_ring: &'b HashRing,
}
impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> {
fn hash<H: Hasher>(&self, state: &mut H) {
if self.hash_ring.table_name {
self.line.series.measurement.hash(state);
}
for column in &self.hash_ring.columns {
if let Some(tag_value) = self.line.tag_value(column) {
tag_value.hash(state);
} else if let Some(field_value) = self.line.field_value(column) {
field_value.to_string().hash(state);
}
state.write_u8(0); // column separator
}
}
}
impl Sharder for ShardConfig {
fn shard(&self, _line: &ParsedLine<'_>) -> Result<ShardId, Error> {
todo!("mkm to implement as part of #916")
fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> {
if let Some(specific_targets) = &self.specific_targets {
if specific_targets.matcher.match_line(line) {
return Ok(specific_targets.shard);
}
}
if let Some(hash_ring) = &self.hash_ring {
return hash_ring
.shards
.find(LineHasher { line, hash_ring })
.context(NoShardsDefined);
}
NoShardingRuleMatches {
line: line.to_string(),
}
.fail()
}
}
@ -813,7 +857,7 @@ pub struct HashRing {
/// include the values of these columns in the hash key
pub columns: Vec<String>,
/// ring of shard ids
pub shards: Vec<ShardId>,
pub shards: ConsistentHasher<ShardId>,
}
/// A matcher is used to match routing rules or subscriptions on a row-by-row
@ -836,6 +880,24 @@ impl PartialEq for Matcher {
}
impl Eq for Matcher {}
impl Matcher {
fn match_line(&self, line: &ParsedLine<'_>) -> bool {
let table_name_matches = if let Some(table_name_regex) = &self.table_name_regex {
table_name_regex.is_match(line.series.measurement.as_str())
} else {
false
};
let predicate_matches = if self.predicate.is_some() {
unimplemented!("predicates not implemented yet")
} else {
false
};
table_name_matches || predicate_matches
}
}
impl From<ShardConfig> for management::ShardConfig {
fn from(shard_config: ShardConfig) -> Self {
Self {
@ -910,7 +972,7 @@ impl From<HashRing> for management::HashRing {
Self {
table_name: hash_ring.table_name,
columns: hash_ring.columns,
shards: hash_ring.shards,
shards: hash_ring.shards.into(),
}
}
}
@ -922,7 +984,7 @@ impl TryFrom<management::HashRing> for HashRing {
Ok(Self {
table_name: proto.table_name,
columns: proto.columns,
shards: proto.shards,
shards: proto.shards.into(),
})
}
}
@ -1493,8 +1555,8 @@ mod tests {
let hash_ring: HashRing = protobuf.try_into().unwrap();
assert_eq!(hash_ring.shards.len(), 2);
assert_eq!(hash_ring.shards[0], 1);
assert_eq!(hash_ring.shards[1], 2);
assert_eq!(hash_ring.shards.find(1), Some(2));
assert_eq!(hash_ring.shards.find(2), Some(1));
}
#[test]
@ -1588,4 +1650,106 @@ mod tests {
assert_eq!(shard_config.shards[&1].len(), 3);
assert_eq!(shard_config.shards[&2].len(), 1);
}
#[test]
fn test_sharder() {
let protobuf = management::ShardConfig {
specific_targets: Some(management::MatcherToShard {
matcher: Some(management::Matcher {
table_name_regex: "pu$".to_string(),
..Default::default()
}),
shard: 1,
}),
hash_ring: Some(management::HashRing {
table_name: true,
columns: vec!["t1", "t2", "f1", "f2"]
.into_iter()
.map(|i| i.to_string())
.collect(),
// in practice we won't have that many shards
// but for tests it's better to have more distinct values
// so we don't hide bugs due to sheer luck.
shards: (1000..1000000).collect(),
}),
..Default::default()
};
let shard_config: ShardConfig = protobuf.try_into().unwrap();
// hit the specific targets
let line = parse_line("cpu,t1=1,t2=2,t3=3 f1=1,f2=2,f3=3 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 1);
let line = parse_line("cpu,t1=10,t2=20,t3=30 f1=10,f2=20,f3=30 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 1);
// hit the hash ring
let line = parse_line("mem,t1=1,t2=2,t3=3 f1=1,f2=2,f3=3 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 710570);
// change a column that is not part of the hashring columns
let line = parse_line("mem,t1=1,t2=2,t3=30 f1=1,f2=2,f3=3 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 710570);
// change a column that is part of the hashring
let line = parse_line("mem,t1=10,t2=2,t3=3 f1=1,f2=2,f3=3 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 342220);
// ensure columns can be optional and yet cannot be mixed up
let line = parse_line("mem,t1=10,t3=3 f1=1,f2=2,f3=3 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 494892);
let line = parse_line("mem,t2=10,t3=3 f1=1,f2=2,f3=3 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 32813);
// same thing for "fields" columns:
// change a column that is not part of the hashring columns
let line = parse_line("mem,t1=1,t2=2,t3=3 f1=1,f2=2,f3=30 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 710570);
// change a column that is part of the hashring
let line = parse_line("mem,t1=10,t2=2,t3=3 f1=1,f2=2,f3=3 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 342220);
// ensure columns can be optional and yet cannot be mixed up
let line = parse_line("mem,t1=1,t3=3 f1=10,f2=2,f3=3 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 49366);
let line = parse_line("mem,t2=1,t3=3 f1=10,f2=2,f3=3 10");
let sharded_line = shard_config.shard(&line).expect("cannot shard a line");
assert_eq!(sharded_line, 637504);
}
#[test]
fn test_sharder_no_shards() {
let protobuf = management::ShardConfig {
hash_ring: Some(management::HashRing {
table_name: true,
columns: vec!["t1", "t2", "f1", "f2"]
.into_iter()
.map(|i| i.to_string())
.collect(),
shards: vec![],
}),
..Default::default()
};
let shard_config: ShardConfig = protobuf.try_into().unwrap();
let line = parse_line("cpu,t1=1,t2=2,t3=3 f1=1,f2=2,f3=3 10");
let err = shard_config.shard(&line).unwrap_err();
assert!(matches!(err, Error::NoShardsDefined));
}
}

View File

@ -22,5 +22,6 @@ pub mod partition_metadata;
pub mod timestamp;
pub mod wal;
pub mod consistent_hasher;
mod database_name;
pub mod field_validation;

View File

@ -19,6 +19,8 @@ message ShardConfig {
/// and your ring has 4 slots. If two tables that are very hot get
/// assigned to the same slot you can override that by putting in a
/// specific matcher to pull that table over to a different node.
///
/// TODO(#1224): consider turning this into a repeated field.
MatcherToShard specific_targets = 1;
/// An optional default hasher which will route to one in a collection of