refactor(catalog): remove partition_info_by_id()

This method used to return a subset of partition metadata, and was used
exclusively for persistence in the ingester. It is now no longer
necessary.
pull/24376/head
Dom Dwyer 2022-10-13 15:23:49 +02:00
parent 3fbeaa1314
commit 3e70dc44a0
6 changed files with 19 additions and 120 deletions

View File

@ -899,15 +899,6 @@ impl Partition {
}
}
/// Information for a partition from the catalog.
#[derive(Debug)]
#[allow(missing_docs)]
pub struct PartitionInfo {
pub partition: Partition,
pub namespace_name: String,
pub table_name: String,
}
/// Data for a partition chosen from its parquet files
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::FromRow)]
pub struct PartitionParam {

View File

@ -876,11 +876,11 @@ mod tests {
let mut repos = catalog.repositories().await;
let partition_info = repos
.partitions()
.partition_info_by_id(partition_id)
.get_by_id(partition_id)
.await
.unwrap()
.unwrap();
assert!(partition_info.partition.sort_key.is_empty());
assert!(partition_info.sort_key.is_empty());
}
data.persist(shard1.id, namespace.id, table_id, partition_id)
@ -970,11 +970,11 @@ mod tests {
// verify it set a sort key on the partition in the catalog
let partition_info = repos
.partitions()
.partition_info_by_id(partition_id)
.get_by_id(partition_id)
.await
.unwrap()
.unwrap();
assert_eq!(partition_info.partition.sort_key, vec!["time"]);
assert_eq!(partition_info.sort_key, vec!["time"]);
let mem_table = n.table_data(&"mem".into()).unwrap();
let mem_table = mem_table.read().await;

View File

@ -4,9 +4,9 @@ use async_trait::async_trait;
use data_types::{
Column, ColumnSchema, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId,
NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId,
PartitionInfo, PartitionKey, PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId,
SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, TablePartition,
TableSchema, Timestamp, Tombstone, TombstoneId, TopicId, TopicMetadata,
PartitionKey, PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, TablePartition, TableSchema,
Timestamp, Tombstone, TombstoneId, TopicId, TopicMetadata,
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
@ -437,13 +437,6 @@ pub trait PartitionRepo: Send + Sync {
/// return the partitions by table id
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
/// return the partition record, the namespace name it belongs to, and the table name it is
/// under
async fn partition_info_by_id(
&mut self,
partition_id: PartitionId,
) -> Result<Option<PartitionInfo>>;
/// Update the sort key for the partition.
///
/// NOTE: it is expected that ONLY the ingesters update sort keys for
@ -1435,17 +1428,6 @@ pub(crate) mod test_helpers {
created.insert(other_partition.id, other_partition.clone());
assert_eq!(created, listed);
// test get_partition_info_by_id
let info = repos
.partitions()
.partition_info_by_id(other_partition.id)
.await
.unwrap()
.unwrap();
assert_eq!(info.partition, other_partition);
assert_eq!(info.table_name, "test_table");
assert_eq!(info.namespace_name, "namespace_partition_test");
// test list_by_namespace
let namespace2 = repos
.namespaces()

View File

@ -13,10 +13,10 @@ use crate::{
use async_trait::async_trait;
use data_types::{
Column, ColumnId, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
PartitionKey, PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp,
Tombstone, TombstoneId, TopicId, TopicMetadata,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId,
ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone,
TombstoneId, TopicId, TopicMetadata,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::warn;
@ -766,43 +766,6 @@ impl PartitionRepo for MemTxn {
Ok(partitions)
}
async fn partition_info_by_id(
&mut self,
partition_id: PartitionId,
) -> Result<Option<PartitionInfo>> {
let stage = self.stage();
let partition = stage
.partitions
.iter()
.find(|p| p.id == partition_id)
.cloned();
if let Some(partition) = partition {
let table = stage
.tables
.iter()
.find(|t| t.id == partition.table_id)
.cloned();
if let Some(table) = table {
let namespace = stage
.namespaces
.iter()
.find(|n| n.id == table.namespace_id)
.cloned();
if let Some(namespace) = namespace {
return Ok(Some(PartitionInfo {
namespace_name: namespace.name,
table_name: table.name,
partition,
}));
}
}
}
Ok(None)
}
async fn update_sort_key(
&mut self,
partition_id: PartitionId,

View File

@ -8,10 +8,10 @@ use crate::interface::{
use async_trait::async_trait;
use data_types::{
Column, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo, PartitionKey,
PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId,
ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone,
TombstoneId, TopicId, TopicMetadata,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam,
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex,
SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, TopicId,
TopicMetadata,
};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
@ -244,7 +244,6 @@ decorate!(
"partition_list_by_shard" = list_by_shard(&mut self, shard_id: ShardId) -> Result<Vec<Partition>>;
"partition_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>>;
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
"partition_partition_info_by_id" = partition_info_by_id(&mut self, partition_id: PartitionId) -> Result<Option<PartitionInfo>>;
"partition_update_sort_key" = update_sort_key(&mut self, partition_id: PartitionId, sort_key: &[&str]) -> Result<Partition>;
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize,estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;

View File

@ -12,10 +12,10 @@ use crate::{
use async_trait::async_trait;
use data_types::{
Column, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo, PartitionKey,
PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId,
ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone,
TombstoneId, TopicId, TopicMetadata,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam,
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex,
SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, TopicId,
TopicMetadata,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, info, warn};
@ -1203,42 +1203,6 @@ WHERE table_id = $1;
.map_err(|e| Error::SqlxError { source: e })
}
async fn partition_info_by_id(
&mut self,
partition_id: PartitionId,
) -> Result<Option<PartitionInfo>> {
let info = sqlx::query(
r#"
SELECT namespace.name as namespace_name, table_name.name as table_name, partition.*
FROM partition
INNER JOIN table_name on table_name.id = partition.table_id
INNER JOIN namespace on namespace.id = table_name.namespace_id
WHERE partition.id = $1;
"#,
)
.bind(&partition_id) // $1
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
let namespace_name = info.get("namespace_name");
let table_name = info.get("table_name");
let partition = Partition {
id: info.get("id"),
shard_id: info.get("shard_id"),
table_id: info.get("table_id"),
partition_key: info.get("partition_key"),
sort_key: info.get("sort_key"),
persisted_sequence_number: info.get("persisted_sequence_number"),
};
Ok(Some(PartitionInfo {
namespace_name,
table_name,
partition,
}))
}
async fn update_sort_key(
&mut self,
partition_id: PartitionId,