Merge pull request #358 from influxdata/cn/improvements

pull/24376/head
Carol (Nichols || Goulding) 2020-10-16 14:22:37 -04:00 committed by GitHub
commit ce76513048
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 102 additions and 115 deletions

View File

@ -106,9 +106,9 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Server is the container struct for how Delorean servers store data internally
/// as well as how they communicate with other Delorean servers. Each server
/// will have one of these structs, which keeps track of all replication and query rules.
/// `Server` is the container struct for how servers store data internally, as well as how they
/// communicate with other servers. Each server will have one of these structs, which keeps track
/// of all replication and query rules.
#[derive(Debug)]
pub struct Server<M: ConnectionManager> {
databases: RwLock<BTreeMap<String, Arc<Db>>>,
@ -127,17 +127,23 @@ impl<M: ConnectionManager> Server<M> {
/// Tells the server the set of rules for a database. Currently, this is not persisted and
/// is for in-memory processing rules only.
pub async fn create_database(&self, db_name: &str, rules: DatabaseRules) -> Result<()> {
pub async fn create_database(
&self,
db_name: impl Into<String>,
rules: DatabaseRules,
) -> Result<()> {
let db_name = db_name.into();
let mut database_map = self.databases.write().await;
let buffer = if rules.store_locally {
Some(WriteBufferDb::new(db_name))
Some(WriteBufferDb::new(&db_name))
} else {
None
};
let db = Db { rules, buffer };
database_map.insert(db_name.into(), Arc::new(db));
database_map.insert(db_name, Arc::new(db));
Ok(())
}
@ -153,8 +159,8 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
/// write_lines takes in raw line protocol and converts it to a ReplicatedWrite, which
/// is then replicated to other delorean servers based on the configuration of the db.
/// `write_lines` takes in raw line protocol and converts it to a `ReplicatedWrite`, which
/// is then replicated to other servers based on the configuration of the `db`.
/// This is step #1 from the above diagram.
pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
let db = self
@ -231,8 +237,8 @@ impl<M: ConnectionManager> Server<M> {
}
}
/// The Server will ask the ConnectionManager for connections to a specific remote server.
/// These connections can be used to communicate with other Delorean servers.
/// The `Server` will ask the `ConnectionManager` for connections to a specific remote server.
/// These connections can be used to communicate with other servers.
/// This is implemented as a trait for dependency injection in testing.
#[async_trait]
pub trait ConnectionManager {
@ -243,13 +249,12 @@ pub trait ConnectionManager {
async fn remote_server(&self, connect: &str) -> Result<Arc<Self::RemoteServer>, Self::Error>;
}
/// The RemoteServer represents the API for replicating, subscribing, and querying other
/// delorean servers.
/// The `RemoteServer` represents the API for replicating, subscribing, and querying other servers.
#[async_trait]
pub trait RemoteServer {
type Error: std::error::Error + Send + Sync + 'static;
/// replicate will send a replicated write to a remote server. This is step #2 from the diagram.
/// Sends a replicated write to a remote server. This is step #2 from the diagram.
async fn replicate(
&self,
db: &str,

View File

@ -1,5 +1,5 @@
//! This module contains helper methods for constructing replicated writes
//! based on DatabaseRules.
//! based on `DatabaseRules`.
use crate::database_rules::DatabaseRules;
use crate::TIME_COLUMN_NAME;

View File

@ -19,55 +19,54 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// querying data for a single database.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct DatabaseRules {
// partition_template is used to generate a partition key for each row inserted into the db
/// Template that generates a partition key for each row inserted into the db
pub partition_template: PartitionTemplate,
// store_locally if set to true will cause this delorean server to store writes and replicated
// writes in a local write buffer database. This is step #4 from the diagram.
/// If `store_locally` is set to `true`, this server will store writes and replicated
/// writes in a local write buffer database. This is step #4 from the diagram.
pub store_locally: bool,
// replication is the set of host groups that data should be replicated to. Which host a
// write goes to within a host group is determined by consistent hashing the partition key.
// We'd use this to create a host group per availability zone. So you might have 5 availability
// zones with 2 hosts in each. Replication will ensure that N of those zones get a write. For
// each zone, only a single host needs to get the write. Replication is for ensuring a write
// exists across multiple hosts before returning success. Its purpose is to ensure write
// durability, rather than write availability for query (this is covered by subscriptions).
/// The set of host groups that data should be replicated to. Which host a
/// write goes to within a host group is determined by consistent hashing of the partition key.
/// We'd use this to create a host group per availability zone, so you might have 5 availability
/// zones with 2 hosts in each. Replication will ensure that N of those zones get a write. For
/// each zone, only a single host needs to get the write. Replication is for ensuring a write
/// exists across multiple hosts before returning success. Its purpose is to ensure write
/// durability, rather than write availability for query (this is covered by subscriptions).
pub replication: Vec<HostGroupId>,
// replication_count is the minimum number of host groups to replicate a write to before
// success is returned. This can be overridden on a per request basis. Replication will
// continue to write to the other host groups in the background.
/// The minimum number of host groups to replicate a write to before success is returned. This
/// can be overridden on a per request basis. Replication will continue to write to the other
/// host groups in the background.
pub replication_count: u8,
// replication_queue_max_size is used to determine how far back replication can back up before
// either rejecting writes or dropping missed writes. The queue is kept in memory on a per
// database basis. A queue size of zero means it will only try to replicate synchronously and
// drop any failures.
/// How long the replication queue can get before either rejecting writes or dropping missed
/// writes. The queue is kept in memory on a per-database basis. A queue size of zero means it
/// will only try to replicate synchronously and drop any failures.
pub replication_queue_max_size: usize,
// subscriptions are used for query servers to get data via either push or pull as it arrives.
// They are separate from replication as they have a different purpose. They're for query
// servers or other clients that want to subscribe to some subset of data being written in.
// This could either be specific partitions, ranges of partitions, tables, or rows matching
// some predicate. This is step #3 from the diagram.
/// `subscriptions` are used for query servers to get data via either push or pull as it
/// arrives. They are separate from replication as they have a different purpose. They're for
/// query servers or other clients that want to subscribe to some subset of data being written
/// in. This could either be specific partitions, ranges of partitions, tables, or rows matching
/// some predicate. This is step #3 from the diagram.
pub subscriptions: Vec<Subscription>,
// query local is set to true if this server should answer queries from either its local
// write buffer and/or read-only partitions that it knows about. If set to true, results
// will be merged with any others from the remote goups or read only partitions.
/// If set to `true`, this server should answer queries from one or more of of its local write
/// buffer and any read-only partitions that it knows about. In this case, results
/// will be merged with any others from the remote goups or read only partitions.
pub query_local: bool,
// primary_query_group should be set to a host group if remote servers should be issued
// queries for this database. All hosts in the group should be queried with this server
// acting as the coordinator that merges results together. If a specific host in the group
// is unavailable, another host in the same position from a secondary group should be
// queried. For example, if we've partitioned the data in this DB into 4 partitions and
// we are replicating the data across 3 availability zones. Say we have 4 hosts in each
// of those AZs, thus they each have 1 partition. We'd set the primary group to be the 4
// hosts in the same AZ as this one. And the secondary groups as the hosts in the other
// 2 AZs.
/// Set `primary_query_group` to a host group if remote servers should be issued
/// queries for this database. All hosts in the group should be queried with this server
/// acting as the coordinator that merges results together. If a specific host in the group
/// is unavailable, another host in the same position from a secondary group should be
/// queried. For example, imagine we've partitioned the data in this DB into 4 partitions and
/// we are replicating the data across 3 availability zones. We have 4 hosts in each
/// of those AZs, thus they each have 1 partition. We'd set the primary group to be the 4
/// hosts in the same AZ as this one, and the secondary groups as the hosts in the other
/// 2 AZs.
pub primary_query_group: Option<HostGroupId>,
pub secondary_query_groups: Vec<HostGroupId>,
// read_only_partitions are used when a server should answer queries for partitions that
// come from object storage. This can be used to start up a new query server to handle
// queries by pointing it at a collection of partitions and then telling it to also pull
// data from the replication servers (writes that haven't been snapshotted into a partition).
/// Use `read_only_partitions` when a server should answer queries for partitions that
/// come from object storage. This can be used to start up a new query server to handle
/// queries by pointing it at a collection of partitions and then telling it to also pull
/// data from the replication servers (writes that haven't been snapshotted into a partition).
pub read_only_partitions: Vec<PartitionId>,
}
@ -81,12 +80,12 @@ impl DatabaseRules {
}
}
/// PartitionTemplate is used to compute the partition key of each row that gets written. It
/// `PartitionTemplate` is used to compute the partition key of each row that gets written. It
/// can consist of the table name, a column name and its value, a formatted time, or a string
/// column and regex captures of its value. For columns that do not appear in the input row
/// column and regex captures of its value. For columns that do not appear in the input row,
/// a blank value is output.
///
/// The key is constructed in order of the template parts, thus ordering changes what partition
/// The key is constructed in order of the template parts; thus ordering changes what partition
/// key is generated.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct PartitionTemplate {
@ -105,7 +104,7 @@ impl PartitionTemplate {
.map(|p| match p {
TemplatePart::Table => line.series.measurement.to_string(),
TemplatePart::Column(column) => match line.tag_value(&column) {
Some(v) => format!("{}_{}", column, v.to_string()),
Some(v) => format!("{}_{}", column, v),
None => match line.field_value(&column) {
Some(v) => format!("{}_{}", column, v),
None => "".to_string(),
@ -123,7 +122,7 @@ impl PartitionTemplate {
}
}
/// TemplatePart specifies what part of a row should be used to compute this part of a partition key.
/// `TemplatePart` specifies what part of a row should be used to compute this part of a partition key.
#[derive(Debug, Serialize, Deserialize)]
pub enum TemplatePart {
Table,
@ -133,24 +132,24 @@ pub enum TemplatePart {
StrftimeColumn(StrftimeColumn),
}
/// RegexCapture is for pulling parts of a string column into the partition key.
/// `RegexCapture` is for pulling parts of a string column into the partition key.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegexCapture {
column: String,
regex: String,
}
/// StrftimeColumn can be used to create a time based partition key off some column other than
/// the builtin "time" column.
/// `StrftimeColumn` can be used to create a time based partition key off some column other than
/// the builtin `time` column.
#[derive(Debug, Serialize, Deserialize)]
pub struct StrftimeColumn {
column: String,
format: String,
}
/// PartitionId is the object storage identifier for a specific partition. It should be a
/// `PartitionId` is the object storage identifier for a specific partition. It should be a
/// path that can be used against an object store to locate all the files and subdirectories
/// for a partition. It takes the form of /<writer ID>/<database>/<partition key>/
/// for a partition. It takes the form of `/<writer ID>/<database>/<partition key>/`.
pub type PartitionId = String;
pub type WriterId = String;
@ -160,7 +159,7 @@ enum SubscriptionType {
Pull,
}
/// Subscription represent a group of hosts that want to either receive data pushed
/// `Subscription` represents a group of hosts that want to either receive data pushed
/// as it arrives or want to pull it periodically. The subscription has a matcher
/// that is used to determine what data will match it, and an optional queue for
/// storing matched writes.
@ -170,12 +169,12 @@ pub struct Subscription {
host_group: HostGroupId,
subscription_type: SubscriptionType,
matcher: Matcher,
// max_queue_size is used for subscriptions that can potentially get queued up either for
// pulling later, or in the case of a temporary outage for push subscriptions.
/// `max_queue_size` is used for subscriptions that can potentially get queued up either for
/// pulling later, or in the case of a temporary outage for push subscriptions.
max_queue_size: usize,
}
/// Matcher specifies the rule against the table name and/or a predicate
/// `Matcher` specifies the rule against the table name and/or a predicate
/// against the row to determine if it matches the write rule.
#[derive(Debug, Serialize, Deserialize)]
struct Matcher {
@ -186,7 +185,7 @@ struct Matcher {
predicate: Option<String>,
}
/// MatchTables looks at the table name of a row to determine if it should
/// `MatchTables` looks at the table name of a row to determine if it should
/// match the rule.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@ -202,10 +201,11 @@ pub type HostGroupId = String;
#[derive(Debug, Serialize, Deserialize)]
pub struct HostGroup {
pub id: HostGroupId,
// hosts is a vec of connection strings for remote hosts
/// `hosts` is a vector of connection strings for remote hosts.
pub hosts: Vec<String>,
}
#[cfg(test)]
mod tests {
use super::*;
use delorean_line_parser::parse_lines;
@ -365,12 +365,10 @@ mod tests {
Ok(())
}
#[allow(dead_code)]
fn parsed_lines(lp: &str) -> Vec<ParsedLine<'_>> {
parse_lines(lp).map(|l| l.unwrap()).collect()
}
#[allow(dead_code)]
fn parse_line(line: &str) -> ParsedLine<'_> {
parsed_lines(line).pop().unwrap()
}

View File

@ -1,7 +1,6 @@
//! This module provides a reference implementaton
//! of storage::DatabaseSource and storage::Database for use in testing
//!
//! Note: this module is only compiled in the 'test' cfg,
//! This module provides a reference implementaton of `storage::DatabaseSource` and
//! `storage::Database` for use in testing.
use delorean_arrow::arrow::record_batch::RecordBatch;
use crate::{
@ -18,30 +17,30 @@ use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::Mutex;
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct TestDatabase {
/// lines which have been written to this database, in order
/// Lines which have been written to this database, in order
saved_lines: Mutex<Vec<String>>,
/// replicated writes which have been written to this database, in order
/// Replicated writes which have been written to this database, in order
replicated_writes: Mutex<Vec<ReplicatedWrite>>,
/// column_names to return upon next request
/// `column_names` to return upon next request
column_names: Arc<Mutex<Option<StringSetRef>>>,
/// the last request for column_names.
/// The last request for `column_names`
column_names_request: Arc<Mutex<Option<ColumnNamesRequest>>>,
/// column_values to return upon next request
/// `column_values` to return upon next request
column_values: Arc<Mutex<Option<StringSetRef>>>,
/// the last request for column_values.
/// The last request for `column_values`
column_values_request: Arc<Mutex<Option<ColumnValuesRequest>>>,
/// responses to return on the next request to query_series
/// Responses to return on the next request to `query_series`
query_series_values: Arc<Mutex<Option<SeriesSetPlans>>>,
/// the last request for query_series
/// The last request for `query_series`
query_series_request: Arc<Mutex<Option<QuerySeriesRequest>>>,
}
@ -64,7 +63,7 @@ pub struct ColumnValuesRequest {
pub predicate: Option<String>,
}
/// Records the parameters passed to a query_series request
/// Records the parameters passed to a `query_series` request
#[derive(Debug, PartialEq, Clone)]
pub struct QuerySeriesRequest {
pub range: Option<TimestampRange>,
@ -81,21 +80,6 @@ pub enum TestError {
Execution { source: crate::exec::Error },
}
impl Default for TestDatabase {
fn default() -> Self {
Self {
saved_lines: Mutex::new(Vec::new()),
replicated_writes: Mutex::new(Vec::new()),
column_names: Arc::new(Mutex::new(None)),
column_names_request: Arc::new(Mutex::new(None)),
column_values: Arc::new(Mutex::new(None)),
column_values_request: Arc::new(Mutex::new(None)),
query_series_values: Arc::new(Mutex::new(None)),
query_series_request: Arc::new(Mutex::new(None)),
}
}
}
impl TestDatabase {
pub fn new() -> Self {
Self::default()
@ -176,7 +160,7 @@ fn line_in_range(line: &ParsedLine<'_>, range: &Option<TimestampRange>) -> bool
impl Database for TestDatabase {
type Error = TestError;
/// writes parsed lines into this database
/// Writes parsed lines into this database
async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> {
let mut saved_lines = self.saved_lines.lock().await;
for line in lines {
@ -185,7 +169,7 @@ impl Database for TestDatabase {
Ok(())
}
/// adds the replicated write to this database
/// Adds the replicated write to this database
async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error> {
self.replicated_writes.lock().await.push(write.clone());
Ok(())
@ -217,7 +201,7 @@ impl Database for TestDatabase {
Ok(names.into())
}
/// return the mocked out column names, recording the request
/// Return the mocked out column names, recording the request
async fn tag_column_names(
&self,
table: Option<String>,
@ -250,7 +234,7 @@ impl Database for TestDatabase {
Ok(column_names.into())
}
/// return the mocked out column values, recording the request
/// Return the mocked out column values, recording the request
async fn column_values(
&self,
column_name: &str,

View File

@ -215,7 +215,7 @@ impl From<crate::partition::Error> for Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct Db {
pub name: String,
// TODO: partitions need to be wrapped in an Arc if they're going to be used without this lock
@ -225,17 +225,17 @@ pub struct Db {
impl Db {
/// New creates a new in-memory only write buffer database
pub fn new(name: &str) -> Self {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.to_string(),
partitions: RwLock::new(vec![]),
wal_details: None,
name: name.into(),
..Default::default()
}
}
/// Create a new DB that will create and use the Write Ahead Log
/// (WAL) directory `wal_dir`
pub async fn try_with_wal(name: &str, wal_dir: &mut PathBuf) -> Result<Self> {
pub async fn try_with_wal(name: impl Into<String>, wal_dir: &mut PathBuf) -> Result<Self> {
let name = name.into();
wal_dir.push(&name);
if let Err(e) = std::fs::create_dir(wal_dir.clone()) {
match e.kind() {
@ -252,16 +252,16 @@ impl Db {
let wal_builder = WalBuilder::new(wal_dir.clone());
let wal_details = start_wal_sync_task(wal_builder)
.await
.context(OpeningWal { database: name })?;
.context(OpeningWal { database: &name })?;
wal_details
.write_metadata()
.await
.context(OpeningWal { database: name })?;
.context(OpeningWal { database: &name })?;
Ok(Self {
name: name.to_string(),
partitions: RwLock::new(vec![]),
name,
wal_details: Some(wal_details),
..Default::default()
})
}
@ -346,7 +346,7 @@ impl Database for Db {
if let Some(wal) = &self.wal_details {
wal.write_and_sync(data).await.context(WritingWal {
database: self.name.clone(),
database: &self.name,
})?;
}
@ -370,7 +370,7 @@ impl Database for Db {
wal.write_and_sync(write.data.clone())
.await
.context(WritingWal {
database: self.name.clone(),
database: &self.name,
})?;
}