feat: allow catalog access w/o a transaction (#3735)

* feat: allow catalog access w/o a transaction

  Now the caller has full control over whether or not to use a transaction.

* fix: remove non-transaction-safe `create_many`
* fix: remove unnecessary transactions

parent 2626e4891d
commit c6e374a025
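For orientation before the diff: the change leaves the transactional path intact and adds a second, transaction-free path. A minimal sketch of both, assuming a `catalog: Arc<dyn Catalog>` as in the test helpers below (not part of the commit):

    // One-shot access: no transaction scope, nothing to commit.
    let mut repos = catalog.repositories().await;
    let topic = repos.kafka_topics().create_or_get("foo").await?;

    // Transactional access: the caller owns commit/abort explicitly.
    let mut txn = catalog.start_transaction().await?;
    let topic = txn.kafka_topics().create_or_get("foo").await?;
    txn.commit().await?;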
@@ -47,9 +47,8 @@ pub async fn command(config: Config) -> Result<(), Error> {
     match config.command {
         Command::Update(update) => {
             let catalog = update.catalog_dsn.get_catalog("cli").await?;
-            let mut txn = catalog.start_transaction().await?;
-            let topic = txn.kafka_topics().create_or_get(&update.db_name).await?;
-            txn.commit().await?;
+            let mut repos = catalog.repositories().await;
+            let topic = repos.kafka_topics().create_or_get(&update.db_name).await?;
             println!("{}", topic.id);
             Ok(())
         }
@@ -168,14 +168,13 @@ impl SequencerData {
         namespace: &str,
         catalog: &dyn Catalog,
     ) -> Result<Arc<NamespaceData>> {
-        let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?;
-        let namespace = txn
+        let mut repos = catalog.repositories().await;
+        let namespace = repos
             .namespaces()
             .get_by_name(namespace)
             .await
             .context(CatalogSnafu)?
             .context(NamespaceNotFoundSnafu { namespace })?;
-        txn.commit().await.context(CatalogSnafu)?;

         let mut n = self.namespaces.write();
         let data = Arc::clone(
@@ -257,13 +256,12 @@ impl NamespaceData {
         table_name: &str,
         catalog: &dyn Catalog,
     ) -> Result<Arc<TableData>> {
-        let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?;
-        let table = txn
+        let mut repos = catalog.repositories().await;
+        let table = repos
             .tables()
             .create_or_get(table_name, self.namespace_id)
             .await
             .context(CatalogSnafu)?;
-        txn.commit().await.context(CatalogSnafu)?;

         let mut t = self.tables.write();
         let data = Arc::clone(
@@ -336,8 +334,8 @@ impl TableData {
         let min_time = Timestamp::new(predicate.range.start());
         let max_time = Timestamp::new(predicate.range.end());

-        let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?;
-        let tombstone = txn
+        let mut repos = catalog.repositories().await;
+        let tombstone = repos
             .tombstones()
             .create_or_get(
                 self.table_id,
@@ -349,7 +347,6 @@ impl TableData {
             )
             .await
             .context(CatalogSnafu)?;
-        txn.commit().await.context(CatalogSnafu)?;

         let partitions = self.partition_data.read();
         for data in partitions.values() {
@@ -371,13 +368,12 @@ impl TableData {
         sequencer_id: SequencerId,
         catalog: &dyn Catalog,
     ) -> Result<Arc<PartitionData>> {
-        let mut txn = catalog.start_transaction().await.context(CatalogSnafu)?;
-        let partition = txn
+        let mut repos = catalog.repositories().await;
+        let partition = repos
             .partitions()
             .create_or_get(partition_key, sequencer_id, self.table_id)
             .await
             .context(CatalogSnafu)?;
-        txn.commit().await.context(CatalogSnafu)?;
         let mut p = self.partition_data.write();
         let data = Arc::new(PartitionData::new(partition.id));
         p.insert(partition.partition_key, Arc::clone(&data));
@@ -314,6 +314,9 @@ pub trait Catalog: Send + Sync + Debug {
     /// transactions might be limited per catalog, so you MUST NOT rely on the ability to create multiple transactions in
     /// parallel for correctness but only for scaling.
     async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error>;
+
+    /// Access the repositories w/o a transaction scope.
+    async fn repositories(&self) -> Box<dyn RepoCollection>;
 }

 /// Secret module for [sealed traits].
@@ -344,22 +347,12 @@ pub(crate) mod sealed {
 /// transaction MIGHT be poisoned and will return errors for all operations, depending on the backend.
 ///
 ///
-/// # Repositories
-/// The methods (e.g. "get or create") for handling entities (e.g. namespaces, tombstones, ...) are grouped into
-/// *repositories* with one *repository* per entity. A repository can be thought of a collection of a single entity.
-/// Getting repositories from the transaction is cheap.
-///
-/// Note that a repository might internally map to a wide range of different storage abstractions, ranging from one or
-/// more SQL tables over key-value key spaces to simple in-memory vectors. The user should and must not care how these
-/// are implemented.
-///
-///
 /// # Drop
 /// Dropping a transaction without calling [`commit`](Self::commit) or [`abort`](Self::abort) will abort the
 /// transaction. However resources might not be released immediately, so it is adviced to always call
 /// [`abort`](Self::abort) when you want to enforce that. Dropping w/o commiting/aborting will also log a warning.
 #[async_trait]
-pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize {
+pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize + RepoCollection {
     /// Commit transaction.
     ///
     /// # Error Handling
@@ -376,7 +369,22 @@ pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize {
     async fn abort(mut self: Box<Self>) -> Result<(), Error> {
         self.abort_inplace().await
     }
+}
+
+impl<T> Transaction for T where T: Send + Sync + Debug + sealed::TransactionFinalize + RepoCollection
+{}

+/// Collection of the different repositories that the catalog offers.
+///
+/// The methods (e.g. "get or create") for handling entities (e.g. namespaces, tombstones, ...) are grouped into
+/// *repositories* with one *repository* per entity. A repository can be thought of a collection of a single entity.
+/// Getting repositories from the transaction is cheap.
+///
+/// Note that a repository might internally map to a wide range of different storage abstractions, ranging from one or
+/// more SQL tables over key-value key spaces to simple in-memory vectors. The user should and must not care how these
+/// are implemented.
+#[async_trait]
+pub trait RepoCollection: Send + Sync + Debug {
     /// repo for kafka topics
     fn kafka_topics(&mut self) -> &mut dyn KafkaTopicRepo;

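The blanket `impl<T> Transaction for T` means every type that can finalize (commit/abort) and expose repositories is automatically a `Transaction`, so shared helpers only need a `RepoCollection` bound to work through either access path. A hedged sketch of such a helper (the function itself is hypothetical, not part of the commit):

    // Accepts the `&mut dyn RepoCollection` behind `Catalog::repositories()`
    // as well as any concrete transaction type; `?Sized` admits the trait object.
    async fn topic_id<R>(repos: &mut R, name: &str) -> Result<Option<KafkaTopicId>>
    where
        R: RepoCollection + ?Sized,
    {
        Ok(repos.kafka_topics().get_by_name(name).await?.map(|t| t.id))
    }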
@@ -588,12 +596,12 @@ pub trait ParquetFileRepo: Send + Sync {
 /// Functions for working with processed tombstone pointers in the catalog
 #[async_trait]
 pub trait ProcessedTombstoneRepo: Send + Sync {
-    /// create processed tombstones
-    async fn create_many(
+    /// create a processed tombstone
+    async fn create(
         &mut self,
         parquet_file_id: ParquetFileId,
-        tombstones: &[Tombstone],
-    ) -> Result<Vec<ProcessedTombstone>>;
+        tombstone_id: TombstoneId,
+    ) -> Result<ProcessedTombstone>;

     /// Verify if a processed tombstone exists in the catalog
     async fn exist(
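`create_many` only made sense under a transaction: a batch insert that fails halfway must be rolled back as a unit, which the new transaction-free path cannot guarantee. It is therefore replaced by a single-row `create`, and the one batch caller loops instead; trimmed from the `add_parquet_file_with_tombstones` hunk further down:

    let mut processed_tombstones = Vec::with_capacity(tombstones.len());
    for tombstone in tombstones {
        processed_tombstones.push(
            txn.processed_tombstones()
                .create(parquet.id, tombstone.id)
                .await?,
        );
    }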
@@ -667,16 +675,19 @@ impl NamespaceSchema {
 }

 /// Gets the namespace schema including all tables and columns.
-pub async fn get_schema_by_name(name: &str, txn: &mut dyn Transaction) -> Result<NamespaceSchema> {
-    let namespace = txn
+pub async fn get_schema_by_name<R>(name: &str, repos: &mut R) -> Result<NamespaceSchema>
+where
+    R: RepoCollection + ?Sized,
+{
+    let namespace = repos
         .namespaces()
         .get_by_name(name)
         .await?
         .context(NamespaceNotFoundSnafu { name })?;

     // get the columns first just in case someone else is creating schema while we're doing this.
-    let columns = txn.columns().list_by_namespace_id(namespace.id).await?;
-    let tables = txn.tables().list_by_namespace_id(namespace.id).await?;
+    let columns = repos.columns().list_by_namespace_id(namespace.id).await?;
+    let tables = repos.tables().list_by_namespace_id(namespace.id).await?;

     let mut namespace = NamespaceSchema::new(
         namespace.id,
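With the `?Sized` bound, the unsized `dyn RepoCollection` behind the box returned by `Catalog::repositories()` satisfies `R` directly; a hedged usage sketch (variable names hypothetical):

    // One-shot schema lookup: no transaction, nothing to commit.
    let mut repos = catalog.repositories().await;
    let schema = get_schema_by_name("my_namespace", repos.as_mut()).await?;

The same call also compiles against any concrete transaction type, since those implement `RepoCollection` too; the caller then decides when to commit.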
@@ -1043,8 +1054,8 @@ pub(crate) mod test_helpers {
     }

     async fn test_kafka_topic(catalog: Arc<dyn Catalog>) {
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let kafka_repo = txn.kafka_topics();
+        let mut repos = catalog.repositories().await;
+        let kafka_repo = repos.kafka_topics();

         let k = kafka_repo.create_or_get("foo").await.unwrap();
         assert!(k.id > KafkaTopicId::new(0));
@@ -1055,40 +1066,34 @@ pub(crate) mod test_helpers {
         assert_eq!(k3, k);
         let k3 = kafka_repo.get_by_name("asdf").await.unwrap();
         assert!(k3.is_none());
-
-        txn.commit().await.unwrap();
     }

     async fn test_query_pool(catalog: Arc<dyn Catalog>) {
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let query_repo = txn.query_pools();
+        let mut repos = catalog.repositories().await;
+        let query_repo = repos.query_pools();

         let q = query_repo.create_or_get("foo").await.unwrap();
         assert!(q.id > QueryPoolId::new(0));
         assert_eq!(q.name, "foo");
         let q2 = query_repo.create_or_get("foo").await.unwrap();
         assert_eq!(q, q2);
-
-        txn.commit().await.unwrap();
     }

     async fn test_namespace(catalog: Arc<dyn Catalog>) {
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
-        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
+        let mut repos = catalog.repositories().await;
+        let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = repos.query_pools().create_or_get("foo").await.unwrap();

         let namespace_name = "test_namespace";
-        let namespace = txn
+        let namespace = repos
             .namespaces()
             .create(namespace_name, "inf", kafka.id, pool.id)
             .await
             .unwrap();
         assert!(namespace.id > NamespaceId::new(0));
         assert_eq!(namespace.name, namespace_name);
-        txn.commit().await.unwrap();

-        let mut txn = catalog.start_transaction().await.unwrap();
-        let conflict = txn
+        let conflict = repos
             .namespaces()
             .create(namespace_name, "inf", kafka.id, pool.id)
             .await;
@@ -1096,36 +1101,33 @@ pub(crate) mod test_helpers {
             conflict.unwrap_err(),
             Error::NameExists { name: _ }
         ));
-        txn.abort().await.unwrap();

-        let mut txn = catalog.start_transaction().await.unwrap();
-        let found = txn
+        let found = repos
             .namespaces()
             .get_by_name(namespace_name)
             .await
             .unwrap()
             .expect("namespace should be there");
         assert_eq!(namespace, found);
-        txn.commit().await.unwrap();
     }

     async fn test_table(catalog: Arc<dyn Catalog>) {
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
-        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
-        let namespace = txn
+        let mut repos = catalog.repositories().await;
+        let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = repos.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = repos
             .namespaces()
             .create("namespace_table_test", "inf", kafka.id, pool.id)
             .await
             .unwrap();

         // test we can create or get a table
-        let t = txn
+        let t = repos
             .tables()
             .create_or_get("test_table", namespace.id)
             .await
             .unwrap();
-        let tt = txn
+        let tt = repos
             .tables()
             .create_or_get("test_table", namespace.id)
             .await
@@ -1133,7 +1135,7 @@ pub(crate) mod test_helpers {
         assert!(t.id > TableId::new(0));
         assert_eq!(t, tt);

-        let tables = txn
+        let tables = repos
             .tables()
             .list_by_namespace_id(namespace.id)
             .await
@@ -1141,33 +1143,31 @@ pub(crate) mod test_helpers {
         assert_eq!(vec![t], tables);

         // test we can create a table of the same name in a different namespace
-        let namespace2 = txn
+        let namespace2 = repos
             .namespaces()
             .create("two", "inf", kafka.id, pool.id)
             .await
             .unwrap();
         assert_ne!(namespace, namespace2);
-        let test_table = txn
+        let test_table = repos
             .tables()
             .create_or_get("test_table", namespace2.id)
             .await
             .unwrap();
         assert_ne!(tt, test_table);
         assert_eq!(test_table.namespace_id, namespace2.id);
-
-        txn.commit().await.unwrap();
     }

     async fn test_column(catalog: Arc<dyn Catalog>) {
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
-        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
-        let namespace = txn
+        let mut repos = catalog.repositories().await;
+        let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = repos.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = repos
             .namespaces()
             .create("namespace_column_test", "inf", kafka.id, pool.id)
             .await
             .unwrap();
-        let table = txn
+        let table = repos
             .tables()
             .create_or_get("test_table", namespace.id)
             .await
@@ -1175,12 +1175,12 @@ pub(crate) mod test_helpers {
         assert_eq!(table.namespace_id, namespace.id);

         // test we can create or get a column
-        let c = txn
+        let c = repos
             .columns()
             .create_or_get("column_test", table.id, ColumnType::Tag)
             .await
             .unwrap();
-        let cc = txn
+        let cc = repos
             .columns()
             .create_or_get("column_test", table.id, ColumnType::Tag)
             .await
@@ -1189,7 +1189,7 @@ pub(crate) mod test_helpers {
         assert_eq!(c, cc);

         // test that attempting to create an already defined column of a different type returns error
-        let err = txn
+        let err = repos
             .columns()
             .create_or_get("column_test", table.id, ColumnType::U64)
             .await
@@ -1204,31 +1204,29 @@ pub(crate) mod test_helpers {
         ));

         // test that we can create a column of the same name under a different table
-        let table2 = txn
+        let table2 = repos
             .tables()
             .create_or_get("test_table_2", namespace.id)
             .await
             .unwrap();
-        let ccc = txn
+        let ccc = repos
             .columns()
             .create_or_get("column_test", table2.id, ColumnType::U64)
             .await
             .unwrap();
         assert_ne!(c, ccc);

-        let columns = txn
+        let columns = repos
             .columns()
             .list_by_namespace_id(namespace.id)
             .await
             .unwrap();
         assert_eq!(vec![c, ccc], columns);
-
-        txn.commit().await.unwrap();
     }

     async fn test_sequencer(catalog: Arc<dyn Catalog>) {
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let kafka = txn
+        let mut repos = catalog.repositories().await;
+        let kafka = repos
             .kafka_topics()
             .create_or_get("sequencer_test")
             .await
@@ -1237,7 +1235,7 @@ pub(crate) mod test_helpers {
         // Create 10 sequencers
         let mut created = BTreeMap::new();
         for partition in 1..=10 {
-            let sequencer = txn
+            let sequencer = repos
                 .sequencers()
                 .create_or_get(&kafka, KafkaPartition::new(partition))
                 .await
@@ -1246,7 +1244,7 @@ pub(crate) mod test_helpers {
         }

         // List them and assert they match
-        let listed = txn
+        let listed = repos
             .sequencers()
             .list_by_kafka_topic(&kafka)
             .await
@@ -1259,7 +1257,7 @@ pub(crate) mod test_helpers {

         // get by the sequencer id and partition
         let kafka_partition = KafkaPartition::new(1);
-        let sequencer = txn
+        let sequencer = repos
             .sequencers()
             .get_by_topic_id_and_partition(kafka.id, kafka_partition)
             .await
@@ -1268,36 +1266,34 @@ pub(crate) mod test_helpers {
         assert_eq!(kafka.id, sequencer.kafka_topic_id);
         assert_eq!(kafka_partition, sequencer.kafka_partition);

-        let sequencer = txn
+        let sequencer = repos
             .sequencers()
             .get_by_topic_id_and_partition(kafka.id, KafkaPartition::new(523))
             .await
             .unwrap();
         assert!(sequencer.is_none());
-
-        txn.commit().await.unwrap();
     }

     async fn test_partition(catalog: Arc<dyn Catalog>) {
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
-        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
-        let namespace = txn
+        let mut repos = catalog.repositories().await;
+        let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = repos.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = repos
             .namespaces()
             .create("namespace_partition_test", "inf", kafka.id, pool.id)
             .await
             .unwrap();
-        let table = txn
+        let table = repos
             .tables()
             .create_or_get("test_table", namespace.id)
             .await
             .unwrap();
-        let sequencer = txn
+        let sequencer = repos
             .sequencers()
             .create_or_get(&kafka, KafkaPartition::new(1))
             .await
             .unwrap();
-        let other_sequencer = txn
+        let other_sequencer = repos
             .sequencers()
             .create_or_get(&kafka, KafkaPartition::new(2))
             .await
@@ -1305,21 +1301,21 @@ pub(crate) mod test_helpers {

         let mut created = BTreeMap::new();
         for key in ["foo", "bar"] {
-            let partition = txn
+            let partition = repos
                 .partitions()
                 .create_or_get(key, sequencer.id, table.id)
                 .await
                 .expect("failed to create partition");
             created.insert(partition.id, partition);
         }
-        let other_partition = txn
+        let other_partition = repos
             .partitions()
             .create_or_get("asdf", other_sequencer.id, table.id)
             .await
             .unwrap();

         // List them and assert they match
-        let listed = txn
+        let listed = repos
             .partitions()
             .list_by_sequencer(sequencer.id)
             .await
@@ -1331,7 +1327,7 @@ pub(crate) mod test_helpers {
         assert_eq!(created, listed);

         // test get_partition_info_by_id
-        let info = txn
+        let info = repos
             .partitions()
             .partition_info_by_id(other_partition.id)
             .await
@@ -1340,30 +1336,28 @@ pub(crate) mod test_helpers {
         assert_eq!(info.partition, other_partition);
         assert_eq!(info.table_name, "test_table");
         assert_eq!(info.namespace_name, "namespace_partition_test");
-
-        txn.commit().await.unwrap();
     }

     async fn test_tombstone(catalog: Arc<dyn Catalog>) {
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
-        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
-        let namespace = txn
+        let mut repos = catalog.repositories().await;
+        let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = repos.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = repos
             .namespaces()
             .create("namespace_tombstone_test", "inf", kafka.id, pool.id)
             .await
             .unwrap();
-        let table = txn
+        let table = repos
             .tables()
             .create_or_get("test_table", namespace.id)
             .await
             .unwrap();
-        let other_table = txn
+        let other_table = repos
             .tables()
             .create_or_get("other", namespace.id)
             .await
             .unwrap();
-        let sequencer = txn
+        let sequencer = repos
             .sequencers()
             .create_or_get(&kafka, KafkaPartition::new(1))
             .await
@@ -1371,7 +1365,7 @@ pub(crate) mod test_helpers {

         let min_time = Timestamp::new(1);
         let max_time = Timestamp::new(10);
-        let t1 = txn
+        let t1 = repos
             .tombstones()
             .create_or_get(
                 table.id,
@@ -1389,7 +1383,7 @@ pub(crate) mod test_helpers {
         assert_eq!(t1.min_time, min_time);
         assert_eq!(t1.max_time, max_time);
         assert_eq!(t1.serialized_predicate, "whatevs");
-        let t2 = txn
+        let t2 = repos
             .tombstones()
             .create_or_get(
                 other_table.id,
@@ -1401,7 +1395,7 @@ pub(crate) mod test_helpers {
             )
             .await
             .unwrap();
-        let t3 = txn
+        let t3 = repos
             .tombstones()
             .create_or_get(
                 table.id,
@@ -1414,46 +1408,44 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();

-        let listed = txn
+        let listed = repos
             .tombstones()
             .list_tombstones_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
             .await
             .unwrap();
         assert_eq!(vec![t2, t3], listed);
-
-        txn.commit().await.unwrap();
     }

     async fn test_parquet_file(catalog: Arc<dyn Catalog>) {
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
-        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
-        let namespace = txn
+        let mut repos = catalog.repositories().await;
+        let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = repos.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = repos
             .namespaces()
             .create("namespace_parquet_file_test", "inf", kafka.id, pool.id)
             .await
             .unwrap();
-        let table = txn
+        let table = repos
             .tables()
             .create_or_get("test_table", namespace.id)
             .await
             .unwrap();
-        let other_table = txn
+        let other_table = repos
             .tables()
             .create_or_get("other", namespace.id)
             .await
             .unwrap();
-        let sequencer = txn
+        let sequencer = repos
             .sequencers()
             .create_or_get(&kafka, KafkaPartition::new(1))
             .await
             .unwrap();
-        let partition = txn
+        let partition = repos
             .partitions()
             .create_or_get("one", sequencer.id, table.id)
             .await
             .unwrap();
-        let other_partition = txn
+        let other_partition = repos
             .partitions()
             .create_or_get("one", sequencer.id, other_table.id)
             .await
@@ -1463,10 +1455,10 @@ pub(crate) mod test_helpers {
         let max_time = Timestamp::new(10);

         // Must have no rows
-        let row_count = txn.parquet_files().count().await.unwrap();
+        let row_count = repos.parquet_files().count().await.unwrap();
         assert_eq!(row_count, 0);

-        let parquet_file = txn
+        let parquet_file = repos
             .parquet_files()
             .create(
                 sequencer.id,
@@ -1480,11 +1472,9 @@ pub(crate) mod test_helpers {
             )
             .await
             .unwrap();
-        txn.commit().await.unwrap();

         // verify that trying to create a file with the same UUID throws an error
-        let mut txn = catalog.start_transaction().await.unwrap();
-        let err = txn
+        let err = repos
             .parquet_files()
             .create(
                 sequencer.id,
@@ -1499,10 +1489,8 @@ pub(crate) mod test_helpers {
             .await
             .unwrap_err();
         assert!(matches!(err, Error::FileExists { object_store_id: _ }));
-        txn.abort().await.unwrap();

-        let mut txn = catalog.start_transaction().await.unwrap();
-        let other_file = txn
+        let other_file = repos
             .parquet_files()
             .create(
                 sequencer.id,
@@ -1518,23 +1506,23 @@ pub(crate) mod test_helpers {
             .unwrap();

         // Must have 2 rows
-        let row_count = txn.parquet_files().count().await.unwrap();
+        let row_count = repos.parquet_files().count().await.unwrap();
         assert_eq!(row_count, 2);

         let exist_id = parquet_file.id;
         let non_exist_id = ParquetFileId::new(other_file.id.get() + 10);
         // make sure exists_id != non_exist_id
         assert_ne!(exist_id, non_exist_id);
-        assert!(txn.parquet_files().exist(exist_id).await.unwrap());
-        assert!(!txn.parquet_files().exist(non_exist_id).await.unwrap());
+        assert!(repos.parquet_files().exist(exist_id).await.unwrap());
+        assert!(!repos.parquet_files().exist(non_exist_id).await.unwrap());

-        let files = txn
+        let files = repos
             .parquet_files()
             .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
             .await
             .unwrap();
         assert_eq!(vec![parquet_file, other_file], files);
-        let files = txn
+        let files = repos
             .parquet_files()
             .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(150))
             .await
@@ -1543,18 +1531,17 @@ pub(crate) mod test_helpers {

         // verify that to_delete is initially set to false and that it can be updated to true
         assert!(!parquet_file.to_delete);
-        txn.parquet_files()
+        repos
+            .parquet_files()
             .flag_for_delete(parquet_file.id)
             .await
             .unwrap();
-        let files = txn
+        let files = repos
             .parquet_files()
             .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
             .await
             .unwrap();
         assert!(files.first().unwrap().to_delete);

-        txn.commit().await.unwrap();
     }

     async fn test_add_parquet_file_with_tombstones(catalog: Arc<dyn Catalog>) {
@@ -16,7 +16,7 @@ use crate::interface::{
     SequencerId, TableSchema, Transaction,
 };

-use interface::{ParquetFile, ProcessedTombstone, Tombstone};
+use interface::{ParquetFile, ProcessedTombstone, RepoCollection, Tombstone};
 use mutable_batch::MutableBatch;
 use std::{borrow::Cow, collections::BTreeMap};

@@ -40,14 +40,15 @@ pub mod postgres;
/// This function pushes schema additions through to the backend catalog, and
/// relies on the catalog to serialise concurrent additions of a given column,
/// ensuring only one type is ever accepted per column.
-pub async fn validate_or_insert_schema<'a, T, U>(
+pub async fn validate_or_insert_schema<'a, T, U, R>(
     tables: T,
     schema: &NamespaceSchema,
-    txn: &mut dyn Transaction,
+    repos: &mut R,
 ) -> Result<Option<NamespaceSchema>>
 where
     T: IntoIterator<IntoIter = U, Item = (&'a str, &'a MutableBatch)> + Send + Sync,
     U: Iterator<Item = T::Item> + Send,
+    R: RepoCollection + ?Sized,
 {
     let tables = tables.into_iter();

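As with `get_schema_by_name`, the caller now chooses the scope in which schema additions land; a hedged call-site sketch (`tables` stands for any iterator of `(&str, &MutableBatch)` pairs, names hypothetical):

    // Validate/insert without a transaction; each catalog call commits on its own.
    let mut repos = catalog.repositories().await;
    let merged = validate_or_insert_schema(tables, &schema, repos.as_mut()).await?;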
@@ -55,7 +56,7 @@ where
     let mut schema = Cow::Borrowed(schema);

     for (table_name, batch) in tables {
-        validate_mutable_batch(batch, table_name, &mut schema, txn).await?;
+        validate_mutable_batch(batch, table_name, &mut schema, repos).await?;
     }

     match schema {
@@ -64,12 +65,15 @@ where
     }
 }

-async fn validate_mutable_batch(
+async fn validate_mutable_batch<R>(
     mb: &MutableBatch,
     table_name: &str,
     schema: &mut Cow<'_, NamespaceSchema>,
-    txn: &mut dyn Transaction,
-) -> Result<()> {
+    repos: &mut R,
+) -> Result<()>
+where
+    R: RepoCollection + ?Sized,
+{
     // Check if the table exists in the schema.
     //
     // Because the entry API requires &mut it is not used to avoid a premature
@@ -81,14 +85,14 @@ async fn validate_mutable_batch(
     //
     // Attempt to create the table in the catalog, or load an existing
     // table from the catalog to populate the cache.
-    let mut table = txn
+    let mut table = repos
         .tables()
         .create_or_get(table_name, schema.id)
         .await
         .map(|t| TableSchema::new(t.id))?;

     // Always add a time column to all new tables.
-    let time_col = txn
+    let time_col = repos
         .columns()
         .create_or_get(TIME_COLUMN, table.id, ColumnType::Time)
         .await?;
|
||||||
None => {
|
None => {
|
||||||
// The column does not exist in the cache, create/get it from
|
// The column does not exist in the cache, create/get it from
|
||||||
// the catalog, and add it to the table.
|
// the catalog, and add it to the table.
|
||||||
let column = txn
|
let column = repos
|
||||||
.columns()
|
.columns()
|
||||||
.create_or_get(name.as_str(), table.id, ColumnType::from(col.influx_type()))
|
.create_or_get(name.as_str(), table.id, ColumnType::from(col.influx_type()))
|
||||||
.await?;
|
.await?;
|
||||||
|
@@ -200,10 +204,14 @@ pub async fn add_parquet_file_with_tombstones(
         .await?;

     // Now the parquet available, create its processed tombstones
-    let processed_tombstones = txn
-        .processed_tombstones()
-        .create_many(parquet.id, tombstones)
-        .await?;
+    let mut processed_tombstones = Vec::with_capacity(tombstones.len());
+    for tombstone in tombstones {
+        processed_tombstones.push(
+            txn.processed_tombstones()
+                .create(parquet.id, tombstone.id)
+                .await?,
+        );
+    }

     Ok((parquet, processed_tombstones))
 }
@@ -6,8 +6,9 @@ use crate::interface::{
     KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId,
     NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId,
     PartitionInfo, PartitionRepo, ProcessedTombstone, ProcessedTombstoneRepo, QueryPool,
-    QueryPoolId, QueryPoolRepo, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo,
-    Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, Transaction,
+    QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer, SequencerId,
+    SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo,
+    Transaction,
 };
 use async_trait::async_trait;
 use observability_deps::tracing::warn;
@@ -51,18 +52,41 @@ struct MemCollections {
     processed_tombstones: Vec<ProcessedTombstone>,
 }

+#[derive(Debug)]
+#[allow(clippy::large_enum_variant)]
+enum MemTxnInner {
+    Txn {
+        guard: OwnedMutexGuard<MemCollections>,
+        stage: MemCollections,
+        finalized: bool,
+    },
+    NoTxn {
+        collections: OwnedMutexGuard<MemCollections>,
+    },
+}
+
 /// transaction bound to an in-memory catalog.
 #[derive(Debug)]
 pub struct MemTxn {
-    guard: OwnedMutexGuard<MemCollections>,
-    stage: MemCollections,
-    finalized: bool,
+    inner: MemTxnInner,
+}
+
+impl MemTxn {
+    fn stage(&mut self) -> &mut MemCollections {
+        match &mut self.inner {
+            MemTxnInner::Txn { stage, .. } => stage,
+            MemTxnInner::NoTxn { collections } => collections,
+        }
+    }
 }

 impl Drop for MemTxn {
     fn drop(&mut self) {
-        if !self.finalized {
-            warn!("Dropping MemTxn w/o finalizing (commit or abort)");
+        match self.inner {
+            MemTxnInner::Txn { finalized, .. } if !finalized => {
+                warn!("Dropping MemTxn w/o finalizing (commit or abort)");
+            }
+            _ => {}
         }
     }
 }
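Both modes reuse `MemTxn`: the transactional variant mutates a cloned `stage` and publishes it on commit, while the one-shot variant holds the collection mutex guard directly, so every repo call mutates shared state immediately and there is nothing to commit. A hedged test-style sketch (assuming the usual `MemCatalog` constructor):

    let catalog = MemCatalog::new();
    let mut repos = catalog.repositories().await;
    // The write is visible immediately; dropping `repos` merely releases the lock.
    let topic = repos.kafka_topics().create_or_get("foo").await.unwrap();
    assert_eq!(topic.name, "foo");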
@@ -78,29 +102,58 @@ impl Catalog for MemCatalog {
         let guard = Arc::clone(&self.collections).lock_owned().await;
         let stage = guard.clone();
         Ok(Box::new(MemTxn {
-            guard,
-            stage,
-            finalized: false,
+            inner: MemTxnInner::Txn {
+                guard,
+                stage,
+                finalized: false,
+            },
         }))
     }
+
+    async fn repositories(&self) -> Box<dyn RepoCollection> {
+        let collections = Arc::clone(&self.collections).lock_owned().await;
+        Box::new(MemTxn {
+            inner: MemTxnInner::NoTxn { collections },
+        })
+    }
 }

 #[async_trait]
 impl TransactionFinalize for MemTxn {
     async fn commit_inplace(&mut self) -> Result<(), Error> {
-        *self.guard = std::mem::take(&mut self.stage);
-        self.finalized = true;
+        match &mut self.inner {
+            MemTxnInner::Txn {
+                guard,
+                stage,
+                finalized,
+            } => {
+                assert!(!*finalized);
+                **guard = std::mem::take(stage);
+                *finalized = true;
+            }
+            MemTxnInner::NoTxn { .. } => {
+                panic!("cannot commit oneshot");
+            }
+        }
         Ok(())
     }

     async fn abort_inplace(&mut self) -> Result<(), Error> {
-        self.finalized = true;
+        match &mut self.inner {
+            MemTxnInner::Txn { finalized, .. } => {
+                assert!(!*finalized);
+                *finalized = true;
+            }
+            MemTxnInner::NoTxn { .. } => {
+                panic!("cannot abort oneshot");
+            }
+        }
         Ok(())
     }
 }

 #[async_trait]
-impl Transaction for MemTxn {
+impl RepoCollection for MemTxn {
     fn kafka_topics(&mut self) -> &mut dyn KafkaTopicRepo {
         self
     }
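The `panic!("cannot commit oneshot")` arms are internal guards rather than a user-facing hazard: `repositories()` only hands out a `Box<dyn RepoCollection>`, which exposes no finalize methods, so the oneshot variant cannot be committed or aborted through the public API. A sketch of the visible surface (illustrative only):

    let mut repos: Box<dyn RepoCollection> = catalog.repositories().await;
    // `repos.commit()` / `repos.abort()` do not exist on this trait and will not
    // compile; only the `Box<dyn Transaction>` from `start_transaction()` has them.
    let _ = repos.kafka_topics().get_by_name("foo").await?;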
@@ -145,15 +198,17 @@ impl Transaction for MemTxn {
 #[async_trait]
 impl KafkaTopicRepo for MemTxn {
     async fn create_or_get(&mut self, name: &str) -> Result<KafkaTopic> {
-        let topic = match self.stage.kafka_topics.iter().find(|t| t.name == name) {
+        let stage = self.stage();
+
+        let topic = match stage.kafka_topics.iter().find(|t| t.name == name) {
             Some(t) => t,
             None => {
                 let topic = KafkaTopic {
-                    id: KafkaTopicId::new(self.stage.kafka_topics.len() as i32 + 1),
+                    id: KafkaTopicId::new(stage.kafka_topics.len() as i32 + 1),
                     name: name.to_string(),
                 };
-                self.stage.kafka_topics.push(topic);
-                self.stage.kafka_topics.last().unwrap()
+                stage.kafka_topics.push(topic);
+                stage.kafka_topics.last().unwrap()
             }
         };

@@ -161,12 +216,9 @@ impl KafkaTopicRepo for MemTxn {
     }

     async fn get_by_name(&mut self, name: &str) -> Result<Option<KafkaTopic>> {
-        let kafka_topic = self
-            .stage
-            .kafka_topics
-            .iter()
-            .find(|t| t.name == name)
-            .cloned();
+        let stage = self.stage();
+
+        let kafka_topic = stage.kafka_topics.iter().find(|t| t.name == name).cloned();
         Ok(kafka_topic)
     }
 }
@@ -174,15 +226,17 @@ impl KafkaTopicRepo for MemTxn {
 #[async_trait]
 impl QueryPoolRepo for MemTxn {
     async fn create_or_get(&mut self, name: &str) -> Result<QueryPool> {
-        let pool = match self.stage.query_pools.iter().find(|t| t.name == name) {
+        let stage = self.stage();
+
+        let pool = match stage.query_pools.iter().find(|t| t.name == name) {
             Some(t) => t,
             None => {
                 let pool = QueryPool {
-                    id: QueryPoolId::new(self.stage.query_pools.len() as i16 + 1),
+                    id: QueryPoolId::new(stage.query_pools.len() as i16 + 1),
                     name: name.to_string(),
                 };
-                self.stage.query_pools.push(pool);
-                self.stage.query_pools.last().unwrap()
+                stage.query_pools.push(pool);
+                stage.query_pools.last().unwrap()
             }
         };

@@ -199,38 +253,38 @@ impl NamespaceRepo for MemTxn {
         kafka_topic_id: KafkaTopicId,
         query_pool_id: QueryPoolId,
     ) -> Result<Namespace> {
-        if self.stage.namespaces.iter().any(|n| n.name == name) {
+        let stage = self.stage();
+
+        if stage.namespaces.iter().any(|n| n.name == name) {
             return Err(Error::NameExists {
                 name: name.to_string(),
             });
         }

         let namespace = Namespace {
-            id: NamespaceId::new(self.stage.namespaces.len() as i32 + 1),
+            id: NamespaceId::new(stage.namespaces.len() as i32 + 1),
             name: name.to_string(),
             kafka_topic_id,
             query_pool_id,
             retention_duration: Some(retention_duration.to_string()),
         };
-        self.stage.namespaces.push(namespace);
-        Ok(self.stage.namespaces.last().unwrap().clone())
+        stage.namespaces.push(namespace);
+        Ok(stage.namespaces.last().unwrap().clone())
     }

     async fn get_by_name(&mut self, name: &str) -> Result<Option<Namespace>> {
-        Ok(self
-            .stage
-            .namespaces
-            .iter()
-            .find(|n| n.name == name)
-            .cloned())
+        let stage = self.stage();
+
+        Ok(stage.namespaces.iter().find(|n| n.name == name).cloned())
     }
 }

 #[async_trait]
 impl TableRepo for MemTxn {
     async fn create_or_get(&mut self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
-        let table = match self
-            .stage
+        let stage = self.stage();
+
+        let table = match stage
             .tables
             .iter()
             .find(|t| t.name == name && t.namespace_id == namespace_id)
@@ -238,12 +292,12 @@ impl TableRepo for MemTxn {
             Some(t) => t,
             None => {
                 let table = Table {
-                    id: TableId::new(self.stage.tables.len() as i32 + 1),
+                    id: TableId::new(stage.tables.len() as i32 + 1),
                     namespace_id,
                     name: name.to_string(),
                 };
-                self.stage.tables.push(table);
-                self.stage.tables.last().unwrap()
+                stage.tables.push(table);
+                stage.tables.last().unwrap()
             }
         };

@@ -251,8 +305,9 @@ impl TableRepo for MemTxn {
     }

     async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
-        let tables: Vec<_> = self
-            .stage
+        let stage = self.stage();
+
+        let tables: Vec<_> = stage
             .tables
             .iter()
             .filter(|t| t.namespace_id == namespace_id)
@@ -270,8 +325,9 @@ impl ColumnRepo for MemTxn {
         table_id: TableId,
         column_type: ColumnType,
     ) -> Result<Column> {
-        let column = match self
-            .stage
+        let stage = self.stage();
+
+        let column = match stage
             .columns
             .iter()
             .find(|t| t.name == name && t.table_id == table_id)
@@ -289,13 +345,13 @@ impl ColumnRepo for MemTxn {
             }
             None => {
                 let column = Column {
-                    id: ColumnId::new(self.stage.columns.len() as i32 + 1),
+                    id: ColumnId::new(stage.columns.len() as i32 + 1),
                     table_id,
                     name: name.to_string(),
                     column_type: column_type as i16,
                 };
-                self.stage.columns.push(column);
-                self.stage.columns.last().unwrap()
+                stage.columns.push(column);
+                stage.columns.last().unwrap()
             }
         };

@ -303,17 +359,17 @@ impl ColumnRepo for MemTxn {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
|
async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
|
||||||
let table_ids: Vec<_> = self
|
let stage = self.stage();
|
||||||
.stage
|
|
||||||
|
let table_ids: Vec<_> = stage
|
||||||
.tables
|
.tables
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|t| t.namespace_id == namespace_id)
|
.filter(|t| t.namespace_id == namespace_id)
|
||||||
.map(|t| t.id)
|
.map(|t| t.id)
|
||||||
.collect();
|
.collect();
|
||||||
println!("tables: {:?}", self.stage.tables);
|
println!("tables: {:?}", stage.tables);
|
||||||
println!("table_ids: {:?}", table_ids);
|
println!("table_ids: {:?}", table_ids);
|
||||||
let columns: Vec<_> = self
|
let columns: Vec<_> = stage
|
||||||
.stage
|
|
||||||
.columns
|
.columns
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|c| table_ids.contains(&c.table_id))
|
.filter(|c| table_ids.contains(&c.table_id))
|
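Note: every `MemTxn` method in these hunks now begins with `let stage = self.stage();` instead of reaching through `self.stage` directly. The accessor itself is not part of this diff; the sketch below is only an assumed shape (the `MemTxnInner` enum and variant names are hypothetical, not this commit's code), consistent with a catalog that hands out both transactional and one-shot repo access:

// Assumed shape only: a MemTxn handed out by `repositories()` would mutate
// the shared collections in place, while one from `start_transaction()`
// would mutate a staged copy until commit.
impl MemTxn {
    fn stage(&mut self) -> &mut MemCollections {
        match &mut self.inner {
            // Transactional: work on the staged snapshot.
            MemTxnInner::Txn { stage, .. } => stage,
            // One-shot: work on the live collections directly.
            MemTxnInner::NoTxn { collections } => collections,
        }
    }
}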
@@ -331,8 +387,9 @@ impl SequencerRepo for MemTxn {
        topic: &KafkaTopic,
        partition: KafkaPartition,
    ) -> Result<Sequencer> {
-        let sequencer = match self
-            .stage
+        let stage = self.stage();
+
+        let sequencer = match stage
            .sequencers
            .iter()
            .find(|s| s.kafka_topic_id == topic.id && s.kafka_partition == partition)

@@ -340,13 +397,13 @@ impl SequencerRepo for MemTxn {
            Some(t) => t,
            None => {
                let sequencer = Sequencer {
-                    id: SequencerId::new(self.stage.sequencers.len() as i16 + 1),
+                    id: SequencerId::new(stage.sequencers.len() as i16 + 1),
                    kafka_topic_id: topic.id,
                    kafka_partition: partition,
                    min_unpersisted_sequence_number: 0,
                };
-                self.stage.sequencers.push(sequencer);
-                self.stage.sequencers.last().unwrap()
+                stage.sequencers.push(sequencer);
+                stage.sequencers.last().unwrap()
            }
        };

@@ -358,8 +415,9 @@ impl SequencerRepo for MemTxn {
        topic_id: KafkaTopicId,
        partition: KafkaPartition,
    ) -> Result<Option<Sequencer>> {
-        let sequencer = self
-            .stage
+        let stage = self.stage();
+
+        let sequencer = stage
            .sequencers
            .iter()
            .find(|s| s.kafka_topic_id == topic_id && s.kafka_partition == partition)

@@ -368,12 +426,15 @@ impl SequencerRepo for MemTxn {
    }

    async fn list(&mut self) -> Result<Vec<Sequencer>> {
-        Ok(self.stage.sequencers.clone())
+        let stage = self.stage();
+
+        Ok(stage.sequencers.clone())
    }

    async fn list_by_kafka_topic(&mut self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
-        let sequencers: Vec<_> = self
-            .stage
+        let stage = self.stage();
+
+        let sequencers: Vec<_> = stage
            .sequencers
            .iter()
            .filter(|s| s.kafka_topic_id == topic.id)

@@ -391,19 +452,21 @@ impl PartitionRepo for MemTxn {
        sequencer_id: SequencerId,
        table_id: TableId,
    ) -> Result<Partition> {
-        let partition = match self.stage.partitions.iter().find(|p| {
+        let stage = self.stage();
+
+        let partition = match stage.partitions.iter().find(|p| {
            p.partition_key == key && p.sequencer_id == sequencer_id && p.table_id == table_id
        }) {
            Some(p) => p,
            None => {
                let p = Partition {
-                    id: PartitionId::new(self.stage.partitions.len() as i64 + 1),
+                    id: PartitionId::new(stage.partitions.len() as i64 + 1),
                    sequencer_id,
                    table_id,
                    partition_key: key.to_string(),
                };
-                self.stage.partitions.push(p);
-                self.stage.partitions.last().unwrap()
+                stage.partitions.push(p);
+                stage.partitions.last().unwrap()
            }
        };

@@ -411,8 +474,9 @@ impl PartitionRepo for MemTxn {
    }

    async fn list_by_sequencer(&mut self, sequencer_id: SequencerId) -> Result<Vec<Partition>> {
-        let partitions: Vec<_> = self
-            .stage
+        let stage = self.stage();
+
+        let partitions: Vec<_> = stage
            .partitions
            .iter()
            .filter(|p| p.sequencer_id == sequencer_id)

@@ -425,23 +489,22 @@ impl PartitionRepo for MemTxn {
        &mut self,
        partition_id: PartitionId,
    ) -> Result<Option<PartitionInfo>> {
-        let partition = self
-            .stage
+        let stage = self.stage();
+
+        let partition = stage
            .partitions
            .iter()
            .find(|p| p.id == partition_id)
            .cloned();

        if let Some(partition) = partition {
-            let table = self
-                .stage
+            let table = stage
                .tables
                .iter()
                .find(|t| t.id == partition.table_id)
                .cloned();
            if let Some(table) = table {
-                let namespace = self
-                    .stage
+                let namespace = stage
                    .namespaces
                    .iter()
                    .find(|n| n.id == table.namespace_id)
@@ -471,7 +534,9 @@ impl TombstoneRepo for MemTxn {
        max_time: Timestamp,
        predicate: &str,
    ) -> Result<Tombstone> {
-        let tombstone = match self.stage.tombstones.iter().find(|t| {
+        let stage = self.stage();
+
+        let tombstone = match stage.tombstones.iter().find(|t| {
            t.table_id == table_id
                && t.sequencer_id == sequencer_id
                && t.sequence_number == sequence_number

@@ -479,7 +544,7 @@ impl TombstoneRepo for MemTxn {
            Some(t) => t,
            None => {
                let t = Tombstone {
-                    id: TombstoneId::new(self.stage.tombstones.len() as i64 + 1),
+                    id: TombstoneId::new(stage.tombstones.len() as i64 + 1),
                    table_id,
                    sequencer_id,
                    sequence_number,

@@ -487,8 +552,8 @@ impl TombstoneRepo for MemTxn {
                    max_time,
                    serialized_predicate: predicate.to_string(),
                };
-                self.stage.tombstones.push(t);
-                self.stage.tombstones.last().unwrap()
+                stage.tombstones.push(t);
+                stage.tombstones.last().unwrap()
            }
        };

@@ -500,8 +565,9 @@ impl TombstoneRepo for MemTxn {
        sequencer_id: SequencerId,
        sequence_number: SequenceNumber,
    ) -> Result<Vec<Tombstone>> {
-        let tombstones: Vec<_> = self
-            .stage
+        let stage = self.stage();
+
+        let tombstones: Vec<_> = stage
            .tombstones
            .iter()
            .filter(|t| t.sequencer_id == sequencer_id && t.sequence_number > sequence_number)

@@ -524,8 +590,9 @@ impl ParquetFileRepo for MemTxn {
        min_time: Timestamp,
        max_time: Timestamp,
    ) -> Result<ParquetFile> {
-        if self
-            .stage
+        let stage = self.stage();
+
+        if stage
            .parquet_files
            .iter()
            .any(|f| f.object_store_id == object_store_id)

@@ -534,7 +601,7 @@ impl ParquetFileRepo for MemTxn {
        }

        let parquet_file = ParquetFile {
-            id: ParquetFileId::new(self.stage.parquet_files.len() as i64 + 1),
+            id: ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
            sequencer_id,
            table_id,
            partition_id,

@@ -545,12 +612,14 @@ impl ParquetFileRepo for MemTxn {
            max_time,
            to_delete: false,
        };
-        self.stage.parquet_files.push(parquet_file);
-        Ok(*self.stage.parquet_files.last().unwrap())
+        stage.parquet_files.push(parquet_file);
+        Ok(*stage.parquet_files.last().unwrap())
    }

    async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
-        match self.stage.parquet_files.iter_mut().find(|p| p.id == id) {
+        let stage = self.stage();
+
+        match stage.parquet_files.iter_mut().find(|p| p.id == id) {
            Some(f) => f.to_delete = true,
            None => return Err(Error::ParquetRecordNotFound { id }),
        }

@@ -563,8 +632,9 @@ impl ParquetFileRepo for MemTxn {
        sequencer_id: SequencerId,
        sequence_number: SequenceNumber,
    ) -> Result<Vec<ParquetFile>> {
-        let files: Vec<_> = self
-            .stage
+        let stage = self.stage();
+
+        let files: Vec<_> = stage
            .parquet_files
            .iter()
            .filter(|f| f.sequencer_id == sequencer_id && f.max_sequence_number > sequence_number)

@@ -574,11 +644,15 @@ impl ParquetFileRepo for MemTxn {
    }

    async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
-        Ok(self.stage.parquet_files.iter().any(|f| f.id == id))
+        let stage = self.stage();
+
+        Ok(stage.parquet_files.iter().any(|f| f.id == id))
    }

    async fn count(&mut self) -> Result<i64> {
-        let count = self.stage.parquet_files.len();
+        let stage = self.stage();
+
+        let count = stage.parquet_files.len();
        let count_i64 = i64::try_from(count);
        if count_i64.is_err() {
            return Err(Error::InvalidValue { value: count });
@@ -589,61 +663,46 @@ impl ParquetFileRepo for MemTxn {

 #[async_trait]
 impl ProcessedTombstoneRepo for MemTxn {
-    async fn create_many(
+    async fn create(
        &mut self,
        parquet_file_id: ParquetFileId,
-        tombstones: &[Tombstone],
-    ) -> Result<Vec<ProcessedTombstone>> {
+        tombstone_id: TombstoneId,
+    ) -> Result<ProcessedTombstone> {
+        let stage = self.stage();
+
        // check if the parquet file is available
-        if !self
-            .stage
-            .parquet_files
-            .iter()
-            .any(|f| f.id == parquet_file_id)
-        {
+        if !stage.parquet_files.iter().any(|f| f.id == parquet_file_id) {
            return Err(Error::FileNotFound {
                id: parquet_file_id.get(),
            });
        }

-        let mut processed_tombstones = vec![];
-        for tombstone in tombstones {
-            // check if tomstone exists
-            if !self.stage.tombstones.iter().any(|f| f.id == tombstone.id) {
-                return Err(Error::TombstoneNotFound {
-                    id: tombstone.id.get(),
-                });
-            }
-
-            if self
-                .stage
-                .processed_tombstones
-                .iter()
-                .any(|pt| pt.tombstone_id == tombstone.id && pt.parquet_file_id == parquet_file_id)
-            {
-                // The tombstone was already proccessed for this file
-                return Err(Error::ProcessTombstoneExists {
-                    parquet_file_id: parquet_file_id.get(),
-                    tombstone_id: tombstone.id.get(),
-                });
-            }
-
-            let processed_tombstone = ProcessedTombstone {
-                tombstone_id: tombstone.id,
-                parquet_file_id,
-            };
-            processed_tombstones.push(processed_tombstone);
-        }
-
-        // save for returning
-        let return_processed_tombstones = processed_tombstones.clone();
-
-        // Add to the catalog
-        self.stage
-            .processed_tombstones
-            .append(&mut processed_tombstones);
+        // check if the tombstone exists
+        if !stage.tombstones.iter().any(|f| f.id == tombstone_id) {
+            return Err(Error::TombstoneNotFound {
+                id: tombstone_id.get(),
+            });
+        }
+
+        if stage
+            .processed_tombstones
+            .iter()
+            .any(|pt| pt.tombstone_id == tombstone_id && pt.parquet_file_id == parquet_file_id)
+        {
+            // The tombstone was already processed for this file
+            return Err(Error::ProcessTombstoneExists {
+                parquet_file_id: parquet_file_id.get(),
+                tombstone_id: tombstone_id.get(),
+            });
+        }
+
+        let processed_tombstone = ProcessedTombstone {
+            tombstone_id,
+            parquet_file_id,
+        };
+        stage.processed_tombstones.push(processed_tombstone);

-        Ok(return_processed_tombstones)
+        Ok(processed_tombstone)
    }

    async fn exist(

@@ -651,15 +710,18 @@ impl ProcessedTombstoneRepo for MemTxn {
        parquet_file_id: ParquetFileId,
        tombstone_id: TombstoneId,
    ) -> Result<bool> {
-        Ok(self
-            .stage
+        let stage = self.stage();
+
+        Ok(stage
            .processed_tombstones
            .iter()
            .any(|f| f.parquet_file_id == parquet_file_id && f.tombstone_id == tombstone_id))
    }

    async fn count(&mut self) -> Result<i64> {
-        let count = self.stage.processed_tombstones.len();
+        let stage = self.stage();
+
+        let count = stage.processed_tombstones.len();
        let count_i64 = i64::try_from(count);
        if count_i64.is_err() {
            return Err(Error::InvalidValue { value: count });

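Note: the deleted `create_many` looped several inserts with no way to make them atomic across both backends, which is why the API shrinks to a single-row `create`. A caller that still needs to link several tombstones to one parquet file would loop over `create` inside a transaction it owns; a hedged sketch (the `processed_tombstones()` accessor name is assumed from the repo methods above, error handling elided):

// Sketch: link each tombstone under one caller-owned transaction so the
// rows land atomically, replacing the removed `create_many`.
let mut txn = catalog.start_transaction().await?;
for tombstone in &tombstones {
    txn.processed_tombstones()
        .create(parquet_file_id, tombstone.id)
        .await?;
}
txn.commit().await?;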
@@ -4,9 +4,9 @@ use crate::interface::{
    sealed::TransactionFinalize, Catalog, Column, ColumnRepo, ColumnType, Error, KafkaPartition,
    KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile,
    ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionInfo, PartitionRepo,
-    ProcessedTombstone, ProcessedTombstoneRepo, QueryPool, QueryPoolId, QueryPoolRepo, Result,
-    SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, Timestamp,
-    Tombstone, TombstoneId, TombstoneRepo, Transaction,
+    ProcessedTombstone, ProcessedTombstoneRepo, QueryPool, QueryPoolId, QueryPoolRepo,
+    RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
+    TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, Transaction,
 };
 use async_trait::async_trait;
 use observability_deps::tracing::{info, warn};
@@ -73,18 +73,102 @@ impl PostgresCatalog {
 /// transaction for [`PostgresCatalog`].
 #[derive(Debug)]
 pub struct PostgresTxn {
-    transaction: Option<sqlx::Transaction<'static, Postgres>>,
+    inner: PostgresTxnInner,
 }

-impl PostgresTxn {
-    fn transaction(&mut self) -> &mut sqlx::Transaction<'static, Postgres> {
-        self.transaction.as_mut().expect("Not yet finalized")
+#[derive(Debug)]
+#[allow(clippy::large_enum_variant)]
+enum PostgresTxnInner {
+    Txn(Option<sqlx::Transaction<'static, Postgres>>),
+    Oneshot(Pool<Postgres>),
+}
+
+impl<'c> Executor<'c> for &'c mut PostgresTxnInner {
+    type Database = Postgres;
+
+    #[allow(clippy::type_complexity)]
+    fn fetch_many<'e, 'q: 'e, E: 'q>(
+        self,
+        query: E,
+    ) -> futures::stream::BoxStream<
+        'e,
+        Result<
+            sqlx::Either<
+                <Self::Database as sqlx::Database>::QueryResult,
+                <Self::Database as sqlx::Database>::Row,
+            >,
+            sqlx::Error,
+        >,
+    >
+    where
+        'c: 'e,
+        E: sqlx::Execute<'q, Self::Database>,
+    {
+        match self {
+            PostgresTxnInner::Txn(txn) => {
+                txn.as_mut().expect("Not yet finalized").fetch_many(query)
+            }
+            PostgresTxnInner::Oneshot(pool) => pool.fetch_many(query),
+        }
+    }
+
+    fn fetch_optional<'e, 'q: 'e, E: 'q>(
+        self,
+        query: E,
+    ) -> futures::future::BoxFuture<
+        'e,
+        Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
+    >
+    where
+        'c: 'e,
+        E: sqlx::Execute<'q, Self::Database>,
+    {
+        match self {
+            PostgresTxnInner::Txn(txn) => txn
+                .as_mut()
+                .expect("Not yet finalized")
+                .fetch_optional(query),
+            PostgresTxnInner::Oneshot(pool) => pool.fetch_optional(query),
+        }
+    }
+
+    fn prepare_with<'e, 'q: 'e>(
+        self,
+        sql: &'q str,
+        parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
+    ) -> futures::future::BoxFuture<
+        'e,
+        Result<<Self::Database as sqlx::database::HasStatement<'q>>::Statement, sqlx::Error>,
+    >
+    where
+        'c: 'e,
+    {
+        match self {
+            PostgresTxnInner::Txn(txn) => txn
+                .as_mut()
+                .expect("Not yet finalized")
+                .prepare_with(sql, parameters),
+            PostgresTxnInner::Oneshot(pool) => pool.prepare_with(sql, parameters),
+        }
+    }
+
+    fn describe<'e, 'q: 'e>(
+        self,
+        sql: &'q str,
+    ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>>
+    where
+        'c: 'e,
+    {
+        match self {
+            PostgresTxnInner::Txn(txn) => txn.as_mut().expect("Not yet finalized").describe(sql),
+            PostgresTxnInner::Oneshot(pool) => pool.describe(sql),
+        }
    }
 }

 impl Drop for PostgresTxn {
     fn drop(&mut self) {
-        if self.transaction.is_some() {
+        if let PostgresTxnInner::Txn(Some(_)) = self.inner {
            warn!("Dropping PostgresTxn w/o finalizing (commit or abort)");

            // SQLx ensures that the inner transaction enqueues a rollback when it is dropped, so we don't need to spawn
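Note: this `Executor` impl is what lets every repo method below write `.fetch_one(&mut self.inner)` without knowing whether a transaction is active. sqlx already implements `Executor` for both `&Pool<Postgres>` and `&mut Transaction<'_, Postgres>`, so the enum only has to forward to whichever it holds. A minimal illustration of that property, separate from this commit (helper name and query invented for the example):

use sqlx::{Executor, Postgres};

// Runs unchanged against a pool (one-shot) or inside a transaction; this is
// the same property `PostgresTxnInner` preserves for the repo methods.
async fn sequencer_count<'e, E>(executor: E) -> Result<i64, sqlx::Error>
where
    E: Executor<'e, Database = Postgres>,
{
    let (count,): (i64,) = sqlx::query_as("SELECT count(*) FROM sequencer;")
        .fetch_one(executor)
        .await?;
    Ok(count)
}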
@@ -96,21 +180,31 @@ impl Drop for PostgresTxn {
 #[async_trait]
 impl TransactionFinalize for PostgresTxn {
     async fn commit_inplace(&mut self) -> Result<(), Error> {
-        self.transaction
-            .take()
-            .expect("Not yet finalized")
-            .commit()
-            .await
-            .map_err(|e| Error::SqlxError { source: e })
+        match &mut self.inner {
+            PostgresTxnInner::Txn(txn) => txn
+                .take()
+                .expect("Not yet finalized")
+                .commit()
+                .await
+                .map_err(|e| Error::SqlxError { source: e }),
+            PostgresTxnInner::Oneshot(_) => {
+                panic!("cannot commit oneshot");
+            }
+        }
    }

    async fn abort_inplace(&mut self) -> Result<(), Error> {
-        self.transaction
-            .take()
-            .expect("Not yet finalized")
-            .rollback()
-            .await
-            .map_err(|e| Error::SqlxError { source: e })
+        match &mut self.inner {
+            PostgresTxnInner::Txn(txn) => txn
+                .take()
+                .expect("Not yet finalized")
+                .rollback()
+                .await
+                .map_err(|e| Error::SqlxError { source: e }),
+            PostgresTxnInner::Oneshot(_) => {
+                panic!("cannot abort oneshot");
+            }
+        }
    }
 }

@@ -133,13 +227,19 @@ impl Catalog for PostgresCatalog {
            .map_err(|e| Error::SqlxError { source: e })?;

        Ok(Box::new(PostgresTxn {
-            transaction: Some(transaction),
+            inner: PostgresTxnInner::Txn(Some(transaction)),
        }))
    }
+
+    async fn repositories(&self) -> Box<dyn RepoCollection> {
+        Box::new(PostgresTxn {
+            inner: PostgresTxnInner::Oneshot(self.pool.clone()),
+        })
+    }
 }

 #[async_trait]
-impl Transaction for PostgresTxn {
+impl RepoCollection for PostgresTxn {
    fn kafka_topics(&mut self) -> &mut dyn KafkaTopicRepo {
        self
    }
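Note: after this hunk `PostgresCatalog` exposes both entry points: `start_transaction()` still yields a finalizable transaction, while the new `repositories()` hands out the same `RepoCollection` API backed directly by the pool, with each statement committing on its own. A hedged usage sketch (`KafkaPartition::new` and the partition value are assumptions for illustration, error handling elided):

// Transactional: changes are atomic and must be committed or aborted.
let mut txn = catalog.start_transaction().await?;
let sequencer = txn
    .sequencers()
    .create_or_get(&topic, KafkaPartition::new(0))
    .await?;
txn.commit().await?;

// One-shot: no transaction, nothing to finalize.
let mut repos = catalog.repositories().await;
let sequencers = repos.sequencers().list().await?;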
@@ -193,7 +293,7 @@ DO UPDATE SET name = kafka_topic.name RETURNING *;
        "#,
        )
        .bind(&name) // $1
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| Error::SqlxError { source: e })?;

@@ -207,7 +307,7 @@ SELECT * FROM kafka_topic WHERE name = $1;
        "#,
        )
        .bind(&name) // $1
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await;

        if let Err(sqlx::Error::RowNotFound) = rec {

@@ -232,7 +332,7 @@ DO UPDATE SET name = query_pool.name RETURNING *;
        "#,
        )
        .bind(&name) // $1
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| Error::SqlxError { source: e })?;

@@ -260,7 +360,7 @@ RETURNING *
        .bind(&retention_duration) // $2
        .bind(kafka_topic_id) // $3
        .bind(query_pool_id) // $4
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| {
            if is_unique_violation(&e) {

@@ -284,7 +384,7 @@ SELECT * FROM namespace WHERE name = $1;
        "#,
        )
        .bind(&name) // $1
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await;

        if let Err(sqlx::Error::RowNotFound) = rec {

@@ -310,7 +410,7 @@ DO UPDATE SET name = table_name.name RETURNING *;
        )
        .bind(&name) // $1
        .bind(&namespace_id) // $2
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| {
            if is_fk_violation(&e) {

@@ -331,7 +431,7 @@ WHERE namespace_id = $1;
        "#,
        )
        .bind(&namespace_id)
-        .fetch_all(self.transaction())
+        .fetch_all(&mut self.inner)
        .await
        .map_err(|e| Error::SqlxError { source: e })?;

@@ -360,7 +460,7 @@ DO UPDATE SET name = column_name.name RETURNING *;
        .bind(&name) // $1
        .bind(&table_id) // $2
        .bind(&ct) // $3
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| {
            if is_fk_violation(&e) {

@@ -390,7 +490,7 @@ WHERE table_name.namespace_id = $1;
        "#,
        )
        .bind(&namespace_id)
-        .fetch_all(self.transaction())
+        .fetch_all(&mut self.inner)
        .await
        .map_err(|e| Error::SqlxError { source: e })?;

@@ -417,7 +517,7 @@ impl SequencerRepo for PostgresTxn {
        )
        .bind(&topic.id) // $1
        .bind(&partition) // $2
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| {
            if is_fk_violation(&e) {

@@ -440,7 +540,7 @@ SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2;
        )
        .bind(topic_id) // $1
        .bind(partition) // $2
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await;

        if let Err(sqlx::Error::RowNotFound) = rec {

@@ -454,7 +554,7 @@ SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2;

    async fn list(&mut self) -> Result<Vec<Sequencer>> {
        sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer;"#)
-            .fetch_all(self.transaction())
+            .fetch_all(&mut self.inner)
            .await
            .map_err(|e| Error::SqlxError { source: e })
    }

@@ -462,7 +562,7 @@ SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2;
    async fn list_by_kafka_topic(&mut self, topic: &KafkaTopic) -> Result<Vec<Sequencer>> {
        sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer WHERE kafka_topic_id = $1;"#)
            .bind(&topic.id) // $1
-            .fetch_all(self.transaction())
+            .fetch_all(&mut self.inner)
            .await
            .map_err(|e| Error::SqlxError { source: e })
    }

@@ -489,7 +589,7 @@ impl PartitionRepo for PostgresTxn {
        .bind(key) // $1
        .bind(&sequencer_id) // $2
        .bind(&table_id) // $3
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| {
            if is_fk_violation(&e) {

@@ -503,7 +603,7 @@ impl PartitionRepo for PostgresTxn {
    async fn list_by_sequencer(&mut self, sequencer_id: SequencerId) -> Result<Vec<Partition>> {
        sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE sequencer_id = $1;"#)
            .bind(&sequencer_id) // $1
-            .fetch_all(self.transaction())
+            .fetch_all(&mut self.inner)
            .await
            .map_err(|e| Error::SqlxError { source: e })
    }

@@ -522,7 +622,7 @@ impl PartitionRepo for PostgresTxn {
        WHERE partition.id = $1;"#,
        )
        .bind(&partition_id) // $1
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| Error::SqlxError { source: e })?;

@@ -570,7 +670,7 @@ impl TombstoneRepo for PostgresTxn {
        .bind(&min_time) // $4
        .bind(&max_time) // $5
        .bind(predicate) // $6
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| {
            if is_fk_violation(&e) {

@@ -589,7 +689,7 @@ impl TombstoneRepo for PostgresTxn {
        sqlx::query_as::<_, Tombstone>(r#"SELECT * FROM tombstone WHERE sequencer_id = $1 AND sequence_number > $2 ORDER BY id;"#)
            .bind(&sequencer_id) // $1
            .bind(&sequence_number) // $2
-            .fetch_all(self.transaction())
+            .fetch_all(&mut self.inner)
            .await
            .map_err(|e| Error::SqlxError { source: e })
    }

@@ -623,7 +723,7 @@ RETURNING *
        .bind(max_sequence_number) // $6
        .bind(min_time) // $7
        .bind(max_time) // $8
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| {
            if is_unique_violation(&e) {

@@ -643,7 +743,7 @@ RETURNING *
    async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
        let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = true WHERE id = $1;"#)
            .bind(&id) // $1
-            .execute(self.transaction())
+            .execute(&mut self.inner)
            .await
            .map_err(|e| Error::SqlxError { source: e })?;

@@ -658,7 +758,7 @@ RETURNING *
        sqlx::query_as::<_, ParquetFile>(r#"SELECT * FROM parquet_file WHERE sequencer_id = $1 AND max_sequence_number > $2 ORDER BY id;"#)
            .bind(&sequencer_id) // $1
            .bind(&sequence_number) // $2
-            .fetch_all(self.transaction())
+            .fetch_all(&mut self.inner)
            .await
            .map_err(|e| Error::SqlxError { source: e })
    }

@@ -668,7 +768,7 @@ RETURNING *
            r#"SELECT count(*) as count FROM parquet_file WHERE id = $1;"#,
        )
        .bind(&id) // $1
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| Error::SqlxError { source: e })?;

@@ -678,7 +778,7 @@ RETURNING *
    async fn count(&mut self) -> Result<i64> {
        let read_result =
            sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM parquet_file;"#)
-                .fetch_one(self.transaction())
+                .fetch_one(&mut self.inner)
                .await
                .map_err(|e| Error::SqlxError { source: e })?;

@@ -688,45 +788,34 @@ RETURNING *

 #[async_trait]
 impl ProcessedTombstoneRepo for PostgresTxn {
-    async fn create_many(
+    async fn create(
        &mut self,
        parquet_file_id: ParquetFileId,
-        tombstones: &[Tombstone],
-    ) -> Result<Vec<ProcessedTombstone>> {
-        // no transaction provided
-        // todo: we should never needs this but since right now we implement 2 catalogs,
-        // postgres (for production) and mem (for testing only) that does not need to provide txt
-        // this will be refactor when Marco has his new abstraction done
-        let mut processed_tombstones = vec![];
-        for tombstone in tombstones {
-            let processed_tombstone = sqlx::query_as::<_, ProcessedTombstone>(
-                r#"
+        tombstone_id: TombstoneId,
+    ) -> Result<ProcessedTombstone> {
+        sqlx::query_as::<_, ProcessedTombstone>(
+            r#"
 INSERT INTO processed_tombstone ( tombstone_id, parquet_file_id )
 VALUES ( $1, $2 )
 RETURNING *
-                "#,
-            )
-            .bind(tombstone.id) // $1
-            .bind(parquet_file_id) // $2
-            .fetch_one(self.transaction())
-            .await
-            .map_err(|e| {
-                if is_unique_violation(&e) {
-                    Error::ProcessTombstoneExists {
-                        tombstone_id: tombstone.id.get(),
-                        parquet_file_id: parquet_file_id.get(),
-                    }
-                } else if is_fk_violation(&e) {
-                    Error::ForeignKeyViolation { source: e }
-                } else {
-                    Error::SqlxError { source: e }
-                }
-            })?;
-
-            processed_tombstones.push(processed_tombstone);
-        }
-
-        Ok(processed_tombstones)
+        "#,
+        )
+        .bind(tombstone_id) // $1
+        .bind(parquet_file_id) // $2
+        .fetch_one(&mut self.inner)
+        .await
+        .map_err(|e| {
+            if is_unique_violation(&e) {
+                Error::ProcessTombstoneExists {
+                    tombstone_id: tombstone_id.get(),
+                    parquet_file_id: parquet_file_id.get(),
+                }
+            } else if is_fk_violation(&e) {
+                Error::ForeignKeyViolation { source: e }
+            } else {
+                Error::SqlxError { source: e }
+            }
+        })
    }

    async fn exist(

@@ -738,7 +827,7 @@ impl ProcessedTombstoneRepo for PostgresTxn {
            r#"SELECT count(*) as count FROM processed_tombstone WHERE parquet_file_id = $1 AND tombstone_id = $2;"#)
        .bind(&parquet_file_id) // $1
        .bind(&tombstone_id) // $2
-        .fetch_one(self.transaction())
+        .fetch_one(&mut self.inner)
        .await
        .map_err(|e| Error::SqlxError { source: e })?;

@@ -748,7 +837,7 @@ impl ProcessedTombstoneRepo for PostgresTxn {
    async fn count(&mut self) -> Result<i64> {
        let read_result =
            sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM processed_tombstone;"#)
-                .fetch_one(self.transaction())
+                .fetch_one(&mut self.inner)
                .await
                .map_err(|e| Error::SqlxError { source: e })?;

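Note: the `map_err` above leans on `is_unique_violation` / `is_fk_violation` to turn Postgres constraint failures into typed catalog errors. Those helpers are defined outside these hunks; the sketch below is a plausible implementation based on the standard SQLSTATE codes, an assumption rather than this commit's code:

// 23505 = unique_violation, 23503 = foreign_key_violation (Postgres SQLSTATE).
fn is_unique_violation(e: &sqlx::Error) -> bool {
    matches!(e, sqlx::Error::Database(db) if db.code().as_deref() == Some("23505"))
}

fn is_fk_violation(e: &sqlx::Error) -> bool {
    matches!(e, sqlx::Error::Database(db) if db.code().as_deref() == Some("23503"))
}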
@@ -91,13 +91,9 @@ where
        if self.cache.get_schema(&namespace).is_none() {
            trace!(%namespace, "namespace auto-create cache miss");

-            let mut txn = self
-                .catalog
-                .start_transaction()
-                .await
-                .map_err(NamespaceCreationError::Create)?;
+            let mut repos = self.catalog.repositories().await;

-            match txn
+            match repos
                .namespaces()
                .create(
                    namespace.as_str(),

@@ -108,8 +104,6 @@ where
                .await
            {
                Ok(_) => {
-                    txn.commit().await.map_err(NamespaceCreationError::Create)?;
-
                    debug!(%namespace, "created namespace");
                }
                Err(iox_catalog::interface::Error::NameExists { .. }) => {

@@ -117,11 +111,9 @@ where
                    // namespace, or another thread raced populating the catalog
                    // and beat this thread to it.
                    debug!(%namespace, "spurious namespace create failed");
-                    txn.abort().await.map_err(NamespaceCreationError::Create)?;
                }
                Err(e) => {
                    error!(error=%e, %namespace, "failed to auto-create namespace");
-                    txn.abort().await.map_err(NamespaceCreationError::Create)?;
                    return Err(NamespaceCreationError::Create(e));
                }
            }

@@ -199,19 +191,16 @@ mod tests {

        // The cache hit should mean the catalog SHOULD NOT see a create request
        // for the namespace.
-        let mut txn = catalog
-            .start_transaction()
-            .await
-            .expect("failed to start UoW");
+        let mut repos = catalog.repositories().await;
        assert!(
-            txn.namespaces()
+            repos
+                .namespaces()
                .get_by_name(ns.as_str())
                .await
                .expect("lookup should not error")
                .is_none(),
            "expected no request to the catalog"
        );
-        txn.abort().await.expect("failed to abort UoW");

        // And the DML handler must be called.
        assert_matches!(mock_handler.calls().as_slice(), [MockDmlHandlerCall::Write { namespace, .. }] => {

@@ -243,17 +232,13 @@ mod tests {

        // The cache miss should mean the catalog MUST see a create request for
        // the namespace.
-        let mut txn = catalog
-            .start_transaction()
-            .await
-            .expect("failed to start UoW");
-        let got = txn
+        let mut repos = catalog.repositories().await;
+        let got = repos
            .namespaces()
            .get_by_name(ns.as_str())
            .await
            .expect("lookup should not error")
            .expect("creation request should be sent to catalog");
-        txn.abort().await.expect("failed to abort UoW");

        assert_eq!(
            got,
@@ -135,11 +135,7 @@ where
        batches: HashMap<String, MutableBatch>,
        span_ctx: Option<SpanContext>,
    ) -> Result<(), Self::WriteError> {
-        let mut txn = self
-            .catalog
-            .start_transaction()
-            .await
-            .map_err(SchemaError::NamespaceLookup)?;
+        let mut repos = self.catalog.repositories().await;

        // Load the namespace schema from the cache, falling back to pulling it
        // from the global catalog (if it exists).

@@ -149,7 +145,7 @@ where
            None => {
                // Pull the schema from the global catalog or error if it does
                // not exist.
-                let schema = get_schema_by_name(&namespace, txn.deref_mut())
+                let schema = get_schema_by_name(&namespace, repos.deref_mut())
                    .await
                    .map_err(|e| {
                        warn!(error=%e, %namespace, "failed to retrieve namespace schema");

@@ -168,7 +164,7 @@ where
        let maybe_new_schema = validate_or_insert_schema(
            batches.iter().map(|(k, v)| (k.as_str(), v)),
            &schema,
-            txn.deref_mut(),
+            repos.deref_mut(),
        )
        .await
        .map_err(|e| {

@@ -177,8 +173,6 @@ where
        })?
        .map(Arc::new);

-        txn.commit().await.map_err(SchemaError::NamespaceLookup)?;
-
        trace!(%namespace, "schema validation complete");

        // If the schema has been updated, immediately add it to the cache

@@ -255,11 +249,9 @@ mod tests {
    async fn create_catalog() -> Arc<dyn Catalog> {
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());

-        let mut txn = catalog
-            .start_transaction()
-            .await
-            .expect("failed to start UoW");
-        txn.namespaces()
+        let mut repos = catalog.repositories().await;
+        repos
+            .namespaces()
            .create(
                NAMESPACE,
                "inf",

@@ -268,7 +260,6 @@ mod tests {
            )
            .await
            .expect("failed to create test namespace");
-        txn.commit().await.expect("failed to commit UoW");

        catalog
    }
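Note: the router hunks above share one wrinkle: `repositories()` returns `Box<dyn RepoCollection>`, while helpers such as `get_schema_by_name` and `validate_or_insert_schema` take the trait object itself, hence the `repos.deref_mut()` at each call site. A small sketch of that wiring; the helper here is hypothetical, not part of this commit:

use std::ops::DerefMut;

// Hypothetical helper standing in for get_schema_by_name and friends:
// anything written against `&mut dyn RepoCollection` directly.
async fn namespace_exists(repos: &mut dyn RepoCollection, name: &str) -> bool {
    repos
        .namespaces()
        .get_by_name(name)
        .await
        .ok()
        .flatten()
        .is_some()
}

async fn demo(catalog: &dyn Catalog) -> bool {
    // Box<dyn RepoCollection> derefs to the &mut dyn RepoCollection the
    // helper expects.
    let mut repos = catalog.repositories().await;
    namespace_exists(repos.deref_mut(), "my_ns").await
}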