feat: Add catalog method for looking up partitions by their hash ID (#8018)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Carol (Nichols || Goulding) 2023-06-23 10:42:50 -04:00 committed by GitHub
parent 13771c4616
commit 60d0858381
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 116 additions and 4 deletions

View File

@ -5,8 +5,8 @@ use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
TableSchema, Timestamp,
ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction,
Table, TableId, TableSchema, Timestamp,
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
@ -372,6 +372,12 @@ pub trait PartitionRepo: Send + Sync {
/// get partition by ID
async fn get_by_id(&mut self, partition_id: PartitionId) -> Result<Option<Partition>>;
/// get partition by deterministic hash ID
async fn get_by_hash_id(
&mut self,
partition_hash_id: &PartitionHashId,
) -> Result<Option<Partition>>;
/// return the partitions by table id
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
@ -1490,12 +1496,30 @@ pub(crate) mod test_helpers {
.unwrap()
.unwrap()
);
assert_eq!(
other_partition,
repos
.partitions()
.get_by_hash_id(other_partition.hash_id().unwrap())
.await
.unwrap()
.unwrap()
);
assert!(repos
.partitions()
.get_by_id(PartitionId::new(i64::MAX))
.await
.unwrap()
.is_none());
assert!(repos
.partitions()
.get_by_hash_id(&PartitionHashId::new(
TableId::new(i64::MAX),
&PartitionKey::from("arbitrary")
))
.await
.unwrap()
.is_none());
let listed = repos
.partitions()
@ -1554,6 +1578,16 @@ pub(crate) mod test_helpers {
updated_other_partition.sort_key,
vec!["tag2", "tag1", "time"]
);
let updated_other_partition = repos
.partitions()
.get_by_hash_id(other_partition.hash_id().unwrap())
.await
.unwrap()
.unwrap();
assert_eq!(
updated_other_partition.sort_key,
vec!["tag2", "tag1", "time"]
);
// test sort key CAS with no value
let err = repos
@ -1610,6 +1644,16 @@ pub(crate) mod test_helpers {
updated_other_partition.sort_key,
vec!["tag2", "tag1", "tag3 , with comma", "time"]
);
let updated_other_partition = repos
.partitions()
.get_by_hash_id(other_partition.hash_id().unwrap())
.await
.unwrap()
.unwrap();
assert_eq!(
updated_other_partition.sort_key,
vec!["tag2", "tag1", "tag3 , with comma", "time"]
);
// The compactor can log why compaction was skipped
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();

View File

@ -18,7 +18,8 @@ use data_types::{
},
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp,
};
use iox_time::{SystemProvider, TimeProvider};
use snafu::ensure;
@ -585,6 +586,23 @@ impl PartitionRepo for MemTxn {
.cloned())
}
async fn get_by_hash_id(
&mut self,
partition_hash_id: &PartitionHashId,
) -> Result<Option<Partition>> {
let stage = self.stage();
Ok(stage
.partitions
.iter()
.find(|p| {
p.hash_id()
.map(|hash_id| hash_id == partition_hash_id)
.unwrap_or_default()
})
.cloned())
}
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
let stage = self.stage();

View File

@ -9,7 +9,8 @@ use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp,
};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
@ -170,6 +171,7 @@ decorate!(
methods = [
"partition_create_or_get" = create_or_get(&mut self, key: PartitionKey, table_id: TableId) -> Result<Partition>;
"partition_get_by_id" = get_by_id(&mut self, partition_id: PartitionId) -> Result<Option<Partition>>;
"partition_get_by_hash_id" = get_by_hash_id(&mut self, partition_hash_id: &PartitionHashId) -> Result<Option<Partition>>;
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
"partition_list_ids" = list_ids(&mut self) -> Result<Vec<PartitionId>>;
"partition_update_sort_key" = cas_sort_key(&mut self, partition_id: PartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str]) -> Result<Partition, CasFailure<Vec<String>>>;

View File

@ -1095,6 +1095,30 @@ WHERE id = $1;
Ok(Some(partition))
}
async fn get_by_hash_id(
&mut self,
partition_hash_id: &PartitionHashId,
) -> Result<Option<Partition>> {
let rec = sqlx::query_as::<_, Partition>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE hash_id = $1;
"#,
)
.bind(partition_hash_id) // $1
.fetch_one(&mut self.inner)
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let partition = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(partition))
}
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
sqlx::query_as::<_, Partition>(
r#"

View File

@ -891,6 +891,30 @@ WHERE id = $1;
Ok(Some(partition.into()))
}
async fn get_by_hash_id(
&mut self,
partition_hash_id: &PartitionHashId,
) -> Result<Option<Partition>> {
let rec = sqlx::query_as::<_, PartitionPod>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE hash_id = $1;
"#,
)
.bind(partition_hash_id) // $1
.fetch_one(self.inner.get_mut())
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let partition = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(partition.into()))
}
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
Ok(sqlx::query_as::<_, PartitionPod>(
r#"