From d8bd185fa0742b1096fe3e3908e1209e5ed50634 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 20 Mar 2023 14:42:22 -0400 Subject: [PATCH] test: Add compactor shard e2e tests --- .../tests/end_to_end_cases/compactor.rs | 240 ++++++++++++++++++ influxdb_iox/tests/end_to_end_cases/mod.rs | 1 + test_helpers_end_to_end/src/config.rs | 6 + test_helpers_end_to_end/src/mini_cluster.rs | 14 +- test_helpers_end_to_end/src/steps.rs | 18 +- 5 files changed, 272 insertions(+), 7 deletions(-) create mode 100644 influxdb_iox/tests/end_to_end_cases/compactor.rs diff --git a/influxdb_iox/tests/end_to_end_cases/compactor.rs b/influxdb_iox/tests/end_to_end_cases/compactor.rs new file mode 100644 index 0000000000..d703e4833d --- /dev/null +++ b/influxdb_iox/tests/end_to_end_cases/compactor.rs @@ -0,0 +1,240 @@ +use assert_cmd::Command; +use futures::FutureExt; +use predicates::prelude::*; +use test_helpers_end_to_end::{ + maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState, TestConfig, +}; + +#[tokio::test] +async fn shard_id_greater_than_num_shards_is_invalid() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let ingester_config = TestConfig::new_ingester2(&database_url); + let router_config = TestConfig::new_router2(&ingester_config); + let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1); + let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards( + 2, // num shards 2 + 100, // and shard id > num shards; not valid + ); + + let mut cluster = MiniCluster::new() + .with_router(router_config) + .await + .with_ingester(ingester_config) + .await + .with_querier(querier_config) + .await + .with_compactor_config(compactor_config); + + StepTest::new( + &mut cluster, + vec![Step::CompactExpectingError { + expected_message: "shard_id out of range".into(), + }], + ) + .run() + .await +} + +#[test] +fn shard_id_without_num_shards_is_invalid() { + // This test doesn't use MiniCluster/TestConfig/with_compactor_shards because those exist to + // enable *correct* configuration of components, and this test is testing an *invalid* + // configuration of the compactor command. + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("run") + .arg("compactor2") + .env("INFLUXDB_IOX_COMPACTION_SHARD_ID", "1") // only provide shard ID + .env("INFLUXDB_IOX_RPC_MODE", "2") + .assert() + .failure() + .stderr(predicate::str::contains( + "must provide or not provide shard ID and count", + )); +} + +#[test] +fn num_shards_without_shard_id_is_invalid() { + // This test doesn't use MiniCluster/TestConfig/with_compactor_shards because those exist to + // enable *correct* configuration of components, and this test is testing an *invalid* + // configuration of the compactor command. + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("run") + .arg("compactor2") + .env("INFLUXDB_IOX_COMPACTION_SHARD_COUNT", "1") // only provide shard count + .env("INFLUXDB_IOX_RPC_MODE", "2") + .assert() + .failure() + .stderr(predicate::str::contains( + "must provide or not provide shard ID and count", + )); +} + +#[tokio::test] +async fn sharded_compactor_0_always_compacts_partition_1() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + // The test below assumes a specific partition id, and it needs to customize the compactor + // config, so use a non-shared minicluster here. + let ingester_config = TestConfig::new_ingester2(&database_url); + let router_config = TestConfig::new_router2(&ingester_config); + let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1); + let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards( + 2, // num shards 2 + 0, // shard ID 0, which will always get partition ID 1 + ); + + let mut cluster = MiniCluster::new() + .with_router(router_config) + .await + .with_ingester(ingester_config) + .await + .with_querier(querier_config) + .await + .with_compactor_config(compactor_config); + + StepTest::new( + &mut cluster, + vec![ + Step::RecordNumParquetFiles, + Step::WriteLineProtocol(String::from( + "my_awesome_table,tag1=A,tag2=B val=42i 123456", + )), + // wait for partitions to be persisted + Step::WaitForPersisted2 { + expected_increase: 1, + }, + // Run the compactor + Step::Compact, + // Run the 'remote partition' command + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let router_addr = state.cluster().router().router_grpc_base().to_string(); + + // Validate the output of the remote partition CLI command + // + // Looks something like: + // { + // "id": "1", + // "namespaceId": 1, + // "tableId": 1, + // "partitionId": "1", + // "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd", + // "minTime": "123456", + // "maxTime": "123456", + // "fileSizeBytes": "2029", + // "rowCount": "1", + // "compactionLevel": "1", + // "createdAt": "1650019674289347000" + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&router_addr) + .arg("remote") + .arg("partition") + .arg("show") + .arg("1") + .assert() + .success() + .stdout( + // Important parts are the expected partition ID + predicate::str::contains(r#""partitionId": "1","#) + // and compaction level + .and(predicate::str::contains(r#""compactionLevel": 1"#)), + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + +#[tokio::test] +async fn sharded_compactor_1_never_compacts_partition_1() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + // The test below assumes a specific partition id, and it needs to customize the compactor + // config, so use a non-shared minicluster here. + let ingester_config = TestConfig::new_ingester2(&database_url); + let router_config = TestConfig::new_router2(&ingester_config); + let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1); + let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards( + 2, // num shards 2 + 1, // shard ID 1, which will never get partition ID 1 + ); + + let mut cluster = MiniCluster::new() + .with_router(router_config) + .await + .with_ingester(ingester_config) + .await + .with_querier(querier_config) + .await + .with_compactor_config(compactor_config); + + StepTest::new( + &mut cluster, + vec![ + Step::RecordNumParquetFiles, + Step::WriteLineProtocol(String::from( + "my_awesome_table,tag1=A,tag2=B val=42i 123456", + )), + // wait for partitions to be persisted + Step::WaitForPersisted2 { + expected_increase: 1, + }, + // Run the compactor + Step::Compact, + // Run the 'remote partition' command + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let router_addr = state.cluster().router().router_grpc_base().to_string(); + + // Validate the output of the remote partition CLI command + // + // Looks something like: + // { + // "id": "1", + // "namespaceId": 1, + // "tableId": 1, + // "partitionId": "1", + // "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd", + // "minTime": "123456", + // "maxTime": "123456", + // "fileSizeBytes": "2029", + // "rowCount": "1", + // "compactionLevel": "1", + // "createdAt": "1650019674289347000" + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&router_addr) + .arg("remote") + .arg("partition") + .arg("show") + .arg("1") + .assert() + .success() + .stdout( + // Important parts are the expected partition ID + predicate::str::contains(r#""partitionId": "1","#) + // and compaction level is 0 so it's not returned + .and(predicate::str::contains("compactionLevel").not()), + ); + } + .boxed() + })), + ], + ) + .run() + .await +} diff --git a/influxdb_iox/tests/end_to_end_cases/mod.rs b/influxdb_iox/tests/end_to_end_cases/mod.rs index 1b3a11d2a2..26bb23ccfd 100644 --- a/influxdb_iox/tests/end_to_end_cases/mod.rs +++ b/influxdb_iox/tests/end_to_end_cases/mod.rs @@ -3,6 +3,7 @@ mod all_in_one; // loading shared libraries: libjemalloc.so.2: cannot open shared object file: No such file or directory" #[cfg(not(feature = "heappy"))] mod cli; +mod compactor; mod debug; mod error; mod flightsql; diff --git a/test_helpers_end_to_end/src/config.rs b/test_helpers_end_to_end/src/config.rs index 1329026128..cd8372b525 100644 --- a/test_helpers_end_to_end/src/config.rs +++ b/test_helpers_end_to_end/src/config.rs @@ -250,6 +250,12 @@ impl TestConfig { self.with_env("INFLUXDB_IOX_EXEC_MEM_POOL_BYTES", bytes.to_string()) } + /// Configure sharding splits for the compactor. + pub fn with_compactor_shards(self, n_shards: usize, shard_id: usize) -> Self { + self.with_env("INFLUXDB_IOX_COMPACTION_SHARD_COUNT", n_shards.to_string()) + .with_env("INFLUXDB_IOX_COMPACTION_SHARD_ID", shard_id.to_string()) + } + /// Get the test config's server type. #[must_use] pub fn server_type(&self) -> ServerType { diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index cc0170435c..e408e8fd2d 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -452,7 +452,7 @@ impl MiniCluster { } } - pub fn run_compaction(&self) { + pub fn run_compaction(&self) -> Result<(), String> { let (log_file, log_path) = NamedTempFile::new() .expect("opening log file") .keep() @@ -500,11 +500,15 @@ impl MiniCluster { log_command(command); - if let Err(e) = command.ok() { - dump_log_to_stdout("compactor run-once", &log_path); - panic!("Command failed: {:?}", e); - } + let run_result = command.ok(); + dump_log_to_stdout("compactor run-once", &log_path); + + // Return the command output from the log file as the error message to enable + // assertions on the error message contents + run_result.map_err(|_| std::fs::read_to_string(&log_path).unwrap())?; + + Ok(()) } /// Create a storage client connected to the querier member of the cluster diff --git a/test_helpers_end_to_end/src/steps.rs b/test_helpers_end_to_end/src/steps.rs index 4836cc814e..b7384391ef 100644 --- a/test_helpers_end_to_end/src/steps.rs +++ b/test_helpers_end_to_end/src/steps.rs @@ -9,6 +9,7 @@ use futures::future::BoxFuture; use http::StatusCode; use observability_deps::tracing::info; use std::{path::PathBuf, time::Duration}; +use test_helpers::assert_contains; const MAX_QUERY_RETRY_TIME_SEC: u64 = 20; @@ -161,9 +162,13 @@ pub enum Step { /// (i.e. never drop data). SetRetention(Option), - /// Run one hot and one cold compaction operation and wait for it to finish. + /// Run one compaction operation and wait for it to finish, expecting success. Compact, + /// Run one compaction operation and wait for it to finish, expecting an error that matches + /// the specified message. + CompactExpectingError { expected_message: String }, + /// Run a SQL query using the FlightSQL interface and verify that the /// results match the expected results using the /// `assert_batches_eq!` macro @@ -309,9 +314,18 @@ where } Step::Compact => { info!("====Begin running compaction"); - state.cluster.run_compaction(); + state.cluster.run_compaction().unwrap(); info!("====Done running compaction"); } + Step::CompactExpectingError { expected_message } => { + info!("====Begin running compaction expected to error"); + let err = state.cluster.run_compaction().unwrap_err(); + + assert_contains!(err, expected_message); + + info!("====Done running"); + } + Step::SetRetention(retention_period_ns) => { info!("====Begin setting retention period to {retention_period_ns:?}"); let namespace = state.cluster().namespace();