refactor: Extract influxdb_iox remote CLI tests to their own file
parent
bed55e2bcb
commit
71770486af
|
@ -1,9 +1,9 @@
|
||||||
//! Tests CLI commands
|
//! Tests CLI commands
|
||||||
|
|
||||||
|
use super::get_object_store_id;
|
||||||
use assert_cmd::Command;
|
use assert_cmd::Command;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use predicates::prelude::*;
|
use predicates::prelude::*;
|
||||||
use serde_json::Value;
|
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
use test_helpers_end_to_end::{
|
use test_helpers_end_to_end::{
|
||||||
|
@ -52,143 +52,6 @@ async fn default_run_mode_is_all_in_one() {
|
||||||
.stdout(predicate::str::contains("starting all in one server"));
|
.stdout(predicate::str::contains("starting all in one server"));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// remote partition command and getting a parquet file from the object store and pulling the files
|
|
||||||
#[tokio::test]
|
|
||||||
async fn remote_partition_and_get_from_store_and_pull() {
|
|
||||||
test_helpers::maybe_start_logging();
|
|
||||||
let database_url = maybe_skip_integration!();
|
|
||||||
|
|
||||||
// The test below assumes a specific partition id, so use a
|
|
||||||
// non-shared one here so concurrent tests don't interfere with
|
|
||||||
// each other
|
|
||||||
let mut cluster = MiniCluster::create_non_shared_standard(database_url).await;
|
|
||||||
|
|
||||||
StepTest::new(
|
|
||||||
&mut cluster,
|
|
||||||
vec![
|
|
||||||
Step::WriteLineProtocol(String::from(
|
|
||||||
"my_awesome_table,tag1=A,tag2=B val=42i 123456",
|
|
||||||
)),
|
|
||||||
// wait for partitions to be persisted
|
|
||||||
Step::WaitForPersisted,
|
|
||||||
// Run the 'remote partition' command
|
|
||||||
Step::Custom(Box::new(|state: &mut StepTestState| {
|
|
||||||
async {
|
|
||||||
let router_addr = state.cluster().router().router_grpc_base().to_string();
|
|
||||||
let namespace = state.cluster().namespace().to_string();
|
|
||||||
|
|
||||||
// Validate the output of the remote partition CLI command
|
|
||||||
//
|
|
||||||
// Looks like:
|
|
||||||
// {
|
|
||||||
// "id": "1",
|
|
||||||
// "shardId": 1,
|
|
||||||
// "namespaceId": 1,
|
|
||||||
// "tableId": 1,
|
|
||||||
// "partitionId": "1",
|
|
||||||
// "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd",
|
|
||||||
// "minTime": "123456",
|
|
||||||
// "maxTime": "123456",
|
|
||||||
// "fileSizeBytes": "2029",
|
|
||||||
// "rowCount": "1",
|
|
||||||
// "createdAt": "1650019674289347000"
|
|
||||||
// }
|
|
||||||
|
|
||||||
let out = Command::cargo_bin("influxdb_iox")
|
|
||||||
.unwrap()
|
|
||||||
.arg("-h")
|
|
||||||
.arg(&router_addr)
|
|
||||||
.arg("remote")
|
|
||||||
.arg("partition")
|
|
||||||
.arg("show")
|
|
||||||
.arg("1")
|
|
||||||
.assert()
|
|
||||||
.success()
|
|
||||||
.stdout(
|
|
||||||
predicate::str::contains(r#""id": "1""#)
|
|
||||||
.and(predicate::str::contains(r#""shardId": "1","#))
|
|
||||||
.and(predicate::str::contains(r#""partitionId": "1","#)),
|
|
||||||
)
|
|
||||||
.get_output()
|
|
||||||
.stdout
|
|
||||||
.clone();
|
|
||||||
|
|
||||||
let object_store_id = get_object_store_id(&out);
|
|
||||||
|
|
||||||
let dir = tempdir().unwrap();
|
|
||||||
let f = dir.path().join("tmp.parquet");
|
|
||||||
let filename = f.as_os_str().to_str().unwrap();
|
|
||||||
|
|
||||||
Command::cargo_bin("influxdb_iox")
|
|
||||||
.unwrap()
|
|
||||||
.arg("-h")
|
|
||||||
.arg(&router_addr)
|
|
||||||
.arg("remote")
|
|
||||||
.arg("store")
|
|
||||||
.arg("get")
|
|
||||||
.arg(&object_store_id)
|
|
||||||
.arg(filename)
|
|
||||||
.assert()
|
|
||||||
.success()
|
|
||||||
.stdout(
|
|
||||||
predicate::str::contains("wrote")
|
|
||||||
.and(predicate::str::contains(filename)),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Ensure a warning is emitted when specifying (or
|
|
||||||
// defaulting to) in-memory file storage.
|
|
||||||
Command::cargo_bin("influxdb_iox")
|
|
||||||
.unwrap()
|
|
||||||
.arg("-h")
|
|
||||||
.arg(&router_addr)
|
|
||||||
.arg("remote")
|
|
||||||
.arg("partition")
|
|
||||||
.arg("pull")
|
|
||||||
.arg("--catalog")
|
|
||||||
.arg("memory")
|
|
||||||
.arg("--object-store")
|
|
||||||
.arg("memory")
|
|
||||||
.arg(&namespace)
|
|
||||||
.arg("my_awesome_table")
|
|
||||||
.arg("1970-01-01")
|
|
||||||
.assert()
|
|
||||||
.failure()
|
|
||||||
.stderr(predicate::str::contains("try passing --object-store=file"));
|
|
||||||
|
|
||||||
// Ensure files are actually wrote to the filesystem
|
|
||||||
let dir = tempfile::tempdir().expect("could not get temporary directory");
|
|
||||||
|
|
||||||
Command::cargo_bin("influxdb_iox")
|
|
||||||
.unwrap()
|
|
||||||
.arg("-h")
|
|
||||||
.arg(&router_addr)
|
|
||||||
.arg("remote")
|
|
||||||
.arg("partition")
|
|
||||||
.arg("pull")
|
|
||||||
.arg("--catalog")
|
|
||||||
.arg("memory")
|
|
||||||
.arg("--object-store")
|
|
||||||
.arg("file")
|
|
||||||
.arg("--data-dir")
|
|
||||||
.arg(dir.path().to_str().unwrap())
|
|
||||||
.arg(&namespace)
|
|
||||||
.arg("my_awesome_table")
|
|
||||||
.arg("1970-01-01")
|
|
||||||
.assert()
|
|
||||||
.success()
|
|
||||||
.stdout(
|
|
||||||
predicate::str::contains("wrote file")
|
|
||||||
.and(predicate::str::contains(&object_store_id)),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
.boxed()
|
|
||||||
})),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
.run()
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn parquet_to_lp() {
|
async fn parquet_to_lp() {
|
||||||
test_helpers::maybe_start_logging();
|
test_helpers::maybe_start_logging();
|
||||||
|
@ -809,28 +672,3 @@ async fn query_ingester() {
|
||||||
.run()
|
.run()
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// extracts the parquet filename from JSON that looks like
|
|
||||||
/// ```text
|
|
||||||
/// {
|
|
||||||
/// "id": "1",
|
|
||||||
/// ...
|
|
||||||
// "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd",
|
|
||||||
/// ...
|
|
||||||
/// }
|
|
||||||
/// ```
|
|
||||||
fn get_object_store_id(output: &[u8]) -> String {
|
|
||||||
let v: Value = serde_json::from_slice(output).unwrap();
|
|
||||||
// We only process the first value, so panic if it isn't there
|
|
||||||
let arr = v.as_array().unwrap();
|
|
||||||
assert_eq!(arr.len(), 1);
|
|
||||||
let id = arr[0]
|
|
||||||
.as_object()
|
|
||||||
.unwrap()
|
|
||||||
.get("objectStoreId")
|
|
||||||
.unwrap()
|
|
||||||
.as_str()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
id.to_string()
|
|
||||||
}
|
|
||||||
|
|
|
@ -11,5 +11,31 @@ mod logging;
|
||||||
mod metrics;
|
mod metrics;
|
||||||
mod namespace;
|
mod namespace;
|
||||||
mod querier;
|
mod querier;
|
||||||
|
mod remote;
|
||||||
mod schema;
|
mod schema;
|
||||||
mod tracing;
|
mod tracing;
|
||||||
|
|
||||||
|
/// extracts the parquet filename from JSON that looks like
|
||||||
|
/// ```text
|
||||||
|
/// {
|
||||||
|
/// "id": "1",
|
||||||
|
/// ...
|
||||||
|
// "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd",
|
||||||
|
/// ...
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
fn get_object_store_id(output: &[u8]) -> String {
|
||||||
|
let v: serde_json::Value = serde_json::from_slice(output).unwrap();
|
||||||
|
// We only process the first value, so panic if it isn't there
|
||||||
|
let arr = v.as_array().unwrap();
|
||||||
|
assert_eq!(arr.len(), 1);
|
||||||
|
let id = arr[0]
|
||||||
|
.as_object()
|
||||||
|
.unwrap()
|
||||||
|
.get("objectStoreId")
|
||||||
|
.unwrap()
|
||||||
|
.as_str()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
id.to_string()
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,150 @@
|
||||||
|
//! Tests the `influxdb_iox remote` commands
|
||||||
|
|
||||||
|
use super::get_object_store_id;
|
||||||
|
use assert_cmd::Command;
|
||||||
|
use futures::FutureExt;
|
||||||
|
use predicates::prelude::*;
|
||||||
|
use tempfile::tempdir;
|
||||||
|
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState};
|
||||||
|
|
||||||
|
/// remote partition command and getting a parquet file from the object store and pulling the
|
||||||
|
/// files, using these commands:
|
||||||
|
///
|
||||||
|
/// - `remote partition show`
|
||||||
|
/// - `remote store get`
|
||||||
|
/// - `remote partition pull`
|
||||||
|
#[tokio::test]
|
||||||
|
async fn remote_partition_and_get_from_store_and_pull() {
|
||||||
|
test_helpers::maybe_start_logging();
|
||||||
|
let database_url = maybe_skip_integration!();
|
||||||
|
|
||||||
|
// The test below assumes a specific partition id, so use a
|
||||||
|
// non-shared one here so concurrent tests don't interfere with
|
||||||
|
// each other
|
||||||
|
let mut cluster = MiniCluster::create_non_shared_standard(database_url).await;
|
||||||
|
|
||||||
|
StepTest::new(
|
||||||
|
&mut cluster,
|
||||||
|
vec![
|
||||||
|
Step::WriteLineProtocol(String::from(
|
||||||
|
"my_awesome_table,tag1=A,tag2=B val=42i 123456",
|
||||||
|
)),
|
||||||
|
// wait for partitions to be persisted
|
||||||
|
Step::WaitForPersisted,
|
||||||
|
// Run the 'remote partition' command
|
||||||
|
Step::Custom(Box::new(|state: &mut StepTestState| {
|
||||||
|
async {
|
||||||
|
let router_addr = state.cluster().router().router_grpc_base().to_string();
|
||||||
|
let namespace = state.cluster().namespace().to_string();
|
||||||
|
|
||||||
|
// Validate the output of the remote partition CLI command
|
||||||
|
//
|
||||||
|
// Looks like:
|
||||||
|
// {
|
||||||
|
// "id": "1",
|
||||||
|
// "shardId": 1,
|
||||||
|
// "namespaceId": 1,
|
||||||
|
// "tableId": 1,
|
||||||
|
// "partitionId": "1",
|
||||||
|
// "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd",
|
||||||
|
// "minTime": "123456",
|
||||||
|
// "maxTime": "123456",
|
||||||
|
// "fileSizeBytes": "2029",
|
||||||
|
// "rowCount": "1",
|
||||||
|
// "createdAt": "1650019674289347000"
|
||||||
|
// }
|
||||||
|
|
||||||
|
let out = Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("-h")
|
||||||
|
.arg(&router_addr)
|
||||||
|
.arg("remote")
|
||||||
|
.arg("partition")
|
||||||
|
.arg("show")
|
||||||
|
.arg("1")
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.stdout(
|
||||||
|
predicate::str::contains(r#""id": "1""#)
|
||||||
|
.and(predicate::str::contains(r#""shardId": "1","#))
|
||||||
|
.and(predicate::str::contains(r#""partitionId": "1","#)),
|
||||||
|
)
|
||||||
|
.get_output()
|
||||||
|
.stdout
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
let object_store_id = get_object_store_id(&out);
|
||||||
|
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let f = dir.path().join("tmp.parquet");
|
||||||
|
let filename = f.as_os_str().to_str().unwrap();
|
||||||
|
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("-h")
|
||||||
|
.arg(&router_addr)
|
||||||
|
.arg("remote")
|
||||||
|
.arg("store")
|
||||||
|
.arg("get")
|
||||||
|
.arg(&object_store_id)
|
||||||
|
.arg(filename)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.stdout(
|
||||||
|
predicate::str::contains("wrote")
|
||||||
|
.and(predicate::str::contains(filename)),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Ensure a warning is emitted when specifying (or
|
||||||
|
// defaulting to) in-memory file storage.
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("-h")
|
||||||
|
.arg(&router_addr)
|
||||||
|
.arg("remote")
|
||||||
|
.arg("partition")
|
||||||
|
.arg("pull")
|
||||||
|
.arg("--catalog")
|
||||||
|
.arg("memory")
|
||||||
|
.arg("--object-store")
|
||||||
|
.arg("memory")
|
||||||
|
.arg(&namespace)
|
||||||
|
.arg("my_awesome_table")
|
||||||
|
.arg("1970-01-01")
|
||||||
|
.assert()
|
||||||
|
.failure()
|
||||||
|
.stderr(predicate::str::contains("try passing --object-store=file"));
|
||||||
|
|
||||||
|
// Ensure files are actually wrote to the filesystem
|
||||||
|
let dir = tempfile::tempdir().expect("could not get temporary directory");
|
||||||
|
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("-h")
|
||||||
|
.arg(&router_addr)
|
||||||
|
.arg("remote")
|
||||||
|
.arg("partition")
|
||||||
|
.arg("pull")
|
||||||
|
.arg("--catalog")
|
||||||
|
.arg("memory")
|
||||||
|
.arg("--object-store")
|
||||||
|
.arg("file")
|
||||||
|
.arg("--data-dir")
|
||||||
|
.arg(dir.path().to_str().unwrap())
|
||||||
|
.arg(&namespace)
|
||||||
|
.arg("my_awesome_table")
|
||||||
|
.arg("1970-01-01")
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.stdout(
|
||||||
|
predicate::str::contains("wrote file")
|
||||||
|
.and(predicate::str::contains(&object_store_id)),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
})),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.run()
|
||||||
|
.await
|
||||||
|
}
|
Loading…
Reference in New Issue