refactor: dyn-typed DB for `query_tests` (#4053)

To test `db::Db` as well as the `querier` with the same test framework,
both need a shared interface. Ideally this interface is dynamically typed
instead of statically dispatched via generics, because:

- `query_tests` already take ages to compile
- we often hold a list of scenarios, and a single scenario should (in a
  future PR) be able to represent both OG and NG

The vision here is that we basically keep the whole test setup and add
new, NG-specific scenarios later on.

Now the issue with many query-related types is that they are NOT
object-safe: they have methods that don't take `&self`, or associated
types that we cannot specify in a way that covers both OG and NG at the
same time.
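
As an illustration, here is a minimal, hypothetical sketch (the names only
loosely mirror the real `QueryDatabase`; this is not IOx code) of a trait
shape that cannot be turned into a trait object:

```rust
trait QueryDatabase {
    /// Associated type: `dyn QueryDatabase` would have to fix `Chunk = ...`,
    /// but OG and NG use different chunk types.
    type Chunk;

    /// Generic method: there is no single vtable slot covering every `Q`.
    fn record_query<Q: Into<String>>(&self, query_type: Q);

    /// Associated function without `&self`: not callable through a vtable.
    fn chunk_type() -> &'static str;
}

// fn boxed(_db: Box<dyn QueryDatabase<Chunk = ()>>) {}
// ^ error[E0038]: the trait `QueryDatabase` cannot be made into an object
```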

So we need a bunch of wrappers that make dynamic dispatch possible. They
mostly forward to an internal, sealed "interface" trait, which is the
actual `dyn` part. The interface is currently only implemented for OG.
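
Condensed, the wrapper pattern looks roughly like this (a simplified
sketch; the real version in `query_tests/src/db.rs` below implements the
existing `QueryDatabase`/`QueryChunk` traits instead of inherent methods):

```rust
/// Object-safe "interface": only concrete signatures, no associated types,
/// so it can live behind `Box<dyn _>`.
trait DbInterface: Send + Sync {
    fn record_query(&self, query_type: String);
    fn table_names(&self) -> Vec<String>;
}

/// Public wrapper the tests use; dynamic dispatch happens inside.
pub struct AbstractDb(Box<dyn DbInterface>);

impl AbstractDb {
    /// The generic surface stays on the wrapper and is concretized
    /// (`impl Into<String>` -> `String`) before forwarding.
    pub fn record_query(&self, query_type: impl Into<String>) {
        self.0.record_query(query_type.into())
    }

    pub fn table_names(&self) -> Vec<String> {
        self.0.table_names()
    }
}

/// OG-only implementation; an NG implementation can be added later without
/// touching the test code.
struct OldDb /* would hold an Arc<db::Db> */;

impl DbInterface for OldDb {
    fn record_query(&self, _query_type: String) { /* forward to db::Db */ }
    fn table_names(&self) -> Vec<String> {
        vec![]
    }
}
```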

The scenarios also currently contain only OG databases. However, creating
a dynamic interface that can be used in all `query_tests` is already a
huge step.

Note that there are two places where we downcast the dynamic/abstract
database to `db::Db` again (the downcast pattern is sketched after the
list):

1. To create one scenario based on another, where we need to manipulate
   `db::Db` with OG-specific semantics.
2. `server_benchmarks`. These contain OG databases only, and there is no
   point in benchmarking through the dynamic-dispatch interface because
   prod (`influxdb_ioxd`) also uses static dispatch.
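
The downcast itself is the standard `Any` escape hatch on the sealed
interface. Again a condensed sketch; the real helper is
`AbstractDb::old_db()` in `query_tests/src/db.rs` below, and `Db` here is
a stand-in for the real `db::Db`:

```rust
use std::{any::Any, sync::Arc};

/// Stand-in for the OG `db::Db`.
struct Db;

trait DbInterface {
    fn as_any(&self) -> &dyn Any;
}

struct OldDb(Arc<Db>);

impl DbInterface for OldDb {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

struct AbstractDb(Box<dyn DbInterface>);

impl AbstractDb {
    /// `Some` if this scenario wraps an OG database, `None` for (future)
    /// NG scenarios.
    fn old_db(&self) -> Option<Arc<Db>> {
        self.0
            .as_any()
            .downcast_ref::<OldDb>()
            .map(|o| Arc::clone(&o.0))
    }
}
```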

Ref #3934.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2022-03-18 10:11:17 +00:00 committed by GitHub
parent d831e22f8b
commit a122b1e2ca
18 changed files with 512 additions and 23 deletions

Cargo.lock (generated)

@ -4246,6 +4246,7 @@ dependencies = [
"tempfile",
"test_helpers",
"tokio",
"trace",
"workspace-hack",
]


@ -14,6 +14,8 @@ datafusion = { path = "../datafusion" }
db = { path = "../db" }
once_cell = { version = "1.10.0", features = ["parking_lot"] }
predicate = { path = "../predicate" }
schema = { path = "../schema" }
trace = { path = "../trace" }
query = { path = "../query" }
workspace-hack = { path = "../workspace-hack"}
@ -22,7 +24,6 @@ arrow = { version = "10", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
schema = { path = "../schema" }
snafu = "0.7"
tempfile = "3.1.0"
test_helpers = { path = "../test_helpers" }

query_tests/src/db.rs (new file)

@ -0,0 +1,421 @@
use std::{
any::Any,
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder};
use datafusion::catalog::catalog::CatalogProvider;
use db::{chunk::DbChunk, Db};
use predicate::rpc_predicate::QueryDatabaseMeta;
use query::{
exec::{ExecutionContextProvider, IOxExecutionContext},
QueryChunk, QueryChunkMeta, QueryDatabase,
};
use self::sealed::{AbstractChunkInterface, AbstractDbInterface};
pub struct Error(String);
impl Error {
fn new<E>(e: E) -> Self
where
E: std::error::Error,
{
Self(e.to_string())
}
}
impl Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("Error").field(&self.0).finish()
}
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
impl std::error::Error for Error {}
/// Abstract database used during testing.
///
/// This is required to make all the involved traits object-safe.
#[derive(Debug)]
pub struct AbstractDb(Box<dyn AbstractDbInterface>);
impl AbstractDb {
pub fn create_old(db: Arc<Db>) -> Self {
Self(Box::new(OldDb(db)))
}
pub fn old_db(&self) -> Option<Arc<Db>> {
self.0
.as_any()
.downcast_ref::<OldDb>()
.map(|o| Arc::clone(&o.0))
}
}
impl ExecutionContextProvider for AbstractDb {
fn new_query_context(
self: &Arc<Self>,
span_ctx: Option<trace::ctx::SpanContext>,
) -> IOxExecutionContext {
self.0.new_query_context(span_ctx)
}
}
impl CatalogProvider for AbstractDb {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
fn schema_names(&self) -> Vec<String> {
self.0.catalog_provider().schema_names()
}
fn schema(&self, name: &str) -> Option<Arc<dyn datafusion::catalog::schema::SchemaProvider>> {
self.0.catalog_provider().schema(name)
}
}
#[async_trait]
impl QueryDatabase for AbstractDb {
type Chunk = AbstractChunk;
async fn chunks(
&self,
table_name: &str,
predicate: &predicate::Predicate,
) -> Vec<Arc<Self::Chunk>> {
self.0.chunks(table_name, predicate).await
}
fn record_query(
&self,
ctx: &IOxExecutionContext,
query_type: impl Into<String>,
query_text: query::QueryText,
) -> query::QueryCompletedToken {
self.0.record_query(ctx, query_type.into(), query_text)
}
}
impl QueryDatabaseMeta for AbstractDb {
fn table_names(&self) -> Vec<String> {
self.0.table_names()
}
fn table_schema(&self, table_name: &str) -> Option<Arc<schema::Schema>> {
self.0.table_schema(table_name)
}
}
#[derive(Debug)]
pub struct AbstractChunk(Box<dyn AbstractChunkInterface>);
impl AbstractChunk {
fn create_old(chunk: Arc<DbChunk>) -> Self {
Self(Box::new(OldChunk(chunk)))
}
}
impl QueryChunk for AbstractChunk {
type Error = Error;
fn id(&self) -> ChunkId {
self.0.id()
}
fn addr(&self) -> ChunkAddr {
self.0.addr()
}
fn table_name(&self) -> &str {
self.0.table_name()
}
fn may_contain_pk_duplicates(&self) -> bool {
self.0.may_contain_pk_duplicates()
}
fn apply_predicate_to_metadata(
&self,
predicate: &predicate::Predicate,
) -> Result<predicate::PredicateMatch, Self::Error> {
self.0.apply_predicate_to_metadata(predicate)
}
fn column_names(
&self,
ctx: IOxExecutionContext,
predicate: &predicate::Predicate,
columns: schema::selection::Selection<'_>,
) -> Result<Option<query::exec::stringset::StringSet>, Self::Error> {
self.0.column_names(ctx, predicate, columns)
}
fn column_values(
&self,
ctx: IOxExecutionContext,
column_name: &str,
predicate: &predicate::Predicate,
) -> Result<Option<query::exec::stringset::StringSet>, Self::Error> {
self.0.column_values(ctx, column_name, predicate)
}
fn read_filter(
&self,
ctx: IOxExecutionContext,
predicate: &predicate::Predicate,
selection: schema::selection::Selection<'_>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, Self::Error> {
self.0.read_filter(ctx, predicate, selection)
}
fn chunk_type(&self) -> &str {
self.0.chunk_type()
}
fn order(&self) -> ChunkOrder {
self.0.order()
}
}
impl QueryChunkMeta for AbstractChunk {
fn summary(&self) -> Option<&data_types::partition_metadata::TableSummary> {
self.0.summary()
}
fn schema(&self) -> Arc<schema::Schema> {
self.0.schema()
}
fn sort_key(&self) -> Option<&schema::sort::SortKey> {
self.0.sort_key()
}
fn delete_predicates(&self) -> &[Arc<data_types::delete_predicate::DeletePredicate>] {
self.0.delete_predicates()
}
}
mod sealed {
use super::*;
#[async_trait]
pub trait AbstractDbInterface: Debug + Send + Sync + 'static {
fn as_any(&self) -> &dyn Any;
fn new_query_context(
&self,
span_ctx: Option<trace::ctx::SpanContext>,
) -> IOxExecutionContext;
fn catalog_provider(&self) -> Arc<dyn CatalogProvider>;
async fn chunks(
&self,
table_name: &str,
predicate: &predicate::Predicate,
) -> Vec<Arc<AbstractChunk>>;
fn record_query(
&self,
ctx: &IOxExecutionContext,
query_type: String,
query_text: query::QueryText,
) -> query::QueryCompletedToken;
fn table_names(&self) -> Vec<String>;
fn table_schema(&self, table_name: &str) -> Option<Arc<schema::Schema>>;
}
pub trait AbstractChunkInterface: Debug + Send + Sync + 'static {
fn id(&self) -> ChunkId;
fn addr(&self) -> ChunkAddr;
fn table_name(&self) -> &str;
fn may_contain_pk_duplicates(&self) -> bool;
fn apply_predicate_to_metadata(
&self,
predicate: &predicate::Predicate,
) -> Result<predicate::PredicateMatch, Error>;
fn column_names(
&self,
ctx: IOxExecutionContext,
predicate: &predicate::Predicate,
columns: schema::selection::Selection<'_>,
) -> Result<Option<query::exec::stringset::StringSet>, Error>;
fn column_values(
&self,
ctx: IOxExecutionContext,
column_name: &str,
predicate: &predicate::Predicate,
) -> Result<Option<query::exec::stringset::StringSet>, Error>;
fn read_filter(
&self,
ctx: IOxExecutionContext,
predicate: &predicate::Predicate,
selection: schema::selection::Selection<'_>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, Error>;
fn chunk_type(&self) -> &str;
fn order(&self) -> ChunkOrder;
fn summary(&self) -> Option<&data_types::partition_metadata::TableSummary>;
fn schema(&self) -> Arc<schema::Schema>;
fn sort_key(&self) -> Option<&schema::sort::SortKey>;
fn delete_predicates(&self) -> &[Arc<data_types::delete_predicate::DeletePredicate>];
}
}
#[derive(Debug)]
struct OldDb(Arc<Db>);
#[async_trait]
impl AbstractDbInterface for OldDb {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
fn new_query_context(&self, span_ctx: Option<trace::ctx::SpanContext>) -> IOxExecutionContext {
self.0.new_query_context(span_ctx)
}
fn catalog_provider(&self) -> Arc<dyn CatalogProvider> {
Arc::clone(&self.0) as _
}
async fn chunks(
&self,
table_name: &str,
predicate: &predicate::Predicate,
) -> Vec<Arc<AbstractChunk>> {
self.0
.chunks(table_name, predicate)
.await
.into_iter()
.map(|c| Arc::new(AbstractChunk::create_old(c)))
.collect()
}
fn record_query(
&self,
ctx: &IOxExecutionContext,
query_type: String,
query_text: query::QueryText,
) -> query::QueryCompletedToken {
self.0.record_query(ctx, query_type, query_text)
}
fn table_names(&self) -> Vec<String> {
self.0.table_names()
}
fn table_schema(&self, table_name: &str) -> Option<Arc<schema::Schema>> {
self.0.table_schema(table_name)
}
}
#[derive(Debug)]
struct OldChunk(Arc<DbChunk>);
impl AbstractChunkInterface for OldChunk {
fn id(&self) -> ChunkId {
self.0.id()
}
fn addr(&self) -> ChunkAddr {
self.0.addr().clone()
}
fn table_name(&self) -> &str {
self.0.table_name()
}
fn may_contain_pk_duplicates(&self) -> bool {
self.0.may_contain_pk_duplicates()
}
fn apply_predicate_to_metadata(
&self,
predicate: &predicate::Predicate,
) -> Result<predicate::PredicateMatch, Error> {
self.0
.apply_predicate_to_metadata(predicate)
.map_err(Error::new)
}
fn column_names(
&self,
ctx: IOxExecutionContext,
predicate: &predicate::Predicate,
columns: schema::selection::Selection<'_>,
) -> Result<Option<query::exec::stringset::StringSet>, Error> {
self.0
.column_names(ctx, predicate, columns)
.map_err(Error::new)
}
fn column_values(
&self,
ctx: IOxExecutionContext,
column_name: &str,
predicate: &predicate::Predicate,
) -> Result<Option<query::exec::stringset::StringSet>, Error> {
self.0
.column_values(ctx, column_name, predicate)
.map_err(Error::new)
}
fn read_filter(
&self,
ctx: IOxExecutionContext,
predicate: &predicate::Predicate,
selection: schema::selection::Selection<'_>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, Error> {
self.0
.read_filter(ctx, predicate, selection)
.map_err(Error::new)
}
fn chunk_type(&self) -> &str {
self.0.chunk_type()
}
fn order(&self) -> ChunkOrder {
self.0.order()
}
fn summary(&self) -> Option<&data_types::partition_metadata::TableSummary> {
self.0.summary()
}
fn schema(&self) -> Arc<schema::Schema> {
self.0.schema()
}
fn sort_key(&self) -> Option<&schema::sort::SortKey> {
self.0.sort_key()
}
fn delete_predicates(&self) -> &[Arc<data_types::delete_predicate::DeletePredicate>] {
self.0.delete_predicates()
}
}


@ -4,7 +4,10 @@ use datafusion::logical_plan::{col, lit};
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::{
exec::fieldlist::{Field, FieldList},
exec::{
fieldlist::{Field, FieldList},
ExecutionContextProvider,
},
frontend::influxrpc::InfluxRpcPlanner,
};
@ -31,7 +34,7 @@ async fn run_field_columns_test_case<D>(
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let ctx = db.new_query_context(None);
let plan = planner
.field_columns(db.as_ref(), predicate.clone())


@ -7,6 +7,7 @@ use crate::scenarios::{
TwoMeasurementsWithDelete, TwoMeasurementsWithDeleteAll,
};
use crate::{
db::AbstractDb,
influxrpc::util::run_series_set_plan_maybe_error,
scenarios::{
MeasurementStatusCode, MeasurementsForDefect2845, MeasurementsSortableTags,
@ -15,10 +16,9 @@ use crate::{
},
};
use datafusion::logical_plan::{col, lit, when};
use db::Db;
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::frontend::influxrpc::InfluxRpcPlanner;
use query::{exec::ExecutionContextProvider, frontend::influxrpc::InfluxRpcPlanner};
use test_helpers::assert_contains;
/// runs read_filter(predicate) and compares it to the expected
@ -54,7 +54,7 @@ async fn run_read_filter_test_case<D>(
/// output
async fn run_read_filter(
predicate: InfluxRpcPredicate,
db: Arc<Db>,
db: Arc<AbstractDb>,
) -> Result<Vec<String>, String> {
let planner = InfluxRpcPlanner::default();
@ -63,7 +63,7 @@ async fn run_read_filter(
.await
.map_err(|e| e.to_string())?;
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let ctx = db.new_query_context(None);
run_series_set_plan_maybe_error(&ctx, plan)
.await
.map_err(|e| e.to_string())


@ -16,7 +16,9 @@ use datafusion::{
};
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::{frontend::influxrpc::InfluxRpcPlanner, group_by::Aggregate};
use query::{
exec::ExecutionContextProvider, frontend::influxrpc::InfluxRpcPlanner, group_by::Aggregate,
};
/// runs read_group(predicate) and compares it to the expected
/// output
@ -38,7 +40,7 @@ async fn run_read_group_test_case<D>(
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let ctx = db.new_query_context(None);
let plans = planner
.read_group(db.as_ref(), predicate.clone(), agg, &group_columns)


@ -4,6 +4,7 @@ use datafusion::prelude::*;
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::{
exec::ExecutionContextProvider,
frontend::influxrpc::InfluxRpcPlanner,
group_by::{Aggregate, WindowDuration},
};
@ -29,7 +30,7 @@ async fn run_read_window_aggregate_test_case<D>(
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let ctx = db.new_query_context(None);
let plan = planner
.read_window_aggregate(


@ -4,7 +4,10 @@ use datafusion::logical_plan::{col, lit};
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::{
exec::stringset::{IntoStringSet, StringSetRef},
exec::{
stringset::{IntoStringSet, StringSetRef},
ExecutionContextProvider,
},
frontend::influxrpc::InfluxRpcPlanner,
};
@ -28,7 +31,7 @@ async fn run_table_names_test_case<D>(
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let ctx = db.new_query_context(None);
let plan = planner
.table_names(db.as_ref(), predicate.clone())


@ -3,7 +3,10 @@ use datafusion::logical_plan::{col, lit};
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::{
exec::stringset::{IntoStringSet, StringSetRef},
exec::{
stringset::{IntoStringSet, StringSetRef},
ExecutionContextProvider,
},
frontend::influxrpc::InfluxRpcPlanner,
};
@ -30,7 +33,7 @@ async fn run_tag_keys_test_case<D>(
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let ctx = db.new_query_context(None);
let plan = planner
.tag_keys(db.as_ref(), predicate.clone())


@ -2,7 +2,10 @@ use datafusion::logical_plan::{col, lit};
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::{
exec::stringset::{IntoStringSet, StringSetRef},
exec::{
stringset::{IntoStringSet, StringSetRef},
ExecutionContextProvider,
},
frontend::influxrpc::InfluxRpcPlanner,
};
@ -27,7 +30,7 @@ async fn run_tag_values_test_case<D>(
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let ctx = db.new_query_context(None);
let plan = planner
.tag_values(db.as_ref(), tag_name, predicate.clone())


@ -24,4 +24,5 @@ pub mod sql;
pub mod table_schema;
// Used by the `server_benchmark` crate in addition to tests in this crate
pub mod db;
pub mod scenarios;


@ -5,7 +5,6 @@ pub mod library;
pub mod util;
use async_trait::async_trait;
use db::Db;
use delete::{
OneDeleteMultiExprsOneChunk, OneDeleteSimpleExprOneChunk, OneDeleteSimpleExprOneChunkDeleteAll,
ThreeDeleteThreeChunks, TwoDeletesMultiExprsOneChunk,
@ -16,11 +15,13 @@ use std::{collections::HashMap, sync::Arc};
/// Reexport library of scenarios
pub use library::*;
use crate::db::AbstractDb;
/// Holds a database and a description of how its data was configured
#[derive(Debug)]
pub struct DbScenario {
pub scenario_name: String,
pub db: Arc<Db>,
pub db: Arc<AbstractDb>,
}
#[async_trait]


@ -1,5 +1,7 @@
//! Library of test scenarios that can be used in query_tests
use std::sync::Arc;
use async_trait::async_trait;
use data_types::{
delete_predicate::{DeleteExpr, DeletePredicate},
@ -14,6 +16,8 @@ use db::{
};
use query::QueryChunk;
use crate::db::AbstractDb;
use super::{
util::{
all_scenarios_for_one_chunk, make_one_chunk_mub_scenario, make_one_chunk_rub_scenario,
@ -93,6 +97,7 @@ impl DbSetup for ChunkOrder {
//
// So the query engine must use `order` as a primary key to sort chunks, NOT `id`.
let db = Arc::new(AbstractDb::create_old(db));
let scenario = DbScenario {
scenario_name: "chunks where chunk ID alone cannot be used for ordering".into(),
db,
@ -132,6 +137,7 @@ impl DbSetup for NoData {
// Scenario 1: No data in the DB yet
//
let db = make_db().await.db;
let db = Arc::new(AbstractDb::create_old(db));
let scenario1 = DbScenario {
scenario_name: "New, Empty Database".into(),
db,
@ -144,6 +150,7 @@ impl DbSetup for NoData {
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 0);
let db = Arc::new(AbstractDb::create_old(db));
let scenario2 = DbScenario {
scenario_name: "New, Empty Database after partitions are listed".into(),
db,
@ -182,6 +189,7 @@ impl DbSetup for NoData {
assert_eq!(count_read_buffer_chunks(&db), 0); // nothing after dropping chunk 0
assert_eq!(count_object_store_chunks(&db), 0); // still nothing
let db = Arc::new(AbstractDb::create_old(db));
let scenario3 = DbScenario {
scenario_name: "Empty Database after drop chunk that is in read buffer".into(),
db,
@ -228,6 +236,7 @@ impl DbSetup for NoData {
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 0);
let db = Arc::new(AbstractDb::create_old(db));
let scenario4 = DbScenario {
scenario_name:
"Empty Database after drop chunk that is in both read buffer and object store"
@ -644,6 +653,7 @@ impl DbSetup for TwoMeasurementsManyFieldsOneChunk {
];
write_lp(&db, &lp_lines.join("\n"));
let db = Arc::new(AbstractDb::create_old(db));
vec![DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
@ -673,6 +683,7 @@ impl DbSetup for TwoMeasurementsManyFieldsOneRubChunk {
// move all data to RUB
db.compact_open_chunk("h2o", partition_key).await.unwrap();
let db = Arc::new(AbstractDb::create_old(db));
vec![DbScenario {
scenario_name: "Data in single chunk of read buffer".into(),
db,
@ -708,6 +719,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 0);
let db = Arc::new(AbstractDb::create_old(db));
vec![DbScenario {
scenario_name: "Data in two open mutable buffer chunks per table and read buffer"
.into(),
@ -746,6 +758,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
assert_eq!(count_read_buffer_chunks(&db), 2);
assert_eq!(count_object_store_chunks(&db), 0);
let db = Arc::new(AbstractDb::create_old(db));
vec![DbScenario {
scenario_name: "2 chunks in read buffer".into(),
db,
@ -821,6 +834,7 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
assert_eq!(count_read_buffer_chunks(&db), 4);
assert_eq!(count_object_store_chunks(&db), 0);
let db = Arc::new(AbstractDb::create_old(db));
vec![DbScenario {
scenario_name: "Data in four chunks with duplicates".into(),
db,
@ -863,6 +877,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 1);
let db = Arc::new(AbstractDb::create_old(db));
vec![DbScenario {
scenario_name: "Data in parquet, RUB, and MUB".into(),
db,
@ -1017,6 +1032,7 @@ impl DbSetup for OneMeasurementAllChunksDropped {
.await
.unwrap();
let db = Arc::new(AbstractDb::create_old(db));
vec![DbScenario {
scenario_name: "one measurement but all chunks are dropped".into(),
db,
@ -1515,6 +1531,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let db = make_db().await.db;
let data = lp_lines.join("\n");
write_lp(&db, &data);
let db = Arc::new(AbstractDb::create_old(db));
let scenario1 = DbScenario {
scenario_name: "Data in 4 partitions, open chunks of mutable buffer".into(),
db,
@ -1525,6 +1542,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
write_lp(&db, &data);
db.rollover_partition("h2o", "2020-03-01T00").await.unwrap();
db.rollover_partition("h2o", "2020-03-02T00").await.unwrap();
let db = Arc::new(AbstractDb::create_old(db));
let scenario2 = DbScenario {
scenario_name:
"Data in 4 partitions, two open chunk and two closed chunks of mutable buffer"
@ -1540,6 +1558,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
rollover_and_load(&db, "2020-03-02T00", "h2o").await;
rollover_and_load(&db, "2020-04-01T00", "h2o").await;
rollover_and_load(&db, "2020-04-02T00", "h2o").await;
let db = Arc::new(AbstractDb::create_old(db));
let scenario3 = DbScenario {
scenario_name: "Data in 4 partitions, 4 closed chunks in mutable buffer".into(),
db,


@ -1,5 +1,7 @@
//! This module contains util functions for testing scenarios
use crate::db::AbstractDb;
use super::DbScenario;
use data_types::{chunk_metadata::ChunkId, delete_predicate::DeletePredicate};
use db::test_helpers::chunk_ids_rub;
@ -290,6 +292,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
if no_more_data {
let scenario_name =
format!("Deleted data from one {} chunk{}", chunk_stage, display);
let db = Arc::new(AbstractDb::create_old(db));
return DbScenario { scenario_name, db };
}
}
@ -351,6 +354,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
if no_more_data {
let scenario_name =
format!("Deleted data from one {} chunk{}", chunk_stage, display);
let db = Arc::new(AbstractDb::create_old(db));
return DbScenario { scenario_name, db };
}
}
@ -408,6 +412,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
}
let scenario_name = format!("Deleted data from one {} chunk{}", chunk_stage, display);
let db = Arc::new(AbstractDb::create_old(db));
DbScenario { scenario_name, db }
}
@ -500,6 +505,7 @@ pub async fn make_different_stage_chunks_with_deletes_scenario(
display,
preds.len()
);
let db = Arc::new(AbstractDb::create_old(db));
DbScenario { scenario_name, db }
}
@ -521,6 +527,7 @@ pub async fn make_os_chunks_and_then_compact_with_different_scenarios_with_delet
.await;
let scenario_name = "Deletes and then compact all OS chunks".to_string();
let db = Arc::new(AbstractDb::create_old(db));
let scenario_1 = DbScenario { scenario_name, db };
// Scenario 2: compact all 3 chunks and apply deletes
@ -534,6 +541,7 @@ pub async fn make_os_chunks_and_then_compact_with_different_scenarios_with_delet
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
let scenario_name = "Compact all OS chunks and then deletes".to_string();
let db = Arc::new(AbstractDb::create_old(db));
let scenario_2 = DbScenario { scenario_name, db };
// Scenario 3: apply deletes then compact the first n-1 chunks
@ -548,6 +556,7 @@ pub async fn make_os_chunks_and_then_compact_with_different_scenarios_with_delet
.join()
.await;
let scenario_name = "Deletes and then compact all but last OS chunk".to_string();
let db = Arc::new(AbstractDb::create_old(db));
let scenario_3 = DbScenario { scenario_name, db };
// Scenario 4: compact the first n-1 chunks then apply deletes
@ -562,6 +571,7 @@ pub async fn make_os_chunks_and_then_compact_with_different_scenarios_with_delet
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
let scenario_name = "Compact all but last OS chunk and then deletes".to_string();
let db = Arc::new(AbstractDb::create_old(db));
let scenario_4 = DbScenario { scenario_name, db };
vec![scenario_1, scenario_2, scenario_3, scenario_4]
@ -595,7 +605,7 @@ pub async fn make_contiguous_os_chunks(
.await;
// Get chunk ids in contiguous order
let db = scenario.db;
let db = scenario.db.old_db().unwrap();
let partition = db.partition(table_name, partition_key).unwrap();
let partition = partition.read();
let mut keyed_chunks: Vec<(_, _)> = partition
@ -616,6 +626,7 @@ pub(crate) async fn make_one_chunk_mub_scenario(data: &str) -> Vec<DbScenario> {
// Scenario 1: One open chunk in MUB
let db = make_db().await.db;
write_lp(&db, data);
let db = Arc::new(AbstractDb::create_old(db));
let scenario = DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
@ -641,6 +652,7 @@ pub(crate) async fn make_one_chunk_rub_scenario(
.await
.unwrap();
}
let db = Arc::new(AbstractDb::create_old(db));
let scenario = DbScenario {
scenario_name: "Data in read buffer".into(),
db,
@ -663,6 +675,7 @@ pub async fn make_two_chunk_scenarios(
let db = make_db().await.db;
write_lp(&db, data1);
write_lp(&db, data2);
let db = Arc::new(AbstractDb::create_old(db));
let scenario1 = DbScenario {
scenario_name: "Data in single open chunk of mutable buffer".into(),
db,
@ -677,6 +690,7 @@ pub async fn make_two_chunk_scenarios(
.unwrap();
}
write_lp(&db, data2);
let db = Arc::new(AbstractDb::create_old(db));
let scenario2 = DbScenario {
scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(),
db,
@ -691,6 +705,7 @@ pub async fn make_two_chunk_scenarios(
.unwrap();
}
write_lp(&db, data2);
let db = Arc::new(AbstractDb::create_old(db));
let scenario3 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(),
db,
@ -711,6 +726,7 @@ pub async fn make_two_chunk_scenarios(
.await
.unwrap();
}
let db = Arc::new(AbstractDb::create_old(db));
let scenario4 = DbScenario {
scenario_name: "Data in two read buffer chunks".into(),
db,
@ -730,6 +746,7 @@ pub async fn make_two_chunk_scenarios(
.await
.unwrap();
}
let db = Arc::new(AbstractDb::create_old(db));
let scenario5 = DbScenario {
scenario_name: "Data in two read buffer chunks and two parquet file chunks".into(),
db,
@ -760,6 +777,7 @@ pub async fn make_two_chunk_scenarios(
db.unload_read_buffer(table_name, partition_key, id)
.unwrap();
}
let db = Arc::new(AbstractDb::create_old(db));
let scenario6 = DbScenario {
scenario_name: "Data in 2 parquet chunks in object store only".into(),
db,
@ -781,6 +799,7 @@ pub async fn make_two_chunk_scenarios(
.await
.unwrap();
}
let db = Arc::new(AbstractDb::create_old(db));
let scenario7 = DbScenario {
scenario_name: "Data in one compacted read buffer chunk".into(),
db,
@ -811,6 +830,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
.await
.unwrap();
}
let db = Arc::new(AbstractDb::create_old(db));
let scenario1 = DbScenario {
scenario_name: "--------------------- Data in read buffer".into(),
db,
@ -829,6 +849,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
db.unload_read_buffer(table_name, partition_key, id)
.unwrap();
}
let db = Arc::new(AbstractDb::create_old(db));
let scenario2 = DbScenario {
scenario_name: "--------------------- Data in object store only ".into(),
db,


@ -37,7 +37,7 @@ async fn run_table_schema_test_case<D>(
let mut chunks_with_table = 0;
for chunk in db.chunks(table_name, &Default::default()).await {
if chunk.table_name().as_ref() == table_name {
if chunk.table_name() == table_name {
chunks_with_table += 1;
let actual_schema = chunk.schema().select(selection).unwrap();


@ -77,6 +77,9 @@ fn execute_benchmark_group(c: &mut Criterion, scenarios: &[DbScenario]) {
let DbScenario { scenario_name, db } = scenario;
let mut group = c.benchmark_group(format!("read_filter/{}", scenario_name));
// downcast Db for performance
let db = db.old_db().unwrap();
for (predicate, pred_name) in &predicates {
let chunks = db
.filtered_chunk_summaries(None, Some("2021-04-26T13"))
@ -94,7 +97,7 @@ fn execute_benchmark_group(c: &mut Criterion, scenarios: &[DbScenario]) {
build_and_execute_plan(
&planner,
executor.as_ref(),
db,
&db,
predicate.clone(),
exp_data_frames,
)


@ -78,6 +78,9 @@ fn execute_benchmark_group(c: &mut Criterion, scenarios: &[DbScenario]) {
let DbScenario { scenario_name, db } = scenario;
let mut group = c.benchmark_group(format!("read_group/{}", scenario_name));
// downcast Db for performance
let db = db.old_db().unwrap();
for (predicate, pred_name) in &predicates {
// The number of expected frames, based on the expected number of
// individual series keys, which for grouping is the same no matter
@ -97,7 +100,7 @@ fn execute_benchmark_group(c: &mut Criterion, scenarios: &[DbScenario]) {
build_and_execute_plan(
&planner,
executor.as_ref(),
db,
&db,
predicate.clone(),
Aggregate::Sum,
&["tag2"],


@ -79,6 +79,9 @@ fn execute_benchmark_group(c: &mut Criterion, scenarios: &[DbScenario]) {
let DbScenario { scenario_name, db } = scenario;
let mut group = c.benchmark_group(scenario_name);
// downcast Db for performance
let db = db.old_db().unwrap();
for (predicate, pred_name) in &predicates {
for tag_key in tag_keys {
group.bench_with_input(
@ -90,7 +93,7 @@ fn execute_benchmark_group(c: &mut Criterion, scenarios: &[DbScenario]) {
run_tag_values_query(
&planner,
executor.as_ref(),
db,
&db,
tag_key,
predicate.clone(),
)