feat: Extract SQL planning out of databases (#618)

pull/24376/head
Andrew Lamb 2021-01-07 13:13:30 -05:00 committed by GitHub
parent d17ef800c5
commit c672bb341d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 379 additions and 423 deletions

2
Cargo.lock generated
View File

@ -1822,7 +1822,6 @@ dependencies = [
"influxdb_line_protocol",
"query",
"snafu",
"sqlparser",
"string-interner",
"test_helpers",
"tokio",
@ -2424,6 +2423,7 @@ dependencies = [
"serde",
"serde_urlencoded 0.6.1",
"snafu",
"sqlparser",
"test_helpers",
"tokio",
"tracing",

View File

@ -76,6 +76,12 @@ impl<'a> DatabaseName<'a> {
}
}
impl<'a> std::convert::From<DatabaseName<'a>> for String {
fn from(name: DatabaseName<'a>) -> Self {
name.0.to_string()
}
}
impl<'a> std::convert::TryFrom<&'a str> for DatabaseName<'a> {
type Error = DatabaseNameError;

View File

@ -17,7 +17,6 @@ async-trait = "0.1"
chrono = "0.4"
flatbuffers = "0.6.1"
snafu = "0.6.2"
sqlparser = "0.6.1"
string-interner = "0.12.0"
tokio = { version = "0.2", features = ["full"] }
tracing = "0.1"

View File

@ -4,7 +4,7 @@ use query::group_by::WindowDuration;
use query::{
exec::{stringset::StringSet, FieldListPlan, SeriesSetPlan, SeriesSetPlans, StringSetPlan},
predicate::Predicate,
Database, SQLDatabase,
Database,
};
use query::{group_by::Aggregate, predicate::PredicateBuilder};
@ -19,11 +19,8 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use arrow_deps::{
arrow::{datatypes::Schema as ArrowSchema, record_batch::RecordBatch},
datafusion::{
datasource::MemTable, error::DataFusionError, execution::context::ExecutionContext,
logical_plan::LogicalPlan, physical_plan::collect, prelude::ExecutionConfig,
},
arrow::record_batch::RecordBatch,
datafusion::{error::DataFusionError, logical_plan::LogicalPlan},
};
use data_types::data::ReplicatedWrite;
@ -31,11 +28,6 @@ use crate::dictionary::Error as DictionaryError;
use async_trait::async_trait;
use snafu::{ResultExt, Snafu};
use sqlparser::{
ast::{SetExpr, Statement, TableFactor},
dialect::GenericDialect,
parser::Parser,
};
use tokio::sync::RwLock;
#[derive(Debug, Snafu)]
@ -87,24 +79,12 @@ pub enum Error {
#[snafu(display("id conversion error"))]
IdConversionError { source: std::num::TryFromIntError },
#[snafu(display("Invalid sql query: {} : {}", query, source))]
InvalidSqlQuery {
query: String,
source: sqlparser::parser::ParserError,
},
#[snafu(display("error executing query {}: {}", query, source))]
QueryError {
query: String,
source: DataFusionError,
},
#[snafu(display("Unsupported SQL statement in query {}: {}", query, statement))]
UnsupportedStatement {
query: String,
statement: Box<Statement>,
},
#[snafu(display("replicated write from writer {} missing payload", writer))]
MissingPayload { writer: u32 },
}
@ -293,64 +273,6 @@ impl Database for MutableBufferDb {
}
}
}
}
#[async_trait]
impl SQLDatabase for MutableBufferDb {
type Error = Error;
async fn query(&self, query: &str) -> Result<Vec<RecordBatch>, Self::Error> {
let mut tables = vec![];
let dialect = GenericDialect {};
let ast = Parser::parse_sql(&dialect, query).context(InvalidSqlQuery { query })?;
for statement in ast {
match statement {
Statement::Query(q) => {
if let SetExpr::Select(q) = q.body {
for item in q.from {
if let TableFactor::Table { name, .. } = item.relation {
let name = name.to_string();
let data = self.table_to_arrow(&name, &[]).await?;
tables.push(ArrowTable {
name,
schema: data[0].schema().clone(),
data,
});
}
}
}
}
_ => {
return UnsupportedStatement {
query: query.to_string(),
statement,
}
.fail()
}
}
}
let config = ExecutionConfig::new().with_batch_size(1024 * 1024);
let mut ctx = ExecutionContext::with_config(config);
for table in tables {
let provider =
MemTable::try_new(table.schema, vec![table.data]).context(QueryError { query })?;
ctx.register_table(&table.name, Box::new(provider));
}
let plan = ctx
.create_logical_plan(&query)
.context(QueryError { query })?;
let plan = ctx.optimize(&plan).context(QueryError { query })?;
let plan = ctx
.create_physical_plan(&plan)
.context(QueryError { query })?;
collect(plan).await.context(QueryError { query })
}
/// Fetch the specified table names and columns as Arrow
/// RecordBatches. Columns are returned in the order specified.
@ -1037,12 +959,6 @@ impl Visitor for WindowGroupsVisitor {
}
}
struct ArrowTable {
name: String,
schema: Arc<ArrowSchema>,
data: Vec<RecordBatch>,
}
#[cfg(test)]
mod tests {
use super::*;
@ -1053,6 +969,7 @@ mod tests {
seriesset::{Error as SeriesSetError, SeriesSet, SeriesSetItem},
Executor,
},
frontend::sql::SQLQueryPlanner,
predicate::PredicateBuilder,
Database,
};
@ -1063,7 +980,7 @@ mod tests {
datatypes::DataType,
},
assert_table_eq,
datafusion::prelude::*,
datafusion::{physical_plan::collect, prelude::*},
};
use influxdb_line_protocol::{parse_lines, ParsedLine};
use test_helpers::str_pair_vec_to_vec;
@ -1208,7 +1125,7 @@ mod tests {
.collect();
write_lines(&db, &lines).await;
let results = db.query("select * from cpu").await?;
let results = run_sql_query(&db, "select * from cpu").await;
let expected_cpu_table = &[
"+------+-------+--------+------+------+",
@ -1944,4 +1861,13 @@ mod tests {
let mut writer = query::test::TestLPWriter::default();
writer.write_lines(database, lines).await.unwrap()
}
async fn run_sql_query(database: &MutableBufferDb, query: &str) -> Vec<RecordBatch> {
let planner = SQLQueryPlanner::default();
let executor = Executor::new();
let physical_plan = planner.query(database, query, &executor).await.unwrap();
collect(physical_plan).await.unwrap()
}
}

View File

@ -61,10 +61,8 @@ mod column;
pub mod database;
mod dictionary;
mod partition;
mod store;
mod table;
// Allow restore chunks to be used outside of this crate (for
// benchmarking)
pub use crate::database::MutableBufferDb;
pub use crate::store::MutableBufferDatabases;

View File

@ -1,71 +0,0 @@
use async_trait::async_trait;
use query::DatabaseStore;
use snafu::Snafu;
use tokio::sync::RwLock;
use std::sync::Arc;
use std::{collections::BTreeMap, path::PathBuf};
use crate::database::MutableBufferDb;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error reading from dir {:?}: {}", dir, source))]
ReadError {
dir: PathBuf,
source: std::io::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Default)]
pub struct MutableBufferDatabases {
databases: RwLock<BTreeMap<String, Arc<MutableBufferDb>>>,
}
impl MutableBufferDatabases {
pub async fn add_db(&self, db: MutableBufferDb) {
let mut databases = self.databases.write().await;
databases.insert(db.name.clone(), Arc::new(db));
}
}
#[async_trait]
impl DatabaseStore for MutableBufferDatabases {
type Database = MutableBufferDb;
type Error = Error;
async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
let databases = self.databases.read().await;
databases.get(name).cloned()
}
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error> {
// get it through a read lock first if we can
{
let databases = self.databases.read().await;
if let Some(db) = databases.get(name) {
return Ok(db.clone());
}
}
// database doesn't exist yet so acquire the write lock and get or insert
let mut databases = self.databases.write().await;
// make sure it didn't get inserted by someone else while we were waiting for
// the write lock
if let Some(db) = databases.get(name) {
return Ok(db.clone());
}
let db = MutableBufferDb::new(name);
let db = Arc::new(db);
databases.insert(name.to_string(), db.clone());
Ok(db)
}
}

