Merge pull request #6497 from influxdata/cn/query-tests-grpc

feat: Reimagining query_tests sql cases
kodiakhq[bot] 2023-01-18 19:10:26 +00:00 committed by GitHub
commit 9e530da897
18 changed files with 1624 additions and 21 deletions

Cargo.lock (generated)

@ -4589,6 +4589,29 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "query_tests2"
version = "0.1.0"
dependencies = [
"async-trait",
"dotenvy",
"generated_types",
"ingester2",
"iox_catalog",
"iox_query",
"metric",
"object_store",
"observability_deps",
"once_cell",
"parquet_file",
"snafu",
"tempfile",
"test_helpers",
"test_helpers_end_to_end",
"tokio",
"workspace-hack",
]
[[package]]
name = "quick-error"
version = "2.0.1"
@ -5698,13 +5721,16 @@ dependencies = [
"parking_lot 0.12.1",
"prost 0.11.6",
"rand",
"regex",
"reqwest",
"snafu",
"sqlx",
"tempfile",
"test_helpers",
"tokio",
"tokio-util",
"tonic",
"uuid",
"workspace-hack",
]


@ -59,6 +59,7 @@ members = [
"querier",
"query_functions",
"query_tests",
"query_tests2",
"router",
"schema",
"service_common",


@ -54,6 +54,7 @@ async fn tag_keys() {
run_data_test(
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
let generator = Arc::clone(&generator);
async move {
let mut storage_client = state.cluster().querier_storage_client();
@ -86,6 +87,7 @@ async fn tag_values() {
run_data_test(
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
let generator = Arc::clone(&generator);
async move {
let mut storage_client = state.cluster().querier_storage_client();
@ -123,6 +125,7 @@ async fn measurement_names() {
run_data_test(
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
let generator = Arc::clone(&generator);
async move {
let mut storage_client = state.cluster().querier_storage_client();
@ -164,6 +167,7 @@ async fn measurement_tag_keys() {
run_data_test(
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
let generator = Arc::clone(&generator);
async move {
let mut storage_client = state.cluster().querier_storage_client();
@ -203,6 +207,7 @@ async fn measurement_tag_values() {
run_data_test(
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
let generator = Arc::clone(&generator);
async move {
let mut storage_client = state.cluster().querier_storage_client();
@ -242,6 +247,7 @@ async fn measurement_fields() {
run_data_test(
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
let generator = Arc::clone(&generator);
async move {
let mut storage_client = state.cluster().querier_storage_client();


@ -14,6 +14,7 @@ use test_helpers_end_to_end::{
async fn read_filter() {
let generator = Arc::new(DataGenerator::new());
run_data_test(Arc::clone(&generator), Box::new(move |state: &mut StepTestState| {
let generator = Arc::clone(&generator);
async move {
let mut storage_client = state.cluster().querier_storage_client();
@ -325,6 +326,8 @@ async fn do_read_filter_test(
Step::WriteLineProtocol(line_protocol),
Step::WaitForReadable,
Step::Custom(Box::new(move |state: &mut StepTestState| {
let request_builder = request_builder.clone();
let expected_frames = expected_frames.clone();
async move {
let mut storage_client = state.cluster().querier_storage_client();


@ -227,6 +227,7 @@ async fn test_group_by_time() {
do_test_invalid_group_key(InvalidGroupKey::Time).await;
}
#[derive(Clone, Copy)]
enum InvalidGroupKey {
ColNotFound,
NotATag,
@ -312,6 +313,8 @@ async fn do_read_group_test(
Step::WriteLineProtocol(line_protocol),
Step::WaitForReadable,
Step::Custom(Box::new(move |state: &mut StepTestState| {
let request_builder = request_builder.clone();
let expected_frames = expected_frames.clone();
async move {
let grpc_connection = state
.cluster()


@ -87,6 +87,8 @@ async fn do_read_window_aggregate_test(
Step::WriteLineProtocol(line_protocol),
Step::WaitForReadable,
Step::Custom(Box::new(move |state: &mut StepTestState| {
let request_builder = request_builder.clone();
let expected_frames = expected_frames.clone();
async move {
let mut storage_client = state.cluster().querier_storage_client();


@ -134,8 +134,7 @@ async fn basic_multi_ingesters() {
assert_eq!(array.len(), 1);
assert_eq!(array.value(0), i);
}),
}))
.collect();
}));
// Run the tests
StepTest::new(&mut cluster, test_steps).run().await

query_tests2/Cargo.toml (new file)

@ -0,0 +1,26 @@
[package]
name = "query_tests2"
description = "Tests of the query engine against different database configurations"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.60"
dotenvy = "0.15.6"
ingester2 = { path = "../ingester2" }
iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" }
generated_types = { path = "../generated_types" }
metric = { path = "../metric" }
object_store = "0.5.2"
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.17", features = ["parking_lot"] }
parquet_file = { path = "../parquet_file" }
snafu = "0.7"
tempfile = "3.1.0"
test_helpers = { path = "../test_helpers" }
test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }
tokio = { version = "1.22", features = ["macros", "parking_lot", "rt-multi-thread", "time"] }
workspace-hack = { path = "../workspace-hack"}

query_tests2/src/cases.rs (new file)

@ -0,0 +1,204 @@
use crate::framework::{ChunkStage, TestCase};
// TODO: Generate these tests from the files on disk.
// See <https://github.com/influxdata/influxdb_iox/issues/6610>.
#[tokio::test]
async fn basic() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/basic.sql",
chunk_stage: ChunkStage::All,
}
.run()
.await;
}
#[tokio::test]
async fn dedup_and_predicates_parquet() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/dedup_and_predicates_parquet.sql",
chunk_stage: ChunkStage::Parquet,
}
.run()
.await;
}
#[tokio::test]
async fn dedup_and_predicates_parquet_ingester() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/dedup_and_predicates_parquet_ingester.sql",
chunk_stage: ChunkStage::Ingester,
}
.run()
.await;
}
#[tokio::test]
async fn duplicates_ingester() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/duplicates_ingester.sql",
chunk_stage: ChunkStage::Ingester,
}
.run()
.await;
}
#[tokio::test]
async fn duplicates_parquet() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/duplicates_parquet.sql",
chunk_stage: ChunkStage::Parquet,
}
.run()
.await;
}
#[tokio::test]
async fn duplicates_parquet_many() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/duplicates_parquet_many.sql",
chunk_stage: ChunkStage::Parquet,
}
.run()
.await;
}
#[tokio::test]
#[ignore]
async fn new_sql_system_tables() {
unimplemented!("Test snapshot might need updating?");
// test_helpers::maybe_start_logging();
//
// TestCase {
// input: "cases/in/new_sql_system_tables.sql",
// chunk_stage: ChunkStage::Ingester,
// }
// .run()
// .await;
}
#[tokio::test]
#[ignore]
async fn periods() {
unimplemented!("See <https://github.com/influxdata/influxdb_iox/issues/6515>");
// test_helpers::maybe_start_logging();
//
// TestCase {
// input: "cases/in/periods.sql",
// chunk_stage: ChunkStage::Ingester,
// }
// .run()
// .await;
}
#[tokio::test]
async fn pushdown() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/pushdown.sql",
chunk_stage: ChunkStage::Parquet,
}
.run()
.await;
}
#[tokio::test]
#[ignore]
async fn retention() {
unimplemented!("See <https://github.com/influxdata/influxdb_iox/issues/6592>");
// test_helpers::maybe_start_logging();
//
// TestCase {
// input: "cases/in/retention.sql",
// chunk_stage: ChunkStage::Parquet,
// }
// .run()
// .await;
}
#[tokio::test]
#[ignore]
async fn selectors() {
unimplemented!("See <https://github.com/influxdata/influxdb_iox/issues/6515>");
// test_helpers::maybe_start_logging();
//
// TestCase {
// input: "cases/in/selectors.sql",
// chunk_stage: ChunkStage::All,
// }
// .run()
// .await;
}
#[tokio::test]
async fn several_chunks() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/several_chunks.sql",
chunk_stage: ChunkStage::Ingester,
}
.run()
.await;
}
#[tokio::test]
async fn sql_information_schema() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/sql_information_schema.sql",
chunk_stage: ChunkStage::Ingester,
}
.run()
.await;
}
#[tokio::test]
async fn timestamps() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/timestamps.sql",
chunk_stage: ChunkStage::Ingester,
}
.run()
.await;
}
#[tokio::test]
async fn two_chunks() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/two_chunks.sql",
chunk_stage: ChunkStage::Ingester,
}
.run()
.await;
}
#[tokio::test]
async fn two_chunks_missing_columns() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/two_chunks_missing_columns.sql",
chunk_stage: ChunkStage::Ingester,
}
.run()
.await;
}


@ -0,0 +1,226 @@
//! The common test code that drives the tests in the [`cases`][crate::cases] module.
use observability_deps::tracing::*;
use snafu::{OptionExt, Snafu};
use std::{fmt::Debug, fs, path::PathBuf};
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest};
/// The kind of chunks the test should set up by default. Choosing `All` will run the test twice,
/// once with all chunks in the ingester and once with all chunks in Parquet. Choosing `Ingester`
/// will set up an ingester that effectively never persists, so if you want to persist some of the
/// chunks, you will need to use `Step::Persist` explicitly. Choosing `Parquet` will set up an
/// ingester that persists everything as fast as possible.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChunkStage {
/// Set up all chunks in the ingester; never persist automatically. The chunks are accessible
/// from the ingester.
Ingester,
/// Set up all chunks persisted in Parquet, as fast as possible. The chunks are accessible from
/// and managed by the querier.
Parquet,
/// Run tests against all of the previous states in this enum.
All,
}
impl IntoIterator for ChunkStage {
type Item = Self;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
match self {
// If `All` is specified, run the test twice, once with all chunks in the ingester and
// then once with all chunks in Parquet.
Self::All => vec![Self::Ingester, Self::Parquet].into_iter(),
other => vec![other].into_iter(),
}
}
}
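// E.g. `ChunkStage::All.into_iter()` yields `Ingester` then `Parquet`, so a
// `TestCase { chunk_stage: ChunkStage::All, .. }` runs its `.sql` case twice,
// once against each cluster configuration.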
/// Struct to orchestrate the test setup and assertions based on the `.sql` file specified in the
/// `input` field and the chunk stages specified in `chunk_stage`.
#[derive(Debug)]
pub struct TestCase {
pub input: &'static str,
pub chunk_stage: ChunkStage,
}
impl TestCase {
pub async fn run(&self) {
let database_url = maybe_skip_integration!();
for chunk_stage in self.chunk_stage {
info!("Using ChunkStage::{chunk_stage:?}");
// Setup that differs by chunk stage. These need to be non-shared clusters; if they're
// shared, then the tests that run in parallel and persist at particular times mess
// with each other because persistence applies to everything in the ingester.
let mut cluster = match chunk_stage {
ChunkStage::Ingester => {
MiniCluster::create_non_shared2_never_persist(database_url.clone()).await
}
ChunkStage::Parquet => MiniCluster::create_non_shared2(database_url.clone()).await,
ChunkStage::All => unreachable!("See `impl IntoIterator for ChunkStage`"),
};
// TEMPORARY: look in `query_tests` for all case files; change this if we decide to
// move them
let given_input_path: PathBuf = self.input.into();
let mut input_path = PathBuf::from("../query_tests/");
input_path.push(given_input_path);
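// e.g. `cases/in/basic.sql` resolves to `../query_tests/cases/in/basic.sql`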
let contents = fs::read_to_string(&input_path).unwrap_or_else(|_| {
panic!("Could not read test case file `{}`", input_path.display())
});
let setup =
TestSetup::try_from_lines(contents.lines()).expect("Could not get TestSetup");
let setup_name = setup.setup_name();
info!("Using setup {setup_name}");
// Run the setup steps and the QueryAndCompare step
let setup_steps = crate::setups::SETUPS
.get(setup_name)
.unwrap_or_else(|| panic!("Could not find setup with key `{setup_name}`"))
.iter();
let test_step = Step::QueryAndCompare {
input_path,
setup_name: setup_name.into(),
contents,
};
// Run the tests
StepTest::new(&mut cluster, setup_steps.chain(std::iter::once(&test_step)))
.run()
.await;
}
}
}
/// The magic value to look for in the `.sql` files to determine which setup steps to run based on
/// the setup name that appears in the file after this string.
const IOX_SETUP_NEEDLE: &str = "-- IOX_SETUP: ";
/// Encapsulates the setup needed for a test
///
/// Currently supports the following commands
///
/// # Run the specified setup:
/// # -- IOX_SETUP: SetupName
#[derive(Debug, PartialEq, Eq)]
pub struct TestSetup {
setup_name: String,
}
#[derive(Debug, Snafu)]
pub enum TestSetupError {
#[snafu(display(
"No setup found. Looking for lines that start with '{}'",
IOX_SETUP_NEEDLE
))]
SetupNotFoundInFile {},
#[snafu(display(
"Only one setup is supported. Previously saw setup '{}' and now saw '{}'",
setup_name,
new_setup_name
))]
SecondSetupFound {
setup_name: String,
new_setup_name: String,
},
}
impl TestSetup {
/// return the name of the setup that has been parsed
pub fn setup_name(&self) -> &str {
&self.setup_name
}
/// Create a new TestSetup object from the lines
pub fn try_from_lines<I, S>(lines: I) -> Result<Self, TestSetupError>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut parser = lines.into_iter().filter_map(|line| {
let line = line.as_ref().trim();
line.strip_prefix(IOX_SETUP_NEEDLE)
.map(|setup_name| setup_name.trim().to_string())
});
let setup_name = parser.next().context(SetupNotFoundInFileSnafu)?;
if let Some(new_setup_name) = parser.next() {
return SecondSetupFoundSnafu {
setup_name,
new_setup_name,
}
.fail();
}
Ok(Self { setup_name })
}
}
/// Who tests the test framework? This module does!
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_parse_lines() {
let lines = vec!["Foo", "bar", "-- IOX_SETUP: MySetup", "goo"];
let setup = TestSetup::try_from_lines(lines).unwrap();
assert_eq!(
setup,
TestSetup {
setup_name: "MySetup".into()
}
);
}
#[test]
fn test_parse_lines_extra_whitespace() {
let lines = vec!["Foo", " -- IOX_SETUP: MySetup "];
let setup = TestSetup::try_from_lines(lines).unwrap();
assert_eq!(
setup,
TestSetup {
setup_name: "MySetup".into()
}
);
}
#[test]
fn test_parse_lines_setup_name_with_whitespace() {
let lines = vec!["Foo", " -- IOX_SETUP: My Awesome Setup "];
let setup = TestSetup::try_from_lines(lines).unwrap();
assert_eq!(
setup,
TestSetup {
setup_name: "My Awesome Setup".into()
}
);
}
#[test]
fn test_parse_lines_none() {
let lines = vec!["Foo", " MySetup "];
let setup = TestSetup::try_from_lines(lines).unwrap_err().to_string();
assert_eq!(
setup,
"No setup found. Looking for lines that start with '-- IOX_SETUP: '"
);
}
#[test]
fn test_parse_lines_multi() {
let lines = vec!["Foo", "-- IOX_SETUP: MySetup", "-- IOX_SETUP: MySetup2"];
let setup = TestSetup::try_from_lines(lines).unwrap_err().to_string();
assert_eq!(
setup,
"Only one setup is supported. Previously saw setup 'MySetup' and now saw 'MySetup2'"
);
}
}

query_tests2/src/lib.rs (new file)

@ -0,0 +1,17 @@
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self,
clippy::clone_on_ref_ptr,
clippy::future_not_send,
clippy::todo,
clippy::dbg_macro
)]
#![cfg(test)]
//! Tests of various queries for data in various states.
mod cases;
mod framework;
mod setups;

query_tests2/src/setups.rs (new file)

@ -0,0 +1,473 @@
//! The setups available for any `TestCase` to use by specifying the test name in a comment at the
//! start of the `.sql` file in the form of:
//!
//! ```text
//! -- IOX_SETUP: [test name]
//! ```
use once_cell::sync::Lazy;
use std::collections::HashMap;
use test_helpers_end_to_end::Step;
/// The string value that will appear in `.sql` files.
pub type SetupName = &'static str;
/// The steps that should be run when this setup is chosen.
pub type SetupSteps = Vec<Step>;
/// All possible setups for the [`TestCase`][crate::TestCase]s to use, indexed by name
pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
HashMap::from([
(
"TwoMeasurements",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 2,
},
],
),
(
"TwoChunksDedupWeirdnessParquet",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol("table,tag=A foo=1,bar=1 0".into()),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(["table,tag=A bar=2 0", "table,tag=B foo=1 0"].join("\n")),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
"TwoChunksDedupWeirdnessParquetIngester",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol("table,tag=A foo=1,bar=1 0".into()),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::WriteLineProtocol(["table,tag=A bar=2 0", "table,tag=B foo=1 0"].join("\n")),
],
),
(
"OneMeasurementFourChunksWithDuplicatesWithIngester",
vec![
Step::RecordNumParquetFiles,
// Chunk 1:
// . time range: 50-250
// . no duplicates in its own chunk
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Boston min_temp=70.4 50",
"h2o,state=MA,city=Bedford min_temp=71.59 150",
"h2o,state=MA,city=Boston max_temp=75.4 250",
"h2o,state=MA,city=Andover max_temp=69.2, 250",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
// Chunk 2: overlaps with chunk 1
// . time range: 150 - 300
// . no duplicates in its own chunk
Step::WriteLineProtocol(
[
// new field (area) and update available NULL (max_temp)
"h2o,state=MA,city=Bedford max_temp=78.75,area=742u 150",
"h2o,state=MA,city=Boston min_temp=65.4 250", // update min_temp from NULL
"h2o,state=MA,city=Reading min_temp=53.4, 250",
"h2o,state=CA,city=SF min_temp=79.0,max_temp=87.2,area=500u 300",
"h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
// Chunk 3: no overlap
// . time range: 400 - 500
// . duplicates in its own chunk
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Bedford max_temp=80.75,area=742u 400",
"h2o,state=MA,city=Boston min_temp=68.4 400",
"h2o,state=MA,city=Bedford min_temp=65.22,area=750u 400", // duplicate
"h2o,state=MA,city=Boston min_temp=65.40,max_temp=82.67 400", // duplicate
"h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
// Chunk 4: no overlap
// . time range: 600 - 700
// . no duplicates
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Bedford max_temp=88.75,area=742u 600",
"h2o,state=MA,city=Boston min_temp=67.4 600",
"h2o,state=MA,city=Reading min_temp=60.4, 600",
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 650",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
]
.join("\n"),
),
],
),
(
"OneMeasurementFourChunksWithDuplicatesParquetOnly",
vec![
Step::RecordNumParquetFiles,
// Chunk 1:
// . time range: 50-250
// . no duplicates in its own chunk
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Boston min_temp=70.4 50",
"h2o,state=MA,city=Bedford min_temp=71.59 150",
"h2o,state=MA,city=Boston max_temp=75.4 250",
"h2o,state=MA,city=Andover max_temp=69.2, 250",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
// Chunk 2: overlaps with chunk 1
// . time range: 150 - 300
// . no duplicates in its own chunk
Step::WriteLineProtocol(
[
// new field (area) and update available NULL (max_temp)
"h2o,state=MA,city=Bedford max_temp=78.75,area=742u 150",
"h2o,state=MA,city=Boston min_temp=65.4 250", // update min_temp from NULL
"h2o,state=MA,city=Reading min_temp=53.4, 250",
"h2o,state=CA,city=SF min_temp=79.0,max_temp=87.2,area=500u 300",
"h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
// Chunk 3: no overlap
// . time range: 400 - 500
// . duplicates in its own chunk
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Bedford max_temp=80.75,area=742u 400",
"h2o,state=MA,city=Boston min_temp=68.4 400",
"h2o,state=MA,city=Bedford min_temp=65.22,area=750u 400", // duplicate
"h2o,state=MA,city=Boston min_temp=65.40,max_temp=82.67 400", // duplicate
"h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
// Chunk 4: no overlap
// . time range: 600 - 700
// . no duplicates
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Bedford max_temp=88.75,area=742u 600",
"h2o,state=MA,city=Boston min_temp=67.4 600",
"h2o,state=MA,city=Reading min_temp=60.4, 600",
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 650",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
"TwentySortedParquetFiles",
(0..20)
.flat_map(|i| {
let write = if i % 2 == 0 {
Step::WriteLineProtocol(format!(
"m,tag=A f=1 {}\nm,tab=B f=2 {}",
1000 - i, // unique in this chunk
1000 - i, // unique in this chunk (not plus i!)
))
} else {
Step::WriteLineProtocol(
"m,tag=A f=3 2001".into(), // duplicated across all chunks
)
};
[
Step::RecordNumParquetFiles,
write,
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
]
.into_iter()
})
.collect::<Vec<_>>(),
),
(
"TwoMeasurementsManyFields",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
"h2o,state=CA,city=Boston other_temp=72.4 350",
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
"o2,state=CA temp=79.0 300",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 2,
},
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
"h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000".into(),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
"TwoMeasurementsManyFieldsTwoChunks",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::WriteLineProtocol(
[
"h2o,state=CA,city=Boston other_temp=72.4 150",
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
"o2,state=CA temp=79.0 300",
]
.join("\n"),
),
// The system tables test looks for queries, so the setup needs to run this query.
Step::Query {
sql: "SELECT 1;".into(),
expected: vec![
"+----------+",
"| Int64(1) |",
"+----------+",
"| 1 |",
"+----------+",
],
},
],
),
(
"PeriodsInNames",
vec![Step::WriteLineProtocol(
[
"measurement.one,tag.one=value,tag.two=other field.one=1.0,field.two=t \
1609459201000000001",
"measurement.one,tag.one=value2,tag.two=other2 field.one=1.0,field.two=f \
1609459201000000002",
]
.join("\n"),
)],
),
(
"TwoMeasurementsPredicatePushDown",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"restaurant,town=andover count=40000u,system=5.0 100",
"restaurant,town=reading count=632u,system=5.0 120",
"restaurant,town=bedford count=189u,system=7.0 110",
"restaurant,town=tewsbury count=471u,system=6.0 110",
"restaurant,town=lexington count=372u,system=5.0 100",
"restaurant,town=lawrence count=872u,system=6.0 110",
"restaurant,town=reading count=632u,system=6.0 130",
]
.join("\n"),
),
Step::WriteLineProtocol(
[
"school,town=reading count=17u,system=6.0 150",
"school,town=andover count=25u,system=6.0 160",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 2,
},
],
),
(
"AllTypes",
vec![Step::WriteLineProtocol(
[
"m,tag=row1 float_field=64.0 450",
"m,tag=row1 int_field=64 550",
"m,tag=row1 \
float_field=61.0,int_field=22,uint_field=25u,\
string_field=\"foo\",bool_field=t 500",
"m,tag=row1 \
float_field=62.0,int_field=21,uint_field=30u,\
string_field=\"ba\",bool_field=f 200",
"m,tag=row1 \
float_field=63.0,int_field=20,uint_field=35u,\
string_field=\"baz\",bool_field=f 300",
"m,tag=row1 \
float_field=64.0,int_field=19,uint_field=20u,\
string_field=\"bar\",bool_field=t 400",
"m,tag=row1 \
float_field=65.0,int_field=18,uint_field=40u,\
string_field=\"fruz\",bool_field=f 100",
"m,tag=row1 \
float_field=66.0,int_field=17,uint_field=10u,\
string_field=\"faa\",bool_field=t 600",
]
.join("\n"),
)],
),
(
"ManyFieldsSeveralChunks",
vec![
Step::RecordNumParquetFiles,
// c1: parquet stage
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Boston temp=70.4 50",
// duplicate with a row in c4 and will be removed
"h2o,state=MA,city=Boston other_temp=70.4 250",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
// c2: parquet stage & overlaps with c1
Step::WriteLineProtocol("h2o,state=CA,city=Andover other_temp=72.4 150".into()),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
// c3: parquet stage & doesn't overlap with any
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Boston temp=80.7 350",
"h2o,state=MA,city=Boston other_temp=68.2 450",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
// c4: parquet stage & overlap with c1
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Boston temp=88.6 230",
// duplicate with a row in c1 but more
// recent => this row is kept
"h2o,state=MA,city=Boston other_temp=80 250",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
// c5: ingester stage & doesn't overlap with any
Step::WriteLineProtocol("h2o,state=CA,city=Andover temp=67.3 500".into()),
],
),
(
"OneMeasurementRealisticTimes",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"cpu,region=west user=23.2 1626809330000000000",
"cpu,region=west user=21.0 1626809430000000000",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
"TwoChunksMissingColumns",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol("table,tag1=a,tag2=b field1=10,field2=11 100".into()),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
Step::WriteLineProtocol("table,tag1=a,tag3=c field1=20,field3=22 200".into()),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
])
});


@ -23,11 +23,14 @@ once_cell = { version = "1.17", features = ["parking_lot"] }
parking_lot = "0.12"
prost = "0.11"
rand = "0.8.3"
regex = "1.7.0"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
snafu = "0.7"
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid" ] }
tempfile = "3.1.0"
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
tokio = { version = "1.24", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-util = "0.7"
tonic = "0.8"
uuid = "1"
workspace-hack = { path = "../workspace-hack"}


@ -179,6 +179,8 @@ impl TestConfig {
)
.with_existing_object_store(ingester_config)
.with_env("INFLUXDB_IOX_RPC_MODE", "2")
// Hard code query threads so query plans do not vary based on environment
.with_env("INFLUXDB_IOX_NUM_QUERY_THREADS", "4")
}
/// Create a minimal all in one configuration


@ -9,7 +9,7 @@ use generated_types::{
};
use prost::Message;
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
/// Helps create and send influxrpc / gRPC requests to IOx
pub struct GrpcRequestBuilder {
read_source: Option<generated_types::google::protobuf::Any>,


@ -13,6 +13,7 @@ mod grpc;
mod mini_cluster;
mod server_fixture;
mod server_type;
mod snapshot_comparison;
mod steps;
mod udp_listener;


@ -0,0 +1,570 @@
use crate::{run_sql, MiniCluster};
use arrow_util::{display::pretty_format_batches, test_util::sort_record_batch};
use regex::{Captures, Regex};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::HashMap,
fs,
path::{Path, PathBuf},
};
use uuid::Uuid;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Could not read case file '{:?}': {}", path, source))]
ReadingCaseFile {
path: PathBuf,
source: std::io::Error,
},
#[snafu(context(false))]
MakingOutputPath { source: OutputPathError },
#[snafu(display("Could not write to output file '{:?}': {}", output_path, source))]
WritingToOutputFile {
output_path: PathBuf,
source: std::io::Error,
},
#[snafu(display("Could not read expected file '{:?}': {}", path, source))]
ReadingExpectedFile {
path: PathBuf,
source: std::io::Error,
},
#[snafu(display(
"Contents of output '{:?}' does not match contents of expected '{:?}'",
output_path,
expected_path,
))]
OutputMismatch {
output_path: PathBuf,
expected_path: PathBuf,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub async fn run(
cluster: &mut MiniCluster,
input_path: PathBuf,
setup_name: String,
contents: String,
) -> Result<()> {
// create output and expected output
let output_path = make_output_path(&input_path)?;
let expected_path = input_path.with_extension("expected");
println!("Running case in {:?}", input_path);
println!(" writing output to {:?}", output_path);
println!(" expected output in {:?}", expected_path);
println!("Processing contents:\n{}", contents);
let queries = TestQueries::from_lines(contents.lines());
let mut output = vec![];
output.push(format!("-- Test Setup: {setup_name}"));
for q in queries.iter() {
output.push(format!("-- SQL: {}", q.sql()));
if q.sorted_compare() {
output.push("-- Results After Sorting".into())
}
if q.normalized_uuids() {
output.push("-- Results After Normalizing UUIDs".into())
}
if q.normalized_metrics() {
output.push("-- Results After Normalizing Metrics".into())
}
let results = run_query(cluster, q).await?;
output.extend(results);
}
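// The written file therefore starts with a `-- Test Setup: <name>` header and,
// per query, a `-- SQL: <query>` line, any `-- Results After ...` annotations,
// and the pretty-printed result table produced by `run_query`.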
fs::write(&output_path, output.join("\n")).context(WritingToOutputFileSnafu {
output_path: &output_path,
})?;
// Now, compare to expected results
let expected_data = fs::read_to_string(&expected_path)
.context(ReadingExpectedFileSnafu { path: &input_path })?;
let expected_contents: Vec<_> = expected_data.lines().map(|s| s.to_string()).collect();
if expected_contents != output {
let expected_path = make_absolute(&expected_path);
let output_path = make_absolute(&output_path);
println!("Expected output does not match actual output");
println!(" expected output in {:?}", expected_path);
println!(" actual output in {:?}", output_path);
println!("Possibly helpful commands:");
println!(" # See diff");
println!(" diff -du {:?} {:?}", expected_path, output_path);
println!(" # Update expected");
println!(" cp -f {:?} {:?}", output_path, expected_path);
OutputMismatchSnafu {
output_path,
expected_path,
}
.fail()
} else {
Ok(())
}
}
#[derive(Debug, Snafu)]
pub enum OutputPathError {
#[snafu(display("Input path has no file stem: '{:?}'", path))]
NoFileStem { path: PathBuf },
#[snafu(display("Input path has no parent?!: '{:?}'", path))]
NoParent { path: PathBuf },
}
/// Return output path for input path.
///
/// This converts `some/prefix/in/foo.sql` (or other file extensions) to `some/prefix/out/foo.out`.
fn make_output_path(input: &Path) -> Result<PathBuf, OutputPathError> {
let stem = input.file_stem().context(NoFileStemSnafu { path: input })?;
// go two levels up (from file to dir, from dir to parent dir)
let parent = input.parent().context(NoParentSnafu { path: input })?;
let parent = parent.parent().context(NoParentSnafu { path: parent })?;
let mut out = parent.to_path_buf();
// go one level down (from parent dir to out-dir)
out.push("out");
// set file name and ext
out.push(stem);
out.set_extension("out");
Ok(out)
}
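// E.g. the `cases/in/basic.sql` input used by `query_tests2` maps to
// `cases/out/basic.out`.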
/// Return the absolute path to `path`, regardless of if it exists on the local filesystem
fn make_absolute(path: &Path) -> PathBuf {
let mut absolute = std::env::current_dir().expect("cannot get current working directory");
absolute.extend(path);
absolute
}
async fn run_query(cluster: &MiniCluster, query: &Query) -> Result<Vec<String>> {
let sql = query.sql();
let mut results = run_sql(
sql,
cluster.namespace(),
cluster.querier().querier_grpc_connection(),
)
.await;
// compare against sorted results, if requested
if query.sorted_compare() && !results.is_empty() {
let schema = results[0].schema();
let batch =
arrow::compute::concat_batches(&schema, &results).expect("concatenating batches");
results = vec![sort_record_batch(batch)];
}
let mut current_results = pretty_format_batches(&results)
.unwrap()
.trim()
.lines()
.map(|s| s.to_string())
.collect::<Vec<_>>();
// normalize UUIDs, if requested
if query.normalized_uuids() {
let regex_uuid = Regex::new("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
.expect("UUID regex");
let regex_dirs = Regex::new(r#"[0-9]+/[0-9]+/[0-9]+/[0-9+]"#).expect("directory regex");
let mut seen: HashMap<String, u128> = HashMap::new();
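// Each distinct UUID gets a placeholder derived from first-seen order: the
// first UUID in the results becomes `00000000-0000-0000-0000-000000000000`,
// the second `00000000-0000-0000-0000-000000000001`, and so on, so repeated
// occurrences of the same UUID map to the same placeholder.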
current_results = current_results
.into_iter()
.map(|s| {
let s = regex_uuid
.replace_all(&s, |s: &Captures| {
let next = seen.len() as u128;
Uuid::from_u128(
*seen
.entry(s.get(0).unwrap().as_str().to_owned())
.or_insert(next),
)
.to_string()
})
.to_string();
regex_dirs.replace_all(&s, "1/1/1/1").to_string()
})
.collect();
}
// normalize metrics, if requested
if query.normalized_metrics() {
// Parse regex once and apply to all rows. See description around the `replace...` calls on
// why/how the regexes are used.
let regex_metrics = Regex::new(r#"metrics=\[([^\]]*)\]"#).expect("metrics regex");
let regex_timing = Regex::new(r#"[0-9]+(\.[0-9]+)?.s"#).expect("timing regex");
let regex_linesep = Regex::new(r#"[+-]{6,}"#).expect("linesep regex");
let regex_col = Regex::new(r#"\s+\|"#).expect("col regex");
current_results = current_results
.into_iter()
.map(|s| {
// Replace timings with fixed value, e.g.:
//
// `1s` -> `1.234ms`
// `1.2ms` -> `1.234ms`
// `10.2μs` -> `1.234ms`
let s = regex_timing.replace_all(&s, "1.234ms");
// Replace table row separators of flexible width with a fixed width. This is required
// because the original timing values may differ in "printed width", so the table
// cells have different widths and hence the separators / borders. E.g.:
//
// `+--+--+` -> `----------`
// `+--+------+` -> `----------`
//
// Note that we're kinda inexact with our regex here, but it gets the job done.
let s = regex_linesep.replace_all(&s, "----------");
// Similar to the row separator issue above, the table columns are right-padded
// with spaces. Due to the different "printed width" of the timing values, we need
// to normalize this padding as well. E.g.:
//
// ` |` -> ` |`
// ` |` -> ` |`
let s = regex_col.replace_all(&s, " |");
// Metrics are currently ordered by value (not by key), so different timings may
// reorder them. We "parse" the list and normalize the sorting. E.g.:
//
// `metrics=[]` => `metrics=[]`
// `metrics=[foo=1, bar=2]` => `metrics=[bar=2, foo=1]`
// `metrics=[foo=2, bar=1]` => `metrics=[bar=1, foo=2]`
regex_metrics
.replace_all(&s, |c: &Captures| {
let mut metrics: Vec<_> = c[1].split(", ").collect();
metrics.sort();
format!("metrics=[{}]", metrics.join(", "))
})
.to_string()
})
.collect();
}
Ok(current_results)
}
/// A query to run with optional annotations
#[derive(Debug, PartialEq, Eq, Default)]
pub struct Query {
/// If true, results are sorted first prior to comparison, meaning that differences in the
/// output order compared with expected order do not cause a diff
sorted_compare: bool,
/// If true, replace UUIDs with static placeholders.
normalized_uuids: bool,
/// If true, normalize timings in queries by replacing them with static placeholders.
normalized_metrics: bool,
/// The SQL string
sql: String,
}
impl Query {
#[cfg(test)]
fn new(sql: impl Into<String>) -> Self {
let sql = sql.into();
Self {
sorted_compare: false,
normalized_uuids: false,
normalized_metrics: false,
sql,
}
}
#[cfg(test)]
fn with_sorted_compare(mut self) -> Self {
self.sorted_compare = true;
self
}
/// Get a reference to the query's sql.
pub fn sql(&self) -> &str {
self.sql.as_ref()
}
/// Get the query's sorted compare.
pub fn sorted_compare(&self) -> bool {
self.sorted_compare
}
/// Get queries normalized UUID
pub fn normalized_uuids(&self) -> bool {
self.normalized_uuids
}
/// Use normalized timing values
pub fn normalized_metrics(&self) -> bool {
self.normalized_metrics
}
}
#[derive(Debug, Default)]
struct QueryBuilder {
query: Query,
}
impl QueryBuilder {
fn new() -> Self {
Default::default()
}
fn push_str(&mut self, s: &str) {
self.query.sql.push_str(s)
}
fn push(&mut self, c: char) {
self.query.sql.push(c)
}
fn sorted_compare(&mut self) {
self.query.sorted_compare = true;
}
fn normalized_uuids(&mut self) {
self.query.normalized_uuids = true;
}
fn normalize_metrics(&mut self) {
self.query.normalized_metrics = true;
}
fn is_empty(&self) -> bool {
self.query.sql.is_empty()
}
/// Creates a Query and resets this builder to default
fn build_and_reset(&mut self) -> Option<Query> {
(!self.is_empty()).then(|| std::mem::take(&mut self.query))
}
}
/// Poor man's parser to find all the SQL queries in an input file
#[derive(Debug, PartialEq, Eq)]
pub struct TestQueries {
queries: Vec<Query>,
}
impl TestQueries {
/// find all queries (more or less a fancy split on `;`)
pub fn from_lines<I, S>(lines: I) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut queries = vec![];
let mut builder = QueryBuilder::new();
lines.into_iter().for_each(|line| {
let line = line.as_ref().trim();
const COMPARE_STR: &str = "-- IOX_COMPARE: ";
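// Annotation lines look like `-- IOX_COMPARE: sorted` or
// `-- IOX_COMPARE: uuid, metrics`; unrecognized options are ignored.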
if line.starts_with(COMPARE_STR) {
let (_, options) = line.split_at(COMPARE_STR.len());
for option in options.split(',') {
let option = option.trim();
match option {
"sorted" => {
builder.sorted_compare();
}
"uuid" => {
builder.normalized_uuids();
}
"metrics" => {
builder.normalize_metrics();
}
_ => {}
}
}
}
if line.starts_with("--") {
return;
}
if line.is_empty() {
return;
}
// replace newlines
if !builder.is_empty() {
builder.push(' ');
}
builder.push_str(line);
// declare queries when we see a semicolon at the end of the line
if line.ends_with(';') {
if let Some(q) = builder.build_and_reset() {
queries.push(q);
}
}
});
if let Some(q) = builder.build_and_reset() {
queries.push(q);
}
Self { queries }
}
// Get an iterator over the queries
pub fn iter(&self) -> impl Iterator<Item = &Query> {
self.queries.iter()
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_parse_queries() {
let input = r#"
-- This is a test
select * from foo;
-- another comment
select * from bar;
-- This query has been commented out and should not be seen
-- select * from baz;
"#;
let queries = TestQueries::from_lines(input.split('\n'));
assert_eq!(
queries,
TestQueries {
queries: vec![
Query::new("select * from foo;"),
Query::new("select * from bar;"),
]
}
)
}
#[test]
fn test_parse_queries_no_ending_semi() {
let input = r#"
select * from foo;
-- no ending semi colon
select * from bar
"#;
let queries = TestQueries::from_lines(input.split('\n'));
assert_eq!(
queries,
TestQueries {
queries: vec![
Query::new("select * from foo;"),
Query::new("select * from bar")
]
}
)
}
#[test]
fn test_parse_queries_mulit_line() {
let input = r#"
select
*
from
foo;
select * from bar;
"#;
let queries = TestQueries::from_lines(input.split('\n'));
assert_eq!(
queries,
TestQueries {
queries: vec![
Query::new("select * from foo;"),
Query::new("select * from bar;"),
]
}
)
}
#[test]
fn test_parse_queries_empty() {
let input = r#"
-- This is a test
-- another comment
"#;
let queries = TestQueries::from_lines(input.split('\n'));
assert_eq!(queries, TestQueries { queries: vec![] })
}
#[test]
fn test_parse_queries_sorted_compare() {
let input = r#"
select * from foo;
-- The second query should be compared to expected after sorting
-- IOX_COMPARE: sorted
select * from bar;
-- Since this query is not annotated, it should not use expected sorted
select * from baz;
select * from baz2;
-- IOX_COMPARE: sorted
select * from waz;
-- (But the compare should work subsequently)
"#;
let queries = TestQueries::from_lines(input.split('\n'));
assert_eq!(
queries,
TestQueries {
queries: vec![
Query::new("select * from foo;"),
Query::new("select * from bar;").with_sorted_compare(),
Query::new("select * from baz;"),
Query::new("select * from baz2;"),
Query::new("select * from waz;").with_sorted_compare(),
]
}
)
}
#[test]
fn test_parse_queries_sorted_compare_after() {
let input = r#"
select * from foo;
-- IOX_COMPARE: sorted
"#;
let queries = TestQueries::from_lines(input.split('\n'));
assert_eq!(
queries,
TestQueries {
queries: vec![Query::new("select * from foo;")]
}
)
}
#[test]
fn test_parse_queries_sorted_compare_not_match_ignored() {
let input = r#"
-- IOX_COMPARE: something_else
select * from foo;
"#;
let queries = TestQueries::from_lines(input.split('\n'));
assert_eq!(
queries,
TestQueries {
queries: vec![Query::new("select * from foo;")]
}
)
}
}

View File

@ -1,22 +1,23 @@
use crate::{
check_flight_error, get_write_token, run_influxql, run_sql, token_is_persisted,
try_run_influxql, try_run_sql, wait_for_persisted, wait_for_readable, MiniCluster,
check_flight_error, get_write_token, run_influxql, run_sql, snapshot_comparison,
token_is_persisted, try_run_influxql, try_run_sql, wait_for_persisted, wait_for_readable,
MiniCluster,
};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
use futures::future::BoxFuture;
use http::StatusCode;
use observability_deps::tracing::info;
use std::time::Duration;
use std::{path::PathBuf, time::Duration};
const MAX_QUERY_RETRY_TIME_SEC: u64 = 20;
/// Test harness for end to end tests that are comprised of several steps
pub struct StepTest<'a> {
pub struct StepTest<'a, S> {
cluster: &'a mut MiniCluster,
/// The test steps to perform
steps: Vec<Step>,
steps: Box<dyn Iterator<Item = S> + Send + Sync + 'a>,
}
/// The test state that is passed to custom steps
@ -132,10 +133,10 @@ impl<'a> StepTestState<'a> {
/// }.boxed()
/// });
/// ```
pub type FCustom = Box<dyn for<'b> FnOnce(&'b mut StepTestState) -> BoxFuture<'b, ()>>;
pub type FCustom = Box<dyn for<'b> Fn(&'b mut StepTestState) -> BoxFuture<'b, ()> + Send + Sync>;
/// Function to do custom validation on metrics. Expected to panic on validation failure.
pub type MetricsValidationFn = Box<dyn Fn(&mut StepTestState, String)>;
pub type MetricsValidationFn = Box<dyn Fn(&mut StepTestState, String) + Send + Sync>;
/// Possible test steps that a test can perform
pub enum Step {
@ -184,6 +185,14 @@ pub enum Step {
expected: Vec<&'static str>,
},
/// Read the SQL queries in the specified file and verify that the results match the expected
/// results in the corresponding expected file
QueryAndCompare {
input_path: PathBuf,
setup_name: String,
contents: String,
},
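// Handled via `snapshot_comparison::run`, which writes a `.out` file and
// compares it against the `.expected` file that sits next to the input.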
/// Run a SQL query that's expected to fail using the FlightSQL interface and verify that the
/// request returns the expected error code and message
QueryExpectingError {
@ -200,7 +209,7 @@ pub enum Step {
/// failure.
VerifiedQuery {
sql: String,
verify: Box<dyn Fn(Vec<RecordBatch>)>,
verify: Box<dyn Fn(Vec<RecordBatch>) + Send + Sync>,
},
/// Run an InfluxQL query using the FlightSQL interface and verify that the
@ -231,11 +240,27 @@ pub enum Step {
Custom(FCustom),
}
impl<'a> StepTest<'a> {
impl AsRef<Step> for Step {
fn as_ref(&self) -> &Step {
self
}
}
impl<'a, S> StepTest<'a, S>
where
S: AsRef<Step>,
{
/// Create a new test that runs each `step`, in sequence, against
/// `cluster` panic'ing if any step fails
pub fn new(cluster: &'a mut MiniCluster, steps: Vec<Step>) -> Self {
Self { cluster, steps }
pub fn new<I>(cluster: &'a mut MiniCluster, steps: I) -> Self
where
I: IntoIterator<Item = S> + Send + Sync + 'a,
<I as IntoIterator>::IntoIter: Send + Sync,
{
Self {
cluster,
steps: Box::new(steps.into_iter()),
}
}
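// `Vec<Step>` still works here since `Step: AsRef<Step>`, and borrowed steps
// can now be chained as well, e.g. how `query_tests2` runs
// `StepTest::new(&mut cluster, setup_steps.chain(std::iter::once(&test_step)))`.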
/// run the test.
@ -248,9 +273,9 @@ impl<'a> StepTest<'a> {
num_parquet_files: Default::default(),
};
for (i, step) in steps.into_iter().enumerate() {
for (i, step) in steps.enumerate() {
info!("**** Begin step {} *****", i);
match step {
match step.as_ref() {
Step::WriteLineProtocol(line_protocol) => {
info!(
"====Begin writing line protocol to v2 HTTP API:\n{}",
@ -292,7 +317,7 @@ impl<'a> StepTest<'a> {
Step::WaitForPersisted2 { expected_increase } => {
info!("====Begin waiting for a change in the number of Parquet files");
state
.wait_for_num_parquet_file_change(expected_increase)
.wait_for_num_parquet_file_change(*expected_increase)
.await;
info!("====Done waiting for a change in the number of Parquet files");
}
@ -342,9 +367,25 @@ impl<'a> StepTest<'a> {
state.cluster.querier().querier_grpc_connection(),
)
.await;
assert_batches_sorted_eq!(&expected, &batches);
assert_batches_sorted_eq!(expected, &batches);
info!("====Done running");
}
Step::QueryAndCompare {
input_path,
setup_name,
contents,
} => {
info!("====Begin running queries in file {}", input_path.display());
snapshot_comparison::run(
state.cluster,
input_path.into(),
setup_name.into(),
contents.into(),
)
.await
.unwrap();
info!("====Done running queries");
}
Step::QueryExpectingError {
sql,
expected_error_code,
@ -360,7 +401,7 @@ impl<'a> StepTest<'a> {
.await
.unwrap_err();
check_flight_error(err, expected_error_code, Some(&expected_message));
check_flight_error(err, *expected_error_code, Some(expected_message));
info!("====Done running");
}
@ -385,7 +426,7 @@ impl<'a> StepTest<'a> {
state.cluster.querier().querier_grpc_connection(),
)
.await;
assert_batches_sorted_eq!(&expected, &batches);
assert_batches_sorted_eq!(expected, &batches);
info!("====Done running");
}
Step::InfluxQLExpectingError {
@ -406,7 +447,7 @@ impl<'a> StepTest<'a> {
.await
.unwrap_err();
check_flight_error(err, expected_error_code, Some(&expected_message));
check_flight_error(err, *expected_error_code, Some(expected_message));
info!("====Done running");
}