From afdff2b1db83662c9a50b0f2acaf54582e8170bb Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 4 May 2022 17:04:29 -0400 Subject: [PATCH] fix: Move DatabaseName to data_types2 --- Cargo.lock | 3 +- data_types/src/database_name.rs | 187 --------------------- data_types/src/database_rules.rs | 276 ------------------------------- data_types/src/lib.rs | 3 - data_types2/Cargo.toml | 3 + data_types2/src/lib.rs | 188 ++++++++++++++++++++- service_grpc_flight/Cargo.toml | 2 +- service_grpc_flight/src/lib.rs | 19 +-- write_buffer/src/config.rs | 3 +- 9 files changed, 202 insertions(+), 482 deletions(-) delete mode 100644 data_types/src/database_name.rs delete mode 100644 data_types/src/database_rules.rs diff --git a/Cargo.lock b/Cargo.lock index 0d2ec75941..402acad833 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1179,6 +1179,7 @@ dependencies = [ "schema", "snafu", "sqlx", + "test_helpers", "uuid 0.8.2", "workspace-hack", ] @@ -5195,7 +5196,7 @@ dependencies = [ "arrow", "arrow-flight", "bytes", - "data_types", + "data_types2", "datafusion 0.1.0", "futures", "generated_types", diff --git a/data_types/src/database_name.rs b/data_types/src/database_name.rs deleted file mode 100644 index eff74ac764..0000000000 --- a/data_types/src/database_name.rs +++ /dev/null @@ -1,187 +0,0 @@ -use snafu::Snafu; -use std::{borrow::Cow, ops::RangeInclusive}; - -/// Length constraints for a database name. -/// -/// A `RangeInclusive` is a closed interval, covering [1, 64] -const LENGTH_CONSTRAINT: RangeInclusive = 1..=64; - -/// Database name validation errors. -#[derive(Debug, Snafu)] -pub enum DatabaseNameError { - #[snafu(display( - "Database name {} length must be between {} and {} characters", - name, - LENGTH_CONSTRAINT.start(), - LENGTH_CONSTRAINT.end() - ))] - LengthConstraint { name: String }, - - #[snafu(display( - "Database name '{}' contains invalid character. Character number {} is a control which is not allowed.", name, bad_char_offset - ))] - BadChars { - bad_char_offset: usize, - name: String, - }, -} - -/// A correctly formed database name. -/// -/// Using this wrapper type allows the consuming code to enforce the invariant -/// that only valid names are provided. -/// -/// This type derefs to a `str` and therefore can be used in place of anything -/// that is expecting a `str`: -/// -/// ```rust -/// # use data_types::DatabaseName; -/// fn print_database(s: &str) { -/// println!("database name: {}", s); -/// } -/// -/// let db = DatabaseName::new("data").unwrap(); -/// print_database(&db); -/// ``` -/// -/// But this is not reciprocal - functions that wish to accept only -/// pre-validated names can use `DatabaseName` as a parameter. -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct DatabaseName<'a>(Cow<'a, str>); - -impl<'a> DatabaseName<'a> { - pub fn new>>(name: T) -> Result { - let name: Cow<'a, str> = name.into(); - - if !LENGTH_CONSTRAINT.contains(&name.len()) { - return Err(DatabaseNameError::LengthConstraint { - name: name.to_string(), - }); - } - - // Validate the name contains only valid characters. - // - // NOTE: If changing these characters, please update the error message - // above. 
- if let Some(bad_char_offset) = name.chars().position(|c| c.is_control()) { - return BadCharsSnafu { - bad_char_offset, - name, - } - .fail(); - }; - - Ok(Self(name)) - } - - pub fn as_str(&self) -> &str { - self.0.as_ref() - } -} - -impl<'a> std::convert::From> for String { - fn from(name: DatabaseName<'a>) -> Self { - name.0.to_string() - } -} - -impl<'a> std::convert::From<&DatabaseName<'a>> for String { - fn from(name: &DatabaseName<'a>) -> Self { - name.to_string() - } -} - -impl<'a> std::convert::TryFrom<&'a str> for DatabaseName<'a> { - type Error = DatabaseNameError; - - fn try_from(v: &'a str) -> Result { - Self::new(v) - } -} - -impl<'a> std::convert::TryFrom for DatabaseName<'a> { - type Error = DatabaseNameError; - - fn try_from(v: String) -> Result { - Self::new(v) - } -} - -impl<'a> std::ops::Deref for DatabaseName<'a> { - type Target = str; - - fn deref(&self) -> &Self::Target { - self.as_str() - } -} - -impl<'a> std::fmt::Display for DatabaseName<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::convert::TryFrom; - use test_helpers::assert_contains; - - #[test] - fn test_deref() { - let db = DatabaseName::new("my_example_name").unwrap(); - assert_eq!(&*db, "my_example_name"); - } - - #[test] - fn test_too_short() { - let name = "".to_string(); - let got = DatabaseName::try_from(name).unwrap_err(); - - assert!(matches!( - got, - DatabaseNameError::LengthConstraint { name: _n } - )); - } - - #[test] - fn test_too_long() { - let name = "my_example_name_that_is_quite_a_bit_longer_than_allowed_even_though_database_names_can_be_quite_long_bananas".to_string(); - let got = DatabaseName::try_from(name).unwrap_err(); - - assert!(matches!( - got, - DatabaseNameError::LengthConstraint { name: _n } - )); - } - - #[test] - fn test_bad_chars_null() { - let got = DatabaseName::new("example\x00").unwrap_err(); - assert_contains!(got.to_string() , "Database name 'example\x00' contains invalid character. Character number 7 is a control which is not allowed."); - } - - #[test] - fn test_bad_chars_high_control() { - let got = DatabaseName::new("\u{007f}example").unwrap_err(); - assert_contains!(got.to_string() , "Database name '\u{007f}example' contains invalid character. Character number 0 is a control which is not allowed."); - } - - #[test] - fn test_bad_chars_tab() { - let got = DatabaseName::new("example\tdb").unwrap_err(); - assert_contains!(got.to_string() , "Database name 'example\tdb' contains invalid character. Character number 7 is a control which is not allowed."); - } - - #[test] - fn test_bad_chars_newline() { - let got = DatabaseName::new("my_example\ndb").unwrap_err(); - assert_contains!(got.to_string() , "Database name 'my_example\ndb' contains invalid character. 
Character number 10 is a control which is not allowed."); - } - - #[test] - fn test_ok_chars() { - let db = DatabaseName::new("my-example-db_with_underscores and spaces").unwrap(); - assert_eq!(&*db, "my-example-db_with_underscores and spaces"); - } -} diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs deleted file mode 100644 index 8b350eaabe..0000000000 --- a/data_types/src/database_rules.rs +++ /dev/null @@ -1,276 +0,0 @@ -use crate::{write_buffer::WriteBufferConnection, DatabaseName}; -use snafu::Snafu; -use std::{ - num::{NonZeroU32, NonZeroU64, NonZeroUsize}, - time::Duration, -}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Error in {}: {}", source_module, source))] - PassThrough { - source_module: &'static str, - source: Box, - }, - - #[snafu(display("No sharding rule matches table: {}", table))] - NoShardingRuleMatches { table: String }, - - #[snafu(display("No shards defined"))] - NoShardsDefined, -} - -pub type Result = std::result::Result; - -/// `DatabaseRules` contains the rules for replicating data, sending data to -/// subscribers, and querying data for a single database. This information is -/// provided by and exposed to operators. -#[derive(Debug, Eq, PartialEq, Clone)] -pub struct DatabaseRules { - /// The name of the database - pub name: DatabaseName<'static>, - - /// Template that generates a partition key for each row inserted into the - /// db - pub partition_template: PartitionTemplate, - - /// Configure how data flows through the system - pub lifecycle_rules: LifecycleRules, - - /// Duration for which the cleanup loop should sleep on average. - /// Defaults to 500 seconds. - pub worker_cleanup_avg_sleep: Duration, - - /// An optional connection string to a write buffer for either writing or reading. - pub write_buffer_connection: Option, -} - -impl DatabaseRules { - pub fn new(name: DatabaseName<'static>) -> Self { - Self { - name, - partition_template: Default::default(), - lifecycle_rules: Default::default(), - worker_cleanup_avg_sleep: Duration::from_secs(500), - write_buffer_connection: None, - } - } - - pub fn db_name(&self) -> &str { - self.name.as_str() - } -} - -pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000; -pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100; -pub const DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE: Duration = Duration::from_secs(24 * 60 * 60); -pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000; -pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 1_000_000; -pub const DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS: u32 = 30 * 60; -pub const DEFAULT_LATE_ARRIVE_WINDOW_SECONDS: u32 = 5 * 60; - -/// Configures how data automatically flows through the system -#[derive(Debug, Eq, PartialEq, Clone)] -pub struct LifecycleRules { - /// Once the total amount of buffered data in memory reaches this size start - /// dropping data from memory - pub buffer_size_soft: Option, - - /// Once the amount of data in memory reaches this size start - /// rejecting writes - pub buffer_size_hard: Option, - - /// Persists chunks to object storage. - pub persist: bool, - - /// Do not allow writing new data to this database - pub immutable: bool, - - /// If the background worker doesn't find anything to do it - /// will sleep for this many milliseconds before looking again - pub worker_backoff_millis: NonZeroU64, - - /// The maximum number of permitted concurrently executing compactions. - pub max_active_compactions: MaxActiveCompactions, - - /// After how many transactions should IOx write a new checkpoint? 
- pub catalog_transactions_until_checkpoint: NonZeroU64, - - /// Prune catalog transactions older than the given age. - /// - /// Keeping old transaction can be useful for debugging. - pub catalog_transaction_prune_age: Duration, - - /// Once a partition hasn't received a write for this period of time, - /// it will be compacted and, if set, persisted. Writers will generally - /// have this amount of time to send late arriving writes or this could - /// be their clock skew. - pub late_arrive_window_seconds: NonZeroU32, - - /// Maximum number of rows before triggering persistence - pub persist_row_threshold: NonZeroUsize, - - /// Maximum age of a write before triggering persistence - pub persist_age_threshold_seconds: NonZeroU32, - - /// Maximum number of rows to buffer in a MUB chunk before compacting it - pub mub_row_threshold: NonZeroUsize, - - /// Use up to this amount of space in bytes for caching Parquet files. None - /// will disable Parquet file caching. - pub parquet_cache_limit: Option, -} - -#[derive(Debug, PartialEq, Clone)] -pub enum MaxActiveCompactions { - /// The maximum number of permitted concurrently executing compactions. - /// It is not currently possible to set a limit that disables compactions - /// entirely, nor is it possible to set an "unlimited" value. - MaxActiveCompactions(NonZeroU32), - - // The maximum number of concurrent active compactions that can run - // expressed as a fraction of the available cpus (rounded to the next smallest non-zero integer). - MaxActiveCompactionsCpuFraction { - fraction: f32, - effective: NonZeroU32, - }, -} - -impl MaxActiveCompactions { - pub fn new(fraction: f32) -> Self { - let cpus = num_cpus::get() as f32 * fraction; - let effective = (cpus as u32).saturating_sub(1) + 1; - let effective = NonZeroU32::new(effective).unwrap(); - Self::MaxActiveCompactionsCpuFraction { - fraction, - effective, - } - } - - pub fn get(&self) -> u32 { - match self { - Self::MaxActiveCompactions(effective) => effective, - Self::MaxActiveCompactionsCpuFraction { effective, .. } => effective, - } - .get() - } -} - -// Defaults to number of CPUs. -impl Default for MaxActiveCompactions { - fn default() -> Self { - Self::new(1.0) - } -} - -// Required because database rules must be Eq but cannot derive Eq for Self -// since f32 is not Eq. 
-impl Eq for MaxActiveCompactions {} - -impl LifecycleRules { - /// The max timestamp skew across concurrent writers before persisted chunks might overlap - pub fn late_arrive_window(&self) -> Duration { - Duration::from_secs(self.late_arrive_window_seconds.get() as u64) - } -} - -impl Default for LifecycleRules { - fn default() -> Self { - Self { - buffer_size_soft: None, - buffer_size_hard: None, - persist: false, - immutable: false, - worker_backoff_millis: NonZeroU64::new(DEFAULT_WORKER_BACKOFF_MILLIS).unwrap(), - max_active_compactions: Default::default(), - catalog_transactions_until_checkpoint: NonZeroU64::new( - DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT, - ) - .unwrap(), - catalog_transaction_prune_age: DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE, - late_arrive_window_seconds: NonZeroU32::new(DEFAULT_LATE_ARRIVE_WINDOW_SECONDS) - .unwrap(), - persist_row_threshold: NonZeroUsize::new(DEFAULT_PERSIST_ROW_THRESHOLD).unwrap(), - persist_age_threshold_seconds: NonZeroU32::new(DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS) - .unwrap(), - mub_row_threshold: NonZeroUsize::new(DEFAULT_MUB_ROW_THRESHOLD).unwrap(), - parquet_cache_limit: None, - } - } -} - -/// `PartitionTemplate` is used to compute the partition key of each row that -/// gets written. It can consist of the table name, a column name and its value, -/// a formatted time, or a string column and regex captures of its value. For -/// columns that do not appear in the input row, a blank value is output. -/// -/// The key is constructed in order of the template parts; thus ordering changes -/// what partition key is generated. -#[derive(Debug, Default, Eq, PartialEq, Clone)] -pub struct PartitionTemplate { - pub parts: Vec, -} - -/// `TemplatePart` specifies what part of a row should be used to compute this -/// part of a partition key. -#[derive(Debug, Eq, PartialEq, Clone)] -pub enum TemplatePart { - /// The name of a table - Table, - /// The value in a named column - Column(String), - /// Applies a `strftime` format to the "time" column. - /// - /// For example, a time format of "%Y-%m-%d %H:%M:%S" will produce - /// partition key parts such as "2021-03-14 12:25:21" and - /// "2021-04-14 12:24:21" - TimeFormat(String), - /// Applies a regex to the value in a string column - RegexCapture(RegexCapture), - /// Applies a `strftime` pattern to some column other than "time" - StrftimeColumn(StrftimeColumn), -} - -/// `RegexCapture` is for pulling parts of a string column into the partition -/// key. -#[derive(Debug, Eq, PartialEq, Clone)] -pub struct RegexCapture { - pub column: String, - pub regex: String, -} - -/// [`StrftimeColumn`] is used to create a time based partition key off some -/// column other than the builtin `time` column. -/// -/// The value of the named column is formatted using a `strftime` -/// style string. 
-/// -/// For example, a time format of "%Y-%m-%d %H:%M:%S" will produce -/// partition key parts such as "2021-03-14 12:25:21" and -/// "2021-04-14 12:24:21" -#[derive(Debug, Eq, PartialEq, Clone)] -pub struct StrftimeColumn { - pub column: String, - pub format: String, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_max_active_compactions_cpu_fraction() { - let n = MaxActiveCompactions::new(1.0); - let cpus = n.get(); - - let n = MaxActiveCompactions::new(0.5); - let half_cpus = n.get(); - - assert_eq!(half_cpus, cpus / 2); - - let n = MaxActiveCompactions::new(0.0); - let non_zero = n.get(); - - assert_eq!(non_zero, 1); - } -} diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 07b479f8c1..22b92dd497 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -13,8 +13,6 @@ pub mod boolean_flag; pub mod chunk_metadata; pub mod consistent_hasher; -mod database_name; -pub mod database_rules; pub mod error; pub mod job; pub mod partition_metadata; @@ -24,4 +22,3 @@ pub mod server_id; pub mod timestamp; pub mod write_buffer; pub mod write_summary; -pub use database_name::*; diff --git a/data_types2/Cargo.toml b/data_types2/Cargo.toml index e160be0bb2..77dea8c888 100644 --- a/data_types2/Cargo.toml +++ b/data_types2/Cargo.toml @@ -14,3 +14,6 @@ snafu = "0.7" sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "postgres", "uuid"] } uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} + +[dev-dependencies] # In alphabetical order +test_helpers = { path = "../test_helpers" } diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index dba8b548fb..07828173e9 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -20,7 +20,7 @@ use std::{ convert::TryFrom, fmt::Write, num::{FpCategory, NonZeroU32}, - ops::{Add, Deref, Sub}, + ops::{Add, Deref, RangeInclusive, Sub}, sync::Arc, }; use uuid::Uuid; @@ -31,7 +31,6 @@ pub use data_types::{ }, sequence::Sequence, timestamp::{TimestampMinMax, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME}, - DatabaseName, DatabaseNameError, }; /// Unique ID for a `Namespace` @@ -1282,10 +1281,137 @@ impl Deref for NonEmptyString { } } +/// Length constraints for a database name. +/// +/// A `RangeInclusive` is a closed interval, covering [1, 64] +const LENGTH_CONSTRAINT: RangeInclusive = 1..=64; + +/// Database name validation errors. +#[derive(Debug, Snafu)] +#[allow(missing_docs)] +pub enum DatabaseNameError { + #[snafu(display( + "Database name {} length must be between {} and {} characters", + name, + LENGTH_CONSTRAINT.start(), + LENGTH_CONSTRAINT.end() + ))] + LengthConstraint { name: String }, + + #[snafu(display( + "Database name '{}' contains invalid character. \ + Character number {} is a control which is not allowed.", + name, + bad_char_offset + ))] + BadChars { + bad_char_offset: usize, + name: String, + }, +} + +/// A correctly formed database name. +/// +/// Using this wrapper type allows the consuming code to enforce the invariant +/// that only valid names are provided. +/// +/// This type derefs to a `str` and therefore can be used in place of anything +/// that is expecting a `str`: +/// +/// ```rust +/// # use data_types2::DatabaseName; +/// fn print_database(s: &str) { +/// println!("database name: {}", s); +/// } +/// +/// let db = DatabaseName::new("data").unwrap(); +/// print_database(&db); +/// ``` +/// +/// But this is not reciprocal - functions that wish to accept only +/// pre-validated names can use `DatabaseName` as a parameter. 
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct DatabaseName<'a>(Cow<'a, str>); + +impl<'a> DatabaseName<'a> { + /// Create a new, valid DatabaseName. + pub fn new>>(name: T) -> Result { + let name: Cow<'a, str> = name.into(); + + if !LENGTH_CONSTRAINT.contains(&name.len()) { + return Err(DatabaseNameError::LengthConstraint { + name: name.to_string(), + }); + } + + // Validate the name contains only valid characters. + // + // NOTE: If changing these characters, please update the error message + // above. + if let Some(bad_char_offset) = name.chars().position(|c| c.is_control()) { + return BadCharsSnafu { + bad_char_offset, + name, + } + .fail(); + }; + + Ok(Self(name)) + } + + /// Borrow a string slice of the name. + pub fn as_str(&self) -> &str { + self.0.as_ref() + } +} + +impl<'a> std::convert::From> for String { + fn from(name: DatabaseName<'a>) -> Self { + name.0.to_string() + } +} + +impl<'a> std::convert::From<&DatabaseName<'a>> for String { + fn from(name: &DatabaseName<'a>) -> Self { + name.to_string() + } +} + +impl<'a> std::convert::TryFrom<&'a str> for DatabaseName<'a> { + type Error = DatabaseNameError; + + fn try_from(v: &'a str) -> Result { + Self::new(v) + } +} + +impl<'a> std::convert::TryFrom for DatabaseName<'a> { + type Error = DatabaseNameError; + + fn try_from(v: String) -> Result { + Self::new(v) + } +} + +impl<'a> std::ops::Deref for DatabaseName<'a> { + type Target = str; + + fn deref(&self) -> &Self::Target { + self.as_str() + } +} + +impl<'a> std::fmt::Display for DatabaseName<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + #[cfg(test)] mod tests { use super::*; use ordered_float::OrderedFloat; + use test_helpers::assert_contains; #[test] fn test_chunk_id_new() { @@ -1551,4 +1677,62 @@ mod tests { .expect_err("should fail with empty org/bucket valuese"); assert!(matches!(err, OrgBucketMappingError::NotSpecified)); } + + #[test] + fn test_deref() { + let db = DatabaseName::new("my_example_name").unwrap(); + assert_eq!(&*db, "my_example_name"); + } + + #[test] + fn test_too_short() { + let name = "".to_string(); + let got = DatabaseName::try_from(name).unwrap_err(); + + assert!(matches!( + got, + DatabaseNameError::LengthConstraint { name: _n } + )); + } + + #[test] + fn test_too_long() { + let name = "my_example_name_that_is_quite_a_bit_longer_than_allowed_even_though_database_names_can_be_quite_long_bananas".to_string(); + let got = DatabaseName::try_from(name).unwrap_err(); + + assert!(matches!( + got, + DatabaseNameError::LengthConstraint { name: _n } + )); + } + + #[test] + fn test_bad_chars_null() { + let got = DatabaseName::new("example\x00").unwrap_err(); + assert_contains!(got.to_string() , "Database name 'example\x00' contains invalid character. Character number 7 is a control which is not allowed."); + } + + #[test] + fn test_bad_chars_high_control() { + let got = DatabaseName::new("\u{007f}example").unwrap_err(); + assert_contains!(got.to_string() , "Database name '\u{007f}example' contains invalid character. Character number 0 is a control which is not allowed."); + } + + #[test] + fn test_bad_chars_tab() { + let got = DatabaseName::new("example\tdb").unwrap_err(); + assert_contains!(got.to_string() , "Database name 'example\tdb' contains invalid character. 
Character number 7 is a control which is not allowed."); + } + + #[test] + fn test_bad_chars_newline() { + let got = DatabaseName::new("my_example\ndb").unwrap_err(); + assert_contains!(got.to_string() , "Database name 'my_example\ndb' contains invalid character. Character number 10 is a control which is not allowed."); + } + + #[test] + fn test_ok_chars() { + let db = DatabaseName::new("my-example-db_with_underscores and spaces").unwrap(); + assert_eq!(&*db, "my-example-db_with_underscores and spaces"); + } } diff --git a/service_grpc_flight/Cargo.toml b/service_grpc_flight/Cargo.toml index 9c4ba72b85..4799c2f7ed 100644 --- a/service_grpc_flight/Cargo.toml +++ b/service_grpc_flight/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] # Workspace dependencies, in alphabetical order -data_types = { path = "../data_types" } +data_types2 = { path = "../data_types2" } datafusion = { path = "../datafusion" } generated_types = { path = "../generated_types" } observability_deps = { path = "../observability_deps" } diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 89e067c9c7..e3129997b9 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -1,7 +1,4 @@ //! Implements the native gRPC IOx query API using Arrow Flight -use std::fmt::Debug; -use std::task::Poll; -use std::{pin::Pin, sync::Arc}; use arrow::{ array::{make_array, ArrayRef, MutableArrayData}, @@ -15,24 +12,24 @@ use arrow_flight::{ HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; use bytes::{Bytes, BytesMut}; +use data_types2::{DatabaseName, DatabaseNameError}; use datafusion::physical_plan::ExecutionPlan; use futures::{SinkExt, Stream, StreamExt}; use generated_types::influxdata::iox::querier::v1 as proto; +use observability_deps::tracing::{info, warn}; use pin_project::{pin_project, pinned_drop}; use prost::Message; -use query::{QueryCompletedToken, QueryDatabase}; +use query::{ + exec::{ExecutionContextProvider, IOxSessionContext}, + QueryCompletedToken, QueryDatabase, +}; use serde::Deserialize; -use service_common::QueryDatabaseProvider; +use service_common::{planner::Planner, QueryDatabaseProvider}; use snafu::{ResultExt, Snafu}; +use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll}; use tokio::task::JoinHandle; use tonic::{Request, Response, Streaming}; -use data_types::{DatabaseName, DatabaseNameError}; -use observability_deps::tracing::{info, warn}; -use query::exec::{ExecutionContextProvider, IOxSessionContext}; - -use service_common::planner::Planner; - #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] pub enum Error { diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs index 8b8fe139c2..140f175e53 100644 --- a/write_buffer/src/config.rs +++ b/write_buffer/src/config.rs @@ -193,7 +193,8 @@ mod tests { core::test_utils::random_topic_name, maybe_skip_kafka_integration, mock::MockBufferSharedState, }; - use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName}; + use data_types::write_buffer::WriteBufferCreationConfig; + use data_types2::DatabaseName; use std::{convert::TryFrom, num::NonZeroU32}; use tempfile::TempDir;
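
---

Reviewer note (not part of the patch): a minimal, self-contained sketch of how `DatabaseName` is used from its new home in `data_types2`, matching the API shown in the hunks above (the `new` constructor, the `[1, 64]` length constraint, control-character rejection, and the `Deref<Target = str>` impl). The consuming crate and `main` scaffolding here are illustrative assumptions, not code added by this change.

```rust
// Sketch only: assumes a crate that depends on data_types2, as
// service_grpc_flight now does after this patch.
use data_types2::{DatabaseName, DatabaseNameError};

fn print_database(s: &str) {
    // DatabaseName derefs to str, so &DatabaseName works wherever &str does.
    println!("database name: {}", s);
}

fn main() {
    // Valid: between 1 and 64 characters, no control characters.
    let db = DatabaseName::new("my-example-db").unwrap();
    print_database(&db);

    // Too short: the empty string violates the [1, 64] length constraint.
    let err = DatabaseName::new("").unwrap_err();
    assert!(matches!(err, DatabaseNameError::LengthConstraint { .. }));

    // Control characters (tab, newline, NUL, ...) are rejected; the error
    // carries the character offset of the offending character.
    let err = DatabaseName::new("example\tdb").unwrap_err();
    assert!(matches!(
        err,
        DatabaseNameError::BadChars { bad_char_offset: 7, .. }
    ));
}
```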
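One design point worth keeping in mind when reviewing the move: `DatabaseName` wraps a `Cow<'a, str>`, so `DatabaseName::new` accepts either a borrowed `&str` (no allocation) or an owned `String` (producing a `DatabaseName<'static>` that can be stored in long-lived structs). A small sketch under the same assumptions as above:

```rust
// Sketch only: illustrates the Cow-backed lifetime, not new code in this patch.
use data_types2::DatabaseName;

fn main() {
    // Borrowed from a &'static str: the Cow stays borrowed, no allocation.
    let borrowed: DatabaseName<'static> = DatabaseName::new("static_name").unwrap();

    // Built from an owned String: the Cow takes ownership, so the value is
    // independent of any caller-side buffer and can be stored long term.
    let owned: DatabaseName<'static> = DatabaseName::new(String::from("runtime_name")).unwrap();

    assert_eq!(borrowed.as_str(), "static_name");
    assert_eq!(owned.as_str(), "runtime_name");
}
```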