Merge branch 'main' into dependabot/cargo/uuid-1.3.3
commit
06a2345708
|
@ -137,10 +137,14 @@ mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use chrono::TimeZone;
|
use chrono::TimeZone;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
ColumnId, ColumnSet, CompactionLevel, NamespaceId, NamespaceName, ParquetFile,
|
ColumnId, ColumnSet, CompactionLevel, NamespaceId, ParquetFile, ParquetFileParams,
|
||||||
ParquetFileParams, PartitionId, TableId, Timestamp,
|
PartitionId, TableId, Timestamp,
|
||||||
|
};
|
||||||
|
use iox_catalog::{
|
||||||
|
interface::Catalog,
|
||||||
|
mem::MemCatalog,
|
||||||
|
test_helpers::{arbitrary_namespace, arbitrary_table},
|
||||||
};
|
};
|
||||||
use iox_catalog::{interface::Catalog, mem::MemCatalog};
|
|
||||||
use object_store::path::Path;
|
use object_store::path::Path;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parquet_file::ParquetFilePath;
|
use parquet_file::ParquetFilePath;
|
||||||
|
@ -155,19 +159,8 @@ mod tests {
|
||||||
let metric_registry = Arc::new(metric::Registry::new());
|
let metric_registry = Arc::new(metric::Registry::new());
|
||||||
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
|
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "namespace_parquet_file_test").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
|
||||||
.create(
|
|
||||||
&NamespaceName::new("namespace_parquet_file_test").unwrap(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let partition = repos
|
let partition = repos
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get("one".into(), table.id)
|
.create_or_get("one".into(), table.id)
|
||||||
|
|
|
@ -345,7 +345,10 @@ mod tests {
|
||||||
use crate::{AggregateTSMField, AggregateTSMTag};
|
use crate::{AggregateTSMField, AggregateTSMTag};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use data_types::{PartitionId, TableId};
|
use data_types::{PartitionId, TableId};
|
||||||
use iox_catalog::mem::MemCatalog;
|
use iox_catalog::{
|
||||||
|
mem::MemCatalog,
|
||||||
|
test_helpers::{arbitrary_namespace, arbitrary_table},
|
||||||
|
};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -428,17 +431,9 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
.expect("started transaction");
|
.expect("started transaction");
|
||||||
// create namespace, table and columns for weather measurement
|
// create namespace, table and columns for weather measurement
|
||||||
let namespace = txn
|
let namespace = arbitrary_namespace(&mut *txn, "1234_5678").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *txn, "weather", &namespace).await;
|
||||||
.create(&NamespaceName::new("1234_5678").unwrap(), None)
|
let mut table = TableSchema::new_empty_from(&table);
|
||||||
.await
|
|
||||||
.expect("namespace created");
|
|
||||||
let mut table = txn
|
|
||||||
.tables()
|
|
||||||
.create_or_get("weather", namespace.id)
|
|
||||||
.await
|
|
||||||
.map(|t| TableSchema::new_empty_from(&t))
|
|
||||||
.expect("table created");
|
|
||||||
let time_col = txn
|
let time_col = txn
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get("time", table.id, ColumnType::Time)
|
.create_or_get("time", table.id, ColumnType::Time)
|
||||||
|
@ -520,17 +515,9 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
.expect("started transaction");
|
.expect("started transaction");
|
||||||
// create namespace, table and columns for weather measurement
|
// create namespace, table and columns for weather measurement
|
||||||
let namespace = txn
|
let namespace = arbitrary_namespace(&mut *txn, "1234_5678").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *txn, "weather", &namespace).await;
|
||||||
.create(&NamespaceName::new("1234_5678").unwrap(), None)
|
let mut table = TableSchema::new_empty_from(&table);
|
||||||
.await
|
|
||||||
.expect("namespace created");
|
|
||||||
let mut table = txn
|
|
||||||
.tables()
|
|
||||||
.create_or_get("weather", namespace.id)
|
|
||||||
.await
|
|
||||||
.map(|t| TableSchema::new_empty_from(&t))
|
|
||||||
.expect("table created");
|
|
||||||
let time_col = txn
|
let time_col = txn
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get("time", table.id, ColumnType::Time)
|
.create_or_get("time", table.id, ColumnType::Time)
|
||||||
|
@ -585,17 +572,9 @@ mod tests {
|
||||||
.expect("started transaction");
|
.expect("started transaction");
|
||||||
|
|
||||||
// create namespace, table and columns for weather measurement
|
// create namespace, table and columns for weather measurement
|
||||||
let namespace = txn
|
let namespace = arbitrary_namespace(&mut *txn, "1234_5678").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *txn, "weather", &namespace).await;
|
||||||
.create(&NamespaceName::new("1234_5678").unwrap(), None)
|
let mut table = TableSchema::new_empty_from(&table);
|
||||||
.await
|
|
||||||
.expect("namespace created");
|
|
||||||
let mut table = txn
|
|
||||||
.tables()
|
|
||||||
.create_or_get("weather", namespace.id)
|
|
||||||
.await
|
|
||||||
.map(|t| TableSchema::new_empty_from(&t))
|
|
||||||
.expect("table created");
|
|
||||||
let time_col = txn
|
let time_col = txn
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get("time", table.id, ColumnType::Time)
|
.create_or_get("time", table.id, ColumnType::Time)
|
||||||
|
|
|
@ -99,6 +99,7 @@ mod tests {
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
|
use iox_catalog::test_helpers::{arbitrary_namespace, arbitrary_table};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
@ -114,18 +115,10 @@ mod tests {
|
||||||
|
|
||||||
let (namespace_id, table_id) = {
|
let (namespace_id, table_id) = {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let table_ns_name = data_types::NamespaceName::new(TABLE_NAME).unwrap();
|
let ns = arbitrary_namespace(&mut *repos, NAMESPACE_NAME).await;
|
||||||
let ns = repos
|
|
||||||
.namespaces()
|
|
||||||
.create(&table_ns_name, None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let table = repos
|
let table = arbitrary_table(&mut *repos, TABLE_NAME, &ns)
|
||||||
.tables()
|
.await;
|
||||||
.create_or_get(TABLE_NAME, ns.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
(ns.id, table.id)
|
(ns.id, table.id)
|
||||||
};
|
};
|
||||||
|
|
|
@ -2,7 +2,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use data_types::{NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, TableId};
|
use data_types::{NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, TableId};
|
||||||
use dml::{DmlMeta, DmlWrite};
|
use dml::{DmlMeta, DmlWrite};
|
||||||
use iox_catalog::interface::Catalog;
|
use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use mutable_batch_lp::lines_to_batches;
|
use mutable_batch_lp::lines_to_batches;
|
||||||
use schema::Projection;
|
use schema::Projection;
|
||||||
|
@ -298,13 +298,7 @@ pub(crate) async fn populate_catalog(
|
||||||
table: &str,
|
table: &str,
|
||||||
) -> (NamespaceId, TableId) {
|
) -> (NamespaceId, TableId) {
|
||||||
let mut c = catalog.repositories().await;
|
let mut c = catalog.repositories().await;
|
||||||
let namespace_name = data_types::NamespaceName::new(namespace).unwrap();
|
let ns_id = arbitrary_namespace(&mut *c, namespace).await.id;
|
||||||
let ns_id = c
|
|
||||||
.namespaces()
|
|
||||||
.create(&namespace_name, None)
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.id;
|
|
||||||
let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id;
|
let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id;
|
||||||
|
|
||||||
(ns_id, table_id)
|
(ns_id, table_id)
|
||||||
|
|
|
@ -17,8 +17,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||||
use arrow::record_batch::RecordBatch;
|
use arrow::record_batch::RecordBatch;
|
||||||
use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket};
|
use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket};
|
||||||
use data_types::{
|
use data_types::{
|
||||||
Namespace, NamespaceId, NamespaceName, NamespaceSchema, ParquetFile, PartitionKey,
|
Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, SequenceNumber, TableId,
|
||||||
SequenceNumber, TableId,
|
|
||||||
};
|
};
|
||||||
use dml::{DmlMeta, DmlWrite};
|
use dml::{DmlMeta, DmlWrite};
|
||||||
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
|
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
|
||||||
|
@ -29,6 +28,7 @@ use ingester::{IngesterGuard, IngesterRpcInterface};
|
||||||
use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest;
|
use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest;
|
||||||
use iox_catalog::{
|
use iox_catalog::{
|
||||||
interface::{Catalog, SoftDeletedRows},
|
interface::{Catalog, SoftDeletedRows},
|
||||||
|
test_helpers::arbitrary_namespace,
|
||||||
validate_or_insert_schema,
|
validate_or_insert_schema,
|
||||||
};
|
};
|
||||||
use iox_time::TimeProvider;
|
use iox_time::TimeProvider;
|
||||||
|
@ -203,14 +203,8 @@ where
|
||||||
name: &str,
|
name: &str,
|
||||||
retention_period_ns: Option<i64>,
|
retention_period_ns: Option<i64>,
|
||||||
) -> Namespace {
|
) -> Namespace {
|
||||||
let ns = self
|
let mut repos = self.catalog.repositories().await;
|
||||||
.catalog
|
let ns = arbitrary_namespace(&mut *repos, name).await;
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new(name).unwrap(), None)
|
|
||||||
.await
|
|
||||||
.expect("failed to create test namespace");
|
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
self.namespaces
|
self.namespaces
|
||||||
|
|
|
@ -736,7 +736,10 @@ pub async fn list_schemas(
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) mod test_helpers {
|
pub(crate) mod test_helpers {
|
||||||
use crate::{validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES};
|
use crate::{
|
||||||
|
test_helpers::{arbitrary_namespace, arbitrary_table},
|
||||||
|
validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
|
||||||
|
};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use ::test_helpers::{assert_contains, tracing::TracingCapture};
|
use ::test_helpers::{assert_contains, tracing::TracingCapture};
|
||||||
|
@ -846,12 +849,7 @@ pub(crate) mod test_helpers {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(not_found.is_none());
|
assert!(not_found.is_none());
|
||||||
|
|
||||||
let namespace2_name = NamespaceName::new("test_namespace2").unwrap();
|
let namespace2 = arbitrary_namespace(&mut *repos, "test_namespace2").await;
|
||||||
let namespace2 = repos
|
|
||||||
.namespaces()
|
|
||||||
.create(&namespace2_name, None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let mut namespaces = repos
|
let mut namespaces = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.list(SoftDeletedRows::ExcludeDeleted)
|
.list(SoftDeletedRows::ExcludeDeleted)
|
||||||
|
@ -894,13 +892,8 @@ pub(crate) mod test_helpers {
|
||||||
.expect("namespace should be updateable");
|
.expect("namespace should be updateable");
|
||||||
assert!(modified.retention_period_ns.is_none());
|
assert!(modified.retention_period_ns.is_none());
|
||||||
|
|
||||||
// create namespace with retention period NULL
|
// create namespace with retention period NULL (the default)
|
||||||
let namespace3_name = NamespaceName::new("test_namespace3").unwrap();
|
let namespace3 = arbitrary_namespace(&mut *repos, "test_namespace3").await;
|
||||||
let namespace3 = repos
|
|
||||||
.namespaces()
|
|
||||||
.create(&namespace3_name, None)
|
|
||||||
.await
|
|
||||||
.expect("namespace with NULL retention should be created");
|
|
||||||
assert!(namespace3.retention_period_ns.is_none());
|
assert!(namespace3.retention_period_ns.is_none());
|
||||||
|
|
||||||
// create namespace with retention period
|
// create namespace with retention period
|
||||||
|
@ -954,16 +947,8 @@ pub(crate) mod test_helpers {
|
||||||
async fn test_namespace_soft_deletion(catalog: Arc<dyn Catalog>) {
|
async fn test_namespace_soft_deletion(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
|
|
||||||
let deleted_ns = repos
|
let deleted_ns = arbitrary_namespace(&mut *repos, "deleted-ns").await;
|
||||||
.namespaces()
|
let active_ns = arbitrary_namespace(&mut *repos, "active-ns").await;
|
||||||
.create(&"deleted-ns".try_into().unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let active_ns = repos
|
|
||||||
.namespaces()
|
|
||||||
.create(&"active-ns".try_into().unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Mark "deleted-ns" as soft-deleted.
|
// Mark "deleted-ns" as soft-deleted.
|
||||||
repos.namespaces().soft_delete("deleted-ns").await.unwrap();
|
repos.namespaces().soft_delete("deleted-ns").await.unwrap();
|
||||||
|
@ -1117,23 +1102,11 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
async fn test_table(catalog: Arc<dyn Catalog>) {
|
async fn test_table(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "namespace_table_test").await;
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("namespace_table_test").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// test we can create or get a table
|
// test we can create or get a table
|
||||||
let t = repos
|
let t = arbitrary_table(&mut *repos, "test_table", &namespace).await;
|
||||||
.tables()
|
let tt = arbitrary_table(&mut *repos, "test_table", &namespace).await;
|
||||||
.create_or_get("test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let tt = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert!(t.id > TableId::new(0));
|
assert!(t.id > TableId::new(0));
|
||||||
assert_eq!(t, tt);
|
assert_eq!(t, tt);
|
||||||
|
|
||||||
|
@ -1154,26 +1127,14 @@ pub(crate) mod test_helpers {
|
||||||
assert_eq!(vec![t.clone()], tables);
|
assert_eq!(vec![t.clone()], tables);
|
||||||
|
|
||||||
// test we can create a table of the same name in a different namespace
|
// test we can create a table of the same name in a different namespace
|
||||||
let namespace2 = repos
|
let namespace2 = arbitrary_namespace(&mut *repos, "two").await;
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("two").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_ne!(namespace, namespace2);
|
assert_ne!(namespace, namespace2);
|
||||||
let test_table = repos
|
let test_table = arbitrary_table(&mut *repos, "test_table", &namespace2).await;
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table", namespace2.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_ne!(tt, test_table);
|
assert_ne!(tt, test_table);
|
||||||
assert_eq!(test_table.namespace_id, namespace2.id);
|
assert_eq!(test_table.namespace_id, namespace2.id);
|
||||||
|
|
||||||
// test get by namespace and name
|
// test get by namespace and name
|
||||||
let foo_table = repos
|
let foo_table = arbitrary_table(&mut *repos, "foo", &namespace2).await;
|
||||||
.tables()
|
|
||||||
.create_or_get("foo", namespace2.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
repos
|
repos
|
||||||
.tables()
|
.tables()
|
||||||
|
@ -1254,16 +1215,8 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
async fn test_column(catalog: Arc<dyn Catalog>) {
|
async fn test_column(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "namespace_column_test").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
|
||||||
.create(&NamespaceName::new("namespace_column_test").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(table.namespace_id, namespace.id);
|
assert_eq!(table.namespace_id, namespace.id);
|
||||||
|
|
||||||
// test we can create or get a column
|
// test we can create or get a column
|
||||||
|
@ -1290,11 +1243,7 @@ pub(crate) mod test_helpers {
|
||||||
assert!(matches!(err, Error::ColumnTypeMismatch { .. }));
|
assert!(matches!(err, Error::ColumnTypeMismatch { .. }));
|
||||||
|
|
||||||
// test that we can create a column of the same name under a different table
|
// test that we can create a column of the same name under a different table
|
||||||
let table2 = repos
|
let table2 = arbitrary_table(&mut *repos, "test_table_2", &namespace).await;
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table_2", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let ccc = repos
|
let ccc = repos
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get("column_test", table2.id, ColumnType::U64)
|
.create_or_get("column_test", table2.id, ColumnType::U64)
|
||||||
|
@ -1361,11 +1310,7 @@ pub(crate) mod test_helpers {
|
||||||
));
|
));
|
||||||
|
|
||||||
// test per-namespace column limits are NOT enforced with create_or_get_many_unchecked
|
// test per-namespace column limits are NOT enforced with create_or_get_many_unchecked
|
||||||
let table3 = repos
|
let table3 = arbitrary_table(&mut *repos, "test_table_3", &namespace).await;
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table_3", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let mut columns = HashMap::new();
|
let mut columns = HashMap::new();
|
||||||
columns.insert("apples", ColumnType::Tag);
|
columns.insert("apples", ColumnType::Tag);
|
||||||
columns.insert("oranges", ColumnType::Tag);
|
columns.insert("oranges", ColumnType::Tag);
|
||||||
|
@ -1387,19 +1332,8 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
async fn test_partition(catalog: Arc<dyn Catalog>) {
|
async fn test_partition(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "namespace_partition_test").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
|
||||||
.create(
|
|
||||||
&NamespaceName::new("namespace_partition_test").unwrap(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let mut created = BTreeMap::new();
|
let mut created = BTreeMap::new();
|
||||||
for key in ["foo", "bar"] {
|
for key in ["foo", "bar"] {
|
||||||
|
@ -1672,24 +1606,9 @@ pub(crate) mod test_helpers {
|
||||||
/// tests many interactions with the catalog and parquet files. See the individual conditions herein
|
/// tests many interactions with the catalog and parquet files. See the individual conditions herein
|
||||||
async fn test_parquet_file(catalog: Arc<dyn Catalog>) {
|
async fn test_parquet_file(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "namespace_parquet_file_test").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
|
||||||
.create(
|
let other_table = arbitrary_table(&mut *repos, "other", &namespace).await;
|
||||||
&NamespaceName::new("namespace_parquet_file_test").unwrap(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let other_table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("other", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let partition = repos
|
let partition = repos
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get("one".into(), table.id)
|
.create_or_get("one".into(), table.id)
|
||||||
|
@ -1860,19 +1779,8 @@ pub(crate) mod test_helpers {
|
||||||
assert_eq!(files.len(), 1);
|
assert_eq!(files.len(), 1);
|
||||||
|
|
||||||
// test list_by_namespace_not_to_delete
|
// test list_by_namespace_not_to_delete
|
||||||
let namespace2 = repos
|
let namespace2 = arbitrary_namespace(&mut *repos, "namespace_parquet_file_test1").await;
|
||||||
.namespaces()
|
let table2 = arbitrary_table(&mut *repos, "test_table2", &namespace2).await;
|
||||||
.create(
|
|
||||||
&NamespaceName::new("namespace_parquet_file_test1").unwrap(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table2 = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table2", namespace2.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let partition2 = repos
|
let partition2 = repos
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get("foo".into(), table2.id)
|
.create_or_get("foo".into(), table2.id)
|
||||||
|
@ -2086,26 +1994,14 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
async fn test_parquet_file_delete_broken(catalog: Arc<dyn Catalog>) {
|
async fn test_parquet_file_delete_broken(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace_1 = repos
|
let namespace_1 = arbitrary_namespace(&mut *repos, "retention_broken_1").await;
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("retention_broken_1").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let namespace_2 = repos
|
let namespace_2 = repos
|
||||||
.namespaces()
|
.namespaces()
|
||||||
.create(&NamespaceName::new("retention_broken_2").unwrap(), Some(1))
|
.create(&NamespaceName::new("retention_broken_2").unwrap(), Some(1))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let table_1 = repos
|
let table_1 = arbitrary_table(&mut *repos, "test_table", &namespace_1).await;
|
||||||
.tables()
|
let table_2 = arbitrary_table(&mut *repos, "test_table", &namespace_2).await;
|
||||||
.create_or_get("test_table", namespace_1.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table_2 = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table", namespace_2.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let partition_1 = repos
|
let partition_1 = repos
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get("one".into(), table_1.id)
|
.create_or_get("one".into(), table_1.id)
|
||||||
|
@ -2166,19 +2062,9 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
async fn test_partitions_new_file_between(catalog: Arc<dyn Catalog>) {
|
async fn test_partitions_new_file_between(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "test_partitions_new_file_between").await;
|
||||||
.namespaces()
|
let table =
|
||||||
.create(
|
arbitrary_table(&mut *repos, "test_table_for_new_file_between", &namespace).await;
|
||||||
&NamespaceName::new("test_partitions_new_file_between").unwrap(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table_for_new_file_between", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// param for the tests
|
// param for the tests
|
||||||
let time_now = Timestamp::from(catalog.time_provider().now());
|
let time_now = Timestamp::from(catalog.time_provider().now());
|
||||||
|
@ -2535,20 +2421,12 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
async fn test_list_by_partiton_not_to_delete(catalog: Arc<dyn Catalog>) {
|
async fn test_list_by_partiton_not_to_delete(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(
|
||||||
.namespaces()
|
&mut *repos,
|
||||||
.create(
|
"namespace_parquet_file_test_list_by_partiton_not_to_delete",
|
||||||
&NamespaceName::new("namespace_parquet_file_test_list_by_partiton_not_to_delete")
|
)
|
||||||
.unwrap(),
|
.await;
|
||||||
None,
|
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let partition = repos
|
let partition = repos
|
||||||
.partitions()
|
.partitions()
|
||||||
|
@ -2646,19 +2524,9 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
|
async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace =
|
||||||
.namespaces()
|
arbitrary_namespace(&mut *repos, "namespace_update_to_compaction_level_1_test").await;
|
||||||
.create(
|
let table = arbitrary_table(&mut *repos, "update_table", &namespace).await;
|
||||||
&NamespaceName::new("namespace_update_to_compaction_level_1_test").unwrap(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("update_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let partition = repos
|
let partition = repos
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get("test_update_to_compaction_level_1_one".into(), table.id)
|
.create_or_get("test_update_to_compaction_level_1_one".into(), table.id)
|
||||||
|
@ -2735,19 +2603,9 @@ pub(crate) mod test_helpers {
|
||||||
/// effective.
|
/// effective.
|
||||||
async fn test_delete_namespace(catalog: Arc<dyn Catalog>) {
|
async fn test_delete_namespace(catalog: Arc<dyn Catalog>) {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace_1 = repos
|
let namespace_1 =
|
||||||
.namespaces()
|
arbitrary_namespace(&mut *repos, "namespace_test_delete_namespace_1").await;
|
||||||
.create(
|
let table_1 = arbitrary_table(&mut *repos, "test_table_1", &namespace_1).await;
|
||||||
&NamespaceName::new("namespace_test_delete_namespace_1").unwrap(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table_1 = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table_1", namespace_1.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let _c = repos
|
let _c = repos
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get("column_test_1", table_1.id, ColumnType::Tag)
|
.create_or_get("column_test_1", table_1.id, ColumnType::Tag)
|
||||||
|
@ -2793,19 +2651,9 @@ pub(crate) mod test_helpers {
|
||||||
|
|
||||||
// we've now created a namespace with a table and parquet files. before we test deleting
|
// we've now created a namespace with a table and parquet files. before we test deleting
|
||||||
// it, let's create another so we can ensure that doesn't get deleted.
|
// it, let's create another so we can ensure that doesn't get deleted.
|
||||||
let namespace_2 = repos
|
let namespace_2 =
|
||||||
.namespaces()
|
arbitrary_namespace(&mut *repos, "namespace_test_delete_namespace_2").await;
|
||||||
.create(
|
let table_2 = arbitrary_table(&mut *repos, "test_table_2", &namespace_2).await;
|
||||||
&NamespaceName::new("namespace_test_delete_namespace_2").unwrap(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table_2 = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("test_table_2", namespace_2.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let _c = repos
|
let _c = repos
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get("column_test_2", table_2.id, ColumnType::Tag)
|
.create_or_get("column_test_2", table_2.id, ColumnType::Tag)
|
||||||
|
@ -2983,10 +2831,7 @@ pub(crate) mod test_helpers {
|
||||||
barrier_captured.wait().await;
|
barrier_captured.wait().await;
|
||||||
|
|
||||||
let mut txn = catalog_captured.start_transaction().await.unwrap();
|
let mut txn = catalog_captured.start_transaction().await.unwrap();
|
||||||
txn.namespaces()
|
arbitrary_namespace(&mut *txn, "test_txn_isolation").await;
|
||||||
.create(&NamespaceName::new("test_txn_isolation").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||||
txn.abort().await.unwrap();
|
txn.abort().await.unwrap();
|
||||||
|
@ -3020,10 +2865,7 @@ pub(crate) mod test_helpers {
|
||||||
async fn test_txn_drop(catalog: Arc<dyn Catalog>) {
|
async fn test_txn_drop(catalog: Arc<dyn Catalog>) {
|
||||||
let capture = TracingCapture::new();
|
let capture = TracingCapture::new();
|
||||||
let mut txn = catalog.start_transaction().await.unwrap();
|
let mut txn = catalog.start_transaction().await.unwrap();
|
||||||
txn.namespaces()
|
arbitrary_namespace(&mut *txn, "test_txn_drop").await;
|
||||||
.create(&NamespaceName::new("test_txn_drop").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
drop(txn);
|
drop(txn);
|
||||||
|
|
||||||
// got a warning
|
// got a warning
|
||||||
|
|
|
@ -201,6 +201,56 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Catalog helper functions for creation of catalog objects
|
||||||
|
pub mod test_helpers {
|
||||||
|
use crate::RepoCollection;
|
||||||
|
use data_types::{Namespace, NamespaceName, Table};
|
||||||
|
|
||||||
|
/// When the details of the namespace don't matter; the test just needs *a* catalog namespace
|
||||||
|
/// with a particular name.
|
||||||
|
///
|
||||||
|
/// Use [`NamespaceRepo::create`] directly if:
|
||||||
|
///
|
||||||
|
/// - The values of the parameters to `create` need to be different than what's here
|
||||||
|
/// - The values of the parameters to `create` are relevant to the behavior under test
|
||||||
|
/// - You expect namespace creation to fail in the test
|
||||||
|
///
|
||||||
|
/// [`NamespaceRepo::create`]: crate::interface::NamespaceRepo::create
|
||||||
|
pub async fn arbitrary_namespace<R: RepoCollection + ?Sized>(
|
||||||
|
repos: &mut R,
|
||||||
|
name: &str,
|
||||||
|
) -> Namespace {
|
||||||
|
let namespace_name = NamespaceName::new(name).unwrap();
|
||||||
|
repos
|
||||||
|
.namespaces()
|
||||||
|
.create(&namespace_name, None)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When the details of the table don't matter; the test just needs *a* catalog table
|
||||||
|
/// with a particular name in a particular namespace.
|
||||||
|
///
|
||||||
|
/// Use [`TableRepo::create_or_get`] directly if:
|
||||||
|
///
|
||||||
|
/// - The values of the parameters to `create_or_get` need to be different than what's here
|
||||||
|
/// - The values of the parameters to `create_or_get` are relevant to the behavior under test
|
||||||
|
/// - You expect table creation to fail in the test
|
||||||
|
///
|
||||||
|
/// [`TableRepo::create_or_get`]: crate::interface::TableRepo::create_or_get
|
||||||
|
pub async fn arbitrary_table<R: RepoCollection + ?Sized>(
|
||||||
|
repos: &mut R,
|
||||||
|
name: &str,
|
||||||
|
namespace: &Namespace,
|
||||||
|
) -> Table {
|
||||||
|
repos
|
||||||
|
.tables()
|
||||||
|
.create_or_get(name, namespace.id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::{collections::BTreeMap, sync::Arc};
|
use std::{collections::BTreeMap, sync::Arc};
|
||||||
|
@ -210,7 +260,6 @@ mod tests {
|
||||||
interface::{get_schema_by_name, SoftDeletedRows},
|
interface::{get_schema_by_name, SoftDeletedRows},
|
||||||
mem::MemCatalog,
|
mem::MemCatalog,
|
||||||
};
|
};
|
||||||
use data_types::NamespaceName;
|
|
||||||
|
|
||||||
// Generate a test that simulates multiple, sequential writes in `lp` and
|
// Generate a test that simulates multiple, sequential writes in `lp` and
|
||||||
// asserts the resulting schema.
|
// asserts the resulting schema.
|
||||||
|
@ -228,21 +277,17 @@ mod tests {
|
||||||
#[allow(clippy::bool_assert_comparison)]
|
#[allow(clippy::bool_assert_comparison)]
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn [<test_validate_schema_ $name>]() {
|
async fn [<test_validate_schema_ $name>]() {
|
||||||
use crate::interface::Catalog;
|
use crate::{interface::Catalog, test_helpers::arbitrary_namespace};
|
||||||
use std::ops::DerefMut;
|
use std::ops::DerefMut;
|
||||||
use pretty_assertions::assert_eq;
|
use pretty_assertions::assert_eq;
|
||||||
const NAMESPACE_NAME: &str = "bananas";
|
const NAMESPACE_NAME: &str = "bananas";
|
||||||
let ns_name = NamespaceName::new(NAMESPACE_NAME).unwrap();
|
|
||||||
|
|
||||||
let metrics = Arc::new(metric::Registry::default());
|
let metrics = Arc::new(metric::Registry::default());
|
||||||
let repo = MemCatalog::new(metrics);
|
let repo = MemCatalog::new(metrics);
|
||||||
let mut txn = repo.start_transaction().await.unwrap();
|
let mut txn = repo.start_transaction().await.unwrap();
|
||||||
|
|
||||||
let namespace = txn
|
let namespace = arbitrary_namespace(&mut *txn, NAMESPACE_NAME)
|
||||||
.namespaces()
|
.await;
|
||||||
.create(&ns_name, None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let schema = NamespaceSchema::new_empty_from(&namespace);
|
let schema = NamespaceSchema::new_empty_from(&namespace);
|
||||||
|
|
||||||
|
|
|
@ -1638,6 +1638,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::test_helpers::{arbitrary_namespace, arbitrary_table};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use data_types::{ColumnId, ColumnSet};
|
use data_types::{ColumnId, ColumnSet};
|
||||||
use metric::{Attributes, DurationHistogram, Metric};
|
use metric::{Attributes, DurationHistogram, Metric};
|
||||||
|
@ -1821,38 +1822,21 @@ mod tests {
|
||||||
|
|
||||||
let postgres = setup_db().await;
|
let postgres = setup_db().await;
|
||||||
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
|
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
|
||||||
|
let mut repos = postgres.repositories().await;
|
||||||
|
|
||||||
let namespace_id = postgres
|
let namespace = arbitrary_namespace(&mut *repos, "ns4").await;
|
||||||
.repositories()
|
let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id;
|
||||||
.await
|
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("ns4").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.expect("namespace create failed")
|
|
||||||
.id;
|
|
||||||
let table_id = postgres
|
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.tables()
|
|
||||||
.create_or_get("table", namespace_id)
|
|
||||||
.await
|
|
||||||
.expect("create table failed")
|
|
||||||
.id;
|
|
||||||
|
|
||||||
let key = "bananas";
|
let key = "bananas";
|
||||||
|
|
||||||
let a = postgres
|
let a = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get(key.into(), table_id)
|
.create_or_get(key.into(), table_id)
|
||||||
.await
|
.await
|
||||||
.expect("should create OK");
|
.expect("should create OK");
|
||||||
|
|
||||||
// Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent.
|
// Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent.
|
||||||
let b = postgres
|
let b = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get(key.into(), table_id)
|
.create_or_get(key.into(), table_id)
|
||||||
.await
|
.await
|
||||||
|
@ -1959,22 +1943,12 @@ mod tests {
|
||||||
let postgres = setup_db().await;
|
let postgres = setup_db().await;
|
||||||
let metrics = Arc::clone(&postgres.metrics);
|
let metrics = Arc::clone(&postgres.metrics);
|
||||||
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
|
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
|
||||||
|
let mut repos = postgres.repositories().await;
|
||||||
|
|
||||||
let namespace_id = postgres
|
let namespace = arbitrary_namespace(&mut *repos, "ns4")
|
||||||
.repositories()
|
.await;
|
||||||
|
let table_id = arbitrary_table(&mut *repos, "table", &namespace)
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("ns4").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.expect("namespace create failed")
|
|
||||||
.id;
|
|
||||||
let table_id = postgres
|
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.tables()
|
|
||||||
.create_or_get("table", namespace_id)
|
|
||||||
.await
|
|
||||||
.expect("create table failed")
|
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
$(
|
$(
|
||||||
|
@ -1983,9 +1957,7 @@ mod tests {
|
||||||
insert.insert($col_name, $col_type);
|
insert.insert($col_name, $col_type);
|
||||||
)+
|
)+
|
||||||
|
|
||||||
let got = postgres
|
let got = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get_many_unchecked(table_id, insert.clone())
|
.create_or_get_many_unchecked(table_id, insert.clone())
|
||||||
.await;
|
.await;
|
||||||
|
@ -2122,29 +2094,14 @@ mod tests {
|
||||||
let postgres = setup_db().await;
|
let postgres = setup_db().await;
|
||||||
let pool = postgres.pool.clone();
|
let pool = postgres.pool.clone();
|
||||||
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
|
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
|
||||||
|
let mut repos = postgres.repositories().await;
|
||||||
let namespace_id = postgres
|
let namespace = arbitrary_namespace(&mut *repos, "ns4").await;
|
||||||
.repositories()
|
let namespace_id = namespace.id;
|
||||||
.await
|
let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id;
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("ns4").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.expect("namespace create failed")
|
|
||||||
.id;
|
|
||||||
let table_id = postgres
|
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.tables()
|
|
||||||
.create_or_get("table", namespace_id)
|
|
||||||
.await
|
|
||||||
.expect("create table failed")
|
|
||||||
.id;
|
|
||||||
|
|
||||||
let key = "bananas";
|
let key = "bananas";
|
||||||
|
|
||||||
let partition_id = postgres
|
let partition_id = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get(key.into(), table_id)
|
.create_or_get(key.into(), table_id)
|
||||||
.await
|
.await
|
||||||
|
@ -2169,9 +2126,7 @@ mod tests {
|
||||||
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
|
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
|
||||||
max_l0_created_at: time_now,
|
max_l0_created_at: time_now,
|
||||||
};
|
};
|
||||||
let f1 = postgres
|
let f1 = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.parquet_files()
|
.parquet_files()
|
||||||
.create(p1.clone())
|
.create(p1.clone())
|
||||||
.await
|
.await
|
||||||
|
@ -2179,9 +2134,7 @@ mod tests {
|
||||||
// insert the same again with a different size; we should then have 3x1337 as total file size
|
// insert the same again with a different size; we should then have 3x1337 as total file size
|
||||||
p1.object_store_id = Uuid::new_v4();
|
p1.object_store_id = Uuid::new_v4();
|
||||||
p1.file_size_bytes *= 2;
|
p1.file_size_bytes *= 2;
|
||||||
let _f2 = postgres
|
let _f2 = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.parquet_files()
|
.parquet_files()
|
||||||
.create(p1.clone())
|
.create(p1.clone())
|
||||||
.await
|
.await
|
||||||
|
@ -2196,9 +2149,7 @@ mod tests {
|
||||||
assert_eq!(total_file_size_bytes, 1337 * 3);
|
assert_eq!(total_file_size_bytes, 1337 * 3);
|
||||||
|
|
||||||
// flag f1 for deletion and assert that the total file size is reduced accordingly.
|
// flag f1 for deletion and assert that the total file size is reduced accordingly.
|
||||||
postgres
|
repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.parquet_files()
|
.parquet_files()
|
||||||
.flag_for_delete(f1.id)
|
.flag_for_delete(f1.id)
|
||||||
.await
|
.await
|
||||||
|
@ -2213,9 +2164,7 @@ mod tests {
|
||||||
|
|
||||||
// actually deleting shouldn't change the total
|
// actually deleting shouldn't change the total
|
||||||
let now = Timestamp::from(time_provider.now());
|
let now = Timestamp::from(time_provider.now());
|
||||||
postgres
|
repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.parquet_files()
|
.parquet_files()
|
||||||
.delete_old_ids_only(now)
|
.delete_old_ids_only(now)
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -1508,6 +1508,7 @@ fn is_unique_violation(e: &sqlx::Error) -> bool {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::test_helpers::{arbitrary_namespace, arbitrary_table};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use metric::{Attributes, DurationHistogram, Metric};
|
use metric::{Attributes, DurationHistogram, Metric};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -1549,40 +1550,22 @@ mod tests {
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_partition_create_or_get_idempotent() {
|
async fn test_partition_create_or_get_idempotent() {
|
||||||
let sqlite = setup_db().await;
|
let sqlite = setup_db().await;
|
||||||
|
|
||||||
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
|
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
|
||||||
|
let mut repos = sqlite.repositories().await;
|
||||||
|
|
||||||
let namespace_id = sqlite
|
let namespace = arbitrary_namespace(&mut *repos, "ns4").await;
|
||||||
.repositories()
|
let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id;
|
||||||
.await
|
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("ns4").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.expect("namespace create failed")
|
|
||||||
.id;
|
|
||||||
let table_id = sqlite
|
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.tables()
|
|
||||||
.create_or_get("table", namespace_id)
|
|
||||||
.await
|
|
||||||
.expect("create table failed")
|
|
||||||
.id;
|
|
||||||
|
|
||||||
let key = "bananas";
|
let key = "bananas";
|
||||||
|
|
||||||
let a = sqlite
|
let a = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get(key.into(), table_id)
|
.create_or_get(key.into(), table_id)
|
||||||
.await
|
.await
|
||||||
.expect("should create OK");
|
.expect("should create OK");
|
||||||
|
|
||||||
// Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent.
|
// Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent.
|
||||||
let b = sqlite
|
let b = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get(key.into(), table_id)
|
.create_or_get(key.into(), table_id)
|
||||||
.await
|
.await
|
||||||
|
@ -1602,24 +1585,13 @@ mod tests {
|
||||||
async fn [<test_column_create_or_get_many_unchecked_ $name>]() {
|
async fn [<test_column_create_or_get_many_unchecked_ $name>]() {
|
||||||
let sqlite = setup_db().await;
|
let sqlite = setup_db().await;
|
||||||
let metrics = Arc::clone(&sqlite.metrics);
|
let metrics = Arc::clone(&sqlite.metrics);
|
||||||
|
|
||||||
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
|
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
|
||||||
|
let mut repos = sqlite.repositories().await;
|
||||||
|
|
||||||
let namespace_id = sqlite
|
let namespace = arbitrary_namespace(&mut *repos, "ns4")
|
||||||
.repositories()
|
.await;
|
||||||
|
let table_id = arbitrary_table(&mut *repos, "table", &namespace)
|
||||||
.await
|
.await
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("ns4").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.expect("namespace create failed")
|
|
||||||
.id;
|
|
||||||
let table_id = sqlite
|
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.tables()
|
|
||||||
.create_or_get("table", namespace_id)
|
|
||||||
.await
|
|
||||||
.expect("create table failed")
|
|
||||||
.id;
|
.id;
|
||||||
|
|
||||||
$(
|
$(
|
||||||
|
@ -1628,9 +1600,7 @@ mod tests {
|
||||||
insert.insert($col_name, $col_type);
|
insert.insert($col_name, $col_type);
|
||||||
)+
|
)+
|
||||||
|
|
||||||
let got = sqlite
|
let got = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get_many_unchecked(table_id, insert.clone())
|
.create_or_get_many_unchecked(table_id, insert.clone())
|
||||||
.await;
|
.await;
|
||||||
|
@ -1764,31 +1734,16 @@ mod tests {
|
||||||
async fn test_billing_summary_on_parqet_file_creation() {
|
async fn test_billing_summary_on_parqet_file_creation() {
|
||||||
let sqlite = setup_db().await;
|
let sqlite = setup_db().await;
|
||||||
let pool = sqlite.pool.clone();
|
let pool = sqlite.pool.clone();
|
||||||
|
|
||||||
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
|
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
|
||||||
|
let mut repos = sqlite.repositories().await;
|
||||||
|
|
||||||
let namespace_id = sqlite
|
let namespace = arbitrary_namespace(&mut *repos, "ns4").await;
|
||||||
.repositories()
|
let namespace_id = namespace.id;
|
||||||
.await
|
let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id;
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("ns4").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.expect("namespace create failed")
|
|
||||||
.id;
|
|
||||||
let table_id = sqlite
|
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.tables()
|
|
||||||
.create_or_get("table", namespace_id)
|
|
||||||
.await
|
|
||||||
.expect("create table failed")
|
|
||||||
.id;
|
|
||||||
|
|
||||||
let key = "bananas";
|
let key = "bananas";
|
||||||
|
|
||||||
let partition_id = sqlite
|
let partition_id = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get(key.into(), table_id)
|
.create_or_get(key.into(), table_id)
|
||||||
.await
|
.await
|
||||||
|
@ -1813,9 +1768,7 @@ mod tests {
|
||||||
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
|
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
|
||||||
max_l0_created_at: time_now,
|
max_l0_created_at: time_now,
|
||||||
};
|
};
|
||||||
let f1 = sqlite
|
let f1 = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.parquet_files()
|
.parquet_files()
|
||||||
.create(p1.clone())
|
.create(p1.clone())
|
||||||
.await
|
.await
|
||||||
|
@ -1823,9 +1776,7 @@ mod tests {
|
||||||
// insert the same again with a different size; we should then have 3x1337 as total file size
|
// insert the same again with a different size; we should then have 3x1337 as total file size
|
||||||
p1.object_store_id = Uuid::new_v4();
|
p1.object_store_id = Uuid::new_v4();
|
||||||
p1.file_size_bytes *= 2;
|
p1.file_size_bytes *= 2;
|
||||||
let _f2 = sqlite
|
let _f2 = repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.parquet_files()
|
.parquet_files()
|
||||||
.create(p1.clone())
|
.create(p1.clone())
|
||||||
.await
|
.await
|
||||||
|
@ -1840,9 +1791,7 @@ mod tests {
|
||||||
assert_eq!(total_file_size_bytes, 1337 * 3);
|
assert_eq!(total_file_size_bytes, 1337 * 3);
|
||||||
|
|
||||||
// flag f1 for deletion and assert that the total file size is reduced accordingly.
|
// flag f1 for deletion and assert that the total file size is reduced accordingly.
|
||||||
sqlite
|
repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.parquet_files()
|
.parquet_files()
|
||||||
.flag_for_delete(f1.id)
|
.flag_for_delete(f1.id)
|
||||||
.await
|
.await
|
||||||
|
@ -1857,9 +1806,7 @@ mod tests {
|
||||||
|
|
||||||
// actually deleting shouldn't change the total
|
// actually deleting shouldn't change the total
|
||||||
let now = Timestamp::from(time_provider.now());
|
let now = Timestamp::from(time_provider.now());
|
||||||
sqlite
|
repos
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.parquet_files()
|
.parquet_files()
|
||||||
.delete_old_ids_only(now)
|
.delete_old_ids_only(now)
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -16,6 +16,7 @@ use iox_catalog::{
|
||||||
get_schema_by_id, get_table_columns_by_id, Catalog, PartitionRepo, SoftDeletedRows,
|
get_schema_by_id, get_table_columns_by_id, Catalog, PartitionRepo, SoftDeletedRows,
|
||||||
},
|
},
|
||||||
mem::MemCatalog,
|
mem::MemCatalog,
|
||||||
|
test_helpers::arbitrary_table,
|
||||||
};
|
};
|
||||||
use iox_query::{
|
use iox_query::{
|
||||||
exec::{DedicatedExecutors, Executor, ExecutorConfig},
|
exec::{DedicatedExecutors, Executor, ExecutorConfig},
|
||||||
|
@ -220,11 +221,7 @@ impl TestNamespace {
|
||||||
pub async fn create_table(self: &Arc<Self>, name: &str) -> Arc<TestTable> {
|
pub async fn create_table(self: &Arc<Self>, name: &str) -> Arc<TestTable> {
|
||||||
let mut repos = self.catalog.catalog.repositories().await;
|
let mut repos = self.catalog.catalog.repositories().await;
|
||||||
|
|
||||||
let table = repos
|
let table = arbitrary_table(&mut *repos, name, &self.namespace).await;
|
||||||
.tables()
|
|
||||||
.create_or_get(name, self.namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
Arc::new(TestTable {
|
Arc::new(TestTable {
|
||||||
catalog: Arc::clone(&self.catalog),
|
catalog: Arc::clone(&self.catalog),
|
||||||
|
|
|
@ -382,7 +382,10 @@ where
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use data_types::ColumnType;
|
use data_types::ColumnType;
|
||||||
use iox_catalog::mem::MemCatalog;
|
use iox_catalog::{
|
||||||
|
mem::MemCatalog,
|
||||||
|
test_helpers::{arbitrary_namespace, arbitrary_table},
|
||||||
|
};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
@ -391,17 +394,9 @@ mod tests {
|
||||||
let catalog = Arc::new(MemCatalog::new(Default::default()));
|
let catalog = Arc::new(MemCatalog::new(Default::default()));
|
||||||
|
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "test_ns").await;
|
||||||
.namespaces()
|
|
||||||
.create(&NamespaceName::new("test_ns").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let table = repos
|
let table = arbitrary_table(&mut *repos, "name", &namespace).await;
|
||||||
.tables()
|
|
||||||
.create_or_get("name", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let _column = repos
|
let _column = repos
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get("name", table.id, ColumnType::U64)
|
.create_or_get("name", table.id, ColumnType::U64)
|
||||||
|
|
|
@ -5,7 +5,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
|
||||||
use generated_types::influxdata::{iox::ingester::v1::WriteRequest, pbdata::v1::DatabaseBatch};
|
use generated_types::influxdata::{iox::ingester::v1::WriteRequest, pbdata::v1::DatabaseBatch};
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use hyper::{Body, Request, StatusCode};
|
use hyper::{Body, Request, StatusCode};
|
||||||
use iox_catalog::interface::SoftDeletedRows;
|
use iox_catalog::{interface::SoftDeletedRows, test_helpers::arbitrary_namespace};
|
||||||
use iox_time::{SystemProvider, TimeProvider};
|
use iox_time::{SystemProvider, TimeProvider};
|
||||||
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
|
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
|
||||||
use router::dml_handlers::{DmlError, RetentionError, SchemaError};
|
use router::dml_handlers::{DmlError, RetentionError, SchemaError};
|
||||||
|
@ -265,17 +265,7 @@ async fn test_write_propagate_ids() {
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Create the namespace and a set of tables.
|
// Create the namespace and a set of tables.
|
||||||
let ns = ctx
|
let ns = arbitrary_namespace(&mut *ctx.catalog().repositories().await, "bananas_test").await;
|
||||||
.catalog()
|
|
||||||
.repositories()
|
|
||||||
.await
|
|
||||||
.namespaces()
|
|
||||||
.create(
|
|
||||||
&data_types::NamespaceName::new("bananas_test").unwrap(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("failed to update table limit");
|
|
||||||
|
|
||||||
let catalog = ctx.catalog();
|
let catalog = ctx.catalog();
|
||||||
let ids = ["another", "test", "table", "platanos"]
|
let ids = ["another", "test", "table", "platanos"]
|
||||||
|
|
|
@ -197,11 +197,12 @@ fn to_partition(p: data_types::Partition) -> Partition {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use data_types::{
|
use data_types::{ColumnId, ColumnSet, CompactionLevel, ParquetFileParams, Timestamp};
|
||||||
ColumnId, ColumnSet, CompactionLevel, NamespaceName, ParquetFileParams, Timestamp,
|
|
||||||
};
|
|
||||||
use generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService;
|
use generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService;
|
||||||
use iox_catalog::mem::MemCatalog;
|
use iox_catalog::{
|
||||||
|
mem::MemCatalog,
|
||||||
|
test_helpers::{arbitrary_namespace, arbitrary_table},
|
||||||
|
};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -214,16 +215,8 @@ mod tests {
|
||||||
let metrics = Arc::new(metric::Registry::default());
|
let metrics = Arc::new(metric::Registry::default());
|
||||||
let catalog = Arc::new(MemCatalog::new(metrics));
|
let catalog = Arc::new(MemCatalog::new(metrics));
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "catalog_partition_test").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await;
|
||||||
.create(&NamespaceName::new("catalog_partition_test").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("schema_test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let partition = repos
|
let partition = repos
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get("foo".into(), table.id)
|
.create_or_get("foo".into(), table.id)
|
||||||
|
@ -277,16 +270,8 @@ mod tests {
|
||||||
let metrics = Arc::new(metric::Registry::default());
|
let metrics = Arc::new(metric::Registry::default());
|
||||||
let catalog = Arc::new(MemCatalog::new(metrics));
|
let catalog = Arc::new(MemCatalog::new(metrics));
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "catalog_partition_test").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await;
|
||||||
.create(&NamespaceName::new("catalog_partition_test").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("schema_test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
partition1 = repos
|
partition1 = repos
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get("foo".into(), table.id)
|
.create_or_get("foo".into(), table.id)
|
||||||
|
|
|
@ -96,11 +96,12 @@ impl object_store_service_server::ObjectStoreService for ObjectStoreService {
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use data_types::{
|
use data_types::{ColumnId, ColumnSet, CompactionLevel, ParquetFileParams, Timestamp};
|
||||||
ColumnId, ColumnSet, CompactionLevel, NamespaceName, ParquetFileParams, Timestamp,
|
|
||||||
};
|
|
||||||
use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService;
|
use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService;
|
||||||
use iox_catalog::mem::MemCatalog;
|
use iox_catalog::{
|
||||||
|
mem::MemCatalog,
|
||||||
|
test_helpers::{arbitrary_namespace, arbitrary_table},
|
||||||
|
};
|
||||||
use object_store::{memory::InMemory, ObjectStore};
|
use object_store::{memory::InMemory, ObjectStore};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
@ -112,16 +113,8 @@ mod tests {
|
||||||
let metrics = Arc::new(metric::Registry::default());
|
let metrics = Arc::new(metric::Registry::default());
|
||||||
let catalog = Arc::new(MemCatalog::new(metrics));
|
let catalog = Arc::new(MemCatalog::new(metrics));
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "catalog_partition_test").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await;
|
||||||
.create(&NamespaceName::new("catalog_partition_test").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("schema_test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let partition = repos
|
let partition = repos
|
||||||
.partitions()
|
.partitions()
|
||||||
.create_or_get("foo".into(), table.id)
|
.create_or_get("foo".into(), table.id)
|
||||||
|
|
|
@ -81,9 +81,12 @@ fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaRespons
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use data_types::{ColumnType, NamespaceName};
|
use data_types::ColumnType;
|
||||||
use generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService;
|
use generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService;
|
||||||
use iox_catalog::mem::MemCatalog;
|
use iox_catalog::{
|
||||||
|
mem::MemCatalog,
|
||||||
|
test_helpers::{arbitrary_namespace, arbitrary_table},
|
||||||
|
};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -93,16 +96,8 @@ mod tests {
|
||||||
let metrics = Arc::new(metric::Registry::default());
|
let metrics = Arc::new(metric::Registry::default());
|
||||||
let catalog = Arc::new(MemCatalog::new(metrics));
|
let catalog = Arc::new(MemCatalog::new(metrics));
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
let namespace = repos
|
let namespace = arbitrary_namespace(&mut *repos, "namespace_schema_test").await;
|
||||||
.namespaces()
|
let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await;
|
||||||
.create(&NamespaceName::new("namespace_schema_test").unwrap(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let table = repos
|
|
||||||
.tables()
|
|
||||||
.create_or_get("schema_test_table", namespace.id)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
repos
|
repos
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get("schema_test_column", table.id, ColumnType::Tag)
|
.create_or_get("schema_test_column", table.id, ColumnType::Tag)
|
||||||
|
|
Loading…
Reference in New Issue