fix: Rename columns, tables, indexes and constraints in postgres catalog

parent 5e187ae1c0
commit 8a0fa616cf
@@ -418,7 +418,6 @@ pub struct Namespace {
     #[sqlx(default)]
     pub retention_duration: Option<String>,
     /// The topic that writes to this namespace will land in
-    #[sqlx(rename = "kafka_topic_id")]
     pub topic_id: TopicId,
     /// The query pool assigned to answer queries for this namespace
     pub query_pool_id: QueryPoolId,
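For context on the hunk above: `#[sqlx(rename = "...")]` is what let these structs read from the old column names, and sqlx's derived `FromRow` otherwise maps columns to fields by name, so once the migration below renames each column to match its field the attribute can simply be deleted. A minimal sketch of that mapping (hypothetical table and pool, not code from this commit):

    use sqlx::{FromRow, PgPool};

    #[derive(Debug, FromRow)]
    struct ShardRow {
        id: i64,
        // Pre-migration this field needed #[sqlx(rename = "kafka_topic_id")];
        // after ALTER TABLE ... RENAME the column is `topic_id`, the names
        // match, and the derive maps it with no attribute at all.
        topic_id: i64,
    }

    async fn list_shards(pool: &PgPool) -> Result<Vec<ShardRow>, sqlx::Error> {
        sqlx::query_as::<_, ShardRow>("SELECT id, topic_id FROM shard;")
            .fetch_all(pool)
            .await
    }

The same reasoning applies to every `#[sqlx(rename = "sequencer_id")]` removal in the hunks that follow.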
@@ -730,11 +729,9 @@ pub struct Shard {
     /// the id of the shard, assigned by the catalog
     pub id: ShardId,
     /// the topic the shard is reading from
-    #[sqlx(rename = "kafka_topic_id")]
     pub topic_id: TopicId,
     /// the shard index of the shard the sequence numbers are coming from, sharded by the router
     /// and write buffer
-    #[sqlx(rename = "kafka_partition")]
     pub shard_index: ShardIndex,
     /// The minimum unpersisted sequence number. Because different tables
     /// can be persisted at different times, it is possible some data has been persisted
@@ -807,7 +804,6 @@ pub struct Partition {
     /// the id of the partition
     pub id: PartitionId,
     /// the shard the data in the partition arrived from
-    #[sqlx(rename = "sequencer_id")]
     pub shard_id: ShardId,
     /// the table the partition is under
     pub table_id: TableId,
@@ -867,7 +863,6 @@ pub struct PartitionParam {
     /// the partition
     pub partition_id: PartitionId,
     /// the partition's shard
-    #[sqlx(rename = "sequencer_id")]
     pub shard_id: ShardId,
     /// the partition's namespace
     pub namespace_id: NamespaceId,
@@ -883,7 +878,6 @@ pub struct Tombstone {
     /// the table the tombstone is associated with
     pub table_id: TableId,
     /// the shard the tombstone was sent through
-    #[sqlx(rename = "sequencer_id")]
     pub shard_id: ShardId,
     /// the sequence number assigned to the tombstone from the `router::Shard`
     pub sequence_number: SequenceNumber,
@@ -975,7 +969,6 @@ pub struct ParquetFile {
     /// the id of the file in the catalog
     pub id: ParquetFileId,
     /// the shard that sequenced writes that went into this file
-    #[sqlx(rename = "sequencer_id")]
     pub shard_id: ShardId,
     /// the namespace
     pub namespace_id: NamespaceId,
@@ -0,0 +1,17 @@
+-- Rename sequencer to shard
+ALTER TABLE IF EXISTS sequencer RENAME TO shard;
+ALTER TABLE IF EXISTS shard RENAME CONSTRAINT sequencer_unique TO shard_unique;
+ALTER TABLE IF EXISTS partition RENAME sequencer_id TO shard_id;
+ALTER TABLE IF EXISTS tombstone RENAME sequencer_id TO shard_id;
+ALTER TABLE IF EXISTS parquet_file RENAME sequencer_id TO shard_id;
+ALTER INDEX IF EXISTS parquet_file_sequencer_compaction_delete_idx
+RENAME TO parquet_file_shard_compaction_delete_idx;
+
+-- Rename kafka_partition to shard_index
+ALTER TABLE IF EXISTS shard RENAME kafka_partition TO shard_index;
+
+-- Rename kafka_topic to topic
+ALTER TABLE IF EXISTS kafka_topic RENAME TO topic;
+ALTER TABLE IF EXISTS topic RENAME CONSTRAINT kafka_topic_name_unique TO topic_name_unique;
+ALTER TABLE IF EXISTS namespace RENAME kafka_topic_id TO topic_id;
+ALTER TABLE IF EXISTS shard RENAME kafka_topic_id TO topic_id;
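Because every statement above is guarded with `IF EXISTS`, the migration silently no-ops on objects that are already gone, so a quick post-migration check can confirm the renames actually landed. A sketch of one such check (hypothetical helper, not part of this change):

    use sqlx::PgPool;

    // Assert that the `partition` table now has a `shard_id` column.
    async fn assert_renamed(pool: &PgPool) -> Result<(), sqlx::Error> {
        let (found,): (bool,) = sqlx::query_as(
            r#"
    SELECT EXISTS (
        SELECT 1 FROM information_schema.columns
        WHERE table_name = 'partition' AND column_name = 'shard_id'
    );
    "#,
        )
        .fetch_one(pool)
        .await?;
        assert!(found, "partition.shard_id missing; migration not applied");
        Ok(())
    }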
@@ -329,7 +329,6 @@ pub trait TableRepo: Send + Sync {
 #[derive(Debug, Copy, Clone, Eq, PartialEq, sqlx::FromRow)]
 pub struct TablePersistInfo {
     /// shard the sequence numbers are associated with
-    #[sqlx(rename = "sequencer_id")]
     pub shard_id: ShardId,
     /// the global identifier for the table
     pub table_id: TableId,
@@ -507,10 +507,10 @@ impl TopicMetadataRepo for PostgresTxn {
     async fn create_or_get(&mut self, name: &str) -> Result<TopicMetadata> {
         let rec = sqlx::query_as::<_, TopicMetadata>(
             r#"
-INSERT INTO kafka_topic ( name )
+INSERT INTO topic ( name )
 VALUES ( $1 )
-ON CONFLICT ON CONSTRAINT kafka_topic_name_unique
-DO UPDATE SET name = kafka_topic.name
+ON CONFLICT ON CONSTRAINT topic_name_unique
+DO UPDATE SET name = topic.name
 RETURNING *;
 "#,
         )
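The seemingly redundant `DO UPDATE SET name = topic.name` is what makes `create_or_get` total: on conflict it rewrites the row with its current value so that `RETURNING *` still yields exactly one row, whereas `DO NOTHING` would return zero rows and the subsequent fetch would fail. A stripped-down sketch of the same idiom (hypothetical pool, reduced to the id column):

    use sqlx::PgPool;

    // Returns the topic id whether the name was freshly inserted or already present.
    async fn topic_id(pool: &PgPool, name: &str) -> Result<i64, sqlx::Error> {
        let (id,): (i64,) = sqlx::query_as(
            r#"
    INSERT INTO topic ( name )
    VALUES ( $1 )
    ON CONFLICT ON CONSTRAINT topic_name_unique
    DO UPDATE SET name = topic.name
    RETURNING id;
    "#,
        )
        .bind(name) // $1
        .fetch_one(pool)
        .await?;
        Ok(id)
    }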
@@ -526,7 +526,7 @@ RETURNING *;
         let rec = sqlx::query_as::<_, TopicMetadata>(
             r#"
 SELECT *
-FROM kafka_topic
+FROM topic
 WHERE name = $1;
 "#,
         )
@@ -576,7 +576,7 @@ impl NamespaceRepo for PostgresTxn {
     ) -> Result<Namespace> {
         let rec = sqlx::query_as::<_, Namespace>(
             r#"
-INSERT INTO namespace ( name, retention_duration, kafka_topic_id, query_pool_id )
+INSERT INTO namespace ( name, retention_duration, topic_id, query_pool_id )
 VALUES ( $1, $2, $3, $4 )
 RETURNING *;
 "#,
@@ -835,13 +835,13 @@ WHERE namespace_id = $1;
         let rec = sqlx::query_as::<_, TablePersistInfo>(
             r#"
 WITH tid as (SELECT id FROM table_name WHERE name = $2 AND namespace_id = $3)
-SELECT $1 as sequencer_id, id as table_id,
+SELECT $1 as shard_id, id as table_id,
        tombstone.sequence_number as tombstone_max_sequence_number
 FROM tid
 LEFT JOIN (
     SELECT tombstone.table_id, sequence_number
     FROM tombstone
-    WHERE sequencer_id = $1 AND tombstone.table_id = (SELECT id FROM tid)
+    WHERE shard_id = $1 AND tombstone.table_id = (SELECT id FROM tid)
     ORDER BY sequence_number DESC
     LIMIT 1
 ) tombstone ON tombstone.table_id = tid.id
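Worth unpacking once, since only the column names change here: the CTE resolves the table id, then a LEFT JOIN onto an `ORDER BY ... DESC LIMIT 1` subquery picks the single newest tombstone, and the LEFT JOIN guarantees a result row (with a NULL max sequence number) even for a table that has no tombstones. The same "latest row per key" pattern, cut down (hypothetical constant):

    // The LEFT JOIN on a LIMIT 1 subquery keeps the outer row even when no
    // tombstone exists for the table.
    const LATEST_TOMBSTONE_SQL: &str = r#"
    WITH tid as (SELECT id FROM table_name WHERE name = $2 AND namespace_id = $3)
    SELECT id as table_id, t.sequence_number
    FROM tid
    LEFT JOIN (
        SELECT table_id, sequence_number
        FROM tombstone
        WHERE shard_id = $1 AND table_id = (SELECT id FROM tid)
        ORDER BY sequence_number DESC
        LIMIT 1
    ) t ON t.table_id = tid.id;
    "#;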
@@ -1035,12 +1035,12 @@ impl ShardRepo for PostgresTxn {
     ) -> Result<Shard> {
         sqlx::query_as::<_, Shard>(
             r#"
-INSERT INTO sequencer
-    ( kafka_topic_id, kafka_partition, min_unpersisted_sequence_number )
+INSERT INTO shard
+    ( topic_id, shard_index, min_unpersisted_sequence_number )
 VALUES
     ( $1, $2, 0 )
-ON CONFLICT ON CONSTRAINT sequencer_unique
-DO UPDATE SET kafka_topic_id = sequencer.kafka_topic_id
+ON CONFLICT ON CONSTRAINT shard_unique
+DO UPDATE SET topic_id = shard.topic_id
 RETURNING *;;
 "#,
         )
@@ -1065,9 +1065,9 @@ RETURNING *;;
         let rec = sqlx::query_as::<_, Shard>(
             r#"
 SELECT *
-FROM sequencer
-WHERE kafka_topic_id = $1
-  AND kafka_partition = $2;
+FROM shard
+WHERE topic_id = $1
+  AND shard_index = $2;
 "#,
         )
         .bind(topic_id) // $1
@@ -1085,14 +1085,14 @@ WHERE kafka_topic_id = $1
     }
 
     async fn list(&mut self) -> Result<Vec<Shard>> {
-        sqlx::query_as::<_, Shard>(r#"SELECT * FROM sequencer;"#)
+        sqlx::query_as::<_, Shard>(r#"SELECT * FROM shard;"#)
             .fetch_all(&mut self.inner)
             .await
             .map_err(|e| Error::SqlxError { source: e })
     }
 
     async fn list_by_topic(&mut self, topic: &TopicMetadata) -> Result<Vec<Shard>> {
-        sqlx::query_as::<_, Shard>(r#"SELECT * FROM sequencer WHERE kafka_topic_id = $1;"#)
+        sqlx::query_as::<_, Shard>(r#"SELECT * FROM shard WHERE topic_id = $1;"#)
             .bind(&topic.id) // $1
             .fetch_all(&mut self.inner)
             .await
@@ -1105,7 +1105,11 @@ WHERE kafka_topic_id = $1
         sequence_number: SequenceNumber,
     ) -> Result<()> {
         let _ = sqlx::query(
-            r#"UPDATE sequencer SET min_unpersisted_sequence_number = $1 WHERE id = $2;"#,
+            r#"
+UPDATE shard
+SET min_unpersisted_sequence_number = $1
+WHERE id = $2;
+"#,
         )
         .bind(&sequence_number.get()) // $1
         .bind(&shard_id) // $2
@@ -1125,13 +1129,14 @@ impl PartitionRepo for PostgresTxn {
         shard_id: ShardId,
         table_id: TableId,
     ) -> Result<Partition> {
-        // Note: since sort_key is now an array, we must explicitly insert '{}' which is an empty array
-        // rather than NULL which sqlx will throw `UnexpectedNullError` while is is doing `ColumnDecode`
+        // Note: since sort_key is now an array, we must explicitly insert '{}' which is an empty
+        // array rather than NULL, for which sqlx will throw `UnexpectedNullError` while it is
+        // doing `ColumnDecode`
 
         let v = sqlx::query_as::<_, Partition>(
             r#"
 INSERT INTO partition
-    ( partition_key, sequencer_id, table_id, sort_key)
+    ( partition_key, shard_id, table_id, sort_key)
 VALUES
     ( $1, $2, $3, '{}')
 ON CONFLICT ON CONSTRAINT partition_key_unique
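The comment in this hunk is the real constraint on the insert: `Partition::sort_key` decodes as a non-optional array type, and sqlx's `ColumnDecode` raises `UnexpectedNullError` when asked to turn SQL NULL into a non-`Option` value, so the row must be created with the empty array literal `'{}'`. A reduced sketch of the failure mode (hypothetical one-column table):

    use sqlx::{FromRow, PgPool};

    #[derive(FromRow)]
    struct PartitionRow {
        // Decoding NULL into Vec<String> fails with UnexpectedNullError;
        // a stored '{}' decodes cleanly as an empty Vec. Declaring the field
        // as Option<Vec<String>> would be the alternative way to accept NULL.
        sort_key: Vec<String>,
    }

    async fn demo(pool: &PgPool) -> Result<(), sqlx::Error> {
        sqlx::query("INSERT INTO partition_demo (sort_key) VALUES ('{}');")
            .execute(pool)
            .await?;
        let row: PartitionRow =
            sqlx::query_as("SELECT sort_key FROM partition_demo LIMIT 1;")
                .fetch_one(pool)
                .await?;
        assert!(row.sort_key.is_empty());
        Ok(())
    }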
@@ -1180,7 +1185,7 @@ RETURNING *;
     }
 
     async fn list_by_shard(&mut self, shard_id: ShardId) -> Result<Vec<Partition>> {
-        sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE sequencer_id = $1;"#)
+        sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE shard_id = $1;"#)
             .bind(&shard_id) // $1
             .fetch_all(&mut self.inner)
             .await
@@ -1238,7 +1243,7 @@ WHERE partition.id = $1;
         let table_name = info.get("table_name");
         let partition = Partition {
             id: info.get("id"),
-            shard_id: info.get("sequencer_id"),
+            shard_id: info.get("shard_id"),
             table_id: info.get("table_id"),
             partition_key: info.get("partition_key"),
             sort_key: info.get("sort_key"),
@@ -1274,7 +1279,12 @@ RETURNING *;
             _ => Error::SqlxError { source: e },
         })?;
 
-        debug!(?partition_id, input_sort_key=?sort_key, partition_after_catalog_update=?partition, "Paritition after updating sort key");
+        debug!(
+            ?partition_id,
+            input_sort_key=?sort_key,
+            partition_after_catalog_update=?partition,
+            "Partition after updating sort key"
+        );
 
         Ok(partition)
     }
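For readers new to the `tracing` macros: `?partition_id` records the variable under its own name using its `Debug` representation, and `name = ?value` does the same under an explicit field name, so the reflowed multi-line call is behaviorally identical to the original single-line one (plus the `Paritition` typo fix). A small example of the field syntax:

    use tracing::debug;

    fn log_sort_key_update(partition_id: i64, sort_key: &[String]) {
        // `?x` is sugar for `x = ?x`: record via the Debug impl under the name x.
        debug!(?partition_id, input_sort_key = ?sort_key, "sort key updated");
    }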
@@ -1294,7 +1304,7 @@ impl TombstoneRepo for PostgresTxn {
         let v = sqlx::query_as::<_, Tombstone>(
             r#"
 INSERT INTO tombstone
-    ( table_id, sequencer_id, sequence_number, min_time, max_time, serialized_predicate )
+    ( table_id, shard_id, sequence_number, min_time, max_time, serialized_predicate )
 VALUES
     ( $1, $2, $3, $4, $5, $6 )
 ON CONFLICT ON CONSTRAINT tombstone_unique
@@ -1346,7 +1356,7 @@ RETURNING *;
 SELECT
     tombstone.id as id,
     tombstone.table_id as table_id,
-    tombstone.sequencer_id as sequencer_id,
+    tombstone.shard_id as shard_id,
     tombstone.sequence_number as sequence_number,
     tombstone.min_time as min_time,
     tombstone.max_time as max_time,
@@ -1407,7 +1417,7 @@ WHERE id = $1;
             r#"
 SELECT *
 FROM tombstone
-WHERE sequencer_id = $1
+WHERE shard_id = $1
   AND sequence_number > $2
 ORDER BY id;
 "#,
@@ -1463,7 +1473,7 @@ WHERE id = ANY($1);
             r#"
 SELECT *
 FROM tombstone
-WHERE sequencer_id = $1
+WHERE shard_id = $1
   AND table_id = $2
   AND sequence_number > $3
   AND ((min_time <= $4 AND max_time >= $4)
@@ -1504,7 +1514,7 @@ impl ParquetFileRepo for PostgresTxn {
         let rec = sqlx::query_as::<_, ParquetFile>(
             r#"
 INSERT INTO parquet_file (
-    sequencer_id, table_id, partition_id, object_store_id,
+    shard_id, table_id, partition_id, object_store_id,
     max_sequence_number, min_time, max_time, file_size_bytes,
     row_count, compaction_level, created_at, namespace_id, column_set )
 VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 )
@@ -1561,11 +1571,11 @@ RETURNING *;
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
-WHERE sequencer_id = $1
+WHERE shard_id = $1
   AND max_sequence_number > $2
 ORDER BY id;
 "#,
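The surviving comment fragment (``parquet_metadata` column!!`) points at why this family of queries enumerates columns rather than using `SELECT *`: the `ParquetFile` row type has no metadata field, and presumably the explicit list keeps the potentially large `parquet_metadata` blob out of every result row. A sketch of the shape (hypothetical, reduced column list):

    use sqlx::{FromRow, PgPool};

    // The row type omits parquet_metadata, and the query lists its columns
    // explicitly so the blob is never read or transferred.
    #[derive(FromRow)]
    struct ParquetFileLite {
        id: i64,
        shard_id: i64,
        file_size_bytes: i64,
    }

    async fn files_for_shard(
        pool: &PgPool,
        shard_id: i64,
    ) -> Result<Vec<ParquetFileLite>, sqlx::Error> {
        sqlx::query_as(
            "SELECT id, shard_id, file_size_bytes FROM parquet_file WHERE shard_id = $1;",
        )
        .bind(shard_id) // $1
        .fetch_all(pool)
        .await
    }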
@@ -1585,7 +1595,7 @@ ORDER BY id;
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT parquet_file.id, parquet_file.sequencer_id, parquet_file.namespace_id,
+SELECT parquet_file.id, parquet_file.shard_id, parquet_file.namespace_id,
        parquet_file.table_id, parquet_file.partition_id, parquet_file.object_store_id,
        parquet_file.max_sequence_number, parquet_file.min_time,
        parquet_file.max_time, parquet_file.to_delete, parquet_file.file_size_bytes,
@@ -1607,7 +1617,7 @@ WHERE table_name.namespace_id = $1
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
@@ -1642,11 +1652,11 @@ RETURNING *;
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
-WHERE parquet_file.sequencer_id = $1
+WHERE parquet_file.shard_id = $1
   AND parquet_file.compaction_level = 0
   AND parquet_file.to_delete IS NULL
 LIMIT 1000;
@@ -1668,11 +1678,11 @@ WHERE parquet_file.sequencer_id = $1
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
-WHERE parquet_file.sequencer_id = $1
+WHERE parquet_file.shard_id = $1
   AND parquet_file.table_id = $2
   AND parquet_file.partition_id = $3
   AND parquet_file.compaction_level = $4
@@ -1709,10 +1719,10 @@ WHERE parquet_file.sequencer_id = $1
         // and index on (shard_id, compaction_level, to_delete, created_at)
         sqlx::query_as::<_, PartitionParam>(
             r#"
-SELECT partition_id, sequencer_id, namespace_id, table_id, count(id)
+SELECT partition_id, shard_id, namespace_id, table_id, count(id)
 FROM parquet_file
 WHERE compaction_level = 0 and to_delete is null
-    and sequencer_id = $1
+    and shard_id = $1
     and to_timestamp(created_at/1000000000) > now() - ($2 || 'hour')::interval
 group by 1, 2, 3, 4
 having count(id) >= $3
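Two pieces of SQL here deserve a note, since they recur in the cold-partition query below: `to_timestamp(created_at/1000000000)` converts the catalog's nanosecond-epoch integers to `timestamptz`, and `($2 || 'hour')::interval` concatenates the bound value with the literal `'hour'` before casting, which means `$2` must be bound as text. A hedged sketch of the binding side (hypothetical helper):

    use sqlx::PgPool;

    // Count one shard's parquet files created within the last `hours` hours,
    // using the same epoch-conversion and interval-building tricks as above.
    async fn recent_file_count(
        pool: &PgPool,
        shard_id: i64,
        hours: u32,
    ) -> Result<i64, sqlx::Error> {
        let (count,): (i64,) = sqlx::query_as(
            r#"
    SELECT count(id)
    FROM parquet_file
    WHERE shard_id = $1
      AND to_timestamp(created_at/1000000000) > now() - ($2 || 'hour')::interval;
    "#,
        )
        .bind(shard_id) // $1
        .bind(hours.to_string()) // $2: text, so the || concatenation works
        .fetch_one(pool)
        .await?;
        Ok(count)
    }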
@@ -1742,11 +1752,11 @@ limit $4;
         // We have index on (shard_id, compaction_level, to_delete)
         sqlx::query_as::<_, PartitionParam>(
             r#"
-SELECT partition_id, sequencer_id, namespace_id, table_id, count(id), max(created_at)
+SELECT partition_id, shard_id, namespace_id, table_id, count(id), max(created_at)
 FROM parquet_file
 WHERE compaction_level = 0
   AND to_delete IS NULL
-  AND sequencer_id = $1
+  AND shard_id = $1
 GROUP BY 1, 2, 3, 4
 HAVING to_timestamp(max(created_at)/1000000000) < now() - ($2 || 'hour')::interval
 ORDER BY 5 DESC
@@ -1769,7 +1779,7 @@ LIMIT $3;
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
@@ -1843,7 +1853,7 @@ RETURNING id;
 SELECT count(1) as count
 FROM parquet_file
 WHERE table_id = $1
-  AND sequencer_id = $2
+  AND shard_id = $2
   AND max_sequence_number < $3
   AND parquet_file.to_delete IS NULL
   AND compaction_level = 0
@@ -1875,7 +1885,7 @@ WHERE table_id = $1
 SELECT count(1) as count
 FROM parquet_file
 WHERE table_id = $1
-  AND sequencer_id = $2
+  AND shard_id = $2
   AND parquet_file.to_delete IS NULL
   AND compaction_level = 1
   AND ((parquet_file.min_time <= $3 AND parquet_file.max_time >= $3)
@@ -1901,7 +1911,7 @@ WHERE table_id = $1
         // `parquet_metadata` column!!
         let rec = sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file