Merge pull request #6626 from influxdata/cn/test-old-with-new

test: Add old ingester and old parquet states into the query_tests2 framework
pull/24376/head
kodiakhq[bot] 2023-01-20 15:27:43 +00:00 committed by GitHub
commit ec7c7634e4
16 changed files with 290 additions and 193 deletions

Cargo.lock generated
View File

@@ -4599,29 +4599,6 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "query_tests2"
version = "0.1.0"
dependencies = [
"async-trait",
"dotenvy",
"generated_types",
"ingester2",
"iox_catalog",
"iox_query",
"metric",
"object_store",
"observability_deps",
"once_cell",
"parquet_file",
"snafu",
"tempfile",
"test_helpers",
"test_helpers_end_to_end",
"tokio",
"workspace-hack",
]
[[package]]
name = "quick-error"
version = "2.0.1"

View File

@@ -58,7 +58,6 @@ members = [
"querier",
"query_functions",
"query_tests",
"query_tests2",
"router",
"schema",
"service_common",

View File

@@ -0,0 +1,3 @@
//! Tests of various queries for data in various states.
mod query_tests2;

View File

@@ -1,4 +1,4 @@
use crate::framework::{ChunkStage, TestCase};
use super::framework::{ChunkStage, TestCase};
// TODO: Generate these tests from the files on disk.
// See <https://github.com/influxdata/influxdb_iox/issues/6610>.
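
For orientation, a case in this module is declared roughly as in the sketch below. This is a hand-written example, not a line from the diff: the file name is a placeholder, but the `input` and `chunk_stage` fields match the `TestCase` struct shown in the framework further down.

use super::framework::{ChunkStage, TestCase};

// Sketch only: each test points at a `.sql` case file (resolved relative to
// `../query_tests/` for now) and picks the chunk stages to exercise; `run`
// drives the whole setup / query / compare cycle.
#[tokio::test]
async fn basic() {
    TestCase {
        input: "cases/in/basic.sql",
        chunk_stage: ChunkStage::All,
    }
    .run()
    .await;
}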

View File

@@ -1,4 +1,4 @@
//! The common test code that drives the tests in the [`cases`][crate::cases] module.
//! The common test code that drives the tests in the [`cases`][super::cases] module.
use observability_deps::tracing::*;
use snafu::{OptionExt, Snafu};
@@ -12,12 +12,13 @@ use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTes
/// ingester that persists everything as fast as possible.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChunkStage {
/// Set up all chunks in the ingester; never persist automatically. The chunks are accessible
/// from the ingester.
/// Set up all chunks in the ingester, which is set up to go through the write buffer (Kafka).
/// This is temporary until the switch to the Kafkaless architecture is complete.
Ingester,
/// Set up all chunks persisted in Parquet, as fast as possible. The chunks are accessible from
/// and managed by the querier.
/// Set up all chunks persisted in Parquet, as fast as possible, through the old ingester and
/// write buffer (Kafka). This is temporary until the switch to the Kafkaless architecture is
/// complete.
Parquet,
/// Run tests against all of the previous states in this enum.
@@ -38,6 +39,17 @@ impl IntoIterator for ChunkStage {
}
}
/// Which architecture is being used in this test run. This enum (and running the tests twice)
/// is temporary until the Kafkaful architecture is retired.
#[derive(Debug, Copy, Clone)]
enum IoxArchitecture {
/// Use the "standard" MiniCluster that uses ingester, router, querier, compactor with a write
/// buffer (aka Kafka). This is slated for retirement soon.
Kafkaful,
/// Use the "RPC write"/"version 2" MiniCluster that uses ingester2, router2, querier2 without
/// a write buffer. This will soon be the only architecture.
Kafkaless,
}
/// Struct to orchestrate the test setup and assertions based on the `.sql` file specified in the
/// `input` field and the chunk stages specified in `chunk_stage`.
#[derive(Debug)]
@@ -50,49 +62,77 @@ impl TestCase {
pub async fn run(&self) {
let database_url = maybe_skip_integration!();
for chunk_stage in self.chunk_stage {
info!("Using ChunkStage::{chunk_stage:?}");
for arch in [IoxArchitecture::Kafkaful, IoxArchitecture::Kafkaless] {
for chunk_stage in self.chunk_stage {
info!("Using IoxArchitecture::{arch:?} and ChunkStage::{chunk_stage:?}");
// Setup that differs by chunk stage. These need to be non-shared clusters; if they're
// shared, then the tests that run in parallel and persist at particular times mess
// with each other because persistence applies to everything in the ingester.
let mut cluster = match chunk_stage {
ChunkStage::Ingester => {
MiniCluster::create_non_shared2_never_persist(database_url.clone()).await
}
ChunkStage::Parquet => MiniCluster::create_non_shared2(database_url.clone()).await,
ChunkStage::All => unreachable!("See `impl IntoIterator for ChunkStage`"),
};
// Setup that differs by architecture and chunk stage. These need to be non-shared
// clusters; if they're shared, then the tests that run in parallel and persist at
// particular times mess with each other because persistence applies to everything in
// the ingester.
let mut cluster = match (arch, chunk_stage) {
(IoxArchitecture::Kafkaful, ChunkStage::Ingester) => {
MiniCluster::create_non_shared_standard_never_persist(database_url.clone())
.await
}
(IoxArchitecture::Kafkaful, ChunkStage::Parquet) => {
MiniCluster::create_non_shared_standard(database_url.clone()).await
}
(IoxArchitecture::Kafkaless, ChunkStage::Ingester) => {
MiniCluster::create_non_shared2_never_persist(database_url.clone()).await
}
(IoxArchitecture::Kafkaless, ChunkStage::Parquet) => {
MiniCluster::create_non_shared2(database_url.clone()).await
}
(_, ChunkStage::All) => unreachable!("See `impl IntoIterator for ChunkStage`"),
};
// TEMPORARY: look in `query_tests` for all case files; change this if we decide to
// move them
let given_input_path: PathBuf = self.input.into();
let mut input_path = PathBuf::from("../query_tests/");
input_path.push(given_input_path);
let contents = fs::read_to_string(&input_path).unwrap_or_else(|_| {
panic!("Could not read test case file `{}`", input_path.display())
});
// TEMPORARY: look in `query_tests` for all case files; change this if we decide to
// move them
let given_input_path: PathBuf = self.input.into();
let mut input_path = PathBuf::from("../query_tests/");
input_path.push(given_input_path);
let contents = fs::read_to_string(&input_path).unwrap_or_else(|_| {
panic!("Could not read test case file `{}`", input_path.display())
});
let setup =
TestSetup::try_from_lines(contents.lines()).expect("Could not get TestSetup");
let setup_name = setup.setup_name();
info!("Using setup {setup_name}");
let setup =
TestSetup::try_from_lines(contents.lines()).expect("Could not get TestSetup");
let setup_name = setup.setup_name();
info!("Using setup {setup_name}");
// Run the setup steps and the QueryAndCompare step
let setup_steps = crate::setups::SETUPS
.get(setup_name)
.unwrap_or_else(|| panic!("Could not find setup with key `{setup_name}`"))
.iter();
let test_step = Step::QueryAndCompare {
input_path,
setup_name: setup_name.into(),
contents,
};
// Run the setup steps and the QueryAndCompare step
let setup_steps = super::setups::SETUPS
.get(setup_name)
.unwrap_or_else(|| panic!("Could not find setup with key `{setup_name}`"))
.iter()
// When we've switched over to the Kafkaless architecture, this map can be
// removed.
.flat_map(|step| match (arch, step) {
// If we're using the old architecture and the test steps include
// `WaitForPersisted2`, swap it with `WaitForPersisted` instead.
(IoxArchitecture::Kafkaful, Step::WaitForPersisted2 { .. }) => {
vec![&Step::WaitForPersisted]
}
// If we're using the old architecture and the test steps include
// `WriteLineProtocol`, wait for the data to be readable after writing.
(IoxArchitecture::Kafkaful, Step::WriteLineProtocol { .. }) => {
vec![step, &Step::WaitForReadable]
}
(_, other) => vec![other],
});
// Run the tests
StepTest::new(&mut cluster, setup_steps.chain(std::iter::once(&test_step)))
.run()
.await;
let test_step = Step::QueryAndCompare {
input_path,
setup_name: setup_name.into(),
contents,
};
// Run the tests
StepTest::new(&mut cluster, setup_steps.chain(std::iter::once(&test_step)))
.run()
.await;
}
}
}
}
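
The setup steps looked up by `setup_name` come from the `setups` module's `SETUPS` map. Below is an illustrative sketch of what an entry might look like, not the real contents of `setups.rs`: the key and line protocol are placeholders, and the map and lazy types are assumed from the crate's existing `once_cell` dependency.

use std::collections::HashMap;

use once_cell::sync::Lazy;
use test_helpers_end_to_end::Step;

// Illustrative only: a named list of steps written against the Kafkaless
// architecture. Under the Kafkaful architecture, the `flat_map` above swaps
// `Step::WaitForPersisted2 { .. }` for `Step::WaitForPersisted` and appends
// `Step::WaitForReadable` after each `Step::WriteLineProtocol`.
pub static SETUPS: Lazy<HashMap<&'static str, Vec<Step>>> = Lazy::new(|| {
    HashMap::from([(
        "TwoMeasurements", // hypothetical setup name
        vec![
            Step::WriteLineProtocol("cpu,region=west user=23.2 100".into()),
            // Real setups typically end with a persistence step such as
            // `Step::WaitForPersisted2 { .. }`.
        ],
    )])
});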

View File

@@ -0,0 +1,3 @@
mod cases;
mod framework;
mod setups;

View File

@@ -29,7 +29,7 @@ use write_summary::ShardProgress;
use crate::{
data::IngesterData,
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleHandleImpl, LifecycleManager},
poison::PoisonCabinet,
querier_handler::{prepare_data_to_querier, IngesterQueryResponse},
stream_handler::{
@@ -75,6 +75,9 @@ pub trait IngestHandler: Send + Sync {
/// Shut down background workers.
fn shutdown(&self);
/// Persist everything immediately.
async fn persist_all(&self);
}
/// A [`JoinHandle`] that can be cloned
@@ -96,6 +99,8 @@ pub struct IngestHandlerImpl<T = SystemProvider> {
/// Future that resolves when the background worker exits
join_handles: Vec<(String, SharedJoinHandle)>,
lifecycle_handle: LifecycleHandleImpl,
/// A token that is used to trigger shutdown of the background worker
shutdown: CancellationToken,
@@ -267,6 +272,7 @@ impl IngestHandlerImpl {
data,
topic,
join_handles,
lifecycle_handle,
shutdown,
query_duration_success,
query_duration_error_not_found,
@@ -371,6 +377,13 @@ impl IngestHandler for IngestHandlerImpl {
) -> BTreeMap<ShardIndex, ShardProgress> {
self.data.progresses(shard_indexes).await
}
/// Persist everything immediately. This is called by the `PersistService` gRPC API in tests
/// asserting on persisted data, and should probably not be used in production. May behave in
/// unexpected ways if used concurrently with writes or lifecycle persists.
async fn persist_all(&self) {
self.lifecycle_handle.state.lock().persist_everything_now = true;
}
}
impl<T> Drop for IngestHandlerImpl<T> {

View File

@@ -57,7 +57,7 @@ pub(crate) struct LifecycleHandleImpl {
config: Arc<LifecycleConfig>,
/// The state shared with the [`LifecycleManager`].
state: Arc<Mutex<LifecycleState>>,
pub(crate) state: Arc<Mutex<LifecycleState>>,
}
impl LifecycleHandle for LifecycleHandleImpl {
@@ -224,9 +224,10 @@ impl LifecycleConfig {
}
#[derive(Default, Debug)]
struct LifecycleState {
pub(crate) struct LifecycleState {
total_bytes: usize,
partition_stats: BTreeMap<PartitionId, PartitionLifecycleStats>,
pub(crate) persist_everything_now: bool,
}
impl LifecycleState {
@@ -330,19 +331,83 @@ impl LifecycleManager {
// get anything over the threshold size or age to persist
let now = self.time_provider.now();
let persist_everything_now = self.state.lock().persist_everything_now;
let (mut to_persist, mut rest): (
Vec<PartitionLifecycleStats>,
Vec<PartitionLifecycleStats>,
) = partition_stats.into_iter().partition(|s| {
//
// Log the partitions that are marked for persistence using
// consistent fields across all trigger types.
//
) = if persist_everything_now {
partition_stats.into_iter().partition(|_| true)
} else {
partition_stats.into_iter().partition(|s| {
//
// Log the partitions that are marked for persistence using
// consistent fields across all trigger types.
//
// Check if this partition's first write occurred long enough ago
// that the data is considered "old" and can be flushed.
let aged_out = match now.checked_duration_since(s.first_write) {
Some(age) if age > self.config.partition_age_threshold => {
// Check if this partition's first write occurred long enough ago
// that the data is considered "old" and can be flushed.
let aged_out = match now.checked_duration_since(s.first_write) {
Some(age) if age > self.config.partition_age_threshold => {
info!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
first_write=%s.first_write,
last_write=%s.last_write,
bytes_written=s.bytes_written,
rows_written=s.rows_written,
first_sequence_number=?s.first_sequence_number,
age=?age,
"partition is over age threshold, persisting"
);
self.persist_age_counter.inc(1);
true
}
None => {
warn!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
"unable to calculate partition age"
);
false
}
_ => false,
};
// Check if this partition's most recent write was long enough ago
// that the partition is considered "cold" and is unlikely to see
// new writes imminently.
let is_cold = match now.checked_duration_since(s.last_write) {
Some(age) if age > self.config.partition_cold_threshold => {
info!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
first_write=%s.first_write,
last_write=%s.last_write,
bytes_written=s.bytes_written,
rows_written=s.rows_written,
first_sequence_number=?s.first_sequence_number,
no_writes_for=?age,
"partition is cold, persisting"
);
self.persist_cold_counter.inc(1);
true
}
None => {
warn!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
"unable to calculate partition cold duration"
);
false
}
_ => false,
};
// If this partition contains more rows than it is permitted, flush
// it.
let exceeded_max_rows = s.rows_written >= self.config.partition_row_max;
if exceeded_max_rows {
info!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
@@ -351,28 +416,15 @@ impl LifecycleManager {
bytes_written=s.bytes_written,
rows_written=s.rows_written,
first_sequence_number=?s.first_sequence_number,
age=?age,
"partition is over age threshold, persisting"
"partition is over max row, persisting"
);
self.persist_age_counter.inc(1);
true
self.persist_rows_counter.inc(1);
}
None => {
warn!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
"unable to calculate partition age"
);
false
}
_ => false,
};
// Check if this partition's most recent write was long enough ago
// that the partition is considered "cold" and is unlikely to see
// new writes imminently.
let is_cold = match now.checked_duration_since(s.last_write) {
Some(age) if age > self.config.partition_cold_threshold => {
// If the partition's in-memory buffer is larger than the configured
// maximum byte size, flush it.
let sized_out = s.bytes_written > self.config.partition_size_threshold;
if sized_out {
info!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
@@ -381,59 +433,14 @@ impl LifecycleManager {
bytes_written=s.bytes_written,
rows_written=s.rows_written,
first_sequence_number=?s.first_sequence_number,
no_writes_for=?age,
"partition is cold, persisting"
"partition exceeded byte size threshold, persisting"
);
self.persist_cold_counter.inc(1);
true
self.persist_size_counter.inc(1);
}
None => {
warn!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
"unable to calculate partition cold duration"
);
false
}
_ => false,
};
// If this partition contains more rows than it is permitted, flush
// it.
let exceeded_max_rows = s.rows_written >= self.config.partition_row_max;
if exceeded_max_rows {
info!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
first_write=%s.first_write,
last_write=%s.last_write,
bytes_written=s.bytes_written,
rows_written=s.rows_written,
first_sequence_number=?s.first_sequence_number,
"partition is over max row, persisting"
);
self.persist_rows_counter.inc(1);
}
// If the partition's in-memory buffer is larger than the configured
// maximum byte size, flush it.
let sized_out = s.bytes_written > self.config.partition_size_threshold;
if sized_out {
info!(
shard_id=%s.shard_id,
partition_id=%s.partition_id,
first_write=%s.first_write,
last_write=%s.last_write,
bytes_written=s.bytes_written,
rows_written=s.rows_written,
first_sequence_number=?s.first_sequence_number,
"partition exceeded byte size threshold, persisting"
);
self.persist_size_counter.inc(1);
}
aged_out || sized_out || is_cold || exceeded_max_rows
});
aged_out || sized_out || is_cold || exceeded_max_rows
})
};
// keep track of what we'll be evicting to see what else to drop
for s in &to_persist {
@@ -565,6 +572,19 @@ impl LifecycleManager {
.update_min_unpersisted_sequence_number(shard_id, min)
.await;
}
// If we persisted some data (because we're within the `if !persist_tasks.is_empty()`
// condition), and we were asked to persist everything because of an explicit call to
// the persist API (which sets `persist_everything_now` to true), then we can reset the
// `persist_everything_now` directive to `false` as it's now completed. Only resetting
// `persist_everything_now` if data has actually been persisted is important to
// eliminate a race condition in the tests where the tests would write data and call
// the persist API, but the data hadn't actually gotten to the ingester yet to be
// persisted.
if persist_everything_now {
info!("resetting persist_everything_now to false");
self.state.lock().persist_everything_now = false;
}
}
}
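
Condensed, the persistence decision after this change reduces to the following paraphrase of the `partition` closure above (a sketch, not the literal code):

// When the persist-everything flag has been set through the new persist API,
// every partition is selected; otherwise the existing age, cold, row-count,
// and byte-size thresholds apply.
fn should_persist(
    persist_everything_now: bool,
    aged_out: bool,
    is_cold: bool,
    exceeded_max_rows: bool,
    sized_out: bool,
) -> bool {
    persist_everything_now || aged_out || sized_out || is_cold || exceeded_max_rows
}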

View File

@@ -1,5 +1,6 @@
//! gRPC service implementations for `ingester`.
mod persist;
mod query;
mod rpc_write;
mod write_info;
@@ -11,7 +12,10 @@ use arrow_flight::flight_service_server::{
};
use generated_types::influxdata::iox::{
catalog::v1::*,
ingester::v1::write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
ingester::v1::{
persist_service_server::{PersistService, PersistServiceServer},
write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
},
};
use iox_catalog::interface::Catalog;
use service_grpc_catalog::CatalogService;
@@ -70,4 +74,13 @@ impl<I: IngestHandler + Send + Sync + 'static> GrpcDelegate<I> {
&self.catalog,
)))
}
/// Return a [`PersistService`] gRPC implementation.
///
/// [`PersistService`]: generated_types::influxdata::iox::ingester::v1::persist_service_server::PersistService
pub fn persist_service(&self) -> PersistServiceServer<impl PersistService> {
PersistServiceServer::new(persist::PersistHandler::new(Arc::clone(
&self.ingest_handler,
)))
}
}

View File

@@ -0,0 +1,32 @@
use crate::handler::IngestHandler;
use generated_types::influxdata::iox::ingester::v1::{
self as proto, persist_service_server::PersistService,
};
use std::sync::Arc;
use tonic::{Request, Response};
#[derive(Debug)]
pub(crate) struct PersistHandler<I: IngestHandler> {
ingest_handler: Arc<I>,
}
impl<I: IngestHandler> PersistHandler<I> {
pub fn new(ingest_handler: Arc<I>) -> Self {
Self { ingest_handler }
}
}
#[tonic::async_trait]
impl<I: IngestHandler + 'static> PersistService for PersistHandler<I> {
/// Handle the RPC request to persist immediately. This is useful in tests asserting on
/// persisted data. May behave in unexpected ways if used concurrently with writes or lifecycle
/// persists.
async fn persist(
&self,
_request: Request<proto::PersistRequest>,
) -> Result<Response<proto::PersistResponse>, tonic::Status> {
self.ingest_handler.persist_all().await;
Ok(Response::new(proto::PersistResponse {}))
}
}
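
For context, a test can hit this endpoint roughly as follows. This is a sketch: it assumes the tonic-generated `PersistServiceClient` is exposed from `generated_types` alongside the server types used above, and that the request message needs no fields.

use generated_types::influxdata::iox::ingester::v1::{
    persist_service_client::PersistServiceClient, PersistRequest,
};

// Ask the ingester at `ingester_grpc_addr` (address illustrative, e.g.
// "http://127.0.0.1:8083") to persist everything it currently has buffered.
async fn persist_everything(ingester_grpc_addr: String) -> Result<(), tonic::Status> {
    let mut client = PersistServiceClient::connect(ingester_grpc_addr)
        .await
        .expect("failed to connect to the ingester");
    // The handler above ignores the request body, so an empty request suffices.
    client.persist(PersistRequest::default()).await?;
    Ok(())
}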

View File

@@ -96,6 +96,7 @@ impl<I: IngestHandler + Sync + Send + Debug + 'static> ServerType for IngesterSe
add_service!(builder, self.server.grpc().flight_service());
add_service!(builder, self.server.grpc().write_info_service());
add_service!(builder, self.server.grpc().catalog_service());
add_service!(builder, self.server.grpc().persist_service());
serve_builder!(builder);

View File

@@ -1,26 +0,0 @@
[package]
name = "query_tests2"
description = "Tests of the query engine against different database configurations"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.60"
dotenvy = "0.15.6"
ingester2 = { path = "../ingester2" }
iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" }
generated_types = { path = "../generated_types" }
metric = { path = "../metric" }
object_store = "0.5.2"
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.17", features = ["parking_lot"] }
parquet_file = { path = "../parquet_file" }
snafu = "0.7"
tempfile = "3.1.0"
test_helpers = { path = "../test_helpers" }
test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }
tokio = { version = "1.22", features = ["macros", "parking_lot", "rt-multi-thread", "time"] }
workspace-hack = { path = "../workspace-hack"}

View File

@@ -1,17 +0,0 @@
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self,
clippy::clone_on_ref_ptr,
clippy::future_not_send,
clippy::todo,
clippy::dbg_macro
)]
#![cfg(test)]
//! Tests of various queries for data in various states.
mod cases;
mod framework;
mod setups;

View File

@@ -99,6 +99,22 @@ impl TestConfig {
.with_default_ingester_options()
}
/// Create a minimal ingester configuration, using the dsn and write buffer configuration from
/// `other`. Set the persistence options such that it will likely never persist, to be able to
/// test when data only exists in the ingester's memory.
pub fn new_ingester_never_persist(other: &TestConfig) -> Self {
Self::new(
ServerType::Ingester,
other.dsn().to_owned(),
other.catalog_schema_name(),
)
.with_existing_write_buffer(other)
.with_existing_object_store(other)
.with_default_ingester_options()
// No test writes this much data, so with this threshold, the ingester will never persist.
.with_ingester_persist_memory_threshold(1_000_000)
}
/// Create a minimal ingester2 configuration, using the dsn configuration specified. Set the
/// persistence options such that it will persist as quickly as possible.
pub fn new_ingester2(dsn: impl Into<String>) -> Self {
@@ -167,6 +183,8 @@ impl TestConfig {
)
.with_existing_object_store(ingester_config)
.with_shard_to_ingesters_mapping("{\"ignoreAll\": true}")
// Hard code query threads so query plans do not vary based on environment
.with_env("INFLUXDB_IOX_NUM_QUERY_THREADS", "4")
}
/// Create a minimal querier2 configuration from the specified ingester2 configuration, using

View File

@@ -241,6 +241,27 @@ impl MiniCluster {
.with_compactor_config(compactor_config)
}
/// Create a non-shared "standard" MiniCluster that has a router, ingester set to essentially
/// never persist data (except on-demand), and querier. Save config for a compactor, but the
/// compactor should be run on-demand in tests using `compactor run-once` rather than using
/// `run compactor`.
pub async fn create_non_shared_standard_never_persist(database_url: String) -> Self {
let router_config = TestConfig::new_router(&database_url);
let ingester_config = TestConfig::new_ingester_never_persist(&router_config);
let querier_config = TestConfig::new_querier(&ingester_config);
let compactor_config = TestConfig::new_compactor(&ingester_config);
// Set up the cluster ====================================
Self::new()
.with_router(router_config)
.await
.with_ingester(ingester_config)
.await
.with_querier(querier_config)
.await
.with_compactor_config(compactor_config)
}
/// Create a non-shared "version 2" "standard" MiniCluster that has a router, ingester, querier.
pub async fn create_non_shared2(database_url: String) -> Self {
let ingester_config = TestConfig::new_ingester2(&database_url);