From c6e374a025c034bdb1c9482029ef596607e3d119 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 15 Feb 2022 10:15:36 +0000 Subject: [PATCH] feat: allow catalog access w/o a transaction (#3735) * feat: allow catalog access w/o a transaction Now the caller has the full control if they want to use a transaction or not. * fix: remove non-transaction-safe `create_many` * fix: remove unnecessary transactions --- influxdb_iox/src/commands/catalog/topic.rs | 5 +- ingester/src/data.rs | 20 +- iox_catalog/src/interface.rs | 231 ++++++------ iox_catalog/src/lib.rs | 36 +- iox_catalog/src/mem.rs | 340 +++++++++++------- iox_catalog/src/postgres.rs | 255 ++++++++----- router2/src/dml_handlers/ns_autocreation.rs | 29 +- router2/src/dml_handlers/schema_validation.rs | 21 +- 8 files changed, 527 insertions(+), 410 deletions(-) diff --git a/influxdb_iox/src/commands/catalog/topic.rs b/influxdb_iox/src/commands/catalog/topic.rs index 2c57ab995a..4eae7ced4e 100644 --- a/influxdb_iox/src/commands/catalog/topic.rs +++ b/influxdb_iox/src/commands/catalog/topic.rs @@ -47,9 +47,8 @@ pub async fn command(config: Config) -> Result<(), Error> { match config.command { Command::Update(update) => { let catalog = update.catalog_dsn.get_catalog("cli").await?; - let mut txn = catalog.start_transaction().await?; - let topic = txn.kafka_topics().create_or_get(&update.db_name).await?; - txn.commit().await?; + let mut repos = catalog.repositories().await; + let topic = repos.kafka_topics().create_or_get(&update.db_name).await?; println!("{}", topic.id); Ok(()) } diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 653aeaf381..306962d1db 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -168,14 +168,13 @@ impl SequencerData { namespace: &str, catalog: &dyn Catalog, ) -> Result> { - let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?; - let namespace = txn + let mut repos = catalog.repositories().await; + let namespace = repos .namespaces() .get_by_name(namespace) .await .context(CatalogSnafu)? .context(NamespaceNotFoundSnafu { namespace })?; - txn.commit().await.context(CatalogSnafu)?; let mut n = self.namespaces.write(); let data = Arc::clone( @@ -257,13 +256,12 @@ impl NamespaceData { table_name: &str, catalog: &dyn Catalog, ) -> Result> { - let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?; - let table = txn + let mut repos = catalog.repositories().await; + let table = repos .tables() .create_or_get(table_name, self.namespace_id) .await .context(CatalogSnafu)?; - txn.commit().await.context(CatalogSnafu)?; let mut t = self.tables.write(); let data = Arc::clone( @@ -336,8 +334,8 @@ impl TableData { let min_time = Timestamp::new(predicate.range.start()); let max_time = Timestamp::new(predicate.range.end()); - let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?; - let tombstone = txn + let mut repos = catalog.repositories().await; + let tombstone = repos .tombstones() .create_or_get( self.table_id, @@ -349,7 +347,6 @@ impl TableData { ) .await .context(CatalogSnafu)?; - txn.commit().await.context(CatalogSnafu)?; let partitions = self.partition_data.read(); for data in partitions.values() { @@ -371,13 +368,12 @@ impl TableData { sequencer_id: SequencerId, catalog: &dyn Catalog, ) -> Result> { - let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?; - let partition = txn + let mut repos = catalog.repositories().await; + let partition = repos .partitions() .create_or_get(partition_key, sequencer_id, self.table_id) .await .context(CatalogSnafu)?; - txn.commit().await.context(CatalogSnafu)?; let mut p = self.partition_data.write(); let data = Arc::new(PartitionData::new(partition.id)); p.insert(partition.partition_key, Arc::clone(&data)); diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 8ec68f871c..2394add5d8 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -314,6 +314,9 @@ pub trait Catalog: Send + Sync + Debug { /// transactions might be limited per catalog, so you MUST NOT rely on the ability to create multiple transactions in /// parallel for correctness but only for scaling. async fn start_transaction(&self) -> Result, Error>; + + /// Access the repositories w/o a transaction scope. + async fn repositories(&self) -> Box; } /// Secret module for [sealed traits]. @@ -344,22 +347,12 @@ pub(crate) mod sealed { /// transaction MIGHT be poisoned and will return errors for all operations, depending on the backend. /// /// -/// # Repositories -/// The methods (e.g. "get or create") for handling entities (e.g. namespaces, tombstones, ...) are grouped into -/// *repositories* with one *repository* per entity. A repository can be thought of a collection of a single entity. -/// Getting repositories from the transaction is cheap. -/// -/// Note that a repository might internally map to a wide range of different storage abstractions, ranging from one or -/// more SQL tables over key-value key spaces to simple in-memory vectors. The user should and must not care how these -/// are implemented. -/// -/// /// # Drop /// Dropping a transaction without calling [`commit`](Self::commit) or [`abort`](Self::abort) will abort the /// transaction. However resources might not be released immediately, so it is adviced to always call /// [`abort`](Self::abort) when you want to enforce that. Dropping w/o commiting/aborting will also log a warning. #[async_trait] -pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize { +pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize + RepoCollection { /// Commit transaction. /// /// # Error Handling @@ -376,7 +369,22 @@ pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize { async fn abort(mut self: Box) -> Result<(), Error> { self.abort_inplace().await } +} +impl Transaction for T where T: Send + Sync + Debug + sealed::TransactionFinalize + RepoCollection +{} + +/// Collection of the different repositories that the catalog offers. +/// +/// The methods (e.g. "get or create") for handling entities (e.g. namespaces, tombstones, ...) are grouped into +/// *repositories* with one *repository* per entity. A repository can be thought of a collection of a single entity. +/// Getting repositories from the transaction is cheap. +/// +/// Note that a repository might internally map to a wide range of different storage abstractions, ranging from one or +/// more SQL tables over key-value key spaces to simple in-memory vectors. The user should and must not care how these +/// are implemented. +#[async_trait] +pub trait RepoCollection: Send + Sync + Debug { /// repo for kafka topics fn kafka_topics(&mut self) -> &mut dyn KafkaTopicRepo; @@ -588,12 +596,12 @@ pub trait ParquetFileRepo: Send + Sync { /// Functions for working with processed tombstone pointers in the catalog #[async_trait] pub trait ProcessedTombstoneRepo: Send + Sync { - /// create processed tombstones - async fn create_many( + /// create a processed tombstone + async fn create( &mut self, parquet_file_id: ParquetFileId, - tombstones: &[Tombstone], - ) -> Result>; + tombstone_id: TombstoneId, + ) -> Result; /// Verify if a processed tombstone exists in the catalog async fn exist( @@ -667,16 +675,19 @@ impl NamespaceSchema { } /// Gets the namespace schema including all tables and columns. -pub async fn get_schema_by_name(name: &str, txn: &mut dyn Transaction) -> Result { - let namespace = txn +pub async fn get_schema_by_name(name: &str, repos: &mut R) -> Result +where + R: RepoCollection + ?Sized, +{ + let namespace = repos .namespaces() .get_by_name(name) .await? .context(NamespaceNotFoundSnafu { name })?; // get the columns first just in case someone else is creating schema while we're doing this. - let columns = txn.columns().list_by_namespace_id(namespace.id).await?; - let tables = txn.tables().list_by_namespace_id(namespace.id).await?; + let columns = repos.columns().list_by_namespace_id(namespace.id).await?; + let tables = repos.tables().list_by_namespace_id(namespace.id).await?; let mut namespace = NamespaceSchema::new( namespace.id, @@ -1043,8 +1054,8 @@ pub(crate) mod test_helpers { } async fn test_kafka_topic(catalog: Arc) { - let mut txn = catalog.start_transaction().await.unwrap(); - let kafka_repo = txn.kafka_topics(); + let mut repos = catalog.repositories().await; + let kafka_repo = repos.kafka_topics(); let k = kafka_repo.create_or_get("foo").await.unwrap(); assert!(k.id > KafkaTopicId::new(0)); @@ -1055,40 +1066,34 @@ pub(crate) mod test_helpers { assert_eq!(k3, k); let k3 = kafka_repo.get_by_name("asdf").await.unwrap(); assert!(k3.is_none()); - - txn.commit().await.unwrap(); } async fn test_query_pool(catalog: Arc) { - let mut txn = catalog.start_transaction().await.unwrap(); - let query_repo = txn.query_pools(); + let mut repos = catalog.repositories().await; + let query_repo = repos.query_pools(); let q = query_repo.create_or_get("foo").await.unwrap(); assert!(q.id > QueryPoolId::new(0)); assert_eq!(q.name, "foo"); let q2 = query_repo.create_or_get("foo").await.unwrap(); assert_eq!(q, q2); - - txn.commit().await.unwrap(); } async fn test_namespace(catalog: Arc) { - let mut txn = catalog.start_transaction().await.unwrap(); - let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap(); - let pool = txn.query_pools().create_or_get("foo").await.unwrap(); + let mut repos = catalog.repositories().await; + let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace_name = "test_namespace"; - let namespace = txn + let namespace = repos .namespaces() .create(namespace_name, "inf", kafka.id, pool.id) .await .unwrap(); assert!(namespace.id > NamespaceId::new(0)); assert_eq!(namespace.name, namespace_name); - txn.commit().await.unwrap(); - let mut txn = catalog.start_transaction().await.unwrap(); - let conflict = txn + let conflict = repos .namespaces() .create(namespace_name, "inf", kafka.id, pool.id) .await; @@ -1096,36 +1101,33 @@ pub(crate) mod test_helpers { conflict.unwrap_err(), Error::NameExists { name: _ } )); - txn.abort().await.unwrap(); - let mut txn = catalog.start_transaction().await.unwrap(); - let found = txn + let found = repos .namespaces() .get_by_name(namespace_name) .await .unwrap() .expect("namespace should be there"); assert_eq!(namespace, found); - txn.commit().await.unwrap(); } async fn test_table(catalog: Arc) { - let mut txn = catalog.start_transaction().await.unwrap(); - let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap(); - let pool = txn.query_pools().create_or_get("foo").await.unwrap(); - let namespace = txn + let mut repos = catalog.repositories().await; + let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = repos.query_pools().create_or_get("foo").await.unwrap(); + let namespace = repos .namespaces() .create("namespace_table_test", "inf", kafka.id, pool.id) .await .unwrap(); // test we can create or get a table - let t = txn + let t = repos .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let tt = txn + let tt = repos .tables() .create_or_get("test_table", namespace.id) .await @@ -1133,7 +1135,7 @@ pub(crate) mod test_helpers { assert!(t.id > TableId::new(0)); assert_eq!(t, tt); - let tables = txn + let tables = repos .tables() .list_by_namespace_id(namespace.id) .await @@ -1141,33 +1143,31 @@ pub(crate) mod test_helpers { assert_eq!(vec![t], tables); // test we can create a table of the same name in a different namespace - let namespace2 = txn + let namespace2 = repos .namespaces() .create("two", "inf", kafka.id, pool.id) .await .unwrap(); assert_ne!(namespace, namespace2); - let test_table = txn + let test_table = repos .tables() .create_or_get("test_table", namespace2.id) .await .unwrap(); assert_ne!(tt, test_table); assert_eq!(test_table.namespace_id, namespace2.id); - - txn.commit().await.unwrap(); } async fn test_column(catalog: Arc) { - let mut txn = catalog.start_transaction().await.unwrap(); - let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap(); - let pool = txn.query_pools().create_or_get("foo").await.unwrap(); - let namespace = txn + let mut repos = catalog.repositories().await; + let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = repos.query_pools().create_or_get("foo").await.unwrap(); + let namespace = repos .namespaces() .create("namespace_column_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = txn + let table = repos .tables() .create_or_get("test_table", namespace.id) .await @@ -1175,12 +1175,12 @@ pub(crate) mod test_helpers { assert_eq!(table.namespace_id, namespace.id); // test we can create or get a column - let c = txn + let c = repos .columns() .create_or_get("column_test", table.id, ColumnType::Tag) .await .unwrap(); - let cc = txn + let cc = repos .columns() .create_or_get("column_test", table.id, ColumnType::Tag) .await @@ -1189,7 +1189,7 @@ pub(crate) mod test_helpers { assert_eq!(c, cc); // test that attempting to create an already defined column of a different type returns error - let err = txn + let err = repos .columns() .create_or_get("column_test", table.id, ColumnType::U64) .await @@ -1204,31 +1204,29 @@ pub(crate) mod test_helpers { )); // test that we can create a column of the same name under a different table - let table2 = txn + let table2 = repos .tables() .create_or_get("test_table_2", namespace.id) .await .unwrap(); - let ccc = txn + let ccc = repos .columns() .create_or_get("column_test", table2.id, ColumnType::U64) .await .unwrap(); assert_ne!(c, ccc); - let columns = txn + let columns = repos .columns() .list_by_namespace_id(namespace.id) .await .unwrap(); assert_eq!(vec![c, ccc], columns); - - txn.commit().await.unwrap(); } async fn test_sequencer(catalog: Arc) { - let mut txn = catalog.start_transaction().await.unwrap(); - let kafka = txn + let mut repos = catalog.repositories().await; + let kafka = repos .kafka_topics() .create_or_get("sequencer_test") .await @@ -1237,7 +1235,7 @@ pub(crate) mod test_helpers { // Create 10 sequencers let mut created = BTreeMap::new(); for partition in 1..=10 { - let sequencer = txn + let sequencer = repos .sequencers() .create_or_get(&kafka, KafkaPartition::new(partition)) .await @@ -1246,7 +1244,7 @@ pub(crate) mod test_helpers { } // List them and assert they match - let listed = txn + let listed = repos .sequencers() .list_by_kafka_topic(&kafka) .await @@ -1259,7 +1257,7 @@ pub(crate) mod test_helpers { // get by the sequencer id and partition let kafka_partition = KafkaPartition::new(1); - let sequencer = txn + let sequencer = repos .sequencers() .get_by_topic_id_and_partition(kafka.id, kafka_partition) .await @@ -1268,36 +1266,34 @@ pub(crate) mod test_helpers { assert_eq!(kafka.id, sequencer.kafka_topic_id); assert_eq!(kafka_partition, sequencer.kafka_partition); - let sequencer = txn + let sequencer = repos .sequencers() .get_by_topic_id_and_partition(kafka.id, KafkaPartition::new(523)) .await .unwrap(); assert!(sequencer.is_none()); - - txn.commit().await.unwrap(); } async fn test_partition(catalog: Arc) { - let mut txn = catalog.start_transaction().await.unwrap(); - let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap(); - let pool = txn.query_pools().create_or_get("foo").await.unwrap(); - let namespace = txn + let mut repos = catalog.repositories().await; + let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = repos.query_pools().create_or_get("foo").await.unwrap(); + let namespace = repos .namespaces() .create("namespace_partition_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = txn + let table = repos .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let sequencer = txn + let sequencer = repos .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let other_sequencer = txn + let other_sequencer = repos .sequencers() .create_or_get(&kafka, KafkaPartition::new(2)) .await @@ -1305,21 +1301,21 @@ pub(crate) mod test_helpers { let mut created = BTreeMap::new(); for key in ["foo", "bar"] { - let partition = txn + let partition = repos .partitions() .create_or_get(key, sequencer.id, table.id) .await .expect("failed to create partition"); created.insert(partition.id, partition); } - let other_partition = txn + let other_partition = repos .partitions() .create_or_get("asdf", other_sequencer.id, table.id) .await .unwrap(); // List them and assert they match - let listed = txn + let listed = repos .partitions() .list_by_sequencer(sequencer.id) .await @@ -1331,7 +1327,7 @@ pub(crate) mod test_helpers { assert_eq!(created, listed); // test get_partition_info_by_id - let info = txn + let info = repos .partitions() .partition_info_by_id(other_partition.id) .await @@ -1340,30 +1336,28 @@ pub(crate) mod test_helpers { assert_eq!(info.partition, other_partition); assert_eq!(info.table_name, "test_table"); assert_eq!(info.namespace_name, "namespace_partition_test"); - - txn.commit().await.unwrap(); } async fn test_tombstone(catalog: Arc) { - let mut txn = catalog.start_transaction().await.unwrap(); - let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap(); - let pool = txn.query_pools().create_or_get("foo").await.unwrap(); - let namespace = txn + let mut repos = catalog.repositories().await; + let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = repos.query_pools().create_or_get("foo").await.unwrap(); + let namespace = repos .namespaces() .create("namespace_tombstone_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = txn + let table = repos .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let other_table = txn + let other_table = repos .tables() .create_or_get("other", namespace.id) .await .unwrap(); - let sequencer = txn + let sequencer = repos .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await @@ -1371,7 +1365,7 @@ pub(crate) mod test_helpers { let min_time = Timestamp::new(1); let max_time = Timestamp::new(10); - let t1 = txn + let t1 = repos .tombstones() .create_or_get( table.id, @@ -1389,7 +1383,7 @@ pub(crate) mod test_helpers { assert_eq!(t1.min_time, min_time); assert_eq!(t1.max_time, max_time); assert_eq!(t1.serialized_predicate, "whatevs"); - let t2 = txn + let t2 = repos .tombstones() .create_or_get( other_table.id, @@ -1401,7 +1395,7 @@ pub(crate) mod test_helpers { ) .await .unwrap(); - let t3 = txn + let t3 = repos .tombstones() .create_or_get( table.id, @@ -1414,46 +1408,44 @@ pub(crate) mod test_helpers { .await .unwrap(); - let listed = txn + let listed = repos .tombstones() .list_tombstones_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) .await .unwrap(); assert_eq!(vec![t2, t3], listed); - - txn.commit().await.unwrap(); } async fn test_parquet_file(catalog: Arc) { - let mut txn = catalog.start_transaction().await.unwrap(); - let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap(); - let pool = txn.query_pools().create_or_get("foo").await.unwrap(); - let namespace = txn + let mut repos = catalog.repositories().await; + let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = repos.query_pools().create_or_get("foo").await.unwrap(); + let namespace = repos .namespaces() .create("namespace_parquet_file_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = txn + let table = repos .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let other_table = txn + let other_table = repos .tables() .create_or_get("other", namespace.id) .await .unwrap(); - let sequencer = txn + let sequencer = repos .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let partition = txn + let partition = repos .partitions() .create_or_get("one", sequencer.id, table.id) .await .unwrap(); - let other_partition = txn + let other_partition = repos .partitions() .create_or_get("one", sequencer.id, other_table.id) .await @@ -1463,10 +1455,10 @@ pub(crate) mod test_helpers { let max_time = Timestamp::new(10); // Must have no rows - let row_count = txn.parquet_files().count().await.unwrap(); + let row_count = repos.parquet_files().count().await.unwrap(); assert_eq!(row_count, 0); - let parquet_file = txn + let parquet_file = repos .parquet_files() .create( sequencer.id, @@ -1480,11 +1472,9 @@ pub(crate) mod test_helpers { ) .await .unwrap(); - txn.commit().await.unwrap(); // verify that trying to create a file with the same UUID throws an error - let mut txn = catalog.start_transaction().await.unwrap(); - let err = txn + let err = repos .parquet_files() .create( sequencer.id, @@ -1499,10 +1489,8 @@ pub(crate) mod test_helpers { .await .unwrap_err(); assert!(matches!(err, Error::FileExists { object_store_id: _ })); - txn.abort().await.unwrap(); - let mut txn = catalog.start_transaction().await.unwrap(); - let other_file = txn + let other_file = repos .parquet_files() .create( sequencer.id, @@ -1518,23 +1506,23 @@ pub(crate) mod test_helpers { .unwrap(); // Must have 2 rows - let row_count = txn.parquet_files().count().await.unwrap(); + let row_count = repos.parquet_files().count().await.unwrap(); assert_eq!(row_count, 2); let exist_id = parquet_file.id; let non_exist_id = ParquetFileId::new(other_file.id.get() + 10); // make sure exists_id != non_exist_id assert_ne!(exist_id, non_exist_id); - assert!(txn.parquet_files().exist(exist_id).await.unwrap()); - assert!(!txn.parquet_files().exist(non_exist_id).await.unwrap()); + assert!(repos.parquet_files().exist(exist_id).await.unwrap()); + assert!(!repos.parquet_files().exist(non_exist_id).await.unwrap()); - let files = txn + let files = repos .parquet_files() .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) .await .unwrap(); assert_eq!(vec![parquet_file, other_file], files); - let files = txn + let files = repos .parquet_files() .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(150)) .await @@ -1543,18 +1531,17 @@ pub(crate) mod test_helpers { // verify that to_delete is initially set to false and that it can be updated to true assert!(!parquet_file.to_delete); - txn.parquet_files() + repos + .parquet_files() .flag_for_delete(parquet_file.id) .await .unwrap(); - let files = txn + let files = repos .parquet_files() .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) .await .unwrap(); assert!(files.first().unwrap().to_delete); - - txn.commit().await.unwrap(); } async fn test_add_parquet_file_with_tombstones(catalog: Arc) { diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index a14108ebfc..a20b852710 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -16,7 +16,7 @@ use crate::interface::{ SequencerId, TableSchema, Transaction, }; -use interface::{ParquetFile, ProcessedTombstone, Tombstone}; +use interface::{ParquetFile, ProcessedTombstone, RepoCollection, Tombstone}; use mutable_batch::MutableBatch; use std::{borrow::Cow, collections::BTreeMap}; @@ -40,14 +40,15 @@ pub mod postgres; /// This function pushes schema additions through to the backend catalog, and /// relies on the catalog to serialise concurrent additions of a given column, /// ensuring only one type is ever accepted per column. -pub async fn validate_or_insert_schema<'a, T, U>( +pub async fn validate_or_insert_schema<'a, T, U, R>( tables: T, schema: &NamespaceSchema, - txn: &mut dyn Transaction, + repos: &mut R, ) -> Result> where T: IntoIterator + Send + Sync, U: Iterator + Send, + R: RepoCollection + ?Sized, { let tables = tables.into_iter(); @@ -55,7 +56,7 @@ where let mut schema = Cow::Borrowed(schema); for (table_name, batch) in tables { - validate_mutable_batch(batch, table_name, &mut schema, txn).await?; + validate_mutable_batch(batch, table_name, &mut schema, repos).await?; } match schema { @@ -64,12 +65,15 @@ where } } -async fn validate_mutable_batch( +async fn validate_mutable_batch( mb: &MutableBatch, table_name: &str, schema: &mut Cow<'_, NamespaceSchema>, - txn: &mut dyn Transaction, -) -> Result<()> { + repos: &mut R, +) -> Result<()> +where + R: RepoCollection + ?Sized, +{ // Check if the table exists in the schema. // // Because the entry API requires &mut it is not used to avoid a premature @@ -81,14 +85,14 @@ async fn validate_mutable_batch( // // Attempt to create the table in the catalog, or load an existing // table from the catalog to populate the cache. - let mut table = txn + let mut table = repos .tables() .create_or_get(table_name, schema.id) .await .map(|t| TableSchema::new(t.id))?; // Always add a time column to all new tables. - let time_col = txn + let time_col = repos .columns() .create_or_get(TIME_COLUMN, table.id, ColumnType::Time) .await?; @@ -134,7 +138,7 @@ async fn validate_mutable_batch( None => { // The column does not exist in the cache, create/get it from // the catalog, and add it to the table. - let column = txn + let column = repos .columns() .create_or_get(name.as_str(), table.id, ColumnType::from(col.influx_type())) .await?; @@ -200,10 +204,14 @@ pub async fn add_parquet_file_with_tombstones( .await?; // Now the parquet available, create its processed tombstones - let processed_tombstones = txn - .processed_tombstones() - .create_many(parquet.id, tombstones) - .await?; + let mut processed_tombstones = Vec::with_capacity(tombstones.len()); + for tombstone in tombstones { + processed_tombstones.push( + txn.processed_tombstones() + .create(parquet.id, tombstone.id) + .await?, + ); + } Ok((parquet, processed_tombstones)) } diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index c844976848..554c7e3415 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -6,8 +6,9 @@ use crate::interface::{ KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionInfo, PartitionRepo, ProcessedTombstone, ProcessedTombstoneRepo, QueryPool, - QueryPoolId, QueryPoolRepo, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, - Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, Transaction, + QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, + SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, + Transaction, }; use async_trait::async_trait; use observability_deps::tracing::warn; @@ -51,18 +52,41 @@ struct MemCollections { processed_tombstones: Vec, } +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +enum MemTxnInner { + Txn { + guard: OwnedMutexGuard, + stage: MemCollections, + finalized: bool, + }, + NoTxn { + collections: OwnedMutexGuard, + }, +} + /// transaction bound to an in-memory catalog. #[derive(Debug)] pub struct MemTxn { - guard: OwnedMutexGuard, - stage: MemCollections, - finalized: bool, + inner: MemTxnInner, +} + +impl MemTxn { + fn stage(&mut self) -> &mut MemCollections { + match &mut self.inner { + MemTxnInner::Txn { stage, .. } => stage, + MemTxnInner::NoTxn { collections } => collections, + } + } } impl Drop for MemTxn { fn drop(&mut self) { - if !self.finalized { - warn!("Dropping MemTxn w/o finalizing (commit or abort)"); + match self.inner { + MemTxnInner::Txn { finalized, .. } if !finalized => { + warn!("Dropping MemTxn w/o finalizing (commit or abort)"); + } + _ => {} } } } @@ -78,29 +102,58 @@ impl Catalog for MemCatalog { let guard = Arc::clone(&self.collections).lock_owned().await; let stage = guard.clone(); Ok(Box::new(MemTxn { - guard, - stage, - finalized: false, + inner: MemTxnInner::Txn { + guard, + stage, + finalized: false, + }, })) } + + async fn repositories(&self) -> Box { + let collections = Arc::clone(&self.collections).lock_owned().await; + Box::new(MemTxn { + inner: MemTxnInner::NoTxn { collections }, + }) + } } #[async_trait] impl TransactionFinalize for MemTxn { async fn commit_inplace(&mut self) -> Result<(), Error> { - *self.guard = std::mem::take(&mut self.stage); - self.finalized = true; + match &mut self.inner { + MemTxnInner::Txn { + guard, + stage, + finalized, + } => { + assert!(!*finalized); + **guard = std::mem::take(stage); + *finalized = true; + } + MemTxnInner::NoTxn { .. } => { + panic!("cannot commit oneshot"); + } + } Ok(()) } async fn abort_inplace(&mut self) -> Result<(), Error> { - self.finalized = true; + match &mut self.inner { + MemTxnInner::Txn { finalized, .. } => { + assert!(!*finalized); + *finalized = true; + } + MemTxnInner::NoTxn { .. } => { + panic!("cannot abort oneshot"); + } + } Ok(()) } } #[async_trait] -impl Transaction for MemTxn { +impl RepoCollection for MemTxn { fn kafka_topics(&mut self) -> &mut dyn KafkaTopicRepo { self } @@ -145,15 +198,17 @@ impl Transaction for MemTxn { #[async_trait] impl KafkaTopicRepo for MemTxn { async fn create_or_get(&mut self, name: &str) -> Result { - let topic = match self.stage.kafka_topics.iter().find(|t| t.name == name) { + let stage = self.stage(); + + let topic = match stage.kafka_topics.iter().find(|t| t.name == name) { Some(t) => t, None => { let topic = KafkaTopic { - id: KafkaTopicId::new(self.stage.kafka_topics.len() as i32 + 1), + id: KafkaTopicId::new(stage.kafka_topics.len() as i32 + 1), name: name.to_string(), }; - self.stage.kafka_topics.push(topic); - self.stage.kafka_topics.last().unwrap() + stage.kafka_topics.push(topic); + stage.kafka_topics.last().unwrap() } }; @@ -161,12 +216,9 @@ impl KafkaTopicRepo for MemTxn { } async fn get_by_name(&mut self, name: &str) -> Result> { - let kafka_topic = self - .stage - .kafka_topics - .iter() - .find(|t| t.name == name) - .cloned(); + let stage = self.stage(); + + let kafka_topic = stage.kafka_topics.iter().find(|t| t.name == name).cloned(); Ok(kafka_topic) } } @@ -174,15 +226,17 @@ impl KafkaTopicRepo for MemTxn { #[async_trait] impl QueryPoolRepo for MemTxn { async fn create_or_get(&mut self, name: &str) -> Result { - let pool = match self.stage.query_pools.iter().find(|t| t.name == name) { + let stage = self.stage(); + + let pool = match stage.query_pools.iter().find(|t| t.name == name) { Some(t) => t, None => { let pool = QueryPool { - id: QueryPoolId::new(self.stage.query_pools.len() as i16 + 1), + id: QueryPoolId::new(stage.query_pools.len() as i16 + 1), name: name.to_string(), }; - self.stage.query_pools.push(pool); - self.stage.query_pools.last().unwrap() + stage.query_pools.push(pool); + stage.query_pools.last().unwrap() } }; @@ -199,38 +253,38 @@ impl NamespaceRepo for MemTxn { kafka_topic_id: KafkaTopicId, query_pool_id: QueryPoolId, ) -> Result { - if self.stage.namespaces.iter().any(|n| n.name == name) { + let stage = self.stage(); + + if stage.namespaces.iter().any(|n| n.name == name) { return Err(Error::NameExists { name: name.to_string(), }); } let namespace = Namespace { - id: NamespaceId::new(self.stage.namespaces.len() as i32 + 1), + id: NamespaceId::new(stage.namespaces.len() as i32 + 1), name: name.to_string(), kafka_topic_id, query_pool_id, retention_duration: Some(retention_duration.to_string()), }; - self.stage.namespaces.push(namespace); - Ok(self.stage.namespaces.last().unwrap().clone()) + stage.namespaces.push(namespace); + Ok(stage.namespaces.last().unwrap().clone()) } async fn get_by_name(&mut self, name: &str) -> Result> { - Ok(self - .stage - .namespaces - .iter() - .find(|n| n.name == name) - .cloned()) + let stage = self.stage(); + + Ok(stage.namespaces.iter().find(|n| n.name == name).cloned()) } } #[async_trait] impl TableRepo for MemTxn { async fn create_or_get(&mut self, name: &str, namespace_id: NamespaceId) -> Result { - let table = match self - .stage + let stage = self.stage(); + + let table = match stage .tables .iter() .find(|t| t.name == name && t.namespace_id == namespace_id) @@ -238,12 +292,12 @@ impl TableRepo for MemTxn { Some(t) => t, None => { let table = Table { - id: TableId::new(self.stage.tables.len() as i32 + 1), + id: TableId::new(stage.tables.len() as i32 + 1), namespace_id, name: name.to_string(), }; - self.stage.tables.push(table); - self.stage.tables.last().unwrap() + stage.tables.push(table); + stage.tables.last().unwrap() } }; @@ -251,8 +305,9 @@ impl TableRepo for MemTxn { } async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result> { - let tables: Vec<_> = self - .stage + let stage = self.stage(); + + let tables: Vec<_> = stage .tables .iter() .filter(|t| t.namespace_id == namespace_id) @@ -270,8 +325,9 @@ impl ColumnRepo for MemTxn { table_id: TableId, column_type: ColumnType, ) -> Result { - let column = match self - .stage + let stage = self.stage(); + + let column = match stage .columns .iter() .find(|t| t.name == name && t.table_id == table_id) @@ -289,13 +345,13 @@ impl ColumnRepo for MemTxn { } None => { let column = Column { - id: ColumnId::new(self.stage.columns.len() as i32 + 1), + id: ColumnId::new(stage.columns.len() as i32 + 1), table_id, name: name.to_string(), column_type: column_type as i16, }; - self.stage.columns.push(column); - self.stage.columns.last().unwrap() + stage.columns.push(column); + stage.columns.last().unwrap() } }; @@ -303,17 +359,17 @@ impl ColumnRepo for MemTxn { } async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result> { - let table_ids: Vec<_> = self - .stage + let stage = self.stage(); + + let table_ids: Vec<_> = stage .tables .iter() .filter(|t| t.namespace_id == namespace_id) .map(|t| t.id) .collect(); - println!("tables: {:?}", self.stage.tables); + println!("tables: {:?}", stage.tables); println!("table_ids: {:?}", table_ids); - let columns: Vec<_> = self - .stage + let columns: Vec<_> = stage .columns .iter() .filter(|c| table_ids.contains(&c.table_id)) @@ -331,8 +387,9 @@ impl SequencerRepo for MemTxn { topic: &KafkaTopic, partition: KafkaPartition, ) -> Result { - let sequencer = match self - .stage + let stage = self.stage(); + + let sequencer = match stage .sequencers .iter() .find(|s| s.kafka_topic_id == topic.id && s.kafka_partition == partition) @@ -340,13 +397,13 @@ impl SequencerRepo for MemTxn { Some(t) => t, None => { let sequencer = Sequencer { - id: SequencerId::new(self.stage.sequencers.len() as i16 + 1), + id: SequencerId::new(stage.sequencers.len() as i16 + 1), kafka_topic_id: topic.id, kafka_partition: partition, min_unpersisted_sequence_number: 0, }; - self.stage.sequencers.push(sequencer); - self.stage.sequencers.last().unwrap() + stage.sequencers.push(sequencer); + stage.sequencers.last().unwrap() } }; @@ -358,8 +415,9 @@ impl SequencerRepo for MemTxn { topic_id: KafkaTopicId, partition: KafkaPartition, ) -> Result> { - let sequencer = self - .stage + let stage = self.stage(); + + let sequencer = stage .sequencers .iter() .find(|s| s.kafka_topic_id == topic_id && s.kafka_partition == partition) @@ -368,12 +426,15 @@ impl SequencerRepo for MemTxn { } async fn list(&mut self) -> Result> { - Ok(self.stage.sequencers.clone()) + let stage = self.stage(); + + Ok(stage.sequencers.clone()) } async fn list_by_kafka_topic(&mut self, topic: &KafkaTopic) -> Result> { - let sequencers: Vec<_> = self - .stage + let stage = self.stage(); + + let sequencers: Vec<_> = stage .sequencers .iter() .filter(|s| s.kafka_topic_id == topic.id) @@ -391,19 +452,21 @@ impl PartitionRepo for MemTxn { sequencer_id: SequencerId, table_id: TableId, ) -> Result { - let partition = match self.stage.partitions.iter().find(|p| { + let stage = self.stage(); + + let partition = match stage.partitions.iter().find(|p| { p.partition_key == key && p.sequencer_id == sequencer_id && p.table_id == table_id }) { Some(p) => p, None => { let p = Partition { - id: PartitionId::new(self.stage.partitions.len() as i64 + 1), + id: PartitionId::new(stage.partitions.len() as i64 + 1), sequencer_id, table_id, partition_key: key.to_string(), }; - self.stage.partitions.push(p); - self.stage.partitions.last().unwrap() + stage.partitions.push(p); + stage.partitions.last().unwrap() } }; @@ -411,8 +474,9 @@ impl PartitionRepo for MemTxn { } async fn list_by_sequencer(&mut self, sequencer_id: SequencerId) -> Result> { - let partitions: Vec<_> = self - .stage + let stage = self.stage(); + + let partitions: Vec<_> = stage .partitions .iter() .filter(|p| p.sequencer_id == sequencer_id) @@ -425,23 +489,22 @@ impl PartitionRepo for MemTxn { &mut self, partition_id: PartitionId, ) -> Result> { - let partition = self - .stage + let stage = self.stage(); + + let partition = stage .partitions .iter() .find(|p| p.id == partition_id) .cloned(); if let Some(partition) = partition { - let table = self - .stage + let table = stage .tables .iter() .find(|t| t.id == partition.table_id) .cloned(); if let Some(table) = table { - let namespace = self - .stage + let namespace = stage .namespaces .iter() .find(|n| n.id == table.namespace_id) @@ -471,7 +534,9 @@ impl TombstoneRepo for MemTxn { max_time: Timestamp, predicate: &str, ) -> Result { - let tombstone = match self.stage.tombstones.iter().find(|t| { + let stage = self.stage(); + + let tombstone = match stage.tombstones.iter().find(|t| { t.table_id == table_id && t.sequencer_id == sequencer_id && t.sequence_number == sequence_number @@ -479,7 +544,7 @@ impl TombstoneRepo for MemTxn { Some(t) => t, None => { let t = Tombstone { - id: TombstoneId::new(self.stage.tombstones.len() as i64 + 1), + id: TombstoneId::new(stage.tombstones.len() as i64 + 1), table_id, sequencer_id, sequence_number, @@ -487,8 +552,8 @@ impl TombstoneRepo for MemTxn { max_time, serialized_predicate: predicate.to_string(), }; - self.stage.tombstones.push(t); - self.stage.tombstones.last().unwrap() + stage.tombstones.push(t); + stage.tombstones.last().unwrap() } }; @@ -500,8 +565,9 @@ impl TombstoneRepo for MemTxn { sequencer_id: SequencerId, sequence_number: SequenceNumber, ) -> Result> { - let tombstones: Vec<_> = self - .stage + let stage = self.stage(); + + let tombstones: Vec<_> = stage .tombstones .iter() .filter(|t| t.sequencer_id == sequencer_id && t.sequence_number > sequence_number) @@ -524,8 +590,9 @@ impl ParquetFileRepo for MemTxn { min_time: Timestamp, max_time: Timestamp, ) -> Result { - if self - .stage + let stage = self.stage(); + + if stage .parquet_files .iter() .any(|f| f.object_store_id == object_store_id) @@ -534,7 +601,7 @@ impl ParquetFileRepo for MemTxn { } let parquet_file = ParquetFile { - id: ParquetFileId::new(self.stage.parquet_files.len() as i64 + 1), + id: ParquetFileId::new(stage.parquet_files.len() as i64 + 1), sequencer_id, table_id, partition_id, @@ -545,12 +612,14 @@ impl ParquetFileRepo for MemTxn { max_time, to_delete: false, }; - self.stage.parquet_files.push(parquet_file); - Ok(*self.stage.parquet_files.last().unwrap()) + stage.parquet_files.push(parquet_file); + Ok(*stage.parquet_files.last().unwrap()) } async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> { - match self.stage.parquet_files.iter_mut().find(|p| p.id == id) { + let stage = self.stage(); + + match stage.parquet_files.iter_mut().find(|p| p.id == id) { Some(f) => f.to_delete = true, None => return Err(Error::ParquetRecordNotFound { id }), } @@ -563,8 +632,9 @@ impl ParquetFileRepo for MemTxn { sequencer_id: SequencerId, sequence_number: SequenceNumber, ) -> Result> { - let files: Vec<_> = self - .stage + let stage = self.stage(); + + let files: Vec<_> = stage .parquet_files .iter() .filter(|f| f.sequencer_id == sequencer_id && f.max_sequence_number > sequence_number) @@ -574,11 +644,15 @@ impl ParquetFileRepo for MemTxn { } async fn exist(&mut self, id: ParquetFileId) -> Result { - Ok(self.stage.parquet_files.iter().any(|f| f.id == id)) + let stage = self.stage(); + + Ok(stage.parquet_files.iter().any(|f| f.id == id)) } async fn count(&mut self) -> Result { - let count = self.stage.parquet_files.len(); + let stage = self.stage(); + + let count = stage.parquet_files.len(); let count_i64 = i64::try_from(count); if count_i64.is_err() { return Err(Error::InvalidValue { value: count }); @@ -589,61 +663,46 @@ impl ParquetFileRepo for MemTxn { #[async_trait] impl ProcessedTombstoneRepo for MemTxn { - async fn create_many( + async fn create( &mut self, parquet_file_id: ParquetFileId, - tombstones: &[Tombstone], - ) -> Result> { + tombstone_id: TombstoneId, + ) -> Result { + let stage = self.stage(); + // check if the parquet file available - if !self - .stage - .parquet_files - .iter() - .any(|f| f.id == parquet_file_id) - { + if !stage.parquet_files.iter().any(|f| f.id == parquet_file_id) { return Err(Error::FileNotFound { id: parquet_file_id.get(), }); } - let mut processed_tombstones = vec![]; - for tombstone in tombstones { - // check if tomstone exists - if !self.stage.tombstones.iter().any(|f| f.id == tombstone.id) { - return Err(Error::TombstoneNotFound { - id: tombstone.id.get(), - }); - } - - if self - .stage - .processed_tombstones - .iter() - .any(|pt| pt.tombstone_id == tombstone.id && pt.parquet_file_id == parquet_file_id) - { - // The tombstone was already proccessed for this file - return Err(Error::ProcessTombstoneExists { - parquet_file_id: parquet_file_id.get(), - tombstone_id: tombstone.id.get(), - }); - } - - let processed_tombstone = ProcessedTombstone { - tombstone_id: tombstone.id, - parquet_file_id, - }; - processed_tombstones.push(processed_tombstone); + // check if tomstone exists + if !stage.tombstones.iter().any(|f| f.id == tombstone_id) { + return Err(Error::TombstoneNotFound { + id: tombstone_id.get(), + }); } - // save for returning - let return_processed_tombstones = processed_tombstones.clone(); - - // Add to the catalog - self.stage + if stage .processed_tombstones - .append(&mut processed_tombstones); + .iter() + .any(|pt| pt.tombstone_id == tombstone_id && pt.parquet_file_id == parquet_file_id) + { + // The tombstone was already proccessed for this file + return Err(Error::ProcessTombstoneExists { + parquet_file_id: parquet_file_id.get(), + tombstone_id: tombstone_id.get(), + }); + } - Ok(return_processed_tombstones) + let processed_tombstone = ProcessedTombstone { + tombstone_id, + parquet_file_id, + }; + stage.processed_tombstones.push(processed_tombstone); + + Ok(processed_tombstone) } async fn exist( @@ -651,15 +710,18 @@ impl ProcessedTombstoneRepo for MemTxn { parquet_file_id: ParquetFileId, tombstone_id: TombstoneId, ) -> Result { - Ok(self - .stage + let stage = self.stage(); + + Ok(stage .processed_tombstones .iter() .any(|f| f.parquet_file_id == parquet_file_id && f.tombstone_id == tombstone_id)) } async fn count(&mut self) -> Result { - let count = self.stage.processed_tombstones.len(); + let stage = self.stage(); + + let count = stage.processed_tombstones.len(); let count_i64 = i64::try_from(count); if count_i64.is_err() { return Err(Error::InvalidValue { value: count }); diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 7fa63859cb..f69457ddc0 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -4,9 +4,9 @@ use crate::interface::{ sealed::TransactionFinalize, Catalog, Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionInfo, PartitionRepo, - ProcessedTombstone, ProcessedTombstoneRepo, QueryPool, QueryPoolId, QueryPoolRepo, Result, - SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, Timestamp, - Tombstone, TombstoneId, TombstoneRepo, Transaction, + ProcessedTombstone, ProcessedTombstoneRepo, QueryPool, QueryPoolId, QueryPoolRepo, + RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, + TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, Transaction, }; use async_trait::async_trait; use observability_deps::tracing::{info, warn}; @@ -73,18 +73,102 @@ impl PostgresCatalog { /// transaction for [`PostgresCatalog`]. #[derive(Debug)] pub struct PostgresTxn { - transaction: Option>, + inner: PostgresTxnInner, } -impl PostgresTxn { - fn transaction(&mut self) -> &mut sqlx::Transaction<'static, Postgres> { - self.transaction.as_mut().expect("Not yet finalized") +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +enum PostgresTxnInner { + Txn(Option>), + Oneshot(Pool), +} + +impl<'c> Executor<'c> for &'c mut PostgresTxnInner { + type Database = Postgres; + + #[allow(clippy::type_complexity)] + fn fetch_many<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> futures::stream::BoxStream< + 'e, + Result< + sqlx::Either< + ::QueryResult, + ::Row, + >, + sqlx::Error, + >, + > + where + 'c: 'e, + E: sqlx::Execute<'q, Self::Database>, + { + match self { + PostgresTxnInner::Txn(txn) => { + txn.as_mut().expect("Not yet finalized").fetch_many(query) + } + PostgresTxnInner::Oneshot(pool) => pool.fetch_many(query), + } + } + + fn fetch_optional<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> futures::future::BoxFuture< + 'e, + Result::Row>, sqlx::Error>, + > + where + 'c: 'e, + E: sqlx::Execute<'q, Self::Database>, + { + match self { + PostgresTxnInner::Txn(txn) => txn + .as_mut() + .expect("Not yet finalized") + .fetch_optional(query), + PostgresTxnInner::Oneshot(pool) => pool.fetch_optional(query), + } + } + + fn prepare_with<'e, 'q: 'e>( + self, + sql: &'q str, + parameters: &'e [::TypeInfo], + ) -> futures::future::BoxFuture< + 'e, + Result<>::Statement, sqlx::Error>, + > + where + 'c: 'e, + { + match self { + PostgresTxnInner::Txn(txn) => txn + .as_mut() + .expect("Not yet finalized") + .prepare_with(sql, parameters), + PostgresTxnInner::Oneshot(pool) => pool.prepare_with(sql, parameters), + } + } + + fn describe<'e, 'q: 'e>( + self, + sql: &'q str, + ) -> futures::future::BoxFuture<'e, Result, sqlx::Error>> + where + 'c: 'e, + { + match self { + PostgresTxnInner::Txn(txn) => txn.as_mut().expect("Not yet finalized").describe(sql), + PostgresTxnInner::Oneshot(pool) => pool.describe(sql), + } } } impl Drop for PostgresTxn { fn drop(&mut self) { - if self.transaction.is_some() { + if let PostgresTxnInner::Txn(Some(_)) = self.inner { warn!("Dropping PostgresTxn w/o finalizing (commit or abort)"); // SQLx ensures that the inner transaction enqueues a rollback when it is dropped, so we don't need to spawn @@ -96,21 +180,31 @@ impl Drop for PostgresTxn { #[async_trait] impl TransactionFinalize for PostgresTxn { async fn commit_inplace(&mut self) -> Result<(), Error> { - self.transaction - .take() - .expect("Not yet finalized") - .commit() - .await - .map_err(|e| Error::SqlxError { source: e }) + match &mut self.inner { + PostgresTxnInner::Txn(txn) => txn + .take() + .expect("Not yet finalized") + .commit() + .await + .map_err(|e| Error::SqlxError { source: e }), + PostgresTxnInner::Oneshot(_) => { + panic!("cannot commit oneshot"); + } + } } async fn abort_inplace(&mut self) -> Result<(), Error> { - self.transaction - .take() - .expect("Not yet finalized") - .rollback() - .await - .map_err(|e| Error::SqlxError { source: e }) + match &mut self.inner { + PostgresTxnInner::Txn(txn) => txn + .take() + .expect("Not yet finalized") + .rollback() + .await + .map_err(|e| Error::SqlxError { source: e }), + PostgresTxnInner::Oneshot(_) => { + panic!("cannot abort oneshot"); + } + } } } @@ -133,13 +227,19 @@ impl Catalog for PostgresCatalog { .map_err(|e| Error::SqlxError { source: e })?; Ok(Box::new(PostgresTxn { - transaction: Some(transaction), + inner: PostgresTxnInner::Txn(Some(transaction)), })) } + + async fn repositories(&self) -> Box { + Box::new(PostgresTxn { + inner: PostgresTxnInner::Oneshot(self.pool.clone()), + }) + } } #[async_trait] -impl Transaction for PostgresTxn { +impl RepoCollection for PostgresTxn { fn kafka_topics(&mut self) -> &mut dyn KafkaTopicRepo { self } @@ -193,7 +293,7 @@ DO UPDATE SET name = kafka_topic.name RETURNING *; "#, ) .bind(&name) // $1 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; @@ -207,7 +307,7 @@ SELECT * FROM kafka_topic WHERE name = $1; "#, ) .bind(&name) // $1 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await; if let Err(sqlx::Error::RowNotFound) = rec { @@ -232,7 +332,7 @@ DO UPDATE SET name = query_pool.name RETURNING *; "#, ) .bind(&name) // $1 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; @@ -260,7 +360,7 @@ RETURNING * .bind(&retention_duration) // $2 .bind(kafka_topic_id) // $3 .bind(query_pool_id) // $4 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| { if is_unique_violation(&e) { @@ -284,7 +384,7 @@ SELECT * FROM namespace WHERE name = $1; "#, ) .bind(&name) // $1 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await; if let Err(sqlx::Error::RowNotFound) = rec { @@ -310,7 +410,7 @@ DO UPDATE SET name = table_name.name RETURNING *; ) .bind(&name) // $1 .bind(&namespace_id) // $2 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| { if is_fk_violation(&e) { @@ -331,7 +431,7 @@ WHERE namespace_id = $1; "#, ) .bind(&namespace_id) - .fetch_all(self.transaction()) + .fetch_all(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; @@ -360,7 +460,7 @@ DO UPDATE SET name = column_name.name RETURNING *; .bind(&name) // $1 .bind(&table_id) // $2 .bind(&ct) // $3 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| { if is_fk_violation(&e) { @@ -390,7 +490,7 @@ WHERE table_name.namespace_id = $1; "#, ) .bind(&namespace_id) - .fetch_all(self.transaction()) + .fetch_all(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; @@ -417,7 +517,7 @@ impl SequencerRepo for PostgresTxn { ) .bind(&topic.id) // $1 .bind(&partition) // $2 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| { if is_fk_violation(&e) { @@ -440,7 +540,7 @@ SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2; ) .bind(topic_id) // $1 .bind(partition) // $2 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await; if let Err(sqlx::Error::RowNotFound) = rec { @@ -454,7 +554,7 @@ SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2; async fn list(&mut self) -> Result> { sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer;"#) - .fetch_all(self.transaction()) + .fetch_all(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e }) } @@ -462,7 +562,7 @@ SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2; async fn list_by_kafka_topic(&mut self, topic: &KafkaTopic) -> Result> { sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer WHERE kafka_topic_id = $1;"#) .bind(&topic.id) // $1 - .fetch_all(self.transaction()) + .fetch_all(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e }) } @@ -489,7 +589,7 @@ impl PartitionRepo for PostgresTxn { .bind(key) // $1 .bind(&sequencer_id) // $2 .bind(&table_id) // $3 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| { if is_fk_violation(&e) { @@ -503,7 +603,7 @@ impl PartitionRepo for PostgresTxn { async fn list_by_sequencer(&mut self, sequencer_id: SequencerId) -> Result> { sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE sequencer_id = $1;"#) .bind(&sequencer_id) // $1 - .fetch_all(self.transaction()) + .fetch_all(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e }) } @@ -522,7 +622,7 @@ impl PartitionRepo for PostgresTxn { WHERE partition.id = $1;"#, ) .bind(&partition_id) // $1 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; @@ -570,7 +670,7 @@ impl TombstoneRepo for PostgresTxn { .bind(&min_time) // $4 .bind(&max_time) // $5 .bind(predicate) // $6 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| { if is_fk_violation(&e) { @@ -589,7 +689,7 @@ impl TombstoneRepo for PostgresTxn { sqlx::query_as::<_, Tombstone>(r#"SELECT * FROM tombstone WHERE sequencer_id = $1 AND sequence_number > $2 ORDER BY id;"#) .bind(&sequencer_id) // $1 .bind(&sequence_number) // $2 - .fetch_all(self.transaction()) + .fetch_all(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e }) } @@ -623,7 +723,7 @@ RETURNING * .bind(max_sequence_number) // $6 .bind(min_time) // $7 .bind(max_time) // $8 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| { if is_unique_violation(&e) { @@ -643,7 +743,7 @@ RETURNING * async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> { let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = true WHERE id = $1;"#) .bind(&id) // $1 - .execute(self.transaction()) + .execute(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; @@ -658,7 +758,7 @@ RETURNING * sqlx::query_as::<_, ParquetFile>(r#"SELECT * FROM parquet_file WHERE sequencer_id = $1 AND max_sequence_number > $2 ORDER BY id;"#) .bind(&sequencer_id) // $1 .bind(&sequence_number) // $2 - .fetch_all(self.transaction()) + .fetch_all(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e }) } @@ -668,7 +768,7 @@ RETURNING * r#"SELECT count(*) as count FROM parquet_file WHERE id = $1;"#, ) .bind(&id) // $1 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; @@ -678,7 +778,7 @@ RETURNING * async fn count(&mut self) -> Result { let read_result = sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM parquet_file;"#) - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; @@ -688,45 +788,34 @@ RETURNING * #[async_trait] impl ProcessedTombstoneRepo for PostgresTxn { - async fn create_many( + async fn create( &mut self, parquet_file_id: ParquetFileId, - tombstones: &[Tombstone], - ) -> Result> { - // no transaction provided - // todo: we should never needs this but since right now we implement 2 catalogs, - // postgres (for production) and mem (for testing only) that does not need to provide txt - // this will be refactor when Marco has his new abstraction done - let mut processed_tombstones = vec![]; - for tombstone in tombstones { - let processed_tombstone = sqlx::query_as::<_, ProcessedTombstone>( - r#" - INSERT INTO processed_tombstone ( tombstone_id, parquet_file_id ) - VALUES ( $1, $2 ) - RETURNING * - "#, - ) - .bind(tombstone.id) // $1 - .bind(parquet_file_id) // $2 - .fetch_one(self.transaction()) - .await - .map_err(|e| { - if is_unique_violation(&e) { - Error::ProcessTombstoneExists { - tombstone_id: tombstone.id.get(), - parquet_file_id: parquet_file_id.get(), - } - } else if is_fk_violation(&e) { - Error::ForeignKeyViolation { source: e } - } else { - Error::SqlxError { source: e } + tombstone_id: TombstoneId, + ) -> Result { + sqlx::query_as::<_, ProcessedTombstone>( + r#" + INSERT INTO processed_tombstone ( tombstone_id, parquet_file_id ) + VALUES ( $1, $2 ) + RETURNING * + "#, + ) + .bind(tombstone_id) // $1 + .bind(parquet_file_id) // $2 + .fetch_one(&mut self.inner) + .await + .map_err(|e| { + if is_unique_violation(&e) { + Error::ProcessTombstoneExists { + tombstone_id: tombstone_id.get(), + parquet_file_id: parquet_file_id.get(), } - })?; - - processed_tombstones.push(processed_tombstone); - } - - Ok(processed_tombstones) + } else if is_fk_violation(&e) { + Error::ForeignKeyViolation { source: e } + } else { + Error::SqlxError { source: e } + } + }) } async fn exist( @@ -738,7 +827,7 @@ impl ProcessedTombstoneRepo for PostgresTxn { r#"SELECT count(*) as count FROM processed_tombstone WHERE parquet_file_id = $1 AND tombstone_id = $2;"#) .bind(&parquet_file_id) // $1 .bind(&tombstone_id) // $2 - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; @@ -748,7 +837,7 @@ impl ProcessedTombstoneRepo for PostgresTxn { async fn count(&mut self) -> Result { let read_result = sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM processed_tombstone;"#) - .fetch_one(self.transaction()) + .fetch_one(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e })?; diff --git a/router2/src/dml_handlers/ns_autocreation.rs b/router2/src/dml_handlers/ns_autocreation.rs index c1a883cf33..3048dd1539 100644 --- a/router2/src/dml_handlers/ns_autocreation.rs +++ b/router2/src/dml_handlers/ns_autocreation.rs @@ -91,13 +91,9 @@ where if self.cache.get_schema(&namespace).is_none() { trace!(%namespace, "namespace auto-create cache miss"); - let mut txn = self - .catalog - .start_transaction() - .await - .map_err(NamespaceCreationError::Create)?; + let mut repos = self.catalog.repositories().await; - match txn + match repos .namespaces() .create( namespace.as_str(), @@ -108,8 +104,6 @@ where .await { Ok(_) => { - txn.commit().await.map_err(NamespaceCreationError::Create)?; - debug!(%namespace, "created namespace"); } Err(iox_catalog::interface::Error::NameExists { .. }) => { @@ -117,11 +111,9 @@ where // namespace, or another thread raced populating the catalog // and beat this thread to it. debug!(%namespace, "spurious namespace create failed"); - txn.abort().await.map_err(NamespaceCreationError::Create)?; } Err(e) => { error!(error=%e, %namespace, "failed to auto-create namespace"); - txn.abort().await.map_err(NamespaceCreationError::Create)?; return Err(NamespaceCreationError::Create(e)); } } @@ -199,19 +191,16 @@ mod tests { // The cache hit should mean the catalog SHOULD NOT see a create request // for the namespace. - let mut txn = catalog - .start_transaction() - .await - .expect("failed to start UoW"); + let mut repos = catalog.repositories().await; assert!( - txn.namespaces() + repos + .namespaces() .get_by_name(ns.as_str()) .await .expect("lookup should not error") .is_none(), "expected no request to the catalog" ); - txn.abort().await.expect("failed to abort UoW"); // And the DML handler must be called. assert_matches!(mock_handler.calls().as_slice(), [MockDmlHandlerCall::Write { namespace, .. }] => { @@ -243,17 +232,13 @@ mod tests { // The cache miss should mean the catalog MUST see a create request for // the namespace. - let mut txn = catalog - .start_transaction() - .await - .expect("failed to start UoW"); - let got = txn + let mut repos = catalog.repositories().await; + let got = repos .namespaces() .get_by_name(ns.as_str()) .await .expect("lookup should not error") .expect("creation request should be sent to catalog"); - txn.abort().await.expect("failed to abort UoW"); assert_eq!( got, diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs index 225c5abf0a..1c010fc749 100644 --- a/router2/src/dml_handlers/schema_validation.rs +++ b/router2/src/dml_handlers/schema_validation.rs @@ -135,11 +135,7 @@ where batches: HashMap, span_ctx: Option, ) -> Result<(), Self::WriteError> { - let mut txn = self - .catalog - .start_transaction() - .await - .map_err(SchemaError::NamespaceLookup)?; + let mut repos = self.catalog.repositories().await; // Load the namespace schema from the cache, falling back to pulling it // from the global catalog (if it exists). @@ -149,7 +145,7 @@ where None => { // Pull the schema from the global catalog or error if it does // not exist. - let schema = get_schema_by_name(&namespace, txn.deref_mut()) + let schema = get_schema_by_name(&namespace, repos.deref_mut()) .await .map_err(|e| { warn!(error=%e, %namespace, "failed to retrieve namespace schema"); @@ -168,7 +164,7 @@ where let maybe_new_schema = validate_or_insert_schema( batches.iter().map(|(k, v)| (k.as_str(), v)), &schema, - txn.deref_mut(), + repos.deref_mut(), ) .await .map_err(|e| { @@ -177,8 +173,6 @@ where })? .map(Arc::new); - txn.commit().await.map_err(SchemaError::NamespaceLookup)?; - trace!(%namespace, "schema validation complete"); // If the schema has been updated, immediately add it to the cache @@ -255,11 +249,9 @@ mod tests { async fn create_catalog() -> Arc { let catalog: Arc = Arc::new(MemCatalog::new()); - let mut txn = catalog - .start_transaction() - .await - .expect("failed to start UoW"); - txn.namespaces() + let mut repos = catalog.repositories().await; + repos + .namespaces() .create( NAMESPACE, "inf", @@ -268,7 +260,6 @@ mod tests { ) .await .expect("failed to create test namespace"); - txn.commit().await.expect("failed to commit UoW"); catalog }