fix: Make SQL queries more consistent

- Use "SELECT *" when possible
- Left align
- Wrap at 100 chars
- Include semicolon
pull/24376/head
Carol (Nichols || Goulding) 2022-03-13 20:28:10 -04:00
parent 8888e4c3a2
commit 268138ceef
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 226 additions and 203 deletions

View File

@ -174,8 +174,8 @@ impl Drop for PostgresTxn {
if let PostgresTxnInner::Txn(Some(_)) = self.inner { if let PostgresTxnInner::Txn(Some(_)) = self.inner {
warn!("Dropping PostgresTxn w/o finalizing (commit or abort)"); warn!("Dropping PostgresTxn w/o finalizing (commit or abort)");
// SQLx ensures that the inner transaction enqueues a rollback when it is dropped, so we don't need to spawn // SQLx ensures that the inner transaction enqueues a rollback when it is dropped, so
// a task here to call `rollback` manually. // we don't need to spawn a task here to call `rollback` manually.
} }
} }
} }
@ -214,15 +214,15 @@ impl TransactionFinalize for PostgresTxn {
#[async_trait] #[async_trait]
impl Catalog for PostgresCatalog { impl Catalog for PostgresCatalog {
async fn setup(&self) -> Result<(), Error> { async fn setup(&self) -> Result<(), Error> {
// We need to create the schema if we're going to set it as the first item of the search_path // We need to create the schema if we're going to set it as the first item of the
// otherwise when we run the sqlx migration scripts for the first time, sqlx will create the // search_path otherwise when we run the sqlx migration scripts for the first time, sqlx
// `_sqlx_migrations` table in the public namespace (the only namespace that exists), but the second // will create the `_sqlx_migrations` table in the public namespace (the only namespace
// time it will create it in the `<schema_name>` namespace and re-run all the migrations without // that exists), but the second time it will create it in the `<schema_name>` namespace and
// skipping the ones already applied (see #3893). // re-run all the migrations without skipping the ones already applied (see #3893).
// //
// This makes the migrations/20210217134322_create_schema.sql step unnecessary; we need to keep that // This makes the migrations/20210217134322_create_schema.sql step unnecessary; we need to
// file because migration files are immutable. // keep that file because migration files are immutable.
let create_schema_query = format!("CREATE SCHEMA IF NOT EXISTS {}", &self.schema_name); let create_schema_query = format!("CREATE SCHEMA IF NOT EXISTS {};", &self.schema_name);
self.pool self.pool
.execute(sqlx::query(&create_schema_query)) .execute(sqlx::query(&create_schema_query))
.await .await
@ -293,16 +293,16 @@ async fn new_raw_pool(
// set as part of the DSN, and we can set it explictly. // set as part of the DSN, and we can set it explictly.
// Recall that this block is running on connection, not when creating the pool! // Recall that this block is running on connection, not when creating the pool!
let current_application_name: String = let current_application_name: String =
sqlx::query_scalar("SELECT current_setting('application_name')") sqlx::query_scalar("SELECT current_setting('application_name');")
.fetch_one(&mut *c) .fetch_one(&mut *c)
.await?; .await?;
if current_application_name.is_empty() { if current_application_name.is_empty() {
sqlx::query("SELECT set_config('application_name', $1, false)") sqlx::query("SELECT set_config('application_name', $1, false);")
.bind(&*app_name) .bind(&*app_name)
.execute(&mut *c) .execute(&mut *c)
.await?; .await?;
} }
let search_path_query = format!("SET search_path TO {},public", schema_name); let search_path_query = format!("SET search_path TO {},public;", schema_name);
c.execute(sqlx::query(&search_path_query)).await?; c.execute(sqlx::query(&search_path_query)).await?;
Ok(()) Ok(())
}) })
@ -395,7 +395,12 @@ async fn new_pool(
current_dsn = new_dsn; current_dsn = new_dsn;
} }
Err(e) => { Err(e) => {
warn!(error=%e, filename=%dsn_file, "not replacing hotswap pool because of an error connecting to the new DSN"); warn!(
error=%e,
filename=%dsn_file,
"not replacing hotswap pool because of an error \
connecting to the new DSN"
);
} }
} }
} }
@ -467,7 +472,8 @@ impl KafkaTopicRepo for PostgresTxn {
INSERT INTO kafka_topic ( name ) INSERT INTO kafka_topic ( name )
VALUES ( $1 ) VALUES ( $1 )
ON CONFLICT ON CONSTRAINT kafka_topic_name_unique ON CONFLICT ON CONSTRAINT kafka_topic_name_unique
DO UPDATE SET name = kafka_topic.name RETURNING *; DO UPDATE SET name = kafka_topic.name
RETURNING *;
"#, "#,
) )
.bind(&name) // $1 .bind(&name) // $1
@ -481,7 +487,9 @@ DO UPDATE SET name = kafka_topic.name RETURNING *;
async fn get_by_name(&mut self, name: &str) -> Result<Option<KafkaTopic>> { async fn get_by_name(&mut self, name: &str) -> Result<Option<KafkaTopic>> {
let rec = sqlx::query_as::<_, KafkaTopic>( let rec = sqlx::query_as::<_, KafkaTopic>(
r#" r#"
SELECT * FROM kafka_topic WHERE name = $1; SELECT *
FROM kafka_topic
WHERE name = $1;
"#, "#,
) )
.bind(&name) // $1 .bind(&name) // $1
@ -506,7 +514,8 @@ impl QueryPoolRepo for PostgresTxn {
INSERT INTO query_pool ( name ) INSERT INTO query_pool ( name )
VALUES ( $1 ) VALUES ( $1 )
ON CONFLICT ON CONSTRAINT query_pool_name_unique ON CONFLICT ON CONSTRAINT query_pool_name_unique
DO UPDATE SET name = query_pool.name RETURNING *; DO UPDATE SET name = query_pool.name
RETURNING *;
"#, "#,
) )
.bind(&name) // $1 .bind(&name) // $1
@ -531,7 +540,7 @@ impl NamespaceRepo for PostgresTxn {
r#" r#"
INSERT INTO namespace ( name, retention_duration, kafka_topic_id, query_pool_id ) INSERT INTO namespace ( name, retention_duration, kafka_topic_id, query_pool_id )
VALUES ( $1, $2, $3, $4 ) VALUES ( $1, $2, $3, $4 )
RETURNING * RETURNING *;
"#, "#,
) )
.bind(&name) // $1 .bind(&name) // $1
@ -558,7 +567,8 @@ RETURNING *
async fn list(&mut self) -> Result<Vec<Namespace>> { async fn list(&mut self) -> Result<Vec<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>( let rec = sqlx::query_as::<_, Namespace>(
r#" r#"
SELECT * FROM namespace; SELECT *
FROM namespace;
"#, "#,
) )
.fetch_all(&mut self.inner) .fetch_all(&mut self.inner)
@ -571,7 +581,9 @@ SELECT * FROM namespace;
async fn get_by_id(&mut self, id: NamespaceId) -> Result<Option<Namespace>> { async fn get_by_id(&mut self, id: NamespaceId) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>( let rec = sqlx::query_as::<_, Namespace>(
r#" r#"
SELECT * FROM namespace WHERE id = $1; SELECT *
FROM namespace
WHERE id = $1;
"#, "#,
) )
.bind(&id) // $1 .bind(&id) // $1
@ -590,7 +602,9 @@ SELECT * FROM namespace WHERE id = $1;
async fn get_by_name(&mut self, name: &str) -> Result<Option<Namespace>> { async fn get_by_name(&mut self, name: &str) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>( let rec = sqlx::query_as::<_, Namespace>(
r#" r#"
SELECT * FROM namespace WHERE name = $1; SELECT *
FROM namespace
WHERE name = $1;
"#, "#,
) )
.bind(&name) // $1 .bind(&name) // $1
@ -609,7 +623,10 @@ SELECT * FROM namespace WHERE name = $1;
async fn update_table_limit(&mut self, name: &str, new_max: i32) -> Result<Namespace> { async fn update_table_limit(&mut self, name: &str, new_max: i32) -> Result<Namespace> {
let rec = sqlx::query_as::<_, Namespace>( let rec = sqlx::query_as::<_, Namespace>(
r#" r#"
UPDATE namespace SET max_tables = $1 WHERE name = $2 RETURNING *; UPDATE namespace
SET max_tables = $1
WHERE name = $2
RETURNING *;
"#, "#,
) )
.bind(&new_max) .bind(&new_max)
@ -630,7 +647,10 @@ UPDATE namespace SET max_tables = $1 WHERE name = $2 RETURNING *;
async fn update_column_limit(&mut self, name: &str, new_max: i32) -> Result<Namespace> { async fn update_column_limit(&mut self, name: &str, new_max: i32) -> Result<Namespace> {
let rec = sqlx::query_as::<_, Namespace>( let rec = sqlx::query_as::<_, Namespace>(
r#" r#"
UPDATE namespace SET max_columns_per_table = $1 WHERE name = $2 RETURNING *; UPDATE namespace
SET max_columns_per_table = $1
WHERE name = $2
RETURNING *;
"#, "#,
) )
.bind(&new_max) .bind(&new_max)
@ -657,7 +677,8 @@ impl TableRepo for PostgresTxn {
INSERT INTO table_name ( name, namespace_id ) INSERT INTO table_name ( name, namespace_id )
VALUES ( $1, $2 ) VALUES ( $1, $2 )
ON CONFLICT ON CONSTRAINT table_name_unique ON CONFLICT ON CONSTRAINT table_name_unique
DO UPDATE SET name = table_name.name RETURNING *; DO UPDATE SET name = table_name.name
RETURNING *;
"#, "#,
) )
.bind(&name) // $1 .bind(&name) // $1
@ -676,10 +697,16 @@ DO UPDATE SET name = table_name.name RETURNING *;
} }
async fn get_by_id(&mut self, table_id: TableId) -> Result<Option<Table>> { async fn get_by_id(&mut self, table_id: TableId) -> Result<Option<Table>> {
let rec = sqlx::query_as::<_, Table>(r#"SELECT * FROM table_name WHERE id = $1;"#) let rec = sqlx::query_as::<_, Table>(
.bind(&table_id) // $1 r#"
.fetch_one(&mut self.inner) SELECT *
.await; FROM table_name
WHERE id = $1;
"#,
)
.bind(&table_id) // $1
.fetch_one(&mut self.inner)
.await;
if let Err(sqlx::Error::RowNotFound) = rec { if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None); return Ok(None);
@ -693,7 +720,8 @@ DO UPDATE SET name = table_name.name RETURNING *;
async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Table>> { async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Table>> {
let rec = sqlx::query_as::<_, Table>( let rec = sqlx::query_as::<_, Table>(
r#" r#"
SELECT * FROM table_name SELECT *
FROM table_name
WHERE namespace_id = $1; WHERE namespace_id = $1;
"#, "#,
) )
@ -713,30 +741,32 @@ WHERE namespace_id = $1;
) -> Result<Option<TablePersistInfo>> { ) -> Result<Option<TablePersistInfo>> {
let rec = sqlx::query_as::<_, TablePersistInfo>( let rec = sqlx::query_as::<_, TablePersistInfo>(
r#" r#"
WITH tid as (SELECT id FROM table_name WHERE name = $2 AND namespace_id = $3) WITH tid as (SELECT id FROM table_name WHERE name = $2 AND namespace_id = $3)
SELECT $1 as sequencer_id, id as table_id, parquet_file.max_sequence_number AS parquet_max_sequence_number, SELECT $1 as sequencer_id, id as table_id,
tombstone.sequence_number as tombstone_max_sequence_number parquet_file.max_sequence_number AS parquet_max_sequence_number,
FROM tid tombstone.sequence_number as tombstone_max_sequence_number
LEFT JOIN ( FROM tid
SELECT tombstone.table_id, sequence_number LEFT JOIN (
FROM tombstone SELECT tombstone.table_id, sequence_number
WHERE sequencer_id = $1 AND tombstone.table_id = (SELECT id FROM tid) FROM tombstone
ORDER BY sequence_number DESC WHERE sequencer_id = $1 AND tombstone.table_id = (SELECT id FROM tid)
LIMIT 1 ORDER BY sequence_number DESC
) tombstone ON tombstone.table_id = tid.id LIMIT 1
LEFT JOIN ( ) tombstone ON tombstone.table_id = tid.id
SELECT parquet_file.table_id, max_sequence_number LEFT JOIN (
FROM parquet_file SELECT parquet_file.table_id, max_sequence_number
WHERE parquet_file.sequencer_id = $1 AND parquet_file.table_id = (SELECT id from tid) FROM parquet_file
ORDER BY max_sequence_number DESC WHERE parquet_file.sequencer_id = $1 AND parquet_file.table_id = (SELECT id from tid)
LIMIT 1 ORDER BY max_sequence_number DESC
) parquet_file ON parquet_file.table_id = tid.id; LIMIT 1
"#) ) parquet_file ON parquet_file.table_id = tid.id;
.bind(&sequencer_id) // $1 "#,
.bind(&table_name) // $2 )
.bind(&namespace_id) // $3 .bind(&sequencer_id) // $1
.fetch_one(&mut self.inner) .bind(&table_name) // $2
.await; .bind(&namespace_id) // $3
.fetch_one(&mut self.inner)
.await;
if let Err(sqlx::Error::RowNotFound) = rec { if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None); return Ok(None);
@ -763,7 +793,8 @@ impl ColumnRepo for PostgresTxn {
INSERT INTO column_name ( name, table_id, column_type ) INSERT INTO column_name ( name, table_id, column_type )
VALUES ( $1, $2, $3 ) VALUES ( $1, $2, $3 )
ON CONFLICT ON CONSTRAINT column_name_unique ON CONFLICT ON CONSTRAINT column_name_unique
DO UPDATE SET name = column_name.name RETURNING *; DO UPDATE SET name = column_name.name
RETURNING *;
"#, "#,
) )
.bind(&name) // $1 .bind(&name) // $1
@ -821,18 +852,19 @@ WHERE table_name.namespace_id = $1;
let out = sqlx::query_as::<_, Column>( let out = sqlx::query_as::<_, Column>(
r#" r#"
INSERT INTO column_name ( name, table_id, column_type ) INSERT INTO column_name ( name, table_id, column_type )
SELECT name, table_id, column_type FROM UNNEST($1, $2, $3) as a(name, table_id, column_type) SELECT name, table_id, column_type FROM UNNEST($1, $2, $3) as a(name, table_id, column_type)
ON CONFLICT ON CONSTRAINT column_name_unique ON CONFLICT ON CONSTRAINT column_name_unique
DO UPDATE SET name = column_name.name DO UPDATE SET name = column_name.name
RETURNING * RETURNING *;
"#, "#,
) )
.bind(&v_name) .bind(&v_name)
.bind(&v_table_id) .bind(&v_table_id)
.bind(&v_column_type) .bind(&v_column_type)
.fetch_all(&mut self.inner) .fetch_all(&mut self.inner)
.await.map_err(|e| { .await
.map_err(|e| {
if is_fk_violation(&e) { if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e } Error::ForeignKeyViolation { source: e }
} else { } else {
@ -869,12 +901,13 @@ impl SequencerRepo for PostgresTxn {
) -> Result<Sequencer> { ) -> Result<Sequencer> {
sqlx::query_as::<_, Sequencer>( sqlx::query_as::<_, Sequencer>(
r#" r#"
INSERT INTO sequencer INSERT INTO sequencer
( kafka_topic_id, kafka_partition, min_unpersisted_sequence_number ) ( kafka_topic_id, kafka_partition, min_unpersisted_sequence_number )
VALUES VALUES
( $1, $2, 0 ) ( $1, $2, 0 )
ON CONFLICT ON CONSTRAINT sequencer_unique ON CONFLICT ON CONSTRAINT sequencer_unique
DO UPDATE SET kafka_topic_id = sequencer.kafka_topic_id RETURNING *; DO UPDATE SET kafka_topic_id = sequencer.kafka_topic_id
RETURNING *;;
"#, "#,
) )
.bind(&topic.id) // $1 .bind(&topic.id) // $1
@ -897,7 +930,10 @@ impl SequencerRepo for PostgresTxn {
) -> Result<Option<Sequencer>> { ) -> Result<Option<Sequencer>> {
let rec = sqlx::query_as::<_, Sequencer>( let rec = sqlx::query_as::<_, Sequencer>(
r#" r#"
SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2; SELECT *
FROM sequencer
WHERE kafka_topic_id = $1
AND kafka_partition = $2;
"#, "#,
) )
.bind(topic_id) // $1 .bind(topic_id) // $1
@ -957,12 +993,13 @@ impl PartitionRepo for PostgresTxn {
) -> Result<Partition> { ) -> Result<Partition> {
let v = sqlx::query_as::<_, Partition>( let v = sqlx::query_as::<_, Partition>(
r#" r#"
INSERT INTO partition INSERT INTO partition
( partition_key, sequencer_id, table_id ) ( partition_key, sequencer_id, table_id )
VALUES VALUES
( $1, $2, $3 ) ( $1, $2, $3 )
ON CONFLICT ON CONSTRAINT partition_key_unique ON CONFLICT ON CONSTRAINT partition_key_unique
DO UPDATE SET partition_key = partition.partition_key RETURNING *; DO UPDATE SET partition_key = partition.partition_key
RETURNING *;
"#, "#,
) )
.bind(key) // $1 .bind(key) // $1
@ -1014,16 +1051,21 @@ impl PartitionRepo for PostgresTxn {
} }
async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>> { async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>> {
sqlx::query_as::<_, Partition>(r#" sqlx::query_as::<_, Partition>(
SELECT partition.id as id, partition.sequencer_id as sequencer_id, partition.table_id as table_id, partition.partition_key as partition_key r#"
SELECT partition.id as id,
partition.sequencer_id as sequencer_id,
partition.table_id as table_id,
partition.partition_key as partition_key
FROM table_name FROM table_name
INNER JOIN partition on partition.table_id = table_name.id INNER JOIN partition on partition.table_id = table_name.id
WHERE table_name.namespace_id = $1; WHERE table_name.namespace_id = $1;
"#) "#,
.bind(&namespace_id) // $1 )
.fetch_all(&mut self.inner) .bind(&namespace_id) // $1
.await .fetch_all(&mut self.inner)
.map_err(|e| Error::SqlxError { source: e }) .await
.map_err(|e| Error::SqlxError { source: e })
} }
async fn partition_info_by_id( async fn partition_info_by_id(
@ -1032,12 +1074,13 @@ WHERE table_name.namespace_id = $1;
) -> Result<Option<PartitionInfo>> { ) -> Result<Option<PartitionInfo>> {
let info = sqlx::query( let info = sqlx::query(
r#" r#"
SELECT namespace.name as namespace_name, table_name.name as table_name, partition.id, SELECT namespace.name as namespace_name, table_name.name as table_name, partition.id,
partition.sequencer_id, partition.table_id, partition.partition_key partition.sequencer_id, partition.table_id, partition.partition_key
FROM partition FROM partition
INNER JOIN table_name on table_name.id = partition.table_id INNER JOIN table_name on table_name.id = partition.table_id
INNER JOIN namespace on namespace.id = table_name.namespace_id INNER JOIN namespace on namespace.id = table_name.namespace_id
WHERE partition.id = $1;"#, WHERE partition.id = $1;
"#,
) )
.bind(&partition_id) // $1 .bind(&partition_id) // $1
.fetch_one(&mut self.inner) .fetch_one(&mut self.inner)
@ -1074,12 +1117,13 @@ impl TombstoneRepo for PostgresTxn {
) -> Result<Tombstone> { ) -> Result<Tombstone> {
let v = sqlx::query_as::<_, Tombstone>( let v = sqlx::query_as::<_, Tombstone>(
r#" r#"
INSERT INTO tombstone INSERT INTO tombstone
( table_id, sequencer_id, sequence_number, min_time, max_time, serialized_predicate ) ( table_id, sequencer_id, sequence_number, min_time, max_time, serialized_predicate )
VALUES VALUES
( $1, $2, $3, $4, $5, $6 ) ( $1, $2, $3, $4, $5, $6 )
ON CONFLICT ON CONSTRAINT tombstone_unique ON CONFLICT ON CONSTRAINT tombstone_unique
DO UPDATE SET table_id = tombstone.table_id RETURNING *; DO UPDATE SET table_id = tombstone.table_id
RETURNING *;
"#, "#,
) )
.bind(&table_id) // $1 .bind(&table_id) // $1
@ -1147,12 +1191,20 @@ WHERE table_name.namespace_id = $1;
sequencer_id: SequencerId, sequencer_id: SequencerId,
sequence_number: SequenceNumber, sequence_number: SequenceNumber,
) -> Result<Vec<Tombstone>> { ) -> Result<Vec<Tombstone>> {
sqlx::query_as::<_, Tombstone>(r#"SELECT * FROM tombstone WHERE sequencer_id = $1 AND sequence_number > $2 ORDER BY id;"#) sqlx::query_as::<_, Tombstone>(
.bind(&sequencer_id) // $1 r#"
.bind(&sequence_number) // $2 SELECT *
.fetch_all(&mut self.inner) FROM tombstone
.await WHERE sequencer_id = $1
.map_err(|e| Error::SqlxError { source: e }) AND sequence_number > $2
ORDER BY id;
"#,
)
.bind(&sequencer_id) // $1
.bind(&sequence_number) // $2
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
} }
} }
@ -1176,37 +1228,38 @@ impl ParquetFileRepo for PostgresTxn {
let rec = sqlx::query_as::<_, ParquetFile>( let rec = sqlx::query_as::<_, ParquetFile>(
r#" r#"
INSERT INTO parquet_file ( sequencer_id, table_id, partition_id, object_store_id, min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, parquet_metadata, row_count, compaction_level, created_at ) INSERT INTO parquet_file (
sequencer_id, table_id, partition_id, object_store_id, min_sequence_number,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes, parquet_metadata,
row_count, compaction_level, created_at )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, false, $9, $10, $11, $12, $13 ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, false, $9, $10, $11, $12, $13 )
RETURNING * RETURNING *;
"#, "#,
) )
.bind(sequencer_id) // $1 .bind(sequencer_id) // $1
.bind(table_id) // $2 .bind(table_id) // $2
.bind(partition_id) // $3 .bind(partition_id) // $3
.bind(object_store_id) // $4 .bind(object_store_id) // $4
.bind(min_sequence_number) // $5 .bind(min_sequence_number) // $5
.bind(max_sequence_number) // $6 .bind(max_sequence_number) // $6
.bind(min_time) // $7 .bind(min_time) // $7
.bind(max_time) // $8 .bind(max_time) // $8
.bind(file_size_bytes) // $9 .bind(file_size_bytes) // $9
.bind(parquet_metadata) // $10 .bind(parquet_metadata) // $10
.bind(row_count) // $11 .bind(row_count) // $11
.bind(INITIAL_COMPACTION_LEVEL) // $12 .bind(INITIAL_COMPACTION_LEVEL) // $12
.bind(created_at) // $13 .bind(created_at) // $13
.fetch_one(&mut self.inner) .fetch_one(&mut self.inner)
.await .await
.map_err(|e| { .map_err(|e| {
if is_unique_violation(&e) { if is_unique_violation(&e) {
Error::FileExists { Error::FileExists { object_store_id }
object_store_id, } else if is_fk_violation(&e) {
} Error::ForeignKeyViolation { source: e }
} else if is_fk_violation(&e) { } else {
Error::ForeignKeyViolation { source: e } Error::SqlxError { source: e }
} else { }
Error::SqlxError { source: e } })?;
}
})?;
Ok(rec) Ok(rec)
} }
@ -1226,12 +1279,20 @@ RETURNING *
sequencer_id: SequencerId, sequencer_id: SequencerId,
sequence_number: SequenceNumber, sequence_number: SequenceNumber,
) -> Result<Vec<ParquetFile>> { ) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>(r#"SELECT * FROM parquet_file WHERE sequencer_id = $1 AND max_sequence_number > $2 ORDER BY id;"#) sqlx::query_as::<_, ParquetFile>(
.bind(&sequencer_id) // $1 r#"
.bind(&sequence_number) // $2 SELECT *
.fetch_all(&mut self.inner) FROM parquet_file
.await WHERE sequencer_id = $1
.map_err(|e| Error::SqlxError { source: e }) AND max_sequence_number > $2
ORDER BY id;
"#,
)
.bind(&sequencer_id) // $1
.bind(&sequence_number) // $2
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
} }
async fn list_by_namespace_not_to_delete( async fn list_by_namespace_not_to_delete(
@ -1240,25 +1301,11 @@ RETURNING *
) -> Result<Vec<ParquetFile>> { ) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>( sqlx::query_as::<_, ParquetFile>(
r#" r#"
SELECT SELECT parquet_file.*
parquet_file.id as id,
parquet_file.sequencer_id as sequencer_id,
parquet_file.table_id as table_id,
parquet_file.partition_id as partition_id,
parquet_file.object_store_id as object_store_id,
parquet_file.min_sequence_number as min_sequence_number,
parquet_file.max_sequence_number as max_sequence_number,
parquet_file.min_time as min_time,
parquet_file.max_time as max_time,
parquet_file.to_delete as to_delete,
parquet_file.file_size_bytes as file_size_bytes,
parquet_file.parquet_metadata as parquet_metadata,
parquet_file.row_count as row_count,
parquet_file.compaction_level as compaction_level,
parquet_file.created_at as created_at
FROM parquet_file FROM parquet_file
INNER JOIN table_name on table_name.id = parquet_file.table_id INNER JOIN table_name on table_name.id = parquet_file.table_id
WHERE table_name.namespace_id = $1 AND parquet_file.to_delete = false; WHERE table_name.namespace_id = $1
AND parquet_file.to_delete = false;
"#, "#,
) )
.bind(&namespace_id) // $1 .bind(&namespace_id) // $1
@ -1270,27 +1317,12 @@ WHERE table_name.namespace_id = $1 AND parquet_file.to_delete = false;
async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>> { async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>( sqlx::query_as::<_, ParquetFile>(
r#" r#"
SELECT SELECT *
parquet_file.id as id,
parquet_file.sequencer_id as sequencer_id,
parquet_file.table_id as table_id,
parquet_file.partition_id as partition_id,
parquet_file.object_store_id as object_store_id,
parquet_file.min_sequence_number as min_sequence_number,
parquet_file.max_sequence_number as max_sequence_number,
parquet_file.min_time as min_time,
parquet_file.max_time as max_time,
parquet_file.to_delete as to_delete,
parquet_file.file_size_bytes as file_size_bytes,
parquet_file.parquet_metadata as parquet_metadata,
parquet_file.row_count as row_count,
parquet_file.compaction_level as compaction_level,
parquet_file.created_at as created_at
FROM parquet_file FROM parquet_file
WHERE parquet_file.sequencer_id = $1 WHERE parquet_file.sequencer_id = $1
AND parquet_file.compaction_level = 0 AND parquet_file.compaction_level = 0
AND parquet_file.to_delete = false; AND parquet_file.to_delete = false;
"#, "#,
) )
.bind(&sequencer_id) // $1 .bind(&sequencer_id) // $1
.fetch_all(&mut self.inner) .fetch_all(&mut self.inner)
@ -1306,22 +1338,7 @@ WHERE parquet_file.sequencer_id = $1
) -> Result<Vec<ParquetFile>> { ) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>( sqlx::query_as::<_, ParquetFile>(
r#" r#"
SELECT SELECT *
parquet_file.id as id,
parquet_file.sequencer_id as sequencer_id,
parquet_file.table_id as table_id,
parquet_file.partition_id as partition_id,
parquet_file.object_store_id as object_store_id,
parquet_file.min_sequence_number as min_sequence_number,
parquet_file.max_sequence_number as max_sequence_number,
parquet_file.min_time as min_time,
parquet_file.max_time as max_time,
parquet_file.to_delete as to_delete,
parquet_file.file_size_bytes as file_size_bytes,
parquet_file.parquet_metadata as parquet_metadata,
parquet_file.row_count as row_count,
parquet_file.compaction_level as compaction_level,
parquet_file.created_at as created_at
FROM parquet_file FROM parquet_file
WHERE parquet_file.sequencer_id = $1 WHERE parquet_file.sequencer_id = $1
AND parquet_file.table_id = $2 AND parquet_file.table_id = $2
@ -1329,8 +1346,8 @@ WHERE parquet_file.sequencer_id = $1
AND parquet_file.compaction_level = 1 AND parquet_file.compaction_level = 1
AND parquet_file.to_delete = false AND parquet_file.to_delete = false
AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $4) AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $4)
OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $5)) OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $5));
;"#, "#,
) )
.bind(&table_partition.sequencer_id) // $1 .bind(&table_partition.sequencer_id) // $1
.bind(&table_partition.table_id) // $2 .bind(&table_partition.table_id) // $2
@ -1351,11 +1368,11 @@ WHERE parquet_file.sequencer_id = $1
let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect(); let ids: Vec<_> = parquet_file_ids.iter().map(|p| p.get()).collect();
let updated = sqlx::query( let updated = sqlx::query(
r#" r#"
UPDATE parquet_file UPDATE parquet_file
SET compaction_level = 1 SET compaction_level = 1
WHERE id = ANY($1) WHERE id = ANY($1)
RETURNING id RETURNING id;
;"#, "#,
) )
.bind(&ids[..]) .bind(&ids[..])
.fetch_all(&mut self.inner) .fetch_all(&mut self.inner)
@ -1380,7 +1397,7 @@ WHERE parquet_file.sequencer_id = $1
async fn count(&mut self) -> Result<i64> { async fn count(&mut self) -> Result<i64> {
let read_result = let read_result =
sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM parquet_file;"#) sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM parquet_file;"#)
.fetch_one(&mut self.inner) .fetch_one(&mut self.inner)
.await .await
.map_err(|e| Error::SqlxError { source: e })?; .map_err(|e| Error::SqlxError { source: e })?;
@ -1398,10 +1415,10 @@ impl ProcessedTombstoneRepo for PostgresTxn {
) -> Result<ProcessedTombstone> { ) -> Result<ProcessedTombstone> {
sqlx::query_as::<_, ProcessedTombstone>( sqlx::query_as::<_, ProcessedTombstone>(
r#" r#"
INSERT INTO processed_tombstone ( tombstone_id, parquet_file_id ) INSERT INTO processed_tombstone ( tombstone_id, parquet_file_id )
VALUES ( $1, $2 ) VALUES ( $1, $2 )
RETURNING * RETURNING *;
"#, "#,
) )
.bind(tombstone_id) // $1 .bind(tombstone_id) // $1
.bind(parquet_file_id) // $2 .bind(parquet_file_id) // $2
@ -1427,12 +1444,18 @@ impl ProcessedTombstoneRepo for PostgresTxn {
tombstone_id: TombstoneId, tombstone_id: TombstoneId,
) -> Result<bool> { ) -> Result<bool> {
let read_result = sqlx::query_as::<_, Count>( let read_result = sqlx::query_as::<_, Count>(
r#"SELECT count(*) as count FROM processed_tombstone WHERE parquet_file_id = $1 AND tombstone_id = $2;"#) r#"
.bind(&parquet_file_id) // $1 SELECT count(*) as count
.bind(&tombstone_id) // $2 FROM processed_tombstone
.fetch_one(&mut self.inner) WHERE parquet_file_id = $1
.await AND tombstone_id = $2;
.map_err(|e| Error::SqlxError { source: e })?; "#,
)
.bind(&parquet_file_id) // $1
.bind(&tombstone_id) // $2
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(read_result.count > 0) Ok(read_result.count > 0)
} }
@ -1991,9 +2014,9 @@ mod tests {
paste::paste! { paste::paste! {
#[tokio::test] #[tokio::test]
async fn [<test_column_create_or_get_many_ $name>]() { async fn [<test_column_create_or_get_many_ $name>]() {
// If running an integration test on your laptop, this requires that you have Postgres // If running an integration test on your laptop, this requires that you have
// running and that you've done the sqlx migrations. See the README in this crate for // Postgres running and that you've done the sqlx migrations. See the README in
// info to set it up. // this crate for info to set it up.
maybe_skip_integration!(); maybe_skip_integration!();
let postgres = setup_db().await; let postgres = setup_db().await;