Merge pull request #7252 from influxdata/cn/compactor2-once

test: Re-enable running compactor once in step tests

commit 1d8c85e61c

@@ -179,6 +179,147 @@ async fn parquet_to_lp() {
         .await
 }
 
+/// Write, compact, then use the remote partition command
+#[tokio::test]
+async fn compact_and_get_remote_partition() {
+    test_helpers::maybe_start_logging();
+    let database_url = maybe_skip_integration!();
+
+    // The test below assumes a specific partition id, so use a
+    // non-shared one here so concurrent tests don't interfere with
+    // each other
+    let mut cluster = MiniCluster::create_non_shared2(database_url).await;
+
+    StepTest::new(
+        &mut cluster,
+        vec![
+            Step::RecordNumParquetFiles,
+            Step::WriteLineProtocol(String::from(
+                "my_awesome_table,tag1=A,tag2=B val=42i 123456",
+            )),
+            // wait for partitions to be persisted
+            Step::WaitForPersisted2 {
+                expected_increase: 1,
+            },
+            // Run the compactor
+            Step::Compact,
+            // Run the 'remote partition' command
+            Step::Custom(Box::new(|state: &mut StepTestState| {
+                async {
+                    let router_addr = state.cluster().router().router_grpc_base().to_string();
+                    let namespace = state.cluster().namespace().to_string();
+
+                    // Validate the output of the remote partition CLI command
+                    //
+                    // Looks something like:
+                    // {
+                    //   "id": "1",
+                    //   "namespaceId": 1,
+                    //   "tableId": 1,
+                    //   "partitionId": "1",
+                    //   "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd",
+                    //   "minTime": "123456",
+                    //   "maxTime": "123456",
+                    //   "fileSizeBytes": "2029",
+                    //   "rowCount": "1",
+                    //   "compactionLevel": "1",
+                    //   "createdAt": "1650019674289347000"
+                    // }
+                    let out = Command::cargo_bin("influxdb_iox")
+                        .unwrap()
+                        .arg("-h")
+                        .arg(&router_addr)
+                        .arg("remote")
+                        .arg("partition")
+                        .arg("show")
+                        .arg("1")
+                        .assert()
+                        .success()
+                        .stdout(
+                            // Important parts are the expected partition ID
+                            predicate::str::contains(r#""partitionId": "1","#)
+                                // and compaction level
+                                .and(predicate::str::contains(r#""compactionLevel": 1"#)),
+                        )
+                        .get_output()
+                        .stdout
+                        .clone();
+
+                    let object_store_id = get_object_store_id(&out);
+                    let dir = tempdir().unwrap();
+                    let f = dir.path().join("tmp.parquet");
+                    let filename = f.as_os_str().to_str().unwrap();
+
+                    Command::cargo_bin("influxdb_iox")
+                        .unwrap()
+                        .arg("-h")
+                        .arg(&router_addr)
+                        .arg("remote")
+                        .arg("store")
+                        .arg("get")
+                        .arg(&object_store_id)
+                        .arg(filename)
+                        .assert()
+                        .success()
+                        .stdout(
+                            predicate::str::contains("wrote")
+                                .and(predicate::str::contains(filename)),
+                        );
+
+                    // Ensure a warning is emitted when specifying (or
+                    // defaulting to) in-memory file storage.
+                    Command::cargo_bin("influxdb_iox")
+                        .unwrap()
+                        .arg("-h")
+                        .arg(&router_addr)
+                        .arg("remote")
+                        .arg("partition")
+                        .arg("pull")
+                        .arg("--catalog")
+                        .arg("memory")
+                        .arg("--object-store")
+                        .arg("memory")
+                        .arg(&namespace)
+                        .arg("my_awesome_table")
+                        .arg("1970-01-01")
+                        .assert()
+                        .failure()
+                        .stderr(predicate::str::contains("try passing --object-store=file"));
+
+                    // Ensure files are actually written to the filesystem
+                    let dir = tempfile::tempdir().expect("could not get temporary directory");
+
+                    Command::cargo_bin("influxdb_iox")
+                        .unwrap()
+                        .arg("-h")
+                        .arg(&router_addr)
+                        .arg("remote")
+                        .arg("partition")
+                        .arg("pull")
+                        .arg("--catalog")
+                        .arg("memory")
+                        .arg("--object-store")
+                        .arg("file")
+                        .arg("--data-dir")
+                        .arg(dir.path().to_str().unwrap())
+                        .arg(&namespace)
+                        .arg("my_awesome_table")
+                        .arg("1970-01-01")
+                        .assert()
+                        .success()
+                        .stdout(
+                            predicate::str::contains("wrote file")
+                                .and(predicate::str::contains(object_store_id)),
+                        );
+                }
+                .boxed()
+            })),
+        ],
+    )
+    .run()
+    .await
+}
+
 /// Test the schema cli command
 #[tokio::test]
 async fn schema_cli() {

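The test above calls a get_object_store_id helper that lives elsewhere in the test module and is not part of this diff. A minimal sketch of what such a helper could look like, assuming the output of `remote partition show` is the JSON illustrated in the comment; the field name comes from that example, and the parsing approach here is an assumption, not the repository's actual implementation:

    // Sketch: pull the "objectStoreId" value out of the JSON that
    // `remote partition show` printed to stdout.
    fn get_object_store_id(output: &[u8]) -> String {
        let text = String::from_utf8_lossy(output);
        text.lines()
            .find(|line| line.contains("\"objectStoreId\""))
            // A matching line looks like: "objectStoreId": "fa6cdcd1-...",
            // so the value is the fourth '"'-delimited field.
            .and_then(|line| line.split('"').nth(3))
            .expect("no objectStoreId found in output")
            .to_string()
    }
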
@@ -121,6 +121,7 @@ impl TestConfig {
             other.dsn().to_owned(),
             other.catalog_schema_name(),
         )
+        .with_env("INFLUXDB_IOX_RPC_MODE", "2")
        .with_existing_object_store(other)
     }
 

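The added .with_env("INFLUXDB_IOX_RPC_MODE", "2") call follows the builder style the test config uses elsewhere. For illustration only, a self-contained sketch of that shape; the real TestConfig in this repository may store and apply environment variables differently:

    use std::collections::HashMap;

    // Assumed shape of a builder that records env vars for the spawned server process.
    #[derive(Default)]
    struct ConfigSketch {
        env: HashMap<String, String>,
    }

    impl ConfigSketch {
        fn with_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
            self.env.insert(name.into(), value.into());
            self
        }
    }

    // e.g. ConfigSketch::default().with_env("INFLUXDB_IOX_RPC_MODE", "2")
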
@@ -1,6 +1,6 @@
 use crate::{
-    dump_log_to_stdout, log_command, rand_id, write_to_ingester, write_to_router, ServerFixture,
-    TestConfig, TestServer,
+    dump_log_to_stdout, log_command, rand_id, server_type::AddAddrEnv, write_to_ingester,
+    write_to_router, ServerFixture, TestConfig, TestServer,
 };
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use arrow_flight::{

@@ -473,8 +473,10 @@ impl MiniCluster {
 
         let mut command = Command::cargo_bin("influxdb_iox").unwrap();
         let command = command
-            .arg("compactor")
-            .arg("run-once")
+            .arg("run")
+            .arg("compactor2")
+            .arg("--compaction-process-once")
+            .arg("--compaction-process-all-partitions")
             .env("LOG_FILTER", log_filter)
             .env(
                 "INFLUXDB_IOX_CATALOG_DSN",

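For reference, the command the harness now builds corresponds to launching the binary directly with the new subcommand and flags shown in the hunk above. A sketch under stated assumptions: the log filter and catalog DSN values are placeholders, and the address settings plus any extra env vars applied from the compactor config are omitted here:

    use std::process::Command;

    // Run the compactor once against an existing catalog, then exit.
    fn run_compactor_once(dsn: &str) -> std::io::Result<std::process::ExitStatus> {
        Command::new("influxdb_iox")
            .arg("run")
            .arg("compactor2")
            .arg("--compaction-process-once")
            .arg("--compaction-process-all-partitions")
            .env("LOG_FILTER", "info")
            .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
            .status()
    }
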
@@ -488,13 +490,20 @@ impl MiniCluster {
                 self.compactor_config().catalog_schema_name(),
             )
             .envs(self.compactor_config().env())
+            .add_addr_env(
+                self.compactor_config().server_type(),
+                self.compactor_config().addrs(),
+            )
             // redirect output to log file
             .stdout(stdout_log_file)
             .stderr(stderr_log_file);
 
         log_command(command);
 
-        command.ok().unwrap();
+        if let Err(e) = command.ok() {
+            dump_log_to_stdout("compactor run-once", &log_path);
+            panic!("Command failed: {:?}", e);
+        }
         dump_log_to_stdout("compactor run-once", &log_path);
     }
 

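The last hunk replaces command.ok().unwrap() with a branch that dumps the captured log before panicking, so a failing compactor run still surfaces its log output in the test run. The same pattern reduced to std types, as a sketch; the real code uses the harness's command wrapper and its dump_log_to_stdout helper:

    use std::path::Path;
    use std::process::Command;

    // Run a command; if it fails, print the captured log file first so the
    // cause is visible in the test output, then panic.
    fn run_or_dump_log(cmd: &mut Command, log_path: &Path) {
        match cmd.status() {
            Ok(status) if status.success() => {}
            other => {
                if let Ok(log) = std::fs::read_to_string(log_path) {
                    println!("{log}");
                }
                panic!("Command failed: {:?}", other);
            }
        }
    }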