chore: refactor cluster to use in-memory write buffer
This refactors the cluster crate to use the in-memory write buffer. The injected DatabaseStore is removed, as it is no longer needed.
parent 0d6bfd2f29
commit 9a345e226c
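For orientation, here is a rough sketch (not part of the diff) of how the refactored `Server` is driven end to end; it mirrors the updated `writes_local` test further down. `Server`, `ConnectionManager`, `DatabaseRules`, `write_lines`, and `query_local` all appear in this commit, while the `demo` function and its error type are illustrative only.

```rust
use delorean_cluster::{ConnectionManager, Server};
use delorean_data_types::database_rules::DatabaseRules;
use delorean_line_parser::parse_lines;

// Sketch only: any ConnectionManager implementation works here
// (the tests below use a TestConnectionManager).
async fn demo<M: ConnectionManager>(
    manager: M,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // The Server no longer takes an injected DatabaseStore.
    let server = Server::new(manager);

    // store_locally: true gives the database an in-memory write buffer.
    let rules = DatabaseRules {
        store_locally: true,
        ..Default::default()
    };
    server.create_database("foo", rules).await?;

    // Writes land in the per-database buffer...
    let lines: Vec<_> = parse_lines("cpu bar=1 10").map(|l| l.unwrap()).collect();
    server.write_lines("foo", &lines).await?;

    // ...and can be queried back as Arrow RecordBatches.
    let batches = server.query_local("foo", "select * from cpu").await?;
    println!("got {} record batch(es)", batches.len());
    Ok(())
}
```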
@@ -729,6 +729,7 @@ name = "delorean_cluster"
version = "0.1.0"
dependencies = [
 "async-trait",
 "delorean_arrow",
 "delorean_data_types",
 "delorean_generated_types",
 "delorean_line_parser",
@@ -16,3 +16,4 @@ delorean_line_parser = { path = "../delorean_line_parser" }
delorean_storage = { path = "../delorean_storage" }
delorean_write_buffer = { path = "../delorean_write_buffer" }
tokio = { version = "0.2", features = ["full"] }
delorean_arrow = { path = "../delorean_arrow" }
@@ -65,16 +65,15 @@

 use std::{collections::BTreeMap, sync::Arc};

-use delorean_data_types::{
-    data::{lines_to_replicated_write, ReplicatedWrite},
-    database_rules::DatabaseRules,
-};
+use delorean_data_types::{data::lines_to_replicated_write, database_rules::DatabaseRules};
 use delorean_generated_types::wal as wb;
 use delorean_line_parser::ParsedLine;
-use delorean_storage::{Database, DatabaseStore};
+use delorean_storage::Database;
+use delorean_write_buffer::Db as WriteBufferDb;

 use async_trait::async_trait;
-use snafu::{ResultExt, Snafu};
+use delorean_arrow::arrow::record_batch::RecordBatch;
+use snafu::{OptionExt, ResultExt, Snafu};
 use tokio::sync::RwLock;

 type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -87,6 +86,8 @@ pub enum Error {
     DatabaseNotFound { db: String },
     #[snafu(display("database error: {}", source))]
     UnknownDatabaseError { source: DatabaseError },
+    #[snafu(display("no local buffer for database: {}", db))]
+    NoLocalBuffer { db: String },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -95,26 +96,32 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// as well as how they communicate with other Delorean servers. Each server
 /// will have one of these structs, which keeps track of all replication and query rules.
 #[derive(Debug)]
-pub struct Server<M: ConnectionManager, S: DatabaseStore> {
-    #[allow(dead_code)]
-    database_rules: RwLock<BTreeMap<String, Arc<DatabaseRules>>>,
-    pub local_store: S,
+pub struct Server<M: ConnectionManager> {
+    databases: RwLock<BTreeMap<String, Arc<Db>>>,
     #[allow(dead_code)]
     connection_manager: M,
 }

-impl<M: ConnectionManager, S: DatabaseStore> Server<M, S> {
-    pub fn new(connection_manager: M, local_store: S) -> Self {
+impl<M: ConnectionManager> Server<M> {
+    pub fn new(connection_manager: M) -> Self {
         Self {
-            database_rules: RwLock::new(BTreeMap::new()),
-            local_store,
+            databases: RwLock::new(BTreeMap::new()),
             connection_manager,
         }
     }

-    pub async fn create_database(&self, db: &str, rules: DatabaseRules) -> Result<()> {
-        let mut rules_map = self.database_rules.write().await;
-        rules_map.insert(db.into(), Arc::new(rules));
+    pub async fn create_database(&self, db_name: &str, rules: DatabaseRules) -> Result<()> {
+        let mut database_map = self.databases.write().await;
+        let buffer = if rules.store_locally {
+            Some(WriteBufferDb::new(db_name))
+        } else {
+            None
+        };
+
+        let db = Db { rules, buffer };
+
+        database_map.insert(db_name.into(), Arc::new(db));
+
         Ok(())
     }

@@ -122,20 +129,42 @@ impl<M: ConnectionManager, S: DatabaseStore> Server<M, S> {
     /// is then replicated to other delorean servers based on the configuration of the db.
     /// This is step #1 from the above diagram.
     pub async fn write_lines(&self, db: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
-        let rules = match self.database_rules.read().await.get(db) {
-            Some(d) => d.clone(),
-            None => return DatabaseNotFound { db }.fail(),
-        };
+        let db = self
+            .databases
+            .read()
+            .await
+            .get(db)
+            .context(DatabaseNotFound { db })?
+            .clone();

-        let data = lines_to_replicated_write(0, 0, lines, &rules);
+        let data = lines_to_replicated_write(0, 0, lines, &db.rules);

-        if rules.store_locally {
-            self.store_local(db, &data).await?;
+        if let Some(buf) = &db.buffer {
+            buf.store_replicated_write(&data)
+                .await
+                .map_err(|e| Box::new(e) as DatabaseError)
+                .context(UnknownDatabaseError {})?;
         }

         Ok(())
     }

+    pub async fn query_local(&self, db_name: &str, query: &str) -> Result<Vec<RecordBatch>> {
+        let db = self
+            .databases
+            .read()
+            .await
+            .get(db_name)
+            .context(DatabaseNotFound { db: db_name })?
+            .clone();
+
+        let buff = db.buffer.as_ref().context(NoLocalBuffer { db: db_name })?;
+        buff.query(query)
+            .await
+            .map_err(|e| Box::new(e) as DatabaseError)
+            .context(UnknownDatabaseError {})
+    }
+
     pub async fn handle_replicated_write(
         &self,
         _db: &str,
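Both the write path and the new `query_local` funnel errors the same way: the buffer's concrete error is boxed into the opaque `DatabaseError`, then wrapped with a snafu context, and the newly imported `OptionExt` is what provides `.context(...)` on the `Option` returned by the map lookups. Below is a standalone sketch of that pattern; the names are illustrative, and it assumes the pre-0.7 snafu selector naming used in this diff, where the context selector shares the enum variant's name.

```rust
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::BTreeMap;

type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("database not found: {}", db))]
    DatabaseNotFound { db: String },
    #[snafu(display("database error: {}", source))]
    UnknownDatabaseError { source: DatabaseError },
}

type Result<T, E = Error> = std::result::Result<T, E>;

// OptionExt::context converts a failed lookup (None) into the chosen error variant,
// mirroring `.get(db).context(DatabaseNotFound { db })?` in the hunk above.
fn lookup(dbs: &BTreeMap<String, u32>, db: &str) -> Result<u32> {
    dbs.get(db).copied().context(DatabaseNotFound { db })
}

// ResultExt::context wraps an arbitrary error after boxing it into the opaque
// DatabaseError, mirroring the store_replicated_write/query error handling above.
fn wrap(res: std::result::Result<(), std::io::Error>) -> Result<()> {
    res.map_err(|e| Box::new(e) as DatabaseError)
        .context(UnknownDatabaseError {})
}
```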
@@ -143,21 +172,6 @@ impl<M: ConnectionManager, S: DatabaseStore> Server<M, S> {
     ) -> Result<()> {
         unimplemented!()
     }
-
-    async fn store_local(&self, db: &str, write: &ReplicatedWrite) -> Result<()> {
-        let local = self
-            .local_store
-            .db_or_create(db)
-            .await
-            .map_err(|e| Box::new(e) as DatabaseError)
-            .context(UnknownDatabaseError {})?;
-
-        local
-            .store_replicated_write(write)
-            .await
-            .map_err(|e| Box::new(e) as DatabaseError)
-            .context(UnknownDatabaseError {})
-    }
 }

 /// The Server will ask the ConnectionManager for connections to a specific remote server.
@@ -186,12 +200,18 @@ pub trait RemoteServer {
     ) -> Result<(), Self::Error>;
 }

+#[derive(Debug)]
+struct Db {
+    pub rules: DatabaseRules,
+    pub buffer: Option<WriteBufferDb>,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use async_trait::async_trait;
+    use delorean_arrow::arrow::{csv, util::string_writer::StringWriter};
     use delorean_line_parser::parse_lines;
-    use delorean_storage::test::TestDatabaseStore;
     use snafu::Snafu;

     // type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -200,27 +220,33 @@
     #[tokio::test(threaded_scheduler)]
     async fn writes_local() -> Result {
         // TODO: update this to use an actual database store and database backed entirely by memory
-        let store = TestDatabaseStore::new();
         let manager = TestConnectionManager::new();
-        let server = Server::new(manager, store);
+        let server = Server::new(manager);
         let rules = DatabaseRules {
             store_locally: true,
             ..Default::default()
         };
         server.create_database("foo", rules).await.unwrap();

-        let line = "cpu foo=1 10";
+        let line = "cpu bar=1 10";
         let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
         server.write_lines("foo", &lines).await.unwrap();

-        let db = server.local_store.db_or_create("foo").await.unwrap();
-        let write = &db.get_writes().await[0];
-        let batch = write.write_buffer_batch().unwrap();
-        let entry = batch.entries().unwrap().get(0);
-        let table_batch = entry.table_batches().unwrap().get(0);
+        // panic!("blah {:?}", server.db("foo").await.buffer.as_ref().unwrap().tag_column_names(Some("cpu".to_string()), None, None).await.unwrap());

-        assert_eq!(table_batch.name().unwrap(), "cpu");
-        assert_eq!(table_batch.rows().unwrap().len(), 1);
+        let results = server
+            .query_local("foo", "select * from cpu")
+            .await
+            .unwrap();
+
+        let mut sw = StringWriter::new();
+        {
+            let mut writer = csv::Writer::new(&mut sw);
+            for r in results {
+                writer.write(&r).unwrap();
+            }
+        }
+        assert_eq!(&sw.to_string(), "bar,time\n1,10\n");

         Ok(())
     }
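The updated test asserts on a CSV rendering of the query results. Pulled out into a helper, that conversion looks roughly like the sketch below, using the same `delorean_arrow` re-exports the test imports; the helper name is illustrative.

```rust
use delorean_arrow::arrow::{csv, record_batch::RecordBatch, util::string_writer::StringWriter};

/// Renders a set of record batches to a single CSV string, the same way the
/// writes_local test does before comparing against "bar,time\n1,10\n".
fn batches_to_csv(batches: &[RecordBatch]) -> String {
    let mut sw = StringWriter::new();
    {
        // The CSV writer borrows `sw`; scope it so the borrow ends before to_string().
        let mut writer = csv::Writer::new(&mut sw);
        for batch in batches {
            writer.write(batch).expect("failed to write CSV");
        }
    }
    sw.to_string()
}
```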
@@ -6,7 +6,7 @@ use crate::TIME_COLUMN_NAME;
 use delorean_generated_types::wal as wb;
 use delorean_line_parser::{FieldValue, ParsedLine};

-use std::{collections::BTreeMap, sync::Arc};
+use std::collections::BTreeMap;

 use chrono::Utc;
 use crc32fast::Hasher;
@@ -48,7 +48,7 @@ pub fn lines_to_replicated_write(
     writer: u8,
     sequence: u64,
     lines: &[ParsedLine<'_>],
-    rules: &Arc<DatabaseRules>,
+    rules: &DatabaseRules,
 ) -> ReplicatedWrite {
     let default_time = Utc::now();
     let entry_bytes = split_lines_into_write_entry_partitions(
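A note on the `lines_to_replicated_write` signature change above: taking `&DatabaseRules` instead of `&Arc<DatabaseRules>` lets callers that hold plain rules (such as the new `Db` struct) call the function directly, while callers that still hold an `Arc` keep working through deref coercion. The snippet below is illustrative only, with a stand-in `DatabaseRules`.

```rust
use std::sync::Arc;

// Stand-in for delorean_data_types::database_rules::DatabaseRules.
struct DatabaseRules {
    store_locally: bool,
}

// Same shape as the new signature: borrows the rules directly.
fn uses_rules(rules: &DatabaseRules) -> bool {
    rules.store_locally
}

fn main() {
    // Callers holding plain rules pass a reference directly.
    let plain = DatabaseRules { store_locally: true };
    assert!(uses_rules(&plain));

    // Callers still holding an Arc work too: &Arc<DatabaseRules> coerces to &DatabaseRules.
    let shared = Arc::new(DatabaseRules { store_locally: true });
    assert!(uses_rules(&shared));
}
```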