diff --git a/Cargo.lock b/Cargo.lock index 02e85ba6b5..04a96ff666 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1853,6 +1853,19 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "ingester" +version = "0.1.0" +dependencies = [ + "arrow", + "iox_catalog", + "mutable_batch", + "parking_lot", + "snafu", + "uuid", + "workspace-hack", +] + [[package]] name = "instant" version = "0.1.12" @@ -1898,6 +1911,7 @@ dependencies = [ "snafu", "sqlx", "tokio", + "uuid", "workspace-hack", ] @@ -4338,6 +4352,7 @@ dependencies = [ "thiserror", "tokio-stream", "url", + "uuid", "whoami", ] @@ -5408,6 +5423,7 @@ dependencies = [ "tracing", "tracing-core", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8612b307da..4c6cafda3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "influxdb_storage_client", "influxdb_tsm", "influxdb2_client", + "ingester", "internal_types", "iox_catalog", "iox_data_generator", diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml new file mode 100644 index 0000000000..7e1d8f1719 --- /dev/null +++ b/ingester/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "ingester" +version = "0.1.0" +authors = ["Nga Tran "] +edition = "2021" + +[dependencies] +arrow = { version = "7.0", features = ["prettyprint"] } +iox_catalog = { path = "../iox_catalog" } +mutable_batch = { path = "../mutable_batch"} +parking_lot = "0.11.2" +snafu = "0.7" +uuid = { version = "0.8", features = ["v4"] } +workspace-hack = { path = "../workspace-hack"} diff --git a/ingester/src/data.rs b/ingester/src/data.rs new file mode 100644 index 0000000000..b838faca54 --- /dev/null +++ b/ingester/src/data.rs @@ -0,0 +1,202 @@ +//! Data for the lifecycle of the Ingeter +//! + +use arrow::record_batch::RecordBatch; +use std::{collections::BTreeMap, sync::Arc}; +use uuid::Uuid; + +use crate::server::IngesterServer; +use iox_catalog::interface::{ + KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber, + SequencerId, TableId, Tombstone, +}; +use mutable_batch::MutableBatch; +use parking_lot::RwLock; +use snafu::{OptionExt, ResultExt, Snafu}; + +#[derive(Debug, Snafu)] +#[allow(missing_copy_implementations, missing_docs)] +pub enum Error { + #[snafu(display("Error while reading Topic {}", name))] + ReadTopic { + source: iox_catalog::interface::Error, + name: String, + }, + + #[snafu(display("Error while reading Kafka Partition id {}", id.get()))] + ReadSequencer { + source: iox_catalog::interface::Error, + id: KafkaPartition, + }, + + #[snafu(display( + "Sequencer record not found for kafka_topic_id {} and kafka_partition {}", + kafka_topic_id, + kafka_partition + ))] + SequencerNotFound { + kafka_topic_id: KafkaTopicId, + kafka_partition: KafkaPartition, + }, +} + +/// A specialized `Error` for Ingester Data errors +pub type Result = std::result::Result; + +/// Ingester Data: a Mapp of Shard ID to its Data +#[derive(Default)] +pub struct Sequencers { + // This map gets set up on initialization of the ingester so it won't ever be modified. + // The content of each SequenceData will get changed when more namespaces and tables + // get ingested. 
+ data: BTreeMap>, +} + +impl Sequencers { + /// One time initialize Sequencers of this Ingester + pub async fn initialize( + ingester: &IngesterServer<'_, T>, + ) -> Result { + // Get sequencer ids from the catalog + let sequencer_repro = ingester.iox_catalog.sequencer(); + let mut sequencers = BTreeMap::default(); + let topic = ingester.get_topic(); + for shard in ingester.get_kafka_partitions() { + let sequencer = sequencer_repro + .get_by_topic_id_and_partition(topic.id, shard) + .await + .context(ReadSequencerSnafu { id: shard })? + .context(SequencerNotFoundSnafu { + kafka_topic_id: topic.id, + kafka_partition: shard, + })?; + // Create empty buffer for each sequencer + sequencers.insert(sequencer.id, Arc::new(SequencerData::default())); + } + + Ok(Self { data: sequencers }) + } +} + +/// Data of a Shard +#[derive(Default)] +pub struct SequencerData { + // New namespaces can come in at any time so we need to be able to add new ones + namespaces: RwLock>>, +} + +/// Data of a Namespace that belongs to a given Shard +#[derive(Default)] +pub struct NamespaceData { + tables: RwLock>>, +} + +/// Data of a Table in a given Namesapce that belongs to a given Shard +#[derive(Default)] +pub struct TableData { + // Map pf partition key to its data + partition_data: RwLock>>, +} + +/// Data of an IOx Partition of a given Table of a Namesapce that belongs to a given Shard +pub struct PartitionData { + id: PartitionId, + inner: RwLock, +} + +/// Data of an IOx partition split into batches +/// ┌────────────────────────┐ ┌────────────────────────┐ ┌─────────────────────────┐ +/// │ Buffer │ │ Snapshots │ │ Persisting │ +/// │ ┌───────────────────┐ │ │ │ │ │ +/// │ │ ┌───────────────┐│ │ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │ +/// │ │ ┌┴──────────────┐│├─┼────────┼─┼─▶┌───────────────┐│ │ │ │ ┌───────────────┐│ │ +/// │ │┌┴──────────────┐├┘│ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │ +/// │ ││ BufferBatch ├┘ │ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │ +/// │ │└───────────────┘ │ │ ┌───┼─▶│ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │ +/// │ └───────────────────┘ │ │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │ +/// │ ... │ │ │ └───────────────────┘ │ │ └───────────────────┘ │ +/// │ ┌───────────────────┐ │ │ │ │ │ │ +/// │ │ ┌───────────────┐│ │ │ │ ... │ │ ... │ +/// │ │ ┌┴──────────────┐││ │ │ │ │ │ │ +/// │ │┌┴──────────────┐├┘│─┼────┘ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │ +/// │ ││ BufferBatch ├┘ │ │ │ │ ┌───────────────┐│ │ │ │ ┌───────────────┐│ │ +/// │ │└───────────────┘ │ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │ +/// │ └───────────────────┘ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │ +/// │ │ │ ││ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │ +/// │ ... │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │ +/// │ │ │ └───────────────────┘ │ │ └───────────────────┘ │ +/// └────────────────────────┘ └────────────────────────┘ └─────────────────────────┘ +#[derive(Default)] +pub struct DataBuffer { + /// Buffer of incoming writes + pub buffer: Vec, + + /// Buffer of tombstones whose time range may overlap with this partition. + /// These tombstone first will be written into the Catalog and then here. + /// When a persist is called, these tombstones will be moved into the + /// PersistingBatch to get applied in those data. + pub deletes: Vec, + + /// Data in `buffer` will be moved to a `snapshot` when one of these happens: + /// . A background persist is called + /// . 
A read request from Querier + /// The `buffer` will be empty when this happens. + snapshots: Vec>, + /// When a persist is called, data in `buffer` will be moved to a `snapshot` + /// and then all `snapshots` will be moved to a `persisting`. + /// Both `buffer` and 'snaphots` will be empty when this happens. + persisting: Option>, + // Extra Notes: + // . In MVP, we will only persist a set of sanpshots at a time. + // In later version, multiple perssiting operations may be happenning concurrently but + // their persisted info must be added into the Catalog in thier data + // ingesting order. + // . When a read request comes from a Querier, all data from `snaphots` + // and `persisting` must be sent to the Querier. + // . After the `persiting` data is persisted and successfully added + // into the Catalog, it will be removed from this Data Buffer. + // This data might be added into an extra cache to serve up to + // Queriers that may not have loaded the parquet files from object + // storage yet. But this will be decided after MVP. +} +/// BufferBatch is a MutauableBatch with its ingesting order, sequencer_number, that +/// helps the ingester keep the batches of data in thier ingesting order +pub struct BufferBatch { + /// Sequencer number of the ingesting data + pub sequencer_number: SequenceNumber, + /// Ingesting data + pub data: MutableBatch, +} + +/// SnapshotBatch contains data of many contiguous BufferBatches +pub struct SnapshotBatch { + /// Min sequencer number of its comebined BufferBatches + pub min_sequencer_number: SequenceNumber, + /// Max sequencer number of its comebined BufferBatches + pub max_sequencer_number: SequenceNumber, + /// Data of its comebined BufferBatches kept in one RecordBatch + pub data: RecordBatch, +} + +/// PersistingBatch contains all needed info and data for creating +/// a parquet file for given set of SnapshotBatches +pub struct PersistingBatch { + /// Sesquencer id of the data + pub sequencer_id: SequencerId, + + /// Table id of the data + pub table_id: TableId, + + /// Parittion Id of the data + pub partition_id: PartitionId, + + /// Id of to-be-created parquet file of this data + pub object_store_id: Uuid, + + /// data to be persisted + pub data: Vec, + + /// delete predicates to be appied to the data + /// before perssiting + pub deletes: Vec, +} diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs new file mode 100644 index 0000000000..31bc719a49 --- /dev/null +++ b/ingester/src/lib.rs @@ -0,0 +1,17 @@ +//! IOx ingester implementation. +//! Design doc: https://docs.google.com/document/d/14NlzBiWwn0H37QxnE0k3ybTU58SKyUZmdgYpVw6az0Q/edit# +//! + +#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)] +#![warn( + missing_copy_implementations, + missing_docs, + clippy::explicit_iter_loop, + clippy::future_not_send, + clippy::use_self, + clippy::clone_on_ref_ptr +)] + +#[allow(dead_code)] +pub mod data; +pub mod server; diff --git a/ingester/src/server.rs b/ingester/src/server.rs new file mode 100644 index 0000000000..11ce6dc553 --- /dev/null +++ b/ingester/src/server.rs @@ -0,0 +1,54 @@ +//! Ingester Server +//! + +use std::sync::Arc; + +use iox_catalog::interface::{KafkaPartition, KafkaTopic, KafkaTopicId, RepoCollection}; + +/// The [`IngesterServer`] manages the lifecycle and contains all state for +/// an `ingester` server instance. 
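
Tying the `DataBuffer` notes above together: when a persist is triggered, the accumulated `SnapshotBatch`es of one partition become a single `PersistingBatch`. A minimal sketch of that hand-off, using only the public fields introduced in this diff; the `start_persist` helper is hypothetical and not part of this change:

```rust
// Sketch only: turn a partition's snapshots (plus its buffered tombstones)
// into a PersistingBatch when a persist is triggered.
use ingester::data::{PersistingBatch, SnapshotBatch};
use iox_catalog::interface::{PartitionId, SequencerId, TableId, Tombstone};
use uuid::Uuid;

fn start_persist(
    sequencer_id: SequencerId,
    table_id: TableId,
    partition_id: PartitionId,
    snapshots: Vec<SnapshotBatch>,
    deletes: Vec<Tombstone>,
) -> PersistingBatch {
    PersistingBatch {
        sequencer_id,
        table_id,
        partition_id,
        // This UUID becomes the parquet file name and the new
        // `object_store_id` column in the catalog's parquet_file table.
        object_store_id: Uuid::new_v4(),
        data: snapshots,
        deletes,
    }
}
```
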
+pub struct IngesterServer<'a, T> +where + T: RepoCollection + Send + Sync, +{ + /// Kafka Topic assigned to this ingester + kafka_topic: KafkaTopic, + /// Kafka Partitions (Shards) assigned to this INgester + kafka_partitions: Vec, + /// Catalog of this ingester + pub iox_catalog: &'a Arc, +} + +impl<'a, T> IngesterServer<'a, T> +where + T: RepoCollection + Send + Sync, +{ + /// Initialize the Ingester + pub fn new(topic: KafkaTopic, shard_ids: Vec, catalog: &'a Arc) -> Self { + Self { + kafka_topic: topic, + kafka_partitions: shard_ids, + iox_catalog: catalog, + } + } + + /// Return a kafka topic + pub fn get_topic(&self) -> KafkaTopic { + self.kafka_topic.clone() + } + + /// Return a kafka topic id + pub fn get_topic_id(&self) -> KafkaTopicId { + self.kafka_topic.id + } + + /// Return a kafka topic name + pub fn get_topic_name(&self) -> String { + self.kafka_topic.name.clone() + } + + /// Return Kafka Partitions + pub fn get_kafka_partitions(&self) -> Vec { + self.kafka_partitions.clone() + } +} diff --git a/iox_catalog/Cargo.toml b/iox_catalog/Cargo.toml index be3c2ea82f..50d63d10f3 100644 --- a/iox_catalog/Cargo.toml +++ b/iox_catalog/Cargo.toml @@ -10,10 +10,11 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] } futures = "0.3" observability_deps = { path = "../observability_deps" } snafu = "0.7" -sqlx = { version = "0.5", features = [ "runtime-tokio-native-tls" , "postgres" ] } +sqlx = { version = "0.5", features = [ "runtime-tokio-native-tls" , "postgres", "uuid" ] } tokio = { version = "1.13", features = ["full", "io-util", "macros", "parking_lot", "rt-multi-thread", "time"] } influxdb_line_protocol = { path = "../influxdb_line_protocol" } workspace-hack = { path = "../workspace-hack"} +uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] # In alphabetical order dotenv = "0.15.0" diff --git a/iox_catalog/migrations/20211229171744_initial_schema.sql b/iox_catalog/migrations/20211229171744_initial_schema.sql index 16fe51b09f..6c8606ec73 100644 --- a/iox_catalog/migrations/20211229171744_initial_schema.sql +++ b/iox_catalog/migrations/20211229171744_initial_schema.sql @@ -84,15 +84,15 @@ CREATE TABLE IF NOT EXISTS iox_catalog.parquet_file id BIGINT GENERATED ALWAYS AS IDENTITY, sequencer_id SMALLINT NOT NULL, table_id INT NOT NULL, - partition_id INT NOT NULL, - file_location VARCHAR NOT NULL, + partition_id BIGINT NOT NULL, + object_store_id uuid NOT NULL, min_sequence_number BIGINT, max_sequence_number BIGINT, min_time BIGINT, max_time BIGINT, to_delete BOOLEAN, PRIMARY KEY (id), - CONSTRAINT parquet_location_unique UNIQUE (file_location) + CONSTRAINT parquet_location_unique UNIQUE (object_store_id) ); CREATE TABLE IF NOT EXISTS iox_catalog.tombstone @@ -104,7 +104,8 @@ CREATE TABLE IF NOT EXISTS iox_catalog.tombstone min_time BIGINT NOT NULL, max_time BIGINT NOT NULL, serialized_predicate TEXT NOT NULL, - PRIMARY KEY (id) + PRIMARY KEY (id), + CONSTRAINT tombstone_unique UNIQUE (table_id, sequencer_id, sequence_number) ); CREATE TABLE IF NOT EXISTS iox_catalog.processed_tombstone diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index e3e122fe0f..d72e91a4ee 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -2,11 +2,12 @@ use async_trait::async_trait; use influxdb_line_protocol::FieldValue; -use snafu::Snafu; +use snafu::{OptionExt, Snafu}; use std::collections::BTreeMap; use std::convert::TryFrom; use std::fmt::Formatter; use std::sync::Arc; +use uuid::Uuid; #[derive(Debug, Snafu)] 
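
For orientation, a minimal sketch of how the new `ingester` crate plugs into the catalog traits below, using the in-memory `MemCatalog` this PR also adds (error handling elided). Note that `RepoCollection` is implemented for `Arc<MemCatalog>`, so that `Arc` is the `T` the server is generic over and ends up wrapped in one more `Arc`:

```rust
use std::sync::Arc;

use ingester::{data::Sequencers, server::IngesterServer};
use iox_catalog::interface::{KafkaPartition, RepoCollection};
use iox_catalog::mem::MemCatalog;

async fn wire_up_ingester() {
    let repos = Arc::new(MemCatalog::new());

    let topic = repos.kafka_topic().create_or_get("iox_shared").await.unwrap();
    let shards: Vec<_> = (1..=2).map(KafkaPartition::new).collect();
    for shard in &shards {
        // A sequencer row must exist for every assigned partition, otherwise
        // Sequencers::initialize returns Error::SequencerNotFound.
        repos.sequencer().create_or_get(&topic, *shard).await.unwrap();
    }

    // IngesterServer stores an &Arc<T> where T: RepoCollection, and for
    // MemCatalog the trait is implemented on the Arc itself.
    let catalog = Arc::new(repos);
    let ingester = IngesterServer::new(topic, shards, &catalog);

    // Builds one empty SequencerData buffer per assigned shard.
    let _buffers = Sequencers::initialize(&ingester).await.unwrap();
}
```
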
#[allow(missing_copy_implementations, missing_docs)] @@ -33,11 +34,219 @@ pub enum Error { name ))] UnknownColumnType { data_type: i16, name: String }, + + #[snafu(display("namespace {} not found", name))] + NamespaceNotFound { name: String }, + + #[snafu(display("parquet file with object_store_id {} already exists", object_store_id))] + FileExists { object_store_id: Uuid }, + + #[snafu(display("parquet_file record {} not found", id))] + ParquetRecordNotFound { id: ParquetFileId }, } /// A specialized `Error` for Catalog errors pub type Result = std::result::Result; +/// Unique ID for a `Namespace` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct NamespaceId(i32); + +#[allow(missing_docs)] +impl NamespaceId { + pub fn new(v: i32) -> Self { + Self(v) + } + pub fn get(&self) -> i32 { + self.0 + } +} + +/// Unique ID for a `KafkaTopic` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct KafkaTopicId(i32); + +#[allow(missing_docs)] +impl KafkaTopicId { + pub fn new(v: i32) -> Self { + Self(v) + } + pub fn get(&self) -> i32 { + self.0 + } +} + +impl std::fmt::Display for KafkaTopicId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +/// Unique ID for a `QueryPool` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct QueryPoolId(i16); + +#[allow(missing_docs)] +impl QueryPoolId { + pub fn new(v: i16) -> Self { + Self(v) + } + pub fn get(&self) -> i16 { + self.0 + } +} + +/// Unique ID for a `Table` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct TableId(i32); + +#[allow(missing_docs)] +impl TableId { + pub fn new(v: i32) -> Self { + Self(v) + } + pub fn get(&self) -> i32 { + self.0 + } +} + +/// Unique ID for a `Column` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct ColumnId(i32); + +#[allow(missing_docs)] +impl ColumnId { + pub fn new(v: i32) -> Self { + Self(v) + } + pub fn get(&self) -> i32 { + self.0 + } +} + +/// Unique ID for a `Sequencer` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct SequencerId(i16); + +#[allow(missing_docs)] +impl SequencerId { + pub fn new(v: i16) -> Self { + Self(v) + } + pub fn get(&self) -> i16 { + self.0 + } +} + +/// The kafka partition identifier. This is in the actual Kafka cluster. 
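
The block of ID newtypes above replaces the bare `i32`/`i16`/`i64` ids used throughout the interface. `#[sqlx(transparent)]` keeps each wrapper encoding and decoding as its inner SQL type, while the distinct Rust types keep different kinds of ids from being swapped at compile time. A small illustration (the `describe` function is made up):

```rust
use iox_catalog::interface::{NamespaceId, TableId};

fn describe(namespace_id: NamespaceId, table_id: TableId) -> String {
    format!("namespace {} / table {}", namespace_id.get(), table_id.get())
}

fn main() {
    let ns = NamespaceId::new(1);
    let table = TableId::new(42);
    println!("{}", describe(ns, table));

    // describe(table, ns) no longer compiles: a TableId is not a NamespaceId,
    // even though both wrap an i32.
}
```
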
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct KafkaPartition(i32); + +#[allow(missing_docs)] +impl KafkaPartition { + pub fn new(v: i32) -> Self { + Self(v) + } + pub fn get(&self) -> i32 { + self.0 + } +} + +impl std::fmt::Display for KafkaPartition { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +/// Unique ID for a `Partition` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct PartitionId(i64); + +#[allow(missing_docs)] +impl PartitionId { + pub fn new(v: i64) -> Self { + Self(v) + } + pub fn get(&self) -> i64 { + self.0 + } +} + +/// Unique ID for a `Tombstone` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct TombstoneId(i64); + +#[allow(missing_docs)] +impl TombstoneId { + pub fn new(v: i64) -> Self { + Self(v) + } + pub fn get(&self) -> i64 { + self.0 + } +} + +/// A sequence number from a `Sequencer` (kafka partition) +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct SequenceNumber(i64); + +#[allow(missing_docs)] +impl SequenceNumber { + pub fn new(v: i64) -> Self { + Self(v) + } + pub fn get(&self) -> i64 { + self.0 + } +} + +/// A time in nanoseconds from epoch +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct Timestamp(i64); + +#[allow(missing_docs)] +impl Timestamp { + pub fn new(v: i64) -> Self { + Self(v) + } + pub fn get(&self) -> i64 { + self.0 + } +} + +/// Unique ID for a `ParquetFile` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct ParquetFileId(i64); + +#[allow(missing_docs)] +impl ParquetFileId { + pub fn new(v: i64) -> Self { + Self(v) + } + pub fn get(&self) -> i64 { + self.0 + } +} + +impl std::fmt::Display for ParquetFileId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + // Use `self.number` to refer to each positional data point. + write!(f, "{}", self.0) + } +} + /// Container that can return repos for each of the catalog data types. #[async_trait] pub trait RepoCollection { @@ -53,6 +262,12 @@ pub trait RepoCollection { fn column(&self) -> Arc; /// repo for sequencers fn sequencer(&self) -> Arc; + /// repo for partitions + fn partition(&self) -> Arc; + /// repo for tombstones + fn tombstone(&self) -> Arc; + /// repo for parquet_files + fn parquet_file(&self) -> Arc; } /// Functions for working with Kafka topics in the catalog. @@ -60,6 +275,9 @@ pub trait RepoCollection { pub trait KafkaTopicRepo { /// Creates the kafka topic in the catalog or gets the existing record by name. async fn create_or_get(&self, name: &str) -> Result; + + /// Gets the kafka topic by its unique name + async fn get_by_name(&self, name: &str) -> Result>; } /// Functions for working with query pools in the catalog. @@ -72,28 +290,28 @@ pub trait QueryPoolRepo { /// Functions for working with namespaces in the catalog #[async_trait] pub trait NamespaceRepo { - /// Creates the namespace in the catalog, or get the existing record by name. Then - /// constructs a namespace schema with all tables and columns under the namespace. + /// Creates the namespace in the catalog. If one by the same name already exists, an + /// error is returned. 
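
The `create`/`get_by_name` split documented here changes the old behavior: `create` now returns the plain `Namespace` row and fails with `NameExists` instead of quietly reusing an existing record. A hedged sketch of how a caller might cope with that; the `ensure_namespace` helper is hypothetical, and `"inf"` is the infinite-retention marker used in this diff's tests:

```rust
use iox_catalog::interface::{
    Error, KafkaTopicId, Namespace, QueryPoolId, RepoCollection, Result,
};

/// Hypothetical helper: create the namespace, or fall back to the existing row.
async fn ensure_namespace<T: RepoCollection>(
    repo: &T,
    name: &str,
    kafka_topic_id: KafkaTopicId,
    query_pool_id: QueryPoolId,
) -> Result<Namespace> {
    let namespaces = repo.namespace();
    match namespaces
        .create(name, "inf", kafka_topic_id, query_pool_id)
        .await
    {
        Ok(ns) => Ok(ns),
        // create() now reports a duplicate instead of returning the old record.
        Err(Error::NameExists { .. }) => Ok(namespaces
            .get_by_name(name)
            .await?
            .expect("namespace reported as existing")),
        Err(e) => Err(e),
    }
}
```
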
async fn create( &self, name: &str, retention_duration: &str, - kafka_topic_id: i32, - query_pool_id: i16, - ) -> Result; + kafka_topic_id: KafkaTopicId, + query_pool_id: QueryPoolId, + ) -> Result; - /// Gets the namespace schema including all tables and columns. - async fn get_by_name(&self, name: &str) -> Result>; + /// Gets the namespace by its unique name. + async fn get_by_name(&self, name: &str) -> Result>; } /// Functions for working with tables in the catalog #[async_trait] pub trait TableRepo { /// Creates the table in the catalog or get the existing record by name. - async fn create_or_get(&self, name: &str, namespace_id: i32) -> Result; + async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result
; /// Lists all tables in the catalog for the given namespace id. - async fn list_by_namespace_id(&self, namespace_id: i32) -> Result>; + async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result>; } /// Functions for working with columns in the catalog @@ -105,74 +323,160 @@ pub trait ColumnRepo { async fn create_or_get( &self, name: &str, - table_id: i32, + table_id: TableId, column_type: ColumnType, ) -> Result; /// Lists all columns in the passed in namespace id. - async fn list_by_namespace_id(&self, namespace_id: i32) -> Result>; + async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result>; } /// Functions for working with sequencers in the catalog #[async_trait] pub trait SequencerRepo { /// create a sequencer record for the kafka topic and partition or return the existing record - async fn create_or_get(&self, topic: &KafkaTopic, partition: i32) -> Result; + async fn create_or_get( + &self, + topic: &KafkaTopic, + partition: KafkaPartition, + ) -> Result; + + /// get the sequencer record by `KafkaTopicId` and `KafkaPartition` + async fn get_by_topic_id_and_partition( + &self, + topic_id: KafkaTopicId, + partition: KafkaPartition, + ) -> Result>; /// list all sequencers async fn list(&self) -> Result>; + + /// list all sequencers for a given kafka topic + async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result>; +} + +/// Functions for working with IOx partitions in the catalog. Note that these are how +/// IOx splits up data within a database, which is differenet than Kafka partitions. +#[async_trait] +pub trait PartitionRepo { + /// create or get a partition record for the given partition key, sequencer and table + async fn create_or_get( + &self, + key: &str, + sequencer_id: SequencerId, + table_id: TableId, + ) -> Result; + + /// return partitions for a given sequencer + async fn list_by_sequencer(&self, sequencer_id: SequencerId) -> Result>; +} + +/// Functions for working with tombstones in the catalog +#[async_trait] +pub trait TombstoneRepo { + /// create or get a tombstone + async fn create_or_get( + &self, + table_id: TableId, + sequencer_id: SequencerId, + sequence_number: SequenceNumber, + min_time: Timestamp, + max_time: Timestamp, + predicate: &str, + ) -> Result; + + /// return all tombstones for the sequencer with a sequence number greater than that + /// passed in. This will be used by the ingester on startup to see what tombstones + /// might have to be applied to data that is read from the write buffer. + async fn list_tombstones_by_sequencer_greater_than( + &self, + sequencer_id: SequencerId, + sequence_number: SequenceNumber, + ) -> Result>; +} + +/// Functions for working with parquet file pointers in the catalog +#[async_trait] +pub trait ParquetFileRepo { + /// create the parquet file + #[allow(clippy::too_many_arguments)] + async fn create( + &self, + sequencer_id: SequencerId, + table_id: TableId, + partition_id: PartitionId, + object_store_id: Uuid, + min_sequence_number: SequenceNumber, + max_sequence_number: SequenceNumber, + min_time: Timestamp, + max_time: Timestamp, + ) -> Result; + + /// Flag the parquet file for deletion + async fn flag_for_delete(&self, id: ParquetFileId) -> Result<()>; + + /// Get all parquet files for a sequencer with a max_sequence_number greater than the + /// one passed in. The ingester will use this on startup to see which files were persisted + /// that are greater than its min_unpersisted_number so that it can discard any data in + /// these partitions on replay. 
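
Since several of these new `list_*_greater_than` methods exist specifically for ingester startup, here is a rough sketch of that replay path, assuming a `Sequencer` row has already been loaded; the `replay_state` function is illustrative and not part of this diff:

```rust
use iox_catalog::interface::{RepoCollection, Result, SequenceNumber, Sequencer};

/// Illustrative only: gather what an ingester needs to reconcile replayed data.
async fn replay_state<T: RepoCollection>(repo: &T, sequencer: &Sequencer) -> Result<()> {
    let min_unpersisted = SequenceNumber::new(sequencer.min_unpersisted_sequence_number);

    // Files already persisted past the minimum: their data can be skipped on replay.
    let persisted = repo
        .parquet_file()
        .list_by_sequencer_greater_than(sequencer.id, min_unpersisted)
        .await?;

    // Tombstones past the minimum: these still need to be applied to buffered data.
    let tombstones = repo
        .tombstone()
        .list_tombstones_by_sequencer_greater_than(sequencer.id, min_unpersisted)
        .await?;

    println!(
        "sequencer {}: {} persisted files, {} tombstones after {}",
        sequencer.id.get(),
        persisted.len(),
        tombstones.len(),
        min_unpersisted.get(),
    );
    Ok(())
}
```
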
+ async fn list_by_sequencer_greater_than( + &self, + sequencer_id: SequencerId, + sequence_number: SequenceNumber, + ) -> Result>; } /// Data object for a kafka topic -#[derive(Debug, Eq, PartialEq, sqlx::FromRow)] +#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)] pub struct KafkaTopic { /// The id of the topic - pub id: i32, + pub id: KafkaTopicId, /// The unique name of the topic pub name: String, } /// Data object for a query pool -#[derive(Debug, Eq, PartialEq, sqlx::FromRow)] +#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)] pub struct QueryPool { /// The id of the pool - pub id: i16, + pub id: QueryPoolId, /// The unique name of the pool pub name: String, } /// Data object for a namespace -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)] pub struct Namespace { /// The id of the namespace - pub id: i32, + pub id: NamespaceId, /// The unique name of the namespace pub name: String, /// The retention duration as a string. 'inf' or not present represents infinite duration (i.e. never drop data). #[sqlx(default)] pub retention_duration: Option, /// The kafka topic that writes to this namespace will land in - pub kafka_topic_id: i32, + pub kafka_topic_id: KafkaTopicId, /// The query pool assigned to answer queries for this namespace - pub query_pool_id: i16, + pub query_pool_id: QueryPoolId, } -/// Schema collection for a namespace +/// Schema collection for a namespace. This is an in-memory object useful for a schema +/// cache. #[derive(Debug, Clone, Eq, PartialEq)] pub struct NamespaceSchema { /// the namespace id - pub id: i32, + pub id: NamespaceId, /// the kafka topic this namespace gets data written to - pub kafka_topic_id: i32, + pub kafka_topic_id: KafkaTopicId, /// the query pool assigned to answer queries for this namespace - pub query_pool_id: i16, + pub query_pool_id: QueryPoolId, /// the tables in the namespace by name pub tables: BTreeMap, } impl NamespaceSchema { /// Create a new `NamespaceSchema` - pub fn new(id: i32, kafka_topic_id: i32, query_pool_id: i16) -> Self { + pub fn new(id: NamespaceId, kafka_topic_id: KafkaTopicId, query_pool_id: QueryPoolId) -> Self { Self { id, tables: BTreeMap::new(), @@ -186,8 +490,8 @@ impl NamespaceSchema { /// method takes them in to add them to the schema. pub fn add_tables_and_columns( &mut self, - new_tables: BTreeMap, - new_columns: BTreeMap>, + new_tables: BTreeMap, + new_columns: BTreeMap>, ) { for (table_name, table_id) in new_tables { self.tables @@ -203,7 +507,7 @@ impl NamespaceSchema { } } - fn get_table_mut(&mut self, table_id: i32) -> Option<&mut TableSchema> { + fn get_table_mut(&mut self, table_id: TableId) -> Option<&mut TableSchema> { for table in self.tables.values_mut() { if table.id == table_id { return Some(table); @@ -214,13 +518,70 @@ impl NamespaceSchema { } } +/// Gets the namespace schema including all tables and columns. +pub async fn get_schema_by_name( + name: &str, + repo: &T, +) -> Result> { + let namespace_repo = repo.namespace(); + let table_repo = repo.table(); + let column_repo = repo.column(); + + let namespace = namespace_repo + .get_by_name(name) + .await? + .context(NamespaceNotFoundSnafu { name })?; + + // get the columns first just in case someone else is creating schema while we're doing this. 
+ let columns = column_repo.list_by_namespace_id(namespace.id).await?; + let tables = table_repo.list_by_namespace_id(namespace.id).await?; + + let mut namespace = NamespaceSchema::new( + namespace.id, + namespace.kafka_topic_id, + namespace.query_pool_id, + ); + + let mut table_id_to_schema = BTreeMap::new(); + for t in tables { + table_id_to_schema.insert(t.id, (t.name, TableSchema::new(t.id))); + } + + for c in columns { + let (_, t) = table_id_to_schema.get_mut(&c.table_id).unwrap(); + match ColumnType::try_from(c.column_type) { + Ok(column_type) => { + t.columns.insert( + c.name, + ColumnSchema { + id: c.id, + column_type, + }, + ); + } + _ => { + return Err(Error::UnknownColumnType { + data_type: c.column_type, + name: c.name.to_string(), + }); + } + } + } + + for (_, (table_name, schema)) in table_id_to_schema { + namespace.tables.insert(table_name, schema); + } + + Ok(Some(namespace)) +} + /// Data object for a table -#[derive(Debug, sqlx::FromRow, Eq, PartialEq)] +#[derive(Debug, Clone, sqlx::FromRow, Eq, PartialEq)] pub struct Table { /// The id of the table - pub id: i32, + pub id: TableId, /// The namespace id that the table is in - pub namespace_id: i32, + pub namespace_id: NamespaceId, /// The name of the table, which is unique within the associated namespace pub name: String, } @@ -229,14 +590,14 @@ pub struct Table { #[derive(Debug, Clone, Eq, PartialEq)] pub struct TableSchema { /// the table id - pub id: i32, + pub id: TableId, /// the table's columns by their name pub columns: BTreeMap, } impl TableSchema { /// Initialize new `TableSchema` - pub fn new(id: i32) -> Self { + pub fn new(id: TableId) -> Self { Self { id, columns: BTreeMap::new(), @@ -252,12 +613,12 @@ impl TableSchema { } /// Data object for a column -#[derive(Debug, sqlx::FromRow, Eq, PartialEq)] +#[derive(Debug, Clone, sqlx::FromRow, Eq, PartialEq)] pub struct Column { /// the column id - pub id: i32, + pub id: ColumnId, /// the table id the column is in - pub table_id: i32, + pub table_id: TableId, /// the name of the column, which is unique in the table pub name: String, /// the logical type of the column @@ -286,7 +647,7 @@ impl Column { #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub struct ColumnSchema { /// the column id - pub id: i32, + pub id: ColumnId, /// the column type pub column_type: ColumnType, } @@ -379,14 +740,507 @@ pub fn column_type_from_field(field_value: &FieldValue) -> ColumnType { #[derive(Debug, Copy, Clone, PartialEq, sqlx::FromRow)] pub struct Sequencer { /// the id of the sequencer - pub id: i16, + pub id: SequencerId, /// the topic the sequencer is reading from - pub kafka_topic_id: i32, + pub kafka_topic_id: KafkaTopicId, /// the kafka partition the sequencer is reading from - pub kafka_partition: i32, + pub kafka_partition: KafkaPartition, /// The minimum unpersisted sequence number. Because different tables /// can be persisted at different times, it is possible some data has been persisted /// with a higher sequence number than this. However, all data with a sequence number /// lower than this must have been persisted to Parquet. pub min_unpersisted_sequence_number: i64, } + +/// Data object for a partition. The combination of sequencer, table and key are unique (i.e. 
only one record can exist for each combo) +#[derive(Debug, Clone, PartialEq, sqlx::FromRow)] +pub struct Partition { + /// the id of the partition + pub id: PartitionId, + /// the sequencer the data in the partition arrived from + pub sequencer_id: SequencerId, + /// the table the partition is under + pub table_id: TableId, + /// the string key of the partition + pub partition_key: String, +} + +/// Data object for a tombstone. +#[derive(Debug, Clone, PartialEq, sqlx::FromRow)] +pub struct Tombstone { + /// the id of the tombstone + pub id: TombstoneId, + /// the table the tombstone is associated with + pub table_id: TableId, + /// the sequencer the tombstone was sent through + pub sequencer_id: SequencerId, + /// the sequence nubmer assigned to the tombstone from the sequencer + pub sequence_number: SequenceNumber, + /// the min time (inclusive) that the delete applies to + pub min_time: Timestamp, + /// the max time (exclusive) that the delete applies to + pub max_time: Timestamp, + /// the full delete predicate + pub serialized_predicate: String, +} + +/// Data for a parquet file reference in the catalog. +#[derive(Debug, Copy, Clone, PartialEq, sqlx::FromRow)] +pub struct ParquetFile { + /// the id of the file in the catalog + pub id: ParquetFileId, + /// the sequencer that sequenced writes that went into this file + pub sequencer_id: SequencerId, + /// the table + pub table_id: TableId, + /// the partition + pub partition_id: PartitionId, + /// the uuid used in the object store path for this file + pub object_store_id: Uuid, + /// the minimum sequence number from a record in this file + pub min_sequence_number: SequenceNumber, + /// the maximum sequence number from a record in this file + pub max_sequence_number: SequenceNumber, + /// the min timestamp of data in this file + pub min_time: Timestamp, + /// the max timestamp of data in this file + pub max_time: Timestamp, + /// flag to mark that this file should be deleted from object storage + pub to_delete: bool, +} + +#[cfg(test)] +pub(crate) mod test_helpers { + use super::*; + use futures::{stream::FuturesOrdered, StreamExt}; + + pub(crate) async fn test_repo(new_repo: F) + where + T: RepoCollection + Send + Sync, + F: Fn() -> T + Send + Sync, + { + test_kafka_topic(&new_repo()).await; + test_query_pool(&new_repo()).await; + test_namespace(&new_repo()).await; + test_table(&new_repo()).await; + test_column(&new_repo()).await; + test_sequencer(&new_repo()).await; + test_partition(&new_repo()).await; + test_tombstone(&new_repo()).await; + test_parquet_file(&new_repo()).await; + } + + async fn test_kafka_topic(repo: &T) { + let kafka_repo = repo.kafka_topic(); + let k = kafka_repo.create_or_get("foo").await.unwrap(); + assert!(k.id > KafkaTopicId::new(0)); + assert_eq!(k.name, "foo"); + let k2 = kafka_repo.create_or_get("foo").await.unwrap(); + assert_eq!(k, k2); + let k3 = kafka_repo.get_by_name("foo").await.unwrap().unwrap(); + assert_eq!(k3, k); + let k3 = kafka_repo.get_by_name("asdf").await.unwrap(); + assert!(k3.is_none()); + } + + async fn test_query_pool(repo: &T) { + let query_repo = repo.query_pool(); + let q = query_repo.create_or_get("foo").await.unwrap(); + assert!(q.id > QueryPoolId::new(0)); + assert_eq!(q.name, "foo"); + let q2 = query_repo.create_or_get("foo").await.unwrap(); + assert_eq!(q, q2); + } + + async fn test_namespace(repo: &T) { + let namespace_repo = repo.namespace(); + let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); + let pool = repo.query_pool().create_or_get("foo").await.unwrap(); + + 
let namespace_name = "test_namespace"; + let namespace = namespace_repo + .create(namespace_name, "inf", kafka.id, pool.id) + .await + .unwrap(); + assert!(namespace.id > NamespaceId::new(0)); + assert_eq!(namespace.name, namespace_name); + + let conflict = namespace_repo + .create(namespace_name, "inf", kafka.id, pool.id) + .await; + assert!(matches!( + conflict.unwrap_err(), + Error::NameExists { name: _ } + )); + + let found = namespace_repo + .get_by_name(namespace_name) + .await + .unwrap() + .expect("namespace should be there"); + assert_eq!(namespace, found); + } + + async fn test_table(repo: &T) { + let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); + let pool = repo.query_pool().create_or_get("foo").await.unwrap(); + let namespace = repo + .namespace() + .create("namespace_table_test", "inf", kafka.id, pool.id) + .await + .unwrap(); + + // test we can create or get a table + let table_repo = repo.table(); + let t = table_repo + .create_or_get("test_table", namespace.id) + .await + .unwrap(); + let tt = table_repo + .create_or_get("test_table", namespace.id) + .await + .unwrap(); + assert!(t.id > TableId::new(0)); + assert_eq!(t, tt); + + let tables = table_repo.list_by_namespace_id(namespace.id).await.unwrap(); + assert_eq!(vec![t], tables); + } + + async fn test_column(repo: &T) { + let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); + let pool = repo.query_pool().create_or_get("foo").await.unwrap(); + let namespace = repo + .namespace() + .create("namespace_column_test", "inf", kafka.id, pool.id) + .await + .unwrap(); + let table = repo + .table() + .create_or_get("test_table", namespace.id) + .await + .unwrap(); + + // test we can create or get a column + let column_repo = repo.column(); + let c = column_repo + .create_or_get("column_test", table.id, ColumnType::Tag) + .await + .unwrap(); + let cc = column_repo + .create_or_get("column_test", table.id, ColumnType::Tag) + .await + .unwrap(); + assert!(c.id > ColumnId::new(0)); + assert_eq!(c, cc); + + // test that attempting to create an already defined column of a different type returns error + let err = column_repo + .create_or_get("column_test", table.id, ColumnType::U64) + .await + .expect_err("should error with wrong column type"); + assert!(matches!( + err, + Error::ColumnTypeMismatch { + name: _, + existing: _, + new: _ + } + )); + + // test that we can create a column of the same name under a different table + let table2 = repo + .table() + .create_or_get("test_table_2", namespace.id) + .await + .unwrap(); + let ccc = column_repo + .create_or_get("column_test", table2.id, ColumnType::U64) + .await + .unwrap(); + assert_ne!(c, ccc); + + let columns = column_repo + .list_by_namespace_id(namespace.id) + .await + .unwrap(); + assert_eq!(vec![c, ccc], columns); + } + + async fn test_sequencer(repo: &T) { + let kafka = repo + .kafka_topic() + .create_or_get("sequencer_test") + .await + .unwrap(); + let sequencer_repo = repo.sequencer(); + + // Create 10 sequencers + let created = (1..=10) + .map(|partition| sequencer_repo.create_or_get(&kafka, KafkaPartition::new(partition))) + .collect::>() + .map(|v| { + let v = v.expect("failed to create sequencer"); + (v.id, v) + }) + .collect::>() + .await; + + // List them and assert they match + let listed = sequencer_repo + .list_by_kafka_topic(&kafka) + .await + .expect("failed to list sequencers") + .into_iter() + .map(|v| (v.id, v)) + .collect::>(); + + assert_eq!(created, listed); + + // get by the sequencer id and partition + let kafka_partition = 
KafkaPartition::new(1); + let sequencer = sequencer_repo + .get_by_topic_id_and_partition(kafka.id, kafka_partition) + .await + .unwrap() + .unwrap(); + assert_eq!(kafka.id, sequencer.kafka_topic_id); + assert_eq!(kafka_partition, sequencer.kafka_partition); + + let sequencer = sequencer_repo + .get_by_topic_id_and_partition(kafka.id, KafkaPartition::new(523)) + .await + .unwrap(); + assert!(sequencer.is_none()); + } + + async fn test_partition(repo: &T) { + let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); + let pool = repo.query_pool().create_or_get("foo").await.unwrap(); + let namespace = repo + .namespace() + .create("namespace_partition_test", "inf", kafka.id, pool.id) + .await + .unwrap(); + let table = repo + .table() + .create_or_get("test_table", namespace.id) + .await + .unwrap(); + let sequencer = repo + .sequencer() + .create_or_get(&kafka, KafkaPartition::new(1)) + .await + .unwrap(); + let other_sequencer = repo + .sequencer() + .create_or_get(&kafka, KafkaPartition::new(2)) + .await + .unwrap(); + + let partition_repo = repo.partition(); + + let created = ["foo", "bar"] + .iter() + .map(|key| partition_repo.create_or_get(key, sequencer.id, table.id)) + .collect::>() + .map(|v| { + let v = v.expect("failed to create partition"); + (v.id, v) + }) + .collect::>() + .await; + let _ = partition_repo + .create_or_get("asdf", other_sequencer.id, table.id) + .await + .unwrap(); + + // List them and assert they match + let listed = partition_repo + .list_by_sequencer(sequencer.id) + .await + .expect("failed to list partitions") + .into_iter() + .map(|v| (v.id, v)) + .collect::>(); + + assert_eq!(created, listed); + } + + async fn test_tombstone(repo: &T) { + let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); + let pool = repo.query_pool().create_or_get("foo").await.unwrap(); + let namespace = repo + .namespace() + .create("namespace_tombstone_test", "inf", kafka.id, pool.id) + .await + .unwrap(); + let table = repo + .table() + .create_or_get("test_table", namespace.id) + .await + .unwrap(); + let other_table = repo + .table() + .create_or_get("other", namespace.id) + .await + .unwrap(); + let sequencer = repo + .sequencer() + .create_or_get(&kafka, KafkaPartition::new(1)) + .await + .unwrap(); + + let tombstone_repo = repo.tombstone(); + let min_time = Timestamp::new(1); + let max_time = Timestamp::new(10); + let t1 = tombstone_repo + .create_or_get( + table.id, + sequencer.id, + SequenceNumber::new(1), + min_time, + max_time, + "whatevs", + ) + .await + .unwrap(); + assert!(t1.id > TombstoneId::new(0)); + assert_eq!(t1.sequencer_id, sequencer.id); + assert_eq!(t1.sequence_number, SequenceNumber::new(1)); + assert_eq!(t1.min_time, min_time); + assert_eq!(t1.max_time, max_time); + assert_eq!(t1.serialized_predicate, "whatevs"); + let t2 = tombstone_repo + .create_or_get( + other_table.id, + sequencer.id, + SequenceNumber::new(2), + min_time, + max_time, + "bleh", + ) + .await + .unwrap(); + let t3 = tombstone_repo + .create_or_get( + table.id, + sequencer.id, + SequenceNumber::new(3), + min_time, + max_time, + "sdf", + ) + .await + .unwrap(); + + let listed = tombstone_repo + .list_tombstones_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) + .await + .unwrap(); + assert_eq!(vec![t2, t3], listed); + } + + async fn test_parquet_file(repo: &T) { + let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); + let pool = repo.query_pool().create_or_get("foo").await.unwrap(); + let namespace = repo + .namespace() + 
.create("namespace_parquet_file_test", "inf", kafka.id, pool.id) + .await + .unwrap(); + let table = repo + .table() + .create_or_get("test_table", namespace.id) + .await + .unwrap(); + let other_table = repo + .table() + .create_or_get("other", namespace.id) + .await + .unwrap(); + let sequencer = repo + .sequencer() + .create_or_get(&kafka, KafkaPartition::new(1)) + .await + .unwrap(); + let partition = repo + .partition() + .create_or_get("one", sequencer.id, table.id) + .await + .unwrap(); + let other_partition = repo + .partition() + .create_or_get("one", sequencer.id, other_table.id) + .await + .unwrap(); + + let min_time = Timestamp::new(1); + let max_time = Timestamp::new(10); + + let parquet_repo = repo.parquet_file(); + let parquet_file = parquet_repo + .create( + sequencer.id, + partition.table_id, + partition.id, + Uuid::new_v4(), + SequenceNumber::new(10), + SequenceNumber::new(140), + min_time, + max_time, + ) + .await + .unwrap(); + + // verify that trying to create a file with the same UUID throws an error + let err = parquet_repo + .create( + sequencer.id, + partition.table_id, + partition.id, + parquet_file.object_store_id, + SequenceNumber::new(10), + SequenceNumber::new(140), + min_time, + max_time, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::FileExists { object_store_id: _ })); + + let other_file = parquet_repo + .create( + sequencer.id, + other_partition.table_id, + other_partition.id, + Uuid::new_v4(), + SequenceNumber::new(45), + SequenceNumber::new(200), + min_time, + max_time, + ) + .await + .unwrap(); + + let files = parquet_repo + .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) + .await + .unwrap(); + assert_eq!(vec![parquet_file, other_file], files); + let files = parquet_repo + .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(150)) + .await + .unwrap(); + assert_eq!(vec![other_file], files); + + // verify that to_delete is initially set to false and that it can be updated to true + assert!(!parquet_file.to_delete); + parquet_repo.flag_for_delete(parquet_file.id).await.unwrap(); + let files = parquet_repo + .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) + .await + .unwrap(); + assert!(files.first().unwrap().to_delete); + } +} diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 1c9c6e0654..a23de38af4 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -12,8 +12,8 @@ )] use crate::interface::{ - column_type_from_field, ColumnSchema, ColumnType, Error, KafkaTopic, NamespaceSchema, - QueryPool, RepoCollection, Result, Sequencer, + column_type_from_field, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic, + NamespaceSchema, QueryPool, RepoCollection, Result, Sequencer, SequencerId, TableId, }; use futures::{stream::FuturesOrdered, StreamExt}; use influxdb_line_protocol::ParsedLine; @@ -25,6 +25,7 @@ const SHARED_QUERY_POOL: &str = SHARED_KAFKA_TOPIC; const TIME_COLUMN: &str = "time"; pub mod interface; +pub mod mem; pub mod postgres; /// Given the lines of a write request and an in memory schema, this will validate the write @@ -41,9 +42,9 @@ pub async fn validate_or_insert_schema( repo: &T, ) -> Result> { // table name to table_id - let mut new_tables: BTreeMap = BTreeMap::new(); + let mut new_tables: BTreeMap = BTreeMap::new(); // table_id to map of column name to column - let mut new_columns: BTreeMap> = BTreeMap::new(); + let mut new_columns: BTreeMap> = BTreeMap::new(); for line in &lines { let table_name = line.series.measurement.as_str(); @@ 
-175,7 +176,7 @@ pub async fn validate_or_insert_schema( pub async fn create_or_get_default_records( kafka_partition_count: i32, repo: &T, -) -> Result<(KafkaTopic, QueryPool, BTreeMap)> { +) -> Result<(KafkaTopic, QueryPool, BTreeMap)> { let kafka_repo = repo.kafka_topic(); let query_repo = repo.query_pool(); let sequencer_repo = repo.sequencer(); @@ -184,7 +185,7 @@ pub async fn create_or_get_default_records( let query_pool = query_repo.create_or_get(SHARED_QUERY_POOL).await?; let sequencers = (1..=kafka_partition_count) - .map(|partition| sequencer_repo.create_or_get(&kafka_topic, partition)) + .map(|partition| sequencer_repo.create_or_get(&kafka_topic, KafkaPartition::new(partition))) .collect::>() .map(|v| { let v = v.expect("failed to create sequencer"); @@ -195,3 +196,100 @@ pub async fn create_or_get_default_records( Ok((kafka_topic, query_pool, sequencers)) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::interface::get_schema_by_name; + use crate::mem::MemCatalog; + use influxdb_line_protocol::parse_lines; + use std::sync::Arc; + + #[tokio::test] + async fn test_validate_or_insert_schema() { + let repo = Arc::new(MemCatalog::new()); + let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &repo).await.unwrap(); + + let namespace_name = "validate_schema"; + // now test with a new namespace + let namespace = repo + .namespace() + .create(namespace_name, "inf", kafka_topic.id, query_pool.id) + .await + .unwrap(); + let data = r#" +m1,t1=a,t2=b f1=2i,f2=2.0 1 +m1,t1=a f1=3i 2 +m2,t3=b f1=true 1 + "#; + + // test that new schema gets returned + let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect(); + let schema = Arc::new(NamespaceSchema::new( + namespace.id, + namespace.kafka_topic_id, + namespace.query_pool_id, + )); + let new_schema = validate_or_insert_schema(lines, &schema, &repo) + .await + .unwrap(); + let new_schema = new_schema.unwrap(); + + // ensure new schema is in the db + let schema_from_db = get_schema_by_name(namespace_name, &repo) + .await + .unwrap() + .unwrap(); + assert_eq!(new_schema, schema_from_db); + + // test that a new table will be created + let data = r#" +m1,t1=c f1=1i 2 +new_measurement,t9=a f10=true 1 + "#; + let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect(); + let new_schema = validate_or_insert_schema(lines, &schema_from_db, &repo) + .await + .unwrap() + .unwrap(); + let new_table = new_schema.tables.get("new_measurement").unwrap(); + assert_eq!( + ColumnType::Bool, + new_table.columns.get("f10").unwrap().column_type + ); + assert_eq!( + ColumnType::Tag, + new_table.columns.get("t9").unwrap().column_type + ); + let schema = get_schema_by_name(namespace_name, &repo) + .await + .unwrap() + .unwrap(); + assert_eq!(new_schema, schema); + + // test that a new column for an existing table will be created + // test that a new table will be created + let data = r#" +m1,new_tag=c new_field=1i 2 + "#; + let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect(); + let new_schema = validate_or_insert_schema(lines, &schema, &repo) + .await + .unwrap() + .unwrap(); + let table = new_schema.tables.get("m1").unwrap(); + assert_eq!( + ColumnType::I64, + table.columns.get("new_field").unwrap().column_type + ); + assert_eq!( + ColumnType::Tag, + table.columns.get("new_tag").unwrap().column_type + ); + let schema = get_schema_by_name(namespace_name, &repo) + .await + .unwrap() + .unwrap(); + assert_eq!(new_schema, schema); + } +} diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs new file mode 
100644 index 0000000000..c4cf0333b1 --- /dev/null +++ b/iox_catalog/src/mem.rs @@ -0,0 +1,498 @@ +//! This module implements an in-memory implementation of the iox_catalog interface. It can be +//! used for testing or for an IOx designed to run without catalog persistence. + +use crate::interface::{ + Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, + KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, + ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo, + RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, + TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, +}; +use async_trait::async_trait; +use std::convert::TryFrom; +use std::fmt::Formatter; +use std::sync::{Arc, Mutex}; +use uuid::Uuid; + +/// In-memory catalog that implements the `RepoCollection` and individual repo traits from +/// the catalog interface. +#[derive(Default)] +pub struct MemCatalog { + collections: Mutex, +} + +impl MemCatalog { + /// return new initialized `MemCatalog` + pub fn new() -> Self { + Self::default() + } +} + +impl std::fmt::Debug for MemCatalog { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let c = self.collections.lock().expect("mutex poisoned"); + write!(f, "MemCatalog[ {:?} ]", c) + } +} + +#[derive(Default, Debug)] +struct MemCollections { + kafka_topics: Vec, + query_pools: Vec, + namespaces: Vec, + tables: Vec
, + columns: Vec, + sequencers: Vec, + partitions: Vec, + tombstones: Vec, + parquet_files: Vec, +} + +impl RepoCollection for Arc { + fn kafka_topic(&self) -> Arc { + Self::clone(self) as Arc + } + + fn query_pool(&self) -> Arc { + Self::clone(self) as Arc + } + + fn namespace(&self) -> Arc { + Self::clone(self) as Arc + } + + fn table(&self) -> Arc { + Self::clone(self) as Arc + } + + fn column(&self) -> Arc { + Self::clone(self) as Arc + } + + fn sequencer(&self) -> Arc { + Self::clone(self) as Arc + } + + fn partition(&self) -> Arc { + Self::clone(self) as Arc + } + + fn tombstone(&self) -> Arc { + Self::clone(self) as Arc + } + + fn parquet_file(&self) -> Arc { + Self::clone(self) as Arc + } +} + +#[async_trait] +impl KafkaTopicRepo for MemCatalog { + async fn create_or_get(&self, name: &str) -> Result { + let mut collections = self.collections.lock().expect("mutex poisoned"); + + let topic = match collections.kafka_topics.iter().find(|t| t.name == name) { + Some(t) => t, + None => { + let topic = KafkaTopic { + id: KafkaTopicId::new(collections.kafka_topics.len() as i32 + 1), + name: name.to_string(), + }; + collections.kafka_topics.push(topic); + collections.kafka_topics.last().unwrap() + } + }; + + Ok(topic.clone()) + } + + async fn get_by_name(&self, name: &str) -> Result> { + let collections = self.collections.lock().expect("mutex poisoned"); + let kafka_topic = collections + .kafka_topics + .iter() + .find(|t| t.name == name) + .cloned(); + Ok(kafka_topic) + } +} + +#[async_trait] +impl QueryPoolRepo for MemCatalog { + async fn create_or_get(&self, name: &str) -> Result { + let mut collections = self.collections.lock().expect("mutex poisoned"); + + let pool = match collections.query_pools.iter().find(|t| t.name == name) { + Some(t) => t, + None => { + let pool = QueryPool { + id: QueryPoolId::new(collections.query_pools.len() as i16 + 1), + name: name.to_string(), + }; + collections.query_pools.push(pool); + collections.query_pools.last().unwrap() + } + }; + + Ok(pool.clone()) + } +} + +#[async_trait] +impl NamespaceRepo for MemCatalog { + async fn create( + &self, + name: &str, + retention_duration: &str, + kafka_topic_id: KafkaTopicId, + query_pool_id: QueryPoolId, + ) -> Result { + let mut collections = self.collections.lock().expect("mutex poisoned"); + if collections.namespaces.iter().any(|n| n.name == name) { + return Err(Error::NameExists { + name: name.to_string(), + }); + } + + let namespace = Namespace { + id: NamespaceId::new(collections.namespaces.len() as i32 + 1), + name: name.to_string(), + kafka_topic_id, + query_pool_id, + retention_duration: Some(retention_duration.to_string()), + }; + collections.namespaces.push(namespace); + Ok(collections.namespaces.last().unwrap().clone()) + } + + async fn get_by_name(&self, name: &str) -> Result> { + let collections = self.collections.lock().expect("mutex poisoned"); + Ok(collections + .namespaces + .iter() + .find(|n| n.name == name) + .cloned()) + } +} + +#[async_trait] +impl TableRepo for MemCatalog { + async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result
{ + let mut collections = self.collections.lock().expect("mutex poisoned"); + + let table = match collections.tables.iter().find(|t| t.name == name) { + Some(t) => t, + None => { + let table = Table { + id: TableId::new(collections.tables.len() as i32 + 1), + namespace_id, + name: name.to_string(), + }; + collections.tables.push(table); + collections.tables.last().unwrap() + } + }; + + Ok(table.clone()) + } + + async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result> { + let collections = self.collections.lock().expect("mutex poisoned"); + let tables: Vec<_> = collections + .tables + .iter() + .filter(|t| t.namespace_id == namespace_id) + .cloned() + .collect(); + Ok(tables) + } +} + +#[async_trait] +impl ColumnRepo for MemCatalog { + async fn create_or_get( + &self, + name: &str, + table_id: TableId, + column_type: ColumnType, + ) -> Result { + let mut collections = self.collections.lock().expect("mutex poisoned"); + + let column = match collections + .columns + .iter() + .find(|t| t.name == name && t.table_id == table_id) + { + Some(c) => { + if column_type as i16 != c.column_type { + return Err(Error::ColumnTypeMismatch { + name: name.to_string(), + existing: ColumnType::try_from(c.column_type).unwrap().to_string(), + new: column_type.to_string(), + }); + } + + c + } + None => { + let column = Column { + id: ColumnId::new(collections.columns.len() as i32 + 1), + table_id, + name: name.to_string(), + column_type: column_type as i16, + }; + collections.columns.push(column); + collections.columns.last().unwrap() + } + }; + + Ok(column.clone()) + } + + async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result> { + let mut columns = vec![]; + + let collections = self.collections.lock().expect("mutex poisoned"); + for t in collections + .tables + .iter() + .filter(|t| t.namespace_id == namespace_id) + { + for c in collections.columns.iter().filter(|c| c.table_id == t.id) { + columns.push(c.clone()); + } + } + + Ok(columns) + } +} + +#[async_trait] +impl SequencerRepo for MemCatalog { + async fn create_or_get( + &self, + topic: &KafkaTopic, + partition: KafkaPartition, + ) -> Result { + let mut collections = self.collections.lock().expect("mutex poisoned"); + + let sequencer = match collections + .sequencers + .iter() + .find(|s| s.kafka_topic_id == topic.id && s.kafka_partition == partition) + { + Some(t) => t, + None => { + let sequencer = Sequencer { + id: SequencerId::new(collections.sequencers.len() as i16 + 1), + kafka_topic_id: topic.id, + kafka_partition: partition, + min_unpersisted_sequence_number: 0, + }; + collections.sequencers.push(sequencer); + collections.sequencers.last().unwrap() + } + }; + + Ok(*sequencer) + } + + async fn get_by_topic_id_and_partition( + &self, + topic_id: KafkaTopicId, + partition: KafkaPartition, + ) -> Result> { + let collections = self.collections.lock().expect("mutex poisoned"); + let sequencer = collections + .sequencers + .iter() + .find(|s| s.kafka_topic_id == topic_id && s.kafka_partition == partition) + .cloned(); + Ok(sequencer) + } + + async fn list(&self) -> Result> { + let collections = self.collections.lock().expect("mutex poisoned"); + Ok(collections.sequencers.clone()) + } + + async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result> { + let collections = self.collections.lock().expect("mutex poisoned"); + let sequencers: Vec<_> = collections + .sequencers + .iter() + .filter(|s| s.kafka_topic_id == topic.id) + .cloned() + .collect(); + Ok(sequencers) + } +} + +#[async_trait] +impl PartitionRepo 
for MemCatalog { + async fn create_or_get( + &self, + key: &str, + sequencer_id: SequencerId, + table_id: TableId, + ) -> Result { + let mut collections = self.collections.lock().expect("mutex poisoned"); + let partition = match collections.partitions.iter().find(|p| { + p.partition_key == key && p.sequencer_id == sequencer_id && p.table_id == table_id + }) { + Some(p) => p, + None => { + let p = Partition { + id: PartitionId::new(collections.partitions.len() as i64 + 1), + sequencer_id, + table_id, + partition_key: key.to_string(), + }; + collections.partitions.push(p); + collections.partitions.last().unwrap() + } + }; + + Ok(partition.clone()) + } + + async fn list_by_sequencer(&self, sequencer_id: SequencerId) -> Result> { + let collections = self.collections.lock().expect("mutex poisoned"); + let partitions: Vec<_> = collections + .partitions + .iter() + .filter(|p| p.sequencer_id == sequencer_id) + .cloned() + .collect(); + Ok(partitions) + } +} + +#[async_trait] +impl TombstoneRepo for MemCatalog { + async fn create_or_get( + &self, + table_id: TableId, + sequencer_id: SequencerId, + sequence_number: SequenceNumber, + min_time: Timestamp, + max_time: Timestamp, + predicate: &str, + ) -> Result { + let mut collections = self.collections.lock().expect("mutex poisoned"); + let tombstone = match collections.tombstones.iter().find(|t| { + t.table_id == table_id + && t.sequencer_id == sequencer_id + && t.sequence_number == sequence_number + }) { + Some(t) => t, + None => { + let t = Tombstone { + id: TombstoneId::new(collections.tombstones.len() as i64 + 1), + table_id, + sequencer_id, + sequence_number, + min_time, + max_time, + serialized_predicate: predicate.to_string(), + }; + collections.tombstones.push(t); + collections.tombstones.last().unwrap() + } + }; + + Ok(tombstone.clone()) + } + + async fn list_tombstones_by_sequencer_greater_than( + &self, + sequencer_id: SequencerId, + sequence_number: SequenceNumber, + ) -> Result> { + let collections = self.collections.lock().expect("mutex poisoned"); + let tombstones: Vec<_> = collections + .tombstones + .iter() + .filter(|t| t.sequencer_id == sequencer_id && t.sequence_number > sequence_number) + .cloned() + .collect(); + Ok(tombstones) + } +} + +#[async_trait] +impl ParquetFileRepo for MemCatalog { + async fn create( + &self, + sequencer_id: SequencerId, + table_id: TableId, + partition_id: PartitionId, + object_store_id: Uuid, + min_sequence_number: SequenceNumber, + max_sequence_number: SequenceNumber, + min_time: Timestamp, + max_time: Timestamp, + ) -> Result { + let mut collections = self.collections.lock().expect("mutex poisoned"); + if collections + .parquet_files + .iter() + .any(|f| f.object_store_id == object_store_id) + { + return Err(Error::FileExists { object_store_id }); + } + + let parquet_file = ParquetFile { + id: ParquetFileId::new(collections.parquet_files.len() as i64 + 1), + sequencer_id, + table_id, + partition_id, + object_store_id, + min_sequence_number, + max_sequence_number, + min_time, + max_time, + to_delete: false, + }; + collections.parquet_files.push(parquet_file); + Ok(*collections.parquet_files.last().unwrap()) + } + + async fn flag_for_delete(&self, id: ParquetFileId) -> Result<()> { + let mut collections = self.collections.lock().expect("mutex poisoned"); + + match collections.parquet_files.iter_mut().find(|p| p.id == id) { + Some(f) => f.to_delete = true, + None => return Err(Error::ParquetRecordNotFound { id }), + } + + Ok(()) + } + + async fn list_by_sequencer_greater_than( + &self, + sequencer_id: 
SequencerId, + sequence_number: SequenceNumber, + ) -> Result<Vec<ParquetFile>> { + let collections = self.collections.lock().expect("mutex poisoned"); + let files: Vec<_> = collections + .parquet_files + .iter() + .filter(|f| f.sequencer_id == sequencer_id && f.max_sequence_number > sequence_number) + .cloned() + .collect(); + Ok(files) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_mem_repo() { + let f = || Arc::new(MemCatalog::new()); + + crate::interface::test_helpers::test_repo(f).await; + } +} diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 1bb43ca80b..2b052a9738 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1,17 +1,18 @@ //! A Postgres backed implementation of the Catalog use crate::interface::{ - Column, ColumnRepo, ColumnSchema, ColumnType, Error, KafkaTopic, KafkaTopicRepo, Namespace, - NamespaceRepo, NamespaceSchema, QueryPool, QueryPoolRepo, RepoCollection, Result, Sequencer, - SequencerRepo, Table, TableRepo, TableSchema, + Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, + KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, + ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo, + RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, + TableRepo, Timestamp, Tombstone, TombstoneRepo, }; use async_trait::async_trait; use observability_deps::tracing::info; use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres}; -use std::collections::BTreeMap; -use std::convert::TryFrom; use std::sync::Arc; use std::time::Duration; +use uuid::Uuid; const MAX_CONNECTIONS: u32 = 5; const CONNECT_TIMEOUT: Duration = Duration::from_secs(2); @@ -19,41 +20,46 @@ const IDLE_TIMEOUT: Duration = Duration::from_secs(500); #[allow(dead_code)] const SCHEMA_NAME: &str = "iox_catalog"; -/// Connect to the catalog store. -pub async fn connect_catalog_store( - app_name: &'static str, - schema_name: &'static str, - dsn: &str, -) -> Result<Pool<Postgres>, sqlx::Error> { - let pool = PgPoolOptions::new() - .min_connections(1) - .max_connections(MAX_CONNECTIONS) - .connect_timeout(CONNECT_TIMEOUT) - .idle_timeout(IDLE_TIMEOUT) - .test_before_acquire(true) - .after_connect(move |c| { - Box::pin(async move { - // Tag the connection with the provided application name. - c.execute(sqlx::query("SET application_name = '$1';").bind(app_name)) - .await?; - let search_path_query = format!("SET search_path TO {}", schema_name); - c.execute(sqlx::query(&search_path_query)).await?; - - Ok(()) - }) - }) - .connect(dsn) - .await?; - - // Log a connection was successfully established and include the application - // name for cross-correlation between Conductor logs & database connections. - info!(application_name=%app_name, "connected to catalog store"); - - Ok(pool) +/// Postgres catalog that implements the `RepoCollection` and individual repo traits. +#[derive(Debug)] +pub struct PostgresCatalog { + pool: Pool<Postgres>, } -struct PostgresCatalog { - pool: Pool<Postgres>, +impl PostgresCatalog { + /// Connect to the catalog store.
+ pub async fn connect( + app_name: &'static str, + schema_name: &'static str, + dsn: &str, + ) -> Result { + let pool = PgPoolOptions::new() + .min_connections(1) + .max_connections(MAX_CONNECTIONS) + .connect_timeout(CONNECT_TIMEOUT) + .idle_timeout(IDLE_TIMEOUT) + .test_before_acquire(true) + .after_connect(move |c| { + Box::pin(async move { + // Tag the connection with the provided application name. + c.execute(sqlx::query("SET application_name = '$1';").bind(app_name)) + .await?; + let search_path_query = format!("SET search_path TO {}", schema_name); + c.execute(sqlx::query(&search_path_query)).await?; + + Ok(()) + }) + }) + .connect(dsn) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + // Log a connection was successfully established and include the application + // name for cross-correlation between Conductor logs & database connections. + info!(application_name=%app_name, "connected to catalog store"); + + Ok(Self { pool }) + } } impl RepoCollection for Arc { @@ -80,6 +86,18 @@ impl RepoCollection for Arc { fn sequencer(&self) -> Arc { Self::clone(self) as Arc } + + fn partition(&self) -> Arc { + Self::clone(self) as Arc + } + + fn tombstone(&self) -> Arc { + Self::clone(self) as Arc + } + + fn parquet_file(&self) -> Arc { + Self::clone(self) as Arc + } } #[async_trait] @@ -100,6 +118,25 @@ DO UPDATE SET name = kafka_topic.name RETURNING *; Ok(rec) } + + async fn get_by_name(&self, name: &str) -> Result> { + let rec = sqlx::query_as::<_, KafkaTopic>( + r#" +SELECT * FROM kafka_topic WHERE name = $1; + "#, + ) + .bind(&name) // $1 + .fetch_one(&self.pool) + .await; + + if let Err(sqlx::Error::RowNotFound) = rec { + return Ok(None); + } + + let kafka_topic = rec.map_err(|e| Error::SqlxError { source: e })?; + + Ok(Some(kafka_topic)) + } } #[async_trait] @@ -128,9 +165,9 @@ impl NamespaceRepo for PostgresCatalog { &self, name: &str, retention_duration: &str, - kafka_topic_id: i32, - query_pool_id: i16, - ) -> Result { + kafka_topic_id: KafkaTopicId, + query_pool_id: QueryPoolId, + ) -> Result { let rec = sqlx::query_as::<_, Namespace>( r#" INSERT INTO namespace ( name, retention_duration, kafka_topic_id, query_pool_id ) @@ -156,11 +193,10 @@ RETURNING * } })?; - Ok(NamespaceSchema::new(rec.id, kafka_topic_id, query_pool_id)) + Ok(rec) } - async fn get_by_name(&self, name: &str) -> Result> { - // TODO: maybe get all the data in a single call to Postgres? + async fn get_by_name(&self, name: &str) -> Result> { let rec = sqlx::query_as::<_, Namespace>( r#" SELECT * FROM namespace WHERE name = $1; @@ -175,53 +211,14 @@ SELECT * FROM namespace WHERE name = $1; } let namespace = rec.map_err(|e| Error::SqlxError { source: e })?; - // get the columns first just in case someone else is creating schema while we're doing this. 
- let columns = ColumnRepo::list_by_namespace_id(self, namespace.id).await?; - let tables = TableRepo::list_by_namespace_id(self, namespace.id).await?; - let mut namespace = NamespaceSchema::new( - namespace.id, - namespace.kafka_topic_id, - namespace.query_pool_id, - ); - - let mut table_id_to_schema = BTreeMap::new(); - for t in tables { - table_id_to_schema.insert(t.id, (t.name, TableSchema::new(t.id))); - } - - for c in columns { - let (_, t) = table_id_to_schema.get_mut(&c.table_id).unwrap(); - match ColumnType::try_from(c.column_type) { - Ok(column_type) => { - t.columns.insert( - c.name, - ColumnSchema { - id: c.id, - column_type, - }, - ); - } - _ => { - return Err(Error::UnknownColumnType { - data_type: c.column_type, - name: c.name.to_string(), - }); - } - } - } - - for (_, (table_name, schema)) in table_id_to_schema { - namespace.tables.insert(table_name, schema); - } - - return Ok(Some(namespace)); + Ok(Some(namespace)) } } #[async_trait] impl TableRepo for PostgresCatalog { - async fn create_or_get(&self, name: &str, namespace_id: i32) -> Result
{ +    async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table>
{ let rec = sqlx::query_as::<_, Table>( r#" INSERT INTO table_name ( name, namespace_id ) @@ -245,7 +242,7 @@ DO UPDATE SET name = table_name.name RETURNING *; Ok(rec) } - async fn list_by_namespace_id(&self, namespace_id: i32) -> Result> { + async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result> { let rec = sqlx::query_as::<_, Table>( r#" SELECT * FROM table_name @@ -266,7 +263,7 @@ impl ColumnRepo for PostgresCatalog { async fn create_or_get( &self, name: &str, - table_id: i32, + table_id: TableId, column_type: ColumnType, ) -> Result { let ct = column_type as i16; @@ -303,7 +300,7 @@ DO UPDATE SET name = column_name.name RETURNING *; Ok(rec) } - async fn list_by_namespace_id(&self, namespace_id: i32) -> Result> { + async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result> { let rec = sqlx::query_as::<_, Column>( r#" SELECT column_name.* FROM table_name @@ -322,7 +319,11 @@ WHERE table_name.namespace_id = $1; #[async_trait] impl SequencerRepo for PostgresCatalog { - async fn create_or_get(&self, topic: &KafkaTopic, partition: i32) -> Result { + async fn create_or_get( + &self, + topic: &KafkaTopic, + partition: KafkaPartition, + ) -> Result { sqlx::query_as::<_, Sequencer>( r#" INSERT INTO sequencer @@ -346,12 +347,206 @@ impl SequencerRepo for PostgresCatalog { }) } + async fn get_by_topic_id_and_partition( + &self, + topic_id: KafkaTopicId, + partition: KafkaPartition, + ) -> Result> { + let rec = sqlx::query_as::<_, Sequencer>( + r#" +SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2; + "#, + ) + .bind(topic_id) // $1 + .bind(partition) // $2 + .fetch_one(&self.pool) + .await; + + if let Err(sqlx::Error::RowNotFound) = rec { + return Ok(None); + } + + let sequencer = rec.map_err(|e| Error::SqlxError { source: e })?; + + Ok(Some(sequencer)) + } + async fn list(&self) -> Result> { sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer;"#) .fetch_all(&self.pool) .await .map_err(|e| Error::SqlxError { source: e }) } + + async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result> { + sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer WHERE kafka_topic_id = $1;"#) + .bind(&topic.id) // $1 + .fetch_all(&self.pool) + .await + .map_err(|e| Error::SqlxError { source: e }) + } +} + +#[async_trait] +impl PartitionRepo for PostgresCatalog { + async fn create_or_get( + &self, + key: &str, + sequencer_id: SequencerId, + table_id: TableId, + ) -> Result { + sqlx::query_as::<_, Partition>( + r#" + INSERT INTO partition + ( partition_key, sequencer_id, table_id ) + VALUES + ( $1, $2, $3 ) + ON CONFLICT ON CONSTRAINT partition_key_unique + DO UPDATE SET partition_key = partition.partition_key RETURNING *; + "#, + ) + .bind(key) // $1 + .bind(&sequencer_id) // $2 + .bind(&table_id) // $3 + .fetch_one(&self.pool) + .await + .map_err(|e| { + if is_fk_violation(&e) { + Error::ForeignKeyViolation { source: e } + } else { + Error::SqlxError { source: e } + } + }) + } + + async fn list_by_sequencer(&self, sequencer_id: SequencerId) -> Result> { + sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE sequencer_id = $1;"#) + .bind(&sequencer_id) // $1 + .fetch_all(&self.pool) + .await + .map_err(|e| Error::SqlxError { source: e }) + } +} + +#[async_trait] +impl TombstoneRepo for PostgresCatalog { + async fn create_or_get( + &self, + table_id: TableId, + sequencer_id: SequencerId, + sequence_number: SequenceNumber, + min_time: Timestamp, + max_time: Timestamp, + predicate: &str, + ) -> Result { + sqlx::query_as::<_, 
Tombstone>( + r#" + INSERT INTO tombstone + ( table_id, sequencer_id, sequence_number, min_time, max_time, serialized_predicate ) + VALUES + ( $1, $2, $3, $4, $5, $6 ) + ON CONFLICT ON CONSTRAINT tombstone_unique + DO UPDATE SET table_id = tombstone.table_id RETURNING *; + "#, + ) + .bind(&table_id) // $1 + .bind(&sequencer_id) // $2 + .bind(&sequence_number) // $3 + .bind(&min_time) // $4 + .bind(&max_time) // $5 + .bind(predicate) // $6 + .fetch_one(&self.pool) + .await + .map_err(|e| { + if is_fk_violation(&e) { + Error::ForeignKeyViolation { source: e } + } else { + Error::SqlxError { source: e } + } + }) + } + + async fn list_tombstones_by_sequencer_greater_than( + &self, + sequencer_id: SequencerId, + sequence_number: SequenceNumber, + ) -> Result> { + sqlx::query_as::<_, Tombstone>(r#"SELECT * FROM tombstone WHERE sequencer_id = $1 AND sequence_number > $2 ORDER BY id;"#) + .bind(&sequencer_id) // $1 + .bind(&sequence_number) // $2 + .fetch_all(&self.pool) + .await + .map_err(|e| Error::SqlxError { source: e }) + } +} + +#[async_trait] +impl ParquetFileRepo for PostgresCatalog { + async fn create( + &self, + sequencer_id: SequencerId, + table_id: TableId, + partition_id: PartitionId, + object_store_id: Uuid, + min_sequence_number: SequenceNumber, + max_sequence_number: SequenceNumber, + min_time: Timestamp, + max_time: Timestamp, + ) -> Result { + let rec = sqlx::query_as::<_, ParquetFile>( + r#" +INSERT INTO parquet_file ( sequencer_id, table_id, partition_id, object_store_id, min_sequence_number, max_sequence_number, min_time, max_time, to_delete ) +VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, false ) +RETURNING * + "#, + ) + .bind(sequencer_id) // $1 + .bind(table_id) // $2 + .bind(partition_id) // $3 + .bind(object_store_id) // $4 + .bind(min_sequence_number) // $5 + .bind(max_sequence_number) // $6 + .bind(min_time) // $7 + .bind(max_time) // $8 + .fetch_one(&self.pool) + .await + .map_err(|e| { + if is_unique_violation(&e) { + Error::FileExists { + object_store_id, + } + } else if is_fk_violation(&e) { + Error::ForeignKeyViolation { source: e } + } else { + Error::SqlxError { source: e } + } + })?; + + Ok(rec) + } + + async fn flag_for_delete(&self, id: ParquetFileId) -> Result<()> { + let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = true WHERE id = $1;"#) + .bind(&id) // $1 + .execute(&self.pool) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + Ok(()) + } + + async fn list_by_sequencer_greater_than( + &self, + sequencer_id: SequencerId, + sequence_number: SequenceNumber, + ) -> Result> { + sqlx::query_as::<_, ParquetFile>(r#"SELECT * FROM parquet_file WHERE sequencer_id = $1 AND max_sequence_number > $2 ORDER BY id;"#) + .bind(&sequencer_id) // $1 + .bind(&sequence_number) // $2 + .fetch_all(&self.pool) + .await + .map_err(|e| Error::SqlxError { source: e }) + } } /// The error code returned by Postgres for a unique constraint violation. @@ -390,9 +585,6 @@ fn is_fk_violation(e: &sqlx::Error) -> bool { #[cfg(test)] mod tests { use super::*; - use crate::{create_or_get_default_records, validate_or_insert_schema}; - use futures::{stream::FuturesOrdered, StreamExt}; - use influxdb_line_protocol::parse_lines; use std::env; // Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set. 
@@ -432,202 +624,47 @@ mod tests { }}; } - async fn setup_db() -> (Arc, KafkaTopic, QueryPool) { + async fn setup_db() -> Arc { let dsn = std::env::var("DATABASE_URL").unwrap(); - let pool = connect_catalog_store("test", SCHEMA_NAME, &dsn) - .await - .unwrap(); - let postgres_catalog = Arc::new(PostgresCatalog { pool }); - - let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &postgres_catalog) - .await - .unwrap(); - (postgres_catalog, kafka_topic, query_pool) + Arc::new( + PostgresCatalog::connect("test", SCHEMA_NAME, &dsn) + .await + .unwrap(), + ) } #[tokio::test] - async fn test_catalog() { + async fn test_repo() { // If running an integration test on your laptop, this requires that you have Postgres // running and that you've done the sqlx migrations. See the README in this crate for // info to set it up. maybe_skip_integration!(); - let (postgres, kafka_topic, query_pool) = setup_db().await; + let postgres = setup_db().await; clear_schema(&postgres.pool).await; - let namespace = NamespaceRepo::create(postgres.as_ref(), "foo", "inf", 0, 0).await; - assert!(matches!( - namespace.unwrap_err(), - Error::ForeignKeyViolation { source: _ } - )); - let namespace = NamespaceRepo::create( - postgres.as_ref(), - "foo", - "inf", - kafka_topic.id, - query_pool.id, - ) - .await - .unwrap(); - assert!(namespace.id > 0); - assert_eq!(namespace.kafka_topic_id, kafka_topic.id); - assert_eq!(namespace.query_pool_id, query_pool.id); + let f = || Arc::clone(&postgres); - // test that we can create or get a table - let t = TableRepo::create_or_get(postgres.as_ref(), "foo", namespace.id) - .await - .unwrap(); - let tt = TableRepo::create_or_get(postgres.as_ref(), "foo", namespace.id) - .await - .unwrap(); - assert!(t.id > 0); - assert_eq!(t, tt); - - // test that we can craete or get a column - let c = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::I64) - .await - .unwrap(); - let cc = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::I64) - .await - .unwrap(); - assert!(c.id > 0); - assert_eq!(c, cc); - - // test that attempting to create an already defined column of a different type returns error - let err = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::F64) - .await - .expect_err("should error with wrong column type"); - assert!(matches!( - err, - Error::ColumnTypeMismatch { - name: _, - existing: _, - new: _ - } - )); - - // now test with a new namespace - let namespace = NamespaceRepo::create( - postgres.as_ref(), - "asdf", - "inf", - kafka_topic.id, - query_pool.id, - ) - .await - .unwrap(); - let data = r#" -m1,t1=a,t2=b f1=2i,f2=2.0 1 -m1,t1=a f1=3i 2 -m2,t3=b f1=true 1 - "#; - - // test that new schema gets returned - let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect(); - let schema = Arc::new(NamespaceSchema::new( - namespace.id, - namespace.kafka_topic_id, - namespace.query_pool_id, - )); - let new_schema = validate_or_insert_schema(lines, &schema, &postgres) - .await - .unwrap(); - let new_schema = new_schema.unwrap(); - - // ensure new schema is in the db - let schema_from_db = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf") - .await - .unwrap() - .unwrap(); - assert_eq!(new_schema, schema_from_db); - - // test that a new table will be created - let data = r#" -m1,t1=c f1=1i 2 -new_measurement,t9=a f10=true 1 - "#; - let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect(); - let new_schema = validate_or_insert_schema(lines, &schema_from_db, &postgres) - .await - .unwrap() - 
.unwrap(); - let new_table = new_schema.tables.get("new_measurement").unwrap(); - assert_eq!( - ColumnType::Bool, - new_table.columns.get("f10").unwrap().column_type - ); - assert_eq!( - ColumnType::Tag, - new_table.columns.get("t9").unwrap().column_type - ); - let schema = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf") - .await - .unwrap() - .unwrap(); - assert_eq!(new_schema, schema); - - // test that a new column for an existing table will be created - // test that a new table will be created - let data = r#" -m1,new_tag=c new_field=1i 2 - "#; - let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect(); - let new_schema = validate_or_insert_schema(lines, &schema, &postgres) - .await - .unwrap() - .unwrap(); - let table = new_schema.tables.get("m1").unwrap(); - assert_eq!( - ColumnType::I64, - table.columns.get("new_field").unwrap().column_type - ); - assert_eq!( - ColumnType::Tag, - table.columns.get("new_tag").unwrap().column_type - ); - let schema = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf") - .await - .unwrap() - .unwrap(); - assert_eq!(new_schema, schema); - } - - #[tokio::test] - async fn test_sequencers() { - maybe_skip_integration!(); - - let (postgres, kafka_topic, _query_pool) = setup_db().await; - clear_schema(&postgres.pool).await; - - // Create 10 sequencers - let created = (1..=10) - .map(|partition| { - SequencerRepo::create_or_get(postgres.as_ref(), &kafka_topic, partition) - }) - .collect::>() - .map(|v| { - let v = v.expect("failed to create sequencer"); - (v.id, v) - }) - .collect::>() - .await; - - // List them and assert they match - let listed = SequencerRepo::list(postgres.as_ref()) - .await - .expect("failed to list sequencers") - .into_iter() - .map(|v| (v.id, v)) - .collect::>(); - - assert_eq!(created, listed); + crate::interface::test_helpers::test_repo(f).await; } async fn clear_schema(pool: &Pool) { + sqlx::query("delete from tombstone;") + .execute(pool) + .await + .unwrap(); + sqlx::query("delete from parquet_file;") + .execute(pool) + .await + .unwrap(); sqlx::query("delete from column_name;") .execute(pool) .await .unwrap(); + sqlx::query("delete from partition;") + .execute(pool) + .await + .unwrap(); sqlx::query("delete from table_name;") .execute(pool) .await diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index aa265f90c0..d93dbcb42c 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -53,6 +53,7 @@ tower = { version = "0.4", features = ["balance", "buffer", "discover", "futures tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_debug", "std", "tracing-attributes"] } tracing-core = { version = "0.1", features = ["lazy_static", "std"] } tracing-subscriber = { version = "0.3", features = ["alloc", "ansi", "ansi_term", "env-filter", "fmt", "lazy_static", "matchers", "regex", "registry", "sharded-slab", "smallvec", "std", "thread_local", "tracing", "tracing-log"] } +uuid = { version = "0.8", features = ["getrandom", "std", "v4"] } [build-dependencies] ahash = { version = "0.7", features = ["std"] } @@ -86,5 +87,6 @@ smallvec = { version = "1", default-features = false, features = ["union"] } syn = { version = "1", features = ["clone-impls", "derive", "extra-traits", "full", "parsing", "printing", "proc-macro", "quote", "visit", "visit-mut"] } tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", 
"signal", "signal-hook-registry", "sync", "time", "tokio-macros", "winapi"] } tokio-stream = { version = "0.1", features = ["fs", "net", "time"] } +uuid = { version = "0.8", features = ["getrandom", "std", "v4"] } ### END HAKARI SECTION