chore(cli): add `--partition-template` to `namespace create` (#8365)

* chore(cli): add `--partition-template` to namespace create

* chore: fix typo in doc for `PartitionTemplateConfig`

chore: add max limit 8 for partition template in doc

* chore: add e2e tests

* chore: fmt

* chore: add more e2e tests for namespace create with partition template

* chore: show doc comments in cli help interface

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Chunchun Ye 2023-08-01 09:37:00 -05:00 committed by GitHub
parent 73f38077b6
commit c8242c7469
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 566 additions and 9 deletions

View File

@ -1,6 +1,6 @@
use influxdb_iox_client::connection::Connection; use influxdb_iox_client::connection::Connection;
use crate::commands::namespace::Result; use crate::commands::{namespace::Result, partition_template::PartitionTemplateConfig};
use influxdb_iox_client::namespace::generated_types::ServiceProtectionLimits; use influxdb_iox_client::namespace::generated_types::ServiceProtectionLimits;
/// Write data into the specified database /// Write data into the specified database
@ -23,6 +23,10 @@ pub struct Config {
#[clap(flatten)] #[clap(flatten)]
service_protection_limits: ServiceProtectionLimitsArgs, service_protection_limits: ServiceProtectionLimitsArgs,
/// Partition template
#[clap(flatten)]
partition_template_config: PartitionTemplateConfig,
} }
#[derive(Debug, clap::Args)] #[derive(Debug, clap::Args)]
@ -57,6 +61,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
namespace, namespace,
retention_hours, retention_hours,
service_protection_limits, service_protection_limits,
partition_template_config,
} = config; } = config;
let mut client = influxdb_iox_client::namespace::Client::new(connection); let mut client = influxdb_iox_client::namespace::Client::new(connection);
@ -74,10 +79,57 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
&namespace, &namespace,
retention, retention,
service_protection_limits.into(), service_protection_limits.into(),
None, partition_template_config.partition_template,
) )
.await?; .await?;
println!("{}", serde_json::to_string_pretty(&namespace)?); println!("{}", serde_json::to_string_pretty(&namespace)?);
Ok(()) Ok(())
} }
#[cfg(test)]
mod tests {
use crate::commands::namespace::create::Config;
use clap::Parser;
use influxdb_iox_client::table::generated_types::{Part, PartitionTemplate, TemplatePart};
// Valid config without partition template
#[test]
fn valid_no_partition_template() {
let config = Config::try_parse_from(["server", "namespace"]).unwrap();
assert_eq!(config.namespace, "namespace");
assert_eq!(config.partition_template_config.partition_template, None);
}
// Valid config with partition template
#[test]
fn valid_partition_template() {
let config = Config::try_parse_from([
"server",
"namespace",
"--partition-template",
"{\"parts\": [{\"tagValue\": \"col1\"}, {\"timeFormat\": \"%Y.%j\"}, {\"tagValue\": \"col2,col3 col4\"}] }",
]).unwrap();
let expected = Some(PartitionTemplate {
parts: vec![
TemplatePart {
part: Some(Part::TagValue("col1".to_string())),
},
TemplatePart {
part: Some(Part::TimeFormat("%Y.%j".to_string())),
},
TemplatePart {
part: Some(Part::TagValue("col2,col3 col4".to_string())),
},
],
});
assert_eq!(config.namespace, "namespace");
assert_eq!(
config.partition_template_config.partition_template,
expected
);
}
}

View File

