@@ -507,10 +507,10 @@ impl TopicMetadataRepo for PostgresTxn {
     async fn create_or_get(&mut self, name: &str) -> Result<TopicMetadata> {
         let rec = sqlx::query_as::<_, TopicMetadata>(
             r#"
-INSERT INTO kafka_topic ( name )
+INSERT INTO topic ( name )
 VALUES ( $1 )
-ON CONFLICT ON CONSTRAINT kafka_topic_name_unique
-DO UPDATE SET name = kafka_topic.name
+ON CONFLICT ON CONSTRAINT topic_name_unique
+DO UPDATE SET name = topic.name
 RETURNING *;
 "#,
         )
@@ -526,7 +526,7 @@ RETURNING *;
         let rec = sqlx::query_as::<_, TopicMetadata>(
             r#"
 SELECT *
-FROM kafka_topic
+FROM topic
 WHERE name = $1;
 "#,
         )
@@ -576,7 +576,7 @@ impl NamespaceRepo for PostgresTxn {
     ) -> Result<Namespace> {
         let rec = sqlx::query_as::<_, Namespace>(
             r#"
-INSERT INTO namespace ( name, retention_duration, kafka_topic_id, query_pool_id )
+INSERT INTO namespace ( name, retention_duration, topic_id, query_pool_id )
 VALUES ( $1, $2, $3, $4 )
 RETURNING *;
 "#,
@@ -835,13 +835,13 @@ WHERE namespace_id = $1;
         let rec = sqlx::query_as::<_, TablePersistInfo>(
             r#"
 WITH tid as (SELECT id FROM table_name WHERE name = $2 AND namespace_id = $3)
-SELECT $1 as sequencer_id, id as table_id,
+SELECT $1 as shard_id, id as table_id,
        tombstone.sequence_number as tombstone_max_sequence_number
 FROM tid
 LEFT JOIN (
   SELECT tombstone.table_id, sequence_number
   FROM tombstone
-  WHERE sequencer_id = $1 AND tombstone.table_id = (SELECT id FROM tid)
+  WHERE shard_id = $1 AND tombstone.table_id = (SELECT id FROM tid)
   ORDER BY sequence_number DESC
   LIMIT 1
 ) tombstone ON tombstone.table_id = tid.id
@@ -1035,12 +1035,12 @@ impl ShardRepo for PostgresTxn {
     ) -> Result<Shard> {
         sqlx::query_as::<_, Shard>(
             r#"
-INSERT INTO sequencer
-    ( kafka_topic_id, kafka_partition, min_unpersisted_sequence_number )
+INSERT INTO shard
+    ( topic_id, shard_index, min_unpersisted_sequence_number )
 VALUES
     ( $1, $2, 0 )
-ON CONFLICT ON CONSTRAINT sequencer_unique
-DO UPDATE SET kafka_topic_id = sequencer.kafka_topic_id
+ON CONFLICT ON CONSTRAINT shard_unique
+DO UPDATE SET topic_id = shard.topic_id
 RETURNING *;;
 "#,
         )
@@ -1065,9 +1065,9 @@ RETURNING *;;
         let rec = sqlx::query_as::<_, Shard>(
             r#"
 SELECT *
-FROM sequencer
-WHERE kafka_topic_id = $1
-  AND kafka_partition = $2;
+FROM shard
+WHERE topic_id = $1
+  AND shard_index = $2;
 "#,
         )
         .bind(topic_id) // $1
@@ -1085,14 +1085,14 @@ WHERE kafka_topic_id = $1
     }

     async fn list(&mut self) -> Result<Vec<Shard>> {
-        sqlx::query_as::<_, Shard>(r#"SELECT * FROM sequencer;"#)
+        sqlx::query_as::<_, Shard>(r#"SELECT * FROM shard;"#)
             .fetch_all(&mut self.inner)
             .await
             .map_err(|e| Error::SqlxError { source: e })
     }

     async fn list_by_topic(&mut self, topic: &TopicMetadata) -> Result<Vec<Shard>> {
-        sqlx::query_as::<_, Shard>(r#"SELECT * FROM sequencer WHERE kafka_topic_id = $1;"#)
+        sqlx::query_as::<_, Shard>(r#"SELECT * FROM shard WHERE topic_id = $1;"#)
             .bind(&topic.id) // $1
             .fetch_all(&mut self.inner)
             .await
@@ -1105,7 +1105,11 @@ WHERE kafka_topic_id = $1
         sequence_number: SequenceNumber,
     ) -> Result<()> {
         let _ = sqlx::query(
-            r#"UPDATE sequencer SET min_unpersisted_sequence_number = $1 WHERE id = $2;"#,
+            r#"
+UPDATE shard
+SET min_unpersisted_sequence_number = $1
+WHERE id = $2;
+"#,
         )
         .bind(&sequence_number.get()) // $1
         .bind(&shard_id) // $2
@@ -1125,13 +1129,14 @@ impl PartitionRepo for PostgresTxn {
         shard_id: ShardId,
         table_id: TableId,
     ) -> Result<Partition> {
-        // Note: since sort_key is now an array, we must explicitly insert '{}' which is an empty array
-        // rather than NULL which sqlx will throw `UnexpectedNullError` while is is doing `ColumnDecode`
+        // Note: since sort_key is now an array, we must explicitly insert '{}' which is an empty
+        // array rather than NULL, for which sqlx will throw `UnexpectedNullError` while it is
+        // doing `ColumnDecode`

         let v = sqlx::query_as::<_, Partition>(
             r#"
 INSERT INTO partition
-    ( partition_key, sequencer_id, table_id, sort_key)
+    ( partition_key, shard_id, table_id, sort_key)
 VALUES
     ( $1, $2, $3, '{}')
 ON CONFLICT ON CONSTRAINT partition_key_unique
@@ -1180,7 +1185,7 @@ RETURNING *;
     }

     async fn list_by_shard(&mut self, shard_id: ShardId) -> Result<Vec<Partition>> {
-        sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE sequencer_id = $1;"#)
+        sqlx::query_as::<_, Partition>(r#"SELECT * FROM partition WHERE shard_id = $1;"#)
             .bind(&shard_id) // $1
             .fetch_all(&mut self.inner)
             .await
@@ -1238,7 +1243,7 @@ WHERE partition.id = $1;
         let table_name = info.get("table_name");
         let partition = Partition {
             id: info.get("id"),
-            shard_id: info.get("sequencer_id"),
+            shard_id: info.get("shard_id"),
             table_id: info.get("table_id"),
             partition_key: info.get("partition_key"),
             sort_key: info.get("sort_key"),
@@ -1274,7 +1279,12 @@ RETURNING *;
                 _ => Error::SqlxError { source: e },
             })?;

-        debug!(?partition_id, input_sort_key=?sort_key, partition_after_catalog_update=?partition, "Paritition after updating sort key");
+        debug!(
+            ?partition_id,
+            input_sort_key=?sort_key,
+            partition_after_catalog_update=?partition,
+            "Partition after updating sort key"
+        );

         Ok(partition)
     }
@@ -1294,7 +1304,7 @@ impl TombstoneRepo for PostgresTxn {
         let v = sqlx::query_as::<_, Tombstone>(
             r#"
 INSERT INTO tombstone
-    ( table_id, sequencer_id, sequence_number, min_time, max_time, serialized_predicate )
+    ( table_id, shard_id, sequence_number, min_time, max_time, serialized_predicate )
 VALUES
     ( $1, $2, $3, $4, $5, $6 )
 ON CONFLICT ON CONSTRAINT tombstone_unique
@@ -1346,7 +1356,7 @@ RETURNING *;
 SELECT
     tombstone.id as id,
     tombstone.table_id as table_id,
-    tombstone.sequencer_id as sequencer_id,
+    tombstone.shard_id as shard_id,
     tombstone.sequence_number as sequence_number,
     tombstone.min_time as min_time,
     tombstone.max_time as max_time,
@@ -1407,7 +1417,7 @@ WHERE id = $1;
             r#"
 SELECT *
 FROM tombstone
-WHERE sequencer_id = $1
+WHERE shard_id = $1
   AND sequence_number > $2
 ORDER BY id;
 "#,
@@ -1463,7 +1473,7 @@ WHERE id = ANY($1);
             r#"
 SELECT *
 FROM tombstone
-WHERE sequencer_id = $1
+WHERE shard_id = $1
   AND table_id = $2
   AND sequence_number > $3
   AND ((min_time <= $4 AND max_time >= $4)
@@ -1504,7 +1514,7 @@ impl ParquetFileRepo for PostgresTxn {
         let rec = sqlx::query_as::<_, ParquetFile>(
             r#"
 INSERT INTO parquet_file (
-    sequencer_id, table_id, partition_id, object_store_id,
+    shard_id, table_id, partition_id, object_store_id,
     max_sequence_number, min_time, max_time, file_size_bytes,
     row_count, compaction_level, created_at, namespace_id, column_set )
 VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 )
@@ -1561,11 +1571,11 @@ RETURNING *;
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
-WHERE sequencer_id = $1
+WHERE shard_id = $1
   AND max_sequence_number > $2
 ORDER BY id;
 "#,
@@ -1585,7 +1595,7 @@ ORDER BY id;
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT parquet_file.id, parquet_file.sequencer_id, parquet_file.namespace_id,
+SELECT parquet_file.id, parquet_file.shard_id, parquet_file.namespace_id,
        parquet_file.table_id, parquet_file.partition_id, parquet_file.object_store_id,
        parquet_file.max_sequence_number, parquet_file.min_time,
        parquet_file.max_time, parquet_file.to_delete, parquet_file.file_size_bytes,
@@ -1607,7 +1617,7 @@ WHERE table_name.namespace_id = $1
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
@@ -1642,11 +1652,11 @@ RETURNING *;
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
-WHERE parquet_file.sequencer_id = $1
+WHERE parquet_file.shard_id = $1
   AND parquet_file.compaction_level = 0
   AND parquet_file.to_delete IS NULL
 LIMIT 1000;
@@ -1668,11 +1678,11 @@ WHERE parquet_file.sequencer_id = $1
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
-WHERE parquet_file.sequencer_id = $1
+WHERE parquet_file.shard_id = $1
   AND parquet_file.table_id = $2
   AND parquet_file.partition_id = $3
   AND parquet_file.compaction_level = $4
@@ -1709,10 +1719,10 @@ WHERE parquet_file.sequencer_id = $1
         // and index on (shard_id, compaction_level, to_delete, created_at)
         sqlx::query_as::<_, PartitionParam>(
             r#"
-SELECT partition_id, sequencer_id, namespace_id, table_id, count(id)
+SELECT partition_id, shard_id, namespace_id, table_id, count(id)
 FROM parquet_file
 WHERE compaction_level = 0 and to_delete is null
-  and sequencer_id = $1
+  and shard_id = $1
   and to_timestamp(created_at/1000000000) > now() - ($2 || 'hour')::interval
 group by 1, 2, 3, 4
 having count(id) >= $3
@@ -1742,11 +1752,11 @@ limit $4;
         // We have index on (shard_id, compaction_level, to_delete)
         sqlx::query_as::<_, PartitionParam>(
             r#"
-SELECT partition_id, sequencer_id, namespace_id, table_id, count(id), max(created_at)
+SELECT partition_id, shard_id, namespace_id, table_id, count(id), max(created_at)
 FROM parquet_file
 WHERE compaction_level = 0
   AND to_delete IS NULL
-  AND sequencer_id = $1
+  AND shard_id = $1
 GROUP BY 1, 2, 3, 4
 HAVING to_timestamp(max(created_at)/1000000000) < now() - ($2 || 'hour')::interval
 ORDER BY 5 DESC
@@ -1769,7 +1779,7 @@ LIMIT $3;
         // `parquet_metadata` column!!
         sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file
@@ -1843,7 +1853,7 @@ RETURNING id;
 SELECT count(1) as count
 FROM parquet_file
 WHERE table_id = $1
-  AND sequencer_id = $2
+  AND shard_id = $2
   AND max_sequence_number < $3
   AND parquet_file.to_delete IS NULL
   AND compaction_level = 0
@@ -1875,7 +1885,7 @@ WHERE table_id = $1
 SELECT count(1) as count
 FROM parquet_file
 WHERE table_id = $1
-  AND sequencer_id = $2
+  AND shard_id = $2
   AND parquet_file.to_delete IS NULL
   AND compaction_level = 1
   AND ((parquet_file.min_time <= $3 AND parquet_file.max_time >= $3)
@@ -1901,7 +1911,7 @@ WHERE table_id = $1
         // `parquet_metadata` column!!
         let rec = sqlx::query_as::<_, ParquetFile>(
             r#"
-SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
        max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
        row_count, compaction_level, created_at, column_set
 FROM parquet_file