Merge pull request #383 from influxdata/pd-handle_replicated_write

feat: implement subscriptions that send all data to subscribers
Paul Dix 2020-10-25 13:04:05 -04:00 committed by GitHub
commit bee92fb33b
2 changed files with 142 additions and 57 deletions
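
In short, this change collapses the inline replication loop in `write_lines` into a new public `handle_replicated_write` method that buffers the write locally, replicates it to the database's replication host groups, and then forwards it to any subscription whose matcher is `MatchTables::All`. As a minimal sketch, here is how a caller would declare such a subscription using the types touched by this diff (the "query_server_1" and "az1" values mirror the new `sends_all_to_subscriber` test):

    use delorean_data_types::database_rules::{DatabaseRules, MatchTables, Matcher, Subscription};

    // Rules for a database whose every write is fanned out, in full, to the
    // "az1" host group; the values mirror the test added in this commit.
    fn all_data_subscription_rules() -> DatabaseRules {
        DatabaseRules {
            subscriptions: vec![Subscription {
                name: "query_server_1".to_string(),
                host_group_id: "az1".to_string(),
                matcher: Matcher {
                    tables: MatchTables::All, // match every table
                    predicate: None,          // no row-level predicate
                },
            }],
            ..Default::default()
        }
    }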

View File

@@ -72,13 +72,13 @@ use delorean_data_types::{
     data::{lines_to_replicated_write, ReplicatedWrite},
     database_rules::{DatabaseRules, HostGroup, HostGroupId},
 };
-use delorean_generated_types::wal as wb;
 use delorean_line_parser::ParsedLine;
 use delorean_storage::Database;
 use delorean_write_buffer::Db as WriteBufferDb;

 use async_trait::async_trait;
 use delorean_arrow::arrow::record_batch::RecordBatch;
+use delorean_data_types::database_rules::MatchTables;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::sync::atomic::Ordering;
@@ -196,37 +196,7 @@ impl<M: ConnectionManager> Server<M> {
         let sequence = db.next_sequence();
         let write = lines_to_replicated_write(id, sequence, lines, &db.rules);

-        if let Some(buf) = &db.buffer {
-            buf.store_replicated_write(&write)
-                .await
-                .map_err(|e| Box::new(e) as DatabaseError)
-                .context(UnknownDatabaseError {})?;
-        }
-
-        for host_group_id in &db.rules.replication {
-            let group = self
-                .host_groups
-                .get(host_group_id)
-                .context(HostGroupNotFound { id: host_group_id })?;
-
-            let host = group
-                .hosts
-                .get(0)
-                .context(NoHostInGroup { id: host_group_id })?;
-
-            let connection = self
-                .connection_manager
-                .remote_server(host)
-                .await
-                .map_err(|e| Box::new(e) as DatabaseError)
-                .context(UnableToGetConnection { server: host })?;
-
-            connection
-                .replicate(db_name, &write)
-                .await
-                .map_err(|e| Box::new(e) as DatabaseError)
-                .context(ErrorReplicating {})?;
-        }
+        self.handle_replicated_write(db_name, db, write).await?;

         Ok(())
     }
@@ -247,10 +217,66 @@ impl<M: ConnectionManager> Server<M> {
     pub async fn handle_replicated_write(
         &self,
-        _db: &str,
-        _write: &wb::ReplicatedWrite<'_>,
+        db_name: &str,
+        db: &Db,
+        write: ReplicatedWrite,
     ) -> Result<()> {
-        unimplemented!()
+        if let Some(buf) = &db.buffer {
+            buf.store_replicated_write(&write)
+                .await
+                .map_err(|e| Box::new(e) as DatabaseError)
+                .context(UnknownDatabaseError {})?;
+        }
+
+        for host_group_id in &db.rules.replication {
+            self.replicate_to_host_group(host_group_id, db_name, &write)
+                .await?;
+        }
+
+        for subscription in &db.rules.subscriptions {
+            match subscription.matcher.tables {
+                MatchTables::All => {
+                    self.replicate_to_host_group(&subscription.host_group_id, db_name, &write)
+                        .await?
+                }
+                MatchTables::Table(_) => unimplemented!(),
+                MatchTables::Regex(_) => unimplemented!(),
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn replicate_to_host_group(
+        &self,
+        host_group_id: &str,
+        db_name: &str,
+        write: &ReplicatedWrite,
+    ) -> Result<()> {
+        let group = self
+            .host_groups
+            .get(host_group_id)
+            .context(HostGroupNotFound { id: host_group_id })?;
+
+        let host = group
+            .hosts
+            .get(0)
+            .context(NoHostInGroup { id: host_group_id })?;
+
+        let connection = self
+            .connection_manager
+            .remote_server(host)
+            .await
+            .map_err(|e| Box::new(e) as DatabaseError)
+            .context(UnableToGetConnection { server: host })?;
+
+        connection
+            .replicate(db_name, write)
+            .await
+            .map_err(|e| Box::new(e) as DatabaseError)
+            .context(ErrorReplicating {})?;
+
+        Ok(())
     }
 }
@@ -280,7 +306,7 @@ pub trait RemoteServer {
 }

 #[derive(Debug)]
-struct Db {
+pub struct Db {
     pub rules: DatabaseRules,
     pub buffer: Option<WriteBufferDb>,
     sequence: AtomicU64,
@@ -299,6 +325,7 @@ mod tests {
     use super::*;
     use async_trait::async_trait;
    use delorean_arrow::arrow::{csv, util::string_writer::StringWriter};
+    use delorean_data_types::database_rules::{MatchTables, Matcher, Subscription};
     use delorean_line_parser::parse_lines;
     use snafu::Snafu;
     use std::sync::Mutex;
@@ -417,6 +444,69 @@ partition_key:
         Ok(())
     }

+    #[tokio::test]
+    async fn sends_all_to_subscriber() -> Result {
+        let mut manager = TestConnectionManager::new();
+        let remote = Arc::new(TestRemoteServer::default());
+        let remote_id = "serverA";
+        manager
+            .remotes
+            .insert(remote_id.to_string(), remote.clone());
+
+        let mut server = Server::new(manager);
+        server.set_id(1);
+        let host_group_id = "az1".to_string();
+        let rules = DatabaseRules {
+            subscriptions: vec![Subscription {
+                name: "query_server_1".to_string(),
+                host_group_id: host_group_id.clone(),
+                matcher: Matcher {
+                    tables: MatchTables::All,
+                    predicate: None,
+                },
+            }],
+            ..Default::default()
+        };
+        server
+            .create_host_group(host_group_id.clone(), vec![remote_id.to_string()])
+            .await
+            .unwrap();
+
+        let db_name = "foo";
+        server.create_database(db_name, rules).await.unwrap();
+
+        let lines = parsed_lines("cpu bar=1 10");
+        server.write_lines("foo", &lines).await.unwrap();
+
+        let writes = remote.writes.lock().unwrap().get(db_name).unwrap().clone();
+
+        let write_text = r#"
+writer:1, sequence:1, checksum:226387645
+partition_key:
+  table:cpu
+    bar:1 time:10
+"#;
+
+        assert_eq!(write_text, writes[0].to_string());
+
+        // ensure sequence number goes up
+        let lines = parsed_lines("mem,server=A,region=west user=232 12");
+        server.write_lines("foo", &lines).await.unwrap();
+
+        let writes = remote.writes.lock().unwrap().get(db_name).unwrap().clone();
+        assert_eq!(2, writes.len());
+
+        let write_text = r#"
+writer:1, sequence:2, checksum:3759030699
+partition_key:
+  table:mem
+    server:A region:west user:232 time:12
+"#;
+
+        assert_eq!(write_text, writes[1].to_string());
+
+        Ok(())
+    }
+
     #[derive(Snafu, Debug, Clone)]
     enum TestClusterError {
         #[snafu(display("Test delorean_cluster error: {}", message))]
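
The dispatch order established above is: local write buffer first, then the replication host groups, then any matching subscriptions. The sketch below restates the host-group fan-out free-standing to make that order explicit; `route_write`, `Rules`, `SubscriptionRule`, and `TableMatch` are hypothetical stand-ins, not crate APIs, and only the catch-all arm is implemented in this commit:

    // Hypothetical stand-ins mirroring the shapes in the diff above.
    enum TableMatch {
        All,
        Table(String),
        Regex(String),
    }

    struct SubscriptionRule {
        host_group_id: String,
        tables: TableMatch,
    }

    struct Rules {
        // Host groups that receive every write unconditionally.
        replication: Vec<String>,
        subscriptions: Vec<SubscriptionRule>,
    }

    // Returns the host groups a write is forwarded to, in order.
    fn route_write(rules: &Rules) -> Vec<&str> {
        let mut targets: Vec<&str> = rules.replication.iter().map(String::as_str).collect();
        for sub in &rules.subscriptions {
            match &sub.tables {
                TableMatch::All => targets.push(&sub.host_group_id),
                // Table- and regex-scoped subscriptions still hit
                // `unimplemented!()` in this commit.
                TableMatch::Table(_) | TableMatch::Regex(_) => unimplemented!(),
            }
        }
        targets
    }

    fn main() {
        let rules = Rules {
            replication: vec!["replica-az1".to_string()],
            subscriptions: vec![SubscriptionRule {
                host_group_id: "az1".to_string(),
                tables: TableMatch::All,
            }],
        };
        // Replication targets come first, subscription targets after.
        assert_eq!(route_write(&rules), vec!["replica-az1", "az1"]);
    }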

View File

@@ -153,43 +153,38 @@ pub struct StrftimeColumn {
 pub type PartitionId = String;
 pub type WriterId = String;

-#[derive(Debug, Serialize, Deserialize)]
-enum SubscriptionType {
-    Push,
-    Pull,
-}
-
-/// `Subscription` represents a group of hosts that want to either receive data pushed
-/// as it arrives or want to pull it periodically. The subscription has a matcher
-/// that is used to determine what data will match it, and an optional queue for
-/// storing matched writes.
+/// `Subscription` represents a group of hosts that want to receive data as it arrives.
+/// The subscription has a matcher that is used to determine what data will match it, and
+/// an optional queue for storing matched writes. Subscribers that receive some subset
+/// of an individual replicated write will get a new replicated write, but with the same
+/// originating writer ID and sequence number for the consuming subscriber's tracking
+/// purposes.
+///
+/// For pull-based subscriptions, the requester will send a matcher, which the receiver
+/// will execute against its in-memory WAL.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Subscription {
-    name: String,
-    host_group: HostGroupId,
-    subscription_type: SubscriptionType,
-    matcher: Matcher,
-
-    /// `max_queue_size` is used for subscriptions that can potentially get queued up either for
-    /// pulling later, or in the case of a temporary outage for push subscriptions.
-    max_queue_size: usize,
+    pub name: String,
+    pub host_group_id: HostGroupId,
+    pub matcher: Matcher,
 }

 /// `Matcher` specifies the rule against the table name and/or a predicate
 /// against the row to determine if it matches the write rule.
 #[derive(Debug, Serialize, Deserialize)]
-struct Matcher {
+pub struct Matcher {
     #[serde(flatten)]
-    tables: MatchTables,
+    pub tables: MatchTables,
     // TODO: make this work with delorean_storage::Predicate
     #[serde(skip_serializing_if = "Option::is_none")]
-    predicate: Option<String>,
+    pub predicate: Option<String>,
 }

 /// `MatchTables` looks at the table name of a row to determine if it should
 /// match the rule.
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
-enum MatchTables {
+pub enum MatchTables {
     #[serde(rename = "*")]
     All,
     Table(String),
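
For reference, the serde attributes on `MatchTables` give it these JSON shapes when serialized on its own (a sketch assuming `serde_json`; the `Regex` variant is inferred from the `MatchTables::Regex(_)` match arm earlier in this commit, since the listing is cut off above):

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize)]
    #[serde(rename_all = "camelCase")]
    pub enum MatchTables {
        #[serde(rename = "*")]
        All,
        Table(String),
        Regex(String),
    }

    fn main() {
        // The unit variant renamed to "*" serializes as a bare string.
        assert_eq!(serde_json::to_string(&MatchTables::All).unwrap(), r#""*""#);
        // Newtype variants are externally tagged with camelCase names.
        assert_eq!(
            serde_json::to_string(&MatchTables::Table("cpu".to_string())).unwrap(),
            r#"{"table":"cpu"}"#
        );
        assert_eq!(
            serde_json::to_string(&MatchTables::Regex("^cpu".to_string())).unwrap(),
            r#"{"regex":"^cpu"}"#
        );
    }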