feat: remove transactions from Catalog

pull/24376/head
Jeffrey Smith II 2023-04-19 15:56:32 -04:00
parent 523fd0cabf
commit a01ae8f23d
6 changed files with 517 additions and 665 deletions

View File

@ -112,6 +112,9 @@ pub enum Error {
#[snafu(display("no transaction provided"))]
NoTransaction,
#[snafu(display("transaction failed to commit: {}", source))]
FailedToCommit { source: sqlx::Error },
#[snafu(display("error while converting usize {} to i64", value))]
InvalidValue { value: usize },
@ -192,14 +195,6 @@ pub trait Catalog: Send + Sync + Debug + Display {
/// Setup catalog for usage and apply possible migrations.
async fn setup(&self) -> Result<(), Error>;
/// Creates a new [`Transaction`].
///
/// Creating transactions is potentially expensive. Holding one consumes resources. The number
/// of parallel active transactions might be limited per catalog, so you MUST NOT rely on the
/// ability to create multiple transactions in parallel for correctness. Parallel transactions
/// must only be used for scaling.
async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error>;
/// Accesses the repositories without a transaction scope.
async fn repositories(&self) -> Box<dyn RepoCollection>;
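With `start_transaction` removed, callers obtain the repositories directly from the catalog. A minimal sketch of the resulting call pattern, assuming a `catalog: Arc<dyn Catalog>` handle and using repo calls that appear elsewhere in this diff:

// Hypothetical caller; error handling elided.
let mut repos = catalog.repositories().await;
// Repositories are borrowed from the collection and used directly,
// with no commit/abort step.
let topic = repos.topics().create_or_get("example_topic").await?;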
@ -210,68 +205,6 @@ pub trait Catalog: Send + Sync + Debug + Display {
fn time_provider(&self) -> Arc<dyn TimeProvider>;
}
/// Secret module for [sealed traits].
///
/// [sealed traits]: https://rust-lang.github.io/api-guidelines/future-proofing.html#sealed-traits-protect-against-downstream-implementations-c-sealed
#[doc(hidden)]
pub(crate) mod sealed {
use super::*;
/// Helper trait to implement commit and abort of a transaction.
///
/// The problem is that neither method can take `self` directly, otherwise the [`Transaction`]
/// would not be object safe; we can only take a reference. To prevent a user from using a
/// transaction after calling one of the finalizers, we use a small trick: the public interface
/// takes `Box<dyn Transaction>`, while a sealed trait provides the actual implementation.
#[async_trait]
pub trait TransactionFinalize: Send + Sync + Debug {
async fn commit_inplace(&mut self) -> Result<(), Error>;
async fn abort_inplace(&mut self) -> Result<(), Error>;
}
}
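For reference, the object-safety trick described above reduces to the following generic pattern (a standalone sketch with illustrative names, not code from this crate):

mod sealed {
    // Only the defining crate can implement the finalizer, so callers
    // cannot bypass the Box-consuming public method below.
    pub trait Finalize {
        fn commit_inplace(&mut self);
    }
}

pub trait Txn: sealed::Finalize {
    // `self: Box<Self>` keeps the trait object safe while consuming the
    // transaction, so it cannot be used again after committing.
    fn commit(mut self: Box<Self>) {
        self.commit_inplace();
    }
}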
/// Transaction in a [`Catalog`] (similar to a database transaction).
///
/// A transaction provides a consistent view of the data and stages writes.
/// To finalize a transaction, call [commit](Self::commit) or [abort](Self::abort).
///
/// Repositories can cheaply be borrowed from it.
///
/// Note that after any method in this transaction (including all repositories derived from it)
/// returns an error, the transaction MIGHT be poisoned and will return errors for all operations,
/// depending on the backend.
///
///
/// # Drop
///
/// Dropping a transaction without calling [`commit`](Self::commit) or [`abort`](Self::abort) will
/// abort the transaction. However, resources might not be released immediately, so it is advised
/// to call [`abort`](Self::abort) explicitly when you want to enforce that. Dropping without
/// committing/aborting will also log a warning.
#[async_trait]
pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize + RepoCollection {
/// Commits a transaction.
///
/// # Error Handling
///
/// If successful, all changes will be visible to other transactions.
///
/// If an error is returned, the transaction may or may not be committed. This might be due to
/// IO errors after the transaction was finished. However, in either case the transaction is
/// atomic and can only succeed or fail entirely.
async fn commit(mut self: Box<Self>) -> Result<(), Error> {
self.commit_inplace().await
}
/// Aborts the transaction, throwing away all changes.
async fn abort(mut self: Box<Self>) -> Result<(), Error> {
self.abort_inplace().await
}
}
impl<T> Transaction for T where T: Send + Sync + Debug + sealed::TransactionFinalize + RepoCollection
{}
/// Methods for working with the catalog's various repositories (collections of entities).
///
/// # Repositories
@ -611,6 +544,16 @@ pub trait ParquetFileRepo: Send + Sync {
&mut self,
object_store_id: Uuid,
) -> Result<Option<ParquetFile>>;
/// Commit deletions, upgrades and creations in a single transaction.
async fn create_upgrade_delete(
&mut self,
_partition_id: PartitionId,
delete: &[ParquetFile],
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>>;
}
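A sketch of how a caller might drive this new method; the repository accessor (`parquet_files()`), the input slices, and `target_level` are assumptions for illustration, not taken from this diff:

// Hypothetical compaction commit step; error handling elided.
let mut repos = catalog.repositories().await;
let created_ids = repos
    .parquet_files()
    .create_upgrade_delete(
        partition_id,
        &files_to_delete,  // &[ParquetFile] to flag as deleted
        &files_to_upgrade, // &[ParquetFile] to bump to `target_level`
        &new_files,        // &[ParquetFileParams] to insert
        target_level,
    )
    .await?;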
/// Gets the namespace schema including all tables and columns.
@ -833,7 +776,6 @@ pub(crate) mod test_helpers {
};
use super::*;
use ::test_helpers::{assert_contains, tracing::TracingCapture};
use assert_matches::assert_matches;
use data_types::{
ColumnId, ColumnSet, CompactionLevel, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
@ -857,8 +799,6 @@ pub(crate) mod test_helpers {
test_parquet_file_delete_broken(clean_state().await).await;
test_update_to_compaction_level_1(clean_state().await).await;
test_list_by_partiton_not_to_delete(clean_state().await).await;
test_txn_isolation(clean_state().await).await;
test_txn_drop(clean_state().await).await;
test_list_schemas(clean_state().await).await;
test_list_schemas_soft_deleted_rows(clean_state().await).await;
test_delete_namespace(clean_state().await).await;
@ -3222,66 +3162,6 @@ pub(crate) mod test_helpers {
.expect("parquet file exists check should succeed"));
}
async fn test_txn_isolation(catalog: Arc<dyn Catalog>) {
let barrier = Arc::new(tokio::sync::Barrier::new(2));
let barrier_captured = Arc::clone(&barrier);
let catalog_captured = Arc::clone(&catalog);
let insertion_task = tokio::spawn(async move {
barrier_captured.wait().await;
let mut txn = catalog_captured.start_transaction().await.unwrap();
txn.topics()
.create_or_get("test_txn_isolation")
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
txn.abort().await.unwrap();
});
let mut txn = catalog.start_transaction().await.unwrap();
barrier.wait().await;
tokio::time::sleep(Duration::from_millis(100)).await;
let topic = txn
.topics()
.get_by_name("test_txn_isolation")
.await
.unwrap();
assert!(topic.is_none());
txn.abort().await.unwrap();
insertion_task.await.unwrap();
let mut txn = catalog.start_transaction().await.unwrap();
let topic = txn
.topics()
.get_by_name("test_txn_isolation")
.await
.unwrap();
assert!(topic.is_none());
txn.abort().await.unwrap();
}
async fn test_txn_drop(catalog: Arc<dyn Catalog>) {
let capture = TracingCapture::new();
let mut txn = catalog.start_transaction().await.unwrap();
txn.topics().create_or_get("test_txn_drop").await.unwrap();
drop(txn);
// got a warning
assert_contains!(capture.to_string(), "Dropping ");
assert_contains!(capture.to_string(), " w/o finalizing (commit or abort)");
// data is NOT committed
let mut txn = catalog.start_transaction().await.unwrap();
let topic = txn.topics().get_by_name("test_txn_drop").await.unwrap();
assert!(topic.is_none());
txn.abort().await.unwrap();
}
/// Upsert a namespace called `namespace_name` and write `lines` to it.
async fn populate_namespace<R>(
repos: &mut R,

View File

@ -13,7 +13,7 @@
clippy::dbg_macro
)]
use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result, Transaction};
use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result};
use data_types::{
ColumnType, NamespaceSchema, QueryPool, Shard, ShardId, ShardIndex, TableSchema, TopicId,
TopicMetadata,
@ -215,7 +215,7 @@ where
/// Used in tests and when creating an in-memory catalog.
pub async fn create_or_get_default_records(
shard_count: i32,
txn: &mut dyn Transaction,
txn: &mut dyn RepoCollection,
) -> Result<(TopicMetadata, QueryPool, BTreeMap<ShardId, Shard>)> {
let topic = txn.topics().create_or_get(SHARED_TOPIC_NAME).await?;
let query_pool = txn.query_pools().create_or_get(SHARED_QUERY_POOL).await?;
@ -264,7 +264,7 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let repo = MemCatalog::new(metrics);
let mut txn = repo.start_transaction().await.unwrap();
let mut txn = repo.repositories().await;
let (topic, query_pool, _) = create_or_get_default_records(
2,
txn.deref_mut()

View File

@ -3,10 +3,9 @@
use crate::{
interface::{
sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu,
Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection,
Result, ShardRepo, SoftDeletedRows, TableRepo, TopicMetadataRepo, Transaction,
MAX_PARQUET_FILES_SELECTED_ONCE,
CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo,
SoftDeletedRows, TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE,
},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
@ -19,7 +18,6 @@ use data_types::{
TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::warn;
use snafu::ensure;
use sqlx::types::Uuid;
use std::{
@ -75,43 +73,16 @@ struct MemCollections {
parquet_files: Vec<ParquetFile>,
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum MemTxnInner {
Txn {
guard: OwnedMutexGuard<MemCollections>,
stage: MemCollections,
finalized: bool,
},
NoTxn {
collections: OwnedMutexGuard<MemCollections>,
},
}
/// transaction bound to an in-memory catalog.
#[derive(Debug)]
pub struct MemTxn {
inner: MemTxnInner,
inner: OwnedMutexGuard<MemCollections>,
time_provider: Arc<dyn TimeProvider>,
}
impl MemTxn {
fn stage(&mut self) -> &mut MemCollections {
match &mut self.inner {
MemTxnInner::Txn { stage, .. } => stage,
MemTxnInner::NoTxn { collections } => collections,
}
}
}
impl Drop for MemTxn {
fn drop(&mut self) {
match self.inner {
MemTxnInner::Txn { finalized, .. } if !finalized => {
warn!("Dropping MemTxn w/o finalizing (commit or abort)");
}
_ => {}
}
&mut self.inner
}
}
@ -124,17 +95,8 @@ impl Display for MemCatalog {
#[async_trait]
impl Catalog for MemCatalog {
async fn setup(&self) -> Result<(), Error> {
let guard = Arc::clone(&self.collections).lock_owned().await;
let stage = guard.clone();
let mut transaction = MemTxn {
inner: MemTxnInner::Txn {
guard,
stage,
finalized: false,
},
time_provider: self.time_provider(),
};
let stage = transaction.stage();
let mut guard = Arc::clone(&self.collections).lock_owned().await;
let mut stage = guard.clone();
// We need to manually insert the topic here so that we can create the transition shard
// below.
@ -152,31 +114,15 @@ impl Catalog for MemCatalog {
min_unpersisted_sequence_number: SequenceNumber::new(0),
};
stage.shards.push(shard);
transaction.commit_inplace().await?;
*guard = stage;
Ok(())
}
async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error> {
let guard = Arc::clone(&self.collections).lock_owned().await;
let stage = guard.clone();
Ok(Box::new(MetricDecorator::new(
MemTxn {
inner: MemTxnInner::Txn {
guard,
stage,
finalized: false,
},
time_provider: self.time_provider(),
},
Arc::clone(&self.metrics),
)))
}
async fn repositories(&self) -> Box<dyn RepoCollection> {
let collections = Arc::clone(&self.collections).lock_owned().await;
Box::new(MetricDecorator::new(
MemTxn {
inner: MemTxnInner::NoTxn { collections },
inner: collections,
time_provider: self.time_provider(),
},
Arc::clone(&self.metrics),
@ -192,40 +138,6 @@ impl Catalog for MemCatalog {
}
}
#[async_trait]
impl TransactionFinalize for MemTxn {
async fn commit_inplace(&mut self) -> Result<(), Error> {
match &mut self.inner {
MemTxnInner::Txn {
guard,
stage,
finalized,
} => {
assert!(!*finalized);
**guard = std::mem::take(stage);
*finalized = true;
}
MemTxnInner::NoTxn { .. } => {
panic!("cannot commit oneshot");
}
}
Ok(())
}
async fn abort_inplace(&mut self) -> Result<(), Error> {
match &mut self.inner {
MemTxnInner::Txn { finalized, .. } => {
assert!(!*finalized);
*finalized = true;
}
MemTxnInner::NoTxn { .. } => {
panic!("cannot abort oneshot");
}
}
Ok(())
}
}
#[async_trait]
impl RepoCollection for MemTxn {
fn topics(&mut self) -> &mut dyn TopicMetadataRepo {
@ -961,51 +873,12 @@ impl PartitionRepo for MemTxn {
#[async_trait]
impl ParquetFileRepo for MemTxn {
async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
let stage = self.stage();
if stage
.parquet_files
.iter()
.any(|f| f.object_store_id == parquet_file_params.object_store_id)
{
return Err(Error::FileExists {
object_store_id: parquet_file_params.object_store_id,
});
}
let parquet_file = ParquetFile::from_params(
parquet_file_params,
ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
);
let compaction_level = parquet_file.compaction_level;
let created_at = parquet_file.created_at;
let partition_id = parquet_file.partition_id;
stage.parquet_files.push(parquet_file);
// Update the new_file_at field of its partition to the time of created_at.
// Only update if the compaction level is not Final, which signals that more compaction is needed.
if compaction_level < CompactionLevel::Final {
let partition = stage
.partitions
.iter_mut()
.find(|p| p.id == partition_id)
.ok_or(Error::PartitionNotFound { id: partition_id })?;
partition.new_file_at = Some(created_at);
}
Ok(stage.parquet_files.last().unwrap().clone())
create_parquet_file(self.stage(), parquet_file_params).await
}
async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
let marked_at = Timestamp::from(self.time_provider.now());
let stage = self.stage();
match stage.parquet_files.iter_mut().find(|p| p.id == id) {
Some(f) => f.to_delete = Some(marked_at),
None => return Err(Error::ParquetRecordNotFound { id }),
}
Ok(())
flag_for_delete(self.stage(), id, marked_at).await
}
async fn flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>> {
@ -1119,20 +992,7 @@ impl ParquetFileRepo for MemTxn {
parquet_file_ids: &[ParquetFileId],
compaction_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>> {
let stage = self.stage();
let mut updated = Vec::with_capacity(parquet_file_ids.len());
for f in stage
.parquet_files
.iter_mut()
.filter(|p| parquet_file_ids.contains(&p.id))
{
f.compaction_level = compaction_level;
updated.push(f.id);
}
Ok(updated)
update_compaction_level(self.stage(), parquet_file_ids, compaction_level).await
}
async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
@ -1164,6 +1024,50 @@ impl ParquetFileRepo for MemTxn {
.find(|f| f.object_store_id.eq(&object_store_id))
.cloned())
}
async fn create_upgrade_delete(
&mut self,
_partition_id: PartitionId,
delete: &[ParquetFile],
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>> {
let mut delete_set = HashSet::new();
let mut upgrade_set = HashSet::new();
for d in delete {
delete_set.insert(d.id.get());
}
for u in upgrade {
upgrade_set.insert(u.id.get());
}
assert!(
delete_set.is_disjoint(&upgrade_set),
"attempted to upgrade a file scheduled for delete"
);
let mut stage = self.inner.clone();
let upgrade = upgrade.iter().map(|f| f.id).collect::<Vec<_>>();
for file in delete {
let marked_at = Timestamp::from(self.time_provider.now());
flag_for_delete(&mut stage, file.id, marked_at).await?;
}
update_compaction_level(&mut stage, &upgrade, target_level).await?;
let mut ids = Vec::with_capacity(create.len());
for file in create {
let res = create_parquet_file(&mut stage, file.clone()).await?;
ids.push(res.id);
}
*self.inner = stage;
Ok(ids)
}
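The in-memory implementation gets its all-or-nothing behavior by staging the changes on a clone of the collections and writing the clone back only once every step has succeeded. A minimal, generic sketch of that pattern (names are illustrative):

// Apply `f` to a staged copy and publish the copy only on success,
// so an error leaves the original state untouched.
fn apply_atomically<T: Clone, E>(
    state: &mut T,
    f: impl FnOnce(&mut T) -> Result<(), E>,
) -> Result<(), E> {
    let mut staged = state.clone();
    f(&mut staged)?;
    *state = staged;
    Ok(())
}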
}
fn filter_namespace_soft_delete<'a>(
@ -1177,6 +1081,77 @@ fn filter_namespace_soft_delete<'a>(
})
}
// The following three functions are helpers for the create_upgrade_delete method.
// They are also used by the corresponding create/flag_for_delete/update_compaction_level methods.
async fn create_parquet_file(
stage: &mut MemCollections,
parquet_file_params: ParquetFileParams,
) -> Result<ParquetFile> {
if stage
.parquet_files
.iter()
.any(|f| f.object_store_id == parquet_file_params.object_store_id)
{
return Err(Error::FileExists {
object_store_id: parquet_file_params.object_store_id,
});
}
let parquet_file = ParquetFile::from_params(
parquet_file_params,
ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
);
let compaction_level = parquet_file.compaction_level;
let created_at = parquet_file.created_at;
let partition_id = parquet_file.partition_id;
stage.parquet_files.push(parquet_file);
// Update the new_file_at field of its partition to the time of created_at.
// Only update if the compaction level is not Final, which signals that more compaction is needed.
if compaction_level < CompactionLevel::Final {
let partition = stage
.partitions
.iter_mut()
.find(|p| p.id == partition_id)
.ok_or(Error::PartitionNotFound { id: partition_id })?;
partition.new_file_at = Some(created_at);
}
Ok(stage.parquet_files.last().unwrap().clone())
}
async fn flag_for_delete(
stage: &mut MemCollections,
id: ParquetFileId,
marked_at: Timestamp,
) -> Result<()> {
match stage.parquet_files.iter_mut().find(|p| p.id == id) {
Some(f) => f.to_delete = Some(marked_at),
None => return Err(Error::ParquetRecordNotFound { id }),
}
Ok(())
}
async fn update_compaction_level(
stage: &mut MemCollections,
parquet_file_ids: &[ParquetFileId],
compaction_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>> {
let mut updated = Vec::with_capacity(parquet_file_ids.len());
for f in stage
.parquet_files
.iter_mut()
.filter(|p| parquet_file_ids.contains(&p.id))
{
f.compaction_level = compaction_level;
updated.push(f.id);
}
Ok(updated)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,9 +1,8 @@
//! Metric instrumentation for catalog implementations.
use crate::interface::{
sealed::TransactionFinalize, CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo,
PartitionRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, SoftDeletedRows, TableRepo,
TopicMetadataRepo,
CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, QueryPoolRepo,
RepoCollection, Result, ShardRepo, SoftDeletedRows, TableRepo, TopicMetadataRepo,
};
use async_trait::async_trait;
use data_types::{
@ -87,20 +86,6 @@ where
}
}
#[async_trait]
impl<T, P> TransactionFinalize for MetricDecorator<T, P>
where
T: TransactionFinalize,
P: TimeProvider,
{
async fn commit_inplace(&mut self) -> Result<(), super::interface::Error> {
self.inner.commit_inplace().await
}
async fn abort_inplace(&mut self) -> Result<(), super::interface::Error> {
self.inner.abort_inplace().await
}
}
/// Emit a trait impl for `impl_trait` that delegates calls to the inner
/// implementation, recording the duration and result to the metrics registry.
///
@ -258,5 +243,6 @@ decorate!(
"parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
"parquet_count" = count(&mut self) -> Result<i64>;
"parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
"parquet_create_upgrade_delete" = create_upgrade_delete(&mut self, _partition_id: PartitionId, delete: &[ParquetFile], upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel) -> Result<Vec<ParquetFileId>>;
]
);

View File

@ -2,10 +2,9 @@
use crate::{
interface::{
self, sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo,
ColumnTypeMismatchSnafu, Error, NamespaceRepo, ParquetFileRepo, PartitionRepo,
QueryPoolRepo, RepoCollection, Result, ShardRepo, SoftDeletedRows, TableRepo,
TopicMetadataRepo, Transaction, MAX_PARQUET_FILES_SELECTED_ONCE,
self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo,
SoftDeletedRows, TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE,
},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
@ -27,6 +26,7 @@ use sqlx::{
Acquire, ConnectOptions, Executor, Postgres, Row,
};
use sqlx_hotswap_pool::HotSwapPool;
use std::collections::HashSet;
use std::{collections::HashMap, fmt::Display, str::FromStr, sync::Arc, time::Duration};
static MIGRATOR: Migrator = sqlx::migrate!();
@ -151,10 +151,8 @@ pub struct PostgresTxn {
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum PostgresTxnInner {
Txn(Option<sqlx::Transaction<'static, Postgres>>),
Oneshot(HotSwapPool<Postgres>),
struct PostgresTxnInner {
pool: HotSwapPool<Postgres>,
}
impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
@ -178,12 +176,7 @@ impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
'c: 'e,
E: sqlx::Execute<'q, Self::Database>,
{
match self {
PostgresTxnInner::Txn(txn) => {
txn.as_mut().expect("Not yet finalized").fetch_many(query)
}
PostgresTxnInner::Oneshot(pool) => pool.fetch_many(query),
}
self.pool.fetch_many(query)
}
fn fetch_optional<'e, 'q: 'e, E: 'q>(
@ -197,13 +190,7 @@ impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
'c: 'e,
E: sqlx::Execute<'q, Self::Database>,
{
match self {
PostgresTxnInner::Txn(txn) => txn
.as_mut()
.expect("Not yet finalized")
.fetch_optional(query),
PostgresTxnInner::Oneshot(pool) => pool.fetch_optional(query),
}
self.pool.fetch_optional(query)
}
fn prepare_with<'e, 'q: 'e>(
@ -217,13 +204,7 @@ impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
where
'c: 'e,
{
match self {
PostgresTxnInner::Txn(txn) => txn
.as_mut()
.expect("Not yet finalized")
.prepare_with(sql, parameters),
PostgresTxnInner::Oneshot(pool) => pool.prepare_with(sql, parameters),
}
self.pool.prepare_with(sql, parameters)
}
fn describe<'e, 'q: 'e>(
@ -233,52 +214,7 @@ impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
where
'c: 'e,
{
match self {
PostgresTxnInner::Txn(txn) => txn.as_mut().expect("Not yet finalized").describe(sql),
PostgresTxnInner::Oneshot(pool) => pool.describe(sql),
}
}
}
impl Drop for PostgresTxn {
fn drop(&mut self) {
if let PostgresTxnInner::Txn(Some(_)) = self.inner {
warn!("Dropping PostgresTxn w/o finalizing (commit or abort)");
// SQLx ensures that the inner transaction enqueues a rollback when it is dropped, so
// we don't need to spawn a task here to call `rollback` manually.
}
}
}
#[async_trait]
impl TransactionFinalize for PostgresTxn {
async fn commit_inplace(&mut self) -> Result<(), Error> {
match &mut self.inner {
PostgresTxnInner::Txn(txn) => txn
.take()
.expect("Not yet finalized")
.commit()
.await
.map_err(|e| Error::SqlxError { source: e }),
PostgresTxnInner::Oneshot(_) => {
panic!("cannot commit oneshot");
}
}
}
async fn abort_inplace(&mut self) -> Result<(), Error> {
match &mut self.inner {
PostgresTxnInner::Txn(txn) => txn
.take()
.expect("Not yet finalized")
.rollback()
.await
.map_err(|e| Error::SqlxError { source: e }),
PostgresTxnInner::Oneshot(_) => {
panic!("cannot abort oneshot");
}
}
self.pool.describe(sql)
}
}
@ -338,26 +274,12 @@ DO NOTHING;
Ok(())
}
async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error> {
let transaction = self
.pool
.begin()
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(Box::new(MetricDecorator::new(
PostgresTxn {
inner: PostgresTxnInner::Txn(Some(transaction)),
time_provider: Arc::clone(&self.time_provider),
},
Arc::clone(&self.metrics),
)))
}
async fn repositories(&self) -> Box<dyn RepoCollection> {
Box::new(MetricDecorator::new(
PostgresTxn {
inner: PostgresTxnInner::Oneshot(self.pool.clone()),
inner: PostgresTxnInner {
pool: self.pool.clone(),
},
time_provider: Arc::clone(&self.time_provider),
},
Arc::clone(&self.metrics),
@ -1460,73 +1382,15 @@ RETURNING *
#[async_trait]
impl ParquetFileRepo for PostgresTxn {
async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
let ParquetFileParams {
shard_id,
namespace_id,
table_id,
partition_id,
object_store_id,
max_sequence_number,
min_time,
max_time,
file_size_bytes,
row_count,
compaction_level,
created_at,
column_set,
max_l0_created_at,
} = parquet_file_params;
let rec = sqlx::query_as::<_, ParquetFile>(
r#"
INSERT INTO parquet_file (
shard_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
RETURNING *;
"#,
)
.bind(shard_id) // $1
.bind(table_id) // $2
.bind(partition_id) // $3
.bind(object_store_id) // $4
.bind(max_sequence_number) // $5
.bind(min_time) // $6
.bind(max_time) // $7
.bind(file_size_bytes) // $8
.bind(row_count) // $9
.bind(compaction_level) // $10
.bind(created_at) // $11
.bind(namespace_id) // $12
.bind(column_set) // $13
.bind(max_l0_created_at) // $14
.fetch_one(&mut self.inner)
.await
.map_err(|e| {
if is_unique_violation(&e) {
Error::FileExists { object_store_id }
} else if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
Ok(rec)
let executor = &mut self.inner;
create_parquet_file(executor, parquet_file_params).await
}
async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
let marked_at = Timestamp::from(self.time_provider.now());
let executor = &mut self.inner;
let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#)
.bind(marked_at) // $1
.bind(id) // $2
.execute(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(())
flag_for_delete(executor, id, marked_at).await
}
async fn flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>> {
@ -1673,25 +1537,8 @@ WHERE parquet_file.partition_id = $1
parquet_file_ids: &[ParquetFileId],
compaction_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>> {
// If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
// See https://github.com/launchbadge/sqlx/issues/1744
let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect();
let updated = sqlx::query(
r#"
UPDATE parquet_file
SET compaction_level = $1
WHERE id = ANY($2)
RETURNING id;
"#,
)
.bind(compaction_level) // $1
.bind(&ids[..]) // $2
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
let updated = updated.into_iter().map(|row| row.get("id")).collect();
Ok(updated)
let executor = &mut self.inner;
update_compaction_level(executor, parquet_file_ids, compaction_level).await
}
async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
@ -1743,6 +1590,168 @@ WHERE object_store_id = $1;
Ok(Some(parquet_file))
}
async fn create_upgrade_delete(
&mut self,
_partition_id: PartitionId,
delete: &[ParquetFile],
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>> {
let mut delete_set = HashSet::new();
let mut upgrade_set = HashSet::new();
for d in delete {
delete_set.insert(d.id.get());
}
for u in upgrade {
upgrade_set.insert(u.id.get());
}
assert!(
delete_set.is_disjoint(&upgrade_set),
"attempted to upgrade a file scheduled for delete"
);
let mut tx = self
.inner
.pool
.begin()
.await
.map_err(|e| Error::StartTransaction { source: e })?;
let upgrade = upgrade.iter().map(|f| f.id).collect::<Vec<_>>();
for file in delete {
let marked_at = Timestamp::from(self.time_provider.now());
flag_for_delete(&mut tx, file.id, marked_at).await?;
}
update_compaction_level(&mut tx, &upgrade, target_level).await?;
let mut ids = Vec::with_capacity(create.len());
for file in create {
let pf = create_parquet_file(&mut tx, file.clone()).await?;
ids.push(pf.id);
}
let res = tx.commit().await;
if let Err(e) = res {
return Err(Error::FailedToCommit { source: e });
}
Ok(ids)
}
}
// The following three functions are helpers for the create_upgrade_delete method.
// They are also used by the corresponding create/flag_for_delete/update_compaction_level methods.
async fn create_parquet_file<'q, E>(
executor: E,
parquet_file_params: ParquetFileParams,
) -> Result<ParquetFile>
where
E: Executor<'q, Database = Postgres>,
{
let ParquetFileParams {
shard_id,
namespace_id,
table_id,
partition_id,
object_store_id,
max_sequence_number,
min_time,
max_time,
file_size_bytes,
row_count,
compaction_level,
created_at,
column_set,
max_l0_created_at,
} = parquet_file_params;
let query = sqlx::query_as::<_, ParquetFile>(
r#"
INSERT INTO parquet_file (
shard_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
RETURNING
id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at;
"#,
)
.bind(shard_id) // $1
.bind(table_id) // $2
.bind(partition_id) // $3
.bind(object_store_id) // $4
.bind(max_sequence_number) // $5
.bind(min_time) // $6
.bind(max_time) // $7
.bind(file_size_bytes) // $8
.bind(row_count) // $9
.bind(compaction_level) // $10
.bind(created_at) // $11
.bind(namespace_id) // $12
.bind(column_set) // $13
.bind(max_l0_created_at); // $14
let parquet_file = query.fetch_one(executor).await.map_err(|e| {
if is_unique_violation(&e) {
Error::FileExists { object_store_id }
} else if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
Ok(parquet_file)
}
async fn flag_for_delete<'q, E>(executor: E, id: ParquetFileId, marked_at: Timestamp) -> Result<()>
where
E: Executor<'q, Database = Postgres>,
{
let query = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#)
.bind(marked_at) // $1
.bind(id); // $2
query
.execute(executor)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(())
}
async fn update_compaction_level<'q, E>(
executor: E,
parquet_file_ids: &[ParquetFileId],
compaction_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>>
where
E: Executor<'q, Database = Postgres>,
{
// If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
// See https://github.com/launchbadge/sqlx/issues/1744
let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect();
let query = sqlx::query(
r#"
UPDATE parquet_file
SET compaction_level = $1
WHERE id = ANY($2)
RETURNING id;
"#,
)
.bind(compaction_level) // $1
.bind(&ids[..]); // $2
let updated = query
.fetch_all(executor)
.await
.map_err(|e| Error::SqlxError { source: e })?;
let updated = updated.into_iter().map(|row| row.get("id")).collect();
Ok(updated)
}
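Because these helpers are generic over `sqlx::Executor`, the same code path serves both the one-shot, pool-backed repo methods and the explicit transaction opened inside `create_upgrade_delete`. The two call shapes, condensed from the code above:

// One-shot, straight against the pool-backed executor:
flag_for_delete(&mut self.inner, id, marked_at).await?;

// Transactional, reusing the same helper:
let mut tx = self
    .inner
    .pool
    .begin()
    .await
    .map_err(|e| Error::StartTransaction { source: e })?;
flag_for_delete(&mut tx, id, marked_at).await?;
tx.commit().await.map_err(|e| Error::FailedToCommit { source: e })?;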
/// The error code returned by Postgres for a unique constraint violation.
@ -1979,11 +1988,10 @@ mod tests {
let postgres = setup_db().await;
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let mut txn = postgres.start_transaction().await.expect("txn start");
let mut txn = postgres.repositories().await;
let (kafka, query, shards) = create_or_get_default_records(1, txn.deref_mut())
.await
.expect("db init failed");
txn.commit().await.expect("txn commit");
let namespace_id = postgres
.repositories()
@ -2038,11 +2046,10 @@ mod tests {
let postgres = setup_db().await;
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let mut txn = postgres.start_transaction().await.expect("txn start");
let mut txn = postgres.repositories().await;
let (kafka, query, _) = create_or_get_default_records(2, txn.deref_mut())
.await
.expect("db init failed");
txn.commit().await.expect("txn commit");
let namespace_id = postgres
.repositories()
@ -2199,11 +2206,10 @@ mod tests {
let metrics = Arc::clone(&postgres.metrics);
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let mut txn = postgres.start_transaction().await.expect("txn start");
let mut txn = postgres.repositories().await;
let (kafka, query, _shards) = create_or_get_default_records(1, txn.deref_mut())
.await
.expect("db init failed");
txn.commit().await.expect("txn commit");
let namespace_id = postgres
.repositories()
@ -2381,11 +2387,10 @@ mod tests {
let pool = postgres.pool.clone();
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let mut txn = postgres.start_transaction().await.expect("txn start");
let mut txn = postgres.repositories().await;
let (kafka, query, shards) = create_or_get_default_records(1, txn.deref_mut())
.await
.expect("db init failed");
txn.commit().await.expect("txn commit");
let namespace_id = postgres
.repositories()

View File

@ -2,10 +2,9 @@
use crate::{
interface::{
self, sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo,
ColumnTypeMismatchSnafu, Error, NamespaceRepo, ParquetFileRepo, PartitionRepo,
QueryPoolRepo, RepoCollection, Result, ShardRepo, SoftDeletedRows, TableRepo,
TopicMetadataRepo, Transaction, MAX_PARQUET_FILES_SELECTED_ONCE,
self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo,
SoftDeletedRows, TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE,
},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
@ -18,12 +17,12 @@ use data_types::{
TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
};
use serde::{Deserialize, Serialize};
use std::ops::Deref;
use std::collections::HashSet;
use std::{collections::HashMap, fmt::Display};
use iox_time::{SystemProvider, TimeProvider};
use metric::Registry;
use observability_deps::tracing::{debug, warn};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use snafu::prelude::*;
use sqlx::types::Json;
@ -66,10 +65,8 @@ pub struct SqliteTxn {
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum SqliteTxnInner {
Txn(Option<sqlx::Transaction<'static, Sqlite>>),
Oneshot(Pool<Sqlite>),
struct SqliteTxnInner {
pool: Pool<Sqlite>,
}
impl<'c> Executor<'c> for &'c mut SqliteTxnInner {
@ -93,10 +90,7 @@ impl<'c> Executor<'c> for &'c mut SqliteTxnInner {
'c: 'e,
E: sqlx::Execute<'q, Self::Database>,
{
match self {
SqliteTxnInner::Txn(txn) => txn.as_mut().expect("Not yet finalized").fetch_many(query),
SqliteTxnInner::Oneshot(pool) => pool.fetch_many(query),
}
self.pool.fetch_many(query)
}
fn fetch_optional<'e, 'q: 'e, E: 'q>(
@ -110,13 +104,7 @@ impl<'c> Executor<'c> for &'c mut SqliteTxnInner {
'c: 'e,
E: sqlx::Execute<'q, Self::Database>,
{
match self {
SqliteTxnInner::Txn(txn) => txn
.as_mut()
.expect("Not yet finalized")
.fetch_optional(query),
SqliteTxnInner::Oneshot(pool) => pool.fetch_optional(query),
}
self.pool.fetch_optional(query)
}
fn prepare_with<'e, 'q: 'e>(
@ -130,13 +118,7 @@ impl<'c> Executor<'c> for &'c mut SqliteTxnInner {
where
'c: 'e,
{
match self {
SqliteTxnInner::Txn(txn) => txn
.as_mut()
.expect("Not yet finalized")
.prepare_with(sql, parameters),
SqliteTxnInner::Oneshot(pool) => pool.prepare_with(sql, parameters),
}
self.pool.prepare_with(sql, parameters)
}
fn describe<'e, 'q: 'e>(
@ -146,52 +128,7 @@ impl<'c> Executor<'c> for &'c mut SqliteTxnInner {
where
'c: 'e,
{
match self {
SqliteTxnInner::Txn(txn) => txn.as_mut().expect("Not yet finalized").describe(sql),
SqliteTxnInner::Oneshot(pool) => pool.describe(sql),
}
}
}
impl Drop for SqliteTxn {
fn drop(&mut self) {
if let SqliteTxnInner::Txn(Some(_)) = self.inner.lock().deref() {
warn!("Dropping SqliteTxn w/o finalizing (commit or abort)");
// SQLx ensures that the inner transaction enqueues a rollback when it is dropped, so
// we don't need to spawn a task here to call `rollback` manually.
}
}
}
#[async_trait]
impl TransactionFinalize for SqliteTxn {
async fn commit_inplace(&mut self) -> Result<(), Error> {
match self.inner.get_mut() {
SqliteTxnInner::Txn(txn) => txn
.take()
.expect("Not yet finalized")
.commit()
.await
.map_err(|e| Error::SqlxError { source: e }),
SqliteTxnInner::Oneshot(_) => {
panic!("cannot commit oneshot");
}
}
}
async fn abort_inplace(&mut self) -> Result<(), Error> {
match self.inner.get_mut() {
SqliteTxnInner::Txn(txn) => txn
.take()
.expect("Not yet finalized")
.rollback()
.await
.map_err(|e| Error::SqlxError { source: e }),
SqliteTxnInner::Oneshot(_) => {
panic!("cannot abort oneshot");
}
}
self.pool.describe(sql)
}
}
@ -261,26 +198,12 @@ DO NOTHING;
Ok(())
}
async fn start_transaction(&self) -> Result<Box<dyn Transaction>> {
let transaction = self
.pool
.begin()
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(Box::new(MetricDecorator::new(
SqliteTxn {
inner: Mutex::new(SqliteTxnInner::Txn(Some(transaction))),
time_provider: Arc::clone(&self.time_provider),
},
Arc::clone(&self.metrics),
)))
}
async fn repositories(&self) -> Box<dyn RepoCollection> {
Box::new(MetricDecorator::new(
SqliteTxn {
inner: Mutex::new(SqliteTxnInner::Oneshot(self.pool.clone())),
inner: Mutex::new(SqliteTxnInner {
pool: self.pool.clone(),
}),
time_provider: Arc::clone(&self.time_provider),
},
Arc::clone(&self.metrics),
@ -1328,73 +1251,15 @@ impl From<ParquetFilePod> for ParquetFile {
#[async_trait]
impl ParquetFileRepo for SqliteTxn {
async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
let ParquetFileParams {
shard_id,
namespace_id,
table_id,
partition_id,
object_store_id,
max_sequence_number,
min_time,
max_time,
file_size_bytes,
row_count,
compaction_level,
created_at,
column_set,
max_l0_created_at,
} = parquet_file_params;
let rec = sqlx::query_as::<_, ParquetFilePod>(
r#"
INSERT INTO parquet_file (
shard_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
RETURNING *;
"#,
)
.bind(shard_id) // $1
.bind(table_id) // $2
.bind(partition_id) // $3
.bind(object_store_id) // $4
.bind(max_sequence_number) // $5
.bind(min_time) // $6
.bind(max_time) // $7
.bind(file_size_bytes) // $8
.bind(row_count) // $9
.bind(compaction_level) // $10
.bind(created_at) // $11
.bind(namespace_id) // $12
.bind(from_column_set(&column_set)) // $13
.bind(max_l0_created_at) // $14
.fetch_one(self.inner.get_mut())
.await
.map_err(|e| {
if is_unique_violation(&e) {
Error::FileExists { object_store_id }
} else if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
Ok(rec.into())
let executor = self.inner.get_mut();
create_parquet_file(executor, parquet_file_params).await
}
async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
let marked_at = Timestamp::from(self.time_provider.now());
let executor = self.inner.get_mut();
let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#)
.bind(marked_at) // $1
.bind(id) // $2
.execute(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(())
flag_for_delete(executor, id, marked_at).await
}
async fn flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>> {
@ -1553,25 +1418,8 @@ WHERE parquet_file.partition_id = $1
parquet_file_ids: &[ParquetFileId],
compaction_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>> {
// If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
// See https://github.com/launchbadge/sqlx/issues/1744
let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect();
let updated = sqlx::query(
r#"
UPDATE parquet_file
SET compaction_level = $1
WHERE id IN (SELECT value FROM json_each($2))
RETURNING id;
"#,
)
.bind(compaction_level) // $1
.bind(Json(&ids[..])) // $2
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?;
let updated = updated.into_iter().map(|row| row.get("id")).collect();
Ok(updated)
let executor = self.inner.get_mut();
update_compaction_level(executor, parquet_file_ids, compaction_level).await
}
async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
@ -1623,6 +1471,168 @@ WHERE object_store_id = $1;
Ok(Some(parquet_file.into()))
}
async fn create_upgrade_delete(
&mut self,
_partition_id: PartitionId,
delete: &[ParquetFile],
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>> {
let mut delete_set = HashSet::new();
let mut upgrade_set = HashSet::new();
for d in delete {
delete_set.insert(d.id.get());
}
for u in upgrade {
upgrade_set.insert(u.id.get());
}
assert!(
delete_set.is_disjoint(&upgrade_set),
"attempted to upgrade a file scheduled for delete"
);
let mut tx = self
.inner
.get_mut()
.pool
.begin()
.await
.map_err(|e| Error::StartTransaction { source: e })?;
let upgrade = upgrade.iter().map(|f| f.id).collect::<Vec<_>>();
for file in delete {
let marked_at = Timestamp::from(self.time_provider.now());
flag_for_delete(&mut tx, file.id, marked_at).await?;
}
update_compaction_level(&mut tx, &upgrade, target_level).await?;
let mut ids = Vec::with_capacity(create.len());
for file in create {
let res = create_parquet_file(&mut tx, file.clone()).await?;
ids.push(res.id);
}
tx.commit()
.await
.map_err(|e| Error::FailedToCommit { source: e })?;
Ok(ids)
}
}
// The following three functions are helpers for the create_upgrade_delete method.
// They are also used by the corresponding create/flag_for_delete/update_compaction_level methods.
async fn create_parquet_file<'q, E>(
executor: E,
parquet_file_params: ParquetFileParams,
) -> Result<ParquetFile>
where
E: Executor<'q, Database = Sqlite>,
{
let ParquetFileParams {
shard_id,
namespace_id,
table_id,
partition_id,
object_store_id,
max_sequence_number,
min_time,
max_time,
file_size_bytes,
row_count,
compaction_level,
created_at,
column_set,
max_l0_created_at,
} = parquet_file_params;
let query = sqlx::query_as::<_, ParquetFilePod>(
r#"
INSERT INTO parquet_file (
shard_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
RETURNING
id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at;
"#,
)
.bind(shard_id) // $1
.bind(table_id) // $2
.bind(partition_id) // $3
.bind(object_store_id) // $4
.bind(max_sequence_number) // $5
.bind(min_time) // $6
.bind(max_time) // $7
.bind(file_size_bytes) // $8
.bind(row_count) // $9
.bind(compaction_level) // $10
.bind(created_at) // $11
.bind(namespace_id) // $12
.bind(from_column_set(&column_set)) // $13
.bind(max_l0_created_at); // $14
let rec = query.fetch_one(executor).await.map_err(|e| {
if is_unique_violation(&e) {
Error::FileExists { object_store_id }
} else if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
Ok(rec.into())
}
async fn flag_for_delete<'q, E>(executor: E, id: ParquetFileId, marked_at: Timestamp) -> Result<()>
where
E: Executor<'q, Database = Sqlite>,
{
let query = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#)
.bind(marked_at) // $1
.bind(id); // $2
query
.execute(executor)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(())
}
async fn update_compaction_level<'q, E>(
executor: E,
parquet_file_ids: &[ParquetFileId],
compaction_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>>
where
E: Executor<'q, Database = Sqlite>,
{
// If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
// See https://github.com/launchbadge/sqlx/issues/1744
let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect();
let query = sqlx::query(
r#"
UPDATE parquet_file
SET compaction_level = $1
WHERE id IN (SELECT value FROM json_each($2))
RETURNING id;
"#,
)
.bind(compaction_level) // $1
.bind(Json(&ids[..])); // $2
let updated = query
.fetch_all(executor)
.await
.map_err(|e| Error::SqlxError { source: e })?;
let updated = updated.into_iter().map(|row| row.get("id")).collect();
Ok(updated)
}
/// The error code returned by SQLite for a unique constraint violation.
@ -1706,11 +1716,10 @@ mod tests {
let sqlite = setup_db().await;
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut txn = sqlite.start_transaction().await.expect("txn start");
let mut txn = sqlite.repositories().await;
let (kafka, query, shards) = create_or_get_default_records(1, txn.deref_mut())
.await
.expect("db init failed");
txn.commit().await.expect("txn commit");
let namespace_id = sqlite
.repositories()
@ -1760,11 +1769,10 @@ mod tests {
let sqlite = setup_db().await;
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut txn = sqlite.start_transaction().await.expect("txn start");
let mut txn = sqlite.repositories().await;
let (kafka, query, _) = create_or_get_default_records(2, txn.deref_mut())
.await
.expect("db init failed");
txn.commit().await.expect("txn commit");
let namespace_id = sqlite
.repositories()
@ -1832,11 +1840,10 @@ mod tests {
let metrics = Arc::clone(&sqlite.metrics);
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut txn = sqlite.start_transaction().await.expect("txn start");
let mut txn = sqlite.repositories().await;
let (kafka, query, _shards) = create_or_get_default_records(1, txn.deref_mut())
.await
.expect("db init failed");
txn.commit().await.expect("txn commit");
let namespace_id = sqlite
.repositories()
@ -1999,11 +2006,10 @@ mod tests {
let pool = sqlite.pool.clone();
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut txn = sqlite.start_transaction().await.expect("txn start");
let mut txn = sqlite.repositories().await;
let (kafka, query, shards) = create_or_get_default_records(1, txn.deref_mut())
.await
.expect("db init failed");
txn.commit().await.expect("txn commit");
let namespace_id = sqlite
.repositories()