Merge branch 'main' into crepererum/dml_size

pull/24376/head
kodiakhq[bot] 2022-01-20 10:44:48 +00:00 committed by GitHub
commit 6c4d449db8
13 changed files with 2116 additions and 321 deletions

Cargo.lock (generated)

@ -1853,6 +1853,19 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "ingester"
version = "0.1.0"
dependencies = [
"arrow",
"iox_catalog",
"mutable_batch",
"parking_lot",
"snafu",
"uuid",
"workspace-hack",
]
[[package]]
name = "instant"
version = "0.1.12"
@ -1898,6 +1911,7 @@ dependencies = [
"snafu",
"sqlx",
"tokio",
"uuid",
"workspace-hack",
]
@ -4338,6 +4352,7 @@ dependencies = [
"thiserror",
"tokio-stream",
"url",
"uuid",
"whoami",
]
@ -5408,6 +5423,7 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-subscriber",
"uuid",
]
[[package]]

Cargo.toml (workspace)

@ -17,6 +17,7 @@ members = [
"influxdb_storage_client",
"influxdb_tsm",
"influxdb2_client",
"ingester",
"internal_types",
"iox_catalog",
"iox_data_generator",

ingester/Cargo.toml (new file)

@ -0,0 +1,14 @@
[package]
name = "ingester"
version = "0.1.0"
authors = ["Nga Tran <nga-tran@live.com>"]
edition = "2021"
[dependencies]
arrow = { version = "7.0", features = ["prettyprint"] }
iox_catalog = { path = "../iox_catalog" }
mutable_batch = { path = "../mutable_batch"}
parking_lot = "0.11.2"
snafu = "0.7"
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}

ingester/src/data.rs (new file)

@ -0,0 +1,202 @@
//! Data for the lifecycle of the Ingester
//!
use arrow::record_batch::RecordBatch;
use std::{collections::BTreeMap, sync::Arc};
use uuid::Uuid;
use crate::server::IngesterServer;
use iox_catalog::interface::{
KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber,
SequencerId, TableId, Tombstone,
};
use mutable_batch::MutableBatch;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt, Snafu};
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("Error while reading Topic {}", name))]
ReadTopic {
source: iox_catalog::interface::Error,
name: String,
},
#[snafu(display("Error while reading Kafka Partition id {}", id.get()))]
ReadSequencer {
source: iox_catalog::interface::Error,
id: KafkaPartition,
},
#[snafu(display(
"Sequencer record not found for kafka_topic_id {} and kafka_partition {}",
kafka_topic_id,
kafka_partition
))]
SequencerNotFound {
kafka_topic_id: KafkaTopicId,
kafka_partition: KafkaPartition,
},
}
/// A specialized `Error` for Ingester Data errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Ingester data: a map of shard id to its data
#[derive(Default)]
pub struct Sequencers {
// This map gets set up on initialization of the ingester so it won't ever be modified.
// The content of each SequencerData will get changed when more namespaces and tables
// get ingested.
data: BTreeMap<SequencerId, Arc<SequencerData>>,
}
impl Sequencers {
/// One-time initialization of this Ingester's Sequencers
pub async fn initialize<T: RepoCollection + Send + Sync>(
ingester: &IngesterServer<'_, T>,
) -> Result<Self> {
// Get sequencer ids from the catalog
let sequencer_repo = ingester.iox_catalog.sequencer();
let mut sequencers = BTreeMap::default();
let topic = ingester.get_topic();
for shard in ingester.get_kafka_partitions() {
let sequencer = sequencer_repo
.get_by_topic_id_and_partition(topic.id, shard)
.await
.context(ReadSequencerSnafu { id: shard })?
.context(SequencerNotFoundSnafu {
kafka_topic_id: topic.id,
kafka_partition: shard,
})?;
// Create empty buffer for each sequencer
sequencers.insert(sequencer.id, Arc::new(SequencerData::default()));
}
Ok(Self { data: sequencers })
}
}
/// Data of a Shard
#[derive(Default)]
pub struct SequencerData {
// New namespaces can come in at any time so we need to be able to add new ones
namespaces: RwLock<BTreeMap<NamespaceId, Arc<NamespaceData>>>,
}
/// Data of a Namespace that belongs to a given Shard
#[derive(Default)]
pub struct NamespaceData {
tables: RwLock<BTreeMap<TableId, Arc<TableData>>>,
}
/// Data of a Table in a given Namespace that belongs to a given Shard
#[derive(Default)]
pub struct TableData {
// Map of partition key to its data
partition_data: RwLock<BTreeMap<String, Arc<PartitionData>>>,
}
/// Data of an IOx Partition of a given Table of a Namespace that belongs to a given Shard
pub struct PartitionData {
id: PartitionId,
inner: RwLock<DataBuffer>,
}
/// Data of an IOx partition split into batches
/// ┌────────────────────────┐ ┌────────────────────────┐ ┌─────────────────────────┐
/// │ Buffer │ │ Snapshots │ │ Persisting │
/// │ ┌───────────────────┐ │ │ │ │ │
/// │ │ ┌───────────────┐│ │ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │
/// │ │ ┌┴──────────────┐│├─┼────────┼─┼─▶┌───────────────┐│ │ │ │ ┌───────────────┐│ │
/// │ │┌┴──────────────┐├┘│ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │
/// │ ││ BufferBatch ├┘ │ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │
/// │ │└───────────────┘ │ │ ┌───┼─▶│ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │
/// │ └───────────────────┘ │ │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │
/// │ ... │ │ │ └───────────────────┘ │ │ └───────────────────┘ │
/// │ ┌───────────────────┐ │ │ │ │ │ │
/// │ │ ┌───────────────┐│ │ │ │ ... │ │ ... │
/// │ │ ┌┴──────────────┐││ │ │ │ │ │ │
/// │ │┌┴──────────────┐├┘│─┼────┘ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │
/// │ ││ BufferBatch ├┘ │ │ │ │ ┌───────────────┐│ │ │ │ ┌───────────────┐│ │
/// │ │└───────────────┘ │ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │
/// │ └───────────────────┘ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │
/// │ │ │ ││ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │
/// │ ... │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │
/// │ │ │ └───────────────────┘ │ │ └───────────────────┘ │
/// └────────────────────────┘ └────────────────────────┘ └─────────────────────────┘
#[derive(Default)]
pub struct DataBuffer {
/// Buffer of incoming writes
pub buffer: Vec<BufferBatch>,
/// Buffer of tombstones whose time range may overlap with this partition.
/// These tombstones are first written into the Catalog and then here.
/// When a persist is called, these tombstones will be moved into the
/// PersistingBatch to be applied to that data.
pub deletes: Vec<Tombstone>,
/// Data in `buffer` will be moved to a `snapshot` when one of these happens:
/// . A background persist is called
/// . A read request arrives from the Querier
/// The `buffer` will be empty when this happens.
snapshots: Vec<Arc<SnapshotBatch>>,
/// When a persist is called, data in `buffer` will be moved to a `snapshot`
/// and then all `snapshots` will be moved to a `persisting`.
/// Both `buffer` and `snapshots` will be empty when this happens.
persisting: Option<Arc<PersistingBatch>>,
// Extra Notes:
// . In the MVP, we will only persist a set of snapshots at a time.
// In later versions, multiple persisting operations may be happening concurrently, but
// their persisted info must be added into the Catalog in their data
// ingesting order.
// . When a read request comes from a Querier, all data from `snapshots`
// and `persisting` must be sent to the Querier.
// . After the `persisting` data is persisted and successfully added
// into the Catalog, it will be removed from this Data Buffer.
// This data might be added into an extra cache to serve up to
// Queriers that may not have loaded the parquet files from object
// storage yet. But this will be decided after MVP.
}
/// A BufferBatch is a MutableBatch with its ingesting order, `sequencer_number`, which
/// helps the ingester keep the batches of data in their ingesting order
pub struct BufferBatch {
/// Sequencer number of the ingesting data
pub sequencer_number: SequenceNumber,
/// Ingesting data
pub data: MutableBatch,
}
/// SnapshotBatch contains data of many contiguous BufferBatches
pub struct SnapshotBatch {
/// Min sequencer number of its combined BufferBatches
pub min_sequencer_number: SequenceNumber,
/// Max sequencer number of its combined BufferBatches
pub max_sequencer_number: SequenceNumber,
/// Data of its combined BufferBatches kept in one RecordBatch
pub data: RecordBatch,
}
/// PersistingBatch contains all the info and data needed to create
/// a parquet file for a given set of SnapshotBatches
pub struct PersistingBatch {
/// Sequencer id of the data
pub sequencer_id: SequencerId,
/// Table id of the data
pub table_id: TableId,
/// Partition id of the data
pub partition_id: PartitionId,
/// Id of the to-be-created parquet file for this data
pub object_store_id: Uuid,
/// data to be persisted
pub data: Vec<SnapshotBatch>,
/// Delete predicates to be applied to the data
/// before persisting
pub deletes: Vec<Tombstone>,
}
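
To make the wiring concrete, here is a minimal usage sketch (not part of this diff) that sets up the per-shard buffers against the in-memory catalog from iox_catalog::mem; the topic name "iox-shared", the single partition, and the function name initialize_ingester are arbitrary placeholders.

use std::sync::Arc;

use ingester::data::Sequencers;
use ingester::server::IngesterServer;
use iox_catalog::interface::{KafkaPartition, RepoCollection};
use iox_catalog::mem::MemCatalog;

async fn initialize_ingester() -> Result<Sequencers, ingester::data::Error> {
    // Register a kafka topic and one sequencer (kafka partition) in the catalog.
    let catalog = Arc::new(MemCatalog::new());
    let topic = catalog
        .kafka_topic()
        .create_or_get("iox-shared")
        .await
        .unwrap();
    let shard = KafkaPartition::new(1);
    catalog.sequencer().create_or_get(&topic, shard).await.unwrap();

    // `IngesterServer` stores a `&Arc<T>` where `T: RepoCollection`; the trait is
    // implemented for `Arc<MemCatalog>`, so wrap the catalog in one more `Arc`.
    let catalog = Arc::new(catalog);
    let server = IngesterServer::new(topic, vec![shard], &catalog);

    // Creates one empty SequencerData buffer per assigned kafka partition.
    Sequencers::initialize(&server).await
}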

ingester/src/lib.rs (new file)

@ -0,0 +1,17 @@
//! IOx ingester implementation.
//! Design doc: https://docs.google.com/document/d/14NlzBiWwn0H37QxnE0k3ybTU58SKyUZmdgYpVw6az0Q/edit#
//!
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
#[allow(dead_code)]
pub mod data;
pub mod server;

ingester/src/server.rs (new file)

@ -0,0 +1,54 @@
//! Ingester Server
//!
use std::sync::Arc;
use iox_catalog::interface::{KafkaPartition, KafkaTopic, KafkaTopicId, RepoCollection};
/// The [`IngesterServer`] manages the lifecycle and contains all state for
/// an `ingester` server instance.
pub struct IngesterServer<'a, T>
where
T: RepoCollection + Send + Sync,
{
/// Kafka Topic assigned to this ingester
kafka_topic: KafkaTopic,
/// Kafka Partitions (Shards) assigned to this Ingester
kafka_partitions: Vec<KafkaPartition>,
/// Catalog of this ingester
pub iox_catalog: &'a Arc<T>,
}
impl<'a, T> IngesterServer<'a, T>
where
T: RepoCollection + Send + Sync,
{
/// Initialize the Ingester
pub fn new(topic: KafkaTopic, shard_ids: Vec<KafkaPartition>, catalog: &'a Arc<T>) -> Self {
Self {
kafka_topic: topic,
kafka_partitions: shard_ids,
iox_catalog: catalog,
}
}
/// Return a kafka topic
pub fn get_topic(&self) -> KafkaTopic {
self.kafka_topic.clone()
}
/// Return a kafka topic id
pub fn get_topic_id(&self) -> KafkaTopicId {
self.kafka_topic.id
}
/// Return a kafka topic name
pub fn get_topic_name(&self) -> String {
self.kafka_topic.name.clone()
}
/// Return Kafka Partitions
pub fn get_kafka_partitions(&self) -> Vec<KafkaPartition> {
self.kafka_partitions.clone()
}
}

iox_catalog/Cargo.toml

@ -10,10 +10,11 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] }
futures = "0.3"
observability_deps = { path = "../observability_deps" }
snafu = "0.7"
sqlx = { version = "0.5", features = [ "runtime-tokio-native-tls" , "postgres" ] }
sqlx = { version = "0.5", features = [ "runtime-tokio-native-tls" , "postgres", "uuid" ] }
tokio = { version = "1.13", features = ["full", "io-util", "macros", "parking_lot", "rt-multi-thread", "time"] }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
workspace-hack = { path = "../workspace-hack"}
uuid = { version = "0.8", features = ["v4"] }
[dev-dependencies] # In alphabetical order
dotenv = "0.15.0"

iox_catalog SQL migration

@ -84,15 +84,15 @@ CREATE TABLE IF NOT EXISTS iox_catalog.parquet_file
id BIGINT GENERATED ALWAYS AS IDENTITY,
sequencer_id SMALLINT NOT NULL,
table_id INT NOT NULL,
partition_id INT NOT NULL,
file_location VARCHAR NOT NULL,
partition_id BIGINT NOT NULL,
object_store_id uuid NOT NULL,
min_sequence_number BIGINT,
max_sequence_number BIGINT,
min_time BIGINT,
max_time BIGINT,
to_delete BOOLEAN,
PRIMARY KEY (id),
CONSTRAINT parquet_location_unique UNIQUE (file_location)
CONSTRAINT parquet_location_unique UNIQUE (object_store_id)
);
CREATE TABLE IF NOT EXISTS iox_catalog.tombstone
@ -104,7 +104,8 @@ CREATE TABLE IF NOT EXISTS iox_catalog.tombstone
min_time BIGINT NOT NULL,
max_time BIGINT NOT NULL,
serialized_predicate TEXT NOT NULL,
PRIMARY KEY (id)
PRIMARY KEY (id),
CONSTRAINT tombstone_unique UNIQUE (table_id, sequencer_id, sequence_number)
);
CREATE TABLE IF NOT EXISTS iox_catalog.processed_tombstone

File diff suppressed because it is too large.

iox_catalog/src/lib.rs

@ -12,8 +12,8 @@
)]
use crate::interface::{
column_type_from_field, ColumnSchema, ColumnType, Error, KafkaTopic, NamespaceSchema,
QueryPool, RepoCollection, Result, Sequencer,
column_type_from_field, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic,
NamespaceSchema, QueryPool, RepoCollection, Result, Sequencer, SequencerId, TableId,
};
use futures::{stream::FuturesOrdered, StreamExt};
use influxdb_line_protocol::ParsedLine;
@ -25,6 +25,7 @@ const SHARED_QUERY_POOL: &str = SHARED_KAFKA_TOPIC;
const TIME_COLUMN: &str = "time";
pub mod interface;
pub mod mem;
pub mod postgres;
/// Given the lines of a write request and an in memory schema, this will validate the write
@ -41,9 +42,9 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
repo: &T,
) -> Result<Option<NamespaceSchema>> {
// table name to table_id
let mut new_tables: BTreeMap<String, i32> = BTreeMap::new();
let mut new_tables: BTreeMap<String, TableId> = BTreeMap::new();
// table_id to map of column name to column
let mut new_columns: BTreeMap<i32, BTreeMap<String, ColumnSchema>> = BTreeMap::new();
let mut new_columns: BTreeMap<TableId, BTreeMap<String, ColumnSchema>> = BTreeMap::new();
for line in &lines {
let table_name = line.series.measurement.as_str();
@ -175,7 +176,7 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
pub async fn create_or_get_default_records<T: RepoCollection + Sync + Send>(
kafka_partition_count: i32,
repo: &T,
) -> Result<(KafkaTopic, QueryPool, BTreeMap<i16, Sequencer>)> {
) -> Result<(KafkaTopic, QueryPool, BTreeMap<SequencerId, Sequencer>)> {
let kafka_repo = repo.kafka_topic();
let query_repo = repo.query_pool();
let sequencer_repo = repo.sequencer();
@ -184,7 +185,7 @@ pub async fn create_or_get_default_records<T: RepoCollection + Sync + Send>(
let query_pool = query_repo.create_or_get(SHARED_QUERY_POOL).await?;
let sequencers = (1..=kafka_partition_count)
.map(|partition| sequencer_repo.create_or_get(&kafka_topic, partition))
.map(|partition| sequencer_repo.create_or_get(&kafka_topic, KafkaPartition::new(partition)))
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
@ -195,3 +196,100 @@ pub async fn create_or_get_default_records<T: RepoCollection + Sync + Send>(
Ok((kafka_topic, query_pool, sequencers))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::interface::get_schema_by_name;
use crate::mem::MemCatalog;
use influxdb_line_protocol::parse_lines;
use std::sync::Arc;
#[tokio::test]
async fn test_validate_or_insert_schema() {
let repo = Arc::new(MemCatalog::new());
let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &repo).await.unwrap();
let namespace_name = "validate_schema";
// now test with a new namespace
let namespace = repo
.namespace()
.create(namespace_name, "inf", kafka_topic.id, query_pool.id)
.await
.unwrap();
let data = r#"
m1,t1=a,t2=b f1=2i,f2=2.0 1
m1,t1=a f1=3i 2
m2,t3=b f1=true 1
"#;
// test that new schema gets returned
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let schema = Arc::new(NamespaceSchema::new(
namespace.id,
namespace.kafka_topic_id,
namespace.query_pool_id,
));
let new_schema = validate_or_insert_schema(lines, &schema, &repo)
.await
.unwrap();
let new_schema = new_schema.unwrap();
// ensure new schema is in the db
let schema_from_db = get_schema_by_name(namespace_name, &repo)
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema_from_db);
// test that a new table will be created
let data = r#"
m1,t1=c f1=1i 2
new_measurement,t9=a f10=true 1
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema_from_db, &repo)
.await
.unwrap()
.unwrap();
let new_table = new_schema.tables.get("new_measurement").unwrap();
assert_eq!(
ColumnType::Bool,
new_table.columns.get("f10").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
new_table.columns.get("t9").unwrap().column_type
);
let schema = get_schema_by_name(namespace_name, &repo)
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
// test that a new column for an existing table will be created
// test that a new table will be created
let data = r#"
m1,new_tag=c new_field=1i 2
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema, &repo)
.await
.unwrap()
.unwrap();
let table = new_schema.tables.get("m1").unwrap();
assert_eq!(
ColumnType::I64,
table.columns.get("new_field").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
table.columns.get("new_tag").unwrap().column_type
);
let schema = get_schema_by_name(namespace_name, &repo)
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
}
}
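
Condensed from the test above, a hypothetical end-to-end sketch (not part of this diff) of bootstrapping the default records and validating a single line-protocol write against an in-memory catalog; the namespace name and line-protocol data are arbitrary.

use std::sync::Arc;

use influxdb_line_protocol::parse_lines;
use iox_catalog::interface::{NamespaceSchema, RepoCollection};
use iox_catalog::mem::MemCatalog;
use iox_catalog::{create_or_get_default_records, validate_or_insert_schema};

async fn bootstrap_and_validate() {
    let repo = Arc::new(MemCatalog::new());

    // One shared kafka topic and query pool, plus two sequencers
    // (kafka partitions 1 and 2).
    let (topic, pool, _sequencers) = create_or_get_default_records(2, &repo).await.unwrap();
    let namespace = repo
        .namespace()
        .create("example_ns", "inf", topic.id, pool.id)
        .await
        .unwrap();

    // Validate a write against the (still empty) namespace schema; the new
    // table and columns are inserted into the catalog as a side effect.
    let lines: Vec<_> = parse_lines("m1,t1=a f1=2i 1").map(|l| l.unwrap()).collect();
    let schema = Arc::new(NamespaceSchema::new(
        namespace.id,
        namespace.kafka_topic_id,
        namespace.query_pool_id,
    ));
    let updated = validate_or_insert_schema(lines, &schema, &repo).await.unwrap();
    assert!(updated.is_some());
}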

iox_catalog/src/mem.rs (new file)

@ -0,0 +1,498 @@
//! This module implements an in-memory implementation of the iox_catalog interface. It can be
//! used for testing or for an IOx server designed to run without catalog persistence.
use crate::interface::{
Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId,
ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo,
RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo,
};
use async_trait::async_trait;
use std::convert::TryFrom;
use std::fmt::Formatter;
use std::sync::{Arc, Mutex};
use uuid::Uuid;
/// In-memory catalog that implements the `RepoCollection` and individual repo traits from
/// the catalog interface.
#[derive(Default)]
pub struct MemCatalog {
collections: Mutex<MemCollections>,
}
impl MemCatalog {
/// Return a new initialized `MemCatalog`
pub fn new() -> Self {
Self::default()
}
}
impl std::fmt::Debug for MemCatalog {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let c = self.collections.lock().expect("mutex poisoned");
write!(f, "MemCatalog[ {:?} ]", c)
}
}
#[derive(Default, Debug)]
struct MemCollections {
kafka_topics: Vec<KafkaTopic>,
query_pools: Vec<QueryPool>,
namespaces: Vec<Namespace>,
tables: Vec<Table>,
columns: Vec<Column>,
sequencers: Vec<Sequencer>,
partitions: Vec<Partition>,
tombstones: Vec<Tombstone>,
parquet_files: Vec<ParquetFile>,
}
impl RepoCollection for Arc<MemCatalog> {
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send> {
Self::clone(self) as Arc<dyn KafkaTopicRepo + Sync + Send>
}
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send> {
Self::clone(self) as Arc<dyn QueryPoolRepo + Sync + Send>
}
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send> {
Self::clone(self) as Arc<dyn NamespaceRepo + Sync + Send>
}
fn table(&self) -> Arc<dyn TableRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TableRepo + Sync + Send>
}
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ColumnRepo + Sync + Send>
}
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send> {
Self::clone(self) as Arc<dyn SequencerRepo + Sync + Send>
}
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send> {
Self::clone(self) as Arc<dyn PartitionRepo + Sync + Send>
}
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TombstoneRepo + Sync + Send>
}
fn parquet_file(&self) -> Arc<dyn ParquetFileRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ParquetFileRepo + Sync + Send>
}
}
#[async_trait]
impl KafkaTopicRepo for MemCatalog {
async fn create_or_get(&self, name: &str) -> Result<KafkaTopic> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let topic = match collections.kafka_topics.iter().find(|t| t.name == name) {
Some(t) => t,
None => {
let topic = KafkaTopic {
id: KafkaTopicId::new(collections.kafka_topics.len() as i32 + 1),
name: name.to_string(),
};
collections.kafka_topics.push(topic);
collections.kafka_topics.last().unwrap()
}
};
Ok(topic.clone())
}
async fn get_by_name(&self, name: &str) -> Result<Option<KafkaTopic>> {
let collections = self.collections.lock().expect("mutex poisoned");
let kafka_topic = collections
.kafka_topics
.iter()
.find(|t| t.name == name)
.cloned();
Ok(kafka_topic)
}
}
#[async_trait]
impl QueryPoolRepo for MemCatalog {
async fn create_or_get(&self, name: &str) -> Result<QueryPool> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let pool = match collections.query_pools.iter().find(|t| t.name == name) {
Some(t) => t,
None => {
let pool = QueryPool {
id: QueryPoolId::new(collections.query_pools.len() as i16 + 1),
name: name.to_string(),
};
collections.query_pools.push(pool);
collections.query_pools.last().unwrap()
}
};
Ok(pool.clone())
}
}
#[async_trait]
impl NamespaceRepo for MemCatalog {
async fn create(
&self,
name: &str,
retention_duration: &str,
kafka_topic_id: KafkaTopicId,
query_pool_id: QueryPoolId,
) -> Result<Namespace> {
let mut collections = self.collections.lock().expect("mutex poisoned");
if collections.namespaces.iter().any(|n| n.name == name) {
return Err(Error::NameExists {
name: name.to_string(),
});
}
let namespace = Namespace {
id: NamespaceId::new(collections.namespaces.len() as i32 + 1),
name: name.to_string(),
kafka_topic_id,
query_pool_id,
retention_duration: Some(retention_duration.to_string()),
};
collections.namespaces.push(namespace);
Ok(collections.namespaces.last().unwrap().clone())
}
async fn get_by_name(&self, name: &str) -> Result<Option<Namespace>> {
let collections = self.collections.lock().expect("mutex poisoned");
Ok(collections
.namespaces
.iter()
.find(|n| n.name == name)
.cloned())
}
}
#[async_trait]
impl TableRepo for MemCatalog {
async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let table = match collections.tables.iter().find(|t| t.name == name) {
Some(t) => t,
None => {
let table = Table {
id: TableId::new(collections.tables.len() as i32 + 1),
namespace_id,
name: name.to_string(),
};
collections.tables.push(table);
collections.tables.last().unwrap()
}
};
Ok(table.clone())
}
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
let collections = self.collections.lock().expect("mutex poisoned");
let tables: Vec<_> = collections
.tables
.iter()
.filter(|t| t.namespace_id == namespace_id)
.cloned()
.collect();
Ok(tables)
}
}
#[async_trait]
impl ColumnRepo for MemCatalog {
async fn create_or_get(
&self,
name: &str,
table_id: TableId,
column_type: ColumnType,
) -> Result<Column> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let column = match collections
.columns
.iter()
.find(|t| t.name == name && t.table_id == table_id)
{
Some(c) => {
if column_type as i16 != c.column_type {
return Err(Error::ColumnTypeMismatch {
name: name.to_string(),
existing: ColumnType::try_from(c.column_type).unwrap().to_string(),
new: column_type.to_string(),
});
}
c
}
None => {
let column = Column {
id: ColumnId::new(collections.columns.len() as i32 + 1),
table_id,
name: name.to_string(),
column_type: column_type as i16,
};
collections.columns.push(column);
collections.columns.last().unwrap()
}
};
Ok(column.clone())
}
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
let mut columns = vec![];
let collections = self.collections.lock().expect("mutex poisoned");
for t in collections
.tables
.iter()
.filter(|t| t.namespace_id == namespace_id)
{
for c in collections.columns.iter().filter(|c| c.table_id == t.id) {
columns.push(c.clone());
}
}
Ok(columns)
}
}
#[async_trait]
impl SequencerRepo for MemCatalog {
async fn create_or_get(
&self,
topic: &KafkaTopic,
partition: KafkaPartition,
) -> Result<Sequencer> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let sequencer = match collections
.sequencers
.iter()
.find(|s| s.kafka_topic_id == topic.id && s.kafka_partition == partition)
{
Some(t) => t,
None => {
let sequencer = Sequencer {
id: SequencerId::new(collections.sequencers.len() as i16 + 1),
kafka_topic_id: topic.id,
kafka_partition: partition,
min_unpersisted_sequence_number: 0,
};
collections.sequencers.push(sequencer);
collections.sequencers.last().unwrap()
}
};
Ok(*sequencer)
}
async fn get_by_topic_id_and_partition(
&self,
topic_id: KafkaTopicId,
partition: KafkaPartition,
) -> Result<Option<Sequencer>> {
let collections = self.collections.lock().expect("mutex poisoned");
let sequencer = collections
.sequencers
.iter()
.find(|s| s.kafka_topic_id == topic_id && s.kafka_partition == partition)
.cloned();
Ok(sequencer)
}
async fn list(&self) -> Result<Vec<Sequencer>> {
let collections = self.collections.lock().expect("mutex poisoned");
Ok(collections.sequencers.clone())
}
async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
let collections = self.collections.lock().expect("mutex poisoned");
let sequencers: Vec<_> = collections
.sequencers
.iter()
.filter(|s| s.kafka_topic_id == topic.id)
.cloned()
.collect();
Ok(sequencers)
}
}
#[async_trait]
impl PartitionRepo for MemCatalog {
async fn create_or_get(
&self,
key: &str,
sequencer_id: SequencerId,
table_id: TableId,
) -> Result<Partition> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let partition = match collections.partitions.iter().find(|p| {
p.partition_key == key && p.sequencer_id == sequencer_id && p.table_id == table_id
}) {
Some(p) => p,
None => {
let p = Partition {
id: PartitionId::new(collections.partitions.len() as i64 + 1),
sequencer_id,
table_id,
partition_key: key.to_string(),
};
collections.partitions.push(p);
collections.partitions.last().unwrap()
}
};
Ok(partition.clone())
}
async fn list_by_sequencer(&self, sequencer_id: SequencerId) -> Result<Vec<Partition>> {
let collections = self.collections.lock().expect("mutex poisoned");
let partitions: Vec<_> = collections
.partitions
.iter()
.filter(|p| p.sequencer_id == sequencer_id)
.cloned()
.collect();
Ok(partitions)
}
}
#[async_trait]
impl TombstoneRepo for MemCatalog {
async fn create_or_get(
&self,
table_id: TableId,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
min_time: Timestamp,
max_time: Timestamp,
predicate: &str,
) -> Result<Tombstone> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let tombstone = match collections.tombstones.iter().find(|t| {
t.table_id == table_id
&& t.sequencer_id == sequencer_id
&& t.sequence_number == sequence_number
}) {
Some(t) => t,
None => {
let t = Tombstone {
id: TombstoneId::new(collections.tombstones.len() as i64 + 1),
table_id,
sequencer_id,
sequence_number,
min_time,
max_time,
serialized_predicate: predicate.to_string(),
};
collections.tombstones.push(t);
collections.tombstones.last().unwrap()
}
};
Ok(tombstone.clone())
}
async fn list_tombstones_by_sequencer_greater_than(
&self,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
) -> Result<Vec<Tombstone>> {
let collections = self.collections.lock().expect("mutex poisoned");
let tombstones: Vec<_> = collections
.tombstones
.iter()
.filter(|t| t.sequencer_id == sequencer_id && t.sequence_number > sequence_number)
.cloned()
.collect();
Ok(tombstones)
}
}
#[async_trait]
impl ParquetFileRepo for MemCatalog {
async fn create(
&self,
sequencer_id: SequencerId,
table_id: TableId,
partition_id: PartitionId,
object_store_id: Uuid,
min_sequence_number: SequenceNumber,
max_sequence_number: SequenceNumber,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<ParquetFile> {
let mut collections = self.collections.lock().expect("mutex poisoned");
if collections
.parquet_files
.iter()
.any(|f| f.object_store_id == object_store_id)
{
return Err(Error::FileExists { object_store_id });
}
let parquet_file = ParquetFile {
id: ParquetFileId::new(collections.parquet_files.len() as i64 + 1),
sequencer_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete: false,
};
collections.parquet_files.push(parquet_file);
Ok(*collections.parquet_files.last().unwrap())
}
async fn flag_for_delete(&self, id: ParquetFileId) -> Result<()> {
let mut collections = self.collections.lock().expect("mutex poisoned");
match collections.parquet_files.iter_mut().find(|p| p.id == id) {
Some(f) => f.to_delete = true,
None => return Err(Error::ParquetRecordNotFound { id }),
}
Ok(())
}
async fn list_by_sequencer_greater_than(
&self,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
) -> Result<Vec<ParquetFile>> {
let collections = self.collections.lock().expect("mutex poisoned");
let files: Vec<_> = collections
.parquet_files
.iter()
.filter(|f| f.sequencer_id == sequencer_id && f.max_sequence_number > sequence_number)
.cloned()
.collect();
Ok(files)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_mem_repo() {
let f = || Arc::new(MemCatalog::new());
crate::interface::test_helpers::test_repo(f).await;
}
}
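
As a quick orientation (not part of this diff), a sketch of driving MemCatalog through the repo traits, including the column-type conflict path; all names here are arbitrary placeholders.

use std::sync::Arc;

use iox_catalog::interface::{ColumnType, Error, RepoCollection};
use iox_catalog::mem::MemCatalog;

async fn column_type_conflict() {
    let catalog = Arc::new(MemCatalog::new());
    let topic = catalog.kafka_topic().create_or_get("demo-topic").await.unwrap();
    let pool = catalog.query_pool().create_or_get("demo-pool").await.unwrap();
    let namespace = catalog
        .namespace()
        .create("demo_ns", "inf", topic.id, pool.id)
        .await
        .unwrap();
    let table = catalog.table().create_or_get("cpu", namespace.id).await.unwrap();

    // create_or_get is idempotent when the column type matches ...
    let c1 = catalog
        .column()
        .create_or_get("usage", table.id, ColumnType::F64)
        .await
        .unwrap();
    let c2 = catalog
        .column()
        .create_or_get("usage", table.id, ColumnType::F64)
        .await
        .unwrap();
    assert_eq!(c1, c2);

    // ... but re-registering the same column with a different type is an error.
    let err = catalog
        .column()
        .create_or_get("usage", table.id, ColumnType::I64)
        .await
        .unwrap_err();
    assert!(matches!(err, Error::ColumnTypeMismatch { .. }));
}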

iox_catalog/src/postgres.rs

@ -1,17 +1,18 @@
//! A Postgres backed implementation of the Catalog
use crate::interface::{
Column, ColumnRepo, ColumnSchema, ColumnType, Error, KafkaTopic, KafkaTopicRepo, Namespace,
NamespaceRepo, NamespaceSchema, QueryPool, QueryPoolRepo, RepoCollection, Result, Sequencer,
SequencerRepo, Table, TableRepo, TableSchema,
Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId,
ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo,
RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
TableRepo, Timestamp, Tombstone, TombstoneRepo,
};
use async_trait::async_trait;
use observability_deps::tracing::info;
use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres};
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
const MAX_CONNECTIONS: u32 = 5;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
@ -19,41 +20,46 @@ const IDLE_TIMEOUT: Duration = Duration::from_secs(500);
#[allow(dead_code)]
const SCHEMA_NAME: &str = "iox_catalog";
/// Connect to the catalog store.
pub async fn connect_catalog_store(
app_name: &'static str,
schema_name: &'static str,
dsn: &str,
) -> Result<Pool<Postgres>, sqlx::Error> {
let pool = PgPoolOptions::new()
.min_connections(1)
.max_connections(MAX_CONNECTIONS)
.connect_timeout(CONNECT_TIMEOUT)
.idle_timeout(IDLE_TIMEOUT)
.test_before_acquire(true)
.after_connect(move |c| {
Box::pin(async move {
// Tag the connection with the provided application name.
c.execute(sqlx::query("SET application_name = '$1';").bind(app_name))
.await?;
let search_path_query = format!("SET search_path TO {}", schema_name);
c.execute(sqlx::query(&search_path_query)).await?;
Ok(())
})
})
.connect(dsn)
.await?;
// Log a connection was successfully established and include the application
// name for cross-correlation between Conductor logs & database connections.
info!(application_name=%app_name, "connected to catalog store");
Ok(pool)
/// Postgres-backed catalog that implements the `RepoCollection` and individual repo traits.
#[derive(Debug)]
pub struct PostgresCatalog {
pool: Pool<Postgres>,
}
struct PostgresCatalog {
pool: Pool<Postgres>,
impl PostgresCatalog {
/// Connect to the catalog store.
pub async fn connect(
app_name: &'static str,
schema_name: &'static str,
dsn: &str,
) -> Result<Self> {
let pool = PgPoolOptions::new()
.min_connections(1)
.max_connections(MAX_CONNECTIONS)
.connect_timeout(CONNECT_TIMEOUT)
.idle_timeout(IDLE_TIMEOUT)
.test_before_acquire(true)
.after_connect(move |c| {
Box::pin(async move {
// Tag the connection with the provided application name.
c.execute(sqlx::query("SET application_name = '$1';").bind(app_name))
.await?;
let search_path_query = format!("SET search_path TO {}", schema_name);
c.execute(sqlx::query(&search_path_query)).await?;
Ok(())
})
})
.connect(dsn)
.await
.map_err(|e| Error::SqlxError { source: e })?;
// Log a connection was successfully established and include the application
// name for cross-correlation between Conductor logs & database connections.
info!(application_name=%app_name, "connected to catalog store");
Ok(Self { pool })
}
}
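
For reference, a minimal connection sketch (not part of this diff); the application name is a placeholder, the schema name mirrors SCHEMA_NAME above, and the DSN comes from DATABASE_URL as in the integration tests further down.

use std::sync::Arc;

use iox_catalog::postgres::PostgresCatalog;

async fn connect_to_catalog() -> Arc<PostgresCatalog> {
    let dsn = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let catalog = PostgresCatalog::connect("my_app", "iox_catalog", &dsn)
        .await
        .expect("failed to connect to the catalog store");

    // Wrapping in an Arc exposes the `RepoCollection` accessors
    // (namespace(), table(), parquet_file(), ...).
    Arc::new(catalog)
}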
impl RepoCollection for Arc<PostgresCatalog> {
@ -80,6 +86,18 @@ impl RepoCollection for Arc<PostgresCatalog> {
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send> {
Self::clone(self) as Arc<dyn SequencerRepo + Sync + Send>
}
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send> {
Self::clone(self) as Arc<dyn PartitionRepo + Sync + Send>
}
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TombstoneRepo + Sync + Send>
}
fn parquet_file(&self) -> Arc<dyn ParquetFileRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ParquetFileRepo + Sync + Send>
}
}
#[async_trait]
@ -100,6 +118,25 @@ DO UPDATE SET name = kafka_topic.name RETURNING *;
Ok(rec)
}
async fn get_by_name(&self, name: &str) -> Result<Option<KafkaTopic>> {
let rec = sqlx::query_as::<_, KafkaTopic>(
r#"
SELECT * FROM kafka_topic WHERE name = $1;
"#,
)
.bind(&name) // $1
.fetch_one(&self.pool)
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let kafka_topic = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(kafka_topic))
}
}
#[async_trait]
@ -128,9 +165,9 @@ impl NamespaceRepo for PostgresCatalog {
&self,
name: &str,
retention_duration: &str,
kafka_topic_id: i32,
query_pool_id: i16,
) -> Result<NamespaceSchema> {
kafka_topic_id: KafkaTopicId,
query_pool_id: QueryPoolId,
) -> Result<Namespace> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
INSERT INTO namespace ( name, retention_duration, kafka_topic_id, query_pool_id )
@ -156,11 +193,10 @@ RETURNING *
}
})?;
Ok(NamespaceSchema::new(rec.id, kafka_topic_id, query_pool_id))
Ok(rec)
}
async fn get_by_name(&self, name: &str) -> Result<Option<NamespaceSchema>> {
// TODO: maybe get all the data in a single call to Postgres?
async fn get_by_name(&self, name: &str) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
SELECT * FROM namespace WHERE name = $1;
@ -175,53 +211,14 @@ SELECT * FROM namespace WHERE name = $1;
}
let namespace = rec.map_err(|e| Error::SqlxError { source: e })?;
// get the columns first just in case someone else is creating schema while we're doing this.
let columns = ColumnRepo::list_by_namespace_id(self, namespace.id).await?;
let tables = TableRepo::list_by_namespace_id(self, namespace.id).await?;
let mut namespace = NamespaceSchema::new(
namespace.id,
namespace.kafka_topic_id,
namespace.query_pool_id,
);
let mut table_id_to_schema = BTreeMap::new();
for t in tables {
table_id_to_schema.insert(t.id, (t.name, TableSchema::new(t.id)));
}
for c in columns {
let (_, t) = table_id_to_schema.get_mut(&c.table_id).unwrap();
match ColumnType::try_from(c.column_type) {
Ok(column_type) => {
t.columns.insert(
c.name,
ColumnSchema {
id: c.id,
column_type,
},
);
}
_ => {
return Err(Error::UnknownColumnType {
data_type: c.column_type,
name: c.name.to_string(),
});
}
}
}
for (_, (table_name, schema)) in table_id_to_schema {
namespace.tables.insert(table_name, schema);
}
return Ok(Some(namespace));
Ok(Some(namespace))
}
}
#[async_trait]
impl TableRepo for PostgresCatalog {
async fn create_or_get(&self, name: &str, namespace_id: i32) -> Result<Table> {
async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
let rec = sqlx::query_as::<_, Table>(
r#"
INSERT INTO table_name ( name, namespace_id )
@ -245,7 +242,7 @@ DO UPDATE SET name = table_name.name RETURNING *;
Ok(rec)
}
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Table>> {
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
let rec = sqlx::query_as::<_, Table>(
r#"
SELECT * FROM table_name
@ -266,7 +263,7 @@ impl ColumnRepo for PostgresCatalog {
async fn create_or_get(
&self,
name: &str,
table_id: i32,
table_id: TableId,
column_type: ColumnType,
) -> Result<Column> {
let ct = column_type as i16;
@ -303,7 +300,7 @@ DO UPDATE SET name = column_name.name RETURNING *;
Ok(rec)
}
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Column>> {
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
let rec = sqlx::query_as::<_, Column>(
r#"
SELECT column_name.* FROM table_name
@ -322,7 +319,11 @@ WHERE table_name.namespace_id = $1;
#[async_trait]
impl SequencerRepo for PostgresCatalog {
async fn create_or_get(&self, topic: &KafkaTopic, partition: i32) -> Result<Sequencer> {
async fn create_or_get(
&self,
topic: &KafkaTopic,
partition: KafkaPartition,
) -> Result<Sequencer> {
sqlx::query_as::<_, Sequencer>(
r#"
INSERT INTO sequencer
@ -346,12 +347,206 @@ impl SequencerRepo for PostgresCatalog {
})
}
async fn get_by_topic_id_and_partition(
&self,
topic_id: KafkaTopicId,
partition: KafkaPartition,
) -> Result<Option<Sequencer>> {
let rec = sqlx::query_as::<_, Sequencer>(
r#"
SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2;
"#,
)
.bind(topic_id) // $1
.bind(partition) // $2
.fetch_one(&self.pool)
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let sequencer = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(sequencer))
}
async fn list(&self) -> Result<Vec<Sequencer>> {
sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer;"#)
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer WHERE kafka_topic_id = $1;"#)
.bind(&topic.id) // $1
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
#[async_trait]
impl PartitionRepo for PostgresCatalog {
async fn create_or_get(
&self,
key: &str,
sequencer_id: SequencerId,
table_id: TableId,
) -> Result<Partition> {
sqlx::query_as::<_, Partition>(
r#"
INSERT INTO partition
( partition_key, sequencer_id, table_id )
VALUES
( $1, $2, $3 )
ON CONFLICT ON CONSTRAINT partition_key_unique
DO UPDATE SET partition_key = partition.partition_key RETURNING *;
"#,
)
.bind(key) // $1
.bind(&sequencer_id) // $2
.bind(&table_id) // $3
.fetch_one(&self.pool)
.await
.map_err(|e| {
if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})
}
async fn list_by_sequencer(&self, sequencer_id: SequencerId) -> Result<Vec<Partition>> {
sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE sequencer_id = $1;"#)
.bind(&sequencer_id) // $1
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
#[async_trait]
impl TombstoneRepo for PostgresCatalog {
async fn create_or_get(
&self,
table_id: TableId,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
min_time: Timestamp,
max_time: Timestamp,
predicate: &str,
) -> Result<Tombstone> {
sqlx::query_as::<_, Tombstone>(
r#"
INSERT INTO tombstone
( table_id, sequencer_id, sequence_number, min_time, max_time, serialized_predicate )
VALUES
( $1, $2, $3, $4, $5, $6 )
ON CONFLICT ON CONSTRAINT tombstone_unique
DO UPDATE SET table_id = tombstone.table_id RETURNING *;
"#,
)
.bind(&table_id) // $1
.bind(&sequencer_id) // $2
.bind(&sequence_number) // $3
.bind(&min_time) // $4
.bind(&max_time) // $5
.bind(predicate) // $6
.fetch_one(&self.pool)
.await
.map_err(|e| {
if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})
}
async fn list_tombstones_by_sequencer_greater_than(
&self,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
) -> Result<Vec<Tombstone>> {
sqlx::query_as::<_, Tombstone>(r#"SELECT * FROM tombstone WHERE sequencer_id = $1 AND sequence_number > $2 ORDER BY id;"#)
.bind(&sequencer_id) // $1
.bind(&sequence_number) // $2
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
#[async_trait]
impl ParquetFileRepo for PostgresCatalog {
async fn create(
&self,
sequencer_id: SequencerId,
table_id: TableId,
partition_id: PartitionId,
object_store_id: Uuid,
min_sequence_number: SequenceNumber,
max_sequence_number: SequenceNumber,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<ParquetFile> {
let rec = sqlx::query_as::<_, ParquetFile>(
r#"
INSERT INTO parquet_file ( sequencer_id, table_id, partition_id, object_store_id, min_sequence_number, max_sequence_number, min_time, max_time, to_delete )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, false )
RETURNING *
"#,
)
.bind(sequencer_id) // $1
.bind(table_id) // $2
.bind(partition_id) // $3
.bind(object_store_id) // $4
.bind(min_sequence_number) // $5
.bind(max_sequence_number) // $6
.bind(min_time) // $7
.bind(max_time) // $8
.fetch_one(&self.pool)
.await
.map_err(|e| {
if is_unique_violation(&e) {
Error::FileExists {
object_store_id,
}
} else if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
Ok(rec)
}
async fn flag_for_delete(&self, id: ParquetFileId) -> Result<()> {
let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = true WHERE id = $1;"#)
.bind(&id) // $1
.execute(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(())
}
async fn list_by_sequencer_greater_than(
&self,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>(r#"SELECT * FROM parquet_file WHERE sequencer_id = $1 AND max_sequence_number > $2 ORDER BY id;"#)
.bind(&sequencer_id) // $1
.bind(&sequence_number) // $2
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
/// The error code returned by Postgres for a unique constraint violation.
@ -390,9 +585,6 @@ fn is_fk_violation(e: &sqlx::Error) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::{create_or_get_default_records, validate_or_insert_schema};
use futures::{stream::FuturesOrdered, StreamExt};
use influxdb_line_protocol::parse_lines;
use std::env;
// Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set.
@ -432,202 +624,47 @@ mod tests {
}};
}
async fn setup_db() -> (Arc<PostgresCatalog>, KafkaTopic, QueryPool) {
async fn setup_db() -> Arc<PostgresCatalog> {
let dsn = std::env::var("DATABASE_URL").unwrap();
let pool = connect_catalog_store("test", SCHEMA_NAME, &dsn)
.await
.unwrap();
let postgres_catalog = Arc::new(PostgresCatalog { pool });
let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &postgres_catalog)
.await
.unwrap();
(postgres_catalog, kafka_topic, query_pool)
Arc::new(
PostgresCatalog::connect("test", SCHEMA_NAME, &dsn)
.await
.unwrap(),
)
}
#[tokio::test]
async fn test_catalog() {
async fn test_repo() {
// If running an integration test on your laptop, this requires that you have Postgres
// running and that you've done the sqlx migrations. See the README in this crate for
// info to set it up.
maybe_skip_integration!();
let (postgres, kafka_topic, query_pool) = setup_db().await;
let postgres = setup_db().await;
clear_schema(&postgres.pool).await;
let namespace = NamespaceRepo::create(postgres.as_ref(), "foo", "inf", 0, 0).await;
assert!(matches!(
namespace.unwrap_err(),
Error::ForeignKeyViolation { source: _ }
));
let namespace = NamespaceRepo::create(
postgres.as_ref(),
"foo",
"inf",
kafka_topic.id,
query_pool.id,
)
.await
.unwrap();
assert!(namespace.id > 0);
assert_eq!(namespace.kafka_topic_id, kafka_topic.id);
assert_eq!(namespace.query_pool_id, query_pool.id);
let f = || Arc::clone(&postgres);
// test that we can create or get a table
let t = TableRepo::create_or_get(postgres.as_ref(), "foo", namespace.id)
.await
.unwrap();
let tt = TableRepo::create_or_get(postgres.as_ref(), "foo", namespace.id)
.await
.unwrap();
assert!(t.id > 0);
assert_eq!(t, tt);
// test that we can craete or get a column
let c = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::I64)
.await
.unwrap();
let cc = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::I64)
.await
.unwrap();
assert!(c.id > 0);
assert_eq!(c, cc);
// test that attempting to create an already defined column of a different type returns error
let err = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::F64)
.await
.expect_err("should error with wrong column type");
assert!(matches!(
err,
Error::ColumnTypeMismatch {
name: _,
existing: _,
new: _
}
));
// now test with a new namespace
let namespace = NamespaceRepo::create(
postgres.as_ref(),
"asdf",
"inf",
kafka_topic.id,
query_pool.id,
)
.await
.unwrap();
let data = r#"
m1,t1=a,t2=b f1=2i,f2=2.0 1
m1,t1=a f1=3i 2
m2,t3=b f1=true 1
"#;
// test that new schema gets returned
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let schema = Arc::new(NamespaceSchema::new(
namespace.id,
namespace.kafka_topic_id,
namespace.query_pool_id,
));
let new_schema = validate_or_insert_schema(lines, &schema, &postgres)
.await
.unwrap();
let new_schema = new_schema.unwrap();
// ensure new schema is in the db
let schema_from_db = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf")
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema_from_db);
// test that a new table will be created
let data = r#"
m1,t1=c f1=1i 2
new_measurement,t9=a f10=true 1
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema_from_db, &postgres)
.await
.unwrap()
.unwrap();
let new_table = new_schema.tables.get("new_measurement").unwrap();
assert_eq!(
ColumnType::Bool,
new_table.columns.get("f10").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
new_table.columns.get("t9").unwrap().column_type
);
let schema = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf")
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
// test that a new column for an existing table will be created
// test that a new table will be created
let data = r#"
m1,new_tag=c new_field=1i 2
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema, &postgres)
.await
.unwrap()
.unwrap();
let table = new_schema.tables.get("m1").unwrap();
assert_eq!(
ColumnType::I64,
table.columns.get("new_field").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
table.columns.get("new_tag").unwrap().column_type
);
let schema = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf")
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
}
#[tokio::test]
async fn test_sequencers() {
maybe_skip_integration!();
let (postgres, kafka_topic, _query_pool) = setup_db().await;
clear_schema(&postgres.pool).await;
// Create 10 sequencers
let created = (1..=10)
.map(|partition| {
SequencerRepo::create_or_get(postgres.as_ref(), &kafka_topic, partition)
})
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
(v.id, v)
})
.collect::<BTreeMap<_, _>>()
.await;
// List them and assert they match
let listed = SequencerRepo::list(postgres.as_ref())
.await
.expect("failed to list sequencers")
.into_iter()
.map(|v| (v.id, v))
.collect::<BTreeMap<_, _>>();
assert_eq!(created, listed);
crate::interface::test_helpers::test_repo(f).await;
}
async fn clear_schema(pool: &Pool<Postgres>) {
sqlx::query("delete from tombstone;")
.execute(pool)
.await
.unwrap();
sqlx::query("delete from parquet_file;")
.execute(pool)
.await
.unwrap();
sqlx::query("delete from column_name;")
.execute(pool)
.await
.unwrap();
sqlx::query("delete from partition;")
.execute(pool)
.await
.unwrap();
sqlx::query("delete from table_name;")
.execute(pool)
.await

workspace-hack/Cargo.toml

@ -53,6 +53,7 @@ tower = { version = "0.4", features = ["balance", "buffer", "discover", "futures
tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_debug", "std", "tracing-attributes"] }
tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
tracing-subscriber = { version = "0.3", features = ["alloc", "ansi", "ansi_term", "env-filter", "fmt", "lazy_static", "matchers", "regex", "registry", "sharded-slab", "smallvec", "std", "thread_local", "tracing", "tracing-log"] }
uuid = { version = "0.8", features = ["getrandom", "std", "v4"] }
[build-dependencies]
ahash = { version = "0.7", features = ["std"] }
@ -86,5 +87,6 @@ smallvec = { version = "1", default-features = false, features = ["union"] }
syn = { version = "1", features = ["clone-impls", "derive", "extra-traits", "full", "parsing", "printing", "proc-macro", "quote", "visit", "visit-mut"] }
tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "sync", "time", "tokio-macros", "winapi"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }
uuid = { version = "0.8", features = ["getrandom", "std", "v4"] }
### END HAKARI SECTION