refactor: accept NamespaceName with Namespace create (#7774)
Co-authored-by: Dom <dom@itsallbroken.com>pull/24376/head
parent
9c5028676e
commit
5fe8affb18
|
@ -137,8 +137,8 @@ mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use chrono::TimeZone;
|
use chrono::TimeZone;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
ColumnId, ColumnSet, CompactionLevel, NamespaceId, ParquetFile, ParquetFileParams,
|
ColumnId, ColumnSet, CompactionLevel, NamespaceId, NamespaceName, ParquetFile,
|
||||||
PartitionId, TableId, Timestamp,
|
ParquetFileParams, PartitionId, TableId, Timestamp,
|
||||||
};
|
};
|
||||||
use iox_catalog::{interface::Catalog, mem::MemCatalog};
|
use iox_catalog::{interface::Catalog, mem::MemCatalog};
|
||||||
use object_store::path::Path;
|
use object_store::path::Path;
|
||||||
|
@ -157,7 +157,10 @@ mod tests {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_parquet_file_test", None)
|
.create(
|
||||||
|
&NamespaceName::new("namespace_parquet_file_test").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
|
|
@ -87,7 +87,9 @@ async fn create_namespace<R>(name: &str, repos: &mut R) -> Result<Namespace, Upd
|
||||||
where
|
where
|
||||||
R: RepoCollection + ?Sized,
|
R: RepoCollection + ?Sized,
|
||||||
{
|
{
|
||||||
match repos.namespaces().create(name, None).await {
|
let namespace_name = NamespaceName::new(name)
|
||||||
|
.map_err(|_| UpdateCatalogError::NamespaceCreationError(name.to_string()))?;
|
||||||
|
match repos.namespaces().create(&namespace_name, None).await {
|
||||||
Ok(ns) => Ok(ns),
|
Ok(ns) => Ok(ns),
|
||||||
Err(iox_catalog::interface::Error::NameExists { .. }) => {
|
Err(iox_catalog::interface::Error::NameExists { .. }) => {
|
||||||
// presumably it got created in the meantime?
|
// presumably it got created in the meantime?
|
||||||
|
@ -428,7 +430,7 @@ mod tests {
|
||||||
// create namespace, table and columns for weather measurement
|
// create namespace, table and columns for weather measurement
|
||||||
let namespace = txn
|
let namespace = txn
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("1234_5678", None)
|
.create(&NamespaceName::new("1234_5678").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace created");
|
.expect("namespace created");
|
||||||
let mut table = txn
|
let mut table = txn
|
||||||
|
@ -520,7 +522,7 @@ mod tests {
|
||||||
// create namespace, table and columns for weather measurement
|
// create namespace, table and columns for weather measurement
|
||||||
let namespace = txn
|
let namespace = txn
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("1234_5678", None)
|
.create(&NamespaceName::new("1234_5678").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace created");
|
.expect("namespace created");
|
||||||
let mut table = txn
|
let mut table = txn
|
||||||
|
@ -585,7 +587,7 @@ mod tests {
|
||||||
// create namespace, table and columns for weather measurement
|
// create namespace, table and columns for weather measurement
|
||||||
let namespace = txn
|
let namespace = txn
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("1234_5678", None)
|
.create(&NamespaceName::new("1234_5678").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace created");
|
.expect("namespace created");
|
||||||
let mut table = txn
|
let mut table = txn
|
||||||
|
|
|
@ -114,9 +114,10 @@ mod tests {
|
||||||
|
|
||||||
let (namespace_id, table_id) = {
|
let (namespace_id, table_id) = {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
|
let table_ns_name = data_types::NamespaceName::new(TABLE_NAME).unwrap();
|
||||||
let ns = repos
|
let ns = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(TABLE_NAME, None)
|
.create(&table_ns_name, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|
|
@ -298,7 +298,13 @@ pub(crate) async fn populate_catalog(
|
||||||
table: &str,
|
table: &str,
|
||||||
) -> (NamespaceId, TableId) {
|
) -> (NamespaceId, TableId) {
|
||||||
let mut c = catalog.repositories().await;
|
let mut c = catalog.repositories().await;
|
||||||
let ns_id = c.namespaces().create(namespace, None).await.unwrap().id;
|
let namespace_name = data_types::NamespaceName::new(namespace).unwrap();
|
||||||
|
let ns_id = c
|
||||||
|
.namespaces()
|
||||||
|
.create(&namespace_name, None)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.id;
|
||||||
let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id;
|
let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id;
|
||||||
|
|
||||||
(ns_id, table_id)
|
(ns_id, table_id)
|
||||||
|
|
|
@ -17,7 +17,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||||
use arrow::record_batch::RecordBatch;
|
use arrow::record_batch::RecordBatch;
|
||||||
use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket};
|
use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket};
|
||||||
use data_types::{
|
use data_types::{
|
||||||
Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, SequenceNumber, TableId,
|
Namespace, NamespaceId, NamespaceName, NamespaceSchema, ParquetFile, PartitionKey,
|
||||||
|
SequenceNumber, TableId,
|
||||||
};
|
};
|
||||||
use dml::{DmlMeta, DmlWrite};
|
use dml::{DmlMeta, DmlWrite};
|
||||||
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
|
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
|
||||||
|
@ -207,7 +208,7 @@ where
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(name, None)
|
.create(&NamespaceName::new(name).unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("failed to create test namespace");
|
.expect("failed to create test namespace");
|
||||||
|
|
||||||
|
|
|
@ -2,9 +2,9 @@
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceSchema,
|
Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceName,
|
||||||
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
|
NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId,
|
||||||
SkippedCompaction, Table, TableId, TableSchema, Timestamp,
|
PartitionKey, SkippedCompaction, Table, TableId, TableSchema, Timestamp,
|
||||||
};
|
};
|
||||||
use iox_time::TimeProvider;
|
use iox_time::TimeProvider;
|
||||||
use snafu::{OptionExt, Snafu};
|
use snafu::{OptionExt, Snafu};
|
||||||
|
@ -35,6 +35,9 @@ pub enum CasFailure<T> {
|
||||||
#[allow(missing_copy_implementations, missing_docs)]
|
#[allow(missing_copy_implementations, missing_docs)]
|
||||||
#[snafu(visibility(pub(crate)))]
|
#[snafu(visibility(pub(crate)))]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
#[snafu(display("invalid name: {}", name))]
|
||||||
|
InvalidName { name: String },
|
||||||
|
|
||||||
#[snafu(display("name {} already exists", name))]
|
#[snafu(display("name {} already exists", name))]
|
||||||
NameExists { name: String },
|
NameExists { name: String },
|
||||||
|
|
||||||
|
@ -307,7 +310,11 @@ pub trait NamespaceRepo: Send + Sync {
|
||||||
/// Creates the namespace in the catalog. If one by the same name already exists, an
|
/// Creates the namespace in the catalog. If one by the same name already exists, an
|
||||||
/// error is returned.
|
/// error is returned.
|
||||||
/// Specify `None` for `retention_period_ns` to get infinite retention.
|
/// Specify `None` for `retention_period_ns` to get infinite retention.
|
||||||
async fn create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
|
async fn create(
|
||||||
|
&mut self,
|
||||||
|
name: &NamespaceName,
|
||||||
|
retention_period_ns: Option<i64>,
|
||||||
|
) -> Result<Namespace>;
|
||||||
|
|
||||||
/// Update retention period for a namespace
|
/// Update retention period for a namespace
|
||||||
async fn update_retention_period(
|
async fn update_retention_period(
|
||||||
|
@ -787,14 +794,14 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
async fn test_namespace(catalog: Arc<dyn Catalog>) {
|
async fn test_namespace(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace_name = "test_namespace";
|
let namespace_name = NamespaceName::new("test_namespace").unwrap();
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(namespace_name, None)
|
.create(&namespace_name, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(namespace.id > NamespaceId::new(0));
|
assert!(namespace.id > NamespaceId::new(0));
|
||||||
assert_eq!(namespace.name, namespace_name);
|
assert_eq!(namespace.name, namespace_name.as_str());
|
||||||
|
|
||||||
// Assert default values for service protection limits.
|
// Assert default values for service protection limits.
|
||||||
assert_eq!(namespace.max_tables, DEFAULT_MAX_TABLES);
|
assert_eq!(namespace.max_tables, DEFAULT_MAX_TABLES);
|
||||||
|
@ -803,7 +810,7 @@ pub(crate) mod test_helpers {
|
||||||
DEFAULT_MAX_COLUMNS_PER_TABLE
|
DEFAULT_MAX_COLUMNS_PER_TABLE
|
||||||
);
|
);
|
||||||
|
|
||||||
let conflict = repos.namespaces().create(namespace_name, None).await;
|
let conflict = repos.namespaces().create(&namespace_name, None).await;
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
conflict.unwrap_err(),
|
conflict.unwrap_err(),
|
||||||
Error::NameExists { name: _ }
|
Error::NameExists { name: _ }
|
||||||
|
@ -826,7 +833,7 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
let found = repos
|
let found = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.get_by_name(namespace_name, SoftDeletedRows::ExcludeDeleted)
|
.get_by_name(&namespace_name, SoftDeletedRows::ExcludeDeleted)
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.expect("namespace should be there");
|
.expect("namespace should be there");
|
||||||
|
@ -839,10 +846,10 @@ pub(crate) mod test_helpers {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(not_found.is_none());
|
assert!(not_found.is_none());
|
||||||
|
|
||||||
let namespace2_name = "test_namespace2";
|
let namespace2_name = NamespaceName::new("test_namespace2").unwrap();
|
||||||
let namespace2 = repos
|
let namespace2 = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(namespace2_name, None)
|
.create(&namespace2_name, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut namespaces = repos
|
let mut namespaces = repos
|
||||||
|
@ -856,7 +863,7 @@ pub(crate) mod test_helpers {
|
||||||
const NEW_TABLE_LIMIT: i32 = 15000;
|
const NEW_TABLE_LIMIT: i32 = 15000;
|
||||||
let modified = repos
|
let modified = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.update_table_limit(namespace_name, NEW_TABLE_LIMIT)
|
.update_table_limit(namespace_name.as_str(), NEW_TABLE_LIMIT)
|
||||||
.await
|
.await
|
||||||
.expect("namespace should be updateable");
|
.expect("namespace should be updateable");
|
||||||
assert_eq!(NEW_TABLE_LIMIT, modified.max_tables);
|
assert_eq!(NEW_TABLE_LIMIT, modified.max_tables);
|
||||||
|
@ -864,7 +871,7 @@ pub(crate) mod test_helpers {
|
||||||
const NEW_COLUMN_LIMIT: i32 = 1500;
|
const NEW_COLUMN_LIMIT: i32 = 1500;
|
||||||
let modified = repos
|
let modified = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.update_column_limit(namespace_name, NEW_COLUMN_LIMIT)
|
.update_column_limit(namespace_name.as_str(), NEW_COLUMN_LIMIT)
|
||||||
.await
|
.await
|
||||||
.expect("namespace should be updateable");
|
.expect("namespace should be updateable");
|
||||||
assert_eq!(NEW_COLUMN_LIMIT, modified.max_columns_per_table);
|
assert_eq!(NEW_COLUMN_LIMIT, modified.max_columns_per_table);
|
||||||
|
@ -872,7 +879,7 @@ pub(crate) mod test_helpers {
|
||||||
const NEW_RETENTION_PERIOD_NS: i64 = 5 * 60 * 60 * 1000 * 1000 * 1000;
|
const NEW_RETENTION_PERIOD_NS: i64 = 5 * 60 * 60 * 1000 * 1000 * 1000;
|
||||||
let modified = repos
|
let modified = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.update_retention_period(namespace_name, Some(NEW_RETENTION_PERIOD_NS))
|
.update_retention_period(namespace_name.as_str(), Some(NEW_RETENTION_PERIOD_NS))
|
||||||
.await
|
.await
|
||||||
.expect("namespace should be updateable");
|
.expect("namespace should be updateable");
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
@ -882,25 +889,25 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
let modified = repos
|
let modified = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.update_retention_period(namespace_name, None)
|
.update_retention_period(namespace_name.as_str(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace should be updateable");
|
.expect("namespace should be updateable");
|
||||||
assert!(modified.retention_period_ns.is_none());
|
assert!(modified.retention_period_ns.is_none());
|
||||||
|
|
||||||
// create namespace with retention period NULL
|
// create namespace with retention period NULL
|
||||||
let namespace3_name = "test_namespace3";
|
let namespace3_name = NamespaceName::new("test_namespace3").unwrap();
|
||||||
let namespace3 = repos
|
let namespace3 = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(namespace3_name, None)
|
.create(&namespace3_name, None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace with NULL retention should be created");
|
.expect("namespace with NULL retention should be created");
|
||||||
assert!(namespace3.retention_period_ns.is_none());
|
assert!(namespace3.retention_period_ns.is_none());
|
||||||
|
|
||||||
// create namespace with retention period
|
// create namespace with retention period
|
||||||
let namespace4_name = "test_namespace4";
|
let namespace4_name = NamespaceName::new("test_namespace4").unwrap();
|
||||||
let namespace4 = repos
|
let namespace4 = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(namespace4_name, Some(NEW_RETENTION_PERIOD_NS))
|
.create(&namespace4_name, Some(NEW_RETENTION_PERIOD_NS))
|
||||||
.await
|
.await
|
||||||
.expect("namespace with 5-hour retention should be created");
|
.expect("namespace with 5-hour retention should be created");
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
@ -910,7 +917,7 @@ pub(crate) mod test_helpers {
|
||||||
// reset retention period to NULL to avoid affecting later tests
|
// reset retention period to NULL to avoid affecting later tests
|
||||||
repos
|
repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.update_retention_period(namespace4_name, None)
|
.update_retention_period(&namespace4_name, None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace should be updateable");
|
.expect("namespace should be updateable");
|
||||||
|
|
||||||
|
@ -947,8 +954,16 @@ pub(crate) mod test_helpers {
|
||||||
async fn test_namespace_soft_deletion(catalog: Arc<dyn Catalog>) {
|
async fn test_namespace_soft_deletion(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
|
|
||||||
let deleted_ns = repos.namespaces().create("deleted-ns", None).await.unwrap();
|
let deleted_ns = repos
|
||||||
let active_ns = repos.namespaces().create("active-ns", None).await.unwrap();
|
.namespaces()
|
||||||
|
.create(&"deleted-ns".try_into().unwrap(), None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let active_ns = repos
|
||||||
|
.namespaces()
|
||||||
|
.create(&"active-ns".try_into().unwrap(), None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// Mark "deleted-ns" as soft-deleted.
|
// Mark "deleted-ns" as soft-deleted.
|
||||||
repos.namespaces().soft_delete("deleted-ns").await.unwrap();
|
repos.namespaces().soft_delete("deleted-ns").await.unwrap();
|
||||||
|
@ -1104,7 +1119,7 @@ pub(crate) mod test_helpers {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_table_test", None)
|
.create(&NamespaceName::new("namespace_table_test").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -1139,7 +1154,11 @@ pub(crate) mod test_helpers {
|
||||||
assert_eq!(vec![t.clone()], tables);
|
assert_eq!(vec![t.clone()], tables);
|
||||||
|
|
||||||
// test we can create a table of the same name in a different namespace
|
// test we can create a table of the same name in a different namespace
|
||||||
let namespace2 = repos.namespaces().create("two", None).await.unwrap();
|
let namespace2 = repos
|
||||||
|
.namespaces()
|
||||||
|
.create(&NamespaceName::new("two").unwrap(), None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
assert_ne!(namespace, namespace2);
|
assert_ne!(namespace, namespace2);
|
||||||
let test_table = repos
|
let test_table = repos
|
||||||
.tables()
|
.tables()
|
||||||
|
@ -1237,7 +1256,7 @@ pub(crate) mod test_helpers {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_column_test", None)
|
.create(&NamespaceName::new("namespace_column_test").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
@ -1370,7 +1389,10 @@ pub(crate) mod test_helpers {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_partition_test", None)
|
.create(
|
||||||
|
&NamespaceName::new("namespace_partition_test").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
@ -1652,7 +1674,10 @@ pub(crate) mod test_helpers {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_parquet_file_test", None)
|
.create(
|
||||||
|
&NamespaceName::new("namespace_parquet_file_test").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
@ -1837,7 +1862,10 @@ pub(crate) mod test_helpers {
|
||||||
// test list_by_namespace_not_to_delete
|
// test list_by_namespace_not_to_delete
|
||||||
let namespace2 = repos
|
let namespace2 = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_parquet_file_test1", None)
|
.create(
|
||||||
|
&NamespaceName::new("namespace_parquet_file_test1").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table2 = repos
|
let table2 = repos
|
||||||
|
@ -2060,12 +2088,12 @@ pub(crate) mod test_helpers {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace_1 = repos
|
let namespace_1 = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("retention_broken_1", None)
|
.create(&NamespaceName::new("retention_broken_1").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let namespace_2 = repos
|
let namespace_2 = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("retention_broken_2", Some(1))
|
.create(&NamespaceName::new("retention_broken_2").unwrap(), Some(1))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table_1 = repos
|
let table_1 = repos
|
||||||
|
@ -2140,7 +2168,10 @@ pub(crate) mod test_helpers {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("test_partitions_new_file_between", None)
|
.create(
|
||||||
|
&NamespaceName::new("test_partitions_new_file_between").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
@ -2507,7 +2538,8 @@ pub(crate) mod test_helpers {
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(
|
.create(
|
||||||
"namespace_parquet_file_test_list_by_partiton_not_to_delete",
|
&NamespaceName::new("namespace_parquet_file_test_list_by_partiton_not_to_delete")
|
||||||
|
.unwrap(),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
@ -2616,7 +2648,10 @@ pub(crate) mod test_helpers {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_update_to_compaction_level_1_test", None)
|
.create(
|
||||||
|
&NamespaceName::new("namespace_update_to_compaction_level_1_test").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
@ -2702,7 +2737,10 @@ pub(crate) mod test_helpers {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace_1 = repos
|
let namespace_1 = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_test_delete_namespace_1", None)
|
.create(
|
||||||
|
&NamespaceName::new("namespace_test_delete_namespace_1").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table_1 = repos
|
let table_1 = repos
|
||||||
|
@ -2757,7 +2795,10 @@ pub(crate) mod test_helpers {
|
||||||
// it, let's create another so we can ensure that doesn't get deleted.
|
// it, let's create another so we can ensure that doesn't get deleted.
|
||||||
let namespace_2 = repos
|
let namespace_2 = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_test_delete_namespace_2", None)
|
.create(
|
||||||
|
&NamespaceName::new("namespace_test_delete_namespace_2").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table_2 = repos
|
let table_2 = repos
|
||||||
|
@ -2943,7 +2984,7 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
let mut txn = catalog_captured.start_transaction().await.unwrap();
|
let mut txn = catalog_captured.start_transaction().await.unwrap();
|
||||||
txn.namespaces()
|
txn.namespaces()
|
||||||
.create("test_txn_isolation", None)
|
.create(&NamespaceName::new("test_txn_isolation").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -2980,7 +3021,7 @@ pub(crate) mod test_helpers {
|
||||||
let capture = TracingCapture::new();
|
let capture = TracingCapture::new();
|
||||||
let mut txn = catalog.start_transaction().await.unwrap();
|
let mut txn = catalog.start_transaction().await.unwrap();
|
||||||
txn.namespaces()
|
txn.namespaces()
|
||||||
.create("test_txn_drop", None)
|
.create(&NamespaceName::new("test_txn_drop").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
drop(txn);
|
drop(txn);
|
||||||
|
@ -3009,7 +3050,10 @@ pub(crate) mod test_helpers {
|
||||||
where
|
where
|
||||||
R: RepoCollection + ?Sized,
|
R: RepoCollection + ?Sized,
|
||||||
{
|
{
|
||||||
let namespace = repos.namespaces().create(namespace_name, None).await;
|
let namespace = repos
|
||||||
|
.namespaces()
|
||||||
|
.create(&NamespaceName::new(namespace_name).unwrap(), None)
|
||||||
|
.await;
|
||||||
|
|
||||||
let namespace = match namespace {
|
let namespace = match namespace {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
|
|
|
@ -210,6 +210,7 @@ mod tests {
|
||||||
interface::{get_schema_by_name, SoftDeletedRows},
|
interface::{get_schema_by_name, SoftDeletedRows},
|
||||||
mem::MemCatalog,
|
mem::MemCatalog,
|
||||||
};
|
};
|
||||||
|
use data_types::NamespaceName;
|
||||||
|
|
||||||
// Generate a test that simulates multiple, sequential writes in `lp` and
|
// Generate a test that simulates multiple, sequential writes in `lp` and
|
||||||
// asserts the resulting schema.
|
// asserts the resulting schema.
|
||||||
|
@ -231,6 +232,7 @@ mod tests {
|
||||||
use std::ops::DerefMut;
|
use std::ops::DerefMut;
|
||||||
use pretty_assertions::assert_eq;
|
use pretty_assertions::assert_eq;
|
||||||
const NAMESPACE_NAME: &str = "bananas";
|
const NAMESPACE_NAME: &str = "bananas";
|
||||||
|
let ns_name = NamespaceName::new(NAMESPACE_NAME).unwrap();
|
||||||
|
|
||||||
let metrics = Arc::new(metric::Registry::default());
|
let metrics = Arc::new(metric::Registry::default());
|
||||||
let repo = MemCatalog::new(metrics);
|
let repo = MemCatalog::new(metrics);
|
||||||
|
@ -238,7 +240,7 @@ mod tests {
|
||||||
|
|
||||||
let namespace = txn
|
let namespace = txn
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(NAMESPACE_NAME, None)
|
.create(&ns_name, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|
|
@ -12,9 +12,9 @@ use crate::{
|
||||||
};
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile,
|
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
|
||||||
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction,
|
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
|
||||||
Table, TableId, Timestamp,
|
SkippedCompaction, Table, TableId, Timestamp,
|
||||||
};
|
};
|
||||||
use iox_time::{SystemProvider, TimeProvider};
|
use iox_time::{SystemProvider, TimeProvider};
|
||||||
use observability_deps::tracing::warn;
|
use observability_deps::tracing::warn;
|
||||||
|
@ -217,10 +217,14 @@ impl RepoCollection for MemTxn {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl NamespaceRepo for MemTxn {
|
impl NamespaceRepo for MemTxn {
|
||||||
async fn create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace> {
|
async fn create(
|
||||||
|
&mut self,
|
||||||
|
name: &NamespaceName,
|
||||||
|
retention_period_ns: Option<i64>,
|
||||||
|
) -> Result<Namespace> {
|
||||||
let stage = self.stage();
|
let stage = self.stage();
|
||||||
|
|
||||||
if stage.namespaces.iter().any(|n| n.name == name) {
|
if stage.namespaces.iter().any(|n| n.name == name.as_str()) {
|
||||||
return Err(Error::NameExists {
|
return Err(Error::NameExists {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
});
|
});
|
||||||
|
|
|
@ -6,9 +6,9 @@ use crate::interface::{
|
||||||
};
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId,
|
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, ParquetFile,
|
||||||
ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
|
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction,
|
||||||
Timestamp,
|
Table, TableId, Timestamp,
|
||||||
};
|
};
|
||||||
use iox_time::{SystemProvider, TimeProvider};
|
use iox_time::{SystemProvider, TimeProvider};
|
||||||
use metric::{DurationHistogram, Metric};
|
use metric::{DurationHistogram, Metric};
|
||||||
|
@ -145,7 +145,7 @@ macro_rules! decorate {
|
||||||
decorate!(
|
decorate!(
|
||||||
impl_trait = NamespaceRepo,
|
impl_trait = NamespaceRepo,
|
||||||
methods = [
|
methods = [
|
||||||
"namespace_create" = create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
|
"namespace_create" = create(&mut self, name: &NamespaceName, retention_period_ns: Option<i64>) -> Result<Namespace>;
|
||||||
"namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
|
"namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
|
||||||
"namespace_list" = list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>>;
|
"namespace_list" = list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>>;
|
||||||
"namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId, deleted: SoftDeletedRows) -> Result<Option<Namespace>>;
|
"namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId, deleted: SoftDeletedRows) -> Result<Option<Namespace>>;
|
||||||
|
|
|
@ -16,9 +16,9 @@ use crate::{
|
||||||
};
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId,
|
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, ParquetFile,
|
||||||
ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
|
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction,
|
||||||
Timestamp,
|
Table, TableId, Timestamp,
|
||||||
};
|
};
|
||||||
use iox_time::{SystemProvider, TimeProvider};
|
use iox_time::{SystemProvider, TimeProvider};
|
||||||
use observability_deps::tracing::{debug, info, warn};
|
use observability_deps::tracing::{debug, info, warn};
|
||||||
|
@ -578,7 +578,11 @@ impl RepoCollection for PostgresTxn {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl NamespaceRepo for PostgresTxn {
|
impl NamespaceRepo for PostgresTxn {
|
||||||
async fn create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace> {
|
async fn create(
|
||||||
|
&mut self,
|
||||||
|
name: &NamespaceName,
|
||||||
|
retention_period_ns: Option<i64>,
|
||||||
|
) -> Result<Namespace> {
|
||||||
let rec = sqlx::query_as::<_, Namespace>(
|
let rec = sqlx::query_as::<_, Namespace>(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables )
|
INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables )
|
||||||
|
@ -586,7 +590,7 @@ impl NamespaceRepo for PostgresTxn {
|
||||||
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
|
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
|
||||||
"#,
|
"#,
|
||||||
)
|
)
|
||||||
.bind(name) // $1
|
.bind(name.as_str()) // $1
|
||||||
.bind(SHARED_TOPIC_ID) // $2
|
.bind(SHARED_TOPIC_ID) // $2
|
||||||
.bind(SHARED_QUERY_POOL_ID) // $3
|
.bind(SHARED_QUERY_POOL_ID) // $3
|
||||||
.bind(retention_period_ns) // $4
|
.bind(retention_period_ns) // $4
|
||||||
|
@ -1822,7 +1826,7 @@ mod tests {
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("ns4", None)
|
.create(&NamespaceName::new("ns4").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace create failed")
|
.expect("namespace create failed")
|
||||||
.id;
|
.id;
|
||||||
|
@ -1960,7 +1964,7 @@ mod tests {
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("ns4", None)
|
.create(&NamespaceName::new("ns4").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace create failed")
|
.expect("namespace create failed")
|
||||||
.id;
|
.id;
|
||||||
|
@ -2123,7 +2127,7 @@ mod tests {
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("ns4", None)
|
.create(&NamespaceName::new("ns4").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace create failed")
|
.expect("namespace create failed")
|
||||||
.id;
|
.id;
|
||||||
|
|
|
@ -16,9 +16,9 @@ use crate::{
|
||||||
};
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile,
|
Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId,
|
||||||
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction,
|
NamespaceName, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId,
|
||||||
Table, TableId, Timestamp,
|
PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
@ -340,7 +340,11 @@ impl RepoCollection for SqliteTxn {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl NamespaceRepo for SqliteTxn {
|
impl NamespaceRepo for SqliteTxn {
|
||||||
async fn create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace> {
|
async fn create(
|
||||||
|
&mut self,
|
||||||
|
name: &NamespaceName,
|
||||||
|
retention_period_ns: Option<i64>,
|
||||||
|
) -> Result<Namespace> {
|
||||||
let rec = sqlx::query_as::<_, Namespace>(
|
let rec = sqlx::query_as::<_, Namespace>(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables )
|
INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables )
|
||||||
|
@ -348,7 +352,7 @@ VALUES ( $1, $2, $3, $4, $5 )
|
||||||
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
|
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
|
||||||
"#,
|
"#,
|
||||||
)
|
)
|
||||||
.bind(name) // $1
|
.bind(name.as_str()) // $1
|
||||||
.bind(SHARED_TOPIC_ID) // $2
|
.bind(SHARED_TOPIC_ID) // $2
|
||||||
.bind(SHARED_QUERY_POOL_ID) // $3
|
.bind(SHARED_QUERY_POOL_ID) // $3
|
||||||
.bind(retention_period_ns) // $4
|
.bind(retention_period_ns) // $4
|
||||||
|
@ -1552,7 +1556,7 @@ mod tests {
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("ns4", None)
|
.create(&NamespaceName::new("ns4").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace create failed")
|
.expect("namespace create failed")
|
||||||
.id;
|
.id;
|
||||||
|
@ -1605,7 +1609,7 @@ mod tests {
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("ns4", None)
|
.create(&NamespaceName::new("ns4").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace create failed")
|
.expect("namespace create failed")
|
||||||
.id;
|
.id;
|
||||||
|
@ -1767,7 +1771,7 @@ mod tests {
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("ns4", None)
|
.create(&NamespaceName::new("ns4").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.expect("namespace create failed")
|
.expect("namespace create failed")
|
||||||
.id;
|
.id;
|
||||||
|
|
|
@ -5,8 +5,9 @@ use arrow::{
|
||||||
record_batch::RecordBatch,
|
record_batch::RecordBatch,
|
||||||
};
|
};
|
||||||
use data_types::{
|
use data_types::{
|
||||||
Column, ColumnSet, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceSchema,
|
Column, ColumnSet, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceName,
|
||||||
ParquetFile, ParquetFileParams, Partition, PartitionId, Table, TableId, TableSchema, Timestamp,
|
NamespaceSchema, ParquetFile, ParquetFileParams, Partition, PartitionId, Table, TableId,
|
||||||
|
TableSchema, Timestamp,
|
||||||
};
|
};
|
||||||
use datafusion::physical_plan::metrics::Count;
|
use datafusion::physical_plan::metrics::Count;
|
||||||
use datafusion_util::MemoryStream;
|
use datafusion_util::MemoryStream;
|
||||||
|
@ -143,9 +144,10 @@ impl TestCatalog {
|
||||||
retention_period_ns: Option<i64>,
|
retention_period_ns: Option<i64>,
|
||||||
) -> Arc<TestNamespace> {
|
) -> Arc<TestNamespace> {
|
||||||
let mut repos = self.catalog.repositories().await;
|
let mut repos = self.catalog.repositories().await;
|
||||||
|
let namespace_name = NamespaceName::new(name).unwrap();
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(name, retention_period_ns)
|
.create(&namespace_name, retention_period_ns)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|
|
@ -391,7 +391,11 @@ mod tests {
|
||||||
let catalog = Arc::new(MemCatalog::new(Default::default()));
|
let catalog = Arc::new(MemCatalog::new(Default::default()));
|
||||||
|
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos.namespaces().create("test_ns", None).await.unwrap();
|
let namespace = repos
|
||||||
|
.namespaces()
|
||||||
|
.create(&NamespaceName::new("test_ns").unwrap(), None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let table = repos
|
let table = repos
|
||||||
.tables()
|
.tables()
|
||||||
|
|
|
@ -107,7 +107,7 @@ where
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(namespace.as_str(), retention_period_ns)
|
.create(namespace, retention_period_ns)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
|
|
|
@ -270,7 +270,10 @@ async fn test_write_propagate_ids() {
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("bananas_test", None)
|
.create(
|
||||||
|
&data_types::NamespaceName::new("bananas_test").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.expect("failed to update table limit");
|
.expect("failed to update table limit");
|
||||||
|
|
||||||
|
@ -352,7 +355,10 @@ async fn test_delete_unsupported() {
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("bananas_test", None)
|
.create(
|
||||||
|
&data_types::NamespaceName::new("bananas_test").unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.expect("failed to update table limit");
|
.expect("failed to update table limit");
|
||||||
|
|
||||||
|
|
|
@ -197,7 +197,9 @@ fn to_partition(p: data_types::Partition) -> Partition {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use data_types::{ColumnId, ColumnSet, CompactionLevel, ParquetFileParams, Timestamp};
|
use data_types::{
|
||||||
|
ColumnId, ColumnSet, CompactionLevel, NamespaceName, ParquetFileParams, Timestamp,
|
||||||
|
};
|
||||||
use generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService;
|
use generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService;
|
||||||
use iox_catalog::mem::MemCatalog;
|
use iox_catalog::mem::MemCatalog;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
@ -214,7 +216,7 @@ mod tests {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("catalog_partition_test", None)
|
.create(&NamespaceName::new("catalog_partition_test").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
@ -277,7 +279,7 @@ mod tests {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("catalog_partition_test", None)
|
.create(&NamespaceName::new("catalog_partition_test").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
|
|
@ -96,7 +96,9 @@ impl object_store_service_server::ObjectStoreService for ObjectStoreService {
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use data_types::{ColumnId, ColumnSet, CompactionLevel, ParquetFileParams, Timestamp};
|
use data_types::{
|
||||||
|
ColumnId, ColumnSet, CompactionLevel, NamespaceName, ParquetFileParams, Timestamp,
|
||||||
|
};
|
||||||
use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService;
|
use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService;
|
||||||
use iox_catalog::mem::MemCatalog;
|
use iox_catalog::mem::MemCatalog;
|
||||||
use object_store::{memory::InMemory, ObjectStore};
|
use object_store::{memory::InMemory, ObjectStore};
|
||||||
|
@ -112,7 +114,7 @@ mod tests {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("catalog_partition_test", None)
|
.create(&NamespaceName::new("catalog_partition_test").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
|
|
@ -81,7 +81,7 @@ fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaRespons
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use data_types::ColumnType;
|
use data_types::{ColumnType, NamespaceName};
|
||||||
use generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService;
|
use generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService;
|
||||||
use iox_catalog::mem::MemCatalog;
|
use iox_catalog::mem::MemCatalog;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -95,7 +95,7 @@ mod tests {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create("namespace_schema_test", None)
|
.create(&NamespaceName::new("namespace_schema_test").unwrap(), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table = repos
|
let table = repos
|
||||||
|
|
Loading…
Reference in New Issue