diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index c3c5eac6b7..42d9954b41 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -418,7 +418,6 @@ pub struct Namespace { #[sqlx(default)] pub retention_duration: Option, /// The topic that writes to this namespace will land in - #[sqlx(rename = "kafka_topic_id")] pub topic_id: TopicId, /// The query pool assigned to answer queries for this namespace pub query_pool_id: QueryPoolId, @@ -730,11 +729,9 @@ pub struct Shard { /// the id of the shard, assigned by the catalog pub id: ShardId, /// the topic the shard is reading from - #[sqlx(rename = "kafka_topic_id")] pub topic_id: TopicId, /// the shard index of the shard the sequence numbers are coming from, sharded by the router /// and write buffer - #[sqlx(rename = "kafka_partition")] pub shard_index: ShardIndex, /// The minimum unpersisted sequence number. Because different tables /// can be persisted at different times, it is possible some data has been persisted @@ -807,7 +804,6 @@ pub struct Partition { /// the id of the partition pub id: PartitionId, /// the shard the data in the partition arrived from - #[sqlx(rename = "sequencer_id")] pub shard_id: ShardId, /// the table the partition is under pub table_id: TableId, @@ -867,7 +863,6 @@ pub struct PartitionParam { /// the partition pub partition_id: PartitionId, /// the partition's shard - #[sqlx(rename = "sequencer_id")] pub shard_id: ShardId, /// the partition's namespace pub namespace_id: NamespaceId, @@ -883,7 +878,6 @@ pub struct Tombstone { /// the table the tombstone is associated with pub table_id: TableId, /// the shard the tombstone was sent through - #[sqlx(rename = "sequencer_id")] pub shard_id: ShardId, /// the sequence number assigned to the tombstone from the `router::Shard` pub sequence_number: SequenceNumber, @@ -975,7 +969,6 @@ pub struct ParquetFile { /// the id of the file in the catalog pub id: ParquetFileId, /// the shard that sequenced writes that went into this file 
- #[sqlx(rename = "sequencer_id")] pub shard_id: ShardId, /// the namespace pub namespace_id: NamespaceId, diff --git a/iox_catalog/migrations/20220819154515_sequencer_to_shard.sql b/iox_catalog/migrations/20220819154515_sequencer_to_shard.sql new file mode 100644 index 0000000000..600549ea43 --- /dev/null +++ b/iox_catalog/migrations/20220819154515_sequencer_to_shard.sql @@ -0,0 +1,17 @@ +-- Rename sequencer to shard +ALTER TABLE IF EXISTS sequencer RENAME TO shard; +ALTER TABLE IF EXISTS shard RENAME CONSTRAINT sequencer_unique TO shard_unique; +ALTER TABLE IF EXISTS partition RENAME sequencer_id TO shard_id; +ALTER TABLE IF EXISTS tombstone RENAME sequencer_id TO shard_id; +ALTER TABLE IF EXISTS parquet_file RENAME sequencer_id TO shard_id; +ALTER INDEX IF EXISTS parquet_file_sequencer_compaction_delete_idx + RENAME TO parquet_file_shard_compaction_delete_idx; + +-- Rename kafka_partition to shard_index +ALTER TABLE IF EXISTS shard RENAME kafka_partition TO shard_index; + +-- Rename kafka_topic to topic +ALTER TABLE IF EXISTS kafka_topic RENAME TO topic; +ALTER TABLE IF EXISTS topic RENAME CONSTRAINT kafka_topic_name_unique TO topic_name_unique; +ALTER TABLE IF EXISTS namespace RENAME kafka_topic_id TO topic_id; +ALTER TABLE IF EXISTS shard RENAME kafka_topic_id TO topic_id; diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index c97be53778..f4a6275e83 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -329,7 +329,6 @@ pub trait TableRepo: Send + Sync { #[derive(Debug, Copy, Clone, Eq, PartialEq, sqlx::FromRow)] pub struct TablePersistInfo { /// shard the sequence numbers are associated with - #[sqlx(rename = "sequencer_id")] pub shard_id: ShardId, /// the global identifier for the table pub table_id: TableId, diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 9a1949d265..008221bfc7 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -507,10 +507,10 @@ impl 
TopicMetadataRepo for PostgresTxn { async fn create_or_get(&mut self, name: &str) -> Result { let rec = sqlx::query_as::<_, TopicMetadata>( r#" -INSERT INTO kafka_topic ( name ) +INSERT INTO topic ( name ) VALUES ( $1 ) -ON CONFLICT ON CONSTRAINT kafka_topic_name_unique -DO UPDATE SET name = kafka_topic.name +ON CONFLICT ON CONSTRAINT topic_name_unique +DO UPDATE SET name = topic.name RETURNING *; "#, ) @@ -526,7 +526,7 @@ RETURNING *; let rec = sqlx::query_as::<_, TopicMetadata>( r#" SELECT * -FROM kafka_topic +FROM topic WHERE name = $1; "#, ) @@ -576,7 +576,7 @@ impl NamespaceRepo for PostgresTxn { ) -> Result { let rec = sqlx::query_as::<_, Namespace>( r#" -INSERT INTO namespace ( name, retention_duration, kafka_topic_id, query_pool_id ) +INSERT INTO namespace ( name, retention_duration, topic_id, query_pool_id ) VALUES ( $1, $2, $3, $4 ) RETURNING *; "#, @@ -835,13 +835,13 @@ WHERE namespace_id = $1; let rec = sqlx::query_as::<_, TablePersistInfo>( r#" WITH tid as (SELECT id FROM table_name WHERE name = $2 AND namespace_id = $3) -SELECT $1 as sequencer_id, id as table_id, +SELECT $1 as shard_id, id as table_id, tombstone.sequence_number as tombstone_max_sequence_number FROM tid LEFT JOIN ( SELECT tombstone.table_id, sequence_number FROM tombstone - WHERE sequencer_id = $1 AND tombstone.table_id = (SELECT id FROM tid) + WHERE shard_id = $1 AND tombstone.table_id = (SELECT id FROM tid) ORDER BY sequence_number DESC LIMIT 1 ) tombstone ON tombstone.table_id = tid.id @@ -1035,12 +1035,12 @@ impl ShardRepo for PostgresTxn { ) -> Result { sqlx::query_as::<_, Shard>( r#" -INSERT INTO sequencer - ( kafka_topic_id, kafka_partition, min_unpersisted_sequence_number ) +INSERT INTO shard + ( topic_id, shard_index, min_unpersisted_sequence_number ) VALUES ( $1, $2, 0 ) -ON CONFLICT ON CONSTRAINT sequencer_unique -DO UPDATE SET kafka_topic_id = sequencer.kafka_topic_id +ON CONFLICT ON CONSTRAINT shard_unique +DO UPDATE SET topic_id = shard.topic_id RETURNING *;; "#, ) @@ 
-1065,9 +1065,9 @@ RETURNING *;; let rec = sqlx::query_as::<_, Shard>( r#" SELECT * -FROM sequencer -WHERE kafka_topic_id = $1 - AND kafka_partition = $2; +FROM shard +WHERE topic_id = $1 + AND shard_index = $2; "#, ) .bind(topic_id) // $1 @@ -1085,14 +1085,14 @@ WHERE kafka_topic_id = $1 } async fn list(&mut self) -> Result> { - sqlx::query_as::<_, Shard>(r#"SELECT * FROM sequencer;"#) + sqlx::query_as::<_, Shard>(r#"SELECT * FROM shard;"#) .fetch_all(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e }) } async fn list_by_topic(&mut self, topic: &TopicMetadata) -> Result> { - sqlx::query_as::<_, Shard>(r#"SELECT * FROM sequencer WHERE kafka_topic_id = $1;"#) + sqlx::query_as::<_, Shard>(r#"SELECT * FROM shard WHERE topic_id = $1;"#) .bind(&topic.id) // $1 .fetch_all(&mut self.inner) .await @@ -1105,7 +1105,11 @@ WHERE kafka_topic_id = $1 sequence_number: SequenceNumber, ) -> Result<()> { let _ = sqlx::query( - r#"UPDATE sequencer SET min_unpersisted_sequence_number = $1 WHERE id = $2;"#, + r#" +UPDATE shard +SET min_unpersisted_sequence_number = $1 +WHERE id = $2; + "#, ) .bind(&sequence_number.get()) // $1 .bind(&shard_id) // $2 @@ -1125,13 +1129,14 @@ impl PartitionRepo for PostgresTxn { shard_id: ShardId, table_id: TableId, ) -> Result { - // Note: since sort_key is now an array, we must explicitly insert '{}' which is an empty array - // rather than NULL which sqlx will throw `UnexpectedNullError` while is is doing `ColumnDecode` + // Note: since sort_key is now an array, we must explicitly insert '{}' which is an empty + // array rather than NULL which sqlx will throw `UnexpectedNullError` while it is doing + // `ColumnDecode` let v = sqlx::query_as::<_, Partition>( r#" INSERT INTO partition - ( partition_key, sequencer_id, table_id, sort_key) + ( partition_key, shard_id, table_id, sort_key) VALUES ( $1, $2, $3, '{}') ON CONFLICT ON CONSTRAINT partition_key_unique @@ -1180,7 +1185,7 @@ RETURNING *; } async fn list_by_shard(&mut self, shard_id: 
ShardId) -> Result> { - sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE sequencer_id = $1;"#) + sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE shard_id = $1;"#) .bind(&shard_id) // $1 .fetch_all(&mut self.inner) .await @@ -1238,7 +1243,7 @@ WHERE partition.id = $1; let table_name = info.get("table_name"); let partition = Partition { id: info.get("id"), - shard_id: info.get("sequencer_id"), + shard_id: info.get("shard_id"), table_id: info.get("table_id"), partition_key: info.get("partition_key"), sort_key: info.get("sort_key"), @@ -1274,7 +1279,12 @@ RETURNING *; _ => Error::SqlxError { source: e }, })?; - debug!(?partition_id, input_sort_key=?sort_key, partition_after_catalog_update=?partition, "Paritition after updating sort key"); + debug!( + ?partition_id, + input_sort_key=?sort_key, + partition_after_catalog_update=?partition, + "Partition after updating sort key" + ); Ok(partition) } @@ -1294,7 +1304,7 @@ impl TombstoneRepo for PostgresTxn { let v = sqlx::query_as::<_, Tombstone>( r#" INSERT INTO tombstone - ( table_id, sequencer_id, sequence_number, min_time, max_time, serialized_predicate ) + ( table_id, shard_id, sequence_number, min_time, max_time, serialized_predicate ) VALUES ( $1, $2, $3, $4, $5, $6 ) ON CONFLICT ON CONSTRAINT tombstone_unique @@ -1346,7 +1356,7 @@ RETURNING *; SELECT tombstone.id as id, tombstone.table_id as table_id, - tombstone.sequencer_id as sequencer_id, + tombstone.shard_id as shard_id, tombstone.sequence_number as sequence_number, tombstone.min_time as min_time, tombstone.max_time as max_time, @@ -1407,7 +1417,7 @@ WHERE id = $1; r#" SELECT * FROM tombstone -WHERE sequencer_id = $1 +WHERE shard_id = $1 AND sequence_number > $2 ORDER BY id; "#, @@ -1463,7 +1473,7 @@ WHERE id = ANY($1); r#" SELECT * FROM tombstone -WHERE sequencer_id = $1 +WHERE shard_id = $1 AND table_id = $2 AND sequence_number > $3 AND ((min_time <= $4 AND max_time >= $4) @@ -1504,7 +1514,7 @@ impl ParquetFileRepo for 
PostgresTxn { let rec = sqlx::query_as::<_, ParquetFile>( r#" INSERT INTO parquet_file ( - sequencer_id, table_id, partition_id, object_store_id, + shard_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, file_size_bytes, row_count, compaction_level, created_at, namespace_id, column_set ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 ) @@ -1561,11 +1571,11 @@ RETURNING *; // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, +SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set FROM parquet_file -WHERE sequencer_id = $1 +WHERE shard_id = $1 AND max_sequence_number > $2 ORDER BY id; "#, @@ -1585,7 +1595,7 @@ ORDER BY id; // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" -SELECT parquet_file.id, parquet_file.sequencer_id, parquet_file.namespace_id, +SELECT parquet_file.id, parquet_file.shard_id, parquet_file.namespace_id, parquet_file.table_id, parquet_file.partition_id, parquet_file.object_store_id, parquet_file.max_sequence_number, parquet_file.min_time, parquet_file.max_time, parquet_file.to_delete, parquet_file.file_size_bytes, @@ -1607,7 +1617,7 @@ WHERE table_name.namespace_id = $1 // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, +SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set FROM parquet_file @@ -1642,11 +1652,11 @@ RETURNING *; // `parquet_metadata` column!! 
sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, +SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set FROM parquet_file -WHERE parquet_file.sequencer_id = $1 +WHERE parquet_file.shard_id = $1 AND parquet_file.compaction_level = 0 AND parquet_file.to_delete IS NULL LIMIT 1000; @@ -1668,11 +1678,11 @@ WHERE parquet_file.sequencer_id = $1 // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, +SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set FROM parquet_file -WHERE parquet_file.sequencer_id = $1 +WHERE parquet_file.shard_id = $1 AND parquet_file.table_id = $2 AND parquet_file.partition_id = $3 AND parquet_file.compaction_level = $4 @@ -1709,10 +1719,10 @@ WHERE parquet_file.sequencer_id = $1 // and index on (shard_id, comapction_level, to_delete, created_at) sqlx::query_as::<_, PartitionParam>( r#" -SELECT partition_id, sequencer_id, namespace_id, table_id, count(id) +SELECT partition_id, shard_id, namespace_id, table_id, count(id) FROM parquet_file WHERE compaction_level = 0 and to_delete is null - and sequencer_id = $1 + and shard_id = $1 and to_timestamp(created_at/1000000000) > now() - ($2 || 'hour')::interval group by 1, 2, 3, 4 having count(id) >= $3 @@ -1742,11 +1752,11 @@ limit $4; // We have index on (shard_id, comapction_level, to_delete) sqlx::query_as::<_, PartitionParam>( r#" -SELECT partition_id, sequencer_id, namespace_id, table_id, count(id), max(created_at) +SELECT partition_id, shard_id, namespace_id, table_id, count(id), max(created_at) FROM parquet_file WHERE compaction_level = 0 AND to_delete IS 
NULL -AND sequencer_id = $1 +AND shard_id = $1 GROUP BY 1, 2, 3, 4 HAVING to_timestamp(max(created_at)/1000000000) < now() - ($2 || 'hour')::interval ORDER BY 5 DESC @@ -1769,7 +1779,7 @@ LIMIT $3; // `parquet_metadata` column!! sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, +SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set FROM parquet_file @@ -1843,7 +1853,7 @@ RETURNING id; SELECT count(1) as count FROM parquet_file WHERE table_id = $1 - AND sequencer_id = $2 + AND shard_id = $2 AND max_sequence_number < $3 AND parquet_file.to_delete IS NULL AND compaction_level = 0 @@ -1875,7 +1885,7 @@ WHERE table_id = $1 SELECT count(1) as count FROM parquet_file WHERE table_id = $1 - AND sequencer_id = $2 + AND shard_id = $2 AND parquet_file.to_delete IS NULL AND compaction_level = 1 AND ((parquet_file.min_time <= $3 AND parquet_file.max_time >= $3) @@ -1901,7 +1911,7 @@ WHERE table_id = $1 // `parquet_metadata` column!! let rec = sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id, +SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set FROM parquet_file