diff --git a/query/src/lib.rs b/query/src/lib.rs index 21ecdbb542..586287a617 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -53,9 +53,6 @@ pub trait TSDatabase: Debug + Send + Sync { /// Stores the replicated write in the write buffer and, if enabled, the write ahead log. async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error>; - /// Execute the specified query and return arrow record batches with the result - async fn query(&self, query: &str) -> Result, Self::Error>; - /// Returns a plan that lists the names of tables in this /// database that have at least one row that matches the /// conditions listed on `predicate` @@ -104,21 +101,21 @@ pub trait TSDatabase: Debug + Send + Sync { predicate: Predicate, gby_agg: GroupByAndAggregate, ) -> Result; +} - /// Fetch the specified table names and columns as Arrow - /// RecordBatches. Columns are returned in the order specified. - async fn table_to_arrow( - &self, - table_name: &str, - columns: &[&str], - ) -> Result, Self::Error>; +#[async_trait] +pub trait SQLDatabase: Debug + Send + Sync { + type Error: std::error::Error + Send + Sync + 'static; + + /// Execute the specified query and return arrow record batches with the result + async fn query(&self, query: &str) -> Result, Self::Error>; } #[async_trait] /// Storage for `Databases` which can be retrieved by name pub trait DatabaseStore: Debug + Send + Sync { /// The type of database that is stored by this DatabaseStore - type Database: TSDatabase; + type Database: TSDatabase + SQLDatabase; /// The type of error this DataBase store generates type Error: std::error::Error + Send + Sync + 'static; diff --git a/query/src/test.rs b/query/src/test.rs index b677f7a56f..2ede429615 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -10,7 +10,7 @@ use crate::{ stringset::{StringSet, StringSetRef}, SeriesSetPlans, StringSetPlan, }, - DatabaseStore, Predicate, TSDatabase, TimestampRange, + DatabaseStore, Predicate, SQLDatabase, TSDatabase, TimestampRange, }; use data_types::data::ReplicatedWrite; @@ -263,11 +263,6 @@ impl TSDatabase for TestDatabase { Ok(()) } - /// Execute the specified query and return arrow record batches with the result - async fn query(&self, _query: &str) -> Result, Self::Error> { - unimplemented!("query Not yet implemented"); - } - /// Return all table names that are saved in this database async fn table_names(&self, predicate: Predicate) -> Result { let saved_lines = self.saved_lines.lock().await; @@ -400,14 +395,15 @@ impl TSDatabase for TestDatabase { message: "No saved query_groups in TestDatabase", }) } +} - /// Fetch the specified table names and columns as Arrow RecordBatches - async fn table_to_arrow( - &self, - _table_name: &str, - _columns: &[&str], - ) -> Result, Self::Error> { - unimplemented!("table_to_arrow Not yet implemented for test database"); +#[async_trait] +impl SQLDatabase for TestDatabase { + type Error = TestError; + + /// Execute the specified query and return arrow record batches with the result + async fn query(&self, _query: &str) -> Result, Self::Error> { + unimplemented!("query Not yet implemented"); } } diff --git a/server/src/server.rs b/server/src/server.rs index 46099de5aa..84c5e74176 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -16,7 +16,7 @@ use data_types::{ }; use influxdb_line_protocol::ParsedLine; use object_store::ObjectStore; -use query::TSDatabase; +use query::{SQLDatabase, TSDatabase}; use write_buffer::Db as WriteBufferDb; use async_trait::async_trait; diff --git a/src/server/http_routes.rs b/src/server/http_routes.rs index 1af551c875..77afa5498f 100644 --- a/src/server/http_routes.rs +++ b/src/server/http_routes.rs @@ -17,7 +17,7 @@ use tracing::{debug, error, info}; use arrow_deps::arrow; use influxdb_line_protocol::parse_lines; -use query::{org_and_bucket_to_database, DatabaseStore, TSDatabase}; +use query::{org_and_bucket_to_database, DatabaseStore, SQLDatabase, TSDatabase}; use bytes::{Bytes, BytesMut}; use futures::{self, StreamExt}; diff --git a/write_buffer/src/database.rs b/write_buffer/src/database.rs index 42ccbd980b..e15e68892d 100644 --- a/write_buffer/src/database.rs +++ b/write_buffer/src/database.rs @@ -6,7 +6,7 @@ use query::group_by::WindowDuration; use query::{ exec::{stringset::StringSet, FieldListPlan, SeriesSetPlan, SeriesSetPlans, StringSetPlan}, predicate::Predicate, - TSDatabase, + SQLDatabase, TSDatabase, }; use wal::{ writer::{start_wal_sync_task, Error as WalWriterError, WalDetails}, @@ -339,6 +339,17 @@ impl Db { Ok(()) } + + async fn table_to_arrow(&self, table_name: &str, columns: &[&str]) -> Result> { + let partitions = self.partitions.read().await; + + let batches = partitions + .iter() + .map(|p| p.table_to_arrow(table_name, columns)) + .collect::, crate::partition::Error>>()?; + + Ok(batches) + } } #[async_trait] @@ -498,21 +509,11 @@ impl TSDatabase for Db { } } } +} - async fn table_to_arrow( - &self, - table_name: &str, - columns: &[&str], - ) -> Result, Self::Error> { - let partitions = self.partitions.read().await; - - let batches = partitions - .iter() - .map(|p| p.table_to_arrow(table_name, columns)) - .collect::, crate::partition::Error>>()?; - - Ok(batches) - } +#[async_trait] +impl SQLDatabase for Db { + type Error = Error; async fn query(&self, query: &str) -> Result, Self::Error> { let mut tables = vec![];