From 9a345e226cf533a61c10f005f2a3e4f92046075c Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Wed, 14 Oct 2020 08:36:49 -0400 Subject: [PATCH] chore: refactor cluster to use in memory write buffer This refactors cluster to use the in memory write buffer. It removes the injected DatabaseStore as it is no longer needed. --- Cargo.lock | 1 + delorean_cluster/Cargo.toml | 1 + delorean_cluster/src/lib.rs | 126 +++++++++++++++++++------------- delorean_data_types/src/data.rs | 4 +- 4 files changed, 80 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 07fe7ae7c0..2d150ad8f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -729,6 +729,7 @@ name = "delorean_cluster" version = "0.1.0" dependencies = [ "async-trait", + "delorean_arrow", "delorean_data_types", "delorean_generated_types", "delorean_line_parser", diff --git a/delorean_cluster/Cargo.toml b/delorean_cluster/Cargo.toml index 8f5e08283a..3ab1c9cd2f 100644 --- a/delorean_cluster/Cargo.toml +++ b/delorean_cluster/Cargo.toml @@ -16,3 +16,4 @@ delorean_line_parser = { path = "../delorean_line_parser" } delorean_storage = { path = "../delorean_storage" } delorean_write_buffer = { path = "../delorean_write_buffer" } tokio = { version = "0.2", features = ["full"] } +delorean_arrow = { path = "../delorean_arrow" } diff --git a/delorean_cluster/src/lib.rs b/delorean_cluster/src/lib.rs index f7b19e1efb..1fa5ffbc03 100644 --- a/delorean_cluster/src/lib.rs +++ b/delorean_cluster/src/lib.rs @@ -65,16 +65,15 @@ use std::{collections::BTreeMap, sync::Arc}; -use delorean_data_types::{ - data::{lines_to_replicated_write, ReplicatedWrite}, - database_rules::DatabaseRules, -}; +use delorean_data_types::{data::lines_to_replicated_write, database_rules::DatabaseRules}; use delorean_generated_types::wal as wb; use delorean_line_parser::ParsedLine; -use delorean_storage::{Database, DatabaseStore}; +use delorean_storage::Database; +use delorean_write_buffer::Db as WriteBufferDb; use async_trait::async_trait; -use snafu::{ResultExt, Snafu}; +use delorean_arrow::arrow::record_batch::RecordBatch; +use snafu::{OptionExt, ResultExt, Snafu}; use tokio::sync::RwLock; type DatabaseError = Box; @@ -87,6 +86,8 @@ pub enum Error { DatabaseNotFound { db: String }, #[snafu(display("database error: {}", source))] UnknownDatabaseError { source: DatabaseError }, + #[snafu(display("no local buffer for database: {}", db))] + NoLocalBuffer { db: String }, } pub type Result = std::result::Result; @@ -95,26 +96,32 @@ pub type Result = std::result::Result; /// as well as how they communicate with other Delorean servers. Each server /// will have one of these structs, which keeps track of all replication and query rules. #[derive(Debug)] -pub struct Server { - #[allow(dead_code)] - database_rules: RwLock>>, - pub local_store: S, +pub struct Server { + databases: RwLock>>, #[allow(dead_code)] connection_manager: M, } -impl Server { - pub fn new(connection_manager: M, local_store: S) -> Self { +impl Server { + pub fn new(connection_manager: M) -> Self { Self { - database_rules: RwLock::new(BTreeMap::new()), - local_store, + databases: RwLock::new(BTreeMap::new()), connection_manager, } } - pub async fn create_database(&self, db: &str, rules: DatabaseRules) -> Result<()> { - let mut rules_map = self.database_rules.write().await; - rules_map.insert(db.into(), Arc::new(rules)); + pub async fn create_database(&self, db_name: &str, rules: DatabaseRules) -> Result<()> { + let mut database_map = self.databases.write().await; + let buffer = if rules.store_locally { + Some(WriteBufferDb::new(db_name)) + } else { + None + }; + + let db = Db { rules, buffer }; + + database_map.insert(db_name.into(), Arc::new(db)); + Ok(()) } @@ -122,20 +129,42 @@ impl Server { /// is then replicated to other delorean servers based on the configuration of the db. /// This is step #1 from the above diagram. pub async fn write_lines(&self, db: &str, lines: &[ParsedLine<'_>]) -> Result<()> { - let rules = match self.database_rules.read().await.get(db) { - Some(d) => d.clone(), - None => return DatabaseNotFound { db }.fail(), - }; + let db = self + .databases + .read() + .await + .get(db) + .context(DatabaseNotFound { db })? + .clone(); - let data = lines_to_replicated_write(0, 0, lines, &rules); + let data = lines_to_replicated_write(0, 0, lines, &db.rules); - if rules.store_locally { - self.store_local(db, &data).await?; + if let Some(buf) = &db.buffer { + buf.store_replicated_write(&data) + .await + .map_err(|e| Box::new(e) as DatabaseError) + .context(UnknownDatabaseError {})?; } Ok(()) } + pub async fn query_local(&self, db_name: &str, query: &str) -> Result> { + let db = self + .databases + .read() + .await + .get(db_name) + .context(DatabaseNotFound { db: db_name })? + .clone(); + + let buff = db.buffer.as_ref().context(NoLocalBuffer { db: db_name })?; + buff.query(query) + .await + .map_err(|e| Box::new(e) as DatabaseError) + .context(UnknownDatabaseError {}) + } + pub async fn handle_replicated_write( &self, _db: &str, @@ -143,21 +172,6 @@ impl Server { ) -> Result<()> { unimplemented!() } - - async fn store_local(&self, db: &str, write: &ReplicatedWrite) -> Result<()> { - let local = self - .local_store - .db_or_create(db) - .await - .map_err(|e| Box::new(e) as DatabaseError) - .context(UnknownDatabaseError {})?; - - local - .store_replicated_write(write) - .await - .map_err(|e| Box::new(e) as DatabaseError) - .context(UnknownDatabaseError {}) - } } /// The Server will ask the ConnectionManager for connections to a specific remote server. @@ -186,12 +200,18 @@ pub trait RemoteServer { ) -> Result<(), Self::Error>; } +#[derive(Debug)] +struct Db { + pub rules: DatabaseRules, + pub buffer: Option, +} + #[cfg(test)] mod tests { use super::*; use async_trait::async_trait; + use delorean_arrow::arrow::{csv, util::string_writer::StringWriter}; use delorean_line_parser::parse_lines; - use delorean_storage::test::TestDatabaseStore; use snafu::Snafu; // type TestError = Box; @@ -200,27 +220,33 @@ mod tests { #[tokio::test(threaded_scheduler)] async fn writes_local() -> Result { // TODO: update this to use an actual database store and database backed entirely by memory - let store = TestDatabaseStore::new(); let manager = TestConnectionManager::new(); - let server = Server::new(manager, store); + let server = Server::new(manager); let rules = DatabaseRules { store_locally: true, ..Default::default() }; server.create_database("foo", rules).await.unwrap(); - let line = "cpu foo=1 10"; + let line = "cpu bar=1 10"; let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect(); server.write_lines("foo", &lines).await.unwrap(); - let db = server.local_store.db_or_create("foo").await.unwrap(); - let write = &db.get_writes().await[0]; - let batch = write.write_buffer_batch().unwrap(); - let entry = batch.entries().unwrap().get(0); - let table_batch = entry.table_batches().unwrap().get(0); + // panic!("blah {:?}", server.db("foo").await.buffer.as_ref().unwrap().tag_column_names(Some("cpu".to_string()), None, None).await.unwrap()); - assert_eq!(table_batch.name().unwrap(), "cpu"); - assert_eq!(table_batch.rows().unwrap().len(), 1); + let results = server + .query_local("foo", "select * from cpu") + .await + .unwrap(); + + let mut sw = StringWriter::new(); + { + let mut writer = csv::Writer::new(&mut sw); + for r in results { + writer.write(&r).unwrap(); + } + } + assert_eq!(&sw.to_string(), "bar,time\n1,10\n"); Ok(()) } diff --git a/delorean_data_types/src/data.rs b/delorean_data_types/src/data.rs index c1d9454af8..3a5a10a6e1 100644 --- a/delorean_data_types/src/data.rs +++ b/delorean_data_types/src/data.rs @@ -6,7 +6,7 @@ use crate::TIME_COLUMN_NAME; use delorean_generated_types::wal as wb; use delorean_line_parser::{FieldValue, ParsedLine}; -use std::{collections::BTreeMap, sync::Arc}; +use std::collections::BTreeMap; use chrono::Utc; use crc32fast::Hasher; @@ -48,7 +48,7 @@ pub fn lines_to_replicated_write( writer: u8, sequence: u64, lines: &[ParsedLine<'_>], - rules: &Arc, + rules: &DatabaseRules, ) -> ReplicatedWrite { let default_time = Utc::now(); let entry_bytes = split_lines_into_write_entry_partitions(