From 90eba702ca277e1e57deec3553e9ba78f4c88c24 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Sun, 13 Sep 2020 06:09:19 -0400
Subject: [PATCH] refactor: Remove org/bucket from WriteBufferDatabase interface (#285)

---
 src/server/write_buffer_routes.rs    | 52 +++++++++++-----------------
 src/storage.rs                       | 25 +++++--------
 src/storage/write_buffer_database.rs | 37 ++++++++------------
 3 files changed, 42 insertions(+), 72 deletions(-)

diff --git a/src/server/write_buffer_routes.rs b/src/server/write_buffer_routes.rs
index 3390bbf6bf..47f436211f 100644
--- a/src/server/write_buffer_routes.rs
+++ b/src/server/write_buffer_routes.rs
@@ -9,7 +9,7 @@ use http::header::CONTENT_ENCODING;
 use tracing::{debug, error, info};
 
-use delorean::storage::{Database, DatabaseStore};
+use delorean::storage::{org_and_bucket_to_database, Database, DatabaseStore};
 use delorean_line_parser::parse_lines;
 
 use bytes::{Bytes, BytesMut};
@@ -208,8 +208,10 @@ async fn write(
         query_string: String::from(query),
     })?;
 
+    let db_name = org_and_bucket_to_database(&write_info.org, &write_info.bucket);
+
     let db = storage
