fix: Move name parsing code to data_types2
parent
3ab0788a94
commit
6b0e7ae46a
|
@ -1175,7 +1175,9 @@ dependencies = [
|
||||||
"data_types",
|
"data_types",
|
||||||
"influxdb_line_protocol",
|
"influxdb_line_protocol",
|
||||||
"ordered-float 3.0.0",
|
"ordered-float 3.0.0",
|
||||||
|
"percent-encoding",
|
||||||
"schema",
|
"schema",
|
||||||
|
"snafu",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"uuid 0.8.2",
|
"uuid 0.8.2",
|
||||||
"workspace-hack",
|
"workspace-hack",
|
||||||
|
@ -2568,6 +2570,7 @@ dependencies = [
|
||||||
"clap 3.1.12",
|
"clap 3.1.12",
|
||||||
"clap_blocks",
|
"clap_blocks",
|
||||||
"data_types",
|
"data_types",
|
||||||
|
"data_types2",
|
||||||
"dml",
|
"dml",
|
||||||
"flate2",
|
"flate2",
|
||||||
"futures",
|
"futures",
|
||||||
|
|
|
@ -17,7 +17,6 @@ mod database_name;
|
||||||
pub mod database_rules;
|
pub mod database_rules;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod job;
|
pub mod job;
|
||||||
pub mod names;
|
|
||||||
pub mod non_empty;
|
pub mod non_empty;
|
||||||
pub mod partition_metadata;
|
pub mod partition_metadata;
|
||||||
pub mod router;
|
pub mod router;
|
||||||
|
|
|
@ -1,90 +0,0 @@
|
||||||
use std::borrow::Cow;
|
|
||||||
|
|
||||||
use crate::{DatabaseName, DatabaseNameError};
|
|
||||||
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
|
|
||||||
use snafu::{ResultExt, Snafu};
|
|
||||||
|
|
||||||
#[derive(Debug, Snafu)]
|
|
||||||
pub enum OrgBucketMappingError {
|
|
||||||
#[snafu(display("Invalid database name: {}", source))]
|
|
||||||
InvalidDatabaseName { source: DatabaseNameError },
|
|
||||||
|
|
||||||
#[snafu(display("missing org/bucket value"))]
|
|
||||||
NotSpecified,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Map an InfluxDB 2.X org & bucket into an IOx DatabaseName.
|
|
||||||
///
|
|
||||||
/// This function ensures the mapping is unambiguous by requiring both `org` and
|
|
||||||
/// `bucket` to not contain the `_` character in addition to the
|
|
||||||
/// [`DatabaseName`] validation.
|
|
||||||
pub fn org_and_bucket_to_database<'a, O: AsRef<str>, B: AsRef<str>>(
|
|
||||||
org: O,
|
|
||||||
bucket: B,
|
|
||||||
) -> Result<DatabaseName<'a>, OrgBucketMappingError> {
|
|
||||||
const SEPARATOR: char = '_';
|
|
||||||
|
|
||||||
let org: Cow<'_, str> = utf8_percent_encode(org.as_ref(), NON_ALPHANUMERIC).into();
|
|
||||||
let bucket: Cow<'_, str> = utf8_percent_encode(bucket.as_ref(), NON_ALPHANUMERIC).into();
|
|
||||||
|
|
||||||
// An empty org or bucket is not acceptable.
|
|
||||||
if org.is_empty() || bucket.is_empty() {
|
|
||||||
return Err(OrgBucketMappingError::NotSpecified);
|
|
||||||
}
|
|
||||||
|
|
||||||
let db_name = format!("{}{}{}", org.as_ref(), SEPARATOR, bucket.as_ref());
|
|
||||||
|
|
||||||
DatabaseName::new(db_name).context(InvalidDatabaseNameSnafu)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_org_bucket_map_db_ok() {
|
|
||||||
let got = org_and_bucket_to_database("org", "bucket").expect("failed on valid DB mapping");
|
|
||||||
|
|
||||||
assert_eq!(got.as_str(), "org_bucket");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_org_bucket_map_db_contains_underscore() {
|
|
||||||
let got = org_and_bucket_to_database("my_org", "bucket").unwrap();
|
|
||||||
assert_eq!(got.as_str(), "my%5Forg_bucket");
|
|
||||||
|
|
||||||
let got = org_and_bucket_to_database("org", "my_bucket").unwrap();
|
|
||||||
assert_eq!(got.as_str(), "org_my%5Fbucket");
|
|
||||||
|
|
||||||
let got = org_and_bucket_to_database("org", "my__bucket").unwrap();
|
|
||||||
assert_eq!(got.as_str(), "org_my%5F%5Fbucket");
|
|
||||||
|
|
||||||
let got = org_and_bucket_to_database("my_org", "my_bucket").unwrap();
|
|
||||||
assert_eq!(got.as_str(), "my%5Forg_my%5Fbucket");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_org_bucket_map_db_contains_underscore_and_percent() {
|
|
||||||
let got = org_and_bucket_to_database("my%5Forg", "bucket").unwrap();
|
|
||||||
assert_eq!(got.as_str(), "my%255Forg_bucket");
|
|
||||||
|
|
||||||
let got = org_and_bucket_to_database("my%5Forg_", "bucket").unwrap();
|
|
||||||
assert_eq!(got.as_str(), "my%255Forg%5F_bucket");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_bad_database_name_is_encoded() {
|
|
||||||
let got = org_and_bucket_to_database("org", "bucket?").unwrap();
|
|
||||||
assert_eq!(got.as_str(), "org_bucket%3F");
|
|
||||||
|
|
||||||
let got = org_and_bucket_to_database("org!", "bucket").unwrap();
|
|
||||||
assert_eq!(got.as_str(), "org%21_bucket");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_empty_org_bucket() {
|
|
||||||
let err = org_and_bucket_to_database("", "")
|
|
||||||
.expect_err("should fail with empty org/bucket valuese");
|
|
||||||
assert!(matches!(err, OrgBucketMappingError::NotSpecified));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -8,7 +8,9 @@ description = "Shared data types in the IOx NG architecture"
|
||||||
data_types = { path = "../data_types" }
|
data_types = { path = "../data_types" }
|
||||||
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
|
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
|
||||||
ordered-float = "3"
|
ordered-float = "3"
|
||||||
|
percent-encoding = "2.1.0"
|
||||||
schema = { path = "../schema" }
|
schema = { path = "../schema" }
|
||||||
|
snafu = "0.7"
|
||||||
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "postgres", "uuid"] }
|
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "postgres", "uuid"] }
|
||||||
uuid = { version = "0.8", features = ["v4"] }
|
uuid = { version = "0.8", features = ["v4"] }
|
||||||
workspace-hack = { path = "../workspace-hack"}
|
workspace-hack = { path = "../workspace-hack"}
|
||||||
|
|
|
@ -11,8 +11,11 @@
|
||||||
)]
|
)]
|
||||||
|
|
||||||
use influxdb_line_protocol::FieldValue;
|
use influxdb_line_protocol::FieldValue;
|
||||||
|
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
|
||||||
use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema};
|
use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema};
|
||||||
|
use snafu::{ResultExt, Snafu};
|
||||||
use std::{
|
use std::{
|
||||||
|
borrow::Cow,
|
||||||
collections::BTreeMap,
|
collections::BTreeMap,
|
||||||
convert::TryFrom,
|
convert::TryFrom,
|
||||||
fmt::Write,
|
fmt::Write,
|
||||||
|
@ -23,14 +26,13 @@ use std::{
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub use data_types::{
|
pub use data_types::{
|
||||||
names::{org_and_bucket_to_database, OrgBucketMappingError},
|
|
||||||
non_empty::NonEmptyString,
|
non_empty::NonEmptyString,
|
||||||
partition_metadata::{
|
partition_metadata::{
|
||||||
ColumnSummary, InfluxDbType, PartitionAddr, StatValues, Statistics, TableSummary,
|
ColumnSummary, InfluxDbType, PartitionAddr, StatValues, Statistics, TableSummary,
|
||||||
},
|
},
|
||||||
sequence::Sequence,
|
sequence::Sequence,
|
||||||
timestamp::{TimestampMinMax, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME},
|
timestamp::{TimestampMinMax, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME},
|
||||||
DatabaseName,
|
DatabaseName, DatabaseNameError,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Unique ID for a `Namespace`
|
/// Unique ID for a `Namespace`
|
||||||
|
@ -1219,6 +1221,40 @@ impl std::fmt::Display for Scalar {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Snafu)]
|
||||||
|
#[allow(missing_docs)]
|
||||||
|
pub enum OrgBucketMappingError {
|
||||||
|
#[snafu(display("Invalid database name: {}", source))]
|
||||||
|
InvalidDatabaseName { source: DatabaseNameError },
|
||||||
|
|
||||||
|
#[snafu(display("missing org/bucket value"))]
|
||||||
|
NotSpecified,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Map an InfluxDB 2.X org & bucket into an IOx DatabaseName.
|
||||||
|
///
|
||||||
|
/// This function ensures the mapping is unambiguous by requiring both `org` and
|
||||||
|
/// `bucket` to not contain the `_` character in addition to the
|
||||||
|
/// [`DatabaseName`] validation.
|
||||||
|
pub fn org_and_bucket_to_database<'a, O: AsRef<str>, B: AsRef<str>>(
|
||||||
|
org: O,
|
||||||
|
bucket: B,
|
||||||
|
) -> Result<DatabaseName<'a>, OrgBucketMappingError> {
|
||||||
|
const SEPARATOR: char = '_';
|
||||||
|
|
||||||
|
let org: Cow<'_, str> = utf8_percent_encode(org.as_ref(), NON_ALPHANUMERIC).into();
|
||||||
|
let bucket: Cow<'_, str> = utf8_percent_encode(bucket.as_ref(), NON_ALPHANUMERIC).into();
|
||||||
|
|
||||||
|
// An empty org or bucket is not acceptable.
|
||||||
|
if org.is_empty() || bucket.is_empty() {
|
||||||
|
return Err(OrgBucketMappingError::NotSpecified);
|
||||||
|
}
|
||||||
|
|
||||||
|
let db_name = format!("{}{}{}", org.as_ref(), SEPARATOR, bucket.as_ref());
|
||||||
|
|
||||||
|
DatabaseName::new(db_name).context(InvalidDatabaseNameSnafu)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -1441,4 +1477,51 @@ mod tests {
|
||||||
r#""col1"='' AND "col2"='foo' AND "col3"='fo\\o' AND "col4"='fo\'o'"#
|
r#""col1"='' AND "col2"='foo' AND "col3"='fo\\o' AND "col4"='fo\'o'"#
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_org_bucket_map_db_ok() {
|
||||||
|
let got = org_and_bucket_to_database("org", "bucket").expect("failed on valid DB mapping");
|
||||||
|
|
||||||
|
assert_eq!(got.as_str(), "org_bucket");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_org_bucket_map_db_contains_underscore() {
|
||||||
|
let got = org_and_bucket_to_database("my_org", "bucket").unwrap();
|
||||||
|
assert_eq!(got.as_str(), "my%5Forg_bucket");
|
||||||
|
|
||||||
|
let got = org_and_bucket_to_database("org", "my_bucket").unwrap();
|
||||||
|
assert_eq!(got.as_str(), "org_my%5Fbucket");
|
||||||
|
|
||||||
|
let got = org_and_bucket_to_database("org", "my__bucket").unwrap();
|
||||||
|
assert_eq!(got.as_str(), "org_my%5F%5Fbucket");
|
||||||
|
|
||||||
|
let got = org_and_bucket_to_database("my_org", "my_bucket").unwrap();
|
||||||
|
assert_eq!(got.as_str(), "my%5Forg_my%5Fbucket");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_org_bucket_map_db_contains_underscore_and_percent() {
|
||||||
|
let got = org_and_bucket_to_database("my%5Forg", "bucket").unwrap();
|
||||||
|
assert_eq!(got.as_str(), "my%255Forg_bucket");
|
||||||
|
|
||||||
|
let got = org_and_bucket_to_database("my%5Forg_", "bucket").unwrap();
|
||||||
|
assert_eq!(got.as_str(), "my%255Forg%5F_bucket");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_bad_database_name_is_encoded() {
|
||||||
|
let got = org_and_bucket_to_database("org", "bucket?").unwrap();
|
||||||
|
assert_eq!(got.as_str(), "org_bucket%3F");
|
||||||
|
|
||||||
|
let got = org_and_bucket_to_database("org!", "bucket").unwrap();
|
||||||
|
assert_eq!(got.as_str(), "org%21_bucket");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_empty_org_bucket() {
|
||||||
|
let err = org_and_bucket_to_database("", "")
|
||||||
|
.expect_err("should fail with empty org/bucket valuese");
|
||||||
|
assert!(matches!(err, OrgBucketMappingError::NotSpecified));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ edition = "2021"
|
||||||
# Workspace dependencies, in alphabetical order
|
# Workspace dependencies, in alphabetical order
|
||||||
clap_blocks = { path = "../clap_blocks" }
|
clap_blocks = { path = "../clap_blocks" }
|
||||||
data_types = { path = "../data_types" }
|
data_types = { path = "../data_types" }
|
||||||
|
data_types2 = { path = "../data_types2" }
|
||||||
dml = { path = "../dml" }
|
dml = { path = "../dml" }
|
||||||
metric = { path = "../metric" }
|
metric = { path = "../metric" }
|
||||||
observability_deps = { path = "../observability_deps" }
|
observability_deps = { path = "../observability_deps" }
|
||||||
|
|
|
@ -2,19 +2,16 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use hyper::{Body, Method, Request, Response, StatusCode};
|
use data_types2::{
|
||||||
use serde::Deserialize;
|
org_and_bucket_to_database, DatabaseName, NonEmptyString, OrgBucketMappingError,
|
||||||
use snafu::{OptionExt, ResultExt, Snafu};
|
|
||||||
|
|
||||||
use data_types::{
|
|
||||||
names::{org_and_bucket_to_database, OrgBucketMappingError},
|
|
||||||
non_empty::NonEmptyString,
|
|
||||||
DatabaseName,
|
|
||||||
};
|
};
|
||||||
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
|
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
|
||||||
|
use hyper::{Body, Method, Request, Response, StatusCode};
|
||||||
use mutable_batch_lp::LinesConverter;
|
use mutable_batch_lp::LinesConverter;
|
||||||
use observability_deps::tracing::debug;
|
use observability_deps::tracing::debug;
|
||||||
use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
|
use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use snafu::{OptionExt, ResultExt, Snafu};
|
||||||
|
|
||||||
use crate::{http::utils::parse_body, server_type::ServerType};
|
use crate::{http::utils::parse_body, server_type::ServerType};
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,8 @@ use crate::{
|
||||||
input::GrpcInputs,
|
input::GrpcInputs,
|
||||||
StorageService,
|
StorageService,
|
||||||
};
|
};
|
||||||
use data_types::{error::ErrorLogger, names::org_and_bucket_to_database, DatabaseName};
|
use data_types::error::ErrorLogger;
|
||||||
|
use data_types2::{org_and_bucket_to_database, DatabaseName};
|
||||||
use generated_types::{
|
use generated_types::{
|
||||||
google::protobuf::Empty, literal_or_regex::Value as RegexOrLiteralValue,
|
google::protobuf::Empty, literal_or_regex::Value as RegexOrLiteralValue,
|
||||||
offsets_response::PartitionOffsetResponse, storage_server::Storage, tag_key_predicate,
|
offsets_response::PartitionOffsetResponse, storage_server::Storage, tag_key_predicate,
|
||||||
|
|
Loading…
Reference in New Issue