refactor: catalog transaction (#3660)

* refactor: catalog Unit of Work (= transaction)

  Set up an interface to handle Units of Work within our catalog. Previously both the Postgres and the in-mem backend used "mini-transactions on demand". Now the caller has a clear way to establish boundaries and gets read and write isolation. A single `Arc<dyn Catalog>` can create as many `Box<dyn UnitOfWork>` as you like, but note that depending on the backend you may not scale infinitely (Postgres will likely impose certain limits and the in-mem backend limits concurrency to 1 to keep things simple).

* docs: improve wording

  Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* refactor: rename Unit of Work to Transaction

* test: improve `test_txn_isolation`

* feat: clarify transaction drop semantics

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent de2a013786
commit 5de4d6203f
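For orientation before the diffs: a minimal sketch of the calling convention this change introduces, assembled from the hunks below. The trait and method names (`Catalog::start_transaction`, the repo accessors such as `kafka_topics()`, `commit`) are taken from the diff; the surrounding function and its error handling are illustrative only, not a verbatim excerpt.

    // Sketch only: types come from iox_catalog::interface as changed below.
    async fn create_topic(catalog: &dyn Catalog, name: &str) -> Result<KafkaTopic, Error> {
        // Explicit transaction boundary instead of the old "mini-transactions on demand".
        let mut txn = catalog.start_transaction().await?;
        let topic = txn.kafka_topics().create_or_get(name).await?;
        // A transaction must be finalized; dropping it unfinalized logs a warning
        // (and, for Postgres, SQLx enqueues a rollback on drop).
        txn.commit().await?;
        Ok(topic)
    }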
@@ -2113,6 +2113,7 @@ dependencies = [
  "schema",
  "snafu",
  "sqlx",
+ "test_helpers",
  "tokio",
  "uuid",
  "workspace-hack",
@@ -44,8 +44,9 @@ pub async fn command(config: Config) -> Result<(), Error> {
     match config.command {
         Command::Update(update) => {
             let catalog = update.catalog_dsn.get_catalog("cli").await?;
-            let topics_repo = catalog.kafka_topics();
-            let topic = topics_repo.create_or_get(&update.db_name).await?;
+            let mut txn = catalog.start_transaction().await?;
+            let topic = txn.kafka_topics().create_or_get(&update.db_name).await?;
+            txn.commit().await?;
             println!("{}", topic.id);
             Ok(())
         }
@@ -100,7 +100,8 @@ pub async fn command(config: Config) -> Result<()> {
 
     let catalog = config.catalog_dsn.get_catalog("ingester").await?;
 
-    let kafka_topic = catalog
+    let mut txn = catalog.start_transaction().await?;
+    let kafka_topic = txn
         .kafka_topics()
         .get_by_name(&config.write_buffer_config.topic)
         .await?
@@ -122,13 +123,14 @@ pub async fn command(config: Config) -> Result<()> {
 
     let mut sequencers = BTreeMap::new();
     for k in kafka_partitions {
-        let s = catalog
+        let s = txn
             .sequencers()
             .get_by_topic_id_and_partition(kafka_topic.id, k)
             .await?
             .ok_or(Error::SequencerNotFound(k))?;
         sequencers.insert(k, s);
     }
+    txn.commit().await?;
 
     let metric_registry: Arc<metric::Registry> = Default::default();
     let trace_collector = common_state.trace_collector();
@@ -109,7 +109,8 @@ pub async fn command(config: Config) -> Result<()> {
     // This code / auto-creation is for architecture testing purposes only - a
     // prod deployment would expect namespaces to be explicitly created and this
     // layer would be removed.
-    let topic_id = catalog
+    let mut txn = catalog.start_transaction().await?;
+    let topic_id = txn
         .kafka_topics()
         .get_by_name(&config.write_buffer_config.topic)
         .await?
@@ -120,7 +121,7 @@ pub async fn command(config: Config) -> Result<()> {
                 &config.write_buffer_config.topic
             )
         });
-    let query_id = catalog
+    let query_id = txn
         .query_pools()
         .create_or_get(&config.query_pool_name)
         .await
@@ -131,6 +132,8 @@ pub async fn command(config: Config) -> Result<()> {
                 &config.write_buffer_config.topic, e
             )
         });
+    txn.commit().await?;
+
     let handler_stack = NamespaceAutocreation::new(
         catalog,
         ns_cache,
@@ -144,12 +144,15 @@ impl SequencerData {
         namespace: &str,
         catalog: &dyn Catalog,
     ) -> Result<Arc<NamespaceData>> {
-        let namespace = catalog
+        let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?;
+        let namespace = txn
             .namespaces()
             .get_by_name(namespace)
             .await
             .context(CatalogSnafu)?
             .context(NamespaceNotFoundSnafu { namespace })?;
+        txn.commit().await.context(CatalogSnafu)?;
+
         let mut n = self.namespaces.write();
         let data = Arc::clone(
             n.entry(namespace.name)
@@ -230,11 +233,14 @@ impl NamespaceData {
         table_name: &str,
         catalog: &dyn Catalog,
     ) -> Result<Arc<TableData>> {
-        let table = catalog
+        let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?;
+        let table = txn
             .tables()
             .create_or_get(table_name, self.namespace_id)
             .await
             .context(CatalogSnafu)?;
+        txn.commit().await.context(CatalogSnafu)?;
+
         let mut t = self.tables.write();
         let data = Arc::clone(
             t.entry(table.name)
@@ -306,7 +312,8 @@ impl TableData {
         let min_time = Timestamp::new(predicate.range.start());
         let max_time = Timestamp::new(predicate.range.end());
 
-        let tombstone = catalog
+        let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?;
+        let tombstone = txn
             .tombstones()
             .create_or_get(
                 self.table_id,
@@ -318,6 +325,7 @@ impl TableData {
             )
             .await
             .context(CatalogSnafu)?;
+        txn.commit().await.context(CatalogSnafu)?;
 
         let partitions = self.partition_data.read();
         for data in partitions.values() {
@@ -339,11 +347,13 @@ impl TableData {
         sequencer_id: SequencerId,
         catalog: &dyn Catalog,
     ) -> Result<Arc<PartitionData>> {
-        let partition = catalog
+        let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?;
+        let partition = txn
             .partitions()
             .create_or_get(partition_key, sequencer_id, self.table_id)
             .await
             .context(CatalogSnafu)?;
+        txn.commit().await.context(CatalogSnafu)?;
         let mut p = self.partition_data.write();
         let data = Arc::new(PartitionData::new(partition.id));
         p.insert(partition.partition_key, Arc::clone(&data));
@@ -234,34 +234,28 @@ mod tests {
     use iox_catalog::validate_or_insert_schema;
     use metric::{Attributes, Metric, U64Counter, U64Gauge};
     use mutable_batch_lp::lines_to_batches;
-    use std::num::NonZeroU32;
+    use std::{num::NonZeroU32, ops::DerefMut};
     use time::Time;
     use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
 
     #[tokio::test]
     async fn read_from_write_buffer_write_to_mutable_buffer() {
         let catalog = MemCatalog::new();
-        let kafka_topic = catalog
-            .kafka_topics()
-            .create_or_get("whatevs")
-            .await
-            .unwrap();
-        let query_pool = catalog
-            .query_pools()
-            .create_or_get("whatevs")
-            .await
-            .unwrap();
+        let mut txn = catalog.start_transaction().await.unwrap();
+        let kafka_topic = txn.kafka_topics().create_or_get("whatevs").await.unwrap();
+        let query_pool = txn.query_pools().create_or_get("whatevs").await.unwrap();
         let kafka_partition = KafkaPartition::new(0);
-        let namespace = catalog
+        let namespace = txn
             .namespaces()
             .create("foo", "inf", kafka_topic.id, query_pool.id)
             .await
             .unwrap();
-        let sequencer = catalog
+        let sequencer = txn
             .sequencers()
             .create_or_get(&kafka_topic, kafka_partition)
             .await
             .unwrap();
 
         let mut sequencer_states = BTreeMap::new();
         sequencer_states.insert(kafka_partition, sequencer);
 
@@ -276,7 +270,7 @@ mod tests {
             lines_to_batches("mem foo=1 10", 0).unwrap(),
             DmlMeta::sequenced(Sequence::new(0, 0), ingest_ts1, None, 50),
         );
-        let schema = validate_or_insert_schema(w1.tables(), &schema, &catalog)
+        let schema = validate_or_insert_schema(w1.tables(), &schema, txn.deref_mut())
             .await
             .unwrap()
             .unwrap();
@@ -286,10 +280,11 @@ mod tests {
             lines_to_batches("cpu bar=2 20\ncpu bar=3 30", 0).unwrap(),
             DmlMeta::sequenced(Sequence::new(0, 7), ingest_ts2, None, 150),
         );
-        let _schema = validate_or_insert_schema(w2.tables(), &schema, &catalog)
+        let _schema = validate_or_insert_schema(w2.tables(), &schema, txn.deref_mut())
             .await
             .unwrap()
             .unwrap();
+        txn.commit().await.unwrap();
         write_buffer_state.push_write(w2);
         let reading: Arc<dyn WriteBufferReading> =
             Arc::new(MockBufferForReading::new(write_buffer_state, None).unwrap());
@@ -22,5 +22,6 @@ dotenv = "0.15.0"
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 paste = "1.0.6"
 pretty_assertions = "1.0.0"
+test_helpers = { path = "../test_helpers" }
 
 [features]
(File diff suppressed because it is too large.)
@@ -12,11 +12,11 @@
 )]
 
 use crate::interface::{
-    Catalog, ColumnType, Error, KafkaPartition, KafkaTopic, NamespaceSchema, QueryPool, Result,
-    Sequencer, SequencerId, TableSchema,
+    ColumnType, Error, KafkaPartition, KafkaTopic, NamespaceSchema, QueryPool, Result, Sequencer,
+    SequencerId, TableSchema, Transaction,
 };
 use futures::{stream::FuturesOrdered, StreamExt};
 
 use interface::{ParquetFile, ProcessedTombstone, Tombstone};
 use mutable_batch::MutableBatch;
 use std::{borrow::Cow, collections::BTreeMap};
@@ -43,7 +43,7 @@ pub mod postgres;
 pub async fn validate_or_insert_schema<'a, T, U>(
     tables: T,
     schema: &NamespaceSchema,
-    catalog: &dyn Catalog,
+    txn: &mut dyn Transaction,
 ) -> Result<Option<NamespaceSchema>>
 where
     T: IntoIterator<IntoIter = U, Item = (&'a str, &'a MutableBatch)> + Send + Sync,
@@ -55,7 +55,7 @@ where
     let mut schema = Cow::Borrowed(schema);
 
     for (table_name, batch) in tables {
-        validate_mutable_batch(batch, table_name, &mut schema, catalog).await?;
+        validate_mutable_batch(batch, table_name, &mut schema, txn).await?;
     }
 
     match schema {
@@ -68,7 +68,7 @@ async fn validate_mutable_batch(
     mb: &MutableBatch,
     table_name: &str,
     schema: &mut Cow<'_, NamespaceSchema>,
-    catalog: &dyn Catalog,
+    txn: &mut dyn Transaction,
 ) -> Result<()> {
     // Check if the table exists in the schema.
     //
@@ -81,14 +81,14 @@ async fn validate_mutable_batch(
         //
         // Attempt to create the table in the catalog, or load an existing
         // table from the catalog to populate the cache.
-        let mut table = catalog
+        let mut table = txn
             .tables()
             .create_or_get(table_name, schema.id)
             .await
             .map(|t| TableSchema::new(t.id))?;
 
         // Always add a time column to all new tables.
-        let time_col = catalog
+        let time_col = txn
             .columns()
             .create_or_get(TIME_COLUMN, table.id, ColumnType::Time)
             .await?;
@@ -134,7 +134,7 @@ async fn validate_mutable_batch(
         None => {
             // The column does not exist in the cache, create/get it from
             // the catalog, and add it to the table.
-            let column = catalog
+            let column = txn
                 .columns()
                 .create_or_get(name.as_str(), table.id, ColumnType::from(col.influx_type()))
                 .await?;
@@ -161,34 +161,53 @@ async fn validate_mutable_batch(
 /// each of the partitions.
 pub async fn create_or_get_default_records(
     kafka_partition_count: i32,
-    catalog: &dyn Catalog,
+    txn: &mut dyn Transaction,
 ) -> Result<(KafkaTopic, QueryPool, BTreeMap<SequencerId, Sequencer>)> {
-    let kafka_topic = catalog
-        .kafka_topics()
-        .create_or_get(SHARED_KAFKA_TOPIC)
-        .await?;
-    let query_pool = catalog
-        .query_pools()
-        .create_or_get(SHARED_QUERY_POOL)
-        .await?;
+    let kafka_topic = txn.kafka_topics().create_or_get(SHARED_KAFKA_TOPIC).await?;
+    let query_pool = txn.query_pools().create_or_get(SHARED_QUERY_POOL).await?;
 
-    let sequencers = (1..=kafka_partition_count)
-        .map(|partition| {
-            catalog
-                .sequencers()
-                .create_or_get(&kafka_topic, KafkaPartition::new(partition))
-        })
-        .collect::<FuturesOrdered<_>>()
-        .map(|v| {
-            let v = v.expect("failed to create sequencer");
-            (v.id, v)
-        })
-        .collect::<BTreeMap<_, _>>()
-        .await;
+    let mut sequencers = BTreeMap::new();
+    for partition in 1..=kafka_partition_count {
+        let sequencer = txn
+            .sequencers()
+            .create_or_get(&kafka_topic, KafkaPartition::new(partition))
+            .await?;
+        sequencers.insert(sequencer.id, sequencer);
+    }
 
     Ok((kafka_topic, query_pool, sequencers))
 }
 
+/// Insert the compacted parquet file and its tombstones
+pub async fn add_parquet_file_with_tombstones(
+    parquet_file: &ParquetFile,
+    tombstones: &[Tombstone],
+    txn: &mut dyn Transaction,
+) -> Result<(ParquetFile, Vec<ProcessedTombstone>), Error> {
+    // create a parquet file in the catalog first
+    let parquet = txn
+        .parquet_files()
+        .create(
+            parquet_file.sequencer_id,
+            parquet_file.table_id,
+            parquet_file.partition_id,
+            parquet_file.object_store_id,
+            parquet_file.min_sequence_number,
+            parquet_file.max_sequence_number,
+            parquet_file.min_time,
+            parquet_file.max_time,
+        )
+        .await?;
+
+    // Now the parquet file is available, create its processed tombstones
+    let processed_tombstones = txn
+        .processed_tombstones()
+        .create_many(parquet.id, tombstones)
+        .await?;
+
+    Ok((parquet, processed_tombstones))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -211,13 +230,16 @@ mod tests {
             #[allow(clippy::bool_assert_comparison)]
             #[tokio::test]
             async fn [<test_validate_schema_ $name>]() {
+                use crate::interface::Catalog;
+                use std::ops::DerefMut;
                 use pretty_assertions::assert_eq;
                 const NAMESPACE_NAME: &str = "bananas";
 
                 let repo = MemCatalog::new();
-                let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &repo).await.unwrap();
+                let mut txn = repo.start_transaction().await.unwrap();
+                let (kafka_topic, query_pool, _) = create_or_get_default_records(2, txn.deref_mut()).await.unwrap();
 
-                let namespace = repo
+                let namespace = txn
                     .namespaces()
                     .create(NAMESPACE_NAME, "inf", kafka_topic.id, query_pool.id)
                     .await
@@ -240,7 +262,7 @@ mod tests {
                 let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp.as_str(), 42)
                     .expect("failed to build test writes from LP");
 
-                let got = validate_or_insert_schema(writes.iter().map(|(k, v)| (k.as_str(), v)), &schema, &repo)
+                let got = validate_or_insert_schema(writes.iter().map(|(k, v)| (k.as_str(), v)), &schema, txn.deref_mut())
                     .await;
 
                 match got {
@@ -260,7 +282,7 @@ mod tests {
                 // Invariant: in absence of concurrency, the schema within
                 // the database must always match the incrementally built
                 // cached schema.
-                let db_schema = get_schema_by_name(NAMESPACE_NAME, &repo)
+                let db_schema = get_schema_by_name(NAMESPACE_NAME, txn.deref_mut())
                     .await
                     .expect("database failed to query for namespace schema");
                 assert_eq!(schema, db_schema, "schema in DB and cached schema differ");
@@ -2,25 +2,26 @@
 //! used for testing or for an IOx designed to run without catalog persistence.
 
 use crate::interface::{
-    Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic,
-    KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile,
-    ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionInfo, PartitionRepo,
-    ProcessedTombstone, ProcessedTombstoneRepo, QueryPool, QueryPoolId, QueryPoolRepo, Result,
-    SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, Timestamp,
-    Tombstone, TombstoneId, TombstoneRepo,
+    sealed::TransactionFinalize, Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error,
+    KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId,
+    NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId,
+    PartitionInfo, PartitionRepo, ProcessedTombstone, ProcessedTombstoneRepo, QueryPool,
+    QueryPoolId, QueryPoolRepo, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo,
+    Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, Transaction,
 };
 use async_trait::async_trait;
-use sqlx::{Postgres, Transaction};
+use observability_deps::tracing::warn;
 use std::convert::TryFrom;
 use std::fmt::Formatter;
-use std::sync::Mutex;
+use std::sync::Arc;
+use tokio::sync::{Mutex, OwnedMutexGuard};
 use uuid::Uuid;
 
 /// In-memory catalog that implements the `RepoCollection` and individual repo traits from
 /// the catalog interface.
 #[derive(Default)]
 pub struct MemCatalog {
-    collections: Mutex<MemCollections>,
+    collections: Arc<Mutex<MemCollections>>,
 }
 
 impl MemCatalog {
@@ -28,25 +29,15 @@ impl MemCatalog {
     pub fn new() -> Self {
         Self::default()
     }
-
-    // Since this is a test catalog that does not handle transactions,
-    // this is a helper function to fake `rollback` work
-    fn remove_parquet_file(&self, object_store_id: Uuid) {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-        collections
-            .parquet_files
-            .retain(|f| f.object_store_id != object_store_id);
-    }
 }
 
 impl std::fmt::Debug for MemCatalog {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let c = self.collections.lock().expect("mutex poisoned");
-        write!(f, "MemCatalog[ {:?} ]", c)
+        f.debug_struct("MemCatalog").finish_non_exhaustive()
     }
 }
 
-#[derive(Default, Debug)]
+#[derive(Default, Debug, Clone)]
 struct MemCollections {
     kafka_topics: Vec<KafkaTopic>,
     query_pools: Vec<QueryPool>,
@@ -60,6 +51,22 @@ struct MemCollections {
     processed_tombstones: Vec<ProcessedTombstone>,
 }
 
+/// Transaction bound to an in-memory catalog.
+#[derive(Debug)]
+pub struct MemTxn {
+    guard: OwnedMutexGuard<MemCollections>,
+    stage: MemCollections,
+    finalized: bool,
+}
+
+impl Drop for MemTxn {
+    fn drop(&mut self) {
+        if !self.finalized {
+            warn!("Dropping MemTxn w/o finalizing (commit or abort)");
+        }
+    }
+}
+
 #[async_trait]
 impl Catalog for MemCatalog {
     async fn setup(&self) -> Result<(), Error> {
@@ -67,110 +74,95 @@ impl Catalog for MemCatalog {
         Ok(())
     }
 
-    fn kafka_topics(&self) -> &dyn KafkaTopicRepo {
-        self
-    }
-
-    fn query_pools(&self) -> &dyn QueryPoolRepo {
-        self
-    }
-
-    fn namespaces(&self) -> &dyn NamespaceRepo {
-        self
-    }
-
-    fn tables(&self) -> &dyn TableRepo {
-        self
-    }
-
-    fn columns(&self) -> &dyn ColumnRepo {
-        self
-    }
-
-    fn sequencers(&self) -> &dyn SequencerRepo {
-        self
-    }
-
-    fn partitions(&self) -> &dyn PartitionRepo {
-        self
-    }
-
-    fn tombstones(&self) -> &dyn TombstoneRepo {
-        self
-    }
-
-    fn parquet_files(&self) -> &dyn ParquetFileRepo {
-        self
-    }
-
-    fn processed_tombstones(&self) -> &dyn ProcessedTombstoneRepo {
-        self
-    }
-
-    async fn add_parquet_file_with_tombstones(
-        &self,
-        parquet_file: &ParquetFile,
-        tombstones: &[Tombstone],
-    ) -> Result<(ParquetFile, Vec<ProcessedTombstone>), Error> {
-        // The activities in this file must either all succeed or all fail
-
-        // Create a parquet file in the catalog first
-        let parquet = self
-            .parquet_files()
-            .create(
-                None,
-                parquet_file.sequencer_id,
-                parquet_file.table_id,
-                parquet_file.partition_id,
-                parquet_file.object_store_id,
-                parquet_file.min_sequence_number,
-                parquet_file.max_sequence_number,
-                parquet_file.min_time,
-                parquet_file.max_time,
-            )
-            .await?;
-
-        // Now the parquet file is available, create its dependent processed tombstones
-        let processed_tombstones = self
-            .processed_tombstones()
-            .create_many(None, parquet.id, tombstones)
-            .await;
-
-        if let Err(error) = processed_tombstones {
-            // failed to insert processed tombstone, remove the above
-            // inserted parquet file from the catalog
-            self.remove_parquet_file(parquet.object_store_id);
-            return Err(error);
-        }
-        let processed_tombstones = processed_tombstones.unwrap();
-
-        Ok((parquet, processed_tombstones))
+    async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error> {
+        let guard = Arc::clone(&self.collections).lock_owned().await;
+        let stage = guard.clone();
+        Ok(Box::new(MemTxn {
+            guard,
+            stage,
+            finalized: false,
+        }))
     }
 }
 
 #[async_trait]
-impl KafkaTopicRepo for MemCatalog {
-    async fn create_or_get(&self, name: &str) -> Result<KafkaTopic> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-
-        let topic = match collections.kafka_topics.iter().find(|t| t.name == name) {
+impl TransactionFinalize for MemTxn {
+    async fn commit_inplace(&mut self) -> Result<(), Error> {
+        *self.guard = std::mem::take(&mut self.stage);
+        self.finalized = true;
+        Ok(())
+    }
+
+    async fn abort_inplace(&mut self) -> Result<(), Error> {
+        self.finalized = true;
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl Transaction for MemTxn {
+    fn kafka_topics(&mut self) -> &mut dyn KafkaTopicRepo {
+        self
+    }
+
+    fn query_pools(&mut self) -> &mut dyn QueryPoolRepo {
+        self
+    }
+
+    fn namespaces(&mut self) -> &mut dyn NamespaceRepo {
+        self
+    }
+
+    fn tables(&mut self) -> &mut dyn TableRepo {
+        self
+    }
+
+    fn columns(&mut self) -> &mut dyn ColumnRepo {
+        self
+    }
+
+    fn sequencers(&mut self) -> &mut dyn SequencerRepo {
+        self
+    }
+
+    fn partitions(&mut self) -> &mut dyn PartitionRepo {
+        self
+    }
+
+    fn tombstones(&mut self) -> &mut dyn TombstoneRepo {
+        self
+    }
+
+    fn parquet_files(&mut self) -> &mut dyn ParquetFileRepo {
+        self
+    }
+
+    fn processed_tombstones(&mut self) -> &mut dyn ProcessedTombstoneRepo {
+        self
+    }
+}
+
+#[async_trait]
+impl KafkaTopicRepo for MemTxn {
+    async fn create_or_get(&mut self, name: &str) -> Result<KafkaTopic> {
+        let topic = match self.stage.kafka_topics.iter().find(|t| t.name == name) {
             Some(t) => t,
             None => {
                 let topic = KafkaTopic {
-                    id: KafkaTopicId::new(collections.kafka_topics.len() as i32 + 1),
+                    id: KafkaTopicId::new(self.stage.kafka_topics.len() as i32 + 1),
                     name: name.to_string(),
                 };
-                collections.kafka_topics.push(topic);
-                collections.kafka_topics.last().unwrap()
+                self.stage.kafka_topics.push(topic);
+                self.stage.kafka_topics.last().unwrap()
             }
         };
 
         Ok(topic.clone())
     }
 
-    async fn get_by_name(&self, name: &str) -> Result<Option<KafkaTopic>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let kafka_topic = collections
+    async fn get_by_name(&mut self, name: &str) -> Result<Option<KafkaTopic>> {
+        let kafka_topic = self
+            .stage
             .kafka_topics
             .iter()
             .find(|t| t.name == name)
@@ -180,19 +172,17 @@ impl KafkaTopicRepo for MemCatalog {
 }
 
 #[async_trait]
-impl QueryPoolRepo for MemCatalog {
-    async fn create_or_get(&self, name: &str) -> Result<QueryPool> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-
-        let pool = match collections.query_pools.iter().find(|t| t.name == name) {
+impl QueryPoolRepo for MemTxn {
+    async fn create_or_get(&mut self, name: &str) -> Result<QueryPool> {
+        let pool = match self.stage.query_pools.iter().find(|t| t.name == name) {
             Some(t) => t,
             None => {
                 let pool = QueryPool {
-                    id: QueryPoolId::new(collections.query_pools.len() as i16 + 1),
+                    id: QueryPoolId::new(self.stage.query_pools.len() as i16 + 1),
                     name: name.to_string(),
                 };
-                collections.query_pools.push(pool);
-                collections.query_pools.last().unwrap()
+                self.stage.query_pools.push(pool);
+                self.stage.query_pools.last().unwrap()
             }
         };
 
@@ -201,35 +191,34 @@ impl QueryPoolRepo for MemCatalog {
 }
 
 #[async_trait]
-impl NamespaceRepo for MemCatalog {
+impl NamespaceRepo for MemTxn {
     async fn create(
-        &self,
+        &mut self,
         name: &str,
         retention_duration: &str,
         kafka_topic_id: KafkaTopicId,
         query_pool_id: QueryPoolId,
     ) -> Result<Namespace> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-        if collections.namespaces.iter().any(|n| n.name == name) {
+        if self.stage.namespaces.iter().any(|n| n.name == name) {
             return Err(Error::NameExists {
                 name: name.to_string(),
             });
         }
 
         let namespace = Namespace {
-            id: NamespaceId::new(collections.namespaces.len() as i32 + 1),
+            id: NamespaceId::new(self.stage.namespaces.len() as i32 + 1),
             name: name.to_string(),
             kafka_topic_id,
             query_pool_id,
             retention_duration: Some(retention_duration.to_string()),
         };
-        collections.namespaces.push(namespace);
-        Ok(collections.namespaces.last().unwrap().clone())
+        self.stage.namespaces.push(namespace);
+        Ok(self.stage.namespaces.last().unwrap().clone())
     }
 
-    async fn get_by_name(&self, name: &str) -> Result<Option<Namespace>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        Ok(collections
+    async fn get_by_name(&mut self, name: &str) -> Result<Option<Namespace>> {
+        Ok(self
+            .stage
             .namespaces
             .iter()
             .find(|n| n.name == name)
@@ -238,11 +227,10 @@ impl NamespaceRepo for MemCatalog {
 }
 
 #[async_trait]
-impl TableRepo for MemCatalog {
-    async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-
-        let table = match collections
+impl TableRepo for MemTxn {
+    async fn create_or_get(&mut self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
+        let table = match self
+            .stage
             .tables
             .iter()
             .find(|t| t.name == name && t.namespace_id == namespace_id)
@@ -250,21 +238,21 @@ impl TableRepo for MemCatalog {
             Some(t) => t,
             None => {
                 let table = Table {
-                    id: TableId::new(collections.tables.len() as i32 + 1),
+                    id: TableId::new(self.stage.tables.len() as i32 + 1),
                     namespace_id,
                     name: name.to_string(),
                 };
-                collections.tables.push(table);
-                collections.tables.last().unwrap()
+                self.stage.tables.push(table);
+                self.stage.tables.last().unwrap()
             }
         };
 
         Ok(table.clone())
     }
 
-    async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let tables: Vec<_> = collections
+    async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
+        let tables: Vec<_> = self
+            .stage
             .tables
             .iter()
             .filter(|t| t.namespace_id == namespace_id)
@@ -275,16 +263,15 @@ impl TableRepo for MemCatalog {
 }
 
 #[async_trait]
-impl ColumnRepo for MemCatalog {
+impl ColumnRepo for MemTxn {
     async fn create_or_get(
-        &self,
+        &mut self,
         name: &str,
         table_id: TableId,
         column_type: ColumnType,
     ) -> Result<Column> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-
-        let column = match collections
+        let column = match self
+            .stage
             .columns
             .iter()
             .find(|t| t.name == name && t.table_id == table_id)
@@ -302,31 +289,31 @@ impl ColumnRepo for MemCatalog {
             }
             None => {
                 let column = Column {
-                    id: ColumnId::new(collections.columns.len() as i32 + 1),
+                    id: ColumnId::new(self.stage.columns.len() as i32 + 1),
                     table_id,
                     name: name.to_string(),
                     column_type: column_type as i16,
                 };
-                collections.columns.push(column);
-                collections.columns.last().unwrap()
+                self.stage.columns.push(column);
+                self.stage.columns.last().unwrap()
             }
         };
 
         Ok(column.clone())
     }
 
-    async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-
-        let table_ids: Vec<_> = collections
+    async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
+        let table_ids: Vec<_> = self
+            .stage
             .tables
             .iter()
             .filter(|t| t.namespace_id == namespace_id)
             .map(|t| t.id)
             .collect();
-        println!("tables: {:?}", collections.tables);
+        println!("tables: {:?}", self.stage.tables);
         println!("table_ids: {:?}", table_ids);
-        let columns: Vec<_> = collections
+        let columns: Vec<_> = self
+            .stage
             .columns
             .iter()
             .filter(|c| table_ids.contains(&c.table_id))
@@ -338,15 +325,14 @@ impl ColumnRepo for MemCatalog {
 }
 
 #[async_trait]
-impl SequencerRepo for MemCatalog {
+impl SequencerRepo for MemTxn {
     async fn create_or_get(
-        &self,
+        &mut self,
         topic: &KafkaTopic,
         partition: KafkaPartition,
     ) -> Result<Sequencer> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-
-        let sequencer = match collections
+        let sequencer = match self
+            .stage
             .sequencers
             .iter()
             .find(|s| s.kafka_topic_id == topic.id && s.kafka_partition == partition)
@@ -354,13 +340,13 @@ impl SequencerRepo for MemCatalog {
             Some(t) => t,
             None => {
                 let sequencer = Sequencer {
-                    id: SequencerId::new(collections.sequencers.len() as i16 + 1),
+                    id: SequencerId::new(self.stage.sequencers.len() as i16 + 1),
                     kafka_topic_id: topic.id,
                     kafka_partition: partition,
                     min_unpersisted_sequence_number: 0,
                 };
-                collections.sequencers.push(sequencer);
-                collections.sequencers.last().unwrap()
+                self.stage.sequencers.push(sequencer);
+                self.stage.sequencers.last().unwrap()
             }
         };
 
@@ -368,12 +354,12 @@ impl SequencerRepo for MemCatalog {
     }
 
     async fn get_by_topic_id_and_partition(
-        &self,
+        &mut self,
         topic_id: KafkaTopicId,
         partition: KafkaPartition,
     ) -> Result<Option<Sequencer>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let sequencer = collections
+        let sequencer = self
+            .stage
             .sequencers
             .iter()
             .find(|s| s.kafka_topic_id == topic_id && s.kafka_partition == partition)
@@ -381,14 +367,13 @@ impl SequencerRepo for MemCatalog {
         Ok(sequencer)
     }
 
-    async fn list(&self) -> Result<Vec<Sequencer>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        Ok(collections.sequencers.clone())
+    async fn list(&mut self) -> Result<Vec<Sequencer>> {
+        Ok(self.stage.sequencers.clone())
     }
 
-    async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let sequencers: Vec<_> = collections
+    async fn list_by_kafka_topic(&mut self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
+        let sequencers: Vec<_> = self
+            .stage
             .sequencers
             .iter()
             .filter(|s| s.kafka_topic_id == topic.id)
@@ -399,36 +384,35 @@ impl SequencerRepo for MemCatalog {
 }
 
 #[async_trait]
-impl PartitionRepo for MemCatalog {
+impl PartitionRepo for MemTxn {
     async fn create_or_get(
-        &self,
+        &mut self,
         key: &str,
         sequencer_id: SequencerId,
         table_id: TableId,
     ) -> Result<Partition> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-        let partition = match collections.partitions.iter().find(|p| {
+        let partition = match self.stage.partitions.iter().find(|p| {
             p.partition_key == key && p.sequencer_id == sequencer_id && p.table_id == table_id
         }) {
             Some(p) => p,
             None => {
                 let p = Partition {
-                    id: PartitionId::new(collections.partitions.len() as i64 + 1),
+                    id: PartitionId::new(self.stage.partitions.len() as i64 + 1),
                     sequencer_id,
                     table_id,
                     partition_key: key.to_string(),
                 };
-                collections.partitions.push(p);
-                collections.partitions.last().unwrap()
+                self.stage.partitions.push(p);
+                self.stage.partitions.last().unwrap()
             }
         };
 
         Ok(partition.clone())
     }
 
-    async fn list_by_sequencer(&self, sequencer_id: SequencerId) -> Result<Vec<Partition>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let partitions: Vec<_> = collections
+    async fn list_by_sequencer(&mut self, sequencer_id: SequencerId) -> Result<Vec<Partition>> {
+        let partitions: Vec<_> = self
+            .stage
             .partitions
             .iter()
             .filter(|p| p.sequencer_id == sequencer_id)
@@ -438,24 +422,26 @@ impl PartitionRepo for MemCatalog {
     }
 
     async fn partition_info_by_id(
-        &self,
+        &mut self,
         partition_id: PartitionId,
     ) -> Result<Option<PartitionInfo>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let partition = collections
+        let partition = self
+            .stage
             .partitions
             .iter()
             .find(|p| p.id == partition_id)
             .cloned();
 
         if let Some(partition) = partition {
-            let table = collections
+            let table = self
+                .stage
                 .tables
                 .iter()
                 .find(|t| t.id == partition.table_id)
                 .cloned();
             if let Some(table) = table {
-                let namespace = collections
+                let namespace = self
+                    .stage
                     .namespaces
                     .iter()
                    .find(|n| n.id == table.namespace_id)
@@ -475,9 +461,9 @@ impl PartitionRepo for MemCatalog {
 }
 
 #[async_trait]
-impl TombstoneRepo for MemCatalog {
+impl TombstoneRepo for MemTxn {
     async fn create_or_get(
-        &self,
+        &mut self,
         table_id: TableId,
         sequencer_id: SequencerId,
         sequence_number: SequenceNumber,
@@ -485,8 +471,7 @@ impl TombstoneRepo for MemCatalog {
         max_time: Timestamp,
         predicate: &str,
     ) -> Result<Tombstone> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-        let tombstone = match collections.tombstones.iter().find(|t| {
+        let tombstone = match self.stage.tombstones.iter().find(|t| {
             t.table_id == table_id
                 && t.sequencer_id == sequencer_id
                 && t.sequence_number == sequence_number
@@ -494,7 +479,7 @@ impl TombstoneRepo for MemCatalog {
             Some(t) => t,
             None => {
                 let t = Tombstone {
-                    id: TombstoneId::new(collections.tombstones.len() as i64 + 1),
+                    id: TombstoneId::new(self.stage.tombstones.len() as i64 + 1),
                     table_id,
                     sequencer_id,
                     sequence_number,
@@ -502,8 +487,8 @@ impl TombstoneRepo for MemCatalog {
                     max_time,
                     serialized_predicate: predicate.to_string(),
                 };
-                collections.tombstones.push(t);
-                collections.tombstones.last().unwrap()
+                self.stage.tombstones.push(t);
+                self.stage.tombstones.last().unwrap()
             }
         };
 
@@ -511,12 +496,12 @@ impl TombstoneRepo for MemCatalog {
     }
 
     async fn list_tombstones_by_sequencer_greater_than(
-        &self,
+        &mut self,
         sequencer_id: SequencerId,
         sequence_number: SequenceNumber,
     ) -> Result<Vec<Tombstone>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let tombstones: Vec<_> = collections
+        let tombstones: Vec<_> = self
+            .stage
             .tombstones
             .iter()
             .filter(|t| t.sequencer_id == sequencer_id && t.sequence_number > sequence_number)
@@ -527,10 +512,9 @@ impl TombstoneRepo for MemCatalog {
 }
 
 #[async_trait]
-impl ParquetFileRepo for MemCatalog {
+impl ParquetFileRepo for MemTxn {
     async fn create(
-        &self,
-        _txt: Option<&mut Transaction<'_, Postgres>>,
+        &mut self,
         sequencer_id: SequencerId,
         table_id: TableId,
         partition_id: PartitionId,
@@ -540,8 +524,8 @@ impl ParquetFileRepo for MemCatalog {
         min_time: Timestamp,
         max_time: Timestamp,
     ) -> Result<ParquetFile> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-        if collections
+        if self
+            .stage
             .parquet_files
             .iter()
             .any(|f| f.object_store_id == object_store_id)
@@ -550,7 +534,7 @@ impl ParquetFileRepo for MemCatalog {
         }
 
         let parquet_file = ParquetFile {
-            id: ParquetFileId::new(collections.parquet_files.len() as i64 + 1),
+            id: ParquetFileId::new(self.stage.parquet_files.len() as i64 + 1),
             sequencer_id,
             table_id,
             partition_id,
@@ -561,14 +545,12 @@ impl ParquetFileRepo for MemCatalog {
             max_time,
             to_delete: false,
         };
-        collections.parquet_files.push(parquet_file);
-        Ok(*collections.parquet_files.last().unwrap())
+        self.stage.parquet_files.push(parquet_file);
+        Ok(*self.stage.parquet_files.last().unwrap())
     }
 
-    async fn flag_for_delete(&self, id: ParquetFileId) -> Result<()> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-
-        match collections.parquet_files.iter_mut().find(|p| p.id == id) {
+    async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
+        match self.stage.parquet_files.iter_mut().find(|p| p.id == id) {
             Some(f) => f.to_delete = true,
             None => return Err(Error::ParquetRecordNotFound { id }),
         }
@@ -577,12 +559,12 @@ impl ParquetFileRepo for MemCatalog {
     }
 
     async fn list_by_sequencer_greater_than(
-        &self,
+        &mut self,
         sequencer_id: SequencerId,
         sequence_number: SequenceNumber,
     ) -> Result<Vec<ParquetFile>> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let files: Vec<_> = collections
+        let files: Vec<_> = self
+            .stage
             .parquet_files
             .iter()
             .filter(|f| f.sequencer_id == sequencer_id && f.max_sequence_number > sequence_number)
@@ -591,14 +573,12 @@ impl ParquetFileRepo for MemCatalog {
         Ok(files)
     }
 
-    async fn exist(&self, id: ParquetFileId) -> Result<bool> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        Ok(collections.parquet_files.iter().any(|f| f.id == id))
+    async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
+        Ok(self.stage.parquet_files.iter().any(|f| f.id == id))
     }
 
-    async fn count(&self) -> Result<i64> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let count = collections.parquet_files.len();
+    async fn count(&mut self) -> Result<i64> {
+        let count = self.stage.parquet_files.len();
         let count_i64 = i64::try_from(count);
         if count_i64.is_err() {
             return Err(Error::InvalidValue { value: count });
@@ -608,17 +588,15 @@ impl ParquetFileRepo for MemCatalog {
 }
 
 #[async_trait]
-impl ProcessedTombstoneRepo for MemCatalog {
+impl ProcessedTombstoneRepo for MemTxn {
     async fn create_many(
-        &self,
-        _txt: Option<&mut Transaction<'_, Postgres>>,
+        &mut self,
         parquet_file_id: ParquetFileId,
         tombstones: &[Tombstone],
     ) -> Result<Vec<ProcessedTombstone>> {
-        let mut collections = self.collections.lock().expect("mutex poisoned");
-
         // check if the parquet file is available
-        if !collections
+        if !self
+            .stage
             .parquet_files
             .iter()
             .any(|f| f.id == parquet_file_id)
@@ -631,13 +609,14 @@ impl ProcessedTombstoneRepo for MemCatalog {
         let mut processed_tombstones = vec![];
         for tombstone in tombstones {
             // check if the tombstone exists
-            if !collections.tombstones.iter().any(|f| f.id == tombstone.id) {
+            if !self.stage.tombstones.iter().any(|f| f.id == tombstone.id) {
                 return Err(Error::TombstoneNotFound {
                     id: tombstone.id.get(),
                 });
            }
 
-            if collections
+            if self
+                .stage
                 .processed_tombstones
                 .iter()
                 .any(|pt| pt.tombstone_id == tombstone.id && pt.parquet_file_id == parquet_file_id)
@@ -660,7 +639,7 @@ impl ProcessedTombstoneRepo for MemCatalog {
         let return_processed_tombstones = processed_tombstones.clone();
 
         // Add to the catalog
-        collections
+        self.stage
             .processed_tombstones
             .append(&mut processed_tombstones);
 
@@ -668,20 +647,19 @@ impl ProcessedTombstoneRepo for MemCatalog {
     }
 
     async fn exist(
-        &self,
+        &mut self,
         parquet_file_id: ParquetFileId,
         tombstone_id: TombstoneId,
     ) -> Result<bool> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        Ok(collections
+        Ok(self
+            .stage
            .processed_tombstones
            .iter()
            .any(|f| f.parquet_file_id == parquet_file_id && f.tombstone_id == tombstone_id))
    }
 
-    async fn count(&self) -> Result<i64> {
-        let collections = self.collections.lock().expect("mutex poisoned");
-        let count = collections.processed_tombstones.len();
+    async fn count(&mut self) -> Result<i64> {
+        let count = self.stage.processed_tombstones.len();
         let count_i64 = i64::try_from(count);
         if count_i64.is_err() {
             return Err(Error::InvalidValue { value: count });
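Aside (not part of the diff): the in-memory backend above implements a transaction as a copy-on-write stage behind an owned async mutex guard, which is what limits it to one transaction at a time. A self-contained sketch of that pattern with simplified, hypothetical types:

    use std::sync::Arc;
    use tokio::sync::{Mutex, OwnedMutexGuard};

    #[derive(Clone, Default, Debug)]
    struct State {
        values: Vec<i32>,
    }

    struct Txn {
        guard: OwnedMutexGuard<State>, // holds the lock => concurrency of 1
        stage: State,                  // all reads/writes go against this clone
    }

    impl Txn {
        async fn begin(state: &Arc<Mutex<State>>) -> Self {
            let guard = Arc::clone(state).lock_owned().await;
            let stage = guard.clone(); // derefs to State, clones the data
            Self { guard, stage }
        }

        fn commit(mut self) {
            // Publish staged changes atomically; dropping a Txn without
            // commit simply discards `stage`, which acts as a rollback.
            *self.guard = std::mem::take(&mut self.stage);
        }
    }

    async fn demo() {
        let state = Arc::new(Mutex::new(State::default()));
        let mut txn = Txn::begin(&state).await;
        txn.stage.values.push(1); // staged, not yet visible
        txn.commit();             // now visible to the next transaction
    }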
@@ -1,18 +1,16 @@
 //! A Postgres backed implementation of the Catalog
 
 use crate::interface::{
-    Catalog, Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
-    KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId,
-    ParquetFileRepo, Partition, PartitionId, PartitionInfo, PartitionRepo, ProcessedTombstone,
-    ProcessedTombstoneRepo, QueryPool, QueryPoolId, QueryPoolRepo, Result, SequenceNumber,
-    Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone,
-    TombstoneId, TombstoneRepo,
+    sealed::TransactionFinalize, Catalog, Column, ColumnRepo, ColumnType, Error, KafkaPartition,
+    KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile,
+    ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionInfo, PartitionRepo,
+    ProcessedTombstone, ProcessedTombstoneRepo, QueryPool, QueryPoolId, QueryPoolRepo, Result,
+    SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, Timestamp,
+    Tombstone, TombstoneId, TombstoneRepo, Transaction,
 };
 use async_trait::async_trait;
 use observability_deps::tracing::{info, warn};
-use sqlx::{
-    migrate::Migrator, postgres::PgPoolOptions, Executor, Pool, Postgres, Row, Transaction,
-};
+use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Executor, Pool, Postgres, Row};
 use std::time::Duration;
 use uuid::Uuid;
 
@@ -24,7 +22,7 @@ pub const SCHEMA_NAME: &str = "iox_catalog";
 
 static MIGRATOR: Migrator = sqlx::migrate!();
 
-/// In-memory catalog that implements the `RepoCollection` and individual repo traits.
+/// PostgreSQL catalog.
 #[derive(Debug)]
 pub struct PostgresCatalog {
     pool: Pool<Postgres>,
@@ -72,6 +70,50 @@ impl PostgresCatalog {
     }
 }
 
+/// Transaction for [`PostgresCatalog`].
+#[derive(Debug)]
+pub struct PostgresTxn {
+    transaction: Option<sqlx::Transaction<'static, Postgres>>,
+}
+
+impl PostgresTxn {
+    fn transaction(&mut self) -> &mut sqlx::Transaction<'static, Postgres> {
+        self.transaction.as_mut().expect("Not yet finalized")
+    }
+}
+
+impl Drop for PostgresTxn {
+    fn drop(&mut self) {
+        if self.transaction.is_some() {
+            warn!("Dropping PostgresTxn w/o finalizing (commit or abort)");
+
+            // SQLx ensures that the inner transaction enqueues a rollback when it is dropped, so
+            // we don't need to spawn a task here to call `rollback` manually.
+        }
+    }
+}
+
+#[async_trait]
+impl TransactionFinalize for PostgresTxn {
+    async fn commit_inplace(&mut self) -> Result<(), Error> {
+        self.transaction
+            .take()
+            .expect("Not yet finalized")
+            .commit()
+            .await
+            .map_err(|e| Error::SqlxError { source: e })
+    }
+
+    async fn abort_inplace(&mut self) -> Result<(), Error> {
+        self.transaction
+            .take()
+            .expect("Not yet finalized")
+            .rollback()
+            .await
+            .map_err(|e| Error::SqlxError { source: e })
+    }
+}
+
 #[async_trait]
 impl Catalog for PostgresCatalog {
     async fn setup(&self) -> Result<(), Error> {
@@ -83,111 +125,65 @@ impl Catalog for PostgresCatalog {
         Ok(())
     }
 
-    fn kafka_topics(&self) -> &dyn KafkaTopicRepo {
-        self
-    }
-
-    fn query_pools(&self) -> &dyn QueryPoolRepo {
-        self
-    }
-
-    fn namespaces(&self) -> &dyn NamespaceRepo {
-        self
-    }
-
-    fn tables(&self) -> &dyn TableRepo {
-        self
-    }
-
-    fn columns(&self) -> &dyn ColumnRepo {
-        self
-    }
-
-    fn sequencers(&self) -> &dyn SequencerRepo {
-        self
-    }
-
-    fn partitions(&self) -> &dyn PartitionRepo {
-        self
-    }
-
-    fn tombstones(&self) -> &dyn TombstoneRepo {
-        self
-    }
-
-    fn parquet_files(&self) -> &dyn ParquetFileRepo {
-        self
-    }
-
-    fn processed_tombstones(&self) -> &dyn ProcessedTombstoneRepo {
-        self
-    }
-
-    async fn add_parquet_file_with_tombstones(
-        &self,
-        parquet_file: &ParquetFile,
-        tombstones: &[Tombstone],
-    ) -> Result<(ParquetFile, Vec<ProcessedTombstone>), Error> {
-        // Start a transaction
-        let txt = self.pool.begin().await;
-        if let Err(error) = txt {
-            return Err(Error::StartTransaction { source: error });
-        }
-        let mut txt = txt.unwrap();
-
-        // create a parquet file in the catalog first
-        let parquet = self
-            .parquet_files()
-            .create(
-                Some(&mut txt),
-                parquet_file.sequencer_id,
-                parquet_file.table_id,
-                parquet_file.partition_id,
-                parquet_file.object_store_id,
-                parquet_file.min_sequence_number,
-                parquet_file.max_sequence_number,
-                parquet_file.min_time,
-                parquet_file.max_time,
-            )
-            .await;
-
-        if let Err(error) = parquet {
-            // Error while adding parquet file into the catalog, stop the transaction
-            warn!(object_store_id=?parquet_file.object_store_id.to_string(), "{}", error.to_string());
-            let _rollback = txt.rollback().await;
-            return Err(error);
-        }
-        let parquet = parquet.unwrap();
-
-        // Now the parquet file is available, create its processed tombstones
-        let processed_tombstones = self
-            .processed_tombstones()
-            .create_many(Some(&mut txt), parquet.id, tombstones)
-            .await;
-
-        let processed_tombstones = match processed_tombstones {
-            Ok(processed_tombstones) => processed_tombstones,
-            Err(e) => {
-                // Error while adding processed tombstones
-                warn!(
-                    "Error while adding processed tombstone: {}. Transaction stops.",
-                    e.to_string()
-                );
-                let _rollback = txt.rollback().await;
-                return Err(e);
-            }
-        };
-
-        // Commit the transaction
-        let _commit = txt.commit().await;
-
-        Ok((parquet, processed_tombstones))
+    async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error> {
+        let transaction = self
+            .pool
+            .begin()
+            .await
+            .map_err(|e| Error::SqlxError { source: e })?;
+
+        Ok(Box::new(PostgresTxn {
+            transaction: Some(transaction),
+        }))
     }
 }
 
 #[async_trait]
-impl KafkaTopicRepo for PostgresCatalog {
-    async fn create_or_get(&self, name: &str) -> Result<KafkaTopic> {
+impl Transaction for PostgresTxn {
+    fn kafka_topics(&mut self) -> &mut dyn KafkaTopicRepo {
+        self
+    }
+
+    fn query_pools(&mut self) -> &mut dyn QueryPoolRepo {
+        self
+    }
+
+    fn namespaces(&mut self) -> &mut dyn NamespaceRepo {
+        self
+    }
+
+    fn tables(&mut self) -> &mut dyn TableRepo {
+        self
+    }
+
+    fn columns(&mut self) -> &mut dyn ColumnRepo {
+        self
+    }
+
+    fn sequencers(&mut self) -> &mut dyn SequencerRepo {
+        self
+    }
+
+    fn partitions(&mut self) -> &mut dyn PartitionRepo {
+        self
+    }
+
+    fn tombstones(&mut self) -> &mut dyn TombstoneRepo {
+        self
+    }
+
+    fn parquet_files(&mut self) -> &mut dyn ParquetFileRepo {
+        self
+    }
+
+    fn processed_tombstones(&mut self) -> &mut dyn ProcessedTombstoneRepo {
+        self
+    }
+}
+
+#[async_trait]
+impl KafkaTopicRepo for PostgresTxn {
+    async fn create_or_get(&mut self, name: &str) -> Result<KafkaTopic> {
         let rec = sqlx::query_as::<_, KafkaTopic>(
             r#"
 INSERT INTO kafka_topic ( name )
@@ -197,21 +193,21 @@ DO UPDATE SET name = kafka_topic.name RETURNING *;
 "#,
         )
         .bind(&name) // $1
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await
         .map_err(|e| Error::SqlxError { source: e })?;
 
         Ok(rec)
     }
 
-    async fn get_by_name(&self, name: &str) -> Result<Option<KafkaTopic>> {
+    async fn get_by_name(&mut self, name: &str) -> Result<Option<KafkaTopic>> {
         let rec = sqlx::query_as::<_, KafkaTopic>(
             r#"
 SELECT * FROM kafka_topic WHERE name = $1;
 "#,
         )
         .bind(&name) // $1
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await;
 
         if let Err(sqlx::Error::RowNotFound) = rec {
@@ -225,8 +221,8 @@ SELECT * FROM kafka_topic WHERE name = $1;
 }
 
 #[async_trait]
-impl QueryPoolRepo for PostgresCatalog {
-    async fn create_or_get(&self, name: &str) -> Result<QueryPool> {
+impl QueryPoolRepo for PostgresTxn {
+    async fn create_or_get(&mut self, name: &str) -> Result<QueryPool> {
         let rec = sqlx::query_as::<_, QueryPool>(
             r#"
 INSERT INTO query_pool ( name )
@@ -236,7 +232,7 @@ DO UPDATE SET name = query_pool.name RETURNING *;
 "#,
         )
         .bind(&name) // $1
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await
         .map_err(|e| Error::SqlxError { source: e })?;
 
@@ -245,9 +241,9 @@ DO UPDATE SET name = query_pool.name RETURNING *;
 }
 
 #[async_trait]
-impl NamespaceRepo for PostgresCatalog {
+impl NamespaceRepo for PostgresTxn {
     async fn create(
-        &self,
+        &mut self,
         name: &str,
         retention_duration: &str,
         kafka_topic_id: KafkaTopicId,
@@ -264,7 +260,7 @@ RETURNING *
         .bind(&retention_duration) // $2
         .bind(kafka_topic_id) // $3
         .bind(query_pool_id) // $4
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await
         .map_err(|e| {
             if is_unique_violation(&e) {
@@ -281,14 +277,14 @@ RETURNING *
         Ok(rec)
     }
 
-    async fn get_by_name(&self, name: &str) -> Result<Option<Namespace>> {
+    async fn get_by_name(&mut self, name: &str) -> Result<Option<Namespace>> {
         let rec = sqlx::query_as::<_, Namespace>(
             r#"
 SELECT * FROM namespace WHERE name = $1;
 "#,
         )
         .bind(&name) // $1
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await;
 
         if let Err(sqlx::Error::RowNotFound) = rec {
@@ -302,8 +298,8 @@ SELECT * FROM namespace WHERE name = $1;
 }
 
 #[async_trait]
-impl TableRepo for PostgresCatalog {
-    async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
+impl TableRepo for PostgresTxn {
+    async fn create_or_get(&mut self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
         let rec = sqlx::query_as::<_, Table>(
             r#"
 INSERT INTO table_name ( name, namespace_id )
@@ -314,7 +310,7 @@ DO UPDATE SET name = table_name.name RETURNING *;
         )
         .bind(&name) // $1
         .bind(&namespace_id) // $2
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await
         .map_err(|e| {
             if is_fk_violation(&e) {
@@ -327,7 +323,7 @@ DO UPDATE SET name = table_name.name RETURNING *;
         Ok(rec)
     }
 
-    async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
+    async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
         let rec = sqlx::query_as::<_, Table>(
             r#"
 SELECT * FROM table_name
@@ -335,7 +331,7 @@ WHERE namespace_id = $1;
 "#,
         )
         .bind(&namespace_id)
-        .fetch_all(&self.pool)
+        .fetch_all(self.transaction())
         .await
         .map_err(|e| Error::SqlxError { source: e })?;
 
@@ -344,9 +340,9 @@ WHERE namespace_id = $1;
 }
 
 #[async_trait]
-impl ColumnRepo for PostgresCatalog {
+impl ColumnRepo for PostgresTxn {
     async fn create_or_get(
-        &self,
+        &mut self,
         name: &str,
         table_id: TableId,
         column_type: ColumnType,
@@ -364,7 +360,7 @@ DO UPDATE SET name = column_name.name RETURNING *;
         .bind(&name) // $1
         .bind(&table_id) // $2
         .bind(&ct) // $3
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await
         .map_err(|e| {
             if is_fk_violation(&e) {
@@ -385,7 +381,7 @@ DO UPDATE SET name = column_name.name RETURNING *;
         Ok(rec)
     }
 
-    async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
+    async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
         let rec = sqlx::query_as::<_, Column>(
             r#"
 SELECT column_name.* FROM table_name
@@ -394,7 +390,7 @@ WHERE table_name.namespace_id = $1;
 "#,
         )
         .bind(&namespace_id)
-        .fetch_all(&self.pool)
+        .fetch_all(self.transaction())
         .await
         .map_err(|e| Error::SqlxError { source: e })?;
 
@@ -403,9 +399,9 @@ WHERE table_name.namespace_id = $1;
 }
 
 #[async_trait]
-impl SequencerRepo for PostgresCatalog {
+impl SequencerRepo for PostgresTxn {
     async fn create_or_get(
-        &self,
+        &mut self,
         topic: &KafkaTopic,
         partition: KafkaPartition,
     ) -> Result<Sequencer> {
@@ -421,7 +417,7 @@ impl SequencerRepo for PostgresCatalog {
         )
         .bind(&topic.id) // $1
         .bind(&partition) // $2
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await
         .map_err(|e| {
             if is_fk_violation(&e) {
@@ -433,7 +429,7 @@ impl SequencerRepo for PostgresCatalog {
     }
 
     async fn get_by_topic_id_and_partition(
-        &self,
+        &mut self,
         topic_id: KafkaTopicId,
         partition: KafkaPartition,
     ) -> Result<Option<Sequencer>> {
@@ -444,7 +440,7 @@ SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2;
         )
         .bind(topic_id) // $1
         .bind(partition) // $2
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await;
 
         if let Err(sqlx::Error::RowNotFound) = rec {
@@ -456,26 +452,26 @@ SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2;
         Ok(Some(sequencer))
     }
 
-    async fn list(&self) -> Result<Vec<Sequencer>> {
+    async fn list(&mut self) -> Result<Vec<Sequencer>> {
         sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer;"#)
-            .fetch_all(&self.pool)
+            .fetch_all(self.transaction())
             .await
             .map_err(|e| Error::SqlxError { source: e })
     }
 
-    async fn list_by_kafka_topic(&self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
+    async fn list_by_kafka_topic(&mut self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
         sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer WHERE kafka_topic_id = $1;"#)
             .bind(&topic.id) // $1
-            .fetch_all(&self.pool)
+            .fetch_all(self.transaction())
             .await
             .map_err(|e| Error::SqlxError { source: e })
     }
 }
 
 #[async_trait]
-impl PartitionRepo for PostgresCatalog {
+impl PartitionRepo for PostgresTxn {
     async fn create_or_get(
-        &self,
+        &mut self,
         key: &str,
         sequencer_id: SequencerId,
         table_id: TableId,
@@ -493,7 +489,7 @@ impl PartitionRepo for PostgresCatalog {
         .bind(key) // $1
         .bind(&sequencer_id) // $2
         .bind(&table_id) // $3
-        .fetch_one(&self.pool)
+        .fetch_one(self.transaction())
         .await
         .map_err(|e| {
             if is_fk_violation(&e) {
@@ -504,16 +500,16 @@ impl PartitionRepo for PostgresCatalog {
         })
     }
 
-    async fn list_by_sequencer(&self, sequencer_id: SequencerId) -> Result<Vec<Partition>> {
+    async fn list_by_sequencer(&mut self, sequencer_id: SequencerId) -> Result<Vec<Partition>> {
         sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE sequencer_id = $1;"#)
             .bind(&sequencer_id) // $1
-            .fetch_all(&self.pool)
+            .fetch_all(self.transaction())
             .await
             .map_err(|e| Error::SqlxError { source: e })
     }
 
     async fn partition_info_by_id(
-        &self,
+        &mut self,
         partition_id: PartitionId,
|
||||
) -> Result<Option<PartitionInfo>> {
|
||||
let info = sqlx::query(
|
||||
|
@ -526,7 +522,7 @@ impl PartitionRepo for PostgresCatalog {
|
|||
WHERE partition.id = $1;"#,
|
||||
)
|
||||
.bind(&partition_id) // $1
|
||||
.fetch_one(&self.pool)
|
||||
.fetch_one(self.transaction())
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })?;
|
||||
|
||||
|
@ -548,9 +544,9 @@ impl PartitionRepo for PostgresCatalog {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TombstoneRepo for PostgresCatalog {
|
||||
impl TombstoneRepo for PostgresTxn {
|
||||
async fn create_or_get(
|
||||
&self,
|
||||
&mut self,
|
||||
table_id: TableId,
|
||||
sequencer_id: SequencerId,
|
||||
sequence_number: SequenceNumber,
|
||||
|
@ -574,7 +570,7 @@ impl TombstoneRepo for PostgresCatalog {
|
|||
.bind(&min_time) // $4
|
||||
.bind(&max_time) // $5
|
||||
.bind(predicate) // $6
|
||||
.fetch_one(&self.pool)
|
||||
.fetch_one(self.transaction())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_fk_violation(&e) {
|
||||
|
@ -586,24 +582,23 @@ impl TombstoneRepo for PostgresCatalog {
|
|||
}
|
||||
|
||||
async fn list_tombstones_by_sequencer_greater_than(
|
||||
&self,
|
||||
&mut self,
|
||||
sequencer_id: SequencerId,
|
||||
sequence_number: SequenceNumber,
|
||||
) -> Result<Vec<Tombstone>> {
|
||||
sqlx::query_as::<_, Tombstone>(r#"SELECT * FROM tombstone WHERE sequencer_id = $1 AND sequence_number > $2 ORDER BY id;"#)
|
||||
.bind(&sequencer_id) // $1
|
||||
.bind(&sequence_number) // $2
|
||||
.fetch_all(&self.pool)
|
||||
.fetch_all(self.transaction())
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ParquetFileRepo for PostgresCatalog {
|
||||
impl ParquetFileRepo for PostgresTxn {
|
||||
async fn create(
|
||||
&self,
|
||||
txt: Option<&mut Transaction<'_, Postgres>>,
|
||||
&mut self,
|
||||
sequencer_id: SequencerId,
|
||||
table_id: TableId,
|
||||
partition_id: PartitionId,
|
||||
|
@ -627,30 +622,28 @@ RETURNING *
|
|||
.bind(min_sequence_number) // $5
|
||||
.bind(max_sequence_number) // $6
|
||||
.bind(min_time) // $7
|
||||
.bind(max_time); // $8
|
||||
|
||||
let rec = match txt {
|
||||
Some(txt) => rec.fetch_one(txt).await,
|
||||
None => rec.fetch_one(&self.pool).await,
|
||||
};
|
||||
|
||||
let rec = rec.map_err(|e| {
|
||||
if is_unique_violation(&e) {
|
||||
Error::FileExists { object_store_id }
|
||||
} else if is_fk_violation(&e) {
|
||||
Error::ForeignKeyViolation { source: e }
|
||||
} else {
|
||||
Error::SqlxError { source: e }
|
||||
}
|
||||
})?;
|
||||
.bind(max_time) // $8
|
||||
.fetch_one(self.transaction())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_unique_violation(&e) {
|
||||
Error::FileExists {
|
||||
object_store_id,
|
||||
}
|
||||
} else if is_fk_violation(&e) {
|
||||
Error::ForeignKeyViolation { source: e }
|
||||
} else {
|
||||
Error::SqlxError { source: e }
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(rec)
|
||||
}
|
||||
|
||||
async fn flag_for_delete(&self, id: ParquetFileId) -> Result<()> {
|
||||
async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
|
||||
let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = true WHERE id = $1;"#)
|
||||
.bind(&id) // $1
|
||||
.execute(&self.pool)
|
||||
.execute(self.transaction())
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })?;
|
||||
|
||||
|
@ -658,34 +651,34 @@ RETURNING *
|
|||
}
|
||||
|
||||
async fn list_by_sequencer_greater_than(
|
||||
&self,
|
||||
&mut self,
|
||||
sequencer_id: SequencerId,
|
||||
sequence_number: SequenceNumber,
|
||||
) -> Result<Vec<ParquetFile>> {
|
||||
sqlx::query_as::<_, ParquetFile>(r#"SELECT * FROM parquet_file WHERE sequencer_id = $1 AND max_sequence_number > $2 ORDER BY id;"#)
|
||||
.bind(&sequencer_id) // $1
|
||||
.bind(&sequence_number) // $2
|
||||
.fetch_all(&self.pool)
|
||||
.fetch_all(self.transaction())
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })
|
||||
}
|
||||
|
||||
async fn exist(&self, id: ParquetFileId) -> Result<bool> {
|
||||
async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
|
||||
let read_result = sqlx::query_as::<_, Count>(
|
||||
r#"SELECT count(*) as count FROM parquet_file WHERE id = $1;"#,
|
||||
)
|
||||
.bind(&id) // $1
|
||||
.fetch_one(&self.pool)
|
||||
.fetch_one(self.transaction())
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })?;
|
||||
|
||||
Ok(read_result.count > 0)
|
||||
}
|
||||
|
||||
async fn count(&self) -> Result<i64> {
|
||||
async fn count(&mut self) -> Result<i64> {
|
||||
let read_result =
|
||||
sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM parquet_file;"#)
|
||||
.fetch_one(&self.pool)
|
||||
.fetch_one(self.transaction())
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })?;
|
||||
|
||||
|
@ -694,18 +687,12 @@ RETURNING *
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProcessedTombstoneRepo for PostgresCatalog {
|
||||
impl ProcessedTombstoneRepo for PostgresTxn {
|
||||
async fn create_many(
|
||||
&self,
|
||||
txt: Option<&mut Transaction<'_, Postgres>>,
|
||||
&mut self,
|
||||
parquet_file_id: ParquetFileId,
|
||||
tombstones: &[Tombstone],
|
||||
) -> Result<Vec<ProcessedTombstone>> {
|
||||
if txt.is_none() {
|
||||
return Err(Error::NoTransaction);
|
||||
}
|
||||
let txt = txt.unwrap();
|
||||
|
||||
// no transaction provided
|
||||
// todo: we should never needs this but since right now we implement 2 catalogs,
|
||||
// postgres (for production) and mem (for testing only) that does not need to provide txt
|
||||
|
@ -721,7 +708,7 @@ impl ProcessedTombstoneRepo for PostgresCatalog {
|
|||
)
|
||||
.bind(tombstone.id) // $1
|
||||
.bind(parquet_file_id) // $2
|
||||
.fetch_one(&mut *txt)
|
||||
.fetch_one(self.transaction())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_unique_violation(&e) {
|
||||
|
@ -743,7 +730,7 @@ impl ProcessedTombstoneRepo for PostgresCatalog {
|
|||
}
|
||||
|
||||
async fn exist(
|
||||
&self,
|
||||
&mut self,
|
||||
parquet_file_id: ParquetFileId,
|
||||
tombstone_id: TombstoneId,
|
||||
) -> Result<bool> {
|
||||
|
@ -751,17 +738,17 @@ impl ProcessedTombstoneRepo for PostgresCatalog {
|
|||
r#"SELECT count(*) as count FROM processed_tombstone WHERE parquet_file_id = $1 AND tombstone_id = $2;"#)
|
||||
.bind(&parquet_file_id) // $1
|
||||
.bind(&tombstone_id) // $2
|
||||
.fetch_one(&self.pool)
|
||||
.fetch_one(self.transaction())
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })?;
|
||||
|
||||
Ok(read_result.count > 0)
|
||||
}
|
||||
|
||||
async fn count(&self) -> Result<i64> {
|
||||
async fn count(&mut self) -> Result<i64> {
|
||||
let read_result =
|
||||
sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM processed_tombstone;"#)
|
||||
.fetch_one(&self.pool)
|
||||
.fetch_one(self.transaction())
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })?;
|
||||
|
||||
|
|
|
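
The mechanical change throughout this file is uniform: every query that used to run against `&self.pool` now runs against `self.transaction()`, which is why each repo method moves from `&self` to `&mut self`, and why `ParquetFileRepo::create` and `ProcessedTombstoneRepo::create_many` no longer thread an `Option<&mut Transaction>` by hand. A minimal sketch of the shape this implies (illustrative names only, not the crate's actual definitions):

// Illustrative sketch, not the real PostgresTxn: the wrapper owns an
// optional inner handle that commit/abort consume, so `transaction()`
// can hand out `&mut` access for as long as the txn is open.
struct InnerTxn; // stand-in for a sqlx transaction handle

struct PostgresTxnSketch {
    inner: Option<InnerTxn>,
}

impl PostgresTxnSketch {
    // Why every repo method needs `&mut self`: the executor is a
    // mutable borrow of the transaction, not a shared pool reference.
    fn transaction(&mut self) -> &mut InnerTxn {
        self.inner
            .as_mut()
            .expect("transaction already committed or aborted")
    }

    // Finalizing takes the handle out, so any later use fails loudly.
    fn commit(&mut self) -> InnerTxn {
        self.inner
            .take()
            .expect("transaction already committed or aborted")
    }
}

fn main() {
    let mut txn = PostgresTxnSketch { inner: Some(InnerTxn) };
    let _executor = txn.transaction(); // fine: txn is still open
    let _inner = txn.commit(); // consumes the handle
}
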
@ -91,8 +91,13 @@ where
        if self.cache.get_schema(&namespace).is_none() {
            trace!(%namespace, "namespace auto-create cache miss");

            match self
            let mut txn = self
                .catalog
                .start_transaction()
                .await
                .map_err(NamespaceCreationError::Create)?;

            match txn
                .namespaces()
                .create(
                    namespace.as_str(),

@ -103,6 +108,8 @@ where
                .await
            {
                Ok(_) => {
                    txn.commit().await.map_err(NamespaceCreationError::Create)?;

                    debug!(%namespace, "created namespace");
                }
                Err(iox_catalog::interface::Error::NameExists { .. }) => {

@ -110,9 +117,11 @@ where
                    // namespace, or another thread raced populating the catalog
                    // and beat this thread to it.
                    debug!(%namespace, "spurious namespace create failed");
                    txn.abort().await.map_err(NamespaceCreationError::Create)?;
                }
                Err(e) => {
                    error!(error=%e, %namespace, "failed to auto-create namespace");
                    txn.abort().await.map_err(NamespaceCreationError::Create)?;
                    return Err(NamespaceCreationError::Create(e));
                }
            }

@ -190,15 +199,19 @@ mod tests {

        // The cache hit should mean the catalog SHOULD NOT see a create request
        // for the namespace.
        let mut txn = catalog
            .start_transaction()
            .await
            .expect("failed to start UoW");
        assert!(
            catalog
                .namespaces()
            txn.namespaces()
                .get_by_name(ns.as_str())
                .await
                .expect("lookup should not error")
                .is_none(),
            "expected no request to the catalog"
        );
        txn.abort().await.expect("failed to abort UoW");

        // And the DML handler must be called.
        assert_matches!(mock_handler.calls().as_slice(), [MockDmlHandlerCall::Write { namespace, .. }] => {

@ -230,12 +243,17 @@ mod tests {

        // The cache miss should mean the catalog MUST see a create request for
        // the namespace.
        let got = catalog
        let mut txn = catalog
            .start_transaction()
            .await
            .expect("failed to start UoW");
        let got = txn
            .namespaces()
            .get_by_name(ns.as_str())
            .await
            .expect("lookup should not error")
            .expect("creation request should be sent to catalog");
        txn.abort().await.expect("failed to abort UoW");

        assert_eq!(
            got,
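
The handler change above follows a fixed pattern: start a transaction, run the repo call, commit on success, and explicitly abort on every error path, including the benign `NameExists` race. A self-contained sketch of that control flow, with placeholder types standing in for the real router and catalog types:

// Control-flow sketch only; `Txn`, `CreateError`, and `create_namespace`
// are placeholders, not the actual API.
enum CreateError {
    NameExists,
    Other(String),
}

struct Txn;

impl Txn {
    fn commit(self) { /* would COMMIT here */ }
    fn abort(self) { /* would ROLLBACK here */ }
}

fn create_namespace(name: &str) -> Result<(), CreateError> {
    if name == "taken" {
        Err(CreateError::NameExists)
    } else {
        Ok(())
    }
}

fn auto_create(name: &str) -> Result<(), CreateError> {
    let txn = Txn; // stand-in for catalog.start_transaction().await?
    match create_namespace(name) {
        Ok(_) => txn.commit(), // persist the new namespace
        Err(CreateError::NameExists) => {
            // Benign race: someone else created it first; discard our txn.
            txn.abort();
        }
        Err(e) => {
            // Unexpected failure: roll back, then surface the error.
            txn.abort();
            return Err(e);
        }
    }
    Ok(())
}

fn main() {
    assert!(auto_create("new_ns").is_ok());
    assert!(auto_create("taken").is_ok()); // the race is treated as success
    assert!(matches!(auto_create("taken"), Ok(())));
}
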
@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{ops::DerefMut, sync::Arc};

use async_trait::async_trait;
use data_types::{delete_predicate::DeletePredicate, DatabaseName};

@ -135,6 +135,12 @@ where
        batches: HashMap<String, MutableBatch>,
        span_ctx: Option<SpanContext>,
    ) -> Result<(), Self::WriteError> {
        let mut txn = self
            .catalog
            .start_transaction()
            .await
            .map_err(SchemaError::NamespaceLookup)?;

        // Load the namespace schema from the cache, falling back to pulling it
        // from the global catalog (if it exists).
        let schema = self.cache.get_schema(&namespace);

@ -143,7 +149,7 @@ where
            None => {
                // Pull the schema from the global catalog or error if it does
                // not exist.
                let schema = get_schema_by_name(&namespace, &*self.catalog)
                let schema = get_schema_by_name(&namespace, txn.deref_mut())
                    .await
                    .map_err(|e| {
                        warn!(error=%e, %namespace, "failed to retrieve namespace schema");

@ -162,7 +168,7 @@ where
        let maybe_new_schema = validate_or_insert_schema(
            batches.iter().map(|(k, v)| (k.as_str(), v)),
            &schema,
            &*self.catalog,
            txn.deref_mut(),
        )
        .await
        .map_err(|e| {

@ -171,6 +177,8 @@ where
        })?
        .map(Arc::new);

        txn.commit().await.map_err(SchemaError::NamespaceLookup)?;

        trace!(%namespace, "schema validation complete");

        // If the schema has been updated, immediately add it to the cache

@ -246,8 +254,12 @@ mod tests {
    /// named [`NAMESPACE`].
    async fn create_catalog() -> Arc<dyn Catalog> {
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
        catalog
            .namespaces()

        let mut txn = catalog
            .start_transaction()
            .await
            .expect("failed to start UoW");
        txn.namespaces()
            .create(
                NAMESPACE,
                "inf",

@ -256,6 +268,8 @@ mod tests {
            )
            .await
            .expect("failed to create test namespace");
        txn.commit().await.expect("failed to commit UoW");

        catalog
    }
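
The new `DerefMut` import in this file exists because `start_transaction()` hands back a boxed transaction object, while helpers like `get_schema_by_name` and `validate_or_insert_schema` borrow it as a trait object; `txn.deref_mut()` bridges the two. A minimal sketch of that pattern, with an illustrative trait rather than the crate's real `Transaction` interface:

// Sketch of passing a boxed trait object to a backend-agnostic helper.
use std::ops::DerefMut;

trait TxnSketch {
    fn get(&mut self, name: &str) -> Option<String>;
}

struct MemTxn;

impl TxnSketch for MemTxn {
    fn get(&mut self, name: &str) -> Option<String> {
        Some(format!("schema for {name}"))
    }
}

// Free function that works with any backend, mirroring how
// `get_schema_by_name` takes a borrowed transaction.
fn get_schema_by_name(name: &str, txn: &mut dyn TxnSketch) -> Option<String> {
    txn.get(name)
}

fn main() {
    // `start_transaction()` would return something like Box<dyn TxnSketch>.
    let mut txn: Box<dyn TxnSketch> = Box::new(MemTxn);
    // `deref_mut()` turns `&mut Box<dyn TxnSketch>` into `&mut dyn TxnSketch`.
    let schema = get_schema_by_name("ns", txn.deref_mut());
    assert_eq!(schema.as_deref(), Some("schema for ns"));
}
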