chore: Merge branch 'main' into ntran/ingester

pull/24376/head
NGA-TRAN 2022-01-18 14:04:44 -05:00
commit 5d3c9de7aa
4 changed files with 842 additions and 310 deletions

View File

@ -2,7 +2,7 @@
use async_trait::async_trait;
use influxdb_line_protocol::FieldValue;
use snafu::Snafu;
use snafu::{OptionExt, Snafu};
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::fmt::Formatter;
@ -33,11 +33,104 @@ pub enum Error {
name
))]
UnknownColumnType { data_type: i16, name: String },
#[snafu(display("namespace {} not found", name))]
NamespaceNotFound { name: String },
}
/// A specialized `Error` for Catalog errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Unique ID for a `Namespace`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct NamespaceId(i32);
#[allow(missing_docs)]
impl NamespaceId {
pub fn new(v: i32) -> Self {
Self(v)
}
pub fn get(&self) -> i32 {
self.0
}
}
/// Unique ID for a `KafkaTopic`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct KafkaTopicId(i32);
#[allow(missing_docs)]
impl KafkaTopicId {
pub fn new(v: i32) -> Self {
Self(v)
}
pub fn get(&self) -> i32 {
self.0
}
}
/// Unique ID for a `QueryPool`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct QueryPoolId(i16);
#[allow(missing_docs)]
impl QueryPoolId {
pub fn new(v: i16) -> Self {
Self(v)
}
pub fn get(&self) -> i16 {
self.0
}
}
/// Unique ID for a `Table`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct TableId(i32);
#[allow(missing_docs)]
impl TableId {
pub fn new(v: i32) -> Self {
Self(v)
}
pub fn get(&self) -> i32 {
self.0
}
}
/// Unique ID for a `Column`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct ColumnId(i32);
#[allow(missing_docs)]
impl ColumnId {
pub fn new(v: i32) -> Self {
Self(v)
}
pub fn get(&self) -> i32 {
self.0
}
}
/// Unique ID for a `Sequencer`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct SequencerId(i16);
#[allow(missing_docs)]
impl SequencerId {
pub fn new(v: i16) -> Self {
Self(v)
}
pub fn get(&self) -> i16 {
self.0
}
}
/// Container that can return repos for each of the catalog data types.
#[async_trait]
pub trait RepoCollection {
@ -72,28 +165,28 @@ pub trait QueryPoolRepo {
/// Functions for working with namespaces in the catalog
#[async_trait]
pub trait NamespaceRepo {
/// Creates the namespace in the catalog, or get the existing record by name. Then
/// constructs a namespace schema with all tables and columns under the namespace.
/// Creates the namespace in the catalog. If one by the same name already exists, an
/// error is returned.
async fn create(
&self,
name: &str,
retention_duration: &str,
kafka_topic_id: i32,
query_pool_id: i16,
) -> Result<NamespaceSchema>;
kafka_topic_id: KafkaTopicId,
query_pool_id: QueryPoolId,
) -> Result<Namespace>;
/// Gets the namespace schema including all tables and columns.
async fn get_by_name(&self, name: &str) -> Result<Option<NamespaceSchema>>;
/// Gets the namespace by its unique name.
async fn get_by_name(&self, name: &str) -> Result<Option<Namespace>>;
}
/// Functions for working with tables in the catalog
#[async_trait]
pub trait TableRepo {
/// Creates the table in the catalog or get the existing record by name.
async fn create_or_get(&self, name: &str, namespace_id: i32) -> Result<Table>;
async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table>;
/// Lists all tables in the catalog for the given namespace id.
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Table>>;
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Table>>;
}
/// Functions for working with columns in the catalog
@ -105,12 +198,12 @@ pub trait ColumnRepo {
async fn create_or_get(
&self,
name: &str,
table_id: i32,
table_id: TableId,
column_type: ColumnType,
) -> Result<Column>;
/// Lists all columns in the passed in namespace id.
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Column>>;
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Column>>;
}
/// Functions for working with sequencers in the catalog
@ -121,58 +214,62 @@ pub trait SequencerRepo {
/// list all sequencers
async fn list(&self) -> Result<Vec<Sequencer>>;
/// list all sequencers for a given kafka topic
async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result<Vec<Sequencer>>;
}
/// Data object for a kafka topic
#[derive(Debug, Eq, PartialEq, sqlx::FromRow)]
#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)]
pub struct KafkaTopic {
/// The id of the topic
pub id: i32,
pub id: KafkaTopicId,
/// The unique name of the topic
pub name: String,
}
/// Data object for a query pool
#[derive(Debug, Eq, PartialEq, sqlx::FromRow)]
#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)]
pub struct QueryPool {
/// The id of the pool
pub id: i16,
pub id: QueryPoolId,
/// The unique name of the pool
pub name: String,
}
/// Data object for a namespace
#[derive(Debug, sqlx::FromRow)]
#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)]
pub struct Namespace {
/// The id of the namespace
pub id: i32,
pub id: NamespaceId,
/// The unique name of the namespace
pub name: String,
/// The retention duration as a string. 'inf' or not present represents infinite duration (i.e. never drop data).
#[sqlx(default)]
pub retention_duration: Option<String>,
/// The kafka topic that writes to this namespace will land in
pub kafka_topic_id: i32,
pub kafka_topic_id: KafkaTopicId,
/// The query pool assigned to answer queries for this namespace
pub query_pool_id: i16,
pub query_pool_id: QueryPoolId,
}
/// Schema collection for a namespace
/// Schema collection for a namespace. This is an in-memory object useful for a schema
/// cache.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NamespaceSchema {
/// the namespace id
pub id: i32,
pub id: NamespaceId,
/// the kafka topic this namespace gets data written to
pub kafka_topic_id: i32,
pub kafka_topic_id: KafkaTopicId,
/// the query pool assigned to answer queries for this namespace
pub query_pool_id: i16,
pub query_pool_id: QueryPoolId,
/// the tables in the namespace by name
pub tables: BTreeMap<String, TableSchema>,
}
impl NamespaceSchema {
/// Create a new `NamespaceSchema`
pub fn new(id: i32, kafka_topic_id: i32, query_pool_id: i16) -> Self {
pub fn new(id: NamespaceId, kafka_topic_id: KafkaTopicId, query_pool_id: QueryPoolId) -> Self {
Self {
id,
tables: BTreeMap::new(),
@ -186,8 +283,8 @@ impl NamespaceSchema {
/// method takes them in to add them to the schema.
pub fn add_tables_and_columns(
&mut self,
new_tables: BTreeMap<String, i32>,
new_columns: BTreeMap<i32, BTreeMap<String, ColumnSchema>>,
new_tables: BTreeMap<String, TableId>,
new_columns: BTreeMap<TableId, BTreeMap<String, ColumnSchema>>,
) {
for (table_name, table_id) in new_tables {
self.tables
@ -203,7 +300,7 @@ impl NamespaceSchema {
}
}
fn get_table_mut(&mut self, table_id: i32) -> Option<&mut TableSchema> {
fn get_table_mut(&mut self, table_id: TableId) -> Option<&mut TableSchema> {
for table in self.tables.values_mut() {
if table.id == table_id {
return Some(table);
@ -214,13 +311,70 @@ impl NamespaceSchema {
}
}
/// Gets the namespace schema including all tables and columns.
pub async fn get_schema_by_name<T: RepoCollection + Send + Sync>(
name: &str,
repo: &T,
) -> Result<Option<NamespaceSchema>> {
let namespace_repo = repo.namespace();
let table_repo = repo.table();
let column_repo = repo.column();
let namespace = namespace_repo
.get_by_name(name)
.await?
.context(NamespaceNotFoundSnafu { name })?;
// get the columns first just in case someone else is creating schema while we're doing this.
let columns = column_repo.list_by_namespace_id(namespace.id).await?;
let tables = table_repo.list_by_namespace_id(namespace.id).await?;
let mut namespace = NamespaceSchema::new(
namespace.id,
namespace.kafka_topic_id,
namespace.query_pool_id,
);
let mut table_id_to_schema = BTreeMap::new();
for t in tables {
table_id_to_schema.insert(t.id, (t.name, TableSchema::new(t.id)));
}
for c in columns {
let (_, t) = table_id_to_schema.get_mut(&c.table_id).unwrap();
match ColumnType::try_from(c.column_type) {
Ok(column_type) => {
t.columns.insert(
c.name,
ColumnSchema {
id: c.id,
column_type,
},
);
}
_ => {
return Err(Error::UnknownColumnType {
data_type: c.column_type,
name: c.name.to_string(),
});
}
}
}
for (_, (table_name, schema)) in table_id_to_schema {
namespace.tables.insert(table_name, schema);
}
Ok(Some(namespace))
}
/// Data object for a table
#[derive(Debug, sqlx::FromRow, Eq, PartialEq)]
#[derive(Debug, Clone, sqlx::FromRow, Eq, PartialEq)]
pub struct Table {
/// The id of the table
pub id: i32,
pub id: TableId,
/// The namespace id that the table is in
pub namespace_id: i32,
pub namespace_id: NamespaceId,
/// The name of the table, which is unique within the associated namespace
pub name: String,
}
@ -229,14 +383,14 @@ pub struct Table {
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TableSchema {
/// the table id
pub id: i32,
pub id: TableId,
/// the table's columns by their name
pub columns: BTreeMap<String, ColumnSchema>,
}
impl TableSchema {
/// Initialize new `TableSchema`
pub fn new(id: i32) -> Self {
pub fn new(id: TableId) -> Self {
Self {
id,
columns: BTreeMap::new(),
@ -252,12 +406,12 @@ impl TableSchema {
}
/// Data object for a column
#[derive(Debug, sqlx::FromRow, Eq, PartialEq)]
#[derive(Debug, Clone, sqlx::FromRow, Eq, PartialEq)]
pub struct Column {
/// the column id
pub id: i32,
pub id: ColumnId,
/// the table id the column is in
pub table_id: i32,
pub table_id: TableId,
/// the name of the column, which is unique in the table
pub name: String,
/// the logical type of the column
@ -286,7 +440,7 @@ impl Column {
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct ColumnSchema {
/// the column id
pub id: i32,
pub id: ColumnId,
/// the column type
pub column_type: ColumnType,
}
@ -379,9 +533,9 @@ pub fn column_type_from_field(field_value: &FieldValue) -> ColumnType {
#[derive(Debug, Copy, Clone, PartialEq, sqlx::FromRow)]
pub struct Sequencer {
/// the id of the sequencer
pub id: i16,
pub id: SequencerId,
/// the topic the sequencer is reading from
pub kafka_topic_id: i32,
pub kafka_topic_id: KafkaTopicId,
/// the kafka partition the sequencer is reading from
pub kafka_partition: i32,
/// The minimum unpersisted sequence number. Because different tables
@ -390,3 +544,186 @@ pub struct Sequencer {
/// lower than this must have been persisted to Parquet.
pub min_unpersisted_sequence_number: i64,
}
#[cfg(test)]
pub(crate) mod test_helpers {
use super::*;
use futures::{stream::FuturesOrdered, StreamExt};
pub(crate) async fn test_repo<T, F>(new_repo: F)
where
T: RepoCollection + Send + Sync,
F: Fn() -> T + Send + Sync,
{
test_kafka_topic(&new_repo()).await;
test_query_pool(&new_repo()).await;
test_namespace(&new_repo()).await;
test_table(&new_repo()).await;
test_column(&new_repo()).await;
test_sequencer(&new_repo()).await;
}
async fn test_kafka_topic<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka_repo = repo.kafka_topic();
let k = kafka_repo.create_or_get("foo").await.unwrap();
assert!(k.id > KafkaTopicId::new(0));
assert_eq!(k.name, "foo");
let k2 = kafka_repo.create_or_get("foo").await.unwrap();
assert_eq!(k, k2);
}
async fn test_query_pool<T: RepoCollection + Send + Sync>(repo: &T) {
let query_repo = repo.query_pool();
let q = query_repo.create_or_get("foo").await.unwrap();
assert!(q.id > QueryPoolId::new(0));
assert_eq!(q.name, "foo");
let q2 = query_repo.create_or_get("foo").await.unwrap();
assert_eq!(q, q2);
}
async fn test_namespace<T: RepoCollection + Send + Sync>(repo: &T) {
let namespace_repo = repo.namespace();
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace_name = "test_namespace";
let namespace = namespace_repo
.create(namespace_name, "inf", kafka.id, pool.id)
.await
.unwrap();
assert!(namespace.id > NamespaceId::new(0));
assert_eq!(namespace.name, namespace_name);
let conflict = namespace_repo
.create(namespace_name, "inf", kafka.id, pool.id)
.await;
assert!(matches!(
conflict.unwrap_err(),
Error::NameExists { name: _ }
));
let found = namespace_repo
.get_by_name(namespace_name)
.await
.unwrap()
.expect("namespace should be there");
assert_eq!(namespace, found);
}
async fn test_table<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
.create("namespace_table_test", "inf", kafka.id, pool.id)
.await
.unwrap();
// test we can create or get a table
let table_repo = repo.table();
let t = table_repo
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let tt = table_repo
.create_or_get("test_table", namespace.id)
.await
.unwrap();
assert!(t.id > TableId::new(0));
assert_eq!(t, tt);
let tables = table_repo.list_by_namespace_id(namespace.id).await.unwrap();
assert_eq!(vec![t], tables);
}
async fn test_column<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
.create("namespace_column_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
// test we can create or get a column
let column_repo = repo.column();
let c = column_repo
.create_or_get("column_test", table.id, ColumnType::Tag)
.await
.unwrap();
let cc = column_repo
.create_or_get("column_test", table.id, ColumnType::Tag)
.await
.unwrap();
assert!(c.id > ColumnId::new(0));
assert_eq!(c, cc);
// test that attempting to create an already defined column of a different type returns error
let err = column_repo
.create_or_get("column_test", table.id, ColumnType::U64)
.await
.expect_err("should error with wrong column type");
assert!(matches!(
err,
Error::ColumnTypeMismatch {
name: _,
existing: _,
new: _
}
));
// test that we can create a column of the same name under a different table
let table2 = repo
.table()
.create_or_get("test_table_2", namespace.id)
.await
.unwrap();
let ccc = column_repo
.create_or_get("column_test", table2.id, ColumnType::U64)
.await
.unwrap();
assert_ne!(c, ccc);
let columns = column_repo
.list_by_namespace_id(namespace.id)
.await
.unwrap();
assert_eq!(vec![c, ccc], columns);
}
async fn test_sequencer<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo
.kafka_topic()
.create_or_get("sequencer_test")
.await
.unwrap();
let sequencer_repo = repo.sequencer();
// Create 10 sequencers
let created = (1..=10)
.map(|partition| sequencer_repo.create_or_get(&kafka, partition))
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
(v.id, v)
})
.collect::<BTreeMap<_, _>>()
.await;
// List them and assert they match
let listed = sequencer_repo
.list_by_kafka_topic(&kafka)
.await
.expect("failed to list sequencers")
.into_iter()
.map(|v| (v.id, v))
.collect::<BTreeMap<_, _>>();
assert_eq!(created, listed);
}
}

View File

@ -13,7 +13,7 @@
use crate::interface::{
column_type_from_field, ColumnSchema, ColumnType, Error, KafkaTopic, NamespaceSchema,
QueryPool, RepoCollection, Result, Sequencer,
QueryPool, RepoCollection, Result, Sequencer, SequencerId, TableId,
};
use futures::{stream::FuturesOrdered, StreamExt};
use influxdb_line_protocol::ParsedLine;
@ -25,6 +25,7 @@ const SHARED_QUERY_POOL: &str = SHARED_KAFKA_TOPIC;
const TIME_COLUMN: &str = "time";
pub mod interface;
pub mod mem;
pub mod postgres;
/// Given the lines of a write request and an in memory schema, this will validate the write
@ -41,9 +42,9 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
repo: &T,
) -> Result<Option<NamespaceSchema>> {
// table name to table_id
let mut new_tables: BTreeMap<String, i32> = BTreeMap::new();
let mut new_tables: BTreeMap<String, TableId> = BTreeMap::new();
// table_id to map of column name to column
let mut new_columns: BTreeMap<i32, BTreeMap<String, ColumnSchema>> = BTreeMap::new();
let mut new_columns: BTreeMap<TableId, BTreeMap<String, ColumnSchema>> = BTreeMap::new();
for line in &lines {
let table_name = line.series.measurement.as_str();
@ -175,7 +176,7 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
pub async fn create_or_get_default_records<T: RepoCollection + Sync + Send>(
kafka_partition_count: i32,
repo: &T,
) -> Result<(KafkaTopic, QueryPool, BTreeMap<i16, Sequencer>)> {
) -> Result<(KafkaTopic, QueryPool, BTreeMap<SequencerId, Sequencer>)> {
let kafka_repo = repo.kafka_topic();
let query_repo = repo.query_pool();
let sequencer_repo = repo.sequencer();
@ -195,3 +196,100 @@ pub async fn create_or_get_default_records<T: RepoCollection + Sync + Send>(
Ok((kafka_topic, query_pool, sequencers))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::interface::get_schema_by_name;
use crate::mem::MemCatalog;
use influxdb_line_protocol::parse_lines;
use std::sync::Arc;
#[tokio::test]
async fn test_validate_or_insert_schema() {
let repo = Arc::new(MemCatalog::new());
let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &repo).await.unwrap();
let namespace_name = "validate_schema";
// now test with a new namespace
let namespace = repo
.namespace()
.create(namespace_name, "inf", kafka_topic.id, query_pool.id)
.await
.unwrap();
let data = r#"
m1,t1=a,t2=b f1=2i,f2=2.0 1
m1,t1=a f1=3i 2
m2,t3=b f1=true 1
"#;
// test that new schema gets returned
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let schema = Arc::new(NamespaceSchema::new(
namespace.id,
namespace.kafka_topic_id,
namespace.query_pool_id,
));
let new_schema = validate_or_insert_schema(lines, &schema, &repo)
.await
.unwrap();
let new_schema = new_schema.unwrap();
// ensure new schema is in the db
let schema_from_db = get_schema_by_name(namespace_name, &repo)
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema_from_db);
// test that a new table will be created
let data = r#"
m1,t1=c f1=1i 2
new_measurement,t9=a f10=true 1
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema_from_db, &repo)
.await
.unwrap()
.unwrap();
let new_table = new_schema.tables.get("new_measurement").unwrap();
assert_eq!(
ColumnType::Bool,
new_table.columns.get("f10").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
new_table.columns.get("t9").unwrap().column_type
);
let schema = get_schema_by_name(namespace_name, &repo)
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
// test that a new column for an existing table will be created
// test that a new table will be created
let data = r#"
m1,new_tag=c new_field=1i 2
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema, &repo)
.await
.unwrap()
.unwrap();
let table = new_schema.tables.get("m1").unwrap();
assert_eq!(
ColumnType::I64,
table.columns.get("new_field").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
table.columns.get("new_tag").unwrap().column_type
);
let schema = get_schema_by_name(namespace_name, &repo)
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
}
}

295
iox_catalog/src/mem.rs Normal file
View File

@ -0,0 +1,295 @@
//! This module implements an in-memory implementation of the iox_catalog interface. It can be
//! used for testing or for an IOx designed to run without catalog persistence.
use crate::interface::{
Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaTopic, KafkaTopicId, KafkaTopicRepo,
Namespace, NamespaceId, NamespaceRepo, QueryPool, QueryPoolId, QueryPoolRepo, RepoCollection,
Result, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo,
};
use async_trait::async_trait;
use std::convert::TryFrom;
use std::fmt::Formatter;
use std::sync::{Arc, Mutex};
/// In-memory catalog that implements the `RepoCollection` and individual repo traits from
/// the catalog interface.
#[derive(Default)]
pub struct MemCatalog {
collections: Mutex<MemCollections>,
}
impl MemCatalog {
/// return new initialized `MemCatalog`
pub fn new() -> Self {
Self::default()
}
}
impl std::fmt::Debug for MemCatalog {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let c = self.collections.lock().expect("mutex poisoned");
write!(f, "MemCatalog[ {:?} ]", c)
}
}
#[derive(Default, Debug)]
struct MemCollections {
kafka_topics: Vec<KafkaTopic>,
query_pools: Vec<QueryPool>,
namespaces: Vec<Namespace>,
tables: Vec<Table>,
columns: Vec<Column>,
sequencers: Vec<Sequencer>,
}
impl RepoCollection for Arc<MemCatalog> {
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send> {
Self::clone(self) as Arc<dyn KafkaTopicRepo + Sync + Send>
}
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send> {
Self::clone(self) as Arc<dyn QueryPoolRepo + Sync + Send>
}
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send> {
Self::clone(self) as Arc<dyn NamespaceRepo + Sync + Send>
}
fn table(&self) -> Arc<dyn TableRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TableRepo + Sync + Send>
}
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ColumnRepo + Sync + Send>
}
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send> {
Self::clone(self) as Arc<dyn SequencerRepo + Sync + Send>
}
}
#[async_trait]
impl KafkaTopicRepo for MemCatalog {
async fn create_or_get(&self, name: &str) -> Result<KafkaTopic> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let topic = match collections.kafka_topics.iter().find(|t| t.name == name) {
Some(t) => t,
None => {
let topic = KafkaTopic {
id: KafkaTopicId::new(collections.kafka_topics.len() as i32 + 1),
name: name.to_string(),
};
collections.kafka_topics.push(topic);
collections.kafka_topics.last().unwrap()
}
};
Ok(topic.clone())
}
}
#[async_trait]
impl QueryPoolRepo for MemCatalog {
async fn create_or_get(&self, name: &str) -> Result<QueryPool> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let pool = match collections.query_pools.iter().find(|t| t.name == name) {
Some(t) => t,
None => {
let pool = QueryPool {
id: QueryPoolId::new(collections.query_pools.len() as i16 + 1),
name: name.to_string(),
};
collections.query_pools.push(pool);
collections.query_pools.last().unwrap()
}
};
Ok(pool.clone())
}
}
#[async_trait]
impl NamespaceRepo for MemCatalog {
async fn create(
&self,
name: &str,
retention_duration: &str,
kafka_topic_id: KafkaTopicId,
query_pool_id: QueryPoolId,
) -> Result<Namespace> {
let mut collections = self.collections.lock().expect("mutex poisoned");
if collections.namespaces.iter().any(|n| n.name == name) {
return Err(Error::NameExists {
name: name.to_string(),
});
}
let namespace = Namespace {
id: NamespaceId::new(collections.namespaces.len() as i32 + 1),
name: name.to_string(),
kafka_topic_id,
query_pool_id,
retention_duration: Some(retention_duration.to_string()),
};
collections.namespaces.push(namespace);
Ok(collections.namespaces.last().unwrap().clone())
}
async fn get_by_name(&self, name: &str) -> Result<Option<Namespace>> {
let collections = self.collections.lock().expect("mutex poisoned");
Ok(collections
.namespaces
.iter()
.find(|n| n.name == name)
.cloned())
}
}
#[async_trait]
impl TableRepo for MemCatalog {
async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let table = match collections.tables.iter().find(|t| t.name == name) {
Some(t) => t,
None => {
let table = Table {
id: TableId::new(collections.tables.len() as i32 + 1),
namespace_id,
name: name.to_string(),
};
collections.tables.push(table);
collections.tables.last().unwrap()
}
};
Ok(table.clone())
}
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
let collections = self.collections.lock().expect("mutex poisoned");
let tables: Vec<_> = collections
.tables
.iter()
.filter(|t| t.namespace_id == namespace_id)
.cloned()
.collect();
Ok(tables)
}
}
#[async_trait]
impl ColumnRepo for MemCatalog {
async fn create_or_get(
&self,
name: &str,
table_id: TableId,
column_type: ColumnType,
) -> Result<Column> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let column = match collections
.columns
.iter()
.find(|t| t.name == name && t.table_id == table_id)
{
Some(c) => {
if column_type as i16 != c.column_type {
return Err(Error::ColumnTypeMismatch {
name: name.to_string(),
existing: ColumnType::try_from(c.column_type).unwrap().to_string(),
new: column_type.to_string(),
});
}
c
}
None => {
let column = Column {
id: ColumnId::new(collections.columns.len() as i32 + 1),
table_id,
name: name.to_string(),
column_type: column_type as i16,
};
collections.columns.push(column);
collections.columns.last().unwrap()
}
};
Ok(column.clone())
}
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
let mut columns = vec![];
let collections = self.collections.lock().expect("mutex poisoned");
for t in collections
.tables
.iter()
.filter(|t| t.namespace_id == namespace_id)
{
for c in collections.columns.iter().filter(|c| c.table_id == t.id) {
columns.push(c.clone());
}
}
Ok(columns)
}
}
#[async_trait]
impl SequencerRepo for MemCatalog {
async fn create_or_get(&self, topic: &KafkaTopic, partition: i32) -> Result<Sequencer> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let sequencer = match collections
.sequencers
.iter()
.find(|s| s.kafka_topic_id == topic.id && s.kafka_partition == partition)
{
Some(t) => t,
None => {
let sequencer = Sequencer {
id: SequencerId::new(collections.sequencers.len() as i16 + 1),
kafka_topic_id: topic.id,
kafka_partition: partition,
min_unpersisted_sequence_number: 0,
};
collections.sequencers.push(sequencer);
collections.sequencers.last().unwrap()
}
};
Ok(*sequencer)
}
async fn list(&self) -> Result<Vec<Sequencer>> {
let collections = self.collections.lock().expect("mutex poisoned");
Ok(collections.sequencers.clone())
}
async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
let collections = self.collections.lock().expect("mutex poisoned");
let sequencers: Vec<_> = collections
.sequencers
.iter()
.filter(|s| s.kafka_topic_id == topic.id)
.cloned()
.collect();
Ok(sequencers)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_mem_repo() {
let f = || Arc::new(MemCatalog::new());
crate::interface::test_helpers::test_repo(f).await;
}
}

View File

@ -1,15 +1,13 @@
//! A Postgres backed implementation of the Catalog
use crate::interface::{
Column, ColumnRepo, ColumnSchema, ColumnType, Error, KafkaTopic, KafkaTopicRepo, Namespace,
NamespaceRepo, NamespaceSchema, QueryPool, QueryPoolRepo, RepoCollection, Result, Sequencer,
SequencerRepo, Table, TableRepo, TableSchema,
Column, ColumnRepo, ColumnType, Error, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace,
NamespaceId, NamespaceRepo, QueryPool, QueryPoolId, QueryPoolRepo, RepoCollection, Result,
Sequencer, SequencerRepo, Table, TableId, TableRepo,
};
use async_trait::async_trait;
use observability_deps::tracing::info;
use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres};
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
@ -19,41 +17,46 @@ const IDLE_TIMEOUT: Duration = Duration::from_secs(500);
#[allow(dead_code)]
const SCHEMA_NAME: &str = "iox_catalog";
/// Connect to the catalog store.
pub async fn connect_catalog_store(
app_name: &'static str,
schema_name: &'static str,
dsn: &str,
) -> Result<Pool<Postgres>, sqlx::Error> {
let pool = PgPoolOptions::new()
.min_connections(1)
.max_connections(MAX_CONNECTIONS)
.connect_timeout(CONNECT_TIMEOUT)
.idle_timeout(IDLE_TIMEOUT)
.test_before_acquire(true)
.after_connect(move |c| {
Box::pin(async move {
// Tag the connection with the provided application name.
c.execute(sqlx::query("SET application_name = '$1';").bind(app_name))
.await?;
let search_path_query = format!("SET search_path TO {}", schema_name);
c.execute(sqlx::query(&search_path_query)).await?;
Ok(())
})
})
.connect(dsn)
.await?;
// Log a connection was successfully established and include the application
// name for cross-correlation between Conductor logs & database connections.
info!(application_name=%app_name, "connected to catalog store");
Ok(pool)
/// In-memory catalog that implements the `RepoCollection` and individual repo traits.
#[derive(Debug)]
pub struct PostgresCatalog {
pool: Pool<Postgres>,
}
struct PostgresCatalog {
pool: Pool<Postgres>,
impl PostgresCatalog {
/// Connect to the catalog store.
pub async fn connect(
app_name: &'static str,
schema_name: &'static str,
dsn: &str,
) -> Result<Self> {
let pool = PgPoolOptions::new()
.min_connections(1)
.max_connections(MAX_CONNECTIONS)
.connect_timeout(CONNECT_TIMEOUT)
.idle_timeout(IDLE_TIMEOUT)
.test_before_acquire(true)
.after_connect(move |c| {
Box::pin(async move {
// Tag the connection with the provided application name.
c.execute(sqlx::query("SET application_name = '$1';").bind(app_name))
.await?;
let search_path_query = format!("SET search_path TO {}", schema_name);
c.execute(sqlx::query(&search_path_query)).await?;
Ok(())
})
})
.connect(dsn)
.await
.map_err(|e| Error::SqlxError { source: e })?;
// Log a connection was successfully established and include the application
// name for cross-correlation between Conductor logs & database connections.
info!(application_name=%app_name, "connected to catalog store");
Ok(Self { pool })
}
}
impl RepoCollection for Arc<PostgresCatalog> {
@ -128,9 +131,9 @@ impl NamespaceRepo for PostgresCatalog {
&self,
name: &str,
retention_duration: &str,
kafka_topic_id: i32,
query_pool_id: i16,
) -> Result<NamespaceSchema> {
kafka_topic_id: KafkaTopicId,
query_pool_id: QueryPoolId,
) -> Result<Namespace> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
INSERT INTO namespace ( name, retention_duration, kafka_topic_id, query_pool_id )
@ -156,10 +159,10 @@ RETURNING *
}
})?;
Ok(NamespaceSchema::new(rec.id, kafka_topic_id, query_pool_id))
Ok(rec)
}
async fn get_by_name(&self, name: &str) -> Result<Option<NamespaceSchema>> {
async fn get_by_name(&self, name: &str) -> Result<Option<Namespace>> {
// TODO: maybe get all the data in a single call to Postgres?
let rec = sqlx::query_as::<_, Namespace>(
r#"
@ -175,53 +178,14 @@ SELECT * FROM namespace WHERE name = $1;
}
let namespace = rec.map_err(|e| Error::SqlxError { source: e })?;
// get the columns first just in case someone else is creating schema while we're doing this.
let columns = ColumnRepo::list_by_namespace_id(self, namespace.id).await?;
let tables = TableRepo::list_by_namespace_id(self, namespace.id).await?;
let mut namespace = NamespaceSchema::new(
namespace.id,
namespace.kafka_topic_id,
namespace.query_pool_id,
);
let mut table_id_to_schema = BTreeMap::new();
for t in tables {
table_id_to_schema.insert(t.id, (t.name, TableSchema::new(t.id)));
}
for c in columns {
let (_, t) = table_id_to_schema.get_mut(&c.table_id).unwrap();
match ColumnType::try_from(c.column_type) {
Ok(column_type) => {
t.columns.insert(
c.name,
ColumnSchema {
id: c.id,
column_type,
},
);
}
_ => {
return Err(Error::UnknownColumnType {
data_type: c.column_type,
name: c.name.to_string(),
});
}
}
}
for (_, (table_name, schema)) in table_id_to_schema {
namespace.tables.insert(table_name, schema);
}
return Ok(Some(namespace));
Ok(Some(namespace))
}
}
#[async_trait]
impl TableRepo for PostgresCatalog {
async fn create_or_get(&self, name: &str, namespace_id: i32) -> Result<Table> {
async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
let rec = sqlx::query_as::<_, Table>(
r#"
INSERT INTO table_name ( name, namespace_id )
@ -245,7 +209,7 @@ DO UPDATE SET name = table_name.name RETURNING *;
Ok(rec)
}
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Table>> {
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
let rec = sqlx::query_as::<_, Table>(
r#"
SELECT * FROM table_name
@ -266,7 +230,7 @@ impl ColumnRepo for PostgresCatalog {
async fn create_or_get(
&self,
name: &str,
table_id: i32,
table_id: TableId,
column_type: ColumnType,
) -> Result<Column> {
let ct = column_type as i16;
@ -303,7 +267,7 @@ DO UPDATE SET name = column_name.name RETURNING *;
Ok(rec)
}
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Column>> {
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
let rec = sqlx::query_as::<_, Column>(
r#"
SELECT column_name.* FROM table_name
@ -352,6 +316,14 @@ impl SequencerRepo for PostgresCatalog {
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer WHERE kafka_topic_id = $1;"#)
.bind(&topic.id) // $1
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
/// The error code returned by Postgres for a unique constraint violation.
@ -390,9 +362,6 @@ fn is_fk_violation(e: &sqlx::Error) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::{create_or_get_default_records, validate_or_insert_schema};
use futures::{stream::FuturesOrdered, StreamExt};
use influxdb_line_protocol::parse_lines;
use std::env;
// Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set.
@ -432,195 +401,28 @@ mod tests {
}};
}
async fn setup_db() -> (Arc<PostgresCatalog>, KafkaTopic, QueryPool) {
async fn setup_db() -> Arc<PostgresCatalog> {
let dsn = std::env::var("DATABASE_URL").unwrap();
let pool = connect_catalog_store("test", SCHEMA_NAME, &dsn)
.await
.unwrap();
let postgres_catalog = Arc::new(PostgresCatalog { pool });
let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &postgres_catalog)
.await
.unwrap();
(postgres_catalog, kafka_topic, query_pool)
Arc::new(
PostgresCatalog::connect("test", SCHEMA_NAME, &dsn)
.await
.unwrap(),
)
}
#[tokio::test]
async fn test_catalog() {
async fn test_repo() {
// If running an integration test on your laptop, this requires that you have Postgres
// running and that you've done the sqlx migrations. See the README in this crate for
// info to set it up.
maybe_skip_integration!();
let (postgres, kafka_topic, query_pool) = setup_db().await;
let postgres = setup_db().await;
clear_schema(&postgres.pool).await;
let namespace = NamespaceRepo::create(postgres.as_ref(), "foo", "inf", 0, 0).await;
assert!(matches!(
namespace.unwrap_err(),
Error::ForeignKeyViolation { source: _ }
));
let namespace = NamespaceRepo::create(
postgres.as_ref(),
"foo",
"inf",
kafka_topic.id,
query_pool.id,
)
.await
.unwrap();
assert!(namespace.id > 0);
assert_eq!(namespace.kafka_topic_id, kafka_topic.id);
assert_eq!(namespace.query_pool_id, query_pool.id);
let f = || Arc::clone(&postgres);
// test that we can create or get a table
let t = TableRepo::create_or_get(postgres.as_ref(), "foo", namespace.id)
.await
.unwrap();
let tt = TableRepo::create_or_get(postgres.as_ref(), "foo", namespace.id)
.await
.unwrap();
assert!(t.id > 0);
assert_eq!(t, tt);
// test that we can craete or get a column
let c = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::I64)
.await
.unwrap();
let cc = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::I64)
.await
.unwrap();
assert!(c.id > 0);
assert_eq!(c, cc);
// test that attempting to create an already defined column of a different type returns error
let err = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::F64)
.await
.expect_err("should error with wrong column type");
assert!(matches!(
err,
Error::ColumnTypeMismatch {
name: _,
existing: _,
new: _
}
));
// now test with a new namespace
let namespace = NamespaceRepo::create(
postgres.as_ref(),
"asdf",
"inf",
kafka_topic.id,
query_pool.id,
)
.await
.unwrap();
let data = r#"
m1,t1=a,t2=b f1=2i,f2=2.0 1
m1,t1=a f1=3i 2
m2,t3=b f1=true 1
"#;
// test that new schema gets returned
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let schema = Arc::new(NamespaceSchema::new(
namespace.id,
namespace.kafka_topic_id,
namespace.query_pool_id,
));
let new_schema = validate_or_insert_schema(lines, &schema, &postgres)
.await
.unwrap();
let new_schema = new_schema.unwrap();
// ensure new schema is in the db
let schema_from_db = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf")
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema_from_db);
// test that a new table will be created
let data = r#"
m1,t1=c f1=1i 2
new_measurement,t9=a f10=true 1
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema_from_db, &postgres)
.await
.unwrap()
.unwrap();
let new_table = new_schema.tables.get("new_measurement").unwrap();
assert_eq!(
ColumnType::Bool,
new_table.columns.get("f10").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
new_table.columns.get("t9").unwrap().column_type
);
let schema = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf")
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
// test that a new column for an existing table will be created
// test that a new table will be created
let data = r#"
m1,new_tag=c new_field=1i 2
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema, &postgres)
.await
.unwrap()
.unwrap();
let table = new_schema.tables.get("m1").unwrap();
assert_eq!(
ColumnType::I64,
table.columns.get("new_field").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
table.columns.get("new_tag").unwrap().column_type
);
let schema = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf")
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
}
#[tokio::test]
async fn test_sequencers() {
maybe_skip_integration!();
let (postgres, kafka_topic, _query_pool) = setup_db().await;
clear_schema(&postgres.pool).await;
// Create 10 sequencers
let created = (1..=10)
.map(|partition| {
SequencerRepo::create_or_get(postgres.as_ref(), &kafka_topic, partition)
})
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
(v.id, v)
})
.collect::<BTreeMap<_, _>>()
.await;
// List them and assert they match
let listed = SequencerRepo::list(postgres.as_ref())
.await
.expect("failed to list sequencers")
.into_iter()
.map(|v| (v.id, v))
.collect::<BTreeMap<_, _>>();
assert_eq!(created, listed);
crate::interface::test_helpers::test_repo(f).await;
}
async fn clear_schema(pool: &Pool<Postgres>) {