From 0b39e6738f6705520f22b06b661ce7aed9eb9976 Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Thu, 20 Apr 2023 10:26:21 -0400 Subject: [PATCH] chore: add method for creating transition shard using transactions --- ingester2/src/init.rs | 7 +---- iox_catalog/src/interface.rs | 7 +++++ iox_catalog/src/mem.rs | 44 ++++++++++++++++++++++++++++ iox_catalog/src/metrics.rs | 1 + iox_catalog/src/postgres.rs | 55 +++++++++++++++++++++++++++++++++++ iox_catalog/src/sqlite.rs | 56 ++++++++++++++++++++++++++++++++++++ 6 files changed, 164 insertions(+), 6 deletions(-) diff --git a/ingester2/src/init.rs b/ingester2/src/init.rs index f0510e9441..3eb3155099 100644 --- a/ingester2/src/init.rs +++ b/ingester2/src/init.rs @@ -237,14 +237,9 @@ where { // Create the transition shard. let mut txn = catalog.repositories().await; - let topic = txn - .topics() - .create_or_get("iox-shared") - .await - .expect("get topic"); let transition_shard = txn .shards() - .create_or_get(&topic, TRANSITION_SHARD_INDEX) + .create_transition_shard("iox-shared", TRANSITION_SHARD_INDEX) .await .expect("create transition shard"); diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 7cec1403f2..4d928165b0 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -399,6 +399,13 @@ pub trait ShardRepo: Send + Sync { shard: ShardId, sequence_number: SequenceNumber, ) -> Result<()>; + + /// creates the transition shard for the kafkaless path + async fn create_transition_shard( + &mut self, + topic_name: &str, + shard_index: ShardIndex, + ) -> Result; } /// Functions for working with IOx partitions in the catalog. Note that these are how IOx splits up diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index c72f9f2f04..7a5137473c 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -678,6 +678,50 @@ impl ShardRepo for MemTxn { Ok(()) } + + async fn create_transition_shard( + &mut self, + topic_name: &str, + shard_index: ShardIndex, + ) -> Result { + let mut stage = self.inner.clone(); + + let topic = match stage.topics.iter().find(|t| t.name == topic_name) { + Some(t) => t, + None => { + let topic = TopicMetadata { + id: TopicId::new(stage.topics.len() as i64 + 1), + name: topic_name.to_string(), + }; + stage.topics.push(topic); + stage.topics.last().unwrap() + } + }; + + let shard = match stage + .shards + .iter() + .find(|s| s.topic_id == topic.id && s.shard_index == shard_index) + { + Some(t) => t, + None => { + let shard = Shard { + id: ShardId::new(stage.shards.len() as i64 + 1), + topic_id: topic.id, + shard_index, + min_unpersisted_sequence_number: SequenceNumber::new(0), + }; + stage.shards.push(shard); + stage.shards.last().unwrap() + } + }; + + let shard = *shard; + + *self.inner = stage; + + Ok(shard) + } } #[async_trait] diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 4ac03e8db6..d3290e0d3e 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -208,6 +208,7 @@ decorate!( "shard_list" = list(&mut self) -> Result>; "shard_list_by_topic" = list_by_topic(&mut self, topic: &TopicMetadata) -> Result>; "shard_update_min_unpersisted_sequence_number" = update_min_unpersisted_sequence_number(&mut self, shard_id: ShardId, sequence_number: SequenceNumber) -> Result<()>; + "shard_create_transition_shard" = create_transition_shard(&mut self, topic_name: &str, shard_index: ShardIndex) -> Result; ] ); diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index dd4303c615..275517342d 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1101,6 +1101,61 @@ WHERE id = $2; Ok(()) } + + async fn create_transition_shard( + &mut self, + topic_name: &str, + shard_index: ShardIndex, + ) -> Result { + let mut tx = self + .inner + .pool + .begin() + .await + .map_err(|e| Error::StartTransaction { source: e })?; + + let query = sqlx::query_as::<_, TopicMetadata>( + r#" +INSERT INTO topic ( name ) +VALUES ( $1 ) +ON CONFLICT ON CONSTRAINT topic_name_unique +DO UPDATE SET name = topic.name +RETURNING *; + "#, + ) + .bind(topic_name); // $1 + let topic = query + .fetch_one(&mut tx) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + let query = sqlx::query_as::<_, Shard>( + r#" +INSERT INTO shard + ( topic_id, shard_index, min_unpersisted_sequence_number ) +VALUES + ( $1, $2, 0 ) +ON CONFLICT ON CONSTRAINT shard_unique +DO UPDATE SET topic_id = shard.topic_id +RETURNING *;; + "#, + ) + .bind(topic.id) // $1 + .bind(shard_index); // $2 + let shard = query.fetch_one(&mut tx).await.map_err(|e| { + if is_fk_violation(&e) { + Error::ForeignKeyViolation { source: e } + } else { + Error::SqlxError { source: e } + } + })?; + + tx.commit() + .await + .map_err(|e| Error::FailedToCommit { source: e })?; + + Ok(shard) + } } #[async_trait] diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index 0aa9b2e415..0fd97fdd7c 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -882,6 +882,62 @@ WHERE id = $2; Ok(()) } + + async fn create_transition_shard( + &mut self, + topic_name: &str, + shard_index: ShardIndex, + ) -> Result { + let mut tx = self + .inner + .get_mut() + .pool + .begin() + .await + .map_err(|e| Error::StartTransaction { source: e })?; + + let query = sqlx::query_as::<_, TopicMetadata>( + r#" +INSERT INTO topic ( name ) +VALUES ( $1 ) +ON CONFLICT (name) +DO UPDATE SET name = topic.name +RETURNING *; + "#, + ) + .bind(topic_name); // $1 + let topic = query + .fetch_one(&mut tx) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + let query = sqlx::query_as::<_, Shard>( + r#" +INSERT INTO shard + ( topic_id, shard_index, min_unpersisted_sequence_number ) +VALUES + ( $1, $2, 0 ) +ON CONFLICT (topic_id, shard_index) +DO UPDATE SET topic_id = shard.topic_id +RETURNING *; + "#, + ) + .bind(topic.id) // $1 + .bind(shard_index); // $2 + let shard = query.fetch_one(&mut tx).await.map_err(|e| { + if is_fk_violation(&e) { + Error::ForeignKeyViolation { source: e } + } else { + Error::SqlxError { source: e } + } + })?; + + tx.commit() + .await + .map_err(|e| Error::FailedToCommit { source: e })?; + + Ok(shard) + } } // We can't use [`Partition`], as uses Vec which the Sqlite