diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 413fb966e5..5f4353153a 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -33,7 +33,7 @@ use crate::{ predicate::{Predicate, PredicateMatch}, provider::ProviderBuilder, util::schema_has_all_expr_columns, - Database, PartitionChunk, + QueryChunk, QueryDatabase, }; #[derive(Debug, Snafu)] @@ -191,7 +191,7 @@ impl InfluxRpcPlanner { /// conditions listed on `predicate` pub fn table_names<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan> where - D: Database + 'static, + D: QueryDatabase + 'static, { let mut builder = StringSetPlanBuilder::new(); @@ -233,7 +233,7 @@ impl InfluxRpcPlanner { /// conditions specified by `predicate`. pub fn tag_keys<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan> where - D: Database + 'static, + D: QueryDatabase + 'static, { debug!(predicate=?predicate, "planning tag_keys"); @@ -346,7 +346,7 @@ impl InfluxRpcPlanner { predicate: Predicate, ) -> Result<StringSetPlan> where - D: Database + 'static, + D: QueryDatabase + 'static, { debug!(predicate=?predicate, tag_name, "planning tag_values"); @@ -495,7 +495,7 @@ impl InfluxRpcPlanner { /// specified by `predicate`. pub fn field_columns<D>(&self, database: &D, predicate: Predicate) -> Result<FieldListPlan> where - D: Database + 'static, + D: QueryDatabase + 'static, { debug!(predicate=?predicate, "planning field_columns"); @@ -539,7 +539,7 @@ impl InfluxRpcPlanner { /// same) occur together in the plan pub fn read_filter<D>(&self, database: &D, predicate: Predicate) -> Result<SeriesSetPlans> where - D: Database + 'static, + D: QueryDatabase + 'static, { debug!(predicate=?predicate, "planning read_filter"); @@ -575,7 +575,7 @@ impl InfluxRpcPlanner { group_columns: &[impl AsRef<str>], ) -> Result<SeriesSetPlans> where - D: Database + 'static, + D: QueryDatabase + 'static, { debug!(predicate=?predicate, agg=?agg, "planning read_group"); @@ -615,7 +615,7 @@ impl InfluxRpcPlanner { offset: WindowDuration, ) -> Result<SeriesSetPlans> where - D: Database + 'static, + D: QueryDatabase + 'static, { debug!(predicate=?predicate, "planning read_window_aggregate"); @@ -644,7 +644,7 @@ impl InfluxRpcPlanner { chunks: Vec<Arc<C>>, ) -> Result<BTreeMap<String, Vec<Arc<C>>>> where - C: PartitionChunk + 'static, + C: QueryChunk + 'static, { let mut table_chunks = BTreeMap::new(); for chunk in chunks { @@ -691,7 +691,7 @@ impl InfluxRpcPlanner { chunks: Vec<Arc<C>>, ) -> Result<Option<StringSetPlan>> where - C: PartitionChunk + 'static, + C: QueryChunk + 'static, { let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; @@ -753,7 +753,7 @@ impl InfluxRpcPlanner { chunks: Vec<Arc<C>>, ) -> Result<Option<LogicalPlan>> where - C: PartitionChunk + 'static, + C: QueryChunk + 'static, { let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; let TableScanAndFilter { @@ -804,7 +804,7 @@ impl InfluxRpcPlanner { chunks: Vec<Arc<C>>, ) -> Result<Option<SeriesSetPlan>> where - C: PartitionChunk + 'static, + C: QueryChunk + 'static, { let table_name = table_name.as_ref(); let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; @@ -925,7 +925,7 @@ impl InfluxRpcPlanner { chunks: Vec<Arc<C>>, ) -> Result<Option<SeriesSetPlan>> where - C: PartitionChunk + 'static, + C: QueryChunk + 'static, { let table_name = table_name.into(); let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?; @@ -1016,7 +1016,7 @@ impl InfluxRpcPlanner { chunks: Vec<Arc<C>>, ) -> Result<Option<SeriesSetPlan>> where - C: PartitionChunk + 'static, + C: QueryChunk + 'static, { let table_name = table_name.into(); let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?; @@ -1100,7 +1100,7 @@ impl InfluxRpcPlanner { chunks: Vec<Arc<C>>, ) -> Result<Option<TableScanAndFilter>> where - C: PartitionChunk + 'static, + C: QueryChunk + 'static, { // Scan all columns to begin with (DataFusion projection // push-down optimization will prune out unneeded columns later) diff --git a/query/src/lib.rs b/query/src/lib.rs index 66b6226b3d..dc18ec819b 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -40,9 +40,9 @@ use self::predicate::Predicate; /// /// TODO: Move all Query and Line Protocol specific things out of this /// trait and into the various query planners. -pub trait Database: Debug + Send + Sync { +pub trait QueryDatabase: Debug + Send + Sync { type Error: std::error::Error + Send + Sync + 'static; - type Chunk: PartitionChunk; + type Chunk: QueryChunk; /// Return the partition keys for data in this DB fn partition_keys(&self) -> Result<Vec<String>, Self::Error>; @@ -57,7 +57,7 @@ pub trait Database: Debug + Send + Sync { } /// Collection of data that shares the same partition key -pub trait PartitionChunk: Prunable + Debug + Send + Sync { +pub trait QueryChunk: Prunable + Debug + Send + Sync { type Error: std::error::Error + Send + Sync + 'static; /// returns the Id of this chunk. Ids are unique within a @@ -105,7 +105,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync { /// selection refers to columns that do not exist. fn table_schema(&self, selection: Selection<'_>) -> Result<Schema, Self::Error>; - /// Provides access to raw `PartitionChunk` data as an + /// Provides access to raw `QueryChunk` data as an /// asynchronous stream of `RecordBatch`es filtered by a *required* /// predicate. Note that not all chunks can evaluate all types of /// predicates and this function will return an error @@ -117,7 +117,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync { /// directly is that the data for a particular Table lives in /// several chunks within a partition, so there needs to be an /// implementation of `TableProvider` that stitches together the - /// streams from several different `PartitionChunks`. + /// streams from several different `QueryChunk`s. fn read_filter( &self, predicate: &Predicate, @@ -132,7 +132,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync { /// Storage for `Databases` which can be retrieved by name pub trait DatabaseStore: Debug + Send + Sync { /// The type of database that is stored by this DatabaseStore - type Database: Database; + type Database: QueryDatabase; /// The type of error this DataBase store generates type Error: std::error::Error + Send + Sync + 'static; diff --git a/query/src/provider.rs b/query/src/provider.rs index 4bfbc39c47..7dd0fce09e 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -1,4 +1,4 @@ -//! Implementation of a DataFusion `TableProvider` in terms of `PartitionChunk`s +//! Implementation of a DataFusion `TableProvider` in terms of `QueryChunk`s use std::sync::Arc; @@ -22,7 +22,7 @@ use crate::{ duplicate::group_potential_duplicates, predicate::{Predicate, PredicateBuilder}, util::{arrow_pk_sort_exprs, project_schema}, - PartitionChunk, + QueryChunk, }; use snafu::{ResultExt, Snafu}; @@ -79,16 +79,16 @@ impl From<Error> for DataFusionError { } /// Something that can prune chunks based on their metadata -pub trait ChunkPruner<C: PartitionChunk>: Sync + Send + std::fmt::Debug { +pub trait ChunkPruner<C: QueryChunk>: Sync + Send + std::fmt::Debug { /// prune `chunks`, if possible, based on predicate. fn prune_chunks(&self, chunks: Vec<Arc<C>>, predicate: &Predicate) -> Vec<Arc<C>>; } -/// Builds a `ChunkTableProvider` from a series of `PartitionChunk`s +/// Builds a `ChunkTableProvider` from a series of `QueryChunk`s /// and ensures the schema across the chunks is compatible and /// consistent. #[derive(Debug)] -pub struct ProviderBuilder<C: PartitionChunk + 'static> { +pub struct ProviderBuilder<C: QueryChunk + 'static> { table_name: Arc<str>, schema_merger: SchemaMerger, chunk_pruner: Option<Arc<dyn ChunkPruner<C>>>, @@ -98,7 +98,7 @@ pub struct ProviderBuilder<C: PartitionChunk + 'static> { finished: bool, } -impl<C: PartitionChunk> ProviderBuilder<C> { +impl<C: QueryChunk> ProviderBuilder<C> { pub fn new(table_name: impl AsRef<str>) -> Self { Self { table_name: Arc::from(table_name.as_ref()), @@ -178,12 +178,12 @@ impl<C: PartitionChunk> ProviderBuilder<C> { } } -/// Implementation of a DataFusion TableProvider in terms of PartitionChunks +/// Implementation of a DataFusion TableProvider in terms of QueryChunks /// /// This allows DataFusion to see data from Chunks as a single table, as well as /// push predicates and selections down to chunks #[derive(Debug)] -pub struct ChunkTableProvider<C: PartitionChunk + 'static> { +pub struct ChunkTableProvider<C: QueryChunk + 'static> { table_name: Arc<str>, /// The IOx schema (wrapper around Arrow Schemaref) for this table iox_schema: Schema, @@ -193,7 +193,7 @@ pub struct ChunkTableProvider<C: PartitionChunk + 'static> { chunks: Vec<Arc<C>>, } -impl<C: PartitionChunk + 'static> ChunkTableProvider<C> { +impl<C: QueryChunk + 'static> ChunkTableProvider<C> { /// Return the IOx schema view for the data provided by this provider pub fn iox_schema(&self) -> Schema { self.iox_schema.clone() @@ -205,7 +205,7 @@ impl<C: PartitionChunk + 'static> ChunkTableProvider<C> { } } -impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> { +impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> { fn as_any(&self) -> &dyn std::any::Any { self } @@ -272,7 +272,7 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> { #[derive(Clone, Debug, Default)] /// A deduplicater that deduplicate the duplicated data during scan execution -pub(crate) struct Deduplicater<C: PartitionChunk + 'static> { +pub(crate) struct Deduplicater<C: QueryChunk + 'static> { // a vector of a vector of overlapped chunks pub overlapped_chunks_set: Vec<Vec<Arc<C>>>, @@ -283,7 +283,7 @@ pub(crate) struct Deduplicater<C: PartitionChunk + 'static> { pub no_duplicates_chunks: Vec<Arc<C>>, } -impl<C: PartitionChunk + 'static> Deduplicater<C> { +impl<C: QueryChunk + 'static> Deduplicater<C> { fn new() -> Self { Self { overlapped_chunks_set: vec![], @@ -670,7 +670,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> { #[derive(Debug)] /// A pruner that does not do pruning (suitable if no additional pruning is possible) struct NoOpPruner {} -impl<C: PartitionChunk> ChunkPruner<C> for NoOpPruner { +impl<C: QueryChunk> ChunkPruner<C> for NoOpPruner { fn prune_chunks(&self, chunks: Vec<Arc<C>>, _predicate: &Predicate) -> Vec<Arc<C>> { chunks } diff --git a/query/src/provider/physical.rs b/query/src/provider/physical.rs index 08c8214d5c..61df503d7f 100644 --- a/query/src/provider/physical.rs +++ b/query/src/provider/physical.rs @@ -9,7 +9,7 @@ use datafusion::{ }; use internal_types::{schema::Schema, selection::Selection}; -use crate::{predicate::Predicate, PartitionChunk}; +use crate::{predicate::Predicate, QueryChunk}; use async_trait::async_trait; @@ -17,7 +17,7 @@ use super::adapter::SchemaAdapterStream; /// Implements the DataFusion physical plan interface #[derive(Debug)] -pub(crate) struct IOxReadFilterNode<C: PartitionChunk + 'static> { +pub(crate) struct IOxReadFilterNode<C: QueryChunk + 'static> { table_name: Arc<str>, /// The desired output schema (includes selection_ /// note that the chunk may not have all these columns. @@ -26,7 +26,7 @@ pub(crate) struct IOxReadFilterNode<C: PartitionChunk + 'static> { predicate: Predicate, } -impl<C: PartitionChunk + 'static> IOxReadFilterNode<C> { +impl<C: QueryChunk + 'static> IOxReadFilterNode<C> { pub fn new( table_name: Arc<str>, schema: SchemaRef, @@ -43,7 +43,7 @@ impl<C: PartitionChunk + 'static> IOxReadFilterNode<C> { } #[async_trait] -impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> { +impl<C: QueryChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> { fn as_any(&self) -> &dyn std::any::Any { self } diff --git a/query/src/test.rs b/query/src/test.rs index 07f5f80757..40bfac9897 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -16,7 +16,7 @@ use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBa use crate::{ exec::stringset::{StringSet, StringSetRef}, - Database, DatabaseStore, PartitionChunk, Predicate, PredicateMatch, + DatabaseStore, Predicate, PredicateMatch, QueryChunk, QueryDatabase, }; use crate::{exec::Executor, pruning::Prunable}; @@ -92,7 +92,7 @@ impl TestDatabase { } } -impl Database for TestDatabase { +impl QueryDatabase for TestDatabase { type Error = TestError; type Chunk = TestChunk; @@ -706,7 +706,7 @@ impl TestChunk { } } -impl PartitionChunk for TestChunk { +impl QueryChunk for TestChunk { type Error = TestError; fn id(&self) -> u32 { diff --git a/query_tests/src/pruning.rs b/query_tests/src/pruning.rs index 3534471130..99a3fb8526 100644 --- a/query_tests/src/pruning.rs +++ b/query_tests/src/pruning.rs @@ -7,7 +7,7 @@ use query::{ exec::stringset::StringSet, frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner}, predicate::PredicateBuilder, - PartitionChunk, + QueryChunk, }; use server::db::test_helpers::write_lp; diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index ace51764d9..08f96c2712 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -1,7 +1,7 @@ //! This module contains testing scenarios for Db #[allow(unused_imports, dead_code, unused_macros)] -use query::PartitionChunk; +use query::QueryChunk; use async_trait::async_trait; diff --git a/query_tests/src/table_schema.rs b/query_tests/src/table_schema.rs index 7fe3b17a75..d3451df4d5 100644 --- a/query_tests/src/table_schema.rs +++ b/query_tests/src/table_schema.rs @@ -2,7 +2,7 @@ use arrow::datatypes::DataType; use internal_types::{schema::builder::SchemaBuilder, selection::Selection}; -use query::{Database, PartitionChunk}; +use query::{QueryChunk, QueryDatabase}; use super::scenarios::*; use query::predicate::PredicateBuilder; diff --git a/server/src/db.rs b/server/src/db.rs index 15a2f544aa..7175abe3b0 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -37,7 +37,7 @@ use parquet_file::{ metadata::IoxMetadata, storage::Storage, }; -use query::{exec::Executor, predicate::Predicate, Database}; +use query::{exec::Executor, predicate::Predicate, QueryDatabase}; use rand_distr::{Distribution, Poisson}; use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk}; use snafu::{ResultExt, Snafu}; @@ -1060,7 +1060,7 @@ fn check_chunk_closed(chunk: &mut CatalogChunk, mutable_size_threshold: Option<N /// Convenience implementation of `Database` so the rest of the code /// can just use Db as a `Database` even though the implementation /// lives in `catalog_access` -impl Database for Db { +impl QueryDatabase for Db { type Error = Error; type Chunk = DbChunk; @@ -1477,7 +1477,7 @@ mod tests { catalog::test_helpers::assert_catalog_state_implementation, test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data}, }; - use query::{frontend::sql::SqlQueryPlanner, Database, PartitionChunk}; + use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase}; use std::{ collections::HashSet, convert::TryFrom, diff --git a/server/src/db/access.rs b/server/src/db/access.rs index aa30f24f53..579bf99555 100644 --- a/server/src/db/access.rs +++ b/server/src/db/access.rs @@ -23,13 +23,13 @@ use query::{ predicate::{Predicate, PredicateBuilder}, provider::{self, ChunkPruner, ProviderBuilder}, pruning::Prunable, - PartitionChunk, DEFAULT_SCHEMA, + QueryChunk, DEFAULT_SCHEMA, }; use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA}; use query::{ pruning::{prune_chunks, PruningObserver}, - Database, + QueryDatabase, }; /// Metrics related to chunk access (pruning specifically) @@ -181,7 +181,7 @@ impl PruningObserver for ChunkAccess { } #[async_trait] -impl Database for QueryCatalogAccess { +impl QueryDatabase for QueryCatalogAccess { type Error = Error; type Chunk = DbChunk; diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 2ba2d29d8e..5af5d1d3cc 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -19,7 +19,7 @@ use query::{ exec::stringset::StringSet, predicate::{Predicate, PredicateMatch}, pruning::Prunable, - PartitionChunk, + QueryChunk, }; use read_buffer::RBChunk; @@ -206,7 +206,7 @@ impl DbChunk { } } -impl PartitionChunk for DbChunk { +impl QueryChunk for DbChunk { type Error = Error; fn id(&self) -> u32 { diff --git a/server/src/lib.rs b/server/src/lib.rs index 593e140b0b..0664b0092e 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1012,7 +1012,7 @@ mod tests { use influxdb_line_protocol::parse_lines; use metrics::MetricRegistry; use object_store::{memory::InMemory, path::ObjectStorePath}; - use query::{frontend::sql::SqlQueryPlanner, Database}; + use query::{frontend::sql::SqlQueryPlanner, QueryDatabase}; use super::*; use std::sync::atomic::{AtomicBool, Ordering}; diff --git a/server/src/utils.rs b/server/src/utils.rs index e27248b238..fe88e0282e 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -5,7 +5,7 @@ use data_types::{ DatabaseName, }; use object_store::{memory::InMemory, ObjectStore}; -use query::{exec::Executor, Database}; +use query::{exec::Executor, QueryDatabase}; use crate::{ db::{load_or_create_preserved_catalog, Db}, diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index c11ab6919e..e09ae61676 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -18,7 +18,7 @@ use data_types::{ }; use influxdb_iox_client::format::QueryOutputFormat; use influxdb_line_protocol::parse_lines; -use query::Database; +use query::QueryDatabase; use server::{ConnectionManager, Server as AppServer}; // External crates diff --git a/src/influxdb_ioxd/planner.rs b/src/influxdb_ioxd/planner.rs index 6bc520a411..f595f26ebc 100644 --- a/src/influxdb_ioxd/planner.rs +++ b/src/influxdb_ioxd/planner.rs @@ -8,7 +8,7 @@ use query::{ group_by::{Aggregate, WindowDuration}, plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan}, predicate::Predicate, - Database, + QueryDatabase, }; use snafu::{ResultExt, Snafu}; @@ -78,7 +78,7 @@ impl Planner { predicate: Predicate, ) -> Result<StringSetPlan> where - D: Database + 'static, + D: QueryDatabase + 'static, { let planner = InfluxRpcPlanner::new(); @@ -96,7 +96,7 @@ impl Planner { /// [`InfluxRpcPlanner::tag_keys`], on a separate threadpool pub async fn tag_keys<D>(&self, database: Arc<D>, predicate: Predicate) -> Result<StringSetPlan> where - D: Database + 'static, + D: QueryDatabase + 'static, { let planner = InfluxRpcPlanner::new(); @@ -119,7 +119,7 @@ impl Planner { predicate: Predicate, ) -> Result<StringSetPlan> where - D: Database + 'static, + D: QueryDatabase + 'static, { let tag_name = tag_name.into(); let planner = InfluxRpcPlanner::new(); @@ -142,7 +142,7 @@ impl Planner { predicate: Predicate, ) -> Result<FieldListPlan> where - D: Database + 'static, + D: QueryDatabase + 'static, { let planner = InfluxRpcPlanner::new(); @@ -164,7 +164,7 @@ impl Planner { predicate: Predicate, ) -> Result<SeriesSetPlans> where - D: Database + 'static, + D: QueryDatabase + 'static, { let planner = InfluxRpcPlanner::new(); @@ -188,7 +188,7 @@ impl Planner { group_columns: Vec<String>, ) -> Result<SeriesSetPlans> where - D: Database + 'static, + D: QueryDatabase + 'static, { let planner = InfluxRpcPlanner::new(); @@ -213,7 +213,7 @@ impl Planner { offset: WindowDuration, ) -> Result<SeriesSetPlans> where - D: Database + 'static, + D: QueryDatabase + 'static, { let planner = InfluxRpcPlanner::new(); diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 3a648e872a..1f48d83b34 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -8,7 +8,7 @@ use generated_types::google::{ }; use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; use observability_deps::tracing::info; -use query::{Database, DatabaseStore}; +use query::{DatabaseStore, QueryDatabase}; use server::{ConnectionManager, Error, Server}; use tonic::{Request, Response, Status};