docs: Change some comments into doc comments; some copy editing
parent 083e6947df
commit 1a184e067d
@@ -106,9 +106,9 @@ pub enum Error {
 pub type Result<T, E = Error> = std::result::Result<T, E>;

-/// Server is the container struct for how Delorean servers store data internally
-/// as well as how they communicate with other Delorean servers. Each server
-/// will have one of these structs, which keeps track of all replication and query rules.
+/// `Server` is the container struct for how servers store data internally, as well as how they
+/// communicate with other servers. Each server will have one of these structs, which keeps track
+/// of all replication and query rules.
 #[derive(Debug)]
 pub struct Server<M: ConnectionManager> {
     databases: RwLock<BTreeMap<String, Arc<Db>>>,
@@ -153,8 +153,8 @@ impl<M: ConnectionManager> Server<M> {
         Ok(())
     }

-    /// write_lines takes in raw line protocol and converts it to a ReplicatedWrite, which
-    /// is then replicated to other delorean servers based on the configuration of the db.
+    /// `write_lines` takes in raw line protocol and converts it to a `ReplicatedWrite`, which
+    /// is then replicated to other servers based on the configuration of the `db`.
     /// This is step #1 from the above diagram.
     pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
         let db = self
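For orientation, a minimal sketch of driving this API. The line-protocol parser import and the database name are assumptions about the surrounding crate, not part of this commit:

    // Sketch only: assumes a parser like `delorean_line_parser::parse_lines`
    // and an already-constructed `server`; neither appears in this diff.
    async fn demo<M: ConnectionManager>(server: &Server<M>) -> Result<()> {
        let lp = "cpu,host=a usage=0.5 100";
        let lines: Vec<ParsedLine<'_>> = parse_lines(lp)
            .collect::<Result<Vec<_>, _>>()
            .expect("valid line protocol");
        // Step #1: convert to a ReplicatedWrite and fan out per the db's rules.
        server.write_lines("mydb", &lines).await
    }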
@@ -231,8 +231,8 @@ impl<M: ConnectionManager> Server<M> {
     }
 }

-/// The Server will ask the ConnectionManager for connections to a specific remote server.
-/// These connections can be used to communicate with other Delorean servers.
+/// The `Server` will ask the `ConnectionManager` for connections to a specific remote server.
+/// These connections can be used to communicate with other servers.
 /// This is implemented as a trait for dependency injection in testing.
 #[async_trait]
 pub trait ConnectionManager {
@@ -243,13 +243,12 @@ pub trait ConnectionManager {
     async fn remote_server(&self, connect: &str) -> Result<Arc<Self::RemoteServer>, Self::Error>;
 }

-/// The RemoteServer represents the API for replicating, subscribing, and querying other
-/// delorean servers.
+/// The `RemoteServer` represents the API for replicating, subscribing, and querying other servers.
 #[async_trait]
 pub trait RemoteServer {
     type Error: std::error::Error + Send + Sync + 'static;

-    /// replicate will send a replicated write to a remote server. This is step #2 from the diagram.
+    /// Sends a replicated write to a remote server. This is step #2 from the diagram.
     async fn replicate(
         &self,
         db: &str,
@@ -1,5 +1,5 @@
 //! This module contains helper methods for constructing replicated writes
-//! based on DatabaseRules.
+//! based on `DatabaseRules`.

 use crate::database_rules::DatabaseRules;
 use crate::TIME_COLUMN_NAME;
@@ -19,55 +19,54 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// querying data for a single database.
 #[derive(Debug, Serialize, Deserialize, Default)]
 pub struct DatabaseRules {
-    // partition_template is used to generate a partition key for each row inserted into the db
+    /// Template that generates a partition key for each row inserted into the db
     pub partition_template: PartitionTemplate,
-    // store_locally if set to true will cause this delorean server to store writes and replicated
-    // writes in a local write buffer database. This is step #4 from the diagram.
+    /// If `store_locally` is set to `true`, this server will store writes and replicated
+    /// writes in a local write buffer database. This is step #4 from the diagram.
     pub store_locally: bool,
-    // replication is the set of host groups that data should be replicated to. Which host a
-    // write goes to within a host group is determined by consistent hashing the partition key.
-    // We'd use this to create a host group per availability zone. So you might have 5 availability
-    // zones with 2 hosts in each. Replication will ensure that N of those zones get a write. For
-    // each zone, only a single host needs to get the write. Replication is for ensuring a write
-    // exists across multiple hosts before returning success. Its purpose is to ensure write
-    // durability, rather than write availability for query (this is covered by subscriptions).
+    /// The set of host groups that data should be replicated to. Which host a
+    /// write goes to within a host group is determined by consistent hashing of the partition key.
+    /// We'd use this to create a host group per availability zone, so you might have 5 availability
+    /// zones with 2 hosts in each. Replication will ensure that N of those zones get a write. For
+    /// each zone, only a single host needs to get the write. Replication is for ensuring a write
+    /// exists across multiple hosts before returning success. Its purpose is to ensure write
+    /// durability, rather than write availability for query (this is covered by subscriptions).
     pub replication: Vec<HostGroupId>,
-    // replication_count is the minimum number of host groups to replicate a write to before
-    // success is returned. This can be overridden on a per request basis. Replication will
-    // continue to write to the other host groups in the background.
+    /// The minimum number of host groups to replicate a write to before success is returned. This
+    /// can be overridden on a per request basis. Replication will continue to write to the other
+    /// host groups in the background.
     pub replication_count: u8,
-    // replication_queue_max_size is used to determine how far back replication can back up before
-    // either rejecting writes or dropping missed writes. The queue is kept in memory on a per
-    // database basis. A queue size of zero means it will only try to replicate synchronously and
-    // drop any failures.
+    /// How long the replication queue can get before either rejecting writes or dropping missed
+    /// writes. The queue is kept in memory on a per-database basis. A queue size of zero means it
+    /// will only try to replicate synchronously and drop any failures.
     pub replication_queue_max_size: usize,
-    // subscriptions are used for query servers to get data via either push or pull as it arrives.
-    // They are separate from replication as they have a different purpose. They're for query
-    // servers or other clients that want to subscribe to some subset of data being written in.
-    // This could either be specific partitions, ranges of partitions, tables, or rows matching
-    // some predicate. This is step #3 from the diagram.
+    /// `subscriptions` are used for query servers to get data via either push or pull as it
+    /// arrives. They are separate from replication as they have a different purpose. They're for
+    /// query servers or other clients that want to subscribe to some subset of data being written
+    /// in. This could either be specific partitions, ranges of partitions, tables, or rows matching
+    /// some predicate. This is step #3 from the diagram.
     pub subscriptions: Vec<Subscription>,

-    // query local is set to true if this server should answer queries from either its local
-    // write buffer and/or read-only partitions that it knows about. If set to true, results
-    // will be merged with any others from the remote goups or read only partitions.
+    /// If set to `true`, this server should answer queries from one or more of its local write
+    /// buffer and any read-only partitions that it knows about. In this case, results
+    /// will be merged with any others from the remote groups or read-only partitions.
     pub query_local: bool,
-    // primary_query_group should be set to a host group if remote servers should be issued
-    // queries for this database. All hosts in the group should be queried with this server
-    // acting as the coordinator that merges results together. If a specific host in the group
-    // is unavailable, another host in the same position from a secondary group should be
-    // queried. For example, if we've partitioned the data in this DB into 4 partitions and
-    // we are replicating the data across 3 availability zones. Say we have 4 hosts in each
-    // of those AZs, thus they each have 1 partition. We'd set the primary group to be the 4
-    // hosts in the same AZ as this one. And the secondary groups as the hosts in the other
-    // 2 AZs.
+    /// Set `primary_query_group` to a host group if remote servers should be issued
+    /// queries for this database. All hosts in the group should be queried with this server
+    /// acting as the coordinator that merges results together. If a specific host in the group
+    /// is unavailable, another host in the same position from a secondary group should be
+    /// queried. For example, imagine we've partitioned the data in this DB into 4 partitions and
+    /// we are replicating the data across 3 availability zones. We have 4 hosts in each
+    /// of those AZs, thus they each have 1 partition. We'd set the primary group to be the 4
+    /// hosts in the same AZ as this one, and the secondary groups as the hosts in the other
+    /// 2 AZs.
     pub primary_query_group: Option<HostGroupId>,
     pub secondary_query_groups: Vec<HostGroupId>,

-    // read_only_partitions are used when a server should answer queries for partitions that
-    // come from object storage. This can be used to start up a new query server to handle
-    // queries by pointing it at a collection of partitions and then telling it to also pull
-    // data from the replication servers (writes that haven't been snapshotted into a partition).
+    /// Use `read_only_partitions` when a server should answer queries for partitions that
+    /// come from object storage. This can be used to start up a new query server to handle
+    /// queries by pointing it at a collection of partitions and then telling it to also pull
+    /// data from the replication servers (writes that haven't been snapshotted into a partition).
     pub read_only_partitions: Vec<PartitionId>,
 }

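Taken together, these fields describe one database's write and query path. A minimal sketch of what a configuration might look like, with illustrative field values (the type is assumed to be in scope):

    // Sketch only: values are made up, not taken from this commit.
    fn example_rules() -> DatabaseRules {
        DatabaseRules {
            store_locally: true,             // step #4: also keep a local write buffer
            replication: vec!["az-1".to_string(), "az-2".to_string()],
            replication_count: 2,            // ack only after both host groups have the write
            replication_queue_max_size: 0,   // zero: replicate synchronously, drop failures
            query_local: true,
            ..Default::default()             // leave the remaining rules at their defaults
        }
    }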
@@ -81,12 +80,12 @@ impl DatabaseRules {
     }
 }

-/// PartitionTemplate is used to compute the partition key of each row that gets written. It
+/// `PartitionTemplate` is used to compute the partition key of each row that gets written. It
 /// can consist of the table name, a column name and its value, a formatted time, or a string
-/// column and regex captures of its value. For columns that do not appear in the input row
+/// column and regex captures of its value. For columns that do not appear in the input row,
 /// a blank value is output.
 ///
-/// The key is constructed in order of the template parts, thus ordering changes what partition
+/// The key is constructed in order of the template parts; thus ordering changes what partition
 /// key is generated.
 #[derive(Debug, Serialize, Deserialize, Default)]
 pub struct PartitionTemplate {
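To make the ordering point concrete, a sketch of a two-part template. The `parts` field name and the `Column` variant are assumptions here, since only `Table` and `StrftimeColumn` are visible in this diff:

    // Hypothetical template: table name first, then the value of the "region" column.
    let template = PartitionTemplate {
        parts: vec![
            TemplatePart::Table,
            TemplatePart::Column("region".to_string()),
        ],
    };
    // A row in table "cpu" with region="west" would produce a key like "cpu-west";
    // reversing `parts` would produce "west-cpu" instead.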
@@ -123,7 +122,7 @@ impl PartitionTemplate {
     }
 }

-/// TemplatePart specifies what part of a row should be used to compute this part of a partition key.
+/// `TemplatePart` specifies what part of a row should be used to compute this part of a partition key.
 #[derive(Debug, Serialize, Deserialize)]
 pub enum TemplatePart {
     Table,
@@ -133,24 +132,24 @@ pub enum TemplatePart {
     StrftimeColumn(StrftimeColumn),
 }

-/// RegexCapture is for pulling parts of a string column into the partition key.
+/// `RegexCapture` is for pulling parts of a string column into the partition key.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct RegexCapture {
     column: String,
     regex: String,
 }

-/// StrftimeColumn can be used to create a time based partition key off some column other than
-/// the builtin "time" column.
+/// `StrftimeColumn` can be used to create a time based partition key off some column other than
+/// the builtin `time` column.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct StrftimeColumn {
     column: String,
     format: String,
 }

-/// PartitionId is the object storage identifier for a specific partition. It should be a
+/// `PartitionId` is the object storage identifier for a specific partition. It should be a
 /// path that can be used against an object store to locate all the files and subdirectories
-/// for a partition. It takes the form of /<writer ID>/<database>/<partition key>/
+/// for a partition. It takes the form of `/<writer ID>/<database>/<partition key>/`.
 pub type PartitionId = String;
 pub type WriterId = String;

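A quick sketch of the strftime-based part in use, mirroring the `StrftimeColumn(StrftimeColumn)` variant above. The field values are illustrative, and constructing the struct literal assumes same-module access to its private fields:

    // Partition by calendar day, keyed off a column named "event_time" rather
    // than the builtin "time" column.
    let by_day = TemplatePart::StrftimeColumn(StrftimeColumn {
        column: "event_time".to_string(),
        format: "%Y-%m-%d".to_string(),
    });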
@@ -160,7 +159,7 @@ enum SubscriptionType {
     Pull,
 }

-/// Subscription represent a group of hosts that want to either receive data pushed
+/// `Subscription` represents a group of hosts that want to either receive data pushed
 /// as it arrives or want to pull it periodically. The subscription has a matcher
 /// that is used to determine what data will match it, and an optional queue for
 /// storing matched writes.
@@ -170,12 +169,12 @@ pub struct Subscription {
     host_group: HostGroupId,
     subscription_type: SubscriptionType,
     matcher: Matcher,
-    // max_queue_size is used for subscriptions that can potentially get queued up either for
-    // pulling later, or in the case of a temporary outage for push subscriptions.
+    /// `max_queue_size` is used for subscriptions that can potentially get queued up either for
+    /// pulling later, or in the case of a temporary outage for push subscriptions.
     max_queue_size: usize,
 }

-/// Matcher specifies the rule against the table name and/or a predicate
+/// `Matcher` specifies the rule against the table name and/or a predicate
 /// against the row to determine if it matches the write rule.
 #[derive(Debug, Serialize, Deserialize)]
 struct Matcher {
@@ -186,7 +185,7 @@ struct Matcher {
     predicate: Option<String>,
 }

-/// MatchTables looks at the table name of a row to determine if it should
+/// `MatchTables` looks at the table name of a row to determine if it should
 /// match the rule.
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -202,7 +201,7 @@ pub type HostGroupId = String;
 #[derive(Debug, Serialize, Deserialize)]
 pub struct HostGroup {
     pub id: HostGroupId,
-    // hosts is a vec of connection strings for remote hosts
+    /// `hosts` is a vector of connection strings for remote hosts.
     pub hosts: Vec<String>,
 }

@@ -1,7 +1,6 @@
-//! This module provides a reference implementaton
-//! of storage::DatabaseSource and storage::Database for use in testing
-//!
-//! Note: this module is only compiled in the 'test' cfg,
+//! This module provides a reference implementation of `storage::DatabaseSource` and
+//! `storage::Database` for use in testing.
+
 use delorean_arrow::arrow::record_batch::RecordBatch;

 use crate::{
@@ -20,28 +19,28 @@ use tokio::sync::Mutex;

 #[derive(Debug)]
 pub struct TestDatabase {
-    /// lines which have been written to this database, in order
+    /// Lines which have been written to this database, in order
     saved_lines: Mutex<Vec<String>>,

-    /// replicated writes which have been written to this database, in order
+    /// Replicated writes which have been written to this database, in order
     replicated_writes: Mutex<Vec<ReplicatedWrite>>,

-    /// column_names to return upon next request
+    /// `column_names` to return upon next request
     column_names: Arc<Mutex<Option<StringSetRef>>>,

-    /// the last request for column_names.
+    /// The last request for `column_names`
     column_names_request: Arc<Mutex<Option<ColumnNamesRequest>>>,

-    /// column_values to return upon next request
+    /// `column_values` to return upon next request
     column_values: Arc<Mutex<Option<StringSetRef>>>,

-    /// the last request for column_values.
+    /// The last request for `column_values`
     column_values_request: Arc<Mutex<Option<ColumnValuesRequest>>>,

-    /// responses to return on the next request to query_series
+    /// Responses to return on the next request to `query_series`
     query_series_values: Arc<Mutex<Option<SeriesSetPlans>>>,

-    /// the last request for query_series
+    /// The last request for `query_series`
     query_series_request: Arc<Mutex<Option<QuerySeriesRequest>>>,
 }

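The struct pairs each mocked response with a slot recording the request that consumed it. A minimal sketch of the pattern from a test's point of view; it assumes same-module access to the private `saved_lines` field and an already-constructed `TestDatabase`, neither of which this diff shows:

    // Hypothetical test flow: write through the `Database` trait, then assert
    // on what the mock recorded.
    async fn exercise(db: &TestDatabase, lines: &[ParsedLine<'_>]) -> Result<(), TestError> {
        db.write_lines(lines).await?;            // recorded in `saved_lines`, in order
        let saved = db.saved_lines.lock().await;
        assert_eq!(saved.len(), lines.len());
        Ok(())
    }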
@@ -64,7 +63,7 @@ pub struct ColumnValuesRequest {
     pub predicate: Option<String>,
 }

-/// Records the parameters passed to a query_series request
+/// Records the parameters passed to a `query_series` request
 #[derive(Debug, PartialEq, Clone)]
 pub struct QuerySeriesRequest {
     pub range: Option<TimestampRange>,
@@ -176,7 +175,7 @@ fn line_in_range(line: &ParsedLine<'_>, range: &Option<TimestampRange>) -> bool
 impl Database for TestDatabase {
     type Error = TestError;

-    /// writes parsed lines into this database
+    /// Writes parsed lines into this database
     async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> {
         let mut saved_lines = self.saved_lines.lock().await;
         for line in lines {
@@ -185,7 +184,7 @@ impl Database for TestDatabase {
         Ok(())
     }

-    /// adds the replicated write to this database
+    /// Adds the replicated write to this database
     async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error> {
         self.replicated_writes.lock().await.push(write.clone());
         Ok(())
@@ -217,7 +216,7 @@ impl Database for TestDatabase {
         Ok(names.into())
     }

-    /// return the mocked out column names, recording the request
+    /// Return the mocked out column names, recording the request
     async fn tag_column_names(
         &self,
         table: Option<String>,
@@ -250,7 +249,7 @@ impl Database for TestDatabase {
         Ok(column_names.into())
     }

-    /// return the mocked out column values, recording the request
+    /// Return the mocked out column values, recording the request
     async fn column_values(
         &self,
         column_name: &str,