@ -11,15 +11,20 @@ pub enum Error {
NoParts, NoParts,
} }
/// Partition template in format:
/// {"parts": [{"TimeFormat": "%Y.%j"}, {"TagValue": "col1"}, {"TagValue": "col2,col3 col4"}] }
/// - TimeFormat and TagFormat can be in any order
/// - The value of TimeFormat and TagFormat are string and can be whatever at parsing time.
/// If they are not in the right format the server expcected, the server will return error.
/// - The number of TimeFormats and TagFormats are not limited at parsing time. Server limits
/// the total number of them and will send back error if it exceeds the limit.
#[derive(Debug, clap::Parser, Default, Clone)] #[derive(Debug, clap::Parser, Default, Clone)]
pub struct PartitionTemplateConfig { pub struct PartitionTemplateConfig {
/// Partition template format:
///
/// e.g. {"parts": [{"timeFormat": "%Y-%m"}, {"tagValue": "col1"}, {"tagValue": "col2,col3,col4"}]}
///
/// - timeFormat and tagValue can be in any order
///
/// - The value of timeFormat and tagValue are string and can be whatever at parsing time.
/// If they are not in the right format the server expcected, the server will return error.
/// Note that "time" is a reserved word and cannot be used in timeFormat.
///
/// - The number of timeFormats and tagValues are not limited at parsing time. Server limits
/// the total number of them and will send back error if it exceeds the limit 8.
#[clap( #[clap(
action, action,
long = "partition-template", long = "partition-template",

View File

@ -441,6 +441,23 @@ async fn create_tables_negative() {
.stderr(predicate::str::contains( .stderr(predicate::str::contains(
"Partition templates may have a maximum of 8 parts", "Partition templates may have a maximum of 8 parts",
)); ));
// Update an existing table
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&router_grpc_addr)
.arg("table")
.arg("update")
.arg(namespace)
.arg("h2o_temperature")
.arg("--partition-template")
.arg("{\"parts\":[{\"tagValue\":\"col1\"}] }")
.assert()
.failure()
.stderr(predicate::str::contains(
"error: unrecognized subcommand 'update'",
));
} }
.boxed() .boxed()
})), })),
@ -1428,6 +1445,489 @@ async fn namespace_create_service_limits() {
.await .await
} }
/// Test setting partition template while creating namespaces, negative tests
#[tokio::test]
async fn namespace_create_partition_template_negative() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_grpc_base().to_string();
let namespace = "ns_negative";
// No partition tempplate specified
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg(namespace)
.arg("--partition-template")
.assert()
.failure()
.stderr(
predicate::str::contains(
"error: a value is required for '--partition-template <PARTITION_TEMPLATE>' but none was supplied"
)
);
// Wrong spelling `prts`
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg(namespace)
.arg("--partition-template")
.arg("{\"prts\": [{\"tagValue\": \"location\"}, {\"tagValue\": \"state\"}, {\"timeFormat\": \"%Y-%m\"}] }")
.assert()
.failure()
.stderr(predicate::str::contains(
"Client Error: Invalid partition template format : unknown field `prts`",
));
// Time as tag
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg(namespace)
.arg("--partition-template")
.arg("{\"parts\": [{\"tagValue\": \"location\"}, {\"tagValue\": \"time\"}, {\"timeFormat\": \"%Y-%m\"}] }")
.assert()
.failure()
.stderr(predicate::str::contains(
"Client error: Client specified an invalid argument: invalid tag value in partition template: time cannot be used",
));
// Time format is `%42` which is invalid
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg(namespace)
.arg("--partition-template")
.arg("{\"parts\": [{\"tagValue\": \"location\"}, {\"timeFormat\": \"%42\"}] }")
.assert()
.failure()
.stderr(predicate::str::contains(
"Client error: Client specified an invalid argument: invalid strftime format in partition template",
));
// Over 8 parts
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg(namespace)
.arg("--partition-template")
.arg("{\"parts\": [{\"tagValue\": \"1\"},{\"tagValue\": \"2\"},{\"timeFormat\": \"%Y-%m\"},{\"tagValue\": \"4\"},{\"tagValue\": \"5\"},{\"tagValue\": \"6\"},{\"tagValue\": \"7\"},{\"tagValue\": \"8\"},{\"tagValue\": \"9\"}]}")
.assert()
.failure()
.stderr(predicate::str::contains(
"Partition templates may have a maximum of 8 parts",
));
}
.boxed()
}))
],
)
.run()
.await
}
/// Test setting partition template while creating namespaces, positive tests
#[tokio::test]
async fn namespace_create_partition_template_positive() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_grpc_base().to_string();
// No partition template specified
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg("ns_partition_template_1")
.assert()
.success()
.stdout(predicate::str::contains("ns_partition_template_1"));
// Partition template with time format
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg("ns_partition_template_2")
.arg("--partition-template")
.arg("{\"parts\":[{\"timeFormat\":\"%Y-%m\"}] }")
.assert()
.success()
.stdout(predicate::str::contains("ns_partition_template_2"));
// Partition template with tag value
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg("ns_partition_template_3")
.arg("--partition-template")
.arg("{\"parts\":[{\"tagValue\":\"col1\"}] }")
.assert()
.success()
.stdout(predicate::str::contains("ns_partition_template_3"));
// Partition template with time format, tag value, and tag of unsual column name
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg("ns_partition_template_4")
.arg("--partition-template")
.arg("{\"parts\":[{\"tagValue\":\"col1\"},{\"timeFormat\":\"%Y-%d\"},{\"tagValue\":\"yes,col name\"}] }")
.assert()
.success()
.stdout(predicate::str::contains("ns_partition_template_4"));
// Update an existing namespace
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("update")
.arg("ns_partition_template_4")
.arg("--partition-template")
.arg("{\"parts\":[{\"tagValue\":\"col1\"}] }")
.assert()
.failure()
.stderr(predicate::str::contains(
"error: unrecognized subcommand 'update'",
));
}
.boxed()
})),
],
)
.run()
.await
}
/// Test partition template for namespace and table creation:
/// When a namespace is created *with* a custom partition template
/// and a table is created implicitly, i.e. *without* a partition template,
/// the namespace's partition template will be applied to this table
#[tokio::test]
async fn namespace_create_partition_template_implicit_table_creation() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared(database_url).await;
let namespace = "ns_createtableimplicit";
StepTest::new(
&mut cluster,
vec![
// Explicitly create a namespace with a custom partition template
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_grpc_base().to_string();
let namespace = "ns_createtableimplicit";
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg(namespace)
.arg("--partition-template")
.arg(
"{\"parts\":[{\"timeFormat\":\"%Y-%m\"}, {\"tagValue\":\"location\"}]}",
)
.assert()
.success()
.stdout(predicate::str::contains(namespace));
}
.boxed()
})),
// Write, which implicitly creates the table with the namespace's custom partition template
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_http_base().to_string();
let namespace = "ns_createtableimplicit";
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-v")
.arg("-h")
.arg(&addr)
.arg("write")
.arg(namespace)
.arg("../test_fixtures/lineproto/temperature.lp")
.assert()
.success()
.stdout(predicate::str::contains("591 Bytes OK"));
}
.boxed()
})),
// Read data
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
// data from 'air_and_water.lp'
wait_for_query_result_with_namespace(
namespace,
state,
"SELECT * from h2o_temperature order by time desc limit 10",
None,
"| 51.3 | coyote_creek | CA | 55.1 | 1970-01-01T00:00:01.568756160Z |"
).await;
}
.boxed()
})),
// Check partition keys that use the namespace's partition template
Step::PartitionKeys {
table_name: "h2o_temperature".to_string(),
namespace_name: Some(namespace.to_string()),
expected: vec![
"1970-01|coyote_creek",
"1970-01|puget_sound",
"1970-01|santa_monica",
],
},
],
)
.run()
.await
}
/// Test partition template for namespace and table creation:
/// When a namespace is created *with* a custom partition template
/// and a table is created *without* a partition template,
/// the namespace's partition template will be applied to this table
#[tokio::test]
async fn namespace_create_partition_template_explicit_table_creation_without_partition_template() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared(database_url).await;
let namespace = "ns_createtableexplicitwithout";
StepTest::new(
&mut cluster,
vec![
// Explicitly create a namespace with a custom partition template
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_grpc_base().to_string();
let namespace = "ns_createtableexplicitwithout";
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg(namespace)
.arg("--partition-template")
.arg("{\"parts\":[{\"timeFormat\":\"%Y-%m\"}, {\"tagValue\":\"state\"}]}")
.assert()
.success()
.stdout(predicate::str::contains(namespace));
}
.boxed()
})),
// Explicitly create a table *without* a custom partition template
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_grpc_base().to_string();
let namespace = "ns_createtableexplicitwithout";
let table_name = "h2o_temperature";
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-v")
.arg("-h")
.arg(&addr)
.arg("table")
.arg("create")
.arg(namespace)
.arg(table_name)
.assert()
.success()
.stdout(predicate::str::contains(table_name));
}
.boxed()
})),
// Write to the just-created table
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_http_base().to_string();
let namespace = "ns_createtableexplicitwithout";
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-v")
.arg("-h")
.arg(&addr)
.arg("write")
.arg(namespace)
.arg("../test_fixtures/lineproto/temperature.lp")
.assert()
.success()
.stdout(predicate::str::contains("591 Bytes OK"));
}
.boxed()
})),
// Read data
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
// data from 'air_and_water.lp'
wait_for_query_result_with_namespace(
namespace,
state,
"SELECT * from h2o_temperature order by time desc limit 10",
None,
"| 51.3 | coyote_creek | CA | 55.1 | 1970-01-01T00:00:01.568756160Z |"
).await;
}
.boxed()
})),
// Check partition keys that use the namespace's partition template
Step::PartitionKeys{table_name: "h2o_temperature".to_string(), namespace_name: Some(namespace.to_string()), expected: vec!["1970-01|CA", "1970-01|WA"]},
],
)
.run()
.await
}
/// Test partition template for namespace and table creation:
/// When a namespace is created *with* a custom partition template
/// and a table is created *with* a partition template,
/// the table's partition template will be applied to this table
#[tokio::test]
async fn namespace_create_partition_template_explicit_table_creation_with_partition_template() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared(database_url).await;
let namespace = "ns_createtableexplicitwith";
StepTest::new(
&mut cluster,
vec![
// Explicitly create a namespace with a custom partition template
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_grpc_base().to_string();
let namespace = "ns_createtableexplicitwith";
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg(namespace)
.arg("--partition-template")
.arg("{\"parts\":[{\"timeFormat\":\"%Y-%m\"}, {\"tagValue\":\"state\"}]}")
.assert()
.success()
.stdout(predicate::str::contains(namespace));
}
.boxed()
})),
// Explicitly create a table *with* a custom partition template
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_grpc_base().to_string();
let namespace = "ns_createtableexplicitwith";
let table_name = "h2o_temperature";
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-v")
.arg("-h")
.arg(&addr)
.arg("table")
.arg("create")
.arg(namespace)
.arg(table_name)
.arg("--partition-template")
.arg("{\"parts\":[{\"tagValue\":\"location\"}, {\"timeFormat\":\"%Y-%m\"}]}")
.assert()
.success()
.stdout(predicate::str::contains(table_name));
}
.boxed()
})),
// Write to the just-created table
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_http_base().to_string();
let namespace = "ns_createtableexplicitwith";
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-v")
.arg("-h")
.arg(&addr)
.arg("write")
.arg(namespace)
.arg("../test_fixtures/lineproto/temperature.lp")
.assert()
.success()
.stdout(predicate::str::contains("591 Bytes OK"));
}
.boxed()
})),
// Read data
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
// data from 'air_and_water.lp'
wait_for_query_result_with_namespace(
namespace,
state,
"SELECT * from h2o_temperature order by time desc limit 10",
None,
"| 51.3 | coyote_creek | CA | 55.1 | 1970-01-01T00:00:01.568756160Z |"
).await;
}
.boxed()
})),
// Check partition keys that use the table's partition template
Step::PartitionKeys{table_name: "h2o_temperature".to_string(), namespace_name: Some(namespace.to_string()), expected: vec!["coyote_creek|1970-01", "puget_sound|1970-01", "santa_monica|1970-01"]},
],
)
.run()
.await
}
/// Test the namespace update service limit command /// Test the namespace update service limit command
#[tokio::test] #[tokio::test]
async fn namespace_update_service_limit() { async fn namespace_update_service_limit() {