-        .db_or_create(&write_info.org, &write_info.bucket)
+        .db_or_create(&db_name)
        .await
         .map_err(|e| Box::new(e) as _)
         .context(BucketByName {
@@ -260,13 +262,12 @@ async fn read(
         query_string: query,
     })?;
 
-    let db = storage
-        .db(&read_info.org, &read_info.bucket)
-        .await
-        .context(BucketNotFound {
-            org: read_info.org.clone(),
-            bucket: read_info.bucket.clone(),
-        })?;
+    let db_name = org_and_bucket_to_database(&read_info.org, &read_info.bucket);
+
+    let db = storage.db(&db_name).await.context(BucketNotFound {
+        org: read_info.org.clone(),
+        bucket: read_info.bucket.clone(),
+    })?;
 
     let results = db
         .query(&read_info.query)
@@ -335,7 +336,7 @@ mod tests {
     use std::{collections::BTreeMap, net::SocketAddr};
 
     use arrow::record_batch::RecordBatch;
-    use delorean::storage::{org_and_bucket_to_database, Database, DatabaseStore};
+    use delorean::storage::{Database, DatabaseStore};
     use delorean_line_parser::ParsedLine;
     use reqwest::{Client, Response};
     use tonic::async_trait;
@@ -384,7 +385,7 @@ mod tests {
 
         // Check that the data got into the right bucket
         let test_db = test_storage
-            .db("MyOrg", "MyBucket")
+            .db("MyOrg_MyBucket")
             .await
             .expect("Database exists");
 
@@ -506,36 +507,23 @@ impl DatabaseStore for TestDatabaseStore {
     type Database = TestDatabase;
     type Error = TestError;
 
-    /// Retrieve the database specified by the org and bucket name,
-    /// returning None if no such database exists
-    ///
-    /// TODO: change this to take a single database name, and move the
-    /// computation of org/bucket to the callers
-    async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>> {
-        let db_name = org_and_bucket_to_database(org, bucket);
+    /// Retrieve the database specified by `name`
+    async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
         let databases = self.databases.lock().await;
-        databases.get(&db_name).cloned()
+        databases.get(name).cloned()
     }
 
-    /// Retrieve the database specified by the org and bucket name,
-    /// creating it if it doesn't exist.
-    ///
-    /// TODO: change this to take a single database name, and move the computation of org/bucket
-    /// to the callers
-    async fn db_or_create(
-        &self,
-        org: &str,
-        bucket: &str,
-    ) -> Result<Arc<Self::Database>, Self::Error> {
-        let db_name = org_and_bucket_to_database(org, bucket);
+    /// Retrieve the database specified by `name`, creating it if it
+    /// doesn't exist.
+    async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error> {
         let mut databases = self.databases.lock().await;
 
-        if let Some(db) = databases.get(&db_name) {
+        if let Some(db) = databases.get(name) {
             Ok(db.clone())
         } else {
             let new_db = Arc::new(TestDatabase::new());
-            databases.insert(db_name, new_db.clone());
+            databases.insert(name.to_string(), new_db.clone());
             Ok(new_db)
         }
     }
 }
diff --git a/src/storage.rs b/src/storage.rs
index a4ef5d6841..1ca4b6d238 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -99,26 +99,17 @@ pub trait DatabaseStore: Debug + Send + Sync {
     /// The type of error this DataBase store generates
     type Error: std::error::Error + Send + Sync + 'static;
 
-    /// Retrieve the database specified by the org and bucket name,
-    /// returning None if no such database exists
-    ///
-    /// TODO: change this to take a single database name, and move the
-    /// computation of org/bucket to the callers
-    async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>>;
+    /// Retrieve the database specified by `name` returning None if no
+    /// such database exists
+    async fn db(&self, name: &str) -> Option<Arc<Self::Database>>;
 
-    /// Retrieve the database specified by the org and bucket name,
-    /// creating it if it doesn't exist.
-    ///
-    /// TODO: change this to take a single database name, and move the computation of org/bucket
-    /// to the callers
-    async fn db_or_create(
-        &self,
-        org: &str,
-        bucket: &str,
-    ) -> Result<Arc<Self::Database>, Self::Error>;
+    /// Retrieve the database specified by `name`, creating it if it
+    /// doesn't exist.
+    async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error>;
 }
 
-/// return the database name to use for the specified org and bucket name.
+/// Compatibility: return the database name to use for the specified
+/// org and bucket name.
 ///
 /// TODO move to somewhere else / change the traits to take the database name directly
 pub fn org_and_bucket_to_database(org: &str, bucket: &str) -> String {
diff --git a/src/storage/write_buffer_database.rs b/src/storage/write_buffer_database.rs
index 74b264b32c..2ab8e66125 100644
--- a/src/storage/write_buffer_database.rs
+++ b/src/storage/write_buffer_database.rs
@@ -1,4 +1,3 @@
-use super::org_and_bucket_to_database;
 use tonic::async_trait;
 use tracing::info;
 
@@ -202,26 +201,18 @@ impl super::DatabaseStore for WriteBufferDatabases {
     type Database = Db;
     type Error = Error;
 
-    async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>> {
+    async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
         let databases = self.databases.read().await;
-        databases
-            .get(&org_and_bucket_to_database(org, bucket))
-            .cloned()
+        databases.get(name).cloned()
     }
 
-    async fn db_or_create(
-        &self,
-        org: &str,
-        bucket: &str,
-    ) -> Result<Arc<Self::Database>, Self::Error> {
-        let db_name = org_and_bucket_to_database(org, bucket);
-
+    async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error> {
         // get it through a read lock first if we can
         {
             let databases = self.databases.read().await;
-            if let Some(db) = databases.get(&db_name) {
+            if let Some(db) = databases.get(name) {
                 return Ok(db.clone());
             }
         }
@@ -230,13 +221,13 @@ impl super::DatabaseStore for WriteBufferDatabases {
         let mut databases = self.databases.write().await;
 
         // make sure it didn't get inserted by someone else while we were waiting for the write lock
-        if let Some(db) = databases.get(&db_name) {
+        if let Some(db) = databases.get(name) {
             return Ok(db.clone());
         }
 
-        let db = Db::try_with_wal(db_name.to_string(), &mut self.base_dir.clone()).await?;
+        let db = Db::try_with_wal(name, &mut self.base_dir.clone()).await?;
         let db = Arc::new(db);
-        databases.insert(db_name, db.clone());
+        databases.insert(name.to_string(), db.clone());
 
         Ok(db)
     }
@@ -253,7 +244,7 @@ pub struct Db {
 }
 
 impl Db {
-    pub async fn try_with_wal(name: String, wal_dir: &mut PathBuf) -> Result<Self> {
+    pub async fn try_with_wal(name: &str, wal_dir: &mut PathBuf) -> Result<Self> {
         wal_dir.push(&name);
         if let Err(e) = std::fs::create_dir(wal_dir.clone()) {
             match e.kind() {
@@ -269,16 +260,16 @@ impl Db {
         }
         let dir = wal_dir.clone();
         let wal_builder = WalBuilder::new(wal_dir.clone());
-        let wal_details = start_wal_sync_task(wal_builder).await.context(OpeningWal {
-            database: name.clone(),
-        })?;
+        let wal_details = start_wal_sync_task(wal_builder)
+            .await
+            .context(OpeningWal { database: name })?;
         wal_details
             .write_metadata()
             .await
-            .context(OpeningWal { database: &name })?;
+            .context(OpeningWal { database: name })?;
 
         Ok(Self {
-            name,
+            name: name.to_string(),
             dir,
             partitions: RwLock::new(vec![]),
             next_partition_id: AtomicU32::new(1),
@@ -1305,7 +1296,7 @@ mod tests {
 "#;
 
         {
-            let db = Db::try_with_wal("mydb".to_string(), &mut dir).await?;
+            let db = Db::try_with_wal("mydb", &mut dir).await?;
             let lines: Vec<_> = parse_lines("cpu,region=west,host=A user=23.2,other=1i,str=\"some string\",b=true 10\ndisk,region=west,host=A bytes=23432323i,used_percent=76.2 10").map(|l| l.unwrap()).collect();
             db.write_lines(&lines).await?;
             let lines: Vec<_> = parse_lines("cpu,region=west,host=B user=23.1 15")