diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index ee56af5afe..7b68804956 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -112,6 +112,9 @@ pub enum Error {
     #[snafu(display("no transaction provided"))]
     NoTransaction,
 
+    #[snafu(display("transaction failed to commit: {}", source))]
+    FailedToCommit { source: sqlx::Error },
+
     #[snafu(display("error while converting usize {} to i64", value))]
     InvalidValue { value: usize },
 
@@ -192,14 +195,6 @@ pub trait Catalog: Send + Sync + Debug + Display {
     /// Setup catalog for usage and apply possible migrations.
     async fn setup(&self) -> Result<(), Error>;
 
-    /// Creates a new [`Transaction`].
-    ///
-    /// Creating transactions is potentially expensive. Holding one consumes resources. The number
-    /// of parallel active transactions might be limited per catalog, so you MUST NOT rely on the
-    /// ability to create multiple transactions in parallel for correctness. Parallel transactions
-    /// must only be used for scaling.
-    async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error>;
-
     /// Accesses the repositories without a transaction scope.
     async fn repositories(&self) -> Box<dyn RepoCollection>;
 
@@ -210,68 +205,6 @@ pub trait Catalog: Send + Sync + Debug + Display {
     fn time_provider(&self) -> Arc<dyn TimeProvider>;
 }
 
-/// Secret module for [sealed traits].
-///
-/// [sealed traits]: https://rust-lang.github.io/api-guidelines/future-proofing.html#sealed-traits-protect-against-downstream-implementations-c-sealed
-#[doc(hidden)]
-pub(crate) mod sealed {
-    use super::*;
-
-    /// Helper trait to implement commit and abort of an transaction.
-    ///
-    /// The problem is that both methods cannot take `self` directly, otherwise the [`Transaction`]
-    /// would not be object safe. Therefore we can only take a reference. To avoid that a user uses
-    /// a transaction after calling one of the finalizers, we use a tiny trick and take `Box<Self>`
-    /// in our public interface and use a sealed trait for the actual implementation.
-    #[async_trait]
-    pub trait TransactionFinalize: Send + Sync + Debug {
-        async fn commit_inplace(&mut self) -> Result<(), Error>;
-        async fn abort_inplace(&mut self) -> Result<(), Error>;
-    }
-}
-
-/// Transaction in a [`Catalog`] (similar to a database transaction).
-///
-/// A transaction provides a consistent view on data and stages writes.
-/// To finalize a transaction, call [commit](Self::commit) or [abort](Self::abort).
-///
-/// Repositories can cheaply be borrowed from it.
-///
-/// Note that after any method in this transaction (including all repositories derived from it)
-/// returns an error, the transaction MIGHT be poisoned and will return errors for all operations,
-/// depending on the backend.
-///
-///
-/// # Drop
-///
-/// Dropping a transaction without calling [`commit`](Self::commit) or [`abort`](Self::abort) will
-/// abort the transaction. However resources might not be released immediately, so it is adviced to
-/// always call [`abort`](Self::abort) when you want to enforce that. Dropping w/o
-/// commiting/aborting will also log a warning.
-#[async_trait]
-pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize + RepoCollection {
-    /// Commits a transaction.
-    ///
-    /// # Error Handling
-    ///
-    /// If successful, all changes will be visible to other transactions.
-    ///
-    /// If an error is returned, the transaction may or may not be committed. This might be due to
-    /// IO errors after the transaction was finished. However in either case, the transaction is
-    /// atomic and can only succeed or fail entirely.
-    async fn commit(mut self: Box<Self>) -> Result<(), Error> {
-        self.commit_inplace().await
-    }
-
-    /// Aborts the transaction, throwing away all changes.
-    async fn abort(mut self: Box<Self>) -> Result<(), Error> {
-        self.abort_inplace().await
-    }
-}
-
-impl<T> Transaction for T where T: Send + Sync + Debug + sealed::TransactionFinalize + RepoCollection
-{}
-
 /// Methods for working with the catalog's various repositories (collections of entities).
 ///
 /// # Repositories
@@ -611,6 +544,16 @@ pub trait ParquetFileRepo: Send + Sync {
         &mut self,
         object_store_id: Uuid,
     ) -> Result<Option<ParquetFile>>;
+
+    /// Commit deletions, upgrades and creations in a single transaction.
+    async fn create_upgrade_delete(
+        &mut self,
+        _partition_id: PartitionId,
+        delete: &[ParquetFile],
+        upgrade: &[ParquetFile],
+        create: &[ParquetFileParams],
+        target_level: CompactionLevel,
+    ) -> Result<Vec<ParquetFileId>>;
 }
 
 /// Gets the namespace schema including all tables and columns.
@@ -833,7 +776,6 @@ pub(crate) mod test_helpers {
     };
 
     use super::*;
-    use ::test_helpers::{assert_contains, tracing::TracingCapture};
     use assert_matches::assert_matches;
     use data_types::{
         ColumnId, ColumnSet, CompactionLevel, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
@@ -857,8 +799,6 @@ pub(crate) mod test_helpers {
         test_parquet_file_delete_broken(clean_state().await).await;
         test_update_to_compaction_level_1(clean_state().await).await;
         test_list_by_partiton_not_to_delete(clean_state().await).await;
-        test_txn_isolation(clean_state().await).await;
-        test_txn_drop(clean_state().await).await;
         test_list_schemas(clean_state().await).await;
         test_list_schemas_soft_deleted_rows(clean_state().await).await;
         test_delete_namespace(clean_state().await).await;
@@ -3222,66 +3162,6 @@ pub(crate) mod test_helpers {
             .expect("parquet file exists check should succeed"));
     }
 
-    async fn test_txn_isolation(catalog: Arc<dyn Catalog>) {
-        let barrier = Arc::new(tokio::sync::Barrier::new(2));
-
-        let barrier_captured = Arc::clone(&barrier);
-        let catalog_captured = Arc::clone(&catalog);
-        let insertion_task = tokio::spawn(async move {
-            barrier_captured.wait().await;
-
-            let mut txn = catalog_captured.start_transaction().await.unwrap();
-            txn.topics()
-                .create_or_get("test_txn_isolation")
-                .await
-                .unwrap();
-
-            tokio::time::sleep(Duration::from_millis(200)).await;
-            txn.abort().await.unwrap();
-        });
-
-        let mut txn = catalog.start_transaction().await.unwrap();
-
-        barrier.wait().await;
-        tokio::time::sleep(Duration::from_millis(100)).await;
-
-        let topic = txn
-            .topics()
-            .get_by_name("test_txn_isolation")
-            .await
-            .unwrap();
-        assert!(topic.is_none());
-        txn.abort().await.unwrap();
-
-        insertion_task.await.unwrap();
-
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let topic = txn
-            .topics()
-            .get_by_name("test_txn_isolation")
-            .await
-            .unwrap();
-        assert!(topic.is_none());
-        txn.abort().await.unwrap();
-    }
-
-    async fn test_txn_drop(catalog: Arc<dyn Catalog>) {
-        let capture = TracingCapture::new();
-        let mut txn = catalog.start_transaction().await.unwrap();
-        txn.topics().create_or_get("test_txn_drop").await.unwrap();
-        drop(txn);
-
-        // got a warning
-        assert_contains!(capture.to_string(), "Dropping ");
-        assert_contains!(capture.to_string(), " w/o finalizing (commit or abort)");
-
-        // data is NOT committed
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let topic = txn.topics().get_by_name("test_txn_drop").await.unwrap();
-        assert!(topic.is_none());
-        txn.abort().await.unwrap();
-    }
-
    /// Upsert
a namespace called `namespace_name` and write `lines` to it. async fn populate_namespace( repos: &mut R, diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index a1caf178d8..43e7804df7 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -13,7 +13,7 @@ clippy::dbg_macro )] -use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result, Transaction}; +use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result}; use data_types::{ ColumnType, NamespaceSchema, QueryPool, Shard, ShardId, ShardIndex, TableSchema, TopicId, TopicMetadata, @@ -215,7 +215,7 @@ where /// Used in tests and when creating an in-memory catalog. pub async fn create_or_get_default_records( shard_count: i32, - txn: &mut dyn Transaction, + txn: &mut dyn RepoCollection, ) -> Result<(TopicMetadata, QueryPool, BTreeMap)> { let topic = txn.topics().create_or_get(SHARED_TOPIC_NAME).await?; let query_pool = txn.query_pools().create_or_get(SHARED_QUERY_POOL).await?; @@ -264,7 +264,7 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let repo = MemCatalog::new(metrics); - let mut txn = repo.start_transaction().await.unwrap(); + let mut txn = repo.repositories().await; let (topic, query_pool, _) = create_or_get_default_records( 2, txn.deref_mut() diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 00f97e4ffa..c72f9f2f04 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -3,10 +3,9 @@ use crate::{ interface::{ - sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, - Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, - Result, ShardRepo, SoftDeletedRows, TableRepo, TopicMetadataRepo, Transaction, - MAX_PARQUET_FILES_SELECTED_ONCE, + CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo, + ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, + SoftDeletedRows, TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE, }, metrics::MetricDecorator, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME, @@ -19,7 +18,6 @@ use data_types::{ TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, }; use iox_time::{SystemProvider, TimeProvider}; -use observability_deps::tracing::warn; use snafu::ensure; use sqlx::types::Uuid; use std::{ @@ -75,43 +73,16 @@ struct MemCollections { parquet_files: Vec, } -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -enum MemTxnInner { - Txn { - guard: OwnedMutexGuard, - stage: MemCollections, - finalized: bool, - }, - NoTxn { - collections: OwnedMutexGuard, - }, -} - /// transaction bound to an in-memory catalog. #[derive(Debug)] pub struct MemTxn { - inner: MemTxnInner, + inner: OwnedMutexGuard, time_provider: Arc, } impl MemTxn { fn stage(&mut self) -> &mut MemCollections { - match &mut self.inner { - MemTxnInner::Txn { stage, .. } => stage, - MemTxnInner::NoTxn { collections } => collections, - } - } -} - -impl Drop for MemTxn { - fn drop(&mut self) { - match self.inner { - MemTxnInner::Txn { finalized, .. 
} if !finalized => { - warn!("Dropping MemTxn w/o finalizing (commit or abort)"); - } - _ => {} - } + &mut self.inner } } @@ -124,17 +95,8 @@ impl Display for MemCatalog { #[async_trait] impl Catalog for MemCatalog { async fn setup(&self) -> Result<(), Error> { - let guard = Arc::clone(&self.collections).lock_owned().await; - let stage = guard.clone(); - let mut transaction = MemTxn { - inner: MemTxnInner::Txn { - guard, - stage, - finalized: false, - }, - time_provider: self.time_provider(), - }; - let stage = transaction.stage(); + let mut guard = Arc::clone(&self.collections).lock_owned().await; + let mut stage = guard.clone(); // We need to manually insert the topic here so that we can create the transition shard // below. @@ -152,31 +114,15 @@ impl Catalog for MemCatalog { min_unpersisted_sequence_number: SequenceNumber::new(0), }; stage.shards.push(shard); - transaction.commit_inplace().await?; + *guard = stage; Ok(()) } - async fn start_transaction(&self) -> Result, Error> { - let guard = Arc::clone(&self.collections).lock_owned().await; - let stage = guard.clone(); - Ok(Box::new(MetricDecorator::new( - MemTxn { - inner: MemTxnInner::Txn { - guard, - stage, - finalized: false, - }, - time_provider: self.time_provider(), - }, - Arc::clone(&self.metrics), - ))) - } - async fn repositories(&self) -> Box { let collections = Arc::clone(&self.collections).lock_owned().await; Box::new(MetricDecorator::new( MemTxn { - inner: MemTxnInner::NoTxn { collections }, + inner: collections, time_provider: self.time_provider(), }, Arc::clone(&self.metrics), @@ -192,40 +138,6 @@ impl Catalog for MemCatalog { } } -#[async_trait] -impl TransactionFinalize for MemTxn { - async fn commit_inplace(&mut self) -> Result<(), Error> { - match &mut self.inner { - MemTxnInner::Txn { - guard, - stage, - finalized, - } => { - assert!(!*finalized); - **guard = std::mem::take(stage); - *finalized = true; - } - MemTxnInner::NoTxn { .. } => { - panic!("cannot commit oneshot"); - } - } - Ok(()) - } - - async fn abort_inplace(&mut self) -> Result<(), Error> { - match &mut self.inner { - MemTxnInner::Txn { finalized, .. } => { - assert!(!*finalized); - *finalized = true; - } - MemTxnInner::NoTxn { .. 
} => {
-                panic!("cannot abort oneshot");
-            }
-        }
-        Ok(())
-    }
-}
-
 #[async_trait]
 impl RepoCollection for MemTxn {
     fn topics(&mut self) -> &mut dyn TopicMetadataRepo {
@@ -961,51 +873,12 @@ impl PartitionRepo for MemTxn {
 
 #[async_trait]
 impl ParquetFileRepo for MemTxn {
     async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
-        let stage = self.stage();
-
-        if stage
-            .parquet_files
-            .iter()
-            .any(|f| f.object_store_id == parquet_file_params.object_store_id)
-        {
-            return Err(Error::FileExists {
-                object_store_id: parquet_file_params.object_store_id,
-            });
-        }
-
-        let parquet_file = ParquetFile::from_params(
-            parquet_file_params,
-            ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
-        );
-        let compaction_level = parquet_file.compaction_level;
-        let created_at = parquet_file.created_at;
-        let partition_id = parquet_file.partition_id;
-        stage.parquet_files.push(parquet_file);
-
-        // Update the new_file_at field its partition to the time of created_at
-        // Only update if the compaction level is not Final which signal more compaction needed
-        if compaction_level < CompactionLevel::Final {
-            let partition = stage
-                .partitions
-                .iter_mut()
-                .find(|p| p.id == partition_id)
-                .ok_or(Error::PartitionNotFound { id: partition_id })?;
-            partition.new_file_at = Some(created_at);
-        }
-
-        Ok(stage.parquet_files.last().unwrap().clone())
+        create_parquet_file(self.stage(), parquet_file_params).await
     }
 
     async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
         let marked_at = Timestamp::from(self.time_provider.now());
-        let stage = self.stage();
-
-        match stage.parquet_files.iter_mut().find(|p| p.id == id) {
-            Some(f) => f.to_delete = Some(marked_at),
-            None => return Err(Error::ParquetRecordNotFound { id }),
-        }
-
-        Ok(())
+        flag_for_delete(self.stage(), id, marked_at).await
     }
 
     async fn flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>> {
@@ -1119,20 +992,7 @@ impl ParquetFileRepo for MemTxn {
         parquet_file_ids: &[ParquetFileId],
         compaction_level: CompactionLevel,
     ) -> Result<Vec<ParquetFileId>> {
-        let stage = self.stage();
-
-        let mut updated = Vec::with_capacity(parquet_file_ids.len());
-
-        for f in stage
-            .parquet_files
-            .iter_mut()
-            .filter(|p| parquet_file_ids.contains(&p.id))
-        {
-            f.compaction_level = compaction_level;
-            updated.push(f.id);
-        }
-
-        Ok(updated)
+        update_compaction_level(self.stage(), parquet_file_ids, compaction_level).await
     }
 
     async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
@@ -1164,6 +1024,50 @@ impl ParquetFileRepo for MemTxn {
             .find(|f| f.object_store_id.eq(&object_store_id))
             .cloned())
     }
+
+    async fn create_upgrade_delete(
+        &mut self,
+        _partition_id: PartitionId,
+        delete: &[ParquetFile],
+        upgrade: &[ParquetFile],
+        create: &[ParquetFileParams],
+        target_level: CompactionLevel,
+    ) -> Result<Vec<ParquetFileId>> {
+        let mut delete_set = HashSet::new();
+        let mut upgrade_set = HashSet::new();
+        for d in delete {
+            delete_set.insert(d.id.get());
+        }
+        for u in upgrade {
+            upgrade_set.insert(u.id.get());
+        }
+
+        assert!(
+            delete_set.is_disjoint(&upgrade_set),
+            "attempted to upgrade a file scheduled for delete"
+        );
+
+        let mut stage = self.inner.clone();
+
+        let upgrade = upgrade.iter().map(|f| f.id).collect::<Vec<_>>();
+
+        for file in delete {
+            let marked_at = Timestamp::from(self.time_provider.now());
+            flag_for_delete(&mut stage, file.id, marked_at).await?;
+        }
+
+        update_compaction_level(&mut stage, &upgrade, target_level).await?;
+
+        let mut ids = Vec::with_capacity(create.len());
+        for file in create {
+            let res = create_parquet_file(&mut stage, file.clone()).await?;
+            ids.push(res.id);
+        }
+
+        *self.inner = stage;
+
+        Ok(ids)
+    }
 }
 
 fn filter_namespace_soft_delete<'a>(
@@ -1177,6 +1081,77 @@
     })
 }
 
+// The following three functions are helpers to the create_upgrade_delete method.
+// They are also used by the respective create/flag_for_delete/update_compaction_level methods.
+async fn create_parquet_file(
+    stage: &mut MemCollections,
+    parquet_file_params: ParquetFileParams,
+) -> Result<ParquetFile> {
+    if stage
+        .parquet_files
+        .iter()
+        .any(|f| f.object_store_id == parquet_file_params.object_store_id)
+    {
+        return Err(Error::FileExists {
+            object_store_id: parquet_file_params.object_store_id,
+        });
+    }
+
+    let parquet_file = ParquetFile::from_params(
+        parquet_file_params,
+        ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
+    );
+    let compaction_level = parquet_file.compaction_level;
+    let created_at = parquet_file.created_at;
+    let partition_id = parquet_file.partition_id;
+    stage.parquet_files.push(parquet_file);
+
+    // Update the new_file_at field of its partition to the time of created_at.
+    // Only update if the compaction level is not Final, which signals that more compaction is needed.
+    if compaction_level < CompactionLevel::Final {
+        let partition = stage
+            .partitions
+            .iter_mut()
+            .find(|p| p.id == partition_id)
+            .ok_or(Error::PartitionNotFound { id: partition_id })?;
+        partition.new_file_at = Some(created_at);
+    }
+
+    Ok(stage.parquet_files.last().unwrap().clone())
+}
+
+async fn flag_for_delete(
+    stage: &mut MemCollections,
+    id: ParquetFileId,
+    marked_at: Timestamp,
+) -> Result<()> {
+    match stage.parquet_files.iter_mut().find(|p| p.id == id) {
+        Some(f) => f.to_delete = Some(marked_at),
+        None => return Err(Error::ParquetRecordNotFound { id }),
+    }
+
+    Ok(())
+}
+
+async fn update_compaction_level(
+    stage: &mut MemCollections,
+    parquet_file_ids: &[ParquetFileId],
+    compaction_level: CompactionLevel,
+) -> Result<Vec<ParquetFileId>> {
+    let mut updated = Vec::with_capacity(parquet_file_ids.len());
+
+    for f in stage
+        .parquet_files
+        .iter_mut()
+        .filter(|p| parquet_file_ids.contains(&p.id))
+    {
+        f.compaction_level = compaction_level;
+        updated.push(f.id);
+    }
+
+    Ok(updated)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs
index ded7ca75e7..4ac03e8db6 100644
--- a/iox_catalog/src/metrics.rs
+++ b/iox_catalog/src/metrics.rs
@@ -1,9 +1,8 @@
 //! Metric instrumentation for catalog implementations.
 
 use crate::interface::{
-    sealed::TransactionFinalize, CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo,
-    PartitionRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, SoftDeletedRows, TableRepo,
-    TopicMetadataRepo,
+    CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, QueryPoolRepo,
+    RepoCollection, Result, ShardRepo, SoftDeletedRows, TableRepo, TopicMetadataRepo,
 };
 use async_trait::async_trait;
 use data_types::{
@@ -87,20 +86,6 @@ where
     }
 }
 
-#[async_trait]
-impl<T, P> TransactionFinalize for MetricDecorator<T, P>
-where
-    T: TransactionFinalize,
-    P: TimeProvider,
-{
-    async fn commit_inplace(&mut self) -> Result<(), super::interface::Error> {
-        self.inner.commit_inplace().await
-    }
-    async fn abort_inplace(&mut self) -> Result<(), super::interface::Error> {
-        self.inner.abort_inplace().await
-    }
-}
-
 /// Emit a trait impl for `impl_trait` that delegates calls to the inner
 /// implementation, recording the duration and result to the metrics registry.
 ///
@@ -258,5 +243,6 @@ decorate!(
         "parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
         "parquet_count" = count(&mut self) -> Result<i64>;
         "parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
+        "parquet_create_upgrade_delete" = create_upgrade_delete(&mut self, _partition_id: PartitionId, delete: &[ParquetFile], upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel) -> Result<Vec<ParquetFileId>>;
     ]
 );
diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs
index 43f81f9ccd..dd4303c615 100644
--- a/iox_catalog/src/postgres.rs
+++ b/iox_catalog/src/postgres.rs
@@ -2,10 +2,9 @@
 use crate::{
     interface::{
-        self, sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo,
-        ColumnTypeMismatchSnafu, Error, NamespaceRepo, ParquetFileRepo, PartitionRepo,
-        QueryPoolRepo, RepoCollection, Result, ShardRepo, SoftDeletedRows, TableRepo,
-        TopicMetadataRepo, Transaction, MAX_PARQUET_FILES_SELECTED_ONCE,
+        self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
+        ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo,
+        SoftDeletedRows, TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE,
     },
     metrics::MetricDecorator,
     DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
@@ -27,6 +26,7 @@ use sqlx::{
     Acquire, ConnectOptions, Executor, Postgres, Row,
 };
 use sqlx_hotswap_pool::HotSwapPool;
+use std::collections::HashSet;
 use std::{collections::HashMap, fmt::Display, str::FromStr, sync::Arc, time::Duration};
 
 static MIGRATOR: Migrator = sqlx::migrate!();
@@ -151,10 +151,8 @@ pub struct PostgresTxn {
 }
 
 #[derive(Debug)]
-#[allow(clippy::large_enum_variant)]
-enum PostgresTxnInner {
-    Txn(Option<sqlx::Transaction<'static, Postgres>>),
-    Oneshot(HotSwapPool<Postgres>),
+struct PostgresTxnInner {
+    pool: HotSwapPool<Postgres>,
 }
 
 impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
@@ -178,12 +176,7 @@ impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
         'c: 'e,
         E: sqlx::Execute<'q, Self::Database>,
     {
-        match self {
-            PostgresTxnInner::Txn(txn) => {
-                txn.as_mut().expect("Not yet finalized").fetch_many(query)
-            }
-            PostgresTxnInner::Oneshot(pool) => pool.fetch_many(query),
-        }
+        self.pool.fetch_many(query)
     }
 
     fn fetch_optional<'e, 'q: 'e, E: 'q>(
@@ -197,13 +190,7 @@ impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
         'c: 'e,
         E: sqlx::Execute<'q, Self::Database>,
     {
-        match self {
-            PostgresTxnInner::Txn(txn) => txn
-                .as_mut()
-                .expect("Not yet finalized")
-                .fetch_optional(query),
-            PostgresTxnInner::Oneshot(pool) => pool.fetch_optional(query),
-        }
+        self.pool.fetch_optional(query)
     }
 
     fn prepare_with<'e, 'q: 'e>(
@@ -217,13 +204,7 @@ impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
     where
         'c: 'e,
     {
-        match self {
-            PostgresTxnInner::Txn(txn) => txn
-                .as_mut()
-                .expect("Not yet finalized")
-                .prepare_with(sql, parameters),
-            PostgresTxnInner::Oneshot(pool) => pool.prepare_with(sql, parameters),
-        }
+        self.pool.prepare_with(sql, parameters)
     }
 
     fn describe<'e, 'q: 'e>(
@@ -233,52 +214,7 @@ impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
     where
         'c: 'e,
     {
-        match self {
-            PostgresTxnInner::Txn(txn) => txn.as_mut().expect("Not yet finalized").describe(sql),
-            PostgresTxnInner::Oneshot(pool) => pool.describe(sql),
-        }
-    }
-}
-
-impl Drop for PostgresTxn {
-    fn drop(&mut self) {
-        if let PostgresTxnInner::Txn(Some(_)) = self.inner {
-            warn!("Dropping PostgresTxn w/o finalizing (commit or abort)");
-
-            // SQLx ensures that the inner transaction enqueues a rollback when it is dropped, so
// we don't need to spawn a task here to call `rollback` manually. - } - } -} - -#[async_trait] -impl TransactionFinalize for PostgresTxn { - async fn commit_inplace(&mut self) -> Result<(), Error> { - match &mut self.inner { - PostgresTxnInner::Txn(txn) => txn - .take() - .expect("Not yet finalized") - .commit() - .await - .map_err(|e| Error::SqlxError { source: e }), - PostgresTxnInner::Oneshot(_) => { - panic!("cannot commit oneshot"); - } - } - } - - async fn abort_inplace(&mut self) -> Result<(), Error> { - match &mut self.inner { - PostgresTxnInner::Txn(txn) => txn - .take() - .expect("Not yet finalized") - .rollback() - .await - .map_err(|e| Error::SqlxError { source: e }), - PostgresTxnInner::Oneshot(_) => { - panic!("cannot abort oneshot"); - } - } + self.pool.describe(sql) } } @@ -338,26 +274,12 @@ DO NOTHING; Ok(()) } - async fn start_transaction(&self) -> Result, Error> { - let transaction = self - .pool - .begin() - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(Box::new(MetricDecorator::new( - PostgresTxn { - inner: PostgresTxnInner::Txn(Some(transaction)), - time_provider: Arc::clone(&self.time_provider), - }, - Arc::clone(&self.metrics), - ))) - } - async fn repositories(&self) -> Box { Box::new(MetricDecorator::new( PostgresTxn { - inner: PostgresTxnInner::Oneshot(self.pool.clone()), + inner: PostgresTxnInner { + pool: self.pool.clone(), + }, time_provider: Arc::clone(&self.time_provider), }, Arc::clone(&self.metrics), @@ -1460,73 +1382,15 @@ RETURNING * #[async_trait] impl ParquetFileRepo for PostgresTxn { async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result { - let ParquetFileParams { - shard_id, - namespace_id, - table_id, - partition_id, - object_store_id, - max_sequence_number, - min_time, - max_time, - file_size_bytes, - row_count, - compaction_level, - created_at, - column_set, - max_l0_created_at, - } = parquet_file_params; - - let rec = sqlx::query_as::<_, ParquetFile>( - r#" -INSERT INTO parquet_file ( - shard_id, table_id, partition_id, object_store_id, - max_sequence_number, min_time, max_time, file_size_bytes, - row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at ) -VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 ) -RETURNING *; - "#, - ) - .bind(shard_id) // $1 - .bind(table_id) // $2 - .bind(partition_id) // $3 - .bind(object_store_id) // $4 - .bind(max_sequence_number) // $5 - .bind(min_time) // $6 - .bind(max_time) // $7 - .bind(file_size_bytes) // $8 - .bind(row_count) // $9 - .bind(compaction_level) // $10 - .bind(created_at) // $11 - .bind(namespace_id) // $12 - .bind(column_set) // $13 - .bind(max_l0_created_at) // $14 - .fetch_one(&mut self.inner) - .await - .map_err(|e| { - if is_unique_violation(&e) { - Error::FileExists { object_store_id } - } else if is_fk_violation(&e) { - Error::ForeignKeyViolation { source: e } - } else { - Error::SqlxError { source: e } - } - })?; - - Ok(rec) + let executor = &mut self.inner; + create_parquet_file(executor, parquet_file_params).await } async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> { let marked_at = Timestamp::from(self.time_provider.now()); + let executor = &mut self.inner; - let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#) - .bind(marked_at) // $1 - .bind(id) // $2 - .execute(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(()) + flag_for_delete(executor, id, marked_at).await } async fn flag_for_delete_by_retention(&mut self) -> 
Result<Vec<ParquetFileId>> {
@@ -1673,25 +1537,8 @@ WHERE parquet_file.partition_id = $1
         parquet_file_ids: &[ParquetFileId],
         compaction_level: CompactionLevel,
     ) -> Result<Vec<ParquetFileId>> {
-        // If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
-        // See https://github.com/launchbadge/sqlx/issues/1744
-        let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect();
-        let updated = sqlx::query(
-            r#"
-UPDATE parquet_file
-SET compaction_level = $1
-WHERE id = ANY($2)
-RETURNING id;
-        "#,
-        )
-        .bind(compaction_level) // $1
-        .bind(&ids[..]) // $2
-        .fetch_all(&mut self.inner)
-        .await
-        .map_err(|e| Error::SqlxError { source: e })?;
-
-        let updated = updated.into_iter().map(|row| row.get("id")).collect();
-        Ok(updated)
+        let executor = &mut self.inner;
+        update_compaction_level(executor, parquet_file_ids, compaction_level).await
     }
 
     async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
@@ -1743,6 +1590,168 @@ WHERE object_store_id = $1;
 
         Ok(Some(parquet_file))
     }
+
+    async fn create_upgrade_delete(
+        &mut self,
+        _partition_id: PartitionId,
+        delete: &[ParquetFile],
+        upgrade: &[ParquetFile],
+        create: &[ParquetFileParams],
+        target_level: CompactionLevel,
+    ) -> Result<Vec<ParquetFileId>> {
+        let mut delete_set = HashSet::new();
+        let mut upgrade_set = HashSet::new();
+        for d in delete {
+            delete_set.insert(d.id.get());
+        }
+        for u in upgrade {
+            upgrade_set.insert(u.id.get());
+        }
+
+        assert!(
+            delete_set.is_disjoint(&upgrade_set),
+            "attempted to upgrade a file scheduled for delete"
+        );
+
+        let mut tx = self
+            .inner
+            .pool
+            .begin()
+            .await
+            .map_err(|e| Error::StartTransaction { source: e })?;
+
+        let upgrade = upgrade.iter().map(|f| f.id).collect::<Vec<_>>();
+
+        for file in delete {
+            let marked_at = Timestamp::from(self.time_provider.now());
+            flag_for_delete(&mut tx, file.id, marked_at).await?;
+        }
+
+        update_compaction_level(&mut tx, &upgrade, target_level).await?;
+
+        let mut ids = Vec::with_capacity(create.len());
+        for file in create {
+            let pf = create_parquet_file(&mut tx, file.clone()).await?;
+            ids.push(pf.id);
+        }
+
+        tx.commit()
+            .await
+            .map_err(|e| Error::FailedToCommit { source: e })?;
+
+        Ok(ids)
+    }
+}
+
+// The following three functions are helpers to the create_upgrade_delete method.
+// They are also used by the respective create/flag_for_delete/update_compaction_level methods.
+async fn create_parquet_file<'q, E>(
+    executor: E,
+    parquet_file_params: ParquetFileParams,
+) -> Result<ParquetFile>
+where
+    E: Executor<'q, Database = Postgres>,
+{
+    let ParquetFileParams {
+        shard_id,
+        namespace_id,
+        table_id,
+        partition_id,
+        object_store_id,
+        max_sequence_number,
+        min_time,
+        max_time,
+        file_size_bytes,
+        row_count,
+        compaction_level,
+        created_at,
+        column_set,
+        max_l0_created_at,
+    } = parquet_file_params;
+
+    let query = sqlx::query_as::<_, ParquetFile>(
+        r#"
+INSERT INTO parquet_file (
+    shard_id, table_id, partition_id, object_store_id,
+    max_sequence_number, min_time, max_time, file_size_bytes,
+    row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
+VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
+RETURNING
+    id, table_id, partition_id, object_store_id,
+    max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
+    row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at;
+        "#,
+    )
+    .bind(shard_id) // $1
+    .bind(table_id) // $2
+    .bind(partition_id) // $3
+    .bind(object_store_id) // $4
+    .bind(max_sequence_number) // $5
+    .bind(min_time) // $6
+    .bind(max_time) // $7
+    .bind(file_size_bytes) // $8
+    .bind(row_count) // $9
+    .bind(compaction_level) // $10
+    .bind(created_at) // $11
+    .bind(namespace_id) // $12
+    .bind(column_set) // $13
+    .bind(max_l0_created_at); // $14
+    let parquet_file = query.fetch_one(executor).await.map_err(|e| {
+        if is_unique_violation(&e) {
+            Error::FileExists { object_store_id }
+        } else if is_fk_violation(&e) {
+            Error::ForeignKeyViolation { source: e }
+        } else {
+            Error::SqlxError { source: e }
+        }
+    })?;
+
+    Ok(parquet_file)
+}
+
+async fn flag_for_delete<'q, E>(executor: E, id: ParquetFileId, marked_at: Timestamp) -> Result<()>
+where
+    E: Executor<'q, Database = Postgres>,
+{
+    let query = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#)
+        .bind(marked_at) // $1
+        .bind(id); // $2
+    query
+        .execute(executor)
+        .await
+        .map_err(|e| Error::SqlxError { source: e })?;
+
+    Ok(())
+}
+
+async fn update_compaction_level<'q, E>(
+    executor: E,
+    parquet_file_ids: &[ParquetFileId],
+    compaction_level: CompactionLevel,
+) -> Result<Vec<ParquetFileId>>
+where
+    E: Executor<'q, Database = Postgres>,
+{
+    // If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
+    // See https://github.com/launchbadge/sqlx/issues/1744
+    let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect();
+    let query = sqlx::query(
+        r#"
+UPDATE parquet_file
+SET compaction_level = $1
+WHERE id = ANY($2)
+RETURNING id;
+        "#,
+    )
+    .bind(compaction_level) // $1
+    .bind(&ids[..]); // $2
+    let updated = query
+        .fetch_all(executor)
+        .await
+        .map_err(|e| Error::SqlxError { source: e })?;
+
+    let updated = updated.into_iter().map(|row| row.get("id")).collect();
+    Ok(updated)
+}
 
 /// The error code returned by Postgres for a unique constraint violation.
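
The helpers above are deliberately generic over `sqlx::Executor` rather than taking the pool or a transaction concretely: the oneshot repo methods hand in the pool wrapper, while `create_upgrade_delete` hands in the open transaction, and both paths share one implementation. A minimal standalone sketch of that pattern, assuming sqlx of the same vintage as this diff (the `touch` helper and `demo` driver are hypothetical names, not part of this change):

```rust
use sqlx::{Executor, PgPool, Postgres};

// Hypothetical helper: generic over anything that can execute queries,
// so the same body runs against a pool or inside an open transaction.
async fn touch<'q, E>(executor: E, id: i64) -> Result<(), sqlx::Error>
where
    E: Executor<'q, Database = Postgres>,
{
    sqlx::query("UPDATE parquet_file SET to_delete = NULL WHERE id = $1")
        .bind(id)
        .execute(executor)
        .await?;
    Ok(())
}

async fn demo(pool: &PgPool) -> Result<(), sqlx::Error> {
    // Oneshot: the pool itself implements `Executor`.
    touch(pool, 1).await?;

    // Transactional: `&mut Transaction` implements `Executor` too, so the
    // very same helper participates in an atomic commit.
    let mut tx = pool.begin().await?;
    touch(&mut tx, 1).await?;
    touch(&mut tx, 2).await?;
    tx.commit().await?;
    Ok(())
}
```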
@@ -1979,11 +1988,10 @@ mod tests { let postgres = setup_db().await; let postgres: Arc = Arc::new(postgres); - let mut txn = postgres.start_transaction().await.expect("txn start"); + let mut txn = postgres.repositories().await; let (kafka, query, shards) = create_or_get_default_records(1, txn.deref_mut()) .await .expect("db init failed"); - txn.commit().await.expect("txn commit"); let namespace_id = postgres .repositories() @@ -2038,11 +2046,10 @@ mod tests { let postgres = setup_db().await; let postgres: Arc = Arc::new(postgres); - let mut txn = postgres.start_transaction().await.expect("txn start"); + let mut txn = postgres.repositories().await; let (kafka, query, _) = create_or_get_default_records(2, txn.deref_mut()) .await .expect("db init failed"); - txn.commit().await.expect("txn commit"); let namespace_id = postgres .repositories() @@ -2199,11 +2206,10 @@ mod tests { let metrics = Arc::clone(&postgres.metrics); let postgres: Arc = Arc::new(postgres); - let mut txn = postgres.start_transaction().await.expect("txn start"); + let mut txn = postgres.repositories().await; let (kafka, query, _shards) = create_or_get_default_records(1, txn.deref_mut()) .await .expect("db init failed"); - txn.commit().await.expect("txn commit"); let namespace_id = postgres .repositories() @@ -2381,11 +2387,10 @@ mod tests { let pool = postgres.pool.clone(); let postgres: Arc = Arc::new(postgres); - let mut txn = postgres.start_transaction().await.expect("txn start"); + let mut txn = postgres.repositories().await; let (kafka, query, shards) = create_or_get_default_records(1, txn.deref_mut()) .await .expect("db init failed"); - txn.commit().await.expect("txn commit"); let namespace_id = postgres .repositories() diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index 5abcea3686..0aa9b2e415 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -2,10 +2,9 @@ use crate::{ interface::{ - self, sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo, - ColumnTypeMismatchSnafu, Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, - QueryPoolRepo, RepoCollection, Result, ShardRepo, SoftDeletedRows, TableRepo, - TopicMetadataRepo, Transaction, MAX_PARQUET_FILES_SELECTED_ONCE, + self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo, + ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, + SoftDeletedRows, TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE, }, metrics::MetricDecorator, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME, @@ -18,12 +17,12 @@ use data_types::{ TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, }; use serde::{Deserialize, Serialize}; -use std::ops::Deref; +use std::collections::HashSet; use std::{collections::HashMap, fmt::Display}; use iox_time::{SystemProvider, TimeProvider}; use metric::Registry; -use observability_deps::tracing::{debug, warn}; +use observability_deps::tracing::debug; use parking_lot::Mutex; use snafu::prelude::*; use sqlx::types::Json; @@ -66,10 +65,8 @@ pub struct SqliteTxn { } #[derive(Debug)] -#[allow(clippy::large_enum_variant)] -enum SqliteTxnInner { - Txn(Option>), - Oneshot(Pool), +struct SqliteTxnInner { + pool: Pool, } impl<'c> Executor<'c> for &'c mut SqliteTxnInner { @@ -93,10 +90,7 @@ impl<'c> Executor<'c> for &'c mut SqliteTxnInner { 'c: 'e, E: sqlx::Execute<'q, Self::Database>, { - match self { - SqliteTxnInner::Txn(txn) => txn.as_mut().expect("Not yet finalized").fetch_many(query), - 
SqliteTxnInner::Oneshot(pool) => pool.fetch_many(query), - } + self.pool.fetch_many(query) } fn fetch_optional<'e, 'q: 'e, E: 'q>( @@ -110,13 +104,7 @@ impl<'c> Executor<'c> for &'c mut SqliteTxnInner { 'c: 'e, E: sqlx::Execute<'q, Self::Database>, { - match self { - SqliteTxnInner::Txn(txn) => txn - .as_mut() - .expect("Not yet finalized") - .fetch_optional(query), - SqliteTxnInner::Oneshot(pool) => pool.fetch_optional(query), - } + self.pool.fetch_optional(query) } fn prepare_with<'e, 'q: 'e>( @@ -130,13 +118,7 @@ impl<'c> Executor<'c> for &'c mut SqliteTxnInner { where 'c: 'e, { - match self { - SqliteTxnInner::Txn(txn) => txn - .as_mut() - .expect("Not yet finalized") - .prepare_with(sql, parameters), - SqliteTxnInner::Oneshot(pool) => pool.prepare_with(sql, parameters), - } + self.pool.prepare_with(sql, parameters) } fn describe<'e, 'q: 'e>( @@ -146,52 +128,7 @@ impl<'c> Executor<'c> for &'c mut SqliteTxnInner { where 'c: 'e, { - match self { - SqliteTxnInner::Txn(txn) => txn.as_mut().expect("Not yet finalized").describe(sql), - SqliteTxnInner::Oneshot(pool) => pool.describe(sql), - } - } -} - -impl Drop for SqliteTxn { - fn drop(&mut self) { - if let SqliteTxnInner::Txn(Some(_)) = self.inner.lock().deref() { - warn!("Dropping SqliteTxn w/o finalizing (commit or abort)"); - - // SQLx ensures that the inner transaction enqueues a rollback when it is dropped, so - // we don't need to spawn a task here to call `rollback` manually. - } - } -} - -#[async_trait] -impl TransactionFinalize for SqliteTxn { - async fn commit_inplace(&mut self) -> Result<(), Error> { - match self.inner.get_mut() { - SqliteTxnInner::Txn(txn) => txn - .take() - .expect("Not yet finalized") - .commit() - .await - .map_err(|e| Error::SqlxError { source: e }), - SqliteTxnInner::Oneshot(_) => { - panic!("cannot commit oneshot"); - } - } - } - - async fn abort_inplace(&mut self) -> Result<(), Error> { - match self.inner.get_mut() { - SqliteTxnInner::Txn(txn) => txn - .take() - .expect("Not yet finalized") - .rollback() - .await - .map_err(|e| Error::SqlxError { source: e }), - SqliteTxnInner::Oneshot(_) => { - panic!("cannot abort oneshot"); - } - } + self.pool.describe(sql) } } @@ -261,26 +198,12 @@ DO NOTHING; Ok(()) } - async fn start_transaction(&self) -> Result> { - let transaction = self - .pool - .begin() - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(Box::new(MetricDecorator::new( - SqliteTxn { - inner: Mutex::new(SqliteTxnInner::Txn(Some(transaction))), - time_provider: Arc::clone(&self.time_provider), - }, - Arc::clone(&self.metrics), - ))) - } - async fn repositories(&self) -> Box { Box::new(MetricDecorator::new( SqliteTxn { - inner: Mutex::new(SqliteTxnInner::Oneshot(self.pool.clone())), + inner: Mutex::new(SqliteTxnInner { + pool: self.pool.clone(), + }), time_provider: Arc::clone(&self.time_provider), }, Arc::clone(&self.metrics), @@ -1328,73 +1251,15 @@ impl From for ParquetFile { #[async_trait] impl ParquetFileRepo for SqliteTxn { async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result { - let ParquetFileParams { - shard_id, - namespace_id, - table_id, - partition_id, - object_store_id, - max_sequence_number, - min_time, - max_time, - file_size_bytes, - row_count, - compaction_level, - created_at, - column_set, - max_l0_created_at, - } = parquet_file_params; - - let rec = sqlx::query_as::<_, ParquetFilePod>( - r#" -INSERT INTO parquet_file ( - shard_id, table_id, partition_id, object_store_id, - max_sequence_number, min_time, max_time, file_size_bytes, - row_count, 
compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
-VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
-RETURNING *;
-        "#,
-        )
-        .bind(shard_id) // $1
-        .bind(table_id) // $2
-        .bind(partition_id) // $3
-        .bind(object_store_id) // $4
-        .bind(max_sequence_number) // $5
-        .bind(min_time) // $6
-        .bind(max_time) // $7
-        .bind(file_size_bytes) // $8
-        .bind(row_count) // $9
-        .bind(compaction_level) // $10
-        .bind(created_at) // $11
-        .bind(namespace_id) // $12
-        .bind(from_column_set(&column_set)) // $13
-        .bind(max_l0_created_at) // $14
-        .fetch_one(self.inner.get_mut())
-        .await
-        .map_err(|e| {
-            if is_unique_violation(&e) {
-                Error::FileExists { object_store_id }
-            } else if is_fk_violation(&e) {
-                Error::ForeignKeyViolation { source: e }
-            } else {
-                Error::SqlxError { source: e }
-            }
-        })?;
-
-        Ok(rec.into())
+        let executor = self.inner.get_mut();
+        create_parquet_file(executor, parquet_file_params).await
     }
 
     async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
         let marked_at = Timestamp::from(self.time_provider.now());
+        let executor = self.inner.get_mut();
 
-        let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#)
-            .bind(marked_at) // $1
-            .bind(id) // $2
-            .execute(self.inner.get_mut())
-            .await
-            .map_err(|e| Error::SqlxError { source: e })?;
-
-        Ok(())
+        flag_for_delete(executor, id, marked_at).await
     }
 
     async fn flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>> {
@@ -1553,25 +1418,8 @@ WHERE parquet_file.partition_id = $1
         parquet_file_ids: &[ParquetFileId],
         compaction_level: CompactionLevel,
     ) -> Result<Vec<ParquetFileId>> {
-        // If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
-        // See https://github.com/launchbadge/sqlx/issues/1744
-        let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect();
-        let updated = sqlx::query(
-            r#"
-UPDATE parquet_file
-SET compaction_level = $1
-WHERE id IN (SELECT value FROM json_each($2))
-RETURNING id;
-        "#,
-        )
-        .bind(compaction_level) // $1
-        .bind(Json(&ids[..])) // $2
-        .fetch_all(self.inner.get_mut())
-        .await
-        .map_err(|e| Error::SqlxError { source: e })?;
-
-        let updated = updated.into_iter().map(|row| row.get("id")).collect();
-        Ok(updated)
+        let executor = self.inner.get_mut();
+        update_compaction_level(executor, parquet_file_ids, compaction_level).await
     }
 
     async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
@@ -1623,6 +1471,168 @@ WHERE object_store_id = $1;
 
         Ok(Some(parquet_file.into()))
     }
+
+    async fn create_upgrade_delete(
+        &mut self,
+        _partition_id: PartitionId,
+        delete: &[ParquetFile],
+        upgrade: &[ParquetFile],
+        create: &[ParquetFileParams],
+        target_level: CompactionLevel,
+    ) -> Result<Vec<ParquetFileId>> {
+        let mut delete_set = HashSet::new();
+        let mut upgrade_set = HashSet::new();
+        for d in delete {
+            delete_set.insert(d.id.get());
+        }
+        for u in upgrade {
+            upgrade_set.insert(u.id.get());
+        }
+
+        assert!(
+            delete_set.is_disjoint(&upgrade_set),
+            "attempted to upgrade a file scheduled for delete"
+        );
+        let mut tx = self
+            .inner
+            .get_mut()
+            .pool
+            .begin()
+            .await
+            .map_err(|e| Error::StartTransaction { source: e })?;
+
+        let upgrade = upgrade.iter().map(|f| f.id).collect::<Vec<_>>();
+
+        for file in delete {
+            let marked_at = Timestamp::from(self.time_provider.now());
+            flag_for_delete(&mut tx, file.id, marked_at).await?;
+        }
+
+        update_compaction_level(&mut tx, &upgrade, target_level).await?;
+
+        let mut ids = Vec::with_capacity(create.len());
+        for file in create {
+            let res = create_parquet_file(&mut tx, file.clone()).await?;
+            ids.push(res.id);
+        }
+        tx.commit()
+            .await
+            .map_err(|e| Error::FailedToCommit { source: e })?;
+
+        Ok(ids)
+    }
+}
+
+// The following three functions are helpers to the create_upgrade_delete method.
+// They are also used by the respective create/flag_for_delete/update_compaction_level methods.
+async fn create_parquet_file<'q, E>(
+    executor: E,
+    parquet_file_params: ParquetFileParams,
+) -> Result<ParquetFile>
+where
+    E: Executor<'q, Database = Sqlite>,
+{
+    let ParquetFileParams {
+        shard_id,
+        namespace_id,
+        table_id,
+        partition_id,
+        object_store_id,
+        max_sequence_number,
+        min_time,
+        max_time,
+        file_size_bytes,
+        row_count,
+        compaction_level,
+        created_at,
+        column_set,
+        max_l0_created_at,
+    } = parquet_file_params;
+
+    let query = sqlx::query_as::<_, ParquetFilePod>(
+        r#"
+INSERT INTO parquet_file (
+    shard_id, table_id, partition_id, object_store_id,
+    max_sequence_number, min_time, max_time, file_size_bytes,
+    row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
+VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
+RETURNING
+    id, table_id, partition_id, object_store_id,
+    max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
+    row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at;
+        "#,
+    )
+    .bind(shard_id) // $1
+    .bind(table_id) // $2
+    .bind(partition_id) // $3
+    .bind(object_store_id) // $4
+    .bind(max_sequence_number) // $5
+    .bind(min_time) // $6
+    .bind(max_time) // $7
+    .bind(file_size_bytes) // $8
+    .bind(row_count) // $9
+    .bind(compaction_level) // $10
+    .bind(created_at) // $11
+    .bind(namespace_id) // $12
+    .bind(from_column_set(&column_set)) // $13
+    .bind(max_l0_created_at); // $14
+    let rec = query.fetch_one(executor).await.map_err(|e| {
+        if is_unique_violation(&e) {
+            Error::FileExists { object_store_id }
+        } else if is_fk_violation(&e) {
+            Error::ForeignKeyViolation { source: e }
+        } else {
+            Error::SqlxError { source: e }
+        }
+    })?;
+
+    Ok(rec.into())
+}
+
+async fn flag_for_delete<'q, E>(executor: E, id: ParquetFileId, marked_at: Timestamp) -> Result<()>
+where
+    E: Executor<'q, Database = Sqlite>,
+{
+    let query = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#)
+        .bind(marked_at) // $1
+        .bind(id); // $2
+
+    query
+        .execute(executor)
+        .await
+        .map_err(|e| Error::SqlxError { source: e })?;
+
+    Ok(())
+}
+
+async fn update_compaction_level<'q, E>(
+    executor: E,
+    parquet_file_ids: &[ParquetFileId],
+    compaction_level: CompactionLevel,
+) -> Result<Vec<ParquetFileId>>
+where
+    E: Executor<'q, Database = Sqlite>,
+{
+    // If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
+    // See https://github.com/launchbadge/sqlx/issues/1744
+    let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect();
+    let query = sqlx::query(
+        r#"
+UPDATE parquet_file
+SET compaction_level = $1
+WHERE id IN (SELECT value FROM json_each($2))
+RETURNING id;
+        "#,
+    )
+    .bind(compaction_level) // $1
+    .bind(Json(&ids[..])); // $2
+    let updated = query
+        .fetch_all(executor)
+        .await
+        .map_err(|e| Error::SqlxError { source: e })?;
+
+    let updated = updated.into_iter().map(|row| row.get("id")).collect();
+    Ok(updated)
+}
 
 /// The error code returned by SQLite for a unique constraint violation.
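
With all three backends (memory, Postgres, SQLite) implementing `create_upgrade_delete`, a caller such as the compactor can commit a whole compaction outcome through this single atomic entry point instead of holding a catalog transaction open. A sketch of a hypothetical caller (`commit_compaction` and its variable names are illustrative, not part of this diff):

```rust
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use iox_catalog::interface::{ParquetFileRepo, Result};

/// Hypothetical commit step of a compaction round: the input files are
/// flagged for deletion and the freshly written output is registered,
/// atomically per backend.
async fn commit_compaction(
    repo: &mut dyn ParquetFileRepo,
    partition_id: PartitionId,
    inputs: &[ParquetFile],
    output: ParquetFileParams,
) -> Result<Vec<ParquetFileId>> {
    repo.create_upgrade_delete(
        partition_id,
        inputs,    // delete: fully replaced by the compacted output
        &[],       // upgrade: nothing carried over unchanged in this sketch
        &[output], // create: the newly written file
        CompactionLevel::FileNonOverlapped,
    )
    .await
}
```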
@@ -1706,11 +1716,10 @@ mod tests { let sqlite = setup_db().await; let sqlite: Arc = Arc::new(sqlite); - let mut txn = sqlite.start_transaction().await.expect("txn start"); + let mut txn = sqlite.repositories().await; let (kafka, query, shards) = create_or_get_default_records(1, txn.deref_mut()) .await .expect("db init failed"); - txn.commit().await.expect("txn commit"); let namespace_id = sqlite .repositories() @@ -1760,11 +1769,10 @@ mod tests { let sqlite = setup_db().await; let sqlite: Arc = Arc::new(sqlite); - let mut txn = sqlite.start_transaction().await.expect("txn start"); + let mut txn = sqlite.repositories().await; let (kafka, query, _) = create_or_get_default_records(2, txn.deref_mut()) .await .expect("db init failed"); - txn.commit().await.expect("txn commit"); let namespace_id = sqlite .repositories() @@ -1832,11 +1840,10 @@ mod tests { let metrics = Arc::clone(&sqlite.metrics); let sqlite: Arc = Arc::new(sqlite); - let mut txn = sqlite.start_transaction().await.expect("txn start"); + let mut txn = sqlite.repositories().await; let (kafka, query, _shards) = create_or_get_default_records(1, txn.deref_mut()) .await .expect("db init failed"); - txn.commit().await.expect("txn commit"); let namespace_id = sqlite .repositories() @@ -1999,11 +2006,10 @@ mod tests { let pool = sqlite.pool.clone(); let sqlite: Arc = Arc::new(sqlite); - let mut txn = sqlite.start_transaction().await.expect("txn start"); + let mut txn = sqlite.repositories().await; let (kafka, query, shards) = create_or_get_default_records(1, txn.deref_mut()) .await .expect("db init failed"); - txn.commit().await.expect("txn commit"); let namespace_id = sqlite .repositories()
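
For code outside the catalog crate the migration is mechanical, as the test updates above show: every `start_transaction()` / `commit()` (or `abort()`) pair collapses into a plain `repositories()` call. A before/after sketch of a hypothetical caller (`register_topic` is an illustrative name, not part of this diff):

```rust
use std::sync::Arc;
use iox_catalog::interface::{Catalog, Result};

/// Hypothetical caller, migrated off the removed transaction API.
async fn register_topic(catalog: Arc<dyn Catalog>) -> Result<()> {
    // Before (removed by this diff):
    //     let mut txn = catalog.start_transaction().await?;
    //     txn.topics().create_or_get("my_topic").await?;
    //     txn.commit().await?;

    // After: repositories are accessed directly; multi-statement atomicity
    // is now available only through purpose-built methods such as
    // `ParquetFileRepo::create_upgrade_delete`.
    let mut repos = catalog.repositories().await;
    repos.topics().create_or_get("my_topic").await?;
    Ok(())
}
```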