refactor: Remove org/bucket from WriteBufferDatabase interface

pull/24376/head
alamb 2020-09-12 07:05:54 -04:00
parent 82d5f485c3
commit f7ad7afc4f
3 changed files with 42 additions and 72 deletions

View File

@ -9,7 +9,7 @@
use http::header::CONTENT_ENCODING;
use tracing::{debug, error, info};
use delorean::storage::{Database, DatabaseStore};
use delorean::storage::{org_and_bucket_to_database, Database, DatabaseStore};
use delorean_line_parser::parse_lines;
use bytes::{Bytes, BytesMut};
@ -208,8 +208,10 @@ async fn write<T: DatabaseStore>(
query_string: String::from(query),
})?;
let db_name = org_and_bucket_to_database(&write_info.org, &write_info.bucket);
let db = storage
.db_or_create(&write_info.org, &write_info.bucket)
.db_or_create(&db_name)
.await
.map_err(|e| Box::new(e) as _)
.context(BucketByName {
@ -260,13 +262,12 @@ async fn read<T: DatabaseStore>(
query_string: query,
})?;
let db = storage
.db(&read_info.org, &read_info.bucket)
.await
.context(BucketNotFound {
org: read_info.org.clone(),
bucket: read_info.bucket.clone(),
})?;
let db_name = org_and_bucket_to_database(&read_info.org, &read_info.bucket);
let db = storage.db(&db_name).await.context(BucketNotFound {
org: read_info.org.clone(),
bucket: read_info.bucket.clone(),
})?;
let results = db
.query(&read_info.query)
@ -335,7 +336,7 @@ mod tests {
use std::{collections::BTreeMap, net::SocketAddr};
use arrow::record_batch::RecordBatch;
use delorean::storage::{org_and_bucket_to_database, Database, DatabaseStore};
use delorean::storage::{Database, DatabaseStore};
use delorean_line_parser::ParsedLine;
use reqwest::{Client, Response};
use tonic::async_trait;
@ -384,7 +385,7 @@ mod tests {
// Check that the data got into the right bucket
let test_db = test_storage
.db("MyOrg", "MyBucket")
.db("MyOrg_MyBucket")
.await
.expect("Database exists");
@ -506,36 +507,23 @@ mod tests {
impl DatabaseStore for TestDatabaseStore {
type Database = TestDatabase;
type Error = TestError;
/// Retrieve the database specified by the org and bucket name,
/// returning None if no such database exists
///
/// TODO: change this to take a single database name, and move the
/// computation of org/bucket to the callers
async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>> {
let db_name = org_and_bucket_to_database(org, bucket);
/// Retrieve the database specified name
async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
let databases = self.databases.lock().await;
databases.get(&db_name).cloned()
databases.get(name).cloned()
}
/// Retrieve the database specified by the org and bucket name,
/// creating it if it doesn't exist.
///
/// TODO: change this to take a single database name, and move the computation of org/bucket
/// to the callers
async fn db_or_create(
&self,
org: &str,
bucket: &str,
) -> Result<Arc<Self::Database>, Self::Error> {
let db_name = org_and_bucket_to_database(org, bucket);
/// Retrieve the database specified by name, creating it if it
/// doesn't exist.
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error> {
let mut databases = self.databases.lock().await;
if let Some(db) = databases.get(&db_name) {
if let Some(db) = databases.get(name) {
Ok(db.clone())
} else {
let new_db = Arc::new(TestDatabase::new());
databases.insert(db_name, new_db.clone());
databases.insert(name.to_string(), new_db.clone());
Ok(new_db)
}
}

View File

@ -99,26 +99,17 @@ pub trait DatabaseStore: Debug + Send + Sync {
/// The type of error this DataBase store generates
type Error: std::error::Error + Send + Sync + 'static;
/// Retrieve the database specified by the org and bucket name,
/// returning None if no such database exists
///
/// TODO: change this to take a single database name, and move the
/// computation of org/bucket to the callers
async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>>;
/// Retrieve the database specified by `name` returning None if no
/// such database exists
async fn db(&self, name: &str) -> Option<Arc<Self::Database>>;
/// Retrieve the database specified by the org and bucket name,
/// creating it if it doesn't exist.
///
/// TODO: change this to take a single database name, and move the computation of org/bucket
/// to the callers
async fn db_or_create(
&self,
org: &str,
bucket: &str,
) -> Result<Arc<Self::Database>, Self::Error>;
/// Retrieve the database specified by `name`, creating it if it
/// doesn't exist.
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error>;
}
/// return the database name to use for the specified org and bucket name.
/// Compatibility: return the database name to use for the specified
/// org and bucket name.
///
/// TODO move to somewhere else / change the traits to take the database name directly
pub fn org_and_bucket_to_database(org: &str, bucket: &str) -> String {

View File

@ -1,4 +1,3 @@
use super::org_and_bucket_to_database;
use tonic::async_trait;
use tracing::info;
@ -202,26 +201,18 @@ impl super::DatabaseStore for WriteBufferDatabases {
type Database = Db;
type Error = Error;
async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>> {
async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
let databases = self.databases.read().await;
databases
.get(&org_and_bucket_to_database(org, bucket))
.cloned()
databases.get(name).cloned()
}
async fn db_or_create(
&self,
org: &str,
bucket: &str,
) -> Result<Arc<Self::Database>, Self::Error> {
let db_name = org_and_bucket_to_database(org, bucket);
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error> {
// get it through a read lock first if we can
{
let databases = self.databases.read().await;
if let Some(db) = databases.get(&db_name) {
if let Some(db) = databases.get(name) {
return Ok(db.clone());
}
}
@ -230,13 +221,13 @@ impl super::DatabaseStore for WriteBufferDatabases {
let mut databases = self.databases.write().await;
// make sure it didn't get inserted by someone else while we were waiting for the write lock
if let Some(db) = databases.get(&db_name) {
if let Some(db) = databases.get(name) {
return Ok(db.clone());
}
let db = Db::try_with_wal(db_name.to_string(), &mut self.base_dir.clone()).await?;
let db = Db::try_with_wal(name, &mut self.base_dir.clone()).await?;
let db = Arc::new(db);
databases.insert(db_name, db.clone());
databases.insert(name.to_string(), db.clone());
Ok(db)
}
@ -253,7 +244,7 @@ pub struct Db {
}
impl Db {
pub async fn try_with_wal(name: String, wal_dir: &mut PathBuf) -> Result<Self> {
pub async fn try_with_wal(name: &str, wal_dir: &mut PathBuf) -> Result<Self> {
wal_dir.push(&name);
if let Err(e) = std::fs::create_dir(wal_dir.clone()) {
match e.kind() {
@ -269,16 +260,16 @@ impl Db {
}
let dir = wal_dir.clone();
let wal_builder = WalBuilder::new(wal_dir.clone());
let wal_details = start_wal_sync_task(wal_builder).await.context(OpeningWal {
database: name.clone(),
})?;
let wal_details = start_wal_sync_task(wal_builder)
.await
.context(OpeningWal { database: name })?;
wal_details
.write_metadata()
.await
.context(OpeningWal { database: &name })?;
.context(OpeningWal { database: name })?;
Ok(Self {
name,
name: name.to_string(),
dir,
partitions: RwLock::new(vec![]),
next_partition_id: AtomicU32::new(1),
@ -1305,7 +1296,7 @@ mod tests {
"#;
{
let db = Db::try_with_wal("mydb".to_string(), &mut dir).await?;
let db = Db::try_with_wal("mydb", &mut dir).await?;
let lines: Vec<_> = parse_lines("cpu,region=west,host=A user=23.2,other=1i,str=\"some string\",b=true 10\ndisk,region=west,host=A bytes=23432323i,used_percent=76.2 10").map(|l| l.unwrap()).collect();
db.write_lines(&lines).await?;
let lines: Vec<_> = parse_lines("cpu,region=west,host=B user=23.1 15")