feat: Abstract over which partition ID type we're using to list Parquet files

pull/24376/head
Carol (Nichols || Goulding) 2023-06-26 13:44:34 -04:00
parent c1e42651ec
commit 22c17fb970
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
9 changed files with 78 additions and 37 deletions
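For orientation: a minimal sketch of the `TransitionPartitionId` type this commit threads through the catalog, inferred from the constructors and match arms in the diffs below. The real definition lives in the `data_types` crate and may differ in detail; `PartitionHashId` is the presumed name of the hash-based ID type.

// Sketch only, inferred from this diff.
pub enum TransitionPartitionId {
    /// Deterministic, hash-based identifier; compared against
    /// `ParquetFile::partition_hash_id` in the implementations below.
    Deterministic(PartitionHashId),
    /// Old database-assigned row ID, kept until every partition has a
    /// hash ID; compared against `ParquetFile::partition_id`.
    Deprecated(PartitionId),
}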

View File

@@ -5,7 +5,7 @@ use std::{
 use async_trait::async_trait;
 use backoff::{Backoff, BackoffConfig};
-use data_types::{ParquetFile, PartitionId};
+use data_types::{ParquetFile, PartitionId, TransitionPartitionId};
 use iox_catalog::interface::Catalog;
 use observability_deps::tracing::warn;
@@ -61,7 +61,7 @@ impl CatalogQuerier for Arc<dyn Catalog> {
         self.repositories()
             .await
             .parquet_files()
-            .list_by_partition_not_to_delete(partition_id)
+            .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id))
             .await
     }
 }
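Note that the method now takes the ID by reference: unlike the plain row-ID newtype, the enum may carry an owned hash ID, so callers lend it rather than move or clone it. A sketch of the wrapping pattern for call sites like this one (and the gRPC service at the end of this commit) that only hold a row ID; the `catalog` binding here is illustrative:

// Sketch: callers holding only a row-based PartitionId wrap it
// in the deprecated variant before calling the repo method.
let files = catalog
    .repositories()
    .await
    .parquet_files()
    .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id))
    .await?;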

View File

@@ -542,7 +542,7 @@ mod tests {
         async fn list_by_partition_not_to_delete(
             &mut self,
-            partition_id: PartitionId,
+            partition_id: &TransitionPartitionId,
         ) -> iox_catalog::interface::Result<Vec<ParquetFile>> {
             self.inner
                 .list_by_partition_not_to_delete(partition_id)

View File

@@ -16,7 +16,7 @@ mod tests {
     use std::{sync::Arc, time::Duration};

     use assert_matches::assert_matches;
-    use data_types::{CompactionLevel, ParquetFile};
+    use data_types::{CompactionLevel, ParquetFile, TransitionPartitionId};
     use futures::TryStreamExt;
     use iox_catalog::{
         interface::{get_schema_by_id, Catalog, SoftDeletedRows},
@@ -243,7 +243,7 @@ mod tests {
         .repositories()
         .await
         .parquet_files()
-        .list_by_partition_not_to_delete(partition_id)
+        .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id))
         .await
         .expect("query for parquet files failed");
@@ -392,7 +392,7 @@ mod tests {
         .repositories()
         .await
         .parquet_files()
-        .list_by_partition_not_to_delete(partition_id)
+        .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id))
         .await
         .expect("query for parquet files failed");

View File

@@ -483,7 +483,7 @@ pub trait ParquetFileRepo: Send + Sync {
     /// [`to_delete`](ParquetFile::to_delete).
     async fn list_by_partition_not_to_delete(
         &mut self,
-        partition_id: PartitionId,
+        partition_id: &TransitionPartitionId,
     ) -> Result<Vec<ParquetFile>>;

     /// Return the parquet file with the given object store id
@@ -2680,6 +2680,7 @@ pub(crate) mod test_helpers {
         let other_partition_params = ParquetFileParams {
             partition_id: partition2.id,
             partition_hash_id: partition2.hash_id().cloned(),
+            object_store_id: Uuid::new_v4(),
             ..parquet_file_params.clone()
         };
@@ -2691,14 +2692,16 @@ pub(crate) mod test_helpers {
         let files = repos
             .parquet_files()
-            .list_by_partition_not_to_delete(partition.id)
+            .list_by_partition_not_to_delete(&partition.transition_partition_id())
             .await
             .unwrap();

         // not asserting against a vector literal to guard against flakiness due to uncertain
         // ordering of SQL query in postgres impl
-        assert_eq!(files.len(), 2);
-        assert_matches!(files.iter().find(|f| f.id == parquet_file.id), Some(_));
-        assert_matches!(files.iter().find(|f| f.id == level1_file.id), Some(_));
+        let mut file_ids: Vec<_> = files.into_iter().map(|f| f.id).collect();
+        file_ids.sort();
+        let mut expected_ids = vec![parquet_file.id, level1_file.id];
+        expected_ids.sort();
+        assert_eq!(file_ids, expected_ids);

         // remove namespace to avoid it from affecting later tests
         repos
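The updated test asks the `Partition` record itself for its preferred identifier via `transition_partition_id()`. A plausible sketch of that helper, inferred from its usage here and from `hash_id()` returning an optional reference (the actual method lives on `Partition` in `data_types`):

impl Partition {
    /// Sketch, inferred from usage: prefer the deterministic hash ID
    /// when this partition has one, otherwise fall back to the
    /// deprecated row ID.
    pub fn transition_partition_id(&self) -> TransitionPartitionId {
        match self.hash_id() {
            Some(hash_id) => TransitionPartitionId::Deterministic(hash_id.clone()),
            None => TransitionPartitionId::Deprecated(self.id),
        }
    }
}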

View File

@@ -850,14 +850,20 @@ impl ParquetFileRepo for MemTxn {
     async fn list_by_partition_not_to_delete(
         &mut self,
-        partition_id: PartitionId,
+        partition_id: &TransitionPartitionId,
     ) -> Result<Vec<ParquetFile>> {
         let stage = self.stage();

         Ok(stage
             .parquet_files
             .iter()
-            .filter(|f| f.partition_id == partition_id && f.to_delete.is_none())
+            .filter(|f| match partition_id {
+                TransitionPartitionId::Deterministic(hash_id) => {
+                    f.partition_hash_id.as_ref().map_or(false, |h| h == hash_id)
+                }
+                TransitionPartitionId::Deprecated(id) => f.partition_id == *id,
+            })
+            .filter(|f| f.to_delete.is_none())
             .cloned()
             .collect())
     }
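The in-memory implementation spells out the matching rule: a deterministic ID must equal the file's optional `partition_hash_id` (a file without a hash ID never matches), while a deprecated ID compares against the row ID. Restated as a standalone predicate (hypothetical helper, for illustration only):

// Hypothetical helper restating the filter above.
fn matches_partition(file: &ParquetFile, id: &TransitionPartitionId) -> bool {
    match id {
        TransitionPartitionId::Deterministic(hash_id) => file
            .partition_hash_id
            .as_ref()
            .map_or(false, |h| h == hash_id),
        TransitionPartitionId::Deprecated(id) => file.partition_id == *id,
    }
}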

View File

@@ -193,7 +193,7 @@ decorate!(
     "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
     "parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
     "parquet_delete_old_ids_only" = delete_old_ids_only(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFileId>>;
-    "parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFile>>;
+    "parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: &TransitionPartitionId) -> Result<Vec<ParquetFile>>;
     "parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
     "parquet_exists_by_object_store_id_batch" = exists_by_object_store_id_batch(&mut self, object_store_ids: Vec<Uuid>) -> Result<Vec<Uuid>>;
     "parquet_create_upgrade_delete" = create_upgrade_delete(&mut self, delete: &[ParquetFileId], upgrade: &[ParquetFileId], create: &[ParquetFileParams], target_level: CompactionLevel) -> Result<Vec<ParquetFileId>>;

View File

@@ -1472,10 +1472,23 @@ RETURNING id;
     async fn list_by_partition_not_to_delete(
         &mut self,
-        partition_id: PartitionId,
+        partition_id: &TransitionPartitionId,
     ) -> Result<Vec<ParquetFile>> {
-        sqlx::query_as::<_, ParquetFile>(
-            r#"
+        // This `match` will go away when all partitions have hash IDs in the database.
+        let query = match partition_id {
+            TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFile>(
+                r#"
+SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
+max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
+max_l0_created_at
+FROM parquet_file
+WHERE parquet_file.partition_hash_id = $1
+AND parquet_file.to_delete IS NULL;
+"#,
+            )
+            .bind(hash_id), // $1
+            TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFile>(
+                r#"
 SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
 max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
 max_l0_created_at
@@ -1483,11 +1496,14 @@ FROM parquet_file
 WHERE parquet_file.partition_id = $1
 AND parquet_file.to_delete IS NULL;
 "#,
-        )
-        .bind(partition_id) // $1
-        .fetch_all(&mut self.inner)
-        .await
-        .map_err(|e| Error::SqlxError { source: e })
+            )
+            .bind(id), // $1
+        };
+        query
+            .fetch_all(&mut self.inner)
+            .await
+            .map_err(|e| Error::SqlxError { source: e })
     }

     async fn get_by_object_store_id(
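The Postgres implementation (and the near-identical SQLite one below) keeps two separate prepared statements rather than one dynamic query because each arm binds a differently typed value against a different column. While partitions still carry both IDs, either flavor should reach the same files; a hypothetical usage sketch (the `repos` and `partition` bindings and repository setup are elided):

// Both ID flavors resolve to the same partition's undeleted files.
let by_row_id = repos
    .parquet_files()
    .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition.id))
    .await?;
let by_hash_id = repos
    .parquet_files()
    .list_by_partition_not_to_delete(&partition.transition_partition_id())
    .await?;
// While the transition is in progress, the two lookups should agree.
assert_eq!(by_row_id.len(), by_hash_id.len());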

View File

@@ -1338,10 +1338,23 @@ RETURNING id;
     async fn list_by_partition_not_to_delete(
         &mut self,
-        partition_id: PartitionId,
+        partition_id: &TransitionPartitionId,
     ) -> Result<Vec<ParquetFile>> {
-        Ok(sqlx::query_as::<_, ParquetFilePod>(
-            r#"
+        // This `match` will go away when all partitions have hash IDs in the database.
+        let query = match partition_id {
+            TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFilePod>(
+                r#"
+SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
+max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
+max_l0_created_at
+FROM parquet_file
+WHERE parquet_file.partition_hash_id = $1
+AND parquet_file.to_delete IS NULL;
+"#,
+            )
+            .bind(hash_id), // $1
+            TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFilePod>(
+                r#"
 SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
 max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
 max_l0_created_at
@@ -1349,14 +1362,17 @@ FROM parquet_file
 WHERE parquet_file.partition_id = $1
 AND parquet_file.to_delete IS NULL;
 "#,
-        )
-        .bind(partition_id) // $1
-        .fetch_all(self.inner.get_mut())
-        .await
-        .map_err(|e| Error::SqlxError { source: e })?
-        .into_iter()
-        .map(Into::into)
-        .collect())
+            )
+            .bind(id), // $1
+        };
+        Ok(query
+            .fetch_all(self.inner.get_mut())
+            .await
+            .map_err(|e| Error::SqlxError { source: e })?
+            .into_iter()
+            .map(Into::into)
+            .collect())
     }

     async fn get_by_object_store_id(

View File

@@ -18,7 +18,7 @@
 // Workaround for "unused crate" lint false positives.
 use workspace_hack as _;
-use data_types::{PartitionId, TableId};
+use data_types::{PartitionId, TableId, TransitionPartitionId};
 use generated_types::influxdata::iox::catalog::v1::*;
 use iox_catalog::interface::{Catalog, SoftDeletedRows};
 use observability_deps::tracing::*;
@@ -47,11 +47,11 @@ impl catalog_service_server::CatalogService for CatalogService {
     ) -> Result<Response<GetParquetFilesByPartitionIdResponse>, Status> {
         let mut repos = self.catalog.repositories().await;
         let req = request.into_inner();
-        let partition_id = PartitionId::new(req.partition_id);
+        let partition_id = TransitionPartitionId::Deprecated(PartitionId::new(req.partition_id));
         let parquet_files = repos
             .parquet_files()
-            .list_by_partition_not_to_delete(partition_id)
+            .list_by_partition_not_to_delete(&partition_id)
             .await
             .map_err(|e| {
                 warn!(error=%e, %req.partition_id, "failed to get parquet_files for partition");