chore: add method for creating transition shard using transactions

pull/24376/head
Jeffrey Smith II 2023-04-20 10:26:21 -04:00
parent 60eaa71191
commit 0b39e6738f
6 changed files with 164 additions and 6 deletions

View File

@ -237,14 +237,9 @@ where
{
// Create the transition shard.
let mut txn = catalog.repositories().await;
let topic = txn
.topics()
.create_or_get("iox-shared")
.await
.expect("get topic");
let transition_shard = txn
.shards()
.create_or_get(&topic, TRANSITION_SHARD_INDEX)
.create_transition_shard("iox-shared", TRANSITION_SHARD_INDEX)
.await
.expect("create transition shard");

View File

@ -399,6 +399,13 @@ pub trait ShardRepo: Send + Sync {
shard: ShardId,
sequence_number: SequenceNumber,
) -> Result<()>;
/// creates the transition shard for the kafkaless path
async fn create_transition_shard(
&mut self,
topic_name: &str,
shard_index: ShardIndex,
) -> Result<Shard>;
}
/// Functions for working with IOx partitions in the catalog. Note that these are how IOx splits up

View File

@ -678,6 +678,50 @@ impl ShardRepo for MemTxn {
Ok(())
}
async fn create_transition_shard(
&mut self,
topic_name: &str,
shard_index: ShardIndex,
) -> Result<Shard> {
let mut stage = self.inner.clone();
let topic = match stage.topics.iter().find(|t| t.name == topic_name) {
Some(t) => t,
None => {
let topic = TopicMetadata {
id: TopicId::new(stage.topics.len() as i64 + 1),
name: topic_name.to_string(),
};
stage.topics.push(topic);
stage.topics.last().unwrap()
}
};
let shard = match stage
.shards
.iter()
.find(|s| s.topic_id == topic.id && s.shard_index == shard_index)
{
Some(t) => t,
None => {
let shard = Shard {
id: ShardId::new(stage.shards.len() as i64 + 1),
topic_id: topic.id,
shard_index,
min_unpersisted_sequence_number: SequenceNumber::new(0),
};
stage.shards.push(shard);
stage.shards.last().unwrap()
}
};
let shard = *shard;
*self.inner = stage;
Ok(shard)
}
}
#[async_trait]

View File

@ -208,6 +208,7 @@ decorate!(
"shard_list" = list(&mut self) -> Result<Vec<Shard>>;
"shard_list_by_topic" = list_by_topic(&mut self, topic: &TopicMetadata) -> Result<Vec<Shard>>;
"shard_update_min_unpersisted_sequence_number" = update_min_unpersisted_sequence_number(&mut self, shard_id: ShardId, sequence_number: SequenceNumber) -> Result<()>;
"shard_create_transition_shard" = create_transition_shard(&mut self, topic_name: &str, shard_index: ShardIndex) -> Result<Shard>;
]
);

View File

@ -1101,6 +1101,61 @@ WHERE id = $2;
Ok(())
}
async fn create_transition_shard(
&mut self,
topic_name: &str,
shard_index: ShardIndex,
) -> Result<Shard> {
let mut tx = self
.inner
.pool
.begin()
.await
.map_err(|e| Error::StartTransaction { source: e })?;
let query = sqlx::query_as::<_, TopicMetadata>(
r#"
INSERT INTO topic ( name )
VALUES ( $1 )
ON CONFLICT ON CONSTRAINT topic_name_unique
DO UPDATE SET name = topic.name
RETURNING *;
"#,
)
.bind(topic_name); // $1
let topic = query
.fetch_one(&mut tx)
.await
.map_err(|e| Error::SqlxError { source: e })?;
let query = sqlx::query_as::<_, Shard>(
r#"
INSERT INTO shard
( topic_id, shard_index, min_unpersisted_sequence_number )
VALUES
( $1, $2, 0 )
ON CONFLICT ON CONSTRAINT shard_unique
DO UPDATE SET topic_id = shard.topic_id
RETURNING *;;
"#,
)
.bind(topic.id) // $1
.bind(shard_index); // $2
let shard = query.fetch_one(&mut tx).await.map_err(|e| {
if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
tx.commit()
.await
.map_err(|e| Error::FailedToCommit { source: e })?;
Ok(shard)
}
}
#[async_trait]

View File

@ -882,6 +882,62 @@ WHERE id = $2;
Ok(())
}
async fn create_transition_shard(
&mut self,
topic_name: &str,
shard_index: ShardIndex,
) -> Result<Shard> {
let mut tx = self
.inner
.get_mut()
.pool
.begin()
.await
.map_err(|e| Error::StartTransaction { source: e })?;
let query = sqlx::query_as::<_, TopicMetadata>(
r#"
INSERT INTO topic ( name )
VALUES ( $1 )
ON CONFLICT (name)
DO UPDATE SET name = topic.name
RETURNING *;
"#,
)
.bind(topic_name); // $1
let topic = query
.fetch_one(&mut tx)
.await
.map_err(|e| Error::SqlxError { source: e })?;
let query = sqlx::query_as::<_, Shard>(
r#"
INSERT INTO shard
( topic_id, shard_index, min_unpersisted_sequence_number )
VALUES
( $1, $2, 0 )
ON CONFLICT (topic_id, shard_index)
DO UPDATE SET topic_id = shard.topic_id
RETURNING *;
"#,
)
.bind(topic.id) // $1
.bind(shard_index); // $2
let shard = query.fetch_one(&mut tx).await.map_err(|e| {
if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
tx.commit()
.await
.map_err(|e| Error::FailedToCommit { source: e })?;
Ok(shard)
}
}
// We can't use [`Partition`], as uses Vec<String> which the Sqlite