View File

@ -19,6 +19,7 @@ croaring = "0.4.5"
chrono = "0.4"
arrow_deps = { path = "../arrow_deps" }
sqlparser = "0.6.1"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
data_types = { path = "../data_types" }
test_helpers = { path = "../test_helpers" }

View File

@ -1,10 +1,10 @@
//! This module handles the manipulation / execution of storage
//! plans. This is currently implemented using DataFusion, and this
//! interface abstracts away many of the details
pub(crate) mod context;
mod counters;
pub mod field;
pub mod fieldlist;
mod planning;
mod schema_pivot;
pub mod seriesset;
pub mod stringset;
@ -17,8 +17,8 @@ use arrow_deps::{
};
use counters::ExecutionCounters;
use context::IOxExecutionContext;
use field::FieldColumns;
use planning::IOxExecutionContext;
use schema_pivot::SchemaPivotNode;
use fieldlist::{FieldList, IntoFieldList};
@ -244,7 +244,8 @@ impl Executor {
pub async fn to_string_set(&self, plan: StringSetPlan) -> Result<StringSetRef> {
match plan {
StringSetPlan::Known(res) => res,
StringSetPlan::Plan(plans) => run_logical_plans(self.counters.clone(), plans)
StringSetPlan::Plan(plans) => self
.run_logical_plans(plans)
.await?
.into_stringset()
.context(StringSetConversion),
@ -281,8 +282,8 @@ impl Executor {
let handles = plans
.into_iter()
.map(|plan| {
// Clone Arc's for transmission to threads
let counters = self.counters.clone();
// TODO run these on some executor other than the main tokio pool (maybe?)
let ctx = self.new_context();
let (plan_tx, plan_rx) = mpsc::channel(1);
rx_channels.push(plan_rx);
@ -297,10 +298,8 @@ impl Executor {
let tag_columns = Arc::new(tag_columns);
// TODO run these on some executor other than the main tokio pool (maybe?)
let ctx = IOxExecutionContext::new(counters);
let physical_plan = ctx
.make_plan(&plan)
.prepare_plan(&plan)
.await
.context(DataFusionPhysicalPlanning)?;
@ -358,7 +357,7 @@ impl Executor {
tokio::task::spawn(async move {
let ctx = IOxExecutionContext::new(counters);
let physical_plan = ctx
.make_plan(&plan)
.prepare_plan(&plan)
.await
.context(DataFusionPhysicalPlanning)?;
@ -390,8 +389,40 @@ impl Executor {
/// Run the plan and return a record batch reader for reading the results
pub async fn run_logical_plan(&self, plan: LogicalPlan) -> Result<Vec<RecordBatch>> {
let counters = self.counters.clone();
run_logical_plans(counters, vec![plan]).await
self.run_logical_plans(vec![plan]).await
}
/// Create a new execution context, suitable for executing a new query
pub fn new_context(&self) -> IOxExecutionContext {
IOxExecutionContext::new(self.counters.clone())
}
/// plans and runs the plans in parallel and collects the results
/// run each plan in parallel and collect the results
async fn run_logical_plans(&self, plans: Vec<LogicalPlan>) -> Result<Vec<RecordBatch>> {
let value_futures = plans
.into_iter()
.map(|plan| {
let ctx = self.new_context();
// TODO run these on some executor other than the main tokio pool
tokio::task::spawn(async move {
let physical_plan = ctx.prepare_plan(&plan).await.expect("making logical plan");
// TODO: avoid this buffering
ctx.collect(physical_plan)
.await
.context(DataFusionExecution)
})
})
.collect::<Vec<_>>();
// now, wait for all the values to resolve and collect them together
let mut results = Vec::new();
for join_handle in value_futures {
let mut plan_result = join_handle.await.context(JoinError)??;
results.append(&mut plan_result);
}
Ok(results)
}
}
/// Create a SchemaPivot node which an arbitrary input like
@ -414,38 +445,6 @@ pub fn make_schema_pivot(input: LogicalPlan) -> LogicalPlan {
LogicalPlan::Extension { node }
}
/// plans and runs the plans in parallel and collects the results
/// run each plan in parallel and collect the results
async fn run_logical_plans(
counters: Arc<ExecutionCounters>,
plans: Vec<LogicalPlan>,
) -> Result<Vec<RecordBatch>> {
let value_futures = plans
.into_iter()
.map(|plan| {
let counters = counters.clone();
// TODO run these on some executor other than the main tokio pool
tokio::task::spawn(async move {
let ctx = IOxExecutionContext::new(counters);
let physical_plan = ctx.make_plan(&plan).await.expect("making logical plan");
// TODO: avoid this buffering
ctx.collect(physical_plan)
.await
.context(DataFusionExecution)
})
})
.collect::<Vec<_>>();
// now, wait for all the values to resolve and collect them together
let mut results = Vec::new();
for join_handle in value_futures {
let mut plan_result = join_handle.await.context(JoinError)??;
results.append(&mut plan_result);
}
Ok(results)
}
#[cfg(test)]
mod tests {
use arrow_deps::{

View File

@ -1,7 +1,7 @@
//! This module contains plumbing to connect InfluxDB IOx extensions to
//! DataFusion
use std::sync::Arc;
use std::{fmt, sync::Arc};
use arrow_deps::{
arrow::record_batch::RecordBatch,
@ -27,6 +27,8 @@ pub use arrow_deps::datafusion::error::{DataFusionError as Error, Result};
use super::counters::ExecutionCounters;
/// This structure implements the DataFusion notion of "query planner"
/// and is needed to create plans with the IOx extension nodes.
struct IOxQueryPlanner {}
impl QueryPlanner for IOxQueryPlanner {
@ -77,11 +79,26 @@ impl ExtensionPlanner for IOxExtensionPlanner {
}
}
/// This is an execution context for planning in IOx.
/// It wraps a DataFusion execution context and incudes
/// statistical counters.
///
/// Eventually we envision this as also managing resources
/// and providing visibility into what plans are running
pub struct IOxExecutionContext {
counters: Arc<ExecutionCounters>,
inner: ExecutionContext,
}
impl fmt::Debug for IOxExecutionContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IOxExecutionContext")
.field("counters", &self.counters)
.field("inner", &"<DataFusion ExecutionContext>")
.finish()
}
}
impl IOxExecutionContext {
/// Create an ExecutionContext suitable for executing DataFusion plans
pub fn new(counters: Arc<ExecutionCounters>) -> Self {
@ -96,7 +113,25 @@ impl IOxExecutionContext {
Self { counters, inner }
}
pub async fn make_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
/// returns a reference to the inner datafusion execution context
pub fn inner(&self) -> &ExecutionContext {
&self.inner
}
/// returns a mutable reference to the inner datafusion execution context
pub fn inner_mut(&mut self) -> &mut ExecutionContext {
&mut self.inner
}
/// Prepare a SQL statement for execution. This assumes that any
/// tables referenced in the SQL have been registered with this context
pub async fn prepare_sql(&mut self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
let logical_plan = self.inner.sql(sql)?.to_logical_plan();
self.prepare_plan(&logical_plan).await
}
/// Prepare (optimize + plan) a pre-created logical plan for execution
pub async fn prepare_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
debug!(
"Creating plan: Initial plan\n----\n{}\n{}\n----",
plan.display_indent_schema(),

2
query/src/frontend.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod influxdb;
pub mod sql;

View File

@ -1,5 +1,3 @@
//! The main query planners of InfluxDB IOx
/// Plans queries that originate from the InfluxDB Storage gRPC
/// interface, which are in terms of the InfluxDB Line Protocol data
/// model (the `ParsedLine` structures) and provides an interface to query
@ -29,10 +27,3 @@ pub struct InfluxRPCPlanner {
// Result<StringSetPlan>; async fn tag_column_names(&self, database: impl Database, predicate:
// Predicate) -> Result<StringSetPlan>; ...
}
/// Plans queries as SQL against databases
#[derive(Debug)]
pub struct SQLQueryPlanner {
// Example methods:
//async fn query(&self, database: &impl Database, query: &str) -> Result<Vec<RecordBatch>>;
}

124
query/src/frontend/sql.rs Normal file
View File

@ -0,0 +1,124 @@
use std::sync::Arc;
use snafu::{ResultExt, Snafu};
use crate::{exec::Executor, Database};
use arrow_deps::datafusion::{
datasource::MemTable, error::DataFusionError, physical_plan::ExecutionPlan,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error preparing query {}", source))]
Preparing { source: crate::exec::context::Error },
#[snafu(display("Invalid sql query: {} : {}", query, source))]
InvalidSqlQuery {
query: String,
source: sqlparser::parser::ParserError,
},
#[snafu(display("Unsupported SQL statement in query {}: {}", query, statement))]
UnsupportedStatement {
query: String,
statement: Box<Statement>,
},
#[snafu(display("Internal Error creating memtable for table {}: {}", table, source))]
InternalMemTableCreation {
table: String,
source: DataFusionError,
},
#[snafu(display("Internal error converting table to arrow {}: {}", table, source))]
InternalTableConversion {
table: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// This struct can create plans for running SQL queries against databases
#[derive(Debug, Default)]
pub struct SQLQueryPlanner {}
impl SQLQueryPlanner {
/// Plan a SQL query against the data in `database`, and return a
/// DataFusion physical execution plan. The plan can then be
/// executed using `executor` in a streaming fashion.
pub async fn query<D: Database>(
&self,
database: &D,
query: &str,
executor: &Executor,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut ctx = executor.new_context();
// figure out the table names that appear in the sql
let table_names = table_names(query)?;
// Register a table provider for each table so DataFusion
// knows what the schema of that table is and how to obtain
// its data when needed.
for table in table_names {
// TODO: make our own struct that implements
// TableProvider, type so we can take advantage of
// datafusion predicate and selection pushdown. For now,
// use a Memtable provider (which requires materializing
// the entire table here)
let data = database.table_to_arrow(&table, &[]).await.map_err(|e| {
Error::InternalTableConversion {
table: table.clone(),
source: Box::new(e),
}
})?;
let schema = data[0].schema().clone();
let provider = Box::new(
MemTable::try_new(schema, vec![data])
.context(InternalMemTableCreation { table: &table })?,
);
ctx.inner_mut().register_table(&table, provider);
}
ctx.prepare_sql(query).await.context(Preparing)
}
}
use sqlparser::{
ast::{SetExpr, Statement, TableFactor},
dialect::GenericDialect,
parser::Parser,
};
/// return a list of table names that appear in the query
/// TODO find some way to avoid using sql parser direcly here
fn table_names(query: &str) -> Result<Vec<String>> {
let mut tables = vec![];
let dialect = GenericDialect {};
let ast = Parser::parse_sql(&dialect, query).context(InvalidSqlQuery { query })?;
for statement in ast {
match statement {
Statement::Query(q) => {
if let SetExpr::Select(q) = q.body {
for item in q.from {
if let TableFactor::Table { name, .. } = item.relation {
tables.push(name.to_string());
}
}
}
}
_ => {
return UnsupportedStatement {
query: query.to_string(),
statement,
}
.fail()
}
}
}
Ok(tables)
}

View File

@ -8,15 +8,15 @@
use arrow_deps::arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use data_types::{data::ReplicatedWrite, partition_metadata::Table as TableStats};
use exec::{FieldListPlan, SeriesSetPlans, StringSetPlan};
use exec::{Executor, FieldListPlan, SeriesSetPlans, StringSetPlan};
use std::{fmt::Debug, sync::Arc};
pub mod exec;
pub mod frontend;
pub mod func;
pub mod group_by;
pub mod id;
pub mod planner;
pub mod predicate;
pub mod util;
@ -39,6 +39,27 @@ pub trait Database: Debug + Send + Sync {
/// write ahead log.
async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error>;
/// Fetch the specified table names and columns as Arrow
/// RecordBatches. Columns are returned in the order specified.
async fn table_to_arrow(
&self,
table_name: &str,
columns: &[&str],
) -> Result<Vec<RecordBatch>, Self::Error>;
/// Return the partition keys for data in this DB
async fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
/// Return the table names that are in a given partition key
async fn table_names_for_partition(
&self,
partition_key: &str,
) -> Result<Vec<String>, Self::Error>;
// ----------
// The functions below are slated for removal
// ---------
/// Returns a plan that lists the names of tables in this
/// database that have at least one row that matches the
/// conditions listed on `predicate`
@ -89,32 +110,6 @@ pub trait Database: Debug + Send + Sync {
) -> Result<SeriesSetPlans, Self::Error>;
}
#[async_trait]
pub trait SQLDatabase: Debug + Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
/// Execute the specified query and return arrow record batches with the
/// result
async fn query(&self, query: &str) -> Result<Vec<RecordBatch>, Self::Error>;
/// Fetch the specified table names and columns as Arrow
/// RecordBatches. Columns are returned in the order specified.
async fn table_to_arrow(
&self,
table_name: &str,
columns: &[&str],
) -> Result<Vec<RecordBatch>, Self::Error>;
/// Return the partition keys for data in this DB
async fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
/// Return the table names that are in a given partition key
async fn table_names_for_partition(
&self,
partition_key: &str,
) -> Result<Vec<String>, Self::Error>;
}
/// Collection of data that shares the same partition key
pub trait PartitionChunk: Debug + Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
@ -142,7 +137,7 @@ pub trait PartitionChunk: Debug + Send + Sync {
/// Storage for `Databases` which can be retrieved by name
pub trait DatabaseStore: Debug + Send + Sync {
/// The type of database that is stored by this DatabaseStore
type Database: Database + SQLDatabase;
type Database: Database;
/// The type of error this DataBase store generates
type Error: std::error::Error + Send + Sync + 'static;
@ -154,6 +149,10 @@ pub trait DatabaseStore: Debug + Send + Sync {
/// Retrieve the database specified by `name`, creating it if it
/// doesn't exist.
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error>;
/// Provide a query executor to use for running queries on
/// databases in this `DatabaseStore`
fn executor(&self) -> Arc<Executor>;
}
// Note: I would like to compile this module only in the 'test' cfg,

View File

@ -3,14 +3,14 @@
use arrow_deps::arrow::record_batch::RecordBatch;
use crate::group_by::GroupByAndAggregate;
use crate::{exec::Executor, group_by::GroupByAndAggregate};
use crate::{
exec::FieldListPlan,
exec::{
stringset::{StringSet, StringSetRef},
SeriesSetPlans, StringSetPlan,
},
Database, DatabaseStore, PartitionChunk, Predicate, SQLDatabase, TimestampRange,
Database, DatabaseStore, PartitionChunk, Predicate, TimestampRange,
};
use data_types::{
@ -408,30 +408,6 @@ impl Database for TestDatabase {
message: "No saved query_groups in TestDatabase",
})
}
}
#[async_trait]
impl SQLDatabase for TestDatabase {
type Error = TestError;
/// Execute the specified query and return arrow record batches with the
/// result
async fn query(&self, _query: &str) -> Result<Vec<RecordBatch>, Self::Error> {
unimplemented!("query Not yet implemented");
}
/// Return the partition keys for data in this DB
async fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
unimplemented!("partition_keys not yet implemented for test database");
}
/// Return the table names that are in a given partition key
async fn table_names_for_partition(
&self,
_partition_key: &str,
) -> Result<Vec<String>, Self::Error> {
unimplemented!("table_names_for_partition not yet implemented for test database");
}
/// Fetch the specified table names and columns as Arrow
/// RecordBatches. Columns are returned in the order specified.
@ -442,6 +418,19 @@ impl SQLDatabase for TestDatabase {
) -> Result<Vec<RecordBatch>, Self::Error> {
unimplemented!()
}
/// Return the partition keys for data in this DB
async fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
unimplemented!("partition_keys not yet for test database");
}
/// Return the table names that are in a given partition key
async fn table_names_for_partition(
&self,
_partition_key: &str,
) -> Result<Vec<String>, Self::Error> {
unimplemented!("table_names_for_partition not implemented for test database");
}
}
#[derive(Debug)]
@ -475,6 +464,7 @@ impl PartitionChunk for TestChunk {
#[derive(Debug)]
pub struct TestDatabaseStore {
databases: Mutex<BTreeMap<String, Arc<TestDatabase>>>,
executor: Arc<Executor>,
}
impl TestDatabaseStore {
@ -496,6 +486,7 @@ impl Default for TestDatabaseStore {
fn default() -> Self {
Self {
databases: Mutex::new(BTreeMap::new()),
executor: Arc::new(Executor::new()),
}
}
}
@ -524,6 +515,10 @@ impl DatabaseStore for TestDatabaseStore {
Ok(new_db)
}
}
fn executor(&self) -> Arc<Executor> {
self.executor.clone()
}
}
/// Helper for writing line protocol data directly into test databases

View File

@ -9,7 +9,7 @@ use std::sync::{
use async_trait::async_trait;
use data_types::{data::ReplicatedWrite, database_rules::DatabaseRules};
use mutable_buffer::MutableBufferDb;
use query::{Database, PartitionChunk, SQLDatabase};
use query::{Database, PartitionChunk};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
@ -241,23 +241,6 @@ impl Database for Db {
.await
.context(MutableBufferRead)
}
}
#[async_trait]
impl SQLDatabase for Db {
type Error = Error;
async fn query(
&self,
query: &str,
) -> Result<Vec<arrow_deps::arrow::record_batch::RecordBatch>, Self::Error> {
self.local_store
.as_ref()
.context(DatabaseNotReadable)?
.query(query)
.await
.context(MutableBufferRead)
}
async fn table_to_arrow(
&self,

View File

@ -9,7 +9,6 @@ use std::{
};
use crate::db::Db;
use arrow_deps::arrow::record_batch::RecordBatch;
use data_types::{
data::{lines_to_replicated_write, ReplicatedWrite},
database_rules::{DatabaseRules, HostGroup, HostGroupId, MatchTables},
@ -18,7 +17,7 @@ use data_types::{
use influxdb_line_protocol::ParsedLine;
use mutable_buffer::MutableBufferDb;
use object_store::{path::ObjectStorePath, ObjectStore};
use query::{Database, DatabaseStore, SQLDatabase};
use query::{exec::Executor, Database, DatabaseStore};
use async_trait::async_trait;
use bytes::Bytes;
@ -78,6 +77,7 @@ pub struct Server<M: ConnectionManager> {
config: RwLock<Config>,
connection_manager: Arc<M>,
pub store: Arc<ObjectStore>,
executor: Arc<Executor>,
}
#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
@ -93,6 +93,7 @@ impl<M: ConnectionManager> Server<M> {
config: RwLock::new(Config::default()),
store,
connection_manager: Arc::new(connection_manager),
executor: Arc::new(Executor::new()),
}
}
@ -227,26 +228,6 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
/// Executes a query against the local write buffer database, if one exists.
pub async fn query_local(&self, db_name: &str, query: &str) -> Result<Vec<RecordBatch>> {
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let config = self.config.read().await;
let db = config
.databases
.get(&db_name)
.context(DatabaseNotFound { db_name: &*db_name })?;
let buff = db
.local_store
.as_ref()
.context(NoLocalBuffer { db: &*db_name })?;
buff.query(query)
.await
.map_err(|e| Box::new(e) as DatabaseError)
.context(UnknownDatabaseError {})
}
pub async fn handle_replicated_write(
&self,
db_name: &DatabaseName<'_>,
@ -329,7 +310,10 @@ impl<M: ConnectionManager> Server<M> {
}
#[async_trait]
impl DatabaseStore for Server<ConnectionManagerImpl> {
impl<M> DatabaseStore for Server<M>
where
M: ConnectionManager + std::fmt::Debug + Send + Sync,
{
type Database = Db;
type Error = Error;
@ -361,6 +345,10 @@ impl DatabaseStore for Server<ConnectionManagerImpl> {
Ok(db)
}
fn executor(&self) -> Arc<Executor> {
self.executor.clone()
}
}
/// The `Server` will ask the `ConnectionManager` for connections to a specific
@ -433,12 +421,13 @@ fn config_location(id: u32) -> ObjectStorePath {
#[cfg(test)]
mod tests {
use super::*;
use arrow_deps::arrow::{csv, util::string_writer::StringWriter};
use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect};
use async_trait::async_trait;
use data_types::database_rules::{MatchTables, Matcher, Subscription};
use futures::TryStreamExt;
use influxdb_line_protocol::parse_lines;
use object_store::{memory::InMemory, ObjectStoreIntegration};
use query::frontend::sql::SQLQueryPlanner;
use snafu::Snafu;
use std::sync::Mutex;
@ -513,19 +502,27 @@ mod tests {
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
server.write_lines("foo", &lines).await.unwrap();
let results = server
.query_local("foo", "select * from cpu")
let db_name = DatabaseName::new("foo").unwrap();
let db = server.db(&db_name).await.unwrap();
let buff = db.local_store.as_ref().unwrap();
let planner = SQLQueryPlanner::default();
let executor = server.executor();
let physical_plan = planner
.query(buff.as_ref(), "select * from cpu", executor.as_ref())
.await
.unwrap();
let mut sw = StringWriter::new();
{
let mut writer = csv::Writer::new(&mut sw);
for r in results {
writer.write(&r).unwrap();
}
}
assert_eq!(&sw.to_string(), "bar,time\n1.0,10\n");
let batches = collect(physical_plan).await.unwrap();
let expected = vec![
"+-----+------+",
"| bar | time |",
"+-----+------+",
"| 1 | 10 |",
"+-----+------+",
];
assert_table_eq!(expected, &batches);
Ok(())
}
@ -723,6 +720,7 @@ partition_key:
General { message: String },
}
#[derive(Debug)]
struct TestConnectionManager {
remotes: BTreeMap<String, Arc<TestRemoteServer>>,
}
@ -745,7 +743,7 @@ partition_key:
}
}
#[derive(Default)]
#[derive(Debug, Default)]
struct TestRemoteServer {
writes: Mutex<BTreeMap<String, Vec<ReplicatedWrite>>>,
}

View File

@ -11,7 +11,6 @@ use server::server::{ConnectionManagerImpl as ConnectionManager, Server as AppSe
use hyper::Server;
use object_store::{self, gcp::GoogleCloudStorage, ObjectStore};
use query::exec::Executor as QueryExecutor;
use snafu::{ResultExt, Snafu};
@ -112,9 +111,6 @@ pub async fn main(logging_level: LoggingLevel, ignore_config_file: bool) -> Resu
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
}
// Fire up the query executor
let executor = Arc::new(QueryExecutor::default());
// Construct and start up gRPC server
let grpc_bind_addr = config.grpc_bind_address;
@ -122,7 +118,7 @@ pub async fn main(logging_level: LoggingLevel, ignore_config_file: bool) -> Resu
.await
.context(StartListeningGrpc { grpc_bind_addr })?;
let grpc_server = service::make_server(socket, app_server.clone(), executor);
let grpc_server = service::make_server(socket, app_server.clone());
info!(bind_address=?grpc_bind_addr, "gRPC server listening");

View File

@ -12,11 +12,11 @@
use super::{org_and_bucket_to_database, OrgBucketMappingError};
// Influx crates
use arrow_deps::arrow;
use arrow_deps::{arrow, datafusion::physical_plan::collect};
use data_types::{database_rules::DatabaseRules, DatabaseName};
use influxdb_line_protocol::parse_lines;
use object_store::path::ObjectStorePath;
use query::SQLDatabase;
use query::{frontend::sql::SQLQueryPlanner, Database, DatabaseStore};
use server::server::{ConnectionManager, Server as AppServer};
// External crates
@ -60,13 +60,15 @@ pub enum ApplicationError {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display(
"Internal error reading points from database {}: {}",
database,
source
))]
#[snafu(display("Error planning query {}: {}", query, source))]
PlanningSQLQuery {
query: String,
source: query::frontend::sql::Error,
},
#[snafu(display("Internal error reading points from database {}: {}", db_name, source))]
Query {
database: String,
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
@ -147,6 +149,7 @@ impl ApplicationError {
Self::BucketByName { .. } => self.internal_error(),
Self::BucketMappingError { .. } => self.internal_error(),
Self::WritingPoints { .. } => self.internal_error(),
Self::PlanningSQLQuery { .. } => self.bad_request(),
Self::Query { .. } => self.internal_error(),
Self::QueryError { .. } => self.bad_request(),
Self::BucketNotFound { .. } => self.not_found(),
@ -406,6 +409,9 @@ async fn read<M: ConnectionManager + Send + Sync + Debug + 'static>(
query_string: query,
})?;
let planner = SQLQueryPlanner::default();
let executor = server.executor();
let db_name = org_and_bucket_to_database(&read_info.org, &read_info.bucket)
.context(BucketMappingError)?;
@ -414,12 +420,17 @@ async fn read<M: ConnectionManager + Send + Sync + Debug + 'static>(
bucket: read_info.bucket.clone(),
})?;
let results = db
.query(&read_info.sql_query)
let physical_plan = planner
.query(db.as_ref(), &read_info.sql_query, executor.as_ref())
.await
.context(PlanningSQLQuery { query })?;
let batches = collect(physical_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(QueryError {})?;
let results = arrow::util::pretty::pretty_format_batches(&results).unwrap();
.context(Query { db_name })?;
let results = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
Ok(Response::new(Body::from(results.into_bytes())))
}
@ -657,7 +668,9 @@ mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use arrow_deps::{arrow::record_batch::RecordBatch, assert_table_eq};
use http::header;
use query::exec::Executor;
use reqwest::{Client, Response};
use hyper::Server;
@ -665,7 +678,7 @@ mod tests {
use data_types::database_rules::DatabaseRules;
use data_types::DatabaseName;
use object_store::{memory::InMemory, ObjectStore};
use server::server::ConnectionManagerImpl;
use server::{db::Db, server::ConnectionManagerImpl};
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
@ -727,22 +740,15 @@ mod tests {
.await
.expect("Database exists");
let results = test_db
.query("select * from h2o_temperature")
.await
.unwrap();
let results_str = arrow::util::pretty::pretty_format_batches(&results).unwrap();
let results: Vec<_> = results_str.split('\n').collect();
let batches = run_query(test_db.as_ref(), "select * from h2o_temperature").await;
let expected = vec![
"+----------------+--------------+-------+-----------------+------------+",
"| bottom_degrees | location | state | surface_degrees | time |",
"+----------------+--------------+-------+-----------------+------------+",
"| 50.4 | santa_monica | CA | 65.2 | 1568756160 |",
"+----------------+--------------+-------+-----------------+------------+",
"",
];
assert_eq!(results, expected);
assert_table_eq!(expected, &batches);
Ok(())
}
@ -796,12 +802,7 @@ mod tests {
.await
.expect("Database exists");
let results = test_db
.query("select * from h2o_temperature")
.await
.unwrap();
let results_str = arrow::util::pretty::pretty_format_batches(&results).unwrap();
let results: Vec<_> = results_str.split('\n').collect();
let batches = run_query(test_db.as_ref(), "select * from h2o_temperature").await;
let expected = vec![
"+----------------+--------------+-------+-----------------+------------+",
@ -809,9 +810,8 @@ mod tests {
"+----------------+--------------+-------+-----------------+------------+",
"| 50.4 | santa_monica | CA | 65.2 | 1568756160 |",
"+----------------+--------------+-------+-----------------+------------+",
"",
];
assert_eq!(results, expected);
assert_table_eq!(expected, &batches);
Ok(())
}
@ -914,4 +914,13 @@ mod tests {
println!("Started server at {}", server_url);
server_url
}
/// Run the specified SQL query and return formatted results as a string
async fn run_query(db: &Db, query: &str) -> Vec<RecordBatch> {
let planner = SQLQueryPlanner::default();
let executor = Executor::new();
let physical_plan = planner.query(db, query, &executor).await.unwrap();
collect(physical_plan).await.unwrap()
}
}

View File

@ -29,10 +29,7 @@ use crate::server::rpc::input::GrpcInputs;
use data_types::DatabaseName;
use query::{
exec::{
seriesset::{Error as SeriesSetError, SeriesSetItem},
Executor as QueryExecutor,
},
exec::seriesset::{Error as SeriesSetError, SeriesSetItem},
predicate::PredicateBuilder,
Database, DatabaseStore,
};
@ -227,7 +224,6 @@ impl Error {
#[derive(Debug)]
pub struct GrpcService<T: DatabaseStore> {
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
}
impl<T> GrpcService<T>
@ -235,8 +231,8 @@ where
T: DatabaseStore + 'static,
{
/// Create a new GrpcService connected to `db_store`
pub fn new(db_store: Arc<T>, executor: Arc<QueryExecutor>) -> Self {
Self { db_store, executor }
pub fn new(db_store: Arc<T>) -> Self {
Self { db_store }
}
}
@ -286,14 +282,7 @@ where
predicate.loggable()
);
read_filter_impl(
tx.clone(),
self.db_store.clone(),
self.executor.clone(),
db_name,
range,
predicate,
)
read_filter_impl(tx.clone(), self.db_store.clone(), db_name, range, predicate)
.await
.map_err(|e| e.to_status())?;
@ -347,7 +336,6 @@ where
query_group_impl(
tx.clone(),
self.db_store.clone(),
self.executor.clone(),
db_name,
range,
predicate,
@ -398,7 +386,6 @@ where
query_group_impl(
tx.clone(),
self.db_store.clone(),
self.executor.clone(),
db_name,
range,
predicate,
@ -439,7 +426,6 @@ where
let response = tag_keys_impl(
self.db_store.clone(),
self.executor.clone(),
db_name,
measurement,
range,
@ -489,8 +475,7 @@ where
unimplemented!("tag_value for a measurement, with general predicate");
}
measurement_name_impl(self.db_store.clone(), self.executor.clone(), db_name, range)
.await
measurement_name_impl(self.db_store.clone(), db_name, range).await
} else if tag_key.is_field() {
info!(
"tag_values with tag_key=[xff] (field name) for database {}, range: {:?}, predicate: {} --> returning fields",
@ -498,15 +483,8 @@ where
predicate.loggable()
);
let fieldlist = field_names_impl(
self.db_store.clone(),
self.executor.clone(),
db_name,
None,
range,
predicate,
)
.await?;
let fieldlist =
field_names_impl(self.db_store.clone(), db_name, None, range, predicate).await?;
// Pick out the field names into a Vec<Vec<u8>>for return
let values = fieldlist
@ -529,7 +507,6 @@ where
tag_values_impl(
self.db_store.clone(),
self.executor.clone(),
db_name,
tag_key,
measurement,
@ -629,8 +606,7 @@ where
predicate.loggable()
);
let response =
measurement_name_impl(self.db_store.clone(), self.executor.clone(), db_name, range)
let response = measurement_name_impl(self.db_store.clone(), db_name, range)
.await
.map_err(|e| e.to_status());
@ -672,7 +648,6 @@ where
let response = tag_keys_impl(
self.db_store.clone(),
self.executor.clone(),
db_name,
measurement,
range,
@ -718,7 +693,6 @@ where
let response = tag_values_impl(
self.db_store.clone(),
self.executor.clone(),
db_name,
tag_key,
measurement,
@ -765,7 +739,6 @@ where
let response = field_names_impl(
self.db_store.clone(),
self.executor.clone(),
db_name,
measurement,
range,
@ -815,7 +788,6 @@ fn get_database_name(input: &impl GrpcInputs) -> Result<DatabaseName<'static>, S
/// (optional) range
async fn measurement_name_impl<T>(
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
db_name: DatabaseName<'static>,
range: Option<TimestampRange>,
) -> Result<StringValuesResponse>
@ -835,6 +807,8 @@ where
source: Box::new(e),
})?;
let executor = db_store.executor();
let table_names = executor
.to_string_set(plan)
.await
@ -856,7 +830,6 @@ where
/// predicates
async fn tag_keys_impl<T>(
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
db_name: DatabaseName<'static>,
measurement: Option<String>,
range: Option<TimestampRange>,
@ -881,6 +854,8 @@ where
.await
.context(DatabaseNotFound { db_name: &*db_name })?;
let executor = db_store.executor();
let tag_key_plan = db
.tag_column_names(predicate)
.await
@ -911,7 +886,6 @@ where
/// arbitratry predicates
async fn tag_values_impl<T>(
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
db_name: DatabaseName<'static>,
tag_name: String,
measurement: Option<String>,
@ -937,6 +911,8 @@ where
.await
.context(DatabaseNotFound { db_name: &*db_name })?;
let executor = db_store.executor();
let tag_value_plan =
db.column_values(&tag_name, predicate)
.await
@ -973,7 +949,6 @@ where
async fn read_filter_impl<'a, T>(
tx: mpsc::Sender<Result<ReadResponse, Status>>,
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
db_name: DatabaseName<'static>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
@ -996,6 +971,8 @@ where
.await
.context(DatabaseNotFound { db_name: &*db_name })?;
let executor = db_store.executor();
let series_plan =
db.query_series(predicate)
.await
@ -1055,7 +1032,6 @@ async fn convert_series_set(
async fn query_group_impl<T>(
tx: mpsc::Sender<Result<ReadResponse, Status>>,
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
db_name: DatabaseName<'static>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
@ -1079,6 +1055,8 @@ where
.await
.context(DatabaseNotFound { db_name: &*db_name })?;
let executor = db_store.executor();
let grouped_series_set_plan =
db.query_groups(predicate, gby_agg)
.await
@ -1116,7 +1094,6 @@ where
/// predicate
async fn field_names_impl<T>(
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
db_name: DatabaseName<'static>,
measurement: Option<String>,
range: Option<TimestampRange>,
@ -1141,6 +1118,8 @@ where
.await
.context(DatabaseNotFound { db_name: &*db_name })?;
let executor = db_store.executor();
let fieldlist_plan =
db.field_column_names(predicate)
.await
@ -1165,23 +1144,13 @@ where
/// implementing the IOx and Storage gRPC interfaces, the
/// underlying hyper server instance. Resolves when the server has
/// shutdown.
pub async fn make_server<T>(
socket: TcpListener,
storage: Arc<T>,
executor: Arc<QueryExecutor>,
) -> Result<()>
pub async fn make_server<T>(socket: TcpListener, storage: Arc<T>) -> Result<()>
where
T: DatabaseStore + 'static,
{
tonic::transport::Server::builder()
.add_service(IOxTestingServer::new(GrpcService::new(
storage.clone(),
executor.clone(),
)))
.add_service(StorageServer::new(GrpcService::new(
storage.clone(),
executor.clone(),
)))
.add_service(IOxTestingServer::new(GrpcService::new(storage.clone())))
.add_service(StorageServer::new(GrpcService::new(storage.clone())))
.serve_with_incoming(socket)
.await
.context(ServerError {})
@ -2590,7 +2559,6 @@ mod tests {
iox_client: IOxTestingClient,
storage_client: StorageClientWrapper,
test_storage: Arc<TestDatabaseStore>,
_test_executor: Arc<QueryExecutor>,
}
impl Fixture {
@ -2598,7 +2566,6 @@ mod tests {
/// a fixture with the test server and clients
async fn new() -> Result<Self, FixtureError> {
let test_storage = Arc::new(TestDatabaseStore::new());
let test_executor = Arc::new(QueryExecutor::default());
// Get a random port from the kernel by asking for port 0.
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
@ -2611,7 +2578,7 @@ mod tests {
println!("Starting InfluxDB IOx rpc test server on {:?}", bind_addr);
let server = make_server(socket, test_storage.clone(), test_executor.clone());
let server = make_server(socket, test_storage.clone());
tokio::task::spawn(server);
let iox_client = connect_to_server::<IOxTestingClient>(bind_addr)
@ -2627,7 +2594,6 @@ mod tests {
iox_client,
storage_client,
test_storage,
_test_executor: test_executor,
})
}
}