refactor: Move SQL functions into is own trait (#511)

* refactor: remove uneeded function table_to_arrow from Trait

* refactor: Move SQL functions into is own trait
pull/24376/head
Andrew Lamb 2020-12-02 08:23:37 -05:00 committed by GitHub
parent c99d389a70
commit ecc4eee8e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 35 additions and 41 deletions

View File

@ -53,9 +53,6 @@ pub trait TSDatabase: Debug + Send + Sync {
/// Stores the replicated write in the write buffer and, if enabled, the write ahead log.
async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error>;
/// Execute the specified query and return arrow record batches with the result
async fn query(&self, query: &str) -> Result<Vec<RecordBatch>, Self::Error>;
/// Returns a plan that lists the names of tables in this
/// database that have at least one row that matches the
/// conditions listed on `predicate`
@ -104,21 +101,21 @@ pub trait TSDatabase: Debug + Send + Sync {
predicate: Predicate,
gby_agg: GroupByAndAggregate,
) -> Result<SeriesSetPlans, Self::Error>;
}
/// Fetch the specified table names and columns as Arrow
/// RecordBatches. Columns are returned in the order specified.
async fn table_to_arrow(
&self,
table_name: &str,
columns: &[&str],
) -> Result<Vec<RecordBatch>, Self::Error>;
#[async_trait]
pub trait SQLDatabase: Debug + Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
/// Execute the specified query and return arrow record batches with the result
async fn query(&self, query: &str) -> Result<Vec<RecordBatch>, Self::Error>;
}
#[async_trait]
/// Storage for `Databases` which can be retrieved by name
pub trait DatabaseStore: Debug + Send + Sync {
/// The type of database that is stored by this DatabaseStore
type Database: TSDatabase;
type Database: TSDatabase + SQLDatabase;
/// The type of error this DataBase store generates
type Error: std::error::Error + Send + Sync + 'static;

View File

@ -10,7 +10,7 @@ use crate::{
stringset::{StringSet, StringSetRef},
SeriesSetPlans, StringSetPlan,
},
DatabaseStore, Predicate, TSDatabase, TimestampRange,
DatabaseStore, Predicate, SQLDatabase, TSDatabase, TimestampRange,
};
use data_types::data::ReplicatedWrite;
@ -263,11 +263,6 @@ impl TSDatabase for TestDatabase {
Ok(())
}
/// Execute the specified query and return arrow record batches with the result
async fn query(&self, _query: &str) -> Result<Vec<RecordBatch>, Self::Error> {
unimplemented!("query Not yet implemented");
}
/// Return all table names that are saved in this database
async fn table_names(&self, predicate: Predicate) -> Result<StringSetPlan, Self::Error> {
let saved_lines = self.saved_lines.lock().await;
@ -400,14 +395,15 @@ impl TSDatabase for TestDatabase {
message: "No saved query_groups in TestDatabase",
})
}
}
/// Fetch the specified table names and columns as Arrow RecordBatches
async fn table_to_arrow(
&self,
_table_name: &str,
_columns: &[&str],
) -> Result<Vec<RecordBatch>, Self::Error> {
unimplemented!("table_to_arrow Not yet implemented for test database");
#[async_trait]
impl SQLDatabase for TestDatabase {
type Error = TestError;
/// Execute the specified query and return arrow record batches with the result
async fn query(&self, _query: &str) -> Result<Vec<RecordBatch>, Self::Error> {
unimplemented!("query Not yet implemented");
}
}

View File

@ -16,7 +16,7 @@ use data_types::{
};
use influxdb_line_protocol::ParsedLine;
use object_store::ObjectStore;
use query::TSDatabase;
use query::{SQLDatabase, TSDatabase};
use write_buffer::Db as WriteBufferDb;
use async_trait::async_trait;

View File

@ -17,7 +17,7 @@ use tracing::{debug, error, info};
use arrow_deps::arrow;
use influxdb_line_protocol::parse_lines;
use query::{org_and_bucket_to_database, DatabaseStore, TSDatabase};
use query::{org_and_bucket_to_database, DatabaseStore, SQLDatabase, TSDatabase};
use bytes::{Bytes, BytesMut};
use futures::{self, StreamExt};

View File

@ -6,7 +6,7 @@ use query::group_by::WindowDuration;
use query::{
exec::{stringset::StringSet, FieldListPlan, SeriesSetPlan, SeriesSetPlans, StringSetPlan},
predicate::Predicate,
TSDatabase,
SQLDatabase, TSDatabase,
};
use wal::{
writer::{start_wal_sync_task, Error as WalWriterError, WalDetails},
@ -339,6 +339,17 @@ impl Db {
Ok(())
}
async fn table_to_arrow(&self, table_name: &str, columns: &[&str]) -> Result<Vec<RecordBatch>> {
let partitions = self.partitions.read().await;
let batches = partitions
.iter()
.map(|p| p.table_to_arrow(table_name, columns))
.collect::<Result<Vec<_>, crate::partition::Error>>()?;
Ok(batches)
}
}
#[async_trait]
@ -498,21 +509,11 @@ impl TSDatabase for Db {
}
}
}
}
async fn table_to_arrow(
&self,
table_name: &str,
columns: &[&str],
) -> Result<Vec<RecordBatch>, Self::Error> {
let partitions = self.partitions.read().await;
let batches = partitions
.iter()
.map(|p| p.table_to_arrow(table_name, columns))
.collect::<Result<Vec<_>, crate::partition::Error>>()?;
Ok(batches)
}
#[async_trait]
impl SQLDatabase for Db {
type Error = Error;
async fn query(&self, query: &str) -> Result<Vec<RecordBatch>, Self::Error> {
let mut tables = vec![];