refactor: Remove org/bucket from WriteBufferDatabase interface (#285)
parent
c4a738dfbe
commit
90eba702ca
|
@ -9,7 +9,7 @@
|
||||||
use http::header::CONTENT_ENCODING;
|
use http::header::CONTENT_ENCODING;
|
||||||
use tracing::{debug, error, info};
|
use tracing::{debug, error, info};
|
||||||
|
|
||||||
use delorean::storage::{Database, DatabaseStore};
|
use delorean::storage::{org_and_bucket_to_database, Database, DatabaseStore};
|
||||||
use delorean_line_parser::parse_lines;
|
use delorean_line_parser::parse_lines;
|
||||||
|
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
|
@ -208,8 +208,10 @@ async fn write<T: DatabaseStore>(
|
||||||
query_string: String::from(query),
|
query_string: String::from(query),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
let db_name = org_and_bucket_to_database(&write_info.org, &write_info.bucket);
|
||||||
|
|
||||||
let db = storage
|
let db = storage
|
||||||
.db_or_create(&write_info.org, &write_info.bucket)
|
.db_or_create(&db_name)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| Box::new(e) as _)
|
.map_err(|e| Box::new(e) as _)
|
||||||
.context(BucketByName {
|
.context(BucketByName {
|
||||||
|
@ -260,13 +262,12 @@ async fn read<T: DatabaseStore>(
|
||||||
query_string: query,
|
query_string: query,
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let db = storage
|
let db_name = org_and_bucket_to_database(&read_info.org, &read_info.bucket);
|
||||||
.db(&read_info.org, &read_info.bucket)
|
|
||||||
.await
|
let db = storage.db(&db_name).await.context(BucketNotFound {
|
||||||
.context(BucketNotFound {
|
org: read_info.org.clone(),
|
||||||
org: read_info.org.clone(),
|
bucket: read_info.bucket.clone(),
|
||||||
bucket: read_info.bucket.clone(),
|
})?;
|
||||||
})?;
|
|
||||||
|
|
||||||
let results = db
|
let results = db
|
||||||
.query(&read_info.query)
|
.query(&read_info.query)
|
||||||
|
@ -335,7 +336,7 @@ mod tests {
|
||||||
use std::{collections::BTreeMap, net::SocketAddr};
|
use std::{collections::BTreeMap, net::SocketAddr};
|
||||||
|
|
||||||
use arrow::record_batch::RecordBatch;
|
use arrow::record_batch::RecordBatch;
|
||||||
use delorean::storage::{org_and_bucket_to_database, Database, DatabaseStore};
|
use delorean::storage::{Database, DatabaseStore};
|
||||||
use delorean_line_parser::ParsedLine;
|
use delorean_line_parser::ParsedLine;
|
||||||
use reqwest::{Client, Response};
|
use reqwest::{Client, Response};
|
||||||
use tonic::async_trait;
|
use tonic::async_trait;
|
||||||
|
@ -384,7 +385,7 @@ mod tests {
|
||||||
|
|
||||||
// Check that the data got into the right bucket
|
// Check that the data got into the right bucket
|
||||||
let test_db = test_storage
|
let test_db = test_storage
|
||||||
.db("MyOrg", "MyBucket")
|
.db("MyOrg_MyBucket")
|
||||||
.await
|
.await
|
||||||
.expect("Database exists");
|
.expect("Database exists");
|
||||||
|
|
||||||
|
@ -506,36 +507,23 @@ mod tests {
|
||||||
impl DatabaseStore for TestDatabaseStore {
|
impl DatabaseStore for TestDatabaseStore {
|
||||||
type Database = TestDatabase;
|
type Database = TestDatabase;
|
||||||
type Error = TestError;
|
type Error = TestError;
|
||||||
/// Retrieve the database specified by the org and bucket name,
|
/// Retrieve the database specified name
|
||||||
/// returning None if no such database exists
|
async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
|
||||||
///
|
|
||||||
/// TODO: change this to take a single database name, and move the
|
|
||||||
/// computation of org/bucket to the callers
|
|
||||||
async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>> {
|
|
||||||
let db_name = org_and_bucket_to_database(org, bucket);
|
|
||||||
let databases = self.databases.lock().await;
|
let databases = self.databases.lock().await;
|
||||||
|
|
||||||
databases.get(&db_name).cloned()
|
databases.get(name).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Retrieve the database specified by the org and bucket name,
|
/// Retrieve the database specified by name, creating it if it
|
||||||
/// creating it if it doesn't exist.
|
/// doesn't exist.
|
||||||
///
|
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error> {
|
||||||
/// TODO: change this to take a single database name, and move the computation of org/bucket
|
|
||||||
/// to the callers
|
|
||||||
async fn db_or_create(
|
|
||||||
&self,
|
|
||||||
org: &str,
|
|
||||||
bucket: &str,
|
|
||||||
) -> Result<Arc<Self::Database>, Self::Error> {
|
|
||||||
let db_name = org_and_bucket_to_database(org, bucket);
|
|
||||||
let mut databases = self.databases.lock().await;
|
let mut databases = self.databases.lock().await;
|
||||||
|
|
||||||
if let Some(db) = databases.get(&db_name) {
|
if let Some(db) = databases.get(name) {
|
||||||
Ok(db.clone())
|
Ok(db.clone())
|
||||||
} else {
|
} else {
|
||||||
let new_db = Arc::new(TestDatabase::new());
|
let new_db = Arc::new(TestDatabase::new());
|
||||||
databases.insert(db_name, new_db.clone());
|
databases.insert(name.to_string(), new_db.clone());
|
||||||
Ok(new_db)
|
Ok(new_db)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,26 +99,17 @@ pub trait DatabaseStore: Debug + Send + Sync {
|
||||||
/// The type of error this DataBase store generates
|
/// The type of error this DataBase store generates
|
||||||
type Error: std::error::Error + Send + Sync + 'static;
|
type Error: std::error::Error + Send + Sync + 'static;
|
||||||
|
|
||||||
/// Retrieve the database specified by the org and bucket name,
|
/// Retrieve the database specified by `name` returning None if no
|
||||||
/// returning None if no such database exists
|
/// such database exists
|
||||||
///
|
async fn db(&self, name: &str) -> Option<Arc<Self::Database>>;
|
||||||
/// TODO: change this to take a single database name, and move the
|
|
||||||
/// computation of org/bucket to the callers
|
|
||||||
async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>>;
|
|
||||||
|
|
||||||
/// Retrieve the database specified by the org and bucket name,
|
/// Retrieve the database specified by `name`, creating it if it
|
||||||
/// creating it if it doesn't exist.
|
/// doesn't exist.
|
||||||
///
|
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error>;
|
||||||
/// TODO: change this to take a single database name, and move the computation of org/bucket
|
|
||||||
/// to the callers
|
|
||||||
async fn db_or_create(
|
|
||||||
&self,
|
|
||||||
org: &str,
|
|
||||||
bucket: &str,
|
|
||||||
) -> Result<Arc<Self::Database>, Self::Error>;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// return the database name to use for the specified org and bucket name.
|
/// Compatibility: return the database name to use for the specified
|
||||||
|
/// org and bucket name.
|
||||||
///
|
///
|
||||||
/// TODO move to somewhere else / change the traits to take the database name directly
|
/// TODO move to somewhere else / change the traits to take the database name directly
|
||||||
pub fn org_and_bucket_to_database(org: &str, bucket: &str) -> String {
|
pub fn org_and_bucket_to_database(org: &str, bucket: &str) -> String {
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use super::org_and_bucket_to_database;
|
|
||||||
use tonic::async_trait;
|
use tonic::async_trait;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
|
@ -202,26 +201,18 @@ impl super::DatabaseStore for WriteBufferDatabases {
|
||||||
type Database = Db;
|
type Database = Db;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>> {
|
async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
|
||||||
let databases = self.databases.read().await;
|
let databases = self.databases.read().await;
|
||||||
|
|
||||||
databases
|
databases.get(name).cloned()
|
||||||
.get(&org_and_bucket_to_database(org, bucket))
|
|
||||||
.cloned()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn db_or_create(
|
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error> {
|
||||||
&self,
|
|
||||||
org: &str,
|
|
||||||
bucket: &str,
|
|
||||||
) -> Result<Arc<Self::Database>, Self::Error> {
|
|
||||||
let db_name = org_and_bucket_to_database(org, bucket);
|
|
||||||
|
|
||||||
// get it through a read lock first if we can
|
// get it through a read lock first if we can
|
||||||
{
|
{
|
||||||
let databases = self.databases.read().await;
|
let databases = self.databases.read().await;
|
||||||
|
|
||||||
if let Some(db) = databases.get(&db_name) {
|
if let Some(db) = databases.get(name) {
|
||||||
return Ok(db.clone());
|
return Ok(db.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -230,13 +221,13 @@ impl super::DatabaseStore for WriteBufferDatabases {
|
||||||
let mut databases = self.databases.write().await;
|
let mut databases = self.databases.write().await;
|
||||||
|
|
||||||
// make sure it didn't get inserted by someone else while we were waiting for the write lock
|
// make sure it didn't get inserted by someone else while we were waiting for the write lock
|
||||||
if let Some(db) = databases.get(&db_name) {
|
if let Some(db) = databases.get(name) {
|
||||||
return Ok(db.clone());
|
return Ok(db.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
let db = Db::try_with_wal(db_name.to_string(), &mut self.base_dir.clone()).await?;
|
let db = Db::try_with_wal(name, &mut self.base_dir.clone()).await?;
|
||||||
let db = Arc::new(db);
|
let db = Arc::new(db);
|
||||||
databases.insert(db_name, db.clone());
|
databases.insert(name.to_string(), db.clone());
|
||||||
|
|
||||||
Ok(db)
|
Ok(db)
|
||||||
}
|
}
|
||||||
|
@ -253,7 +244,7 @@ pub struct Db {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Db {
|
impl Db {
|
||||||
pub async fn try_with_wal(name: String, wal_dir: &mut PathBuf) -> Result<Self> {
|
pub async fn try_with_wal(name: &str, wal_dir: &mut PathBuf) -> Result<Self> {
|
||||||
wal_dir.push(&name);
|
wal_dir.push(&name);
|
||||||
if let Err(e) = std::fs::create_dir(wal_dir.clone()) {
|
if let Err(e) = std::fs::create_dir(wal_dir.clone()) {
|
||||||
match e.kind() {
|
match e.kind() {
|
||||||
|
@ -269,16 +260,16 @@ impl Db {
|
||||||
}
|
}
|
||||||
let dir = wal_dir.clone();
|
let dir = wal_dir.clone();
|
||||||
let wal_builder = WalBuilder::new(wal_dir.clone());
|
let wal_builder = WalBuilder::new(wal_dir.clone());
|
||||||
let wal_details = start_wal_sync_task(wal_builder).await.context(OpeningWal {
|
let wal_details = start_wal_sync_task(wal_builder)
|
||||||
database: name.clone(),
|
.await
|
||||||
})?;
|
.context(OpeningWal { database: name })?;
|
||||||
wal_details
|
wal_details
|
||||||
.write_metadata()
|
.write_metadata()
|
||||||
.await
|
.await
|
||||||
.context(OpeningWal { database: &name })?;
|
.context(OpeningWal { database: name })?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
name,
|
name: name.to_string(),
|
||||||
dir,
|
dir,
|
||||||
partitions: RwLock::new(vec![]),
|
partitions: RwLock::new(vec![]),
|
||||||
next_partition_id: AtomicU32::new(1),
|
next_partition_id: AtomicU32::new(1),
|
||||||
|
@ -1305,7 +1296,7 @@ mod tests {
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
{
|
{
|
||||||
let db = Db::try_with_wal("mydb".to_string(), &mut dir).await?;
|
let db = Db::try_with_wal("mydb", &mut dir).await?;
|
||||||
let lines: Vec<_> = parse_lines("cpu,region=west,host=A user=23.2,other=1i,str=\"some string\",b=true 10\ndisk,region=west,host=A bytes=23432323i,used_percent=76.2 10").map(|l| l.unwrap()).collect();
|
let lines: Vec<_> = parse_lines("cpu,region=west,host=A user=23.2,other=1i,str=\"some string\",b=true 10\ndisk,region=west,host=A bytes=23432323i,used_percent=76.2 10").map(|l| l.unwrap()).collect();
|
||||||
db.write_lines(&lines).await?;
|
db.write_lines(&lines).await?;
|
||||||
let lines: Vec<_> = parse_lines("cpu,region=west,host=B user=23.1 15")
|
let lines: Vec<_> = parse_lines("cpu,region=west,host=B user=23.1 15")
|
||||||
|
|
Loading…
Reference in New Issue