diff --git a/delorean_cluster/src/lib.rs b/delorean_cluster/src/lib.rs
index 69f14e74b8..34d771039f 100644
--- a/delorean_cluster/src/lib.rs
+++ b/delorean_cluster/src/lib.rs
@@ -106,9 +106,9 @@ pub enum Error {
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-/// Server is the container struct for how Delorean servers store data internally
-/// as well as how they communicate with other Delorean servers. Each server
-/// will have one of these structs, which keeps track of all replication and query rules.
+/// `Server` is the container struct for how servers store data internally, as well as how they
+/// communicate with other servers. Each server will have one of these structs, which keeps track
+/// of all replication and query rules.
 #[derive(Debug)]
 pub struct Server {
     databases: RwLock<BTreeMap<String, Arc<Db>>>,
@@ -127,17 +127,23 @@ impl Server {
 
     /// Tells the server the set of rules for a database. Currently, this is not persisted and
     /// is for in-memory processing rules only.
-    pub async fn create_database(&self, db_name: &str, rules: DatabaseRules) -> Result<()> {
+    pub async fn create_database(
+        &self,
+        db_name: impl Into<String>,
+        rules: DatabaseRules,
+    ) -> Result<()> {
+        let db_name = db_name.into();
+
         let mut database_map = self.databases.write().await;
         let buffer = if rules.store_locally {
-            Some(WriteBufferDb::new(db_name))
+            Some(WriteBufferDb::new(&db_name))
         } else {
             None
         };
 
         let db = Db { rules, buffer };
-        database_map.insert(db_name.into(), Arc::new(db));
+        database_map.insert(db_name, Arc::new(db));
 
         Ok(())
     }
@@ -153,8 +159,8 @@ impl Server {
         Ok(())
     }
 
-    /// write_lines takes in raw line protocol and converts it to a ReplicatedWrite, which
-    /// is then replicated to other delorean servers based on the configuration of the db.
+    /// `write_lines` takes in raw line protocol and converts it to a `ReplicatedWrite`, which
+    /// is then replicated to other servers based on the configuration of the `db`.
     /// This is step #1 from the above diagram.
     pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
         let db = self
@@ -231,8 +237,8 @@ impl Server {
     }
 }
 
-/// The Server will ask the ConnectionManager for connections to a specific remote server.
-/// These connections can be used to communicate with other Delorean servers.
+/// The `Server` will ask the `ConnectionManager` for connections to a specific remote server.
+/// These connections can be used to communicate with other servers.
 /// This is implemented as a trait for dependency injection in testing.
 #[async_trait]
 pub trait ConnectionManager {
@@ -243,13 +249,12 @@
     async fn remote_server(&self, connect: &str) -> Result<Arc<Self::RemoteServer>, Self::Error>;
 }
 
-/// The RemoteServer represents the API for replicating, subscribing, and querying other
-/// delorean servers.
+/// The `RemoteServer` represents the API for replicating, subscribing, and querying other servers.
#[async_trait]
 pub trait RemoteServer {
     type Error: std::error::Error + Send + Sync + 'static;
 
-    /// replicate will send a replicated write to a remote server. This is step #2 from the diagram.
+    /// Sends a replicated write to a remote server. This is step #2 from the diagram.
     async fn replicate(
         &self,
         db: &str,
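The `create_database` change above swaps `db_name: &str` for `db_name: impl Into<String>`, so callers can hand over either a borrowed or an owned string and the conversion happens exactly once, up front. A minimal standalone sketch of that pattern (the `Registry` type here is made up for illustration; it is not delorean code):

```rust
/// Illustrative stand-in for a method like `Server::create_database`.
struct Registry {
    names: Vec<String>,
}

impl Registry {
    fn register(&mut self, name: impl Into<String>) {
        // Convert once at the top, then use the owned `String` throughout,
        // mirroring `let db_name = db_name.into();` in the diff.
        let name = name.into();
        self.names.push(name);
    }
}

fn main() {
    let mut registry = Registry { names: Vec::new() };
    registry.register("mydb"); // &str works
    registry.register(String::from("mydb2")); // an owned String works too
    assert_eq!(registry.names, vec!["mydb", "mydb2"]);
}
```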
diff --git a/delorean_data_types/src/data.rs b/delorean_data_types/src/data.rs
index aca6a61415..65224489d3 100644
--- a/delorean_data_types/src/data.rs
+++ b/delorean_data_types/src/data.rs
@@ -1,5 +1,5 @@
 //! This module contains helper methods for constructing replicated writes
-//! based on DatabaseRules.
+//! based on `DatabaseRules`.
 
 use crate::database_rules::DatabaseRules;
 use crate::TIME_COLUMN_NAME;
diff --git a/delorean_data_types/src/database_rules.rs b/delorean_data_types/src/database_rules.rs
index 4355c5d376..1ae65cb2eb 100644
--- a/delorean_data_types/src/database_rules.rs
+++ b/delorean_data_types/src/database_rules.rs
@@ -19,55 +19,54 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// querying data for a single database.
 #[derive(Debug, Serialize, Deserialize, Default)]
 pub struct DatabaseRules {
-    // partition_template is used to generate a partition key for each row inserted into the db
+    /// Template that generates a partition key for each row inserted into the db
     pub partition_template: PartitionTemplate,
-    // store_locally if set to true will cause this delorean server to store writes and replicated
-    // writes in a local write buffer database. This is step #4 from the diagram.
+    /// If `store_locally` is set to `true`, this server will store writes and replicated
+    /// writes in a local write buffer database. This is step #4 from the diagram.
     pub store_locally: bool,
-    // replication is the set of host groups that data should be replicated to. Which host a
-    // write goes to within a host group is determined by consistent hashing the partition key.
-    // We'd use this to create a host group per availability zone. So you might have 5 availability
-    // zones with 2 hosts in each. Replication will ensure that N of those zones get a write. For
-    // each zone, only a single host needs to get the write. Replication is for ensuring a write
-    // exists across multiple hosts before returning success. Its purpose is to ensure write
-    // durability, rather than write availability for query (this is covered by subscriptions).
+    /// The set of host groups that data should be replicated to. Which host a
+    /// write goes to within a host group is determined by consistent hashing of the partition key.
+    /// We'd use this to create a host group per availability zone, so you might have 5 availability
+    /// zones with 2 hosts in each. Replication will ensure that N of those zones get a write. For
+    /// each zone, only a single host needs to get the write. Replication is for ensuring a write
+    /// exists across multiple hosts before returning success. Its purpose is to ensure write
+    /// durability, rather than write availability for query (this is covered by subscriptions).
     pub replication: Vec<HostGroupId>,
-    // replication_count is the minimum number of host groups to replicate a write to before
-    // success is returned. This can be overridden on a per request basis. Replication will
-    // continue to write to the other host groups in the background.
+    /// The minimum number of host groups to replicate a write to before success is returned. This
+    /// can be overridden on a per-request basis. Replication will continue to write to the other
+    /// host groups in the background.
     pub replication_count: u8,
-    // replication_queue_max_size is used to determine how far back replication can back up before
-    // either rejecting writes or dropping missed writes. The queue is kept in memory on a per
-    // database basis. A queue size of zero means it will only try to replicate synchronously and
-    // drop any failures.
+    /// How large the replication queue can get before either rejecting writes or dropping missed
+    /// writes. The queue is kept in memory on a per-database basis. A queue size of zero means it
+    /// will only try to replicate synchronously and drop any failures.
     pub replication_queue_max_size: usize,
-    // subscriptions are used for query servers to get data via either push or pull as it arrives.
-    // They are separate from replication as they have a different purpose. They're for query
-    // servers or other clients that want to subscribe to some subset of data being written in.
-    // This could either be specific partitions, ranges of partitions, tables, or rows matching
-    // some predicate. This is step #3 from the diagram.
+    /// `subscriptions` are used for query servers to get data via either push or pull as it
+    /// arrives. They are separate from replication as they have a different purpose. They're for
+    /// query servers or other clients that want to subscribe to some subset of data being written
+    /// in. This could either be specific partitions, ranges of partitions, tables, or rows matching
+    /// some predicate. This is step #3 from the diagram.
     pub subscriptions: Vec<Subscription>,
-    // query local is set to true if this server should answer queries from either its local
-    // write buffer and/or read-only partitions that it knows about. If set to true, results
-    // will be merged with any others from the remote goups or read only partitions.
+    /// If set to `true`, this server should answer queries from its local write buffer and/or any
+    /// read-only partitions that it knows about. In this case, results will be merged with any
+    /// others from the remote groups or read-only partitions.
     pub query_local: bool,
-    // primary_query_group should be set to a host group if remote servers should be issued
-    // queries for this database. All hosts in the group should be queried with this server
-    // acting as the coordinator that merges results together. If a specific host in the group
-    // is unavailable, another host in the same position from a secondary group should be
-    // queried. For example, if we've partitioned the data in this DB into 4 partitions and
-    // we are replicating the data across 3 availability zones. Say we have 4 hosts in each
-    // of those AZs, thus they each have 1 partition. We'd set the primary group to be the 4
-    // hosts in the same AZ as this one. And the secondary groups as the hosts in the other
-    // 2 AZs.
+    /// Set `primary_query_group` to a host group if remote servers should be issued
+    /// queries for this database. All hosts in the group should be queried with this server
+    /// acting as the coordinator that merges results together. If a specific host in the group
+    /// is unavailable, another host in the same position from a secondary group should be
+    /// queried. For example, imagine we've partitioned the data in this DB into 4 partitions and
+    /// we are replicating the data across 3 availability zones. We have 4 hosts in each
+    /// of those AZs, thus they each have 1 partition. We'd set the primary group to be the 4
+    /// hosts in the same AZ as this one, and the secondary groups as the hosts in the other
+    /// 2 AZs.
     pub primary_query_group: Option<HostGroupId>,
     pub secondary_query_groups: Vec<HostGroupId>,
-    // read_only_partitions are used when a server should answer queries for partitions that
-    // come from object storage. This can be used to start up a new query server to handle
-    // queries by pointing it at a collection of partitions and then telling it to also pull
-    // data from the replication servers (writes that haven't been snapshotted into a partition).
+    /// Use `read_only_partitions` when a server should answer queries for partitions that
+    /// come from object storage. This can be used to start up a new query server to handle
+    /// queries by pointing it at a collection of partitions and then telling it to also pull
+    /// data from the replication servers (writes that haven't been snapshotted into a partition).
     pub read_only_partitions: Vec<PartitionId>,
 }
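Since `DatabaseRules` is all public fields with a derived `Default`, a caller only needs to spell out the rules it cares about. A hedged sketch, assuming the crate layout shown in this diff (the host group IDs are placeholders; `HostGroupId` is just a `String` alias, as defined further down):

```rust
use delorean_data_types::database_rules::DatabaseRules;

/// Rules for a database that keeps a local write buffer (step #4) and must
/// replicate to two of three availability-zone host groups before acking.
fn example_rules() -> DatabaseRules {
    DatabaseRules {
        store_locally: true,
        replication: vec!["az-1".to_string(), "az-2".to_string(), "az-3".to_string()],
        replication_count: 2,
        replication_queue_max_size: 1024,
        // Partition template, subscriptions, and query settings keep their defaults.
        ..Default::default()
    }
}
```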
@@ -81,12 +80,12 @@ impl DatabaseRules {
     }
 }
 
-/// PartitionTemplate is used to compute the partition key of each row that gets written. It
+/// `PartitionTemplate` is used to compute the partition key of each row that gets written. It
 /// can consist of the table name, a column name and its value, a formatted time, or a string
-/// column and regex captures of its value. For columns that do not appear in the input row
+/// column and regex captures of its value. For columns that do not appear in the input row,
 /// a blank value is output.
 ///
-/// The key is constructed in order of the template parts, thus ordering changes what partition
+/// The key is constructed in order of the template parts; thus ordering changes what partition
 /// key is generated.
 #[derive(Debug, Serialize, Deserialize, Default)]
 pub struct PartitionTemplate {
@@ -105,7 +104,7 @@ impl PartitionTemplate {
             .map(|p| match p {
                 TemplatePart::Table => line.series.measurement.to_string(),
                 TemplatePart::Column(column) => match line.tag_value(&column) {
-                    Some(v) => format!("{}_{}", column, v.to_string()),
+                    Some(v) => format!("{}_{}", column, v),
                     None => match line.field_value(&column) {
                         Some(v) => format!("{}_{}", column, v),
                         None => "".to_string(),
@@ -123,7 +122,7 @@ impl PartitionTemplate {
     }
 }
 
-/// TemplatePart specifies what part of a row should be used to compute this part of a partition key.
+/// `TemplatePart` specifies what part of a row should be used to compute this part of a partition key.
 #[derive(Debug, Serialize, Deserialize)]
 pub enum TemplatePart {
     Table,
@@ -133,24 +132,24 @@ pub enum TemplatePart {
     StrftimeColumn(StrftimeColumn),
 }
 
-/// RegexCapture is for pulling parts of a string column into the partition key.
+/// `RegexCapture` is for pulling parts of a string column into the partition key.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct RegexCapture {
     column: String,
     regex: String,
 }
 
-/// StrftimeColumn can be used to create a time based partition key off some column other than
-/// the builtin "time" column.
+/// `StrftimeColumn` can be used to create a time-based partition key off some column other than
+/// the builtin `time` column.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct StrftimeColumn {
     column: String,
     format: String,
 }
 
-/// PartitionId is the object storage identifier for a specific partition. It should be a
+/// `PartitionId` is the object storage identifier for a specific partition. It should be a
 /// path that can be used against an object store to locate all the files and subdirectories
-/// for a partition. It takes the form of /<writer ID>/<database>/<partition key>/
+/// for a partition. It takes the form of `/<writer ID>/<database>/<partition key>/`.
 pub type PartitionId = String;
 pub type WriterId = String;
@@ -160,7 +159,7 @@ enum SubscriptionType {
     Pull,
 }
 
-/// Subscription represent a group of hosts that want to either receive data pushed
+/// `Subscription` represents a group of hosts that want to either receive data pushed
 /// as it arrives or want to pull it periodically. The subscription has a matcher
 /// that is used to determine what data will match it, and an optional queue for
 /// storing matched writes.
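For reference, here is the shape of the computation the `partition_key` hunk above is editing, as a self-contained sketch. The real code works over `ParsedLine` and supports more `TemplatePart` variants, and how the segments are joined into the final key is not visible in this hunk; the stand-in types below are invented for illustration:

```rust
/// Reduced stand-in for the real `TemplatePart` enum.
enum TemplatePart {
    Table,
    Column(String),
}

/// Each template part contributes one segment, in template order; a column
/// that is missing from the row contributes a blank segment.
fn partition_key_parts(table: &str, tags: &[(&str, &str)], parts: &[TemplatePart]) -> Vec<String> {
    parts
        .iter()
        .map(|p| match p {
            TemplatePart::Table => table.to_string(),
            TemplatePart::Column(column) => tags
                .iter()
                .find(|(k, _)| *k == column.as_str())
                .map(|(k, v)| format!("{}_{}", k, v))
                .unwrap_or_default(), // missing column => blank value
        })
        .collect()
}

fn main() {
    // Ordering matters: [Table, Column] and [Column, Table] produce different keys.
    let parts = vec![TemplatePart::Table, TemplatePart::Column("region".to_string())];
    let segments = partition_key_parts("cpu", &[("region", "west")], &parts);
    assert_eq!(segments, vec!["cpu".to_string(), "region_west".to_string()]);
}
```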
@@ -170,12 +169,12 @@ pub struct Subscription {
     host_group: HostGroupId,
     subscription_type: SubscriptionType,
     matcher: Matcher,
-    // max_queue_size is used for subscriptions that can potentially get queued up either for
-    // pulling later, or in the case of a temporary outage for push subscriptions.
+    /// `max_queue_size` is used for subscriptions that can potentially get queued up either for
+    /// pulling later, or in the case of a temporary outage for push subscriptions.
     max_queue_size: usize,
 }
 
-/// Matcher specifies the rule against the table name and/or a predicate
+/// `Matcher` specifies the rule against the table name and/or a predicate
 /// against the row to determine if it matches the write rule.
 #[derive(Debug, Serialize, Deserialize)]
 struct Matcher {
@@ -186,7 +185,7 @@ struct Matcher {
     predicate: Option<String>,
 }
 
-/// MatchTables looks at the table name of a row to determine if it should
+/// `MatchTables` looks at the table name of a row to determine if it should
 /// match the rule.
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -202,10 +201,11 @@ pub type HostGroupId = String;
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct HostGroup {
     pub id: HostGroupId,
-    // hosts is a vec of connection strings for remote hosts
+    /// `hosts` is a vector of connection strings for remote hosts.
     pub hosts: Vec<String>,
 }
 
+#[cfg(test)]
 mod tests {
     use super::*;
     use delorean_line_parser::parse_lines;
@@ -365,12 +365,10 @@ mod tests {
         Ok(())
     }
 
-    #[allow(dead_code)]
     fn parsed_lines(lp: &str) -> Vec<ParsedLine<'_>> {
         parse_lines(lp).map(|l| l.unwrap()).collect()
     }
 
-    #[allow(dead_code)]
     fn parse_line(line: &str) -> ParsedLine<'_> {
         parsed_lines(line).pop().unwrap()
     }
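The added `#[cfg(test)]` above is what lets the two `#[allow(dead_code)]` attributes go away: the `mod tests` body is now compiled only for test builds, so its helpers are always used whenever they exist. A self-contained illustration (a hypothetical module, not the delorean tests):

```rust
// Without `#[cfg(test)]`, `helper` would be compiled into the library itself
// and flagged as dead code in non-test builds; with it, the whole module
// disappears from regular builds.
#[cfg(test)]
mod tests {
    fn helper(n: u32) -> u32 {
        n + 1
    }

    #[test]
    fn helper_increments() {
        assert_eq!(helper(41), 42);
    }
}
```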
diff --git a/delorean_storage/src/test.rs b/delorean_storage/src/test.rs
index fbaea58581..2c88811c0a 100644
--- a/delorean_storage/src/test.rs
+++ b/delorean_storage/src/test.rs
@@ -1,7 +1,6 @@
-//! This module provides a reference implementaton
-//! of storage::DatabaseSource and storage::Database for use in testing
-//!
-//! Note: this module is only compiled in the 'test' cfg,
+//! This module provides a reference implementation of `storage::DatabaseSource` and
+//! `storage::Database` for use in testing.
+
 use delorean_arrow::arrow::record_batch::RecordBatch;
 
 use crate::{
@@ -18,30 +17,30 @@ use std::{collections::BTreeMap, sync::Arc};
 
 use tokio::sync::Mutex;
 
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct TestDatabase {
-    /// lines which have been written to this database, in order
+    /// Lines which have been written to this database, in order
     saved_lines: Mutex<Vec<String>>,
 
-    /// replicated writes which have been written to this database, in order
+    /// Replicated writes which have been written to this database, in order
     replicated_writes: Mutex<Vec<ReplicatedWrite>>,
 
-    /// column_names to return upon next request
+    /// `column_names` to return upon next request
     column_names: Arc<Mutex<Option<StringSetRef>>>,
 
-    /// the last request for column_names.
+    /// The last request for `column_names`
     column_names_request: Arc<Mutex<Option<ColumnNamesRequest>>>,
 
-    /// column_values to return upon next request
+    /// `column_values` to return upon next request
     column_values: Arc<Mutex<Option<StringSetRef>>>,
 
-    /// the last request for column_values.
+    /// The last request for `column_values`
     column_values_request: Arc<Mutex<Option<ColumnValuesRequest>>>,
 
-    /// responses to return on the next request to query_series
+    /// Responses to return on the next request to `query_series`
     query_series_values: Arc<Mutex<Option<SeriesSetPlans>>>,
 
-    /// the last request for query_series
+    /// The last request for `query_series`
     query_series_request: Arc<Mutex<Option<QuerySeriesRequest>>>,
 }
@@ -64,7 +63,7 @@ pub struct ColumnValuesRequest {
     pub predicate: Option<Predicate>,
 }
 
-/// Records the parameters passed to a query_series request
+/// Records the parameters passed to a `query_series` request
 #[derive(Debug, PartialEq, Clone)]
 pub struct QuerySeriesRequest {
     pub range: Option<TimestampRange>,
@@ -81,21 +80,6 @@ pub enum TestError {
     Execution { source: crate::exec::Error },
 }
 
-impl Default for TestDatabase {
-    fn default() -> Self {
-        Self {
-            saved_lines: Mutex::new(Vec::new()),
-            replicated_writes: Mutex::new(Vec::new()),
-            column_names: Arc::new(Mutex::new(None)),
-            column_names_request: Arc::new(Mutex::new(None)),
-            column_values: Arc::new(Mutex::new(None)),
-            column_values_request: Arc::new(Mutex::new(None)),
-            query_series_values: Arc::new(Mutex::new(None)),
-            query_series_request: Arc::new(Mutex::new(None)),
-        }
-    }
-}
-
 impl TestDatabase {
     pub fn new() -> Self {
         Self::default()
@@ -176,7 +160,7 @@ fn line_in_range(line: &ParsedLine<'_>, range: &Option<TimestampRange>) -> bool
 impl Database for TestDatabase {
     type Error = TestError;
 
-    /// writes parsed lines into this database
+    /// Writes parsed lines into this database
     async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> {
         let mut saved_lines = self.saved_lines.lock().await;
         for line in lines {
@@ -185,7 +169,7 @@ impl Database for TestDatabase {
         Ok(())
     }
 
-    /// adds the replicated write to this database
+    /// Adds the replicated write to this database
     async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error> {
         self.replicated_writes.lock().await.push(write.clone());
         Ok(())
@@ -217,7 +201,7 @@ impl Database for TestDatabase {
         Ok(names.into())
     }
 
-    /// return the mocked out column names, recording the request
+    /// Return the mocked out column names, recording the request
     async fn tag_column_names(
         &self,
         table: Option<String>,
@@ -250,7 +234,7 @@ impl Database for TestDatabase {
         Ok(column_names.into())
     }
 
-    /// return the mocked out column values, recording the request
+    /// Return the mocked out column values, recording the request
     async fn column_values(
         &self,
         column_name: &str,
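The deleted `impl Default for TestDatabase` works because every field type already implements `Default`, so `#[derive(Default)]` generates the same constructor. A standalone sketch of the idea with invented field names (std's `Mutex` is used here to keep the sketch dependency-free; the PR's struct uses `tokio::sync::Mutex`, which also implements `Default`):

```rust
use std::sync::{Arc, Mutex};

// `Mutex<Vec<_>>` and `Arc<Mutex<Option<_>>>` both implement `Default`,
// so the derive produces exactly what the removed hand-written impl did.
#[derive(Debug, Default)]
struct TestDoubles {
    saved_lines: Mutex<Vec<String>>,
    next_response: Arc<Mutex<Option<String>>>,
}

fn main() {
    let db = TestDoubles::default();
    assert!(db.saved_lines.lock().unwrap().is_empty());
    assert!(db.next_response.lock().unwrap().is_none());
}
```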
diff --git a/delorean_write_buffer/src/database.rs b/delorean_write_buffer/src/database.rs
index 6d6086f8e5..e3a4ed7312 100644
--- a/delorean_write_buffer/src/database.rs
+++ b/delorean_write_buffer/src/database.rs
@@ -215,7 +215,7 @@ impl From for Error {
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct Db {
     pub name: String,
     // TODO: partitions need to be wrapped in an Arc if they're going to be used without this lock
@@ -225,17 +225,17 @@
 impl Db {
     /// New creates a new in-memory only write buffer database
-    pub fn new(name: &str) -> Self {
+    pub fn new(name: impl Into<String>) -> Self {
         Self {
-            name: name.to_string(),
-            partitions: RwLock::new(vec![]),
-            wal_details: None,
+            name: name.into(),
+            ..Default::default()
         }
     }
 
     /// Create a new DB that will create and use the Write Ahead Log
     /// (WAL) directory `wal_dir`
-    pub async fn try_with_wal(name: &str, wal_dir: &mut PathBuf) -> Result<Self> {
+    pub async fn try_with_wal(name: impl Into<String>, wal_dir: &mut PathBuf) -> Result<Self> {
+        let name = name.into();
         wal_dir.push(&name);
         if let Err(e) = std::fs::create_dir(wal_dir.clone()) {
             match e.kind() {
@@ -252,16 +252,16 @@ impl Db {
         let wal_builder = WalBuilder::new(wal_dir.clone());
         let wal_details = start_wal_sync_task(wal_builder)
             .await
-            .context(OpeningWal { database: name })?;
+            .context(OpeningWal { database: &name })?;
         wal_details
             .write_metadata()
             .await
-            .context(OpeningWal { database: name })?;
+            .context(OpeningWal { database: &name })?;
 
         Ok(Self {
-            name: name.to_string(),
-            partitions: RwLock::new(vec![]),
+            name,
             wal_details: Some(wal_details),
+            ..Default::default()
         })
     }
 
@@ -346,7 +346,7 @@ impl Database for Db {
 
         if let Some(wal) = &self.wal_details {
             wal.write_and_sync(data).await.context(WritingWal {
-                database: self.name.clone(),
+                database: &self.name,
             })?;
         }
 
@@ -370,7 +370,7 @@ impl Database for Db {
             wal.write_and_sync(write.data.clone())
                 .await
                 .context(WritingWal {
-                    database: self.name.clone(),
+                    database: &self.name,
                 })?;
             }
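Both `Db::new` and `try_with_wal` now lean on the newly derived `Default` via struct update syntax: set the fields that matter, and `..Default::default()` fills in the rest, so the constructors no longer have to enumerate every field. A standalone sketch of the pattern (the `wal_enabled` field is an invented stand-in for fields like `wal_details`, which are not shown in full in this diff):

```rust
#[derive(Debug, Default)]
struct Db {
    name: String,
    wal_enabled: bool, // hypothetical stand-in field
}

impl Db {
    fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            // Remaining fields come from the derived `Default`, so adding a
            // field to `Db` later doesn't require touching this constructor.
            ..Default::default()
        }
    }
}

fn main() {
    let db = Db::new("mydb");
    assert_eq!(db.name, "mydb");
    assert!(!db.wal_enabled);
}
```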