//! Tests CLI commands use assert_cmd::Command; use futures::FutureExt; use predicates::prelude::*; use serde_json::Value; use std::time::Duration; use tempfile::tempdir; use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState}; #[tokio::test] async fn default_mode_is_run_all_in_one() { let tmpdir = tempdir().unwrap(); Command::cargo_bin("influxdb_iox") .unwrap() .args(&["-v"]) // Do not attempt to connect to the real DB (object store, etc) if the // prod DSN is set - all other tests use TEST_INFLUXDB_IOX_CATALOG_DSN // but this one will use the real env if not cleared. .env_clear() // Without this, we have errors about writing read-only root filesystem on macos. .env("HOME", tmpdir.path()) .timeout(Duration::from_secs(2)) .assert() .failure() .stdout(predicate::str::contains("starting all in one server")); } #[tokio::test] async fn default_run_mode_is_all_in_one() { let tmpdir = tempdir().unwrap(); Command::cargo_bin("influxdb_iox") .unwrap() .args(&["run", "-v"]) // This test is designed to assert the default running mode is using // in-memory state, so ensure that any outside config does not influence // this. .env_clear() .env("HOME", tmpdir.path()) .timeout(Duration::from_secs(2)) .assert() .failure() .stdout(predicate::str::contains("starting all in one server")); } /// remote partition command and getting a parquet file from the object store and pulling the files #[tokio::test] async fn remote_partition_and_get_from_store_and_pull() { test_helpers::maybe_start_logging(); let database_url = maybe_skip_integration!(); // The test below assumes a specific partition id, so use a // non-shared one here so concurrent tests don't interfere with // each other let mut cluster = MiniCluster::create_non_shared_standard(database_url).await; StepTest::new( &mut cluster, vec![ Step::WriteLineProtocol(String::from( "my_awesome_table,tag1=A,tag2=B val=42i 123456", )), // wait for partitions to be persisted Step::WaitForPersisted, // Run the 'remote partition' command Step::Custom(Box::new(|state: &mut StepTestState| { async { let router_addr = state.cluster().router().router_grpc_base().to_string(); let namespace = state.cluster().namespace().to_string(); // Validate the output of the remote partition CLI command // // Looks like: // { // "id": "1", // "shardId": 1, // "namespaceId": 1, // "tableId": 1, // "partitionId": "1", // "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd", // "minTime": "123456", // "maxTime": "123456", // "fileSizeBytes": "2029", // "rowCount": "1", // "createdAt": "1650019674289347000" // } let out = Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&router_addr) .arg("remote") .arg("partition") .arg("show") .arg("1") .assert() .success() .stdout( predicate::str::contains(r#""id": "1""#) .and(predicate::str::contains(r#""shardId": "1","#)) .and(predicate::str::contains(r#""partitionId": "1","#)), ) .get_output() .stdout .clone(); let v: Value = serde_json::from_slice(&out).unwrap(); let id = v.as_array().unwrap()[0] .as_object() .unwrap() .get("objectStoreId") .unwrap() .as_str() .unwrap(); let dir = tempdir().unwrap(); let f = dir.path().join("tmp.parquet"); let filename = f.as_os_str().to_str().unwrap(); Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&router_addr) .arg("remote") .arg("store") .arg("get") .arg(id) .arg(filename) .assert() .success() .stdout( predicate::str::contains("wrote") .and(predicate::str::contains(filename)), ); // Ensure a warning is emitted when specifying (or // defaulting to) in-memory file storage. Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&router_addr) .arg("remote") .arg("partition") .arg("pull") .arg("--catalog") .arg("memory") .arg("--object-store") .arg("memory") .arg(&namespace) .arg("my_awesome_table") .arg("1970-01-01") .assert() .failure() .stderr(predicate::str::contains("try passing --object-store=file")); // Ensure files are actually wrote to the filesystem let dir = tempfile::tempdir().expect("could not get temporary directory"); Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&router_addr) .arg("remote") .arg("partition") .arg("pull") .arg("--catalog") .arg("memory") .arg("--object-store") .arg("file") .arg("--data-dir") .arg(dir.path().to_str().unwrap()) .arg(&namespace) .arg("my_awesome_table") .arg("1970-01-01") .assert() .success() .stdout( predicate::str::contains("wrote file") .and(predicate::str::contains(id)), ); } .boxed() })), ], ) .run() .await } /// Write, compact, then use the remote partition command #[tokio::test] async fn compact_and_get_remote_partition() { test_helpers::maybe_start_logging(); let database_url = maybe_skip_integration!(); // The test below assumes a specific partition id, so use a // non-shared one here so concurrent tests don't interfere with // each other let mut cluster = MiniCluster::create_non_shared_standard(database_url).await; StepTest::new( &mut cluster, vec![ Step::WriteLineProtocol(String::from( "my_awesome_table,tag1=A,tag2=B val=42i 123456", )), // wait for partitions to be persisted Step::WaitForPersisted, // Run the compactor Step::Compact, // Run the 'remote partition' command Step::Custom(Box::new(|state: &mut StepTestState| { async { let router_addr = state.cluster().router().router_grpc_base().to_string(); let namespace = state.cluster().namespace().to_string(); // Validate the output of the remote partition CLI command // // Looks like: // { // "id": "2", // "shardId": 1, // "namespaceId": 1, // "tableId": 1, // "partitionId": "1", // "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd", // "minTime": "123456", // "maxTime": "123456", // "fileSizeBytes": "2029", // "rowCount": "1", // "compactionLevel": "1", // "createdAt": "1650019674289347000" // } let out = Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&router_addr) .arg("remote") .arg("partition") .arg("show") .arg("1") .assert() .success() .stdout( predicate::str::contains(r#""id": "2""#) .and(predicate::str::contains(r#""shardId": "1","#)) .and(predicate::str::contains(r#""partitionId": "1","#)) .and(predicate::str::contains(r#""compactionLevel": 1"#)), ) .get_output() .stdout .clone(); let v: Value = serde_json::from_slice(&out).unwrap(); let id = v.as_array().unwrap()[0] .as_object() .unwrap() .get("objectStoreId") .unwrap() .as_str() .unwrap(); let dir = tempdir().unwrap(); let f = dir.path().join("tmp.parquet"); let filename = f.as_os_str().to_str().unwrap(); Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&router_addr) .arg("remote") .arg("store") .arg("get") .arg(id) .arg(filename) .assert() .success() .stdout( predicate::str::contains("wrote") .and(predicate::str::contains(filename)), ); // Ensure a warning is emitted when specifying (or // defaulting to) in-memory file storage. Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&router_addr) .arg("remote") .arg("partition") .arg("pull") .arg("--catalog") .arg("memory") .arg("--object-store") .arg("memory") .arg(&namespace) .arg("my_awesome_table") .arg("1970-01-01") .assert() .failure() .stderr(predicate::str::contains("try passing --object-store=file")); // Ensure files are actually wrote to the filesystem let dir = tempfile::tempdir().expect("could not get temporary directory"); Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&router_addr) .arg("remote") .arg("partition") .arg("pull") .arg("--catalog") .arg("memory") .arg("--object-store") .arg("file") .arg("--data-dir") .arg(dir.path().to_str().unwrap()) .arg(&namespace) .arg("my_awesome_table") .arg("1970-01-01") .assert() .success() .stdout( predicate::str::contains("wrote file") .and(predicate::str::contains(id)), ); } .boxed() })), ], ) .run() .await } /// Test the schema cli command #[tokio::test] async fn schema_cli() { test_helpers::maybe_start_logging(); let database_url = maybe_skip_integration!(); let mut cluster = MiniCluster::create_shared(database_url).await; StepTest::new( &mut cluster, vec![ Step::WriteLineProtocol(String::from( "my_awesome_table2,tag1=A,tag2=B val=42i 123456", )), Step::Custom(Box::new(|state: &mut StepTestState| { async { // should be able to query both router and querier for the schema let addrs = vec![ ( "router", state.cluster().router().router_grpc_base().to_string(), ), ( "querier", state.cluster().querier().querier_grpc_base().to_string(), ), ]; for (addr_type, addr) in addrs { println!("Trying address {}: {}", addr_type, addr); // Validate the output of the schema CLI command Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&addr) .arg("debug") .arg("schema") .arg("get") .arg(state.cluster().namespace()) .assert() .success() .stdout( predicate::str::contains("my_awesome_table2") .and(predicate::str::contains("tag1")) .and(predicate::str::contains("val")), ); } } .boxed() })), ], ) .run() .await } /// Test the schema cli command #[tokio::test] async fn namespaces_cli() { test_helpers::maybe_start_logging(); let database_url = maybe_skip_integration!(); let mut cluster = MiniCluster::create_shared(database_url).await; StepTest::new( &mut cluster, vec![ Step::WriteLineProtocol(String::from( "my_awesome_table2,tag1=A,tag2=B val=42i 123456", )), Step::Custom(Box::new(|state: &mut StepTestState| { async { let querier_addr = state.cluster().querier().querier_grpc_base().to_string(); // Validate the output of the schema CLI command Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&querier_addr) .arg("debug") .arg("namespace") .arg("list") .assert() .success() .stdout(predicate::str::contains(state.cluster().namespace())); } .boxed() })), ], ) .run() .await } /// Test the query_ingester CLI command #[tokio::test] async fn query_ingester() { test_helpers::maybe_start_logging(); let database_url = maybe_skip_integration!(); let mut cluster = MiniCluster::create_shared(database_url).await; StepTest::new( &mut cluster, vec![ Step::WriteLineProtocol(String::from( "my_awesome_table2,tag1=A,tag2=B val=42i 123456", )), Step::WaitForReadable, Step::Custom(Box::new(|state: &mut StepTestState| { async { let ingester_addr = state.cluster().ingester().ingester_grpc_base().to_string(); let expected = [ "+------+------+--------------------------------+-----+", "| tag1 | tag2 | time | val |", "+------+------+--------------------------------+-----+", "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", "+------+------+--------------------------------+-----+", ] .join("\n"); // Validate the output of the query Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&ingester_addr) .arg("query-ingester") .arg(state.cluster().namespace()) .arg("my_awesome_table2") .assert() .success() .stdout(predicate::str::contains(&expected)); } .boxed() })), Step::Custom(Box::new(|state: &mut StepTestState| { async { // now run the query-ingester command against the querier let querier_addr = state.cluster().querier().querier_grpc_base().to_string(); // this is not a good error: it should be // something like "wrong query protocol" or // "invalid message" as the querier requires a // different message format Ticket in the flight protocol let expected = "Unknown namespace: my_awesome_table2"; // Validate that the error message contains a reasonable error Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&querier_addr) .arg("query-ingester") .arg(state.cluster().namespace()) .arg("my_awesome_table2") .assert() .failure() .stderr(predicate::str::contains(expected)); } .boxed() })), Step::Custom(Box::new(|state: &mut StepTestState| { async { let ingester_addr = state.cluster().ingester().ingester_grpc_base().to_string(); let expected = [ "+------+-----+", "| tag1 | val |", "+------+-----+", "| A | 42 |", "+------+-----+", ] .join("\n"); // Validate the output of the query Command::cargo_bin("influxdb_iox") .unwrap() .arg("-h") .arg(&ingester_addr) .arg("query-ingester") .arg(state.cluster().namespace()) .arg("my_awesome_table2") .arg("--columns") .arg("tag1,val") .assert() .success() .stdout(predicate::str::contains(&expected)); } .boxed() })), ], ) .run() .await }