refactor: org & bucket parser on NamespaceName
Moves the function org_and_bucket_to_namespace() to be an associated method (constructor) on the NamespaceName itself.pull/24376/head
parent
259dc04937
commit
65034cfaa6
|
@ -20,30 +20,6 @@ pub enum OrgBucketMappingError {
|
|||
NotSpecified,
|
||||
}
|
||||
|
||||
/// Map an InfluxDB 2.X org & bucket into an IOx NamespaceName.
|
||||
///
|
||||
/// This function ensures the mapping is unambiguous by requiring both `org` and
|
||||
/// `bucket` to not contain the `_` character in addition to the
|
||||
/// [`NamespaceName`] validation.
|
||||
pub fn org_and_bucket_to_namespace<'a, O: AsRef<str>, B: AsRef<str>>(
|
||||
org: O,
|
||||
bucket: B,
|
||||
) -> Result<NamespaceName<'a>, OrgBucketMappingError> {
|
||||
const SEPARATOR: char = '_';
|
||||
|
||||
let org: Cow<'_, str> = utf8_percent_encode(org.as_ref(), NON_ALPHANUMERIC).into();
|
||||
let bucket: Cow<'_, str> = utf8_percent_encode(bucket.as_ref(), NON_ALPHANUMERIC).into();
|
||||
|
||||
// An empty org or bucket is not acceptable.
|
||||
if org.is_empty() || bucket.is_empty() {
|
||||
return Err(OrgBucketMappingError::NotSpecified);
|
||||
}
|
||||
|
||||
let db_name = format!("{}{}{}", org.as_ref(), SEPARATOR, bucket.as_ref());
|
||||
|
||||
NamespaceName::new(db_name).context(InvalidNamespaceNameSnafu)
|
||||
}
|
||||
|
||||
/// [`NamespaceName`] name validation errors.
|
||||
#[derive(Debug, Snafu)]
|
||||
#[allow(missing_docs)]
|
||||
|
@ -121,6 +97,29 @@ impl<'a> NamespaceName<'a> {
|
|||
pub fn as_str(&self) -> &str {
|
||||
self.0.as_ref()
|
||||
}
|
||||
|
||||
/// Map an InfluxDB 2.X org & bucket into an IOx NamespaceName.
|
||||
///
|
||||
/// This function ensures the mapping is unambiguous by encoding any
|
||||
/// non-alphanumeric characters in both `org` and `bucket` in addition to
|
||||
/// the validation performed in [`NamespaceName::new()`].
|
||||
pub fn from_org_and_bucket<O: AsRef<str>, B: AsRef<str>>(
|
||||
org: O,
|
||||
bucket: B,
|
||||
) -> Result<Self, OrgBucketMappingError> {
|
||||
let org = org.as_ref();
|
||||
let bucket = bucket.as_ref();
|
||||
|
||||
if org.is_empty() || bucket.is_empty() {
|
||||
return Err(OrgBucketMappingError::NotSpecified);
|
||||
}
|
||||
|
||||
let prefix: Cow<'_, str> = utf8_percent_encode(org, NON_ALPHANUMERIC).into();
|
||||
let suffix: Cow<'_, str> = utf8_percent_encode(bucket, NON_ALPHANUMERIC).into();
|
||||
|
||||
let db_name = format!("{}_{}", prefix, suffix);
|
||||
Self::new(db_name).context(InvalidNamespaceNameSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::convert::From<NamespaceName<'a>> for String {
|
||||
|
@ -171,47 +170,48 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_org_bucket_map_db_ok() {
|
||||
let got = org_and_bucket_to_namespace("org", "bucket").expect("failed on valid DB mapping");
|
||||
let got = NamespaceName::from_org_and_bucket("org", "bucket")
|
||||
.expect("failed on valid DB mapping");
|
||||
|
||||
assert_eq!(got.as_str(), "org_bucket");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_org_bucket_map_db_contains_underscore() {
|
||||
let got = org_and_bucket_to_namespace("my_org", "bucket").unwrap();
|
||||
let got = NamespaceName::from_org_and_bucket("my_org", "bucket").unwrap();
|
||||
assert_eq!(got.as_str(), "my%5Forg_bucket");
|
||||
|
||||
let got = org_and_bucket_to_namespace("org", "my_bucket").unwrap();
|
||||
let got = NamespaceName::from_org_and_bucket("org", "my_bucket").unwrap();
|
||||
assert_eq!(got.as_str(), "org_my%5Fbucket");
|
||||
|
||||
let got = org_and_bucket_to_namespace("org", "my__bucket").unwrap();
|
||||
let got = NamespaceName::from_org_and_bucket("org", "my__bucket").unwrap();
|
||||
assert_eq!(got.as_str(), "org_my%5F%5Fbucket");
|
||||
|
||||
let got = org_and_bucket_to_namespace("my_org", "my_bucket").unwrap();
|
||||
let got = NamespaceName::from_org_and_bucket("my_org", "my_bucket").unwrap();
|
||||
assert_eq!(got.as_str(), "my%5Forg_my%5Fbucket");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_org_bucket_map_db_contains_underscore_and_percent() {
|
||||
let got = org_and_bucket_to_namespace("my%5Forg", "bucket").unwrap();
|
||||
let got = NamespaceName::from_org_and_bucket("my%5Forg", "bucket").unwrap();
|
||||
assert_eq!(got.as_str(), "my%255Forg_bucket");
|
||||
|
||||
let got = org_and_bucket_to_namespace("my%5Forg_", "bucket").unwrap();
|
||||
let got = NamespaceName::from_org_and_bucket("my%5Forg_", "bucket").unwrap();
|
||||
assert_eq!(got.as_str(), "my%255Forg%5F_bucket");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_namespace_name_is_encoded() {
|
||||
let got = org_and_bucket_to_namespace("org", "bucket?").unwrap();
|
||||
let got = NamespaceName::from_org_and_bucket("org", "bucket?").unwrap();
|
||||
assert_eq!(got.as_str(), "org_bucket%3F");
|
||||
|
||||
let got = org_and_bucket_to_namespace("org!", "bucket").unwrap();
|
||||
let got = NamespaceName::from_org_and_bucket("org!", "bucket").unwrap();
|
||||
assert_eq!(got.as_str(), "org%21_bucket");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_org_bucket() {
|
||||
let err = org_and_bucket_to_namespace("", "")
|
||||
let err = NamespaceName::from_org_and_bucket("", "")
|
||||
.expect_err("should fail with empty org/bucket valuese");
|
||||
assert!(matches!(err, OrgBucketMappingError::NotSpecified));
|
||||
}
|
||||
|
|
|
@ -2,8 +2,8 @@ use self::generated_types::{shard_service_client::ShardServiceClient, *};
|
|||
use crate::{AggregateTSMMeasurement, AggregateTSMSchema};
|
||||
use chrono::{format::StrftimeItems, offset::FixedOffset, DateTime, Duration};
|
||||
use data_types::{
|
||||
org_and_bucket_to_namespace, ColumnType, Namespace, NamespaceSchema, OrgBucketMappingError,
|
||||
Partition, PartitionKey, QueryPoolId, ShardId, TableSchema, TopicId,
|
||||
ColumnType, Namespace, NamespaceName, NamespaceSchema, OrgBucketMappingError, Partition,
|
||||
PartitionKey, QueryPoolId, ShardId, TableSchema, TopicId,
|
||||
};
|
||||
use influxdb_iox_client::connection::{Connection, GrpcConnection};
|
||||
use iox_catalog::interface::{
|
||||
|
@ -64,7 +64,7 @@ pub async fn update_iox_catalog<'a>(
|
|||
connection: Connection,
|
||||
) -> Result<(), UpdateCatalogError> {
|
||||
let namespace_name =
|
||||
org_and_bucket_to_namespace(&merged_tsm_schema.org_id, &merged_tsm_schema.bucket_id)
|
||||
NamespaceName::from_org_and_bucket(&merged_tsm_schema.org_id, &merged_tsm_schema.bucket_id)
|
||||
.map_err(UpdateCatalogError::InvalidOrgBucket)?;
|
||||
let mut repos = catalog.repositories().await;
|
||||
let iox_schema = match get_schema_by_name(
|
||||
|
|
|
@ -4,7 +4,7 @@ mod delete_predicate;
|
|||
|
||||
use authz::{Action, Authorizer, Permission, Resource};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use data_types::{org_and_bucket_to_namespace, OrgBucketMappingError};
|
||||
use data_types::{NamespaceName, OrgBucketMappingError};
|
||||
use futures::StreamExt;
|
||||
use hashbrown::HashMap;
|
||||
use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode};
|
||||
|
@ -415,7 +415,7 @@ where
|
|||
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
|
||||
|
||||
let write_info = WriteInfo::try_from(&req)?;
|
||||
let namespace = org_and_bucket_to_namespace(&write_info.org, &write_info.bucket)
|
||||
let namespace = NamespaceName::from_org_and_bucket(&write_info.org, &write_info.bucket)
|
||||
.map_err(OrgBucketError::MappingFail)?;
|
||||
|
||||
let token = req
|
||||
|
@ -503,7 +503,7 @@ where
|
|||
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
|
||||
|
||||
let account = WriteInfo::try_from(&req)?;
|
||||
let namespace = org_and_bucket_to_namespace(&account.org, &account.bucket)
|
||||
let namespace = NamespaceName::from_org_and_bucket(&account.org, &account.bucket)
|
||||
.map_err(OrgBucketError::MappingFail)?;
|
||||
|
||||
trace!(org=%account.org, bucket=%account.bucket, %namespace, "processing delete request");
|
||||
|
|
|
@ -13,7 +13,7 @@ use crate::{
|
|||
response_chunking::ChunkReadResponses,
|
||||
StorageService,
|
||||
};
|
||||
use data_types::{org_and_bucket_to_namespace, NamespaceName};
|
||||
use data_types::NamespaceName;
|
||||
use datafusion::error::DataFusionError;
|
||||
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
|
||||
use generated_types::{
|
||||
|
@ -1101,7 +1101,7 @@ where
|
|||
}
|
||||
|
||||
fn get_namespace_name(input: &impl GrpcInputs) -> Result<NamespaceName<'static>, Status> {
|
||||
org_and_bucket_to_namespace(input.org_id()?.to_string(), &input.bucket_name()?)
|
||||
NamespaceName::from_org_and_bucket(input.org_id()?.to_string(), &input.bucket_name()?)
|
||||
.map_err(|e| Status::internal(e.to_string()))
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue