Merge pull request #7270 from influxdata/cn/shard-tests
test: Add compactor shard e2e tests
commit 90b52d2332
@@ -0,0 +1,242 @@
use assert_cmd::Command;
use futures::FutureExt;
use predicates::prelude::*;
use test_helpers_end_to_end::{
    maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState, TestConfig,
};

#[tokio::test]
async fn shard_id_greater_than_num_shards_is_invalid() {
    test_helpers::maybe_start_logging();
    let database_url = maybe_skip_integration!();

    let ingester_config = TestConfig::new_ingester2(&database_url);
    let router_config = TestConfig::new_router2(&ingester_config);
    let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
    let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards(
        2,   // num shards 2
        100, // and shard id > num shards; not valid
    );

    let mut cluster = MiniCluster::new()
        .with_router(router_config)
        .await
        .with_ingester(ingester_config)
        .await
        .with_querier(querier_config)
        .await
        .with_compactor_config(compactor_config);

    StepTest::new(
        &mut cluster,
        vec![Step::CompactExpectingError {
            expected_message: "shard_id out of range".into(),
        }],
    )
    .run()
    .await
}

#[test]
fn shard_id_without_num_shards_is_invalid() {
    // This test doesn't use MiniCluster/TestConfig/with_compactor_shards because those exist to
    // enable *correct* configuration of components, and this test is testing an *invalid*
    // configuration of the compactor command.
    Command::cargo_bin("influxdb_iox")
        .unwrap()
        .arg("run")
        .arg("compactor2")
        .env("INFLUXDB_IOX_COMPACTION_SHARD_ID", "1") // only provide shard ID
        .env("INFLUXDB_IOX_RPC_MODE", "2")
        .env("INFLUXDB_IOX_CATALOG_TYPE", "memory")
        .assert()
        .failure()
        .stderr(predicate::str::contains(
            "must provide or not provide shard ID and count",
        ));
}

#[test]
fn num_shards_without_shard_id_is_invalid() {
    // This test doesn't use MiniCluster/TestConfig/with_compactor_shards because those exist to
    // enable *correct* configuration of components, and this test is testing an *invalid*
    // configuration of the compactor command.
    Command::cargo_bin("influxdb_iox")
        .unwrap()
        .arg("run")
        .arg("compactor2")
        .env("INFLUXDB_IOX_COMPACTION_SHARD_COUNT", "1") // only provide shard count
        .env("INFLUXDB_IOX_RPC_MODE", "2")
        .env("INFLUXDB_IOX_CATALOG_TYPE", "memory")
        .assert()
        .failure()
        .stderr(predicate::str::contains(
            "must provide or not provide shard ID and count",
        ));
}

#[tokio::test]
async fn sharded_compactor_0_always_compacts_partition_1() {
    test_helpers::maybe_start_logging();
    let database_url = maybe_skip_integration!();

    // The test below assumes a specific partition id, and it needs to customize the compactor
    // config, so use a non-shared minicluster here.
    let ingester_config = TestConfig::new_ingester2(&database_url);
    let router_config = TestConfig::new_router2(&ingester_config);
    let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
    let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards(
        2, // num shards 2
        0, // shard ID 0, which will always get partition ID 1
    );

    let mut cluster = MiniCluster::new()
        .with_router(router_config)
        .await
        .with_ingester(ingester_config)
        .await
        .with_querier(querier_config)
        .await
        .with_compactor_config(compactor_config);

    StepTest::new(
        &mut cluster,
        vec![
            Step::RecordNumParquetFiles,
            Step::WriteLineProtocol(String::from(
                "my_awesome_table,tag1=A,tag2=B val=42i 123456",
            )),
            // wait for partitions to be persisted
            Step::WaitForPersisted2 {
                expected_increase: 1,
            },
            // Run the compactor
            Step::Compact,
            // Run the 'remote partition' command
            Step::Custom(Box::new(|state: &mut StepTestState| {
                async {
                    let router_addr = state.cluster().router().router_grpc_base().to_string();

                    // Validate the output of the remote partition CLI command
                    //
                    // Looks something like:
                    // {
                    //     "id": "1",
                    //     "namespaceId": 1,
                    //     "tableId": 1,
                    //     "partitionId": "1",
                    //     "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd",
                    //     "minTime": "123456",
                    //     "maxTime": "123456",
                    //     "fileSizeBytes": "2029",
                    //     "rowCount": "1",
                    //     "compactionLevel": "1",
                    //     "createdAt": "1650019674289347000"
                    // }
                    Command::cargo_bin("influxdb_iox")
                        .unwrap()
                        .arg("-h")
                        .arg(&router_addr)
                        .arg("remote")
                        .arg("partition")
                        .arg("show")
                        .arg("1")
                        .assert()
                        .success()
                        .stdout(
                            // Important parts are the expected partition ID
                            predicate::str::contains(r#""partitionId": "1","#)
                                // and compaction level
                                .and(predicate::str::contains(r#""compactionLevel": 1"#)),
                        );
                }
                .boxed()
            })),
        ],
    )
    .run()
    .await
}

#[tokio::test]
async fn sharded_compactor_1_never_compacts_partition_1() {
    test_helpers::maybe_start_logging();
    let database_url = maybe_skip_integration!();

    // The test below assumes a specific partition id, and it needs to customize the compactor
    // config, so use a non-shared minicluster here.
    let ingester_config = TestConfig::new_ingester2(&database_url);
    let router_config = TestConfig::new_router2(&ingester_config);
    let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
    let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards(
        2, // num shards 2
        1, // shard ID 1, which will never get partition ID 1
    );

    let mut cluster = MiniCluster::new()
        .with_router(router_config)
        .await
        .with_ingester(ingester_config)
        .await
        .with_querier(querier_config)
        .await
        .with_compactor_config(compactor_config);

    StepTest::new(
        &mut cluster,
        vec![
            Step::RecordNumParquetFiles,
            Step::WriteLineProtocol(String::from(
                "my_awesome_table,tag1=A,tag2=B val=42i 123456",
            )),
            // wait for partitions to be persisted
            Step::WaitForPersisted2 {
                expected_increase: 1,
            },
            // Run the compactor
            Step::Compact,
            // Run the 'remote partition' command
            Step::Custom(Box::new(|state: &mut StepTestState| {
                async {
                    let router_addr = state.cluster().router().router_grpc_base().to_string();

                    // Validate the output of the remote partition CLI command
                    //
                    // Looks something like:
                    // {
                    //     "id": "1",
                    //     "namespaceId": 1,
                    //     "tableId": 1,
                    //     "partitionId": "1",
                    //     "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd",
                    //     "minTime": "123456",
                    //     "maxTime": "123456",
                    //     "fileSizeBytes": "2029",
                    //     "rowCount": "1",
                    //     "compactionLevel": "1",
                    //     "createdAt": "1650019674289347000"
                    // }
                    Command::cargo_bin("influxdb_iox")
                        .unwrap()
                        .arg("-h")
                        .arg(&router_addr)
                        .arg("remote")
                        .arg("partition")
                        .arg("show")
                        .arg("1")
                        .assert()
                        .success()
                        .stdout(
                            // Important parts are the expected partition ID
                            predicate::str::contains(r#""partitionId": "1","#)
                                // and compaction level is 0 so it's not returned
                                .and(predicate::str::contains("compactionLevel").not()),
                        );
                }
                .boxed()
            })),
        ],
    )
    .run()
    .await
}
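The two sharded tests above pin down the assignment behavior: with two shards, partition 1 is always owned by shard 0 and never by shard 1. That rules out a plain `partition_id % shard_count` rule (which would map partition 1 to shard 1) and implies ownership is some other deterministic function of the partition ID. A minimal sketch of a hash-based owner check consistent with that behavior — the function name and the exact hash are assumptions for illustration, not IOx's actual implementation:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch: does this compactor shard own the given partition?
/// Assumes ownership = hash(partition_id) % shard_count; the real
/// IOx function may differ.
fn owns_partition(partition_id: i64, shard_id: u64, shard_count: u64) -> bool {
    let mut hasher = DefaultHasher::new();
    partition_id.hash(&mut hasher);
    hasher.finish() % shard_count == shard_id
}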
@@ -3,6 +3,7 @@ mod all_in_one;
 // loading shared libraries: libjemalloc.so.2: cannot open shared object file: No such file or directory"
 #[cfg(not(feature = "heappy"))]
 mod cli;
+mod compactor;
 mod debug;
 mod error;
 mod flightsql;
@@ -141,13 +141,6 @@ pub async fn create_compactor2_server_type(
     compactor_config: Compactor2Config,
 ) -> Arc<dyn ServerType> {
     let backoff_config = BackoffConfig::default();
-    let shard_id = Config::fetch_shard_id(
-        Arc::clone(&catalog),
-        backoff_config.clone(),
-        TOPIC.to_string(),
-        TRANSITION_SHARD_INDEX,
-    )
-    .await;
 
     assert!(
         compactor_config.shard_id.is_some() == compactor_config.shard_count.is_some(),
@@ -172,6 +165,13 @@ pub async fn create_compactor2_server_type(
         ),
     };
 
+    let shard_id = Config::fetch_shard_id(
+        Arc::clone(&catalog),
+        backoff_config.clone(),
+        TOPIC.to_string(),
+        TRANSITION_SHARD_INDEX,
+    )
+    .await;
     let compactor = Compactor2::start(Config {
         shard_id,
         metric_registry: Arc::clone(&metric_registry),
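The `assert!` retained in the hunk above is what produces the "must provide or not provide shard ID and count" failure the two CLI tests check for. A condensed sketch of both validations the new tests exercise — this standalone helper is hypothetical; in IOx the both-or-neither assert lives in the server setup above and the range check lives elsewhere in the compactor:

/// Sketch of the two shard-config validations (hypothetical helper).
fn validate_shard_config(shard_id: Option<usize>, shard_count: Option<usize>) {
    // Provide both or neither.
    assert!(
        shard_id.is_some() == shard_count.is_some(),
        "must provide or not provide shard ID and count"
    );
    // When both are provided, the ID must be in range.
    if let (Some(id), Some(count)) = (shard_id, shard_count) {
        assert!(id < count, "shard_id out of range");
    }
}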
@@ -250,6 +250,12 @@ impl TestConfig
         self.with_env("INFLUXDB_IOX_EXEC_MEM_POOL_BYTES", bytes.to_string())
     }
 
+    /// Configure sharding splits for the compactor.
+    pub fn with_compactor_shards(self, n_shards: usize, shard_id: usize) -> Self {
+        self.with_env("INFLUXDB_IOX_COMPACTION_SHARD_COUNT", n_shards.to_string())
+            .with_env("INFLUXDB_IOX_COMPACTION_SHARD_ID", shard_id.to_string())
+    }
+
     /// Get the test config's server type.
     #[must_use]
     pub fn server_type(&self) -> ServerType {
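`with_compactor_shards` simply forwards to the same two environment variables the invalid-config tests set by hand, so a sharded test config is equivalent to exporting both variables before launching `compactor2`. For example:

// From the e2e tests above: 2 shards, this compactor is shard 0.
// Equivalent to INFLUXDB_IOX_COMPACTION_SHARD_COUNT=2 and
// INFLUXDB_IOX_COMPACTION_SHARD_ID=0 in the compactor's environment.
let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards(2, 0);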
@@ -452,7 +452,7 @@ impl MiniCluster
         }
     }
 
-    pub fn run_compaction(&self) {
+    pub fn run_compaction(&self) -> Result<(), String> {
         let (log_file, log_path) = NamedTempFile::new()
             .expect("opening log file")
             .keep()
@@ -500,11 +500,15 @@ impl MiniCluster
 
         log_command(command);
 
-        if let Err(e) = command.ok() {
-            dump_log_to_stdout("compactor run-once", &log_path);
-            panic!("Command failed: {:?}", e);
-        }
+        let run_result = command.ok();
+
         dump_log_to_stdout("compactor run-once", &log_path);
+
+        // Return the command output from the log file as the error message to enable
+        // assertions on the error message contents
+        run_result.map_err(|_| std::fs::read_to_string(&log_path).unwrap())?;
+
+        Ok(())
     }
 
     /// Create a storage client connected to the querier member of the cluster
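The new `run_compaction` signature turns a failed `influxdb_iox` invocation into an `Err(String)` carrying the captured log, so test steps can assert on message contents instead of panicking. A self-contained sketch of the same pattern using only the standard library (names hypothetical; the real code uses assert_cmd's `command.ok()`):

use std::path::Path;
use std::process::Command;

/// Run a command whose output was redirected to `log_path`; on failure,
/// return the log contents as the error so callers can assert on them.
fn run_logged(cmd: &mut Command, log_path: &Path) -> Result<(), String> {
    let status = cmd.status().map_err(|e| e.to_string())?;
    if status.success() {
        Ok(())
    } else {
        Err(std::fs::read_to_string(log_path).unwrap_or_default())
    }
}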
@@ -9,6 +9,7 @@ use futures::future::BoxFuture;
 use http::StatusCode;
 use observability_deps::tracing::info;
 use std::{path::PathBuf, time::Duration};
+use test_helpers::assert_contains;
 
 const MAX_QUERY_RETRY_TIME_SEC: u64 = 20;
@@ -161,9 +162,13 @@ pub enum Step
     /// (i.e. never drop data).
     SetRetention(Option<i64>),
 
-    /// Run one hot and one cold compaction operation and wait for it to finish.
+    /// Run one compaction operation and wait for it to finish, expecting success.
     Compact,
 
+    /// Run one compaction operation and wait for it to finish, expecting an error that matches
+    /// the specified message.
+    CompactExpectingError { expected_message: String },
+
     /// Run a SQL query using the FlightSQL interface and verify that the
     /// results match the expected results using the
     /// `assert_batches_eq!` macro
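The first new e2e test above drives this variant end to end; its entire step list is just:

// From shard_id_greater_than_num_shards_is_invalid above:
vec![Step::CompactExpectingError {
    expected_message: "shard_id out of range".into(),
}]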
@@ -309,9 +314,18 @@ where
                 }
                 Step::Compact => {
                     info!("====Begin running compaction");
-                    state.cluster.run_compaction();
+                    state.cluster.run_compaction().unwrap();
                     info!("====Done running compaction");
                 }
+                Step::CompactExpectingError { expected_message } => {
+                    info!("====Begin running compaction expected to error");
+                    let err = state.cluster.run_compaction().unwrap_err();
+
+                    assert_contains!(err, expected_message);
+
+                    info!("====Done running");
+                }
                 Step::SetRetention(retention_period_ns) => {
                     info!("====Begin setting retention period to {retention_period_ns:?}");
                     let namespace = state.cluster().namespace();
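`assert_contains!` comes from the `test_helpers` crate imported in the hunk above; it behaves roughly like this sketch (an approximation, not the real macro):

/// Rough sketch of test_helpers::assert_contains! semantics.
fn assert_contains(actual: &str, expected: &str) {
    assert!(
        actual.contains(expected),
        "expected {actual:?} to contain {expected:?}"
    );
}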