Merge pull request #3505 from influxdata/pd/refactor-catalog-api

refactor: Clean up the Catalog API
kodiakhq[bot] 2022-01-24 15:20:12 +00:00 committed by GitHub
commit bf0bb3c643
6 changed files with 279 additions and 237 deletions
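
The heart of the change, as the hunks below show, is replacing the `RepoCollection` trait, whose accessors each returned an `Arc<dyn ...Repo + Sync + Send>`, with an object-safe `Catalog: Send + Sync` trait whose accessors return plain `&dyn ...Repo` borrows, and switching callers from generic `T: RepoCollection + Send + Sync` parameters to `&dyn Catalog` or `Arc<dyn Catalog>`. A condensed, synchronous sketch of that shape (illustrative only: the real traits are `#[async_trait]` and cover nine repo types, and `MemBackend` / `lookup` are made-up names, not code from this PR):

// Condensed sketch of the refactor (illustrative names, synchronous for brevity).

use std::sync::Arc;

pub trait NamespaceRepo: Send + Sync {
    fn get_by_name(&self, name: &str) -> Option<String>;
}

// Before: accessors handed out Arc'd trait objects and callers were generic
// over `T: RepoCollection + Send + Sync`.
#[allow(dead_code)]
pub trait RepoCollection {
    fn namespace(&self) -> Arc<dyn NamespaceRepo + Send + Sync>;
}

// After: one object-safe trait; `Send + Sync` are supertraits, and each
// accessor borrows a repo instead of cloning an Arc.
pub trait Catalog: Send + Sync {
    fn namespaces(&self) -> &dyn NamespaceRepo;
}

struct MemBackend;

impl NamespaceRepo for MemBackend {
    fn get_by_name(&self, name: &str) -> Option<String> {
        Some(name.to_string())
    }
}

impl Catalog for MemBackend {
    // The backend implements every repo trait itself, so the accessor can
    // return `self` coerced to the repo trait object.
    fn namespaces(&self) -> &dyn NamespaceRepo {
        self
    }
}

// Callers take `&dyn Catalog` (or hold an `Arc<dyn Catalog>`) instead of a
// generic type parameter.
fn lookup(catalog: &dyn Catalog, name: &str) -> Option<String> {
    catalog.namespaces().get_by_name(name)
}

fn main() {
    let catalog: Arc<dyn Catalog> = Arc::new(MemBackend);
    assert_eq!(lookup(catalog.as_ref(), "ns"), Some("ns".to_string()));
}

Dropping the generic bound is what lets `get_schema_by_name` and `validate_or_insert_schema` later in the diff accept a `&dyn Catalog` argument, and returning borrows removes an `Arc` clone on every repo access.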

View File

@ -7,8 +7,8 @@ use uuid::Uuid;
use crate::server::IngesterServer;
use iox_catalog::interface::{
KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber,
SequencerId, TableId, Tombstone,
Catalog, KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, SequenceNumber, SequencerId,
TableId, Tombstone,
};
use mutable_batch::MutableBatch;
use parking_lot::RwLock;
@ -54,11 +54,9 @@ pub struct Sequencers {
impl Sequencers {
/// One time initialize Sequencers of this Ingester
pub async fn initialize<T: RepoCollection + Send + Sync>(
ingester: &IngesterServer<'_, T>,
) -> Result<Self> {
pub async fn initialize<T: Catalog>(ingester: &IngesterServer<'_, T>) -> Result<Self> {
// Get sequencer ids from the catalog
let sequencer_repro = ingester.iox_catalog.sequencer();
let sequencer_repro = ingester.iox_catalog.sequencers();
let mut sequencers = BTreeMap::default();
let topic = ingester.get_topic();
for shard in ingester.get_kafka_partitions() {

View File

@ -3,13 +3,13 @@
use std::sync::Arc;
use iox_catalog::interface::{KafkaPartition, KafkaTopic, KafkaTopicId, RepoCollection};
use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, KafkaTopicId};
/// The [`IngesterServer`] manages the lifecycle and contains all state for
/// an `ingester` server instance.
pub struct IngesterServer<'a, T>
where
T: RepoCollection + Send + Sync,
T: Catalog,
{
/// Kafka Topic assigned to this ingester
kafka_topic: KafkaTopic,
@ -21,7 +21,7 @@ where
impl<'a, T> IngesterServer<'a, T>
where
T: RepoCollection + Send + Sync,
T: Catalog,
{
/// Initialize the Ingester
pub fn new(topic: KafkaTopic, shard_ids: Vec<KafkaPartition>, catalog: &'a Arc<T>) -> Self {

View File

@ -6,7 +6,6 @@ use snafu::{OptionExt, Snafu};
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::fmt::Formatter;
use std::sync::Arc;
use uuid::Uuid;
#[derive(Debug, Snafu)]
@ -247,32 +246,32 @@ impl std::fmt::Display for ParquetFileId {
}
}
/// Container that can return repos for each of the catalog data types.
/// Trait that contains methods for working with the catalog
#[async_trait]
pub trait RepoCollection {
pub trait Catalog: Send + Sync {
/// repo for kafka topics
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send>;
fn kafka_topics(&self) -> &dyn KafkaTopicRepo;
/// repo for query pools
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send>;
fn query_pools(&self) -> &dyn QueryPoolRepo;
/// repo for namespaces
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send>;
fn namespaces(&self) -> &dyn NamespaceRepo;
/// repo for tables
fn table(&self) -> Arc<dyn TableRepo + Sync + Send>;
fn tables(&self) -> &dyn TableRepo;
/// repo for columns
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send>;
fn columns(&self) -> &dyn ColumnRepo;
/// repo for sequencers
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send>;
fn sequencers(&self) -> &dyn SequencerRepo;
/// repo for partitions
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send>;
fn partitions(&self) -> &dyn PartitionRepo;
/// repo for tombstones
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send>;
fn tombstones(&self) -> &dyn TombstoneRepo;
/// repo for parquet_files
fn parquet_file(&self) -> Arc<dyn ParquetFileRepo + Sync + Send>;
fn parquet_files(&self) -> &dyn ParquetFileRepo;
}
/// Functions for working with Kafka topics in the catalog.
#[async_trait]
pub trait KafkaTopicRepo {
pub trait KafkaTopicRepo: Send + Sync {
/// Creates the kafka topic in the catalog or gets the existing record by name.
async fn create_or_get(&self, name: &str) -> Result<KafkaTopic>;
@ -282,14 +281,14 @@ pub trait KafkaTopicRepo {
/// Functions for working with query pools in the catalog.
#[async_trait]
pub trait QueryPoolRepo {
pub trait QueryPoolRepo: Send + Sync {
/// Creates the query pool in the catalog or gets the existing record by name.
async fn create_or_get(&self, name: &str) -> Result<QueryPool>;
}
/// Functions for working with namespaces in the catalog
#[async_trait]
pub trait NamespaceRepo {
pub trait NamespaceRepo: Send + Sync {
/// Creates the namespace in the catalog. If one by the same name already exists, an
/// error is returned.
async fn create(
@ -306,7 +305,7 @@ pub trait NamespaceRepo {
/// Functions for working with tables in the catalog
#[async_trait]
pub trait TableRepo {
pub trait TableRepo: Send + Sync {
/// Creates the table in the catalog or gets the existing record by name.
async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table>;
@ -316,7 +315,7 @@ pub trait TableRepo {
/// Functions for working with columns in the catalog
#[async_trait]
pub trait ColumnRepo {
pub trait ColumnRepo: Send + Sync {
/// Creates the column in the catalog or returns the existing column. Will return a
/// `Error::ColumnTypeMismatch` if the existing column type doesn't match the type
/// the caller is attempting to create.
@ -333,7 +332,7 @@ pub trait ColumnRepo {
/// Functions for working with sequencers in the catalog
#[async_trait]
pub trait SequencerRepo {
pub trait SequencerRepo: Send + Sync {
/// create a sequencer record for the kafka topic and partition or return the existing record
async fn create_or_get(
&self,
@ -358,7 +357,7 @@ pub trait SequencerRepo {
/// Functions for working with IOx partitions in the catalog. Note that these are how
/// IOx splits up data within a database, which is different from Kafka partitions.
#[async_trait]
pub trait PartitionRepo {
pub trait PartitionRepo: Send + Sync {
/// create or get a partition record for the given partition key, sequencer and table
async fn create_or_get(
&self,
@ -373,7 +372,7 @@ pub trait PartitionRepo {
/// Functions for working with tombstones in the catalog
#[async_trait]
pub trait TombstoneRepo {
pub trait TombstoneRepo: Send + Sync {
/// create or get a tombstone
async fn create_or_get(
&self,
@ -397,7 +396,7 @@ pub trait TombstoneRepo {
/// Functions for working with parquet file pointers in the catalog
#[async_trait]
pub trait ParquetFileRepo {
pub trait ParquetFileRepo: Send + Sync {
/// create the parquet file
#[allow(clippy::too_many_arguments)]
async fn create(
@ -519,22 +518,19 @@ impl NamespaceSchema {
}
/// Gets the namespace schema including all tables and columns.
pub async fn get_schema_by_name<T: RepoCollection + Send + Sync>(
pub async fn get_schema_by_name(
name: &str,
repo: &T,
catalog: &dyn Catalog,
) -> Result<Option<NamespaceSchema>> {
let namespace_repo = repo.namespace();
let table_repo = repo.table();
let column_repo = repo.column();
let namespace = namespace_repo
let namespace = catalog
.namespaces()
.get_by_name(name)
.await?
.context(NamespaceNotFoundSnafu { name })?;
// get the columns first just in case someone else is creating schema while we're doing this.
let columns = column_repo.list_by_namespace_id(namespace.id).await?;
let tables = table_repo.list_by_namespace_id(namespace.id).await?;
let columns = catalog.columns().list_by_namespace_id(namespace.id).await?;
let tables = catalog.tables().list_by_namespace_id(namespace.id).await?;
let mut namespace = NamespaceSchema::new(
namespace.id,
@ -813,25 +809,22 @@ pub struct ParquetFile {
pub(crate) mod test_helpers {
use super::*;
use futures::{stream::FuturesOrdered, StreamExt};
use std::sync::Arc;
pub(crate) async fn test_repo<T, F>(new_repo: F)
where
T: RepoCollection + Send + Sync,
F: Fn() -> T + Send + Sync,
{
test_kafka_topic(&new_repo()).await;
test_query_pool(&new_repo()).await;
test_namespace(&new_repo()).await;
test_table(&new_repo()).await;
test_column(&new_repo()).await;
test_sequencer(&new_repo()).await;
test_partition(&new_repo()).await;
test_tombstone(&new_repo()).await;
test_parquet_file(&new_repo()).await;
pub(crate) async fn test_catalog(catalog: Arc<dyn Catalog>) {
test_kafka_topic(Arc::clone(&catalog)).await;
test_query_pool(Arc::clone(&catalog)).await;
test_namespace(Arc::clone(&catalog)).await;
test_table(Arc::clone(&catalog)).await;
test_column(Arc::clone(&catalog)).await;
test_sequencer(Arc::clone(&catalog)).await;
test_partition(Arc::clone(&catalog)).await;
test_tombstone(Arc::clone(&catalog)).await;
test_parquet_file(Arc::clone(&catalog)).await;
}
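
The shared tests change shape the same way: instead of a factory closure producing a fresh `T: RepoCollection` for each sub-test, `test_catalog` now takes a single `Arc<dyn Catalog>` and hands every sub-test its own `Arc::clone`, which is how the mem and postgres tests at the end of this diff plug in. A minimal stand-alone sketch of that call structure (assumed names, no real catalog behavior; it relies on the `tokio` runtime the project already uses):

// Toy version of the new test-helper shape: one backend instance, erased to
// `Arc<dyn Catalog>`, is threaded through every sub-test via `Arc::clone`.
use std::sync::Arc;

trait Catalog: Send + Sync {
    fn backend(&self) -> &'static str;
}

struct MemCatalog;

impl Catalog for MemCatalog {
    fn backend(&self) -> &'static str {
        "mem"
    }
}

async fn test_kafka_topic(catalog: Arc<dyn Catalog>) {
    assert_eq!(catalog.backend(), "mem");
}

async fn test_namespace(catalog: Arc<dyn Catalog>) {
    assert_eq!(catalog.backend(), "mem");
}

async fn test_catalog(catalog: Arc<dyn Catalog>) {
    // Each sub-test takes ownership of an Arc handle, so clone it per call.
    test_kafka_topic(Arc::clone(&catalog)).await;
    test_namespace(Arc::clone(&catalog)).await;
}

#[tokio::main]
async fn main() {
    test_catalog(Arc::new(MemCatalog)).await;
}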
async fn test_kafka_topic<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka_repo = repo.kafka_topic();
async fn test_kafka_topic(catalog: Arc<dyn Catalog>) {
let kafka_repo = catalog.kafka_topics();
let k = kafka_repo.create_or_get("foo").await.unwrap();
assert!(k.id > KafkaTopicId::new(0));
assert_eq!(k.name, "foo");
@ -843,8 +836,8 @@ pub(crate) mod test_helpers {
assert!(k3.is_none());
}
async fn test_query_pool<T: RepoCollection + Send + Sync>(repo: &T) {
let query_repo = repo.query_pool();
async fn test_query_pool(catalog: Arc<dyn Catalog>) {
let query_repo = catalog.query_pools();
let q = query_repo.create_or_get("foo").await.unwrap();
assert!(q.id > QueryPoolId::new(0));
assert_eq!(q.name, "foo");
@ -852,10 +845,10 @@ pub(crate) mod test_helpers {
assert_eq!(q, q2);
}
async fn test_namespace<T: RepoCollection + Send + Sync>(repo: &T) {
let namespace_repo = repo.namespace();
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
async fn test_namespace(catalog: Arc<dyn Catalog>) {
let namespace_repo = catalog.namespaces();
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace_name = "test_namespace";
let namespace = namespace_repo
@ -881,53 +874,75 @@ pub(crate) mod test_helpers {
assert_eq!(namespace, found);
}
async fn test_table<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_table(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_table_test", "inf", kafka.id, pool.id)
.await
.unwrap();
// test we can create or get a table
let table_repo = repo.table();
let t = table_repo
let t = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let tt = table_repo
let tt = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
assert!(t.id > TableId::new(0));
assert_eq!(t, tt);
let tables = table_repo.list_by_namespace_id(namespace.id).await.unwrap();
let tables = catalog
.tables()
.list_by_namespace_id(namespace.id)
.await
.unwrap();
assert_eq!(vec![t], tables);
// test we can create a table of the same name in a different namespace
let namespace2 = catalog
.namespaces()
.create("two", "inf", kafka.id, pool.id)
.await
.unwrap();
assert_ne!(namespace, namespace2);
let test_table = catalog
.tables()
.create_or_get("test_table", namespace2.id)
.await
.unwrap();
assert_ne!(tt, test_table);
assert_eq!(test_table.namespace_id, namespace2.id)
}
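
The new assertions above pair with a behavior fix further down in the in-memory catalog: `TableRepo::create_or_get` now matches on both the table name and the namespace id, so the same table name can exist in two namespaces. A simplified, self-contained sketch of that lookup rule, using plain integers in place of the catalog's ID newtypes:

// Sketch of the (name, namespace_id) lookup: two namespaces may each own a
// table called "test_table", but within one namespace the call is idempotent.
#[derive(Debug, Clone, PartialEq)]
struct Table {
    id: i32,
    namespace_id: i32,
    name: String,
}

fn create_or_get(tables: &mut Vec<Table>, name: &str, namespace_id: i32) -> Table {
    if let Some(t) = tables
        .iter()
        .find(|t| t.name == name && t.namespace_id == namespace_id)
    {
        return t.clone();
    }
    let table = Table {
        id: tables.len() as i32 + 1,
        namespace_id,
        name: name.to_string(),
    };
    tables.push(table.clone());
    table
}

fn main() {
    let mut tables = Vec::new();
    let a = create_or_get(&mut tables, "test_table", 1);
    let b = create_or_get(&mut tables, "test_table", 2);
    assert_ne!(a.id, b.id); // same name, different namespaces: distinct tables
    let a2 = create_or_get(&mut tables, "test_table", 1);
    assert_eq!(a, a2); // idempotent within a namespace
}

Across namespaces the call creates distinct tables; within one namespace it returns the existing record, which is exactly what the test above asserts.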
async fn test_column<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_column(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_column_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
let table = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
assert_eq!(table.namespace_id, namespace.id);
// test we can create or get a column
let column_repo = repo.column();
let c = column_repo
let c = catalog
.columns()
.create_or_get("column_test", table.id, ColumnType::Tag)
.await
.unwrap();
let cc = column_repo
let cc = catalog
.columns()
.create_or_get("column_test", table.id, ColumnType::Tag)
.await
.unwrap();
@ -935,7 +950,8 @@ pub(crate) mod test_helpers {
assert_eq!(c, cc);
// test that attempting to create an already defined column of a different type returns error
let err = column_repo
let err = catalog
.columns()
.create_or_get("column_test", table.id, ColumnType::U64)
.await
.expect_err("should error with wrong column type");
@ -949,35 +965,40 @@ pub(crate) mod test_helpers {
));
// test that we can create a column of the same name under a different table
let table2 = repo
.table()
let table2 = catalog
.tables()
.create_or_get("test_table_2", namespace.id)
.await
.unwrap();
let ccc = column_repo
let ccc = catalog
.columns()
.create_or_get("column_test", table2.id, ColumnType::U64)
.await
.unwrap();
assert_ne!(c, ccc);
let columns = column_repo
let columns = catalog
.columns()
.list_by_namespace_id(namespace.id)
.await
.unwrap();
assert_eq!(vec![c, ccc], columns);
}
async fn test_sequencer<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo
.kafka_topic()
async fn test_sequencer(catalog: Arc<dyn Catalog>) {
let kafka = catalog
.kafka_topics()
.create_or_get("sequencer_test")
.await
.unwrap();
let sequencer_repo = repo.sequencer();
// Create 10 sequencers
let created = (1..=10)
.map(|partition| sequencer_repo.create_or_get(&kafka, KafkaPartition::new(partition)))
.map(|partition| {
catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(partition))
})
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
@ -987,7 +1008,8 @@ pub(crate) mod test_helpers {
.await;
// List them and assert they match
let listed = sequencer_repo
let listed = catalog
.sequencers()
.list_by_kafka_topic(&kafka)
.await
.expect("failed to list sequencers")
@ -999,7 +1021,8 @@ pub(crate) mod test_helpers {
// get by the sequencer id and partition
let kafka_partition = KafkaPartition::new(1);
let sequencer = sequencer_repo
let sequencer = catalog
.sequencers()
.get_by_topic_id_and_partition(kafka.id, kafka_partition)
.await
.unwrap()
@ -1007,42 +1030,45 @@ pub(crate) mod test_helpers {
assert_eq!(kafka.id, sequencer.kafka_topic_id);
assert_eq!(kafka_partition, sequencer.kafka_partition);
let sequencer = sequencer_repo
let sequencer = catalog
.sequencers()
.get_by_topic_id_and_partition(kafka.id, KafkaPartition::new(523))
.await
.unwrap();
assert!(sequencer.is_none());
}
async fn test_partition<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_partition(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_partition_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
let table = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let sequencer = repo
.sequencer()
let sequencer = catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let other_sequencer = repo
.sequencer()
let other_sequencer = catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(2))
.await
.unwrap();
let partition_repo = repo.partition();
let created = ["foo", "bar"]
.iter()
.map(|key| partition_repo.create_or_get(key, sequencer.id, table.id))
.map(|key| {
catalog
.partitions()
.create_or_get(key, sequencer.id, table.id)
})
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create partition");
@ -1050,13 +1076,15 @@ pub(crate) mod test_helpers {
})
.collect::<BTreeMap<_, _>>()
.await;
let _ = partition_repo
let _ = catalog
.partitions()
.create_or_get("asdf", other_sequencer.id, table.id)
.await
.unwrap();
// List them and assert they match
let listed = partition_repo
let listed = catalog
.partitions()
.list_by_sequencer(sequencer.id)
.await
.expect("failed to list partitions")
@ -1067,34 +1095,34 @@ pub(crate) mod test_helpers {
assert_eq!(created, listed);
}
async fn test_tombstone<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_tombstone(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_tombstone_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
let table = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let other_table = repo
.table()
let other_table = catalog
.tables()
.create_or_get("other", namespace.id)
.await
.unwrap();
let sequencer = repo
.sequencer()
let sequencer = catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let tombstone_repo = repo.tombstone();
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
let t1 = tombstone_repo
let t1 = catalog
.tombstones()
.create_or_get(
table.id,
sequencer.id,
@ -1111,7 +1139,8 @@ pub(crate) mod test_helpers {
assert_eq!(t1.min_time, min_time);
assert_eq!(t1.max_time, max_time);
assert_eq!(t1.serialized_predicate, "whatevs");
let t2 = tombstone_repo
let t2 = catalog
.tombstones()
.create_or_get(
other_table.id,
sequencer.id,
@ -1122,7 +1151,8 @@ pub(crate) mod test_helpers {
)
.await
.unwrap();
let t3 = tombstone_repo
let t3 = catalog
.tombstones()
.create_or_get(
table.id,
sequencer.id,
@ -1134,43 +1164,44 @@ pub(crate) mod test_helpers {
.await
.unwrap();
let listed = tombstone_repo
let listed = catalog
.tombstones()
.list_tombstones_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
.await
.unwrap();
assert_eq!(vec![t2, t3], listed);
}
async fn test_parquet_file<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_parquet_file(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_parquet_file_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
let table = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let other_table = repo
.table()
let other_table = catalog
.tables()
.create_or_get("other", namespace.id)
.await
.unwrap();
let sequencer = repo
.sequencer()
let sequencer = catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let partition = repo
.partition()
let partition = catalog
.partitions()
.create_or_get("one", sequencer.id, table.id)
.await
.unwrap();
let other_partition = repo
.partition()
let other_partition = catalog
.partitions()
.create_or_get("one", sequencer.id, other_table.id)
.await
.unwrap();
@ -1178,7 +1209,7 @@ pub(crate) mod test_helpers {
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
let parquet_repo = repo.parquet_file();
let parquet_repo = catalog.parquet_files();
let parquet_file = parquet_repo
.create(
sequencer.id,

View File

@ -12,8 +12,8 @@
)]
use crate::interface::{
column_type_from_field, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic,
NamespaceSchema, QueryPool, RepoCollection, Result, Sequencer, SequencerId, TableId,
column_type_from_field, Catalog, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic,
NamespaceSchema, QueryPool, Result, Sequencer, SequencerId, TableId,
};
use futures::{stream::FuturesOrdered, StreamExt};
use influxdb_line_protocol::ParsedLine;
@ -36,10 +36,10 @@ pub mod postgres;
/// If another writer attempts to create a column of the same name with a different
/// type at the same time and beats this caller to it, an error will be returned. If another
/// writer adds the same schema before this one, then this will load that schema here.
pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
pub async fn validate_or_insert_schema(
lines: Vec<ParsedLine<'_>>,
schema: &NamespaceSchema,
repo: &T,
catalog: &dyn Catalog,
) -> Result<Option<NamespaceSchema>> {
// table name to table_id
let mut new_tables: BTreeMap<String, TableId> = BTreeMap::new();
@ -66,8 +66,8 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
None => {
let entry = new_columns.entry(table.id).or_default();
if entry.get(key.as_str()).is_none() {
let column_repo = repo.column();
let column = column_repo
let column = catalog
.columns()
.create_or_get(key.as_str(), table.id, ColumnType::Tag)
.await?;
entry.insert(
@ -97,8 +97,8 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
let entry = new_columns.entry(table.id).or_default();
if entry.get(key.as_str()).is_none() {
let data_type = column_type_from_field(value);
let column_repo = repo.column();
let column = column_repo
let column = catalog
.columns()
.create_or_get(key.as_str(), table.id, data_type)
.await?;
entry.insert(
@ -113,15 +113,16 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
}
}
None => {
let table_repo = repo.table();
let new_table = table_repo.create_or_get(table_name, schema.id).await?;
let new_table = catalog
.tables()
.create_or_get(table_name, schema.id)
.await?;
let new_table_columns = new_columns.entry(new_table.id).or_default();
let column_repo = repo.column();
if let Some(tagset) = &line.series.tag_set {
for (key, _) in tagset {
let new_column = column_repo
let new_column = catalog
.columns()
.create_or_get(key.as_str(), new_table.id, ColumnType::Tag)
.await?;
new_table_columns.insert(
@ -135,7 +136,8 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
}
for (key, value) in &line.field_set {
let data_type = column_type_from_field(value);
let new_column = column_repo
let new_column = catalog
.columns()
.create_or_get(key.as_str(), new_table.id, data_type)
.await?;
new_table_columns.insert(
@ -146,7 +148,8 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
},
);
}
let time_column = column_repo
let time_column = catalog
.columns()
.create_or_get(TIME_COLUMN, new_table.id, ColumnType::Time)
.await?;
new_table_columns.insert(
@ -173,19 +176,25 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
/// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers for
/// each of the partitions.
pub async fn create_or_get_default_records<T: RepoCollection + Sync + Send>(
pub async fn create_or_get_default_records(
kafka_partition_count: i32,
repo: &T,
catalog: &dyn Catalog,
) -> Result<(KafkaTopic, QueryPool, BTreeMap<SequencerId, Sequencer>)> {
let kafka_repo = repo.kafka_topic();
let query_repo = repo.query_pool();
let sequencer_repo = repo.sequencer();
let kafka_topic = kafka_repo.create_or_get(SHARED_KAFKA_TOPIC).await?;
let query_pool = query_repo.create_or_get(SHARED_QUERY_POOL).await?;
let kafka_topic = catalog
.kafka_topics()
.create_or_get(SHARED_KAFKA_TOPIC)
.await?;
let query_pool = catalog
.query_pools()
.create_or_get(SHARED_QUERY_POOL)
.await?;
let sequencers = (1..=kafka_partition_count)
.map(|partition| sequencer_repo.create_or_get(&kafka_topic, KafkaPartition::new(partition)))
.map(|partition| {
catalog
.sequencers()
.create_or_get(&kafka_topic, KafkaPartition::new(partition))
})
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
@ -207,13 +216,13 @@ mod tests {
#[tokio::test]
async fn test_validate_or_insert_schema() {
let repo = Arc::new(MemCatalog::new());
let repo = MemCatalog::new();
let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &repo).await.unwrap();
let namespace_name = "validate_schema";
// now test with a new namespace
let namespace = repo
.namespace()
.namespaces()
.create(namespace_name, "inf", kafka_topic.id, query_pool.id)
.await
.unwrap();

View File

@ -2,16 +2,16 @@
//! used for testing or for an IOx designed to run without catalog persistence.
use crate::interface::{
Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId,
ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo,
RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic,
KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile,
ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId,
QueryPoolRepo, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo,
};
use async_trait::async_trait;
use std::convert::TryFrom;
use std::fmt::Formatter;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use uuid::Uuid;
/// In-memory catalog that implements the `RepoCollection` and individual repo traits from
@ -48,41 +48,41 @@ struct MemCollections {
parquet_files: Vec<ParquetFile>,
}
impl RepoCollection for Arc<MemCatalog> {
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send> {
Self::clone(self) as Arc<dyn KafkaTopicRepo + Sync + Send>
impl Catalog for MemCatalog {
fn kafka_topics(&self) -> &dyn KafkaTopicRepo {
self
}
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send> {
Self::clone(self) as Arc<dyn QueryPoolRepo + Sync + Send>
fn query_pools(&self) -> &dyn QueryPoolRepo {
self
}
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send> {
Self::clone(self) as Arc<dyn NamespaceRepo + Sync + Send>
fn namespaces(&self) -> &dyn NamespaceRepo {
self
}
fn table(&self) -> Arc<dyn TableRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TableRepo + Sync + Send>
fn tables(&self) -> &dyn TableRepo {
self
}
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ColumnRepo + Sync + Send>
fn columns(&self) -> &dyn ColumnRepo {
self
}
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send> {
Self::clone(self) as Arc<dyn SequencerRepo + Sync + Send>
fn sequencers(&self) -> &dyn SequencerRepo {
self
}
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send> {
Self::clone(self) as Arc<dyn PartitionRepo + Sync + Send>
fn partitions(&self) -> &dyn PartitionRepo {
self
}
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TombstoneRepo + Sync + Send>
fn tombstones(&self) -> &dyn TombstoneRepo {
self
}
fn parquet_file(&self) -> Arc<dyn ParquetFileRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ParquetFileRepo + Sync + Send>
fn parquet_files(&self) -> &dyn ParquetFileRepo {
self
}
}
@ -180,7 +180,11 @@ impl TableRepo for MemCatalog {
async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let table = match collections.tables.iter().find(|t| t.name == name) {
let table = match collections
.tables
.iter()
.find(|t| t.name == name && t.namespace_id == namespace_id)
{
Some(t) => t,
None => {
let table = Table {
@ -250,18 +254,22 @@ impl ColumnRepo for MemCatalog {
}
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
let mut columns = vec![];
let collections = self.collections.lock().expect("mutex poisoned");
for t in collections
let table_ids: Vec<_> = collections
.tables
.iter()
.filter(|t| t.namespace_id == namespace_id)
{
for c in collections.columns.iter().filter(|c| c.table_id == t.id) {
columns.push(c.clone());
}
}
.map(|t| t.id)
.collect();
println!("tables: {:?}", collections.tables);
println!("table_ids: {:?}", table_ids);
let columns: Vec<_> = collections
.columns
.iter()
.filter(|c| table_ids.contains(&c.table_id))
.cloned()
.collect();
Ok(columns)
}
@ -488,11 +496,10 @@ impl ParquetFileRepo for MemCatalog {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
#[tokio::test]
async fn test_mem_repo() {
let f = || Arc::new(MemCatalog::new());
crate::interface::test_helpers::test_repo(f).await;
async fn test_catalog() {
crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new())).await;
}
}

View File

@ -1,16 +1,15 @@
//! A Postgres backed implementation of the Catalog
use crate::interface::{
Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
Catalog, Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId,
ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo,
RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
TableRepo, Timestamp, Tombstone, TombstoneRepo,
Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo,
Timestamp, Tombstone, TombstoneRepo,
};
use async_trait::async_trait;
use observability_deps::tracing::info;
use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
@ -62,41 +61,41 @@ impl PostgresCatalog {
}
}
impl RepoCollection for Arc<PostgresCatalog> {
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send> {
Self::clone(self) as Arc<dyn KafkaTopicRepo + Sync + Send>
impl Catalog for PostgresCatalog {
fn kafka_topics(&self) -> &dyn KafkaTopicRepo {
self
}
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send> {
Self::clone(self) as Arc<dyn QueryPoolRepo + Sync + Send>
fn query_pools(&self) -> &dyn QueryPoolRepo {
self
}
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send> {
Self::clone(self) as Arc<dyn NamespaceRepo + Sync + Send>
fn namespaces(&self) -> &dyn NamespaceRepo {
self
}
fn table(&self) -> Arc<dyn TableRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TableRepo + Sync + Send>
fn tables(&self) -> &dyn TableRepo {
self
}
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ColumnRepo + Sync + Send>
fn columns(&self) -> &dyn ColumnRepo {
self
}
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send> {
Self::clone(self) as Arc<dyn SequencerRepo + Sync + Send>
fn sequencers(&self) -> &dyn SequencerRepo {
self
}
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send> {
Self::clone(self) as Arc<dyn PartitionRepo + Sync + Send>
fn partitions(&self) -> &dyn PartitionRepo {
self
}
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TombstoneRepo + Sync + Send>
fn tombstones(&self) -> &dyn TombstoneRepo {
self
}
fn parquet_file(&self) -> Arc<dyn ParquetFileRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ParquetFileRepo + Sync + Send>
fn parquet_files(&self) -> &dyn ParquetFileRepo {
self
}
}
@ -586,6 +585,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool {
mod tests {
use super::*;
use std::env;
use std::sync::Arc;
// Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set.
macro_rules! maybe_skip_integration {
@ -624,17 +624,15 @@ mod tests {
}};
}
async fn setup_db() -> Arc<PostgresCatalog> {
async fn setup_db() -> PostgresCatalog {
let dsn = std::env::var("DATABASE_URL").unwrap();
Arc::new(
PostgresCatalog::connect("test", SCHEMA_NAME, &dsn)
.await
.unwrap(),
)
PostgresCatalog::connect("test", SCHEMA_NAME, &dsn)
.await
.unwrap()
}
#[tokio::test]
async fn test_repo() {
async fn test_catalog() {
// If running an integration test on your laptop, this requires that you have Postgres
// running and that you've done the sqlx migrations. See the README in this crate for
// info to set it up.
@ -642,10 +640,9 @@ mod tests {
let postgres = setup_db().await;
clear_schema(&postgres.pool).await;
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let f = || Arc::clone(&postgres);
crate::interface::test_helpers::test_repo(f).await;
crate::interface::test_helpers::test_catalog(postgres).await;
}
async fn clear_schema(pool: &Pool<Postgres>) {