fix: Move DatabaseName to data_types2
parent
1ea4a40b1f
commit
afdff2b1db
|
@ -1179,6 +1179,7 @@ dependencies = [
|
|||
"schema",
|
||||
"snafu",
|
||||
"sqlx",
|
||||
"test_helpers",
|
||||
"uuid 0.8.2",
|
||||
"workspace-hack",
|
||||
]
|
||||
|
@ -5195,7 +5196,7 @@ dependencies = [
|
|||
"arrow",
|
||||
"arrow-flight",
|
||||
"bytes",
|
||||
"data_types",
|
||||
"data_types2",
|
||||
"datafusion 0.1.0",
|
||||
"futures",
|
||||
"generated_types",
|
||||
|
|
|
@ -1,187 +0,0 @@
|
|||
use snafu::Snafu;
|
||||
use std::{borrow::Cow, ops::RangeInclusive};
|
||||
|
||||
/// Length constraints for a database name.
|
||||
///
|
||||
/// A `RangeInclusive` is a closed interval, covering [1, 64]
|
||||
const LENGTH_CONSTRAINT: RangeInclusive<usize> = 1..=64;
|
||||
|
||||
/// Database name validation errors.
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum DatabaseNameError {
|
||||
#[snafu(display(
|
||||
"Database name {} length must be between {} and {} characters",
|
||||
name,
|
||||
LENGTH_CONSTRAINT.start(),
|
||||
LENGTH_CONSTRAINT.end()
|
||||
))]
|
||||
LengthConstraint { name: String },
|
||||
|
||||
#[snafu(display(
|
||||
"Database name '{}' contains invalid character. Character number {} is a control which is not allowed.", name, bad_char_offset
|
||||
))]
|
||||
BadChars {
|
||||
bad_char_offset: usize,
|
||||
name: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// A correctly formed database name.
|
||||
///
|
||||
/// Using this wrapper type allows the consuming code to enforce the invariant
|
||||
/// that only valid names are provided.
|
||||
///
|
||||
/// This type derefs to a `str` and therefore can be used in place of anything
|
||||
/// that is expecting a `str`:
|
||||
///
|
||||
/// ```rust
|
||||
/// # use data_types::DatabaseName;
|
||||
/// fn print_database(s: &str) {
|
||||
/// println!("database name: {}", s);
|
||||
/// }
|
||||
///
|
||||
/// let db = DatabaseName::new("data").unwrap();
|
||||
/// print_database(&db);
|
||||
/// ```
|
||||
///
|
||||
/// But this is not reciprocal - functions that wish to accept only
|
||||
/// pre-validated names can use `DatabaseName` as a parameter.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
||||
pub struct DatabaseName<'a>(Cow<'a, str>);
|
||||
|
||||
impl<'a> DatabaseName<'a> {
|
||||
pub fn new<T: Into<Cow<'a, str>>>(name: T) -> Result<Self, DatabaseNameError> {
|
||||
let name: Cow<'a, str> = name.into();
|
||||
|
||||
if !LENGTH_CONSTRAINT.contains(&name.len()) {
|
||||
return Err(DatabaseNameError::LengthConstraint {
|
||||
name: name.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Validate the name contains only valid characters.
|
||||
//
|
||||
// NOTE: If changing these characters, please update the error message
|
||||
// above.
|
||||
if let Some(bad_char_offset) = name.chars().position(|c| c.is_control()) {
|
||||
return BadCharsSnafu {
|
||||
bad_char_offset,
|
||||
name,
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
Ok(Self(name))
|
||||
}
|
||||
|
||||
pub fn as_str(&self) -> &str {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::convert::From<DatabaseName<'a>> for String {
|
||||
fn from(name: DatabaseName<'a>) -> Self {
|
||||
name.0.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::convert::From<&DatabaseName<'a>> for String {
|
||||
fn from(name: &DatabaseName<'a>) -> Self {
|
||||
name.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::convert::TryFrom<&'a str> for DatabaseName<'a> {
|
||||
type Error = DatabaseNameError;
|
||||
|
||||
fn try_from(v: &'a str) -> Result<Self, Self::Error> {
|
||||
Self::new(v)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::convert::TryFrom<String> for DatabaseName<'a> {
|
||||
type Error = DatabaseNameError;
|
||||
|
||||
fn try_from(v: String) -> Result<Self, Self::Error> {
|
||||
Self::new(v)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::ops::Deref for DatabaseName<'a> {
|
||||
type Target = str;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::fmt::Display for DatabaseName<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::convert::TryFrom;
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
#[test]
|
||||
fn test_deref() {
|
||||
let db = DatabaseName::new("my_example_name").unwrap();
|
||||
assert_eq!(&*db, "my_example_name");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_too_short() {
|
||||
let name = "".to_string();
|
||||
let got = DatabaseName::try_from(name).unwrap_err();
|
||||
|
||||
assert!(matches!(
|
||||
got,
|
||||
DatabaseNameError::LengthConstraint { name: _n }
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_too_long() {
|
||||
let name = "my_example_name_that_is_quite_a_bit_longer_than_allowed_even_though_database_names_can_be_quite_long_bananas".to_string();
|
||||
let got = DatabaseName::try_from(name).unwrap_err();
|
||||
|
||||
assert!(matches!(
|
||||
got,
|
||||
DatabaseNameError::LengthConstraint { name: _n }
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_chars_null() {
|
||||
let got = DatabaseName::new("example\x00").unwrap_err();
|
||||
assert_contains!(got.to_string() , "Database name 'example\x00' contains invalid character. Character number 7 is a control which is not allowed.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_chars_high_control() {
|
||||
let got = DatabaseName::new("\u{007f}example").unwrap_err();
|
||||
assert_contains!(got.to_string() , "Database name '\u{007f}example' contains invalid character. Character number 0 is a control which is not allowed.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_chars_tab() {
|
||||
let got = DatabaseName::new("example\tdb").unwrap_err();
|
||||
assert_contains!(got.to_string() , "Database name 'example\tdb' contains invalid character. Character number 7 is a control which is not allowed.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_chars_newline() {
|
||||
let got = DatabaseName::new("my_example\ndb").unwrap_err();
|
||||
assert_contains!(got.to_string() , "Database name 'my_example\ndb' contains invalid character. Character number 10 is a control which is not allowed.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ok_chars() {
|
||||
let db = DatabaseName::new("my-example-db_with_underscores and spaces").unwrap();
|
||||
assert_eq!(&*db, "my-example-db_with_underscores and spaces");
|
||||
}
|
||||
}
|
|
@ -1,276 +0,0 @@
|
|||
use crate::{write_buffer::WriteBufferConnection, DatabaseName};
|
||||
use snafu::Snafu;
|
||||
use std::{
|
||||
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Error in {}: {}", source_module, source))]
|
||||
PassThrough {
|
||||
source_module: &'static str,
|
||||
source: Box<dyn std::error::Error + Send + Sync + 'static>,
|
||||
},
|
||||
|
||||
#[snafu(display("No sharding rule matches table: {}", table))]
|
||||
NoShardingRuleMatches { table: String },
|
||||
|
||||
#[snafu(display("No shards defined"))]
|
||||
NoShardsDefined,
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// `DatabaseRules` contains the rules for replicating data, sending data to
|
||||
/// subscribers, and querying data for a single database. This information is
|
||||
/// provided by and exposed to operators.
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct DatabaseRules {
|
||||
/// The name of the database
|
||||
pub name: DatabaseName<'static>,
|
||||
|
||||
/// Template that generates a partition key for each row inserted into the
|
||||
/// db
|
||||
pub partition_template: PartitionTemplate,
|
||||
|
||||
/// Configure how data flows through the system
|
||||
pub lifecycle_rules: LifecycleRules,
|
||||
|
||||
/// Duration for which the cleanup loop should sleep on average.
|
||||
/// Defaults to 500 seconds.
|
||||
pub worker_cleanup_avg_sleep: Duration,
|
||||
|
||||
/// An optional connection string to a write buffer for either writing or reading.
|
||||
pub write_buffer_connection: Option<WriteBufferConnection>,
|
||||
}
|
||||
|
||||
impl DatabaseRules {
|
||||
pub fn new(name: DatabaseName<'static>) -> Self {
|
||||
Self {
|
||||
name,
|
||||
partition_template: Default::default(),
|
||||
lifecycle_rules: Default::default(),
|
||||
worker_cleanup_avg_sleep: Duration::from_secs(500),
|
||||
write_buffer_connection: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn db_name(&self) -> &str {
|
||||
self.name.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000;
|
||||
pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100;
|
||||
pub const DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;
|
||||
pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 1_000_000;
|
||||
pub const DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS: u32 = 30 * 60;
|
||||
pub const DEFAULT_LATE_ARRIVE_WINDOW_SECONDS: u32 = 5 * 60;
|
||||
|
||||
/// Configures how data automatically flows through the system
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct LifecycleRules {
|
||||
/// Once the total amount of buffered data in memory reaches this size start
|
||||
/// dropping data from memory
|
||||
pub buffer_size_soft: Option<NonZeroUsize>,
|
||||
|
||||
/// Once the amount of data in memory reaches this size start
|
||||
/// rejecting writes
|
||||
pub buffer_size_hard: Option<NonZeroUsize>,
|
||||
|
||||
/// Persists chunks to object storage.
|
||||
pub persist: bool,
|
||||
|
||||
/// Do not allow writing new data to this database
|
||||
pub immutable: bool,
|
||||
|
||||
/// If the background worker doesn't find anything to do it
|
||||
/// will sleep for this many milliseconds before looking again
|
||||
pub worker_backoff_millis: NonZeroU64,
|
||||
|
||||
/// The maximum number of permitted concurrently executing compactions.
|
||||
pub max_active_compactions: MaxActiveCompactions,
|
||||
|
||||
/// After how many transactions should IOx write a new checkpoint?
|
||||
pub catalog_transactions_until_checkpoint: NonZeroU64,
|
||||
|
||||
/// Prune catalog transactions older than the given age.
|
||||
///
|
||||
/// Keeping old transaction can be useful for debugging.
|
||||
pub catalog_transaction_prune_age: Duration,
|
||||
|
||||
/// Once a partition hasn't received a write for this period of time,
|
||||
/// it will be compacted and, if set, persisted. Writers will generally
|
||||
/// have this amount of time to send late arriving writes or this could
|
||||
/// be their clock skew.
|
||||
pub late_arrive_window_seconds: NonZeroU32,
|
||||
|
||||
/// Maximum number of rows before triggering persistence
|
||||
pub persist_row_threshold: NonZeroUsize,
|
||||
|
||||
/// Maximum age of a write before triggering persistence
|
||||
pub persist_age_threshold_seconds: NonZeroU32,
|
||||
|
||||
/// Maximum number of rows to buffer in a MUB chunk before compacting it
|
||||
pub mub_row_threshold: NonZeroUsize,
|
||||
|
||||
/// Use up to this amount of space in bytes for caching Parquet files. None
|
||||
/// will disable Parquet file caching.
|
||||
pub parquet_cache_limit: Option<NonZeroU64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub enum MaxActiveCompactions {
|
||||
/// The maximum number of permitted concurrently executing compactions.
|
||||
/// It is not currently possible to set a limit that disables compactions
|
||||
/// entirely, nor is it possible to set an "unlimited" value.
|
||||
MaxActiveCompactions(NonZeroU32),
|
||||
|
||||
// The maximum number of concurrent active compactions that can run
|
||||
// expressed as a fraction of the available cpus (rounded to the next smallest non-zero integer).
|
||||
MaxActiveCompactionsCpuFraction {
|
||||
fraction: f32,
|
||||
effective: NonZeroU32,
|
||||
},
|
||||
}
|
||||
|
||||
impl MaxActiveCompactions {
|
||||
pub fn new(fraction: f32) -> Self {
|
||||
let cpus = num_cpus::get() as f32 * fraction;
|
||||
let effective = (cpus as u32).saturating_sub(1) + 1;
|
||||
let effective = NonZeroU32::new(effective).unwrap();
|
||||
Self::MaxActiveCompactionsCpuFraction {
|
||||
fraction,
|
||||
effective,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self) -> u32 {
|
||||
match self {
|
||||
Self::MaxActiveCompactions(effective) => effective,
|
||||
Self::MaxActiveCompactionsCpuFraction { effective, .. } => effective,
|
||||
}
|
||||
.get()
|
||||
}
|
||||
}
|
||||
|
||||
// Defaults to number of CPUs.
|
||||
impl Default for MaxActiveCompactions {
|
||||
fn default() -> Self {
|
||||
Self::new(1.0)
|
||||
}
|
||||
}
|
||||
|
||||
// Required because database rules must be Eq but cannot derive Eq for Self
|
||||
// since f32 is not Eq.
|
||||
impl Eq for MaxActiveCompactions {}
|
||||
|
||||
impl LifecycleRules {
|
||||
/// The max timestamp skew across concurrent writers before persisted chunks might overlap
|
||||
pub fn late_arrive_window(&self) -> Duration {
|
||||
Duration::from_secs(self.late_arrive_window_seconds.get() as u64)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LifecycleRules {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
buffer_size_soft: None,
|
||||
buffer_size_hard: None,
|
||||
persist: false,
|
||||
immutable: false,
|
||||
worker_backoff_millis: NonZeroU64::new(DEFAULT_WORKER_BACKOFF_MILLIS).unwrap(),
|
||||
max_active_compactions: Default::default(),
|
||||
catalog_transactions_until_checkpoint: NonZeroU64::new(
|
||||
DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
|
||||
)
|
||||
.unwrap(),
|
||||
catalog_transaction_prune_age: DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE,
|
||||
late_arrive_window_seconds: NonZeroU32::new(DEFAULT_LATE_ARRIVE_WINDOW_SECONDS)
|
||||
.unwrap(),
|
||||
persist_row_threshold: NonZeroUsize::new(DEFAULT_PERSIST_ROW_THRESHOLD).unwrap(),
|
||||
persist_age_threshold_seconds: NonZeroU32::new(DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS)
|
||||
.unwrap(),
|
||||
mub_row_threshold: NonZeroUsize::new(DEFAULT_MUB_ROW_THRESHOLD).unwrap(),
|
||||
parquet_cache_limit: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `PartitionTemplate` is used to compute the partition key of each row that
|
||||
/// gets written. It can consist of the table name, a column name and its value,
|
||||
/// a formatted time, or a string column and regex captures of its value. For
|
||||
/// columns that do not appear in the input row, a blank value is output.
|
||||
///
|
||||
/// The key is constructed in order of the template parts; thus ordering changes
|
||||
/// what partition key is generated.
|
||||
#[derive(Debug, Default, Eq, PartialEq, Clone)]
|
||||
pub struct PartitionTemplate {
|
||||
pub parts: Vec<TemplatePart>,
|
||||
}
|
||||
|
||||
/// `TemplatePart` specifies what part of a row should be used to compute this
|
||||
/// part of a partition key.
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub enum TemplatePart {
|
||||
/// The name of a table
|
||||
Table,
|
||||
/// The value in a named column
|
||||
Column(String),
|
||||
/// Applies a `strftime` format to the "time" column.
|
||||
///
|
||||
/// For example, a time format of "%Y-%m-%d %H:%M:%S" will produce
|
||||
/// partition key parts such as "2021-03-14 12:25:21" and
|
||||
/// "2021-04-14 12:24:21"
|
||||
TimeFormat(String),
|
||||
/// Applies a regex to the value in a string column
|
||||
RegexCapture(RegexCapture),
|
||||
/// Applies a `strftime` pattern to some column other than "time"
|
||||
StrftimeColumn(StrftimeColumn),
|
||||
}
|
||||
|
||||
/// `RegexCapture` is for pulling parts of a string column into the partition
|
||||
/// key.
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct RegexCapture {
|
||||
pub column: String,
|
||||
pub regex: String,
|
||||
}
|
||||
|
||||
/// [`StrftimeColumn`] is used to create a time based partition key off some
|
||||
/// column other than the builtin `time` column.
|
||||
///
|
||||
/// The value of the named column is formatted using a `strftime`
|
||||
/// style string.
|
||||
///
|
||||
/// For example, a time format of "%Y-%m-%d %H:%M:%S" will produce
|
||||
/// partition key parts such as "2021-03-14 12:25:21" and
|
||||
/// "2021-04-14 12:24:21"
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct StrftimeColumn {
|
||||
pub column: String,
|
||||
pub format: String,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_max_active_compactions_cpu_fraction() {
|
||||
let n = MaxActiveCompactions::new(1.0);
|
||||
let cpus = n.get();
|
||||
|
||||
let n = MaxActiveCompactions::new(0.5);
|
||||
let half_cpus = n.get();
|
||||
|
||||
assert_eq!(half_cpus, cpus / 2);
|
||||
|
||||
let n = MaxActiveCompactions::new(0.0);
|
||||
let non_zero = n.get();
|
||||
|
||||
assert_eq!(non_zero, 1);
|
||||
}
|
||||
}
|
|
@ -13,8 +13,6 @@
|
|||
pub mod boolean_flag;
|
||||
pub mod chunk_metadata;
|
||||
pub mod consistent_hasher;
|
||||
mod database_name;
|
||||
pub mod database_rules;
|
||||
pub mod error;
|
||||
pub mod job;
|
||||
pub mod partition_metadata;
|
||||
|
@ -24,4 +22,3 @@ pub mod server_id;
|
|||
pub mod timestamp;
|
||||
pub mod write_buffer;
|
||||
pub mod write_summary;
|
||||
pub use database_name::*;
|
||||
|
|
|
@ -14,3 +14,6 @@ snafu = "0.7"
|
|||
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "postgres", "uuid"] }
|
||||
uuid = { version = "0.8", features = ["v4"] }
|
||||
workspace-hack = { path = "../workspace-hack"}
|
||||
|
||||
[dev-dependencies] # In alphabetical order
|
||||
test_helpers = { path = "../test_helpers" }
|
||||
|
|
|
@ -20,7 +20,7 @@ use std::{
|
|||
convert::TryFrom,
|
||||
fmt::Write,
|
||||
num::{FpCategory, NonZeroU32},
|
||||
ops::{Add, Deref, Sub},
|
||||
ops::{Add, Deref, RangeInclusive, Sub},
|
||||
sync::Arc,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
@ -31,7 +31,6 @@ pub use data_types::{
|
|||
},
|
||||
sequence::Sequence,
|
||||
timestamp::{TimestampMinMax, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME},
|
||||
DatabaseName, DatabaseNameError,
|
||||
};
|
||||
|
||||
/// Unique ID for a `Namespace`
|
||||
|
@ -1282,10 +1281,137 @@ impl Deref for NonEmptyString {
|
|||
}
|
||||
}
|
||||
|
||||
/// Length constraints for a database name.
|
||||
///
|
||||
/// A `RangeInclusive` is a closed interval, covering [1, 64]
|
||||
const LENGTH_CONSTRAINT: RangeInclusive<usize> = 1..=64;
|
||||
|
||||
/// Database name validation errors.
|
||||
#[derive(Debug, Snafu)]
|
||||
#[allow(missing_docs)]
|
||||
pub enum DatabaseNameError {
|
||||
#[snafu(display(
|
||||
"Database name {} length must be between {} and {} characters",
|
||||
name,
|
||||
LENGTH_CONSTRAINT.start(),
|
||||
LENGTH_CONSTRAINT.end()
|
||||
))]
|
||||
LengthConstraint { name: String },
|
||||
|
||||
#[snafu(display(
|
||||
"Database name '{}' contains invalid character. \
|
||||
Character number {} is a control which is not allowed.",
|
||||
name,
|
||||
bad_char_offset
|
||||
))]
|
||||
BadChars {
|
||||
bad_char_offset: usize,
|
||||
name: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// A correctly formed database name.
|
||||
///
|
||||
/// Using this wrapper type allows the consuming code to enforce the invariant
|
||||
/// that only valid names are provided.
|
||||
///
|
||||
/// This type derefs to a `str` and therefore can be used in place of anything
|
||||
/// that is expecting a `str`:
|
||||
///
|
||||
/// ```rust
|
||||
/// # use data_types2::DatabaseName;
|
||||
/// fn print_database(s: &str) {
|
||||
/// println!("database name: {}", s);
|
||||
/// }
|
||||
///
|
||||
/// let db = DatabaseName::new("data").unwrap();
|
||||
/// print_database(&db);
|
||||
/// ```
|
||||
///
|
||||
/// But this is not reciprocal - functions that wish to accept only
|
||||
/// pre-validated names can use `DatabaseName` as a parameter.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
||||
pub struct DatabaseName<'a>(Cow<'a, str>);
|
||||
|
||||
impl<'a> DatabaseName<'a> {
|
||||
/// Create a new, valid DatabaseName.
|
||||
pub fn new<T: Into<Cow<'a, str>>>(name: T) -> Result<Self, DatabaseNameError> {
|
||||
let name: Cow<'a, str> = name.into();
|
||||
|
||||
if !LENGTH_CONSTRAINT.contains(&name.len()) {
|
||||
return Err(DatabaseNameError::LengthConstraint {
|
||||
name: name.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Validate the name contains only valid characters.
|
||||
//
|
||||
// NOTE: If changing these characters, please update the error message
|
||||
// above.
|
||||
if let Some(bad_char_offset) = name.chars().position(|c| c.is_control()) {
|
||||
return BadCharsSnafu {
|
||||
bad_char_offset,
|
||||
name,
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
Ok(Self(name))
|
||||
}
|
||||
|
||||
/// Borrow a string slice of the name.
|
||||
pub fn as_str(&self) -> &str {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::convert::From<DatabaseName<'a>> for String {
|
||||
fn from(name: DatabaseName<'a>) -> Self {
|
||||
name.0.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::convert::From<&DatabaseName<'a>> for String {
|
||||
fn from(name: &DatabaseName<'a>) -> Self {
|
||||
name.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::convert::TryFrom<&'a str> for DatabaseName<'a> {
|
||||
type Error = DatabaseNameError;
|
||||
|
||||
fn try_from(v: &'a str) -> Result<Self, Self::Error> {
|
||||
Self::new(v)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::convert::TryFrom<String> for DatabaseName<'a> {
|
||||
type Error = DatabaseNameError;
|
||||
|
||||
fn try_from(v: String) -> Result<Self, Self::Error> {
|
||||
Self::new(v)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::ops::Deref for DatabaseName<'a> {
|
||||
type Target = str;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::fmt::Display for DatabaseName<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use ordered_float::OrderedFloat;
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
#[test]
|
||||
fn test_chunk_id_new() {
|
||||
|
@ -1551,4 +1677,62 @@ mod tests {
|
|||
.expect_err("should fail with empty org/bucket valuese");
|
||||
assert!(matches!(err, OrgBucketMappingError::NotSpecified));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_deref() {
|
||||
let db = DatabaseName::new("my_example_name").unwrap();
|
||||
assert_eq!(&*db, "my_example_name");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_too_short() {
|
||||
let name = "".to_string();
|
||||
let got = DatabaseName::try_from(name).unwrap_err();
|
||||
|
||||
assert!(matches!(
|
||||
got,
|
||||
DatabaseNameError::LengthConstraint { name: _n }
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_too_long() {
|
||||
let name = "my_example_name_that_is_quite_a_bit_longer_than_allowed_even_though_database_names_can_be_quite_long_bananas".to_string();
|
||||
let got = DatabaseName::try_from(name).unwrap_err();
|
||||
|
||||
assert!(matches!(
|
||||
got,
|
||||
DatabaseNameError::LengthConstraint { name: _n }
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_chars_null() {
|
||||
let got = DatabaseName::new("example\x00").unwrap_err();
|
||||
assert_contains!(got.to_string() , "Database name 'example\x00' contains invalid character. Character number 7 is a control which is not allowed.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_chars_high_control() {
|
||||
let got = DatabaseName::new("\u{007f}example").unwrap_err();
|
||||
assert_contains!(got.to_string() , "Database name '\u{007f}example' contains invalid character. Character number 0 is a control which is not allowed.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_chars_tab() {
|
||||
let got = DatabaseName::new("example\tdb").unwrap_err();
|
||||
assert_contains!(got.to_string() , "Database name 'example\tdb' contains invalid character. Character number 7 is a control which is not allowed.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_chars_newline() {
|
||||
let got = DatabaseName::new("my_example\ndb").unwrap_err();
|
||||
assert_contains!(got.to_string() , "Database name 'my_example\ndb' contains invalid character. Character number 10 is a control which is not allowed.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ok_chars() {
|
||||
let db = DatabaseName::new("my-example-db_with_underscores and spaces").unwrap();
|
||||
assert_eq!(&*db, "my-example-db_with_underscores and spaces");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
# Workspace dependencies, in alphabetical order
|
||||
data_types = { path = "../data_types" }
|
||||
data_types2 = { path = "../data_types2" }
|
||||
datafusion = { path = "../datafusion" }
|
||||
generated_types = { path = "../generated_types" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
|
|
|
@ -1,7 +1,4 @@
|
|||
//! Implements the native gRPC IOx query API using Arrow Flight
|
||||
use std::fmt::Debug;
|
||||
use std::task::Poll;
|
||||
use std::{pin::Pin, sync::Arc};
|
||||
|
||||
use arrow::{
|
||||
array::{make_array, ArrayRef, MutableArrayData},
|
||||
|
@ -15,24 +12,24 @@ use arrow_flight::{
|
|||
HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
|
||||
};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use data_types2::{DatabaseName, DatabaseNameError};
|
||||
use datafusion::physical_plan::ExecutionPlan;
|
||||
use futures::{SinkExt, Stream, StreamExt};
|
||||
use generated_types::influxdata::iox::querier::v1 as proto;
|
||||
use observability_deps::tracing::{info, warn};
|
||||
use pin_project::{pin_project, pinned_drop};
|
||||
use prost::Message;
|
||||
use query::{QueryCompletedToken, QueryDatabase};
|
||||
use query::{
|
||||
exec::{ExecutionContextProvider, IOxSessionContext},
|
||||
QueryCompletedToken, QueryDatabase,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use service_common::QueryDatabaseProvider;
|
||||
use service_common::{planner::Planner, QueryDatabaseProvider};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll};
|
||||
use tokio::task::JoinHandle;
|
||||
use tonic::{Request, Response, Streaming};
|
||||
|
||||
use data_types::{DatabaseName, DatabaseNameError};
|
||||
use observability_deps::tracing::{info, warn};
|
||||
use query::exec::{ExecutionContextProvider, IOxSessionContext};
|
||||
|
||||
use service_common::planner::Planner;
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
|
|
|
@ -193,7 +193,8 @@ mod tests {
|
|||
core::test_utils::random_topic_name, maybe_skip_kafka_integration,
|
||||
mock::MockBufferSharedState,
|
||||
};
|
||||
use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
|
||||
use data_types::write_buffer::WriteBufferCreationConfig;
|
||||
use data_types2::DatabaseName;
|
||||
use std::{convert::TryFrom, num::NonZeroU32};
|
||||
use tempfile::TempDir;
|
||||
|
||||
|
|
Loading…
Reference in New Issue