refactor: Rename query::PartitionChunk --> query::QueryChunk (#1754)

Andrew Lamb 2021-06-18 09:24:09 -04:00 committed by GitHub
parent ec43a87909
commit 1c13d676b4
16 changed files with 64 additions and 64 deletions
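
The change is almost entirely mechanical: the `Database` trait becomes `QueryDatabase`, the `PartitionChunk` trait becomes `QueryChunk`, and every import, trait bound, and `impl` block follows suit. As a minimal, self-contained sketch (stand-in traits for illustration, not the real IOx definitions, which also carry associated `Error` types and many more methods), the effect on downstream generic code looks like this:

```rust
use std::fmt::Debug;

// Stand-ins for the renamed traits; shapes are simplified for illustration.
pub trait QueryChunk: Debug + Send + Sync {
    /// Chunk ids are unique within a partition (per the trait docs below).
    fn id(&self) -> u32;
}

pub trait QueryDatabase: Debug + Send + Sync {
    type Chunk: QueryChunk;
    fn partition_keys(&self) -> Vec<String>;
}

// Before this commit the bound read `D: Database + 'static`;
// after it, only the trait's name changes. `count_partitions` is a
// hypothetical caller, not a function from the diff.
pub fn count_partitions<D>(database: &D) -> usize
where
    D: QueryDatabase + 'static,
{
    database.partition_keys().len()
}
```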


@@ -33,7 +33,7 @@ use crate::{
     predicate::{Predicate, PredicateMatch},
     provider::ProviderBuilder,
     util::schema_has_all_expr_columns,
-    Database, PartitionChunk,
+    QueryChunk, QueryDatabase,
 };
 #[derive(Debug, Snafu)]
@@ -191,7 +191,7 @@ impl InfluxRpcPlanner {
     /// conditions listed on `predicate`
     pub fn table_names<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         let mut builder = StringSetPlanBuilder::new();
@@ -233,7 +233,7 @@ impl InfluxRpcPlanner {
     /// conditions specified by `predicate`.
     pub fn tag_keys<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         debug!(predicate=?predicate, "planning tag_keys");
@@ -346,7 +346,7 @@
         predicate: Predicate,
     ) -> Result<StringSetPlan>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         debug!(predicate=?predicate, tag_name, "planning tag_values");
@@ -495,7 +495,7 @@ impl InfluxRpcPlanner {
     /// specified by `predicate`.
     pub fn field_columns<D>(&self, database: &D, predicate: Predicate) -> Result<FieldListPlan>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         debug!(predicate=?predicate, "planning field_columns");
@@ -539,7 +539,7 @@ impl InfluxRpcPlanner {
     /// same) occur together in the plan
     pub fn read_filter<D>(&self, database: &D, predicate: Predicate) -> Result<SeriesSetPlans>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         debug!(predicate=?predicate, "planning read_filter");
@@ -575,7 +575,7 @@ impl InfluxRpcPlanner {
         group_columns: &[impl AsRef<str>],
     ) -> Result<SeriesSetPlans>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         debug!(predicate=?predicate, agg=?agg, "planning read_group");
@@ -615,7 +615,7 @@ impl InfluxRpcPlanner {
         offset: WindowDuration,
     ) -> Result<SeriesSetPlans>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         debug!(predicate=?predicate, "planning read_window_aggregate");
@@ -644,7 +644,7 @@ impl InfluxRpcPlanner {
         chunks: Vec<Arc<C>>,
     ) -> Result<BTreeMap<String, Vec<Arc<C>>>>
     where
-        C: PartitionChunk + 'static,
+        C: QueryChunk + 'static,
     {
         let mut table_chunks = BTreeMap::new();
         for chunk in chunks {
@@ -691,7 +691,7 @@ impl InfluxRpcPlanner {
         chunks: Vec<Arc<C>>,
     ) -> Result<Option<StringSetPlan>>
     where
-        C: PartitionChunk + 'static,
+        C: QueryChunk + 'static,
     {
         let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
@@ -753,7 +753,7 @@ impl InfluxRpcPlanner {
         chunks: Vec<Arc<C>>,
     ) -> Result<Option<LogicalPlan>>
     where
-        C: PartitionChunk + 'static,
+        C: QueryChunk + 'static,
     {
         let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
         let TableScanAndFilter {
@@ -804,7 +804,7 @@ impl InfluxRpcPlanner {
         chunks: Vec<Arc<C>>,
     ) -> Result<Option<SeriesSetPlan>>
     where
-        C: PartitionChunk + 'static,
+        C: QueryChunk + 'static,
     {
         let table_name = table_name.as_ref();
         let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
@@ -925,7 +925,7 @@ impl InfluxRpcPlanner {
         chunks: Vec<Arc<C>>,
     ) -> Result<Option<SeriesSetPlan>>
     where
-        C: PartitionChunk + 'static,
+        C: QueryChunk + 'static,
     {
         let table_name = table_name.into();
         let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
@@ -1016,7 +1016,7 @@ impl InfluxRpcPlanner {
         chunks: Vec<Arc<C>>,
     ) -> Result<Option<SeriesSetPlan>>
     where
-        C: PartitionChunk + 'static,
+        C: QueryChunk + 'static,
     {
         let table_name = table_name.into();
         let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
@@ -1100,7 +1100,7 @@ impl InfluxRpcPlanner {
         chunks: Vec<Arc<C>>,
     ) -> Result<Option<TableScanAndFilter>>
     where
-        C: PartitionChunk + 'static,
+        C: QueryChunk + 'static,
     {
         // Scan all columns to begin with (DataFusion projection
         // push-down optimization will prune out unneeded columns later)

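Several of the planner helpers above are generic over `C: QueryChunk` and begin by bucketing chunks per table (the `table_chunks` BTreeMap visible in the `@@ -644` hunk). A hedged sketch of that grouping pattern, using a hypothetical `table_name()` accessor standing in for however the real trait exposes table membership:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Hypothetical minimal chunk trait, for illustration only.
trait QueryChunk {
    fn table_name(&self) -> &str;
}

// Group chunks by the table they contain, mirroring the
// `BTreeMap<String, Vec<Arc<C>>>` return type in the diff above.
fn group_by_table<C>(chunks: Vec<Arc<C>>) -> BTreeMap<String, Vec<Arc<C>>>
where
    C: QueryChunk + 'static,
{
    let mut table_chunks: BTreeMap<String, Vec<Arc<C>>> = BTreeMap::new();
    for chunk in chunks {
        table_chunks
            .entry(chunk.table_name().to_string())
            .or_default()
            .push(chunk);
    }
    table_chunks
}
```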

@@ -40,9 +40,9 @@ use self::predicate::Predicate;
 ///
 /// TODO: Move all Query and Line Protocol specific things out of this
 /// trait and into the various query planners.
-pub trait Database: Debug + Send + Sync {
+pub trait QueryDatabase: Debug + Send + Sync {
     type Error: std::error::Error + Send + Sync + 'static;
-    type Chunk: PartitionChunk;
+    type Chunk: QueryChunk;
     /// Return the partition keys for data in this DB
     fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
@@ -57,7 +57,7 @@ pub trait Database: Debug + Send + Sync {
 }
 /// Collection of data that shares the same partition key
-pub trait PartitionChunk: Prunable + Debug + Send + Sync {
+pub trait QueryChunk: Prunable + Debug + Send + Sync {
     type Error: std::error::Error + Send + Sync + 'static;
     /// returns the Id of this chunk. Ids are unique within a
@@ -105,7 +105,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
     /// selection refers to columns that do not exist.
     fn table_schema(&self, selection: Selection<'_>) -> Result<Schema, Self::Error>;
-    /// Provides access to raw `PartitionChunk` data as an
+    /// Provides access to raw `QueryChunk` data as an
     /// asynchronous stream of `RecordBatch`es filtered by a *required*
     /// predicate. Note that not all chunks can evaluate all types of
     /// predicates and this function will return an error
@@ -117,7 +117,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
     /// directly is that the data for a particular Table lives in
     /// several chunks within a partition, so there needs to be an
     /// implementation of `TableProvider` that stitches together the
-    /// streams from several different `PartitionChunks`.
+    /// streams from several different `QueryChunk`s.
     fn read_filter(
         &self,
         predicate: &Predicate,
@@ -132,7 +132,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
 /// Storage for `Databases` which can be retrieved by name
 pub trait DatabaseStore: Debug + Send + Sync {
     /// The type of database that is stored by this DatabaseStore
-    type Database: Database;
+    type Database: QueryDatabase;
     /// The type of error this DataBase store generates
     type Error: std::error::Error + Send + Sync + 'static;

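To see what an implementor of the renamed trait looks like, here is a hedged, self-contained sketch mirroring the `TestChunk` impl that appears later in this diff; only the `type Error` associated type and `id()` from the visible hunks are modeled, and the trait itself is a pared-down stand-in:

```rust
use std::fmt::Debug;

#[derive(Debug)]
struct TestError;

impl std::fmt::Display for TestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "test error")
    }
}

impl std::error::Error for TestError {}

// Pared-down version of the trait as shown in the hunks above.
trait QueryChunk: Debug + Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;
    /// Returns the id of this chunk; ids are unique within a partition.
    fn id(&self) -> u32;
}

#[derive(Debug)]
struct TestChunk {
    id: u32,
}

impl QueryChunk for TestChunk {
    type Error = TestError;

    fn id(&self) -> u32 {
        self.id
    }
}
```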

@@ -1,4 +1,4 @@
-//! Implementation of a DataFusion `TableProvider` in terms of `PartitionChunk`s
+//! Implementation of a DataFusion `TableProvider` in terms of `QueryChunk`s
 use std::sync::Arc;
@@ -22,7 +22,7 @@ use crate::{
     duplicate::group_potential_duplicates,
     predicate::{Predicate, PredicateBuilder},
     util::{arrow_pk_sort_exprs, project_schema},
-    PartitionChunk,
+    QueryChunk,
 };
 use snafu::{ResultExt, Snafu};
@@ -79,16 +79,16 @@ impl From<Error> for DataFusionError {
 }
 /// Something that can prune chunks based on their metadata
-pub trait ChunkPruner<C: PartitionChunk>: Sync + Send + std::fmt::Debug {
+pub trait ChunkPruner<C: QueryChunk>: Sync + Send + std::fmt::Debug {
     /// prune `chunks`, if possible, based on predicate.
     fn prune_chunks(&self, chunks: Vec<Arc<C>>, predicate: &Predicate) -> Vec<Arc<C>>;
 }
-/// Builds a `ChunkTableProvider` from a series of `PartitionChunk`s
+/// Builds a `ChunkTableProvider` from a series of `QueryChunk`s
 /// and ensures the schema across the chunks is compatible and
 /// consistent.
 #[derive(Debug)]
-pub struct ProviderBuilder<C: PartitionChunk + 'static> {
+pub struct ProviderBuilder<C: QueryChunk + 'static> {
     table_name: Arc<str>,
     schema_merger: SchemaMerger,
     chunk_pruner: Option<Arc<dyn ChunkPruner<C>>>,
@@ -98,7 +98,7 @@ pub struct ProviderBuilder<C: PartitionChunk + 'static> {
     finished: bool,
 }
-impl<C: PartitionChunk> ProviderBuilder<C> {
+impl<C: QueryChunk> ProviderBuilder<C> {
     pub fn new(table_name: impl AsRef<str>) -> Self {
         Self {
             table_name: Arc::from(table_name.as_ref()),
@@ -178,12 +178,12 @@ impl<C: PartitionChunk> ProviderBuilder<C> {
     }
 }
-/// Implementation of a DataFusion TableProvider in terms of PartitionChunks
+/// Implementation of a DataFusion TableProvider in terms of QueryChunks
 ///
 /// This allows DataFusion to see data from Chunks as a single table, as well as
 /// push predicates and selections down to chunks
 #[derive(Debug)]
-pub struct ChunkTableProvider<C: PartitionChunk + 'static> {
+pub struct ChunkTableProvider<C: QueryChunk + 'static> {
     table_name: Arc<str>,
     /// The IOx schema (wrapper around Arrow Schemaref) for this table
     iox_schema: Schema,
@@ -193,7 +193,7 @@ pub struct ChunkTableProvider<C: PartitionChunk + 'static> {
     chunks: Vec<Arc<C>>,
 }
-impl<C: PartitionChunk + 'static> ChunkTableProvider<C> {
+impl<C: QueryChunk + 'static> ChunkTableProvider<C> {
     /// Return the IOx schema view for the data provided by this provider
     pub fn iox_schema(&self) -> Schema {
         self.iox_schema.clone()
@@ -205,7 +205,7 @@ impl<C: PartitionChunk + 'static> ChunkTableProvider<C> {
     }
 }
-impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
+impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
     fn as_any(&self) -> &dyn std::any::Any {
         self
     }
@@ -272,7 +272,7 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
 #[derive(Clone, Debug, Default)]
 /// A deduplicater that deduplicate the duplicated data during scan execution
-pub(crate) struct Deduplicater<C: PartitionChunk + 'static> {
+pub(crate) struct Deduplicater<C: QueryChunk + 'static> {
     // a vector of a vector of overlapped chunks
     pub overlapped_chunks_set: Vec<Vec<Arc<C>>>,
@@ -283,7 +283,7 @@ pub(crate) struct Deduplicater<C: PartitionChunk + 'static> {
     pub no_duplicates_chunks: Vec<Arc<C>>,
 }
-impl<C: PartitionChunk + 'static> Deduplicater<C> {
+impl<C: QueryChunk + 'static> Deduplicater<C> {
     fn new() -> Self {
         Self {
             overlapped_chunks_set: vec![],
@@ -670,7 +670,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
 #[derive(Debug)]
 /// A pruner that does not do pruning (suitable if no additional pruning is possible)
 struct NoOpPruner {}
-impl<C: PartitionChunk> ChunkPruner<C> for NoOpPruner {
+impl<C: QueryChunk> ChunkPruner<C> for NoOpPruner {
     fn prune_chunks(&self, chunks: Vec<Arc<C>>, _predicate: &Predicate) -> Vec<Arc<C>> {
         chunks
     }

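The `ChunkPruner` trait above is the extension point for metadata-based pruning, and `NoOpPruner` is its do-nothing implementation. A hedged sketch of a non-trivial pruner, using a toy `Predicate` with a single hypothetical `min_id` field in place of IOx's real predicate type:

```rust
use std::sync::Arc;

trait QueryChunk: std::fmt::Debug + Send + Sync {
    fn id(&self) -> u32;
}

/// Toy predicate for illustration; the real IOx Predicate is much richer.
struct Predicate {
    min_id: u32,
}

trait ChunkPruner<C: QueryChunk>: Sync + Send + std::fmt::Debug {
    fn prune_chunks(&self, chunks: Vec<Arc<C>>, predicate: &Predicate) -> Vec<Arc<C>>;
}

#[derive(Debug)]
struct IdPruner;

impl<C: QueryChunk> ChunkPruner<C> for IdPruner {
    // Keep only chunks whose id passes the (toy) predicate; a real pruner
    // would consult chunk metadata such as column statistics instead.
    fn prune_chunks(&self, chunks: Vec<Arc<C>>, predicate: &Predicate) -> Vec<Arc<C>> {
        chunks
            .into_iter()
            .filter(|chunk| chunk.id() >= predicate.min_id)
            .collect()
    }
}
```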

@@ -9,7 +9,7 @@ use datafusion::{
 };
 use internal_types::{schema::Schema, selection::Selection};
-use crate::{predicate::Predicate, PartitionChunk};
+use crate::{predicate::Predicate, QueryChunk};
 use async_trait::async_trait;
@@ -17,7 +17,7 @@ use super::adapter::SchemaAdapterStream;
 /// Implements the DataFusion physical plan interface
 #[derive(Debug)]
-pub(crate) struct IOxReadFilterNode<C: PartitionChunk + 'static> {
+pub(crate) struct IOxReadFilterNode<C: QueryChunk + 'static> {
     table_name: Arc<str>,
     /// The desired output schema (includes selection_
     /// note that the chunk may not have all these columns.
@@ -26,7 +26,7 @@ pub(crate) struct IOxReadFilterNode<C: PartitionChunk + 'static> {
     predicate: Predicate,
 }
-impl<C: PartitionChunk + 'static> IOxReadFilterNode<C> {
+impl<C: QueryChunk + 'static> IOxReadFilterNode<C> {
     pub fn new(
         table_name: Arc<str>,
         schema: SchemaRef,
@@ -43,7 +43,7 @@ impl<C: PartitionChunk + 'static> IOxReadFilterNode<C> {
 }
 #[async_trait]
-impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
+impl<C: QueryChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
     fn as_any(&self) -> &dyn std::any::Any {
         self
     }

@@ -16,7 +16,7 @@ use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBa
 use crate::{
     exec::stringset::{StringSet, StringSetRef},
-    Database, DatabaseStore, PartitionChunk, Predicate, PredicateMatch,
+    DatabaseStore, Predicate, PredicateMatch, QueryChunk, QueryDatabase,
 };
 use crate::{exec::Executor, pruning::Prunable};
@@ -92,7 +92,7 @@ impl TestDatabase {
     }
 }
-impl Database for TestDatabase {
+impl QueryDatabase for TestDatabase {
     type Error = TestError;
     type Chunk = TestChunk;
@@ -706,7 +706,7 @@ impl TestChunk {
     }
 }
-impl PartitionChunk for TestChunk {
+impl QueryChunk for TestChunk {
     type Error = TestError;
     fn id(&self) -> u32 {


@@ -7,7 +7,7 @@ use query::{
     exec::stringset::StringSet,
     frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
     predicate::PredicateBuilder,
-    PartitionChunk,
+    QueryChunk,
 };
 use server::db::test_helpers::write_lp;


@ -1,7 +1,7 @@
//! This module contains testing scenarios for Db
#[allow(unused_imports, dead_code, unused_macros)]
use query::PartitionChunk;
use query::QueryChunk;
use async_trait::async_trait;


@@ -2,7 +2,7 @@
 use arrow::datatypes::DataType;
 use internal_types::{schema::builder::SchemaBuilder, selection::Selection};
-use query::{Database, PartitionChunk};
+use query::{QueryChunk, QueryDatabase};
 use super::scenarios::*;
 use query::predicate::PredicateBuilder;


@@ -37,7 +37,7 @@ use parquet_file::{
     metadata::IoxMetadata,
     storage::Storage,
 };
-use query::{exec::Executor, predicate::Predicate, Database};
+use query::{exec::Executor, predicate::Predicate, QueryDatabase};
 use rand_distr::{Distribution, Poisson};
 use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
 use snafu::{ResultExt, Snafu};
@@ -1060,7 +1060,7 @@ fn check_chunk_closed(chunk: &mut CatalogChunk, mutable_size_threshold: Option<N
 /// Convenience implementation of `Database` so the rest of the code
 /// can just use Db as a `Database` even though the implementation
 /// lives in `catalog_access`
-impl Database for Db {
+impl QueryDatabase for Db {
     type Error = Error;
     type Chunk = DbChunk;
@@ -1477,7 +1477,7 @@ mod tests {
         catalog::test_helpers::assert_catalog_state_implementation,
         test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
     };
-    use query::{frontend::sql::SqlQueryPlanner, Database, PartitionChunk};
+    use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
     use std::{
         collections::HashSet,
         convert::TryFrom,


@@ -23,13 +23,13 @@ use query::{
     predicate::{Predicate, PredicateBuilder},
     provider::{self, ChunkPruner, ProviderBuilder},
     pruning::Prunable,
-    PartitionChunk, DEFAULT_SCHEMA,
+    QueryChunk, DEFAULT_SCHEMA,
 };
 use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
 use query::{
     pruning::{prune_chunks, PruningObserver},
-    Database,
+    QueryDatabase,
 };
 /// Metrics related to chunk access (pruning specifically)
@@ -181,7 +181,7 @@ impl PruningObserver for ChunkAccess {
 }
 #[async_trait]
-impl Database for QueryCatalogAccess {
+impl QueryDatabase for QueryCatalogAccess {
     type Error = Error;
     type Chunk = DbChunk;


@@ -19,7 +19,7 @@ use query::{
     exec::stringset::StringSet,
     predicate::{Predicate, PredicateMatch},
     pruning::Prunable,
-    PartitionChunk,
+    QueryChunk,
 };
 use read_buffer::RBChunk;
@@ -206,7 +206,7 @@ impl DbChunk {
     }
 }
-impl PartitionChunk for DbChunk {
+impl QueryChunk for DbChunk {
     type Error = Error;
     fn id(&self) -> u32 {


@@ -1012,7 +1012,7 @@ mod tests {
     use influxdb_line_protocol::parse_lines;
     use metrics::MetricRegistry;
     use object_store::{memory::InMemory, path::ObjectStorePath};
-    use query::{frontend::sql::SqlQueryPlanner, Database};
+    use query::{frontend::sql::SqlQueryPlanner, QueryDatabase};
     use super::*;
     use std::sync::atomic::{AtomicBool, Ordering};


@@ -5,7 +5,7 @@ use data_types::{
     DatabaseName,
 };
 use object_store::{memory::InMemory, ObjectStore};
-use query::{exec::Executor, Database};
+use query::{exec::Executor, QueryDatabase};
 use crate::{
     db::{load_or_create_preserved_catalog, Db},


@@ -18,7 +18,7 @@ use data_types::{
 };
 use influxdb_iox_client::format::QueryOutputFormat;
 use influxdb_line_protocol::parse_lines;
-use query::Database;
+use query::QueryDatabase;
 use server::{ConnectionManager, Server as AppServer};
 // External crates


@@ -8,7 +8,7 @@ use query::{
     group_by::{Aggregate, WindowDuration},
     plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan},
     predicate::Predicate,
-    Database,
+    QueryDatabase,
 };
 use snafu::{ResultExt, Snafu};
@@ -78,7 +78,7 @@ impl Planner {
         predicate: Predicate,
     ) -> Result<StringSetPlan>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         let planner = InfluxRpcPlanner::new();
@@ -96,7 +96,7 @@ impl Planner {
     /// [`InfluxRpcPlanner::tag_keys`], on a separate threadpool
     pub async fn tag_keys<D>(&self, database: Arc<D>, predicate: Predicate) -> Result<StringSetPlan>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         let planner = InfluxRpcPlanner::new();
@@ -119,7 +119,7 @@ impl Planner {
         predicate: Predicate,
     ) -> Result<StringSetPlan>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         let tag_name = tag_name.into();
         let planner = InfluxRpcPlanner::new();
@@ -142,7 +142,7 @@ impl Planner {
         predicate: Predicate,
     ) -> Result<FieldListPlan>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         let planner = InfluxRpcPlanner::new();
@@ -164,7 +164,7 @@ impl Planner {
         predicate: Predicate,
     ) -> Result<SeriesSetPlans>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         let planner = InfluxRpcPlanner::new();
@@ -188,7 +188,7 @@ impl Planner {
         group_columns: Vec<String>,
     ) -> Result<SeriesSetPlans>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         let planner = InfluxRpcPlanner::new();
@@ -213,7 +213,7 @@ impl Planner {
         offset: WindowDuration,
     ) -> Result<SeriesSetPlans>
     where
-        D: Database + 'static,
+        D: QueryDatabase + 'static,
     {
         let planner = InfluxRpcPlanner::new();

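Every `Planner` method above takes `Arc<D>` with `D: QueryDatabase + 'static` because the planning work is handed off to a separate threadpool. A minimal sketch of why those bounds are required, using plain `std::thread::spawn` as a stand-in for IOx's executor:

```rust
use std::sync::Arc;
use std::thread;

// Stand-in trait; the real QueryDatabase has Debug + Send + Sync supertraits
// and an associated Error type.
trait QueryDatabase: Send + Sync {
    fn partition_keys(&self) -> Vec<String>;
}

// `Arc` lets the worker thread share ownership of the database, and
// `'static` guarantees `D` holds no short-lived borrows, so the closure
// can safely outlive the caller's stack frame.
fn plan_on_worker<D>(database: Arc<D>) -> thread::JoinHandle<Vec<String>>
where
    D: QueryDatabase + 'static,
{
    thread::spawn(move || database.partition_keys())
}
```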

@@ -8,7 +8,7 @@ use generated_types::google::{
 };
 use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
 use observability_deps::tracing::info;
-use query::{Database, DatabaseStore};
+use query::{DatabaseStore, QueryDatabase};
 use server::{ConnectionManager, Error, Server};
 use tonic::{Request, Response, Status};