feat: Make parquet_file.partition_id optional in the catalog (#8339)
* feat: Make parquet_file.partition_id optional in the catalog

  This will acquire a short lock on the table in Postgres, per:
  <https://stackoverflow.com/questions/52760971/will-making-column-nullable-lock-the-table-for-reads>

  This allows us to persist data for new partitions and associate the Parquet file catalog
  records with the partition records using only the partition hash ID, rather than both IDs as
  is done now.

* fix: Support transition partition ID in the catalog service

* fix: Use transition partition ID in import/export

  This commit also removes support for the `--partition-id` flag of the
  `influxdb_iox remote store get-table` command, which Andrew approved. The `--partition-id`
  filter was taking the results of the catalog gRPC service's query for a table's Parquet files
  and keeping only the files whose partition IDs matched. The gRPC query no longer returns the
  partition ID from the Parquet file table, and really, this command should instead be using
  `GetParquetFilesByPartitionId` to request only what's needed rather than filtering.

* feat: Support looking up Parquet files by either kind of partition ID, regardless of which is
  actually stored on the Parquet file record.

  That is, say there's a Partition in the catalog with:

      Partition { id: 3, hash_id: abcdefg }

  and a Parquet file that has:

      ParquetFile { partition_hash_id: abcdefg }

  calling `list_by_partition_not_to_delete(PartitionId(3))` should still return this Parquet
  file, because it is associated with the partition that has ID 3. This is important for the
  compactor, which currently deals only in `PartitionId`s, and I'd like to keep it that way for
  now to avoid having to change even more in this PR.

* fix: Use and set new partition ID fields everywhere they want to be

---------

Co-authored-by: Dom <dom@itsallbroken.com>
parent 9b52bfdeaa
commit 4a9e76b8b7
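The "either kind of partition ID" lookup described in the commit message hinges on `TransitionPartitionId`: a partition's deterministic hash ID is preferred when it exists, and the catalog-assigned ID is the fallback. Below is a minimal, self-contained sketch of that precedence rule, using simplified stand-ins for the catalog types (the real `PartitionId`, `PartitionHashId`, and `TransitionPartitionId` live in the `data_types` crate and appear in the diff that follows):

    // Simplified stand-ins for the catalog types; the real definitions live in `data_types`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct PartitionId(i64);

    #[derive(Debug, Clone, PartialEq, Eq)]
    struct PartitionHashId(Vec<u8>);

    // Either the deterministic hash identifier (preferred) or the old catalog-assigned row ID.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum TransitionPartitionId {
        Deterministic(PartitionHashId),
        Deprecated(PartitionId),
    }

    impl From<(PartitionId, Option<&PartitionHashId>)> for TransitionPartitionId {
        fn from((partition_id, partition_hash_id): (PartitionId, Option<&PartitionHashId>)) -> Self {
            // If the partition has a hash ID, use it; otherwise fall back to the numeric ID.
            partition_hash_id
                .cloned()
                .map(Self::Deterministic)
                .unwrap_or(Self::Deprecated(partition_id))
        }
    }

    fn main() {
        let hash = PartitionHashId(b"abcdefg".to_vec());
        // A partition that has a hash ID resolves to the deterministic identifier...
        assert_eq!(
            TransitionPartitionId::from((PartitionId(3), Some(&hash))),
            TransitionPartitionId::Deterministic(hash),
        );
        // ...while an old-style partition without one falls back to the catalog ID.
        assert_eq!(
            TransitionPartitionId::from((PartitionId(3), None)),
            TransitionPartitionId::Deprecated(PartitionId(3)),
        );
    }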
@@ -171,7 +171,7 @@ fn to_queryable_parquet_chunk(
         parquet_file_id = file.file.id.get(),
         parquet_file_namespace_id = file.file.namespace_id.get(),
         parquet_file_table_id = file.file.table_id.get(),
-        parquet_file_partition_id = file.file.partition_id.get(),
+        parquet_file_partition_id = %file.file.partition_id,
         parquet_file_object_store_id = uuid.to_string().as_str(),
         "built parquet chunk from metadata"
     );

@@ -70,8 +70,7 @@ impl ParquetFileSink for MockParquetFileSink {
         let out = ((row_count > 0) || !self.filter_empty_files).then(|| ParquetFileParams {
             namespace_id: partition.namespace_id,
             table_id: partition.table.id,
-            partition_id: partition.partition_id,
-            partition_hash_id: partition.partition_hash_id.clone(),
+            partition_id: partition.transition_partition_id(),
             object_store_id: Uuid::from_u128(guard.len() as u128),
             min_time: Timestamp::new(0),
             max_time: Timestamp::new(0),
@@ -95,7 +94,7 @@ impl ParquetFileSink for MockParquetFileSink {
 #[cfg(test)]
 mod tests {
     use arrow_util::assert_batches_eq;
-    use data_types::{NamespaceId, PartitionId, TableId};
+    use data_types::{NamespaceId, TableId};
     use datafusion::{
         arrow::{array::new_null_array, datatypes::DataType},
         physical_plan::stream::RecordBatchStreamAdapter,
@@ -159,7 +158,7 @@ mod tests {
             Arc::clone(&schema),
             futures::stream::once(async move { Ok(record_batch_captured) }),
         ));
-        let partition_hash_id = partition.partition_hash_id.clone();
+        let partition_id = partition.transition_partition_id();
         assert_eq!(
             sink.store(stream, Arc::clone(&partition), level, max_l0_created_at)
                 .await
@@ -167,8 +166,7 @@ mod tests {
             Some(ParquetFileParams {
                 namespace_id: NamespaceId::new(2),
                 table_id: TableId::new(3),
-                partition_id: PartitionId::new(1),
-                partition_hash_id,
+                partition_id,
                 object_store_id: Uuid::from_u128(2),
                 min_time: Timestamp::new(0),
                 max_time: Timestamp::new(0),
@@ -223,7 +221,7 @@ mod tests {
             Arc::clone(&schema),
             futures::stream::empty(),
         ));
-        let partition_hash_id = partition.partition_hash_id.clone();
+        let partition_id = partition.transition_partition_id();
         assert_eq!(
             sink.store(stream, Arc::clone(&partition), level, max_l0_created_at)
                 .await
@@ -231,8 +229,7 @@ mod tests {
             Some(ParquetFileParams {
                 namespace_id: NamespaceId::new(2),
                 table_id: TableId::new(3),
-                partition_id: PartitionId::new(1),
-                partition_hash_id,
+                partition_id,
                 object_store_id: Uuid::from_u128(0),
                 min_time: Timestamp::new(0),
                 max_time: Timestamp::new(0),
@ -1,19 +1,35 @@
|
|||
use std::{collections::HashMap, fmt::Display};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{ParquetFile, PartitionId};
|
||||
|
||||
use super::PartitionFilesSource;
|
||||
use async_trait::async_trait;
|
||||
use data_types::{ParquetFile, PartitionId, TransitionPartitionId};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MockPartitionFilesSource {
|
||||
files: HashMap<PartitionId, Vec<ParquetFile>>,
|
||||
// This complexity is because we're in the process of moving to partition hash IDs rather than
|
||||
// partition catalog IDs, and Parquet files might only have the partition hash ID on their
|
||||
// record, but the compactor deals with partition catalog IDs because we haven't transitioned
|
||||
// it yet. This should become simpler when the transition is complete.
|
||||
partition_lookup: HashMap<PartitionId, TransitionPartitionId>,
|
||||
file_lookup: HashMap<TransitionPartitionId, Vec<ParquetFile>>,
|
||||
}
|
||||
|
||||
impl MockPartitionFilesSource {
|
||||
#[allow(dead_code)] // not used anywhere
|
||||
pub fn new(files: HashMap<PartitionId, Vec<ParquetFile>>) -> Self {
|
||||
Self { files }
|
||||
#[cfg(test)]
|
||||
pub fn new(
|
||||
partition_lookup: HashMap<PartitionId, TransitionPartitionId>,
|
||||
parquet_files: Vec<ParquetFile>,
|
||||
) -> Self {
|
||||
let mut file_lookup: HashMap<TransitionPartitionId, Vec<ParquetFile>> = HashMap::new();
|
||||
for file in parquet_files {
|
||||
let files = file_lookup.entry(file.partition_id.clone()).or_default();
|
||||
files.push(file);
|
||||
}
|
||||
|
||||
Self {
|
||||
partition_lookup,
|
||||
file_lookup,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -25,46 +41,60 @@ impl Display for MockPartitionFilesSource {
|
|||
|
||||
#[async_trait]
|
||||
impl PartitionFilesSource for MockPartitionFilesSource {
|
||||
async fn fetch(&self, partition: PartitionId) -> Vec<ParquetFile> {
|
||||
self.files.get(&partition).cloned().unwrap_or_default()
|
||||
async fn fetch(&self, partition_id: PartitionId) -> Vec<ParquetFile> {
|
||||
self.partition_lookup
|
||||
.get(&partition_id)
|
||||
.and_then(|partition_hash_id| self.file_lookup.get(partition_hash_id).cloned())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use iox_tests::ParquetFileBuilder;
|
||||
|
||||
use super::*;
|
||||
use iox_tests::{partition_identifier, ParquetFileBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_display() {
|
||||
assert_eq!(
|
||||
MockPartitionFilesSource::new(HashMap::default()).to_string(),
|
||||
MockPartitionFilesSource::new(Default::default(), Default::default()).to_string(),
|
||||
"mock",
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fetch() {
|
||||
let f_1_1 = ParquetFileBuilder::new(1).with_partition(1).build();
|
||||
let f_1_2 = ParquetFileBuilder::new(2).with_partition(1).build();
|
||||
let f_2_1 = ParquetFileBuilder::new(3).with_partition(2).build();
|
||||
let partition_id_1 = PartitionId::new(1);
|
||||
let partition_id_2 = PartitionId::new(2);
|
||||
let partition_identifier_1 = partition_identifier(1);
|
||||
let partition_identifier_2 = partition_identifier(2);
|
||||
let f_1_1 = ParquetFileBuilder::new(1)
|
||||
.with_partition(partition_identifier_1.clone())
|
||||
.build();
|
||||
let f_1_2 = ParquetFileBuilder::new(2)
|
||||
.with_partition(partition_identifier_1.clone())
|
||||
.build();
|
||||
let f_2_1 = ParquetFileBuilder::new(3)
|
||||
.with_partition(partition_identifier_2.clone())
|
||||
.build();
|
||||
|
||||
let files = HashMap::from([
|
||||
(PartitionId::new(1), vec![f_1_1.clone(), f_1_2.clone()]),
|
||||
(PartitionId::new(2), vec![f_2_1.clone()]),
|
||||
let partition_lookup = HashMap::from([
|
||||
(partition_id_1, partition_identifier_1.clone()),
|
||||
(partition_id_2, partition_identifier_2.clone()),
|
||||
]);
|
||||
let source = MockPartitionFilesSource::new(files);
|
||||
|
||||
let files = vec![f_1_1.clone(), f_1_2.clone(), f_2_1.clone()];
|
||||
let source = MockPartitionFilesSource::new(partition_lookup, files);
|
||||
|
||||
// different partitions
|
||||
assert_eq!(
|
||||
source.fetch(PartitionId::new(1)).await,
|
||||
source.fetch(partition_id_1).await,
|
||||
vec![f_1_1.clone(), f_1_2.clone()],
|
||||
);
|
||||
assert_eq!(source.fetch(PartitionId::new(2)).await, vec![f_2_1],);
|
||||
assert_eq!(source.fetch(partition_id_2).await, vec![f_2_1],);
|
||||
|
||||
// fetching does not drain
|
||||
assert_eq!(source.fetch(PartitionId::new(1)).await, vec![f_1_1, f_1_2],);
|
||||
assert_eq!(source.fetch(partition_id_1).await, vec![f_1_1, f_1_2],);
|
||||
|
||||
// unknown partition => empty result
|
||||
assert_eq!(source.fetch(PartitionId::new(3)).await, vec![],);
|
||||
|
|
|
@ -78,14 +78,12 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use test_helpers::tracing::TracingCapture;
|
||||
|
||||
use super::*;
|
||||
use crate::commit::mock::{CommitHistoryEntry, MockCommit};
|
||||
use iox_tests::ParquetFileBuilder;
|
||||
use assert_matches::assert_matches;
|
||||
use iox_tests::{partition_identifier, ParquetFileBuilder};
|
||||
use std::sync::Arc;
|
||||
use test_helpers::tracing::TracingCapture;
|
||||
|
||||
#[test]
|
||||
fn test_display() {
|
||||
|
@ -111,14 +109,21 @@ mod tests {
|
|||
.with_row_count(105)
|
||||
.build();
|
||||
|
||||
let created_1 = ParquetFileBuilder::new(1000).with_partition(1).build();
|
||||
let created_2 = ParquetFileBuilder::new(1001).with_partition(1).build();
|
||||
let partition_id_1 = PartitionId::new(1);
|
||||
let transition_partition_id_1 = partition_identifier(1);
|
||||
|
||||
let created_1 = ParquetFileBuilder::new(1000)
|
||||
.with_partition(transition_partition_id_1.clone())
|
||||
.build();
|
||||
let created_2 = ParquetFileBuilder::new(1001)
|
||||
.with_partition(transition_partition_id_1)
|
||||
.build();
|
||||
|
||||
let capture = TracingCapture::new();
|
||||
|
||||
let ids = commit
|
||||
.commit(
|
||||
PartitionId::new(1),
|
||||
partition_id_1,
|
||||
&[existing_1.clone()],
|
||||
&[],
|
||||
&[created_1.clone().into(), created_2.clone().into()],
|
||||
|
@ -130,9 +135,11 @@ mod tests {
|
|||
Ok(res) if res == vec![ParquetFileId::new(1000), ParquetFileId::new(1001)]
|
||||
);
|
||||
|
||||
let partition_id_2 = PartitionId::new(2);
|
||||
|
||||
let ids = commit
|
||||
.commit(
|
||||
PartitionId::new(2),
|
||||
partition_id_2,
|
||||
&[existing_2.clone(), existing_3.clone()],
|
||||
&[existing_1.clone()],
|
||||
&[],
|
||||
|
@ -151,14 +158,14 @@ level = INFO; message = committed parquet file change; target_level = Final; par
|
|||
inner.history(),
|
||||
vec![
|
||||
CommitHistoryEntry {
|
||||
partition_id: PartitionId::new(1),
|
||||
partition_id: partition_id_1,
|
||||
delete: vec![existing_1.clone()],
|
||||
upgrade: vec![],
|
||||
created: vec![created_1, created_2],
|
||||
target_level: CompactionLevel::Final,
|
||||
},
|
||||
CommitHistoryEntry {
|
||||
partition_id: PartitionId::new(2),
|
||||
partition_id: partition_id_2,
|
||||
delete: vec![existing_2, existing_3],
|
||||
upgrade: vec![existing_1],
|
||||
created: vec![],
|
||||
|
|
|
@ -303,15 +303,12 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use metric::{assert_histogram, Attributes};
|
||||
|
||||
use crate::commit::mock::{CommitHistoryEntry, MockCommit};
|
||||
use iox_tests::ParquetFileBuilder;
|
||||
|
||||
use super::*;
|
||||
use crate::commit::mock::{CommitHistoryEntry, MockCommit};
|
||||
use assert_matches::assert_matches;
|
||||
use iox_tests::{partition_identifier, ParquetFileBuilder};
|
||||
use metric::{assert_histogram, Attributes};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test]
|
||||
fn test_display() {
|
||||
|
@ -326,6 +323,9 @@ mod tests {
|
|||
let inner = Arc::new(MockCommit::new());
|
||||
let commit = MetricsCommitWrapper::new(Arc::clone(&inner), ®istry);
|
||||
|
||||
let partition_id_1 = PartitionId::new(1);
|
||||
let transition_partition_id_1 = partition_identifier(1);
|
||||
|
||||
let existing_1 = ParquetFileBuilder::new(1)
|
||||
.with_file_size_bytes(10_001)
|
||||
.with_row_count(1_001)
|
||||
|
@ -350,7 +350,7 @@ mod tests {
|
|||
let created = ParquetFileBuilder::new(1000)
|
||||
.with_file_size_bytes(10_016)
|
||||
.with_row_count(1_016)
|
||||
.with_partition(1)
|
||||
.with_partition(transition_partition_id_1)
|
||||
.with_compaction_level(CompactionLevel::Initial)
|
||||
.build();
|
||||
|
||||
|
@ -392,7 +392,7 @@ mod tests {
|
|||
|
||||
let ids = commit
|
||||
.commit(
|
||||
PartitionId::new(1),
|
||||
partition_id_1,
|
||||
&[existing_1.clone()],
|
||||
&[existing_2a.clone()],
|
||||
&[created.clone().into()],
|
||||
|
@ -401,9 +401,11 @@ mod tests {
|
|||
.await;
|
||||
assert_matches!(ids, Ok(res) if res == vec![ParquetFileId::new(1000)]);
|
||||
|
||||
let partition_id_2 = PartitionId::new(2);
|
||||
|
||||
let ids = commit
|
||||
.commit(
|
||||
PartitionId::new(2),
|
||||
partition_id_2,
|
||||
&[existing_2b.clone(), existing_3.clone()],
|
||||
&[existing_4.clone()],
|
||||
&[],
|
||||
|
@ -449,14 +451,14 @@ mod tests {
|
|||
inner.history(),
|
||||
vec![
|
||||
CommitHistoryEntry {
|
||||
partition_id: PartitionId::new(1),
|
||||
partition_id: partition_id_1,
|
||||
delete: vec![existing_1],
|
||||
upgrade: vec![existing_2a.clone()],
|
||||
created: vec![created],
|
||||
target_level: CompactionLevel::FileNonOverlapped,
|
||||
},
|
||||
CommitHistoryEntry {
|
||||
partition_id: PartitionId::new(2),
|
||||
partition_id: partition_id_2,
|
||||
delete: vec![existing_2b, existing_3],
|
||||
upgrade: vec![existing_4],
|
||||
created: vec![],
|
||||
|
|
|
@ -78,10 +78,9 @@ impl Commit for MockCommit {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use assert_matches::assert_matches;
|
||||
use iox_tests::ParquetFileBuilder;
|
||||
|
||||
use super::*;
|
||||
use assert_matches::assert_matches;
|
||||
use iox_tests::{partition_identifier, ParquetFileBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_display() {
|
||||
|
@ -92,6 +91,11 @@ mod tests {
|
|||
async fn test_commit() {
|
||||
let commit = MockCommit::new();
|
||||
|
||||
let partition_id_1 = PartitionId::new(1);
|
||||
let transition_partition_id_1 = partition_identifier(1);
|
||||
let partition_id_2 = PartitionId::new(2);
|
||||
let transition_partition_id_2 = partition_identifier(2);
|
||||
|
||||
let existing_1 = ParquetFileBuilder::new(1).build();
|
||||
let existing_2 = ParquetFileBuilder::new(2).build();
|
||||
let existing_3 = ParquetFileBuilder::new(3).build();
|
||||
|
@ -101,14 +105,22 @@ mod tests {
|
|||
let existing_7 = ParquetFileBuilder::new(7).build();
|
||||
let existing_8 = ParquetFileBuilder::new(8).build();
|
||||
|
||||
let created_1_1 = ParquetFileBuilder::new(1000).with_partition(1).build();
|
||||
let created_1_2 = ParquetFileBuilder::new(1001).with_partition(1).build();
|
||||
let created_1_3 = ParquetFileBuilder::new(1003).with_partition(1).build();
|
||||
let created_2_1 = ParquetFileBuilder::new(1002).with_partition(2).build();
|
||||
let created_1_1 = ParquetFileBuilder::new(1000)
|
||||
.with_partition(transition_partition_id_1.clone())
|
||||
.build();
|
||||
let created_1_2 = ParquetFileBuilder::new(1001)
|
||||
.with_partition(transition_partition_id_1.clone())
|
||||
.build();
|
||||
let created_1_3 = ParquetFileBuilder::new(1003)
|
||||
.with_partition(transition_partition_id_1)
|
||||
.build();
|
||||
let created_2_1 = ParquetFileBuilder::new(1002)
|
||||
.with_partition(transition_partition_id_2)
|
||||
.build();
|
||||
|
||||
let ids = commit
|
||||
.commit(
|
||||
PartitionId::new(1),
|
||||
partition_id_1,
|
||||
&[existing_1.clone(), existing_2.clone()],
|
||||
&[existing_3.clone(), existing_4.clone()],
|
||||
&[created_1_1.clone().into(), created_1_2.clone().into()],
|
||||
|
@ -122,7 +134,7 @@ mod tests {
|
|||
|
||||
let ids = commit
|
||||
.commit(
|
||||
PartitionId::new(2),
|
||||
partition_id_2,
|
||||
&[existing_3.clone()],
|
||||
&[],
|
||||
&[created_2_1.clone().into()],
|
||||
|
@ -136,7 +148,7 @@ mod tests {
|
|||
|
||||
let ids = commit
|
||||
.commit(
|
||||
PartitionId::new(1),
|
||||
partition_id_1,
|
||||
&[existing_5.clone(), existing_6.clone(), existing_7.clone()],
|
||||
&[],
|
||||
&[created_1_3.clone().into()],
|
||||
|
@ -151,7 +163,7 @@ mod tests {
|
|||
// simulate fill implosion of the file (this may happen w/ delete predicates)
|
||||
let ids = commit
|
||||
.commit(
|
||||
PartitionId::new(1),
|
||||
partition_id_1,
|
||||
&[existing_8.clone()],
|
||||
&[],
|
||||
&[],
|
||||
|
@ -167,28 +179,28 @@ mod tests {
|
|||
commit.history(),
|
||||
vec![
|
||||
CommitHistoryEntry {
|
||||
partition_id: PartitionId::new(1),
|
||||
partition_id: partition_id_1,
|
||||
delete: vec![existing_1, existing_2],
|
||||
upgrade: vec![existing_3.clone(), existing_4.clone()],
|
||||
created: vec![created_1_1, created_1_2],
|
||||
target_level: CompactionLevel::FileNonOverlapped,
|
||||
},
|
||||
CommitHistoryEntry {
|
||||
partition_id: PartitionId::new(2),
|
||||
partition_id: partition_id_2,
|
||||
delete: vec![existing_3],
|
||||
upgrade: vec![],
|
||||
created: vec![created_2_1],
|
||||
target_level: CompactionLevel::Final,
|
||||
},
|
||||
CommitHistoryEntry {
|
||||
partition_id: PartitionId::new(1),
|
||||
partition_id: partition_id_1,
|
||||
delete: vec![existing_5, existing_6, existing_7,],
|
||||
upgrade: vec![],
|
||||
created: vec![created_1_3],
|
||||
target_level: CompactionLevel::FileNonOverlapped,
|
||||
},
|
||||
CommitHistoryEntry {
|
||||
partition_id: PartitionId::new(1),
|
||||
partition_id: partition_id_1,
|
||||
delete: vec![existing_8],
|
||||
upgrade: vec![],
|
||||
created: vec![],
|
||||
|
|
|
@ -4,7 +4,7 @@ use assert_matches::assert_matches;
|
|||
use compactor_scheduler::{
|
||||
create_scheduler, CompactionJob, LocalSchedulerConfig, Scheduler, SchedulerConfig,
|
||||
};
|
||||
use data_types::{ColumnType, ParquetFile, ParquetFileParams, PartitionId};
|
||||
use data_types::{ColumnType, ParquetFile, ParquetFileParams, PartitionId, TransitionPartitionId};
|
||||
use iox_tests::{ParquetFileBuilder, TestCatalog, TestParquetFileBuilder, TestPartition};
|
||||
|
||||
mod end_job;
|
||||
|
@ -65,7 +65,7 @@ impl TestLocalScheduler {
|
|||
|
||||
pub async fn create_params_for_new_parquet_file(&self) -> ParquetFileParams {
|
||||
ParquetFileBuilder::new(42)
|
||||
.with_partition(self.get_partition_id().get())
|
||||
.with_partition(self.get_transition_partition_id())
|
||||
.build()
|
||||
.into()
|
||||
}
|
||||
|
@ -81,4 +81,8 @@ impl TestLocalScheduler {
|
|||
pub fn get_partition_id(&self) -> PartitionId {
|
||||
self.test_partition.partition.id
|
||||
}
|
||||
|
||||
pub fn get_transition_partition_id(&self) -> TransitionPartitionId {
|
||||
self.test_partition.partition.transition_partition_id()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -202,8 +202,7 @@ impl SimulatedFile {
|
|||
ParquetFileParams {
|
||||
namespace_id: partition_info.namespace_id,
|
||||
table_id: partition_info.table.id,
|
||||
partition_id: partition_info.partition_id,
|
||||
partition_hash_id: partition_info.partition_hash_id.clone(),
|
||||
partition_id: partition_info.transition_partition_id(),
|
||||
object_store_id: Uuid::new_v4(),
|
||||
min_time,
|
||||
max_time,
|
||||
|
|
|
@@ -527,10 +527,9 @@ pub struct ParquetFile {
     pub namespace_id: NamespaceId,
     /// the table
     pub table_id: TableId,
-    /// the partition
-    pub partition_id: PartitionId,
-    /// the partition hash ID, if generated
-    pub partition_hash_id: Option<PartitionHashId>,
+    /// the partition identifier
+    #[sqlx(flatten)]
+    pub partition_id: TransitionPartitionId,
     /// the uuid used in the object store path for this file
     pub object_store_id: Uuid,
     /// the min timestamp of data in this file
@@ -588,7 +587,6 @@ impl ParquetFile {
             namespace_id: params.namespace_id,
             table_id: params.table_id,
             partition_id: params.partition_id,
-            partition_hash_id: params.partition_hash_id,
             object_store_id: params.object_store_id,
             min_time: params.min_time,
             max_time: params.max_time,
@@ -602,21 +600,9 @@ impl ParquetFile {
         }
     }
 
-    /// If this parquet file params will be storing a `PartitionHashId` in the catalog, use that.
-    /// Otherwise, use the database-assigned `PartitionId`.
-    pub fn transition_partition_id(&self) -> TransitionPartitionId {
-        TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref()))
-    }
-
     /// Estimate the memory consumption of this object and its contents
     pub fn size(&self) -> usize {
-        std::mem::size_of_val(self)
-            + self
-                .partition_hash_id
-                .as_ref()
-                .map(|id| id.size() - std::mem::size_of_val(id))
-                .unwrap_or_default()
-            + self.column_set.size()
+        std::mem::size_of_val(self) + self.partition_id.size() + self.column_set.size()
             - std::mem::size_of_val(&self.column_set)
     }
 
@@ -638,10 +624,8 @@ pub struct ParquetFileParams {
     pub namespace_id: NamespaceId,
     /// the table
     pub table_id: TableId,
-    /// the partition
-    pub partition_id: PartitionId,
-    /// the partition hash ID, if generated
-    pub partition_hash_id: Option<PartitionHashId>,
+    /// the partition identifier
+    pub partition_id: TransitionPartitionId,
     /// the uuid used in the object store path for this file
     pub object_store_id: Uuid,
     /// the min timestamp of data in this file
@@ -662,21 +646,12 @@ pub struct ParquetFileParams {
     pub max_l0_created_at: Timestamp,
 }
 
-impl ParquetFileParams {
-    /// If this parquet file params will be storing a `PartitionHashId` in the catalog, use that.
-    /// Otherwise, use the database-assigned `PartitionId`.
-    pub fn transition_partition_id(&self) -> TransitionPartitionId {
-        TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref()))
-    }
-}
-
 impl From<ParquetFile> for ParquetFileParams {
     fn from(value: ParquetFile) -> Self {
         Self {
             namespace_id: value.namespace_id,
             table_id: value.table_id,
             partition_id: value.partition_id,
-            partition_hash_id: value.partition_hash_id,
             object_store_id: value.object_store_id,
             min_time: value.min_time,
             max_time: value.max_time,

@@ -31,6 +31,34 @@ impl TransitionPartitionId {
     }
 }
 
+impl<'a, R> sqlx::FromRow<'a, R> for TransitionPartitionId
+where
+    R: sqlx::Row,
+    &'static str: sqlx::ColumnIndex<R>,
+    PartitionId: sqlx::decode::Decode<'a, R::Database>,
+    PartitionId: sqlx::types::Type<R::Database>,
+    Option<PartitionHashId>: sqlx::decode::Decode<'a, R::Database>,
+    Option<PartitionHashId>: sqlx::types::Type<R::Database>,
+{
+    fn from_row(row: &'a R) -> sqlx::Result<Self> {
+        let partition_id: Option<PartitionId> = row.try_get("partition_id")?;
+        let partition_hash_id: Option<PartitionHashId> = row.try_get("partition_hash_id")?;
+
+        let transition_partition_id = match (partition_id, partition_hash_id) {
+            (_, Some(hash_id)) => TransitionPartitionId::Deterministic(hash_id),
+            (Some(id), _) => TransitionPartitionId::Deprecated(id),
+            (None, None) => {
+                return Err(sqlx::Error::ColumnDecode {
+                    index: "partition_id".into(),
+                    source: "Both partition_id and partition_hash_id were NULL".into(),
+                })
+            }
+        };
+
+        Ok(transition_partition_id)
+    }
+}
+
 impl From<(PartitionId, Option<&PartitionHashId>)> for TransitionPartitionId {
     fn from((partition_id, partition_hash_id): (PartitionId, Option<&PartitionHashId>)) -> Self {
         partition_hash_id
|
|
|
@ -267,8 +267,7 @@ mod tests {
|
|||
let parquet_file_params = ParquetFileParams {
|
||||
namespace_id: namespace.id,
|
||||
table_id: partition.table_id,
|
||||
partition_id: partition.id,
|
||||
partition_hash_id: partition.hash_id().cloned(),
|
||||
partition_id: partition.transition_partition_id(),
|
||||
object_store_id: Uuid::new_v4(),
|
||||
min_time: Timestamp::new(1),
|
||||
max_time: Timestamp::new(10),
|
||||
|
@ -298,7 +297,7 @@ mod tests {
|
|||
let location = ParquetFilePath::new(
|
||||
file_in_catalog.namespace_id,
|
||||
file_in_catalog.table_id,
|
||||
&file_in_catalog.transition_partition_id(),
|
||||
&file_in_catalog.partition_id.clone(),
|
||||
file_in_catalog.object_store_id,
|
||||
)
|
||||
.object_store_path();
|
||||
|
@ -376,7 +375,7 @@ mod tests {
|
|||
let location = ParquetFilePath::new(
|
||||
file_in_catalog.namespace_id,
|
||||
file_in_catalog.table_id,
|
||||
&file_in_catalog.transition_partition_id(),
|
||||
&file_in_catalog.partition_id.clone(),
|
||||
file_in_catalog.object_store_id,
|
||||
)
|
||||
.object_store_path();
|
||||
|
@ -469,7 +468,7 @@ mod tests {
|
|||
let loc = ParquetFilePath::new(
|
||||
file_in_catalog.namespace_id,
|
||||
file_in_catalog.table_id,
|
||||
&file_in_catalog.transition_partition_id(),
|
||||
&file_in_catalog.partition_id.clone(),
|
||||
file_in_catalog.object_store_id,
|
||||
)
|
||||
.object_store_path();
|
||||
|
|
|
@ -52,6 +52,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
|
|||
let proto_files = vec![
|
||||
authz_path.join("authz.proto"),
|
||||
catalog_path.join("parquet_file.proto"),
|
||||
catalog_path.join("partition_identifier.proto"),
|
||||
catalog_path.join("service.proto"),
|
||||
compactor_path.join("service.proto"),
|
||||
delete_path.join("service.proto"),
|
||||
|
|
|
@@ -2,6 +2,8 @@ syntax = "proto3";
 package influxdata.iox.catalog.v1;
 option go_package = "github.com/influxdata/iox/catalog/v1";
 
+import "influxdata/iox/catalog/v1/partition_identifier.proto";
+
 message ParquetFile {
   reserved 7;
   reserved "min_sequence_number";
@@ -11,6 +13,8 @@ message ParquetFile {
   reserved "shard_id";
   reserved 8;
   reserved "max_sequence_number";
+  reserved 5;
+  reserved "partition_id";
 
   // the id of the file in the catalog
   int64 id = 1;
@@ -18,8 +22,9 @@ message ParquetFile {
   int64 namespace_id = 3;
   // the table id
   int64 table_id = 4;
-  // the partition id
-  int64 partition_id = 5;
+
+  PartitionIdentifier partition_identifier = 19;
+
   // the object store uuid
   string object_store_id = 6;
   // the min timestamp of data in this file

@@ -0,0 +1,12 @@
+syntax = "proto3";
+package influxdata.iox.catalog.v1;
+option go_package = "github.com/influxdata/iox/catalog/v1";
+
+message PartitionIdentifier {
+  // Either the catalog-assigned partition ID or the deterministic identifier created from the
+  // table ID and partition key.
+  oneof id {
+    int64 catalog_id = 1;
+    bytes hash_id = 2;
+  }
+}

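For illustration, this is roughly how a caller builds the new `PartitionIdentifier` message with the prost-generated Rust types (the gRPC client changes later in this diff use the same `partition_identifier::Id` variants). The types below are simplified stand-ins for the generated bindings, not the real ones:

    // Simplified stand-ins for the prost-generated types from partition_identifier.proto.
    pub mod partition_identifier {
        // prost models a proto3 `oneof` as a Rust enum...
        #[derive(Debug, Clone, PartialEq)]
        pub enum Id {
            CatalogId(i64),
            HashId(Vec<u8>),
        }
    }

    #[derive(Debug, Clone, PartialEq)]
    pub struct PartitionIdentifier {
        // ...wrapped in an Option on the containing message.
        pub id: Option<partition_identifier::Id>,
    }

    fn main() {
        // Old-style partitions are addressed by the catalog-assigned row ID...
        let by_catalog_id = PartitionIdentifier {
            id: Some(partition_identifier::Id::CatalogId(3)),
        };
        // ...new-style partitions by the deterministic hash of table ID + partition key.
        let by_hash_id = PartitionIdentifier {
            id: Some(partition_identifier::Id::HashId(b"abcdefg".to_vec())),
        };
        println!("{by_catalog_id:?} {by_hash_id:?}");
    }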
@@ -3,6 +3,7 @@ package influxdata.iox.catalog.v1;
 option go_package = "github.com/influxdata/iox/catalog/v1";
 
 import "influxdata/iox/catalog/v1/parquet_file.proto";
+import "influxdata/iox/catalog/v1/partition_identifier.proto";
 
 service CatalogService {
   // Get the parquet_file catalog records in the given partition
@@ -19,8 +20,11 @@ service CatalogService {
 }
 
 message GetParquetFilesByPartitionIdRequest {
-  // the partition id
-  int64 partition_id = 1;
+  // Was the catalog-assigned partition ID.
+  reserved 1;
+  reserved "partition_id";
+
+  PartitionIdentifier partition_identifier = 2;
 }
 
 message GetParquetFilesByPartitionIdResponse {
@@ -35,15 +39,17 @@ message Partition {
   reserved "sequencer_id";
   reserved 7;
   reserved "shard_id";
+  reserved 1;
+  reserved "id";
 
-  // the partition id
-  int64 id = 1;
   // the table id the partition is in
   int64 table_id = 3;
   // the partition key
   string key = 4;
   // the sort key for data in parquet files in the partition
   repeated string array_sort_key = 6;
+
+  PartitionIdentifier identifier = 8;
 }
 
 message GetPartitionsByTableIdRequest {
|
|
@ -1,10 +1,13 @@
|
|||
use data_types::{PartitionHashId, PartitionId, TransitionPartitionId};
|
||||
use futures_util::TryStreamExt;
|
||||
use influxdb_iox_client::{
|
||||
catalog::{self, generated_types::ParquetFile},
|
||||
catalog::{
|
||||
self,
|
||||
generated_types::{partition_identifier, ParquetFile, PartitionIdentifier},
|
||||
},
|
||||
connection::Connection,
|
||||
store,
|
||||
};
|
||||
use observability_deps::tracing::{debug, info};
|
||||
use std::path::{Path, PathBuf};
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
|
@ -35,10 +38,6 @@ type Result<T, E = ExportError> = std::result::Result<T, E>;
|
|||
pub struct RemoteExporter {
|
||||
catalog_client: catalog::Client,
|
||||
store_client: store::Client,
|
||||
|
||||
/// Optional partition filter. If `Some(partition_id)`, only these
|
||||
/// files with that `partition_id` are downloaded.
|
||||
partition_filter: Option<i64>,
|
||||
}
|
||||
|
||||
impl RemoteExporter {
|
||||
|
@ -46,19 +45,9 @@ impl RemoteExporter {
|
|||
Self {
|
||||
catalog_client: catalog::Client::new(connection.clone()),
|
||||
store_client: store::Client::new(connection),
|
||||
partition_filter: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Specify that only files and metadata for the specific
|
||||
/// partition id should be exported.
|
||||
pub fn with_partition_filter(mut self, partition_id: i64) -> Self {
|
||||
info!(partition_id, "Filtering by partition");
|
||||
|
||||
self.partition_filter = Some(partition_id);
|
||||
self
|
||||
}
|
||||
|
||||
/// Exports all data and metadata for `table_name` in
|
||||
/// `namespace` to local files.
|
||||
///
|
||||
|
@ -95,39 +84,14 @@ impl RemoteExporter {
|
|||
let indexed_parquet_file_metadata = parquet_files.into_iter().enumerate();
|
||||
|
||||
for (index, parquet_file) in indexed_parquet_file_metadata {
|
||||
if self.should_export(parquet_file.partition_id) {
|
||||
self.export_parquet_file(
|
||||
&output_directory,
|
||||
index,
|
||||
num_parquet_files,
|
||||
&parquet_file,
|
||||
)
|
||||
self.export_parquet_file(&output_directory, index, num_parquet_files, &parquet_file)
|
||||
.await?;
|
||||
} else {
|
||||
debug!(
|
||||
"skipping file {} of {num_parquet_files} ({} does not match request)",
|
||||
index + 1,
|
||||
parquet_file.partition_id
|
||||
);
|
||||
}
|
||||
}
|
||||
println!("Done.");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return true if this partition should be exported
|
||||
fn should_export(&self, partition_id: i64) -> bool {
|
||||
self.partition_filter
|
||||
.map(|partition_filter| {
|
||||
// if a partition filter was specified, only export
|
||||
// the file if the partition matches
|
||||
partition_filter == partition_id
|
||||
})
|
||||
// export files if there is no partition
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
/// Exports table and partition information for the specified
|
||||
/// table. Overwrites existing files, if any, to ensure it has the
|
||||
/// latest catalog information.
|
||||
|
@ -158,13 +122,11 @@ impl RemoteExporter {
|
|||
.await?;
|
||||
|
||||
for partition in partitions {
|
||||
let partition_id = partition.id;
|
||||
if self.should_export(partition_id) {
|
||||
let partition_json = serde_json::to_string_pretty(&partition)?;
|
||||
let filename = format!("partition.{partition_id}.json");
|
||||
let file_path = output_directory.join(&filename);
|
||||
write_string_to_file(&partition_json, &file_path).await?;
|
||||
}
|
||||
let partition_id = to_partition_id(partition.identifier.as_ref());
|
||||
let partition_json = serde_json::to_string_pretty(&partition)?;
|
||||
let filename = format!("partition.{partition_id}.json");
|
||||
let file_path = output_directory.join(&filename);
|
||||
write_string_to_file(&partition_json, &file_path).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -183,9 +145,10 @@ impl RemoteExporter {
|
|||
parquet_file: &ParquetFile,
|
||||
) -> Result<()> {
|
||||
let uuid = &parquet_file.object_store_id;
|
||||
let partition_id = parquet_file.partition_id;
|
||||
let file_size_bytes = parquet_file.file_size_bytes as u64;
|
||||
|
||||
let partition_id = to_partition_id(parquet_file.partition_identifier.as_ref());
|
||||
|
||||
// copy out the metadata as pbjson encoded data always (to
|
||||
// ensure we have the most up to date version)
|
||||
{
|
||||
|
@ -230,6 +193,21 @@ impl RemoteExporter {
|
|||
}
|
||||
}
|
||||
|
||||
fn to_partition_id(partition_identifier: Option<&PartitionIdentifier>) -> TransitionPartitionId {
|
||||
match partition_identifier
|
||||
.and_then(|pi| pi.id.as_ref())
|
||||
.expect("Catalog service should send the partition identifier")
|
||||
{
|
||||
partition_identifier::Id::HashId(bytes) => TransitionPartitionId::Deterministic(
|
||||
PartitionHashId::try_from(&bytes[..])
|
||||
.expect("Catalog service should send valid hash_id bytes"),
|
||||
),
|
||||
partition_identifier::Id::CatalogId(id) => {
|
||||
TransitionPartitionId::Deprecated(PartitionId::new(*id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// writes the contents of a string to a file, overwriting the previous contents, if any
|
||||
async fn write_string_to_file(contents: &str, path: &Path) -> Result<()> {
|
||||
let mut file = OpenOptions::new()
|
||||
|
|
|
@ -7,7 +7,7 @@ use data_types::{
|
|||
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, PARTITION_BY_DAY_PROTO,
|
||||
},
|
||||
ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceName, NamespaceNameError,
|
||||
ParquetFileParams, Partition, PartitionHashId, Statistics, Table, TableId, Timestamp,
|
||||
ParquetFileParams, Partition, Statistics, Table, TableId, Timestamp,
|
||||
};
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
// ParquetFile as ProtoParquetFile, Partition as ProtoPartition,
|
||||
|
@ -567,9 +567,6 @@ impl RemoteImporter {
|
|||
// need to make columns in the target catalog
|
||||
let column_set = insert_columns(table.id, decoded_iox_parquet_metadata, repos).await?;
|
||||
|
||||
// Create the the partition_hash_id
|
||||
let partition_hash_id = Some(PartitionHashId::new(table.id, &partition.partition_key));
|
||||
|
||||
let params = if let Some(proto_parquet_file) = &parquet_metadata {
|
||||
let compaction_level = proto_parquet_file
|
||||
.compaction_level
|
||||
|
@ -579,8 +576,7 @@ impl RemoteImporter {
|
|||
ParquetFileParams {
|
||||
namespace_id: namespace.id,
|
||||
table_id: table.id,
|
||||
partition_hash_id,
|
||||
partition_id: partition.id,
|
||||
partition_id: partition.transition_partition_id(),
|
||||
object_store_id,
|
||||
min_time: Timestamp::new(proto_parquet_file.min_time),
|
||||
max_time: Timestamp::new(proto_parquet_file.max_time),
|
||||
|
@ -599,8 +595,7 @@ impl RemoteImporter {
|
|||
ParquetFileParams {
|
||||
namespace_id: namespace.id,
|
||||
table_id: table.id,
|
||||
partition_hash_id,
|
||||
partition_id: partition.id,
|
||||
partition_id: partition.transition_partition_id(),
|
||||
object_store_id,
|
||||
min_time,
|
||||
max_time,
|
||||
|
|
|
@ -55,10 +55,6 @@ struct GetTable {
|
|||
#[clap(action)]
|
||||
table: String,
|
||||
|
||||
/// If specified, only files from the specified partitions are downloaded
|
||||
#[clap(action, short, long)]
|
||||
partition_id: Option<i64>,
|
||||
|
||||
/// The output directory to use. If not specified, files will be placed in a directory named
|
||||
/// after the table in the current working directory.
|
||||
#[clap(action, short)]
|
||||
|
@ -91,13 +87,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
Command::GetTable(GetTable {
|
||||
namespace,
|
||||
table,
|
||||
partition_id,
|
||||
output_directory,
|
||||
}) => {
|
||||
let mut exporter = RemoteExporter::new(connection);
|
||||
if let Some(partition_id) = partition_id {
|
||||
exporter = exporter.with_partition_filter(partition_id);
|
||||
}
|
||||
Ok(exporter
|
||||
.export_table(output_directory, namespace, table)
|
||||
.await?)
|
||||
|
|
|
@ -157,10 +157,12 @@ async fn sharded_compactor_0_always_compacts_partition_1() {
|
|||
.assert()
|
||||
.success()
|
||||
.stdout(
|
||||
// Important parts are the expected partition ID
|
||||
predicate::str::contains(r#""partitionId": "1","#)
|
||||
// and compaction level
|
||||
.and(predicate::str::contains(r#""compactionLevel": 1"#)),
|
||||
// Important parts are the expected partition identifier
|
||||
predicate::str::contains(
|
||||
r#""hashId": "uGKn6bMp7mpBjN4ZEZjq6xUSdT8ZuHqB3vKubD0O0jc=""#,
|
||||
)
|
||||
// and compaction level
|
||||
.and(predicate::str::contains(r#""compactionLevel": 1"#)),
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
|
@ -240,10 +242,12 @@ async fn sharded_compactor_1_never_compacts_partition_1() {
|
|||
.assert()
|
||||
.success()
|
||||
.stdout(
|
||||
// Important parts are the expected partition ID
|
||||
predicate::str::contains(r#""partitionId": "1","#)
|
||||
// and compaction level is 0 so it's not returned
|
||||
.and(predicate::str::contains("compactionLevel").not()),
|
||||
// Important parts are the expected partition identifier
|
||||
predicate::str::contains(
|
||||
r#""hashId": "uGKn6bMp7mpBjN4ZEZjq6xUSdT8ZuHqB3vKubD0O0jc=""#,
|
||||
)
|
||||
// and compaction level is 0 so it's not returned
|
||||
.and(predicate::str::contains("compactionLevel").not()),
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
|
|
|
@ -280,10 +280,9 @@ async fn remote_partition_and_get_from_store_and_pull() {
|
|||
.arg("1")
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(
|
||||
predicate::str::contains(r#""id": "1""#)
|
||||
.and(predicate::str::contains(r#""partitionId": "1","#)),
|
||||
)
|
||||
.stdout(predicate::str::contains(
|
||||
r#""hashId": "uGKn6bMp7mpBjN4ZEZjq6xUSdT8ZuHqB3vKubD0O0jc=""#,
|
||||
))
|
||||
.get_output()
|
||||
.stdout
|
||||
.clone();
|
||||
|
|
|
@ -29,9 +29,15 @@ impl Client {
|
|||
&mut self,
|
||||
partition_id: i64,
|
||||
) -> Result<Vec<ParquetFile>, Error> {
|
||||
let partition_identifier = PartitionIdentifier {
|
||||
id: Some(partition_identifier::Id::CatalogId(partition_id)),
|
||||
};
|
||||
|
||||
let response = self
|
||||
.inner
|
||||
.get_parquet_files_by_partition_id(GetParquetFilesByPartitionIdRequest { partition_id })
|
||||
.get_parquet_files_by_partition_id(GetParquetFilesByPartitionIdRequest {
|
||||
partition_identifier: Some(partition_identifier),
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(response.into_inner().parquet_files)
|
||||
|
|
|
@ -2,7 +2,8 @@ use std::{fmt::Debug, sync::Arc, time::Duration};
|
|||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{
|
||||
sequence_number_set::SequenceNumberSet, NamespaceId, ParquetFileParams, PartitionId, TableId,
|
||||
sequence_number_set::SequenceNumberSet, NamespaceId, ParquetFileParams, TableId,
|
||||
TransitionPartitionId,
|
||||
};
|
||||
|
||||
use crate::wal::reference_tracker::WalReferenceHandle;
|
||||
|
@ -54,9 +55,9 @@ impl CompletedPersist {
|
|||
self.meta.table_id
|
||||
}
|
||||
|
||||
/// Returns the [`PartitionId`] of the persisted data.
|
||||
pub(crate) fn partition_id(&self) -> PartitionId {
|
||||
self.meta.partition_id
|
||||
/// Returns the [`TransitionPartitionId`] of the persisted data.
|
||||
pub(crate) fn partition_id(&self) -> &TransitionPartitionId {
|
||||
&self.meta.partition_id
|
||||
}
|
||||
|
||||
/// Returns the [`SequenceNumberSet`] of the persisted data.
|
||||
|
@ -166,15 +167,16 @@ pub(crate) mod mock {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID};
|
||||
use crate::test_util::{
|
||||
ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, ARBITRARY_TRANSITION_PARTITION_ID,
|
||||
};
|
||||
use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp};
|
||||
|
||||
fn arbitrary_file_meta() -> ParquetFileParams {
|
||||
ParquetFileParams {
|
||||
namespace_id: ARBITRARY_NAMESPACE_ID,
|
||||
table_id: ARBITRARY_TABLE_ID,
|
||||
partition_id: ARBITRARY_PARTITION_ID,
|
||||
partition_hash_id: None,
|
||||
partition_id: ARBITRARY_TRANSITION_PARTITION_ID.clone(),
|
||||
object_store_id: Default::default(),
|
||||
min_time: Timestamp::new(42),
|
||||
max_time: Timestamp::new(42),
|
||||
|
@ -226,7 +228,7 @@ mod tests {
|
|||
|
||||
assert_eq!(note.namespace_id(), meta.namespace_id);
|
||||
assert_eq!(note.table_id(), meta.table_id);
|
||||
assert_eq!(note.partition_id(), meta.partition_id);
|
||||
assert_eq!(note.partition_id(), &meta.partition_id);
|
||||
|
||||
assert_eq!(note.column_count(), meta.column_set.len());
|
||||
assert_eq!(note.row_count(), meta.row_count as usize);
|
||||
|
|
|
@ -151,7 +151,9 @@ mod tests {
|
|||
use super::*;
|
||||
use crate::{
|
||||
persist::completion_observer::mock::MockCompletionObserver,
|
||||
test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID},
|
||||
test_util::{
|
||||
ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, ARBITRARY_TRANSITION_PARTITION_ID,
|
||||
},
|
||||
};
|
||||
use data_types::{
|
||||
sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, ParquetFileParams, Timestamp,
|
||||
|
@ -169,8 +171,7 @@ mod tests {
|
|||
let meta = ParquetFileParams {
|
||||
namespace_id: ARBITRARY_NAMESPACE_ID,
|
||||
table_id: ARBITRARY_TABLE_ID,
|
||||
partition_id: ARBITRARY_PARTITION_ID,
|
||||
partition_hash_id: None,
|
||||
partition_id: ARBITRARY_TRANSITION_PARTITION_ID.clone(),
|
||||
object_store_id: Default::default(),
|
||||
min_time: Timestamp::new(Duration::from_secs(1_000).as_nanos() as _),
|
||||
max_time: Timestamp::new(Duration::from_secs(1_042).as_nanos() as _), // 42 seconds later
|
||||
|
|
|
@ -16,7 +16,7 @@ mod tests {
|
|||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use data_types::{CompactionLevel, ParquetFile, TransitionPartitionId};
|
||||
use data_types::{CompactionLevel, ParquetFile};
|
||||
use futures::TryStreamExt;
|
||||
use iox_catalog::{
|
||||
interface::{get_schema_by_id, Catalog, SoftDeletedRows},
|
||||
|
@ -190,7 +190,7 @@ mod tests {
|
|||
// Generate a partition with data
|
||||
let partition = partition_with_write(Arc::clone(&catalog)).await;
|
||||
let table_id = partition.lock().table_id();
|
||||
let partition_id = partition.lock().partition_id();
|
||||
let partition_id = partition.lock().transition_partition_id();
|
||||
let namespace_id = partition.lock().namespace_id();
|
||||
assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(None));
|
||||
|
||||
|
@ -221,7 +221,7 @@ mod tests {
|
|||
assert_matches!(&completion_observer.calls().as_slice(), &[n] => {
|
||||
assert_eq!(n.namespace_id(), namespace_id);
|
||||
assert_eq!(n.table_id(), table_id);
|
||||
assert_eq!(n.partition_id(), partition_id);
|
||||
assert_eq!(n.partition_id(), &partition_id);
|
||||
assert_eq!(n.sequence_numbers().len(), 1);
|
||||
});
|
||||
|
||||
|
@ -243,12 +243,12 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.parquet_files()
|
||||
.list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id))
|
||||
.list_by_partition_not_to_delete(&partition_id)
|
||||
.await
|
||||
.expect("query for parquet files failed");
|
||||
|
||||
// Validate a single file was inserted with the expected properties.
|
||||
let (object_store_id, file_size_bytes) = assert_matches!(&*files, &[ParquetFile {
|
||||
let (object_store_id, file_size_bytes) = assert_matches!(&*files, [ParquetFile {
|
||||
namespace_id: got_namespace_id,
|
||||
table_id: got_table_id,
|
||||
partition_id: got_partition_id,
|
||||
|
@ -263,12 +263,12 @@ mod tests {
|
|||
{
|
||||
assert_eq!(created_at.get(), max_l0_created_at.get());
|
||||
|
||||
assert_eq!(got_namespace_id, namespace_id);
|
||||
assert_eq!(got_table_id, table_id);
|
||||
assert_eq!(got_partition_id, partition_id);
|
||||
assert_eq!(got_namespace_id, &namespace_id);
|
||||
assert_eq!(got_table_id, &table_id);
|
||||
assert_eq!(got_partition_id, &partition_id);
|
||||
|
||||
assert_eq!(row_count, 1);
|
||||
assert_eq!(compaction_level, CompactionLevel::Initial);
|
||||
assert_eq!(*row_count, 1);
|
||||
assert_eq!(compaction_level, &CompactionLevel::Initial);
|
||||
|
||||
(object_store_id, file_size_bytes)
|
||||
}
|
||||
|
@ -292,7 +292,7 @@ mod tests {
|
|||
}] => {
|
||||
let want_path = format!("{object_store_id}.parquet");
|
||||
assert!(location.as_ref().ends_with(&want_path));
|
||||
assert_eq!(size, file_size_bytes as usize);
|
||||
assert_eq!(size, *file_size_bytes as usize);
|
||||
}
|
||||
)
|
||||
}
|
||||
|
@ -326,8 +326,7 @@ mod tests {
|
|||
// Generate a partition with data
|
||||
let partition = partition_with_write(Arc::clone(&catalog)).await;
|
||||
let table_id = partition.lock().table_id();
|
||||
let partition_id = partition.lock().partition_id();
|
||||
let transition_partition_id = partition.lock().transition_partition_id();
|
||||
let partition_id = partition.lock().transition_partition_id();
|
||||
let namespace_id = partition.lock().namespace_id();
|
||||
assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(None));
|
||||
|
||||
|
@ -344,7 +343,7 @@ mod tests {
|
|||
.await
|
||||
.partitions()
|
||||
.cas_sort_key(
|
||||
&transition_partition_id,
|
||||
&partition_id,
|
||||
None,
|
||||
&["bananas", "are", "good", "for", "you"],
|
||||
)
|
||||
|
@ -367,7 +366,7 @@ mod tests {
|
|||
assert_matches!(&completion_observer.calls().as_slice(), &[n] => {
|
||||
assert_eq!(n.namespace_id(), namespace_id);
|
||||
assert_eq!(n.table_id(), table_id);
|
||||
assert_eq!(n.partition_id(), partition_id);
|
||||
assert_eq!(n.partition_id(), &partition_id);
|
||||
assert_eq!(n.sequence_numbers().len(), 1);
|
||||
});
|
||||
|
||||
|
@ -392,12 +391,12 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.parquet_files()
|
||||
.list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id))
|
||||
.list_by_partition_not_to_delete(&partition_id)
|
||||
.await
|
||||
.expect("query for parquet files failed");
|
||||
|
||||
// Validate a single file was inserted with the expected properties.
|
||||
let (object_store_id, file_size_bytes) = assert_matches!(&*files, &[ParquetFile {
|
||||
let (object_store_id, file_size_bytes) = assert_matches!(&*files, [ParquetFile {
|
||||
namespace_id: got_namespace_id,
|
||||
table_id: got_table_id,
|
||||
partition_id: got_partition_id,
|
||||
|
@ -412,12 +411,12 @@ mod tests {
|
|||
{
|
||||
assert_eq!(created_at.get(), max_l0_created_at.get());
|
||||
|
||||
assert_eq!(got_namespace_id, namespace_id);
|
||||
assert_eq!(got_table_id, table_id);
|
||||
assert_eq!(got_partition_id, partition_id);
|
||||
assert_eq!(got_namespace_id, &namespace_id);
|
||||
assert_eq!(got_table_id, &table_id);
|
||||
assert_eq!(got_partition_id, &partition_id);
|
||||
|
||||
assert_eq!(row_count, 1);
|
||||
assert_eq!(compaction_level, CompactionLevel::Initial);
|
||||
assert_eq!(*row_count, 1);
|
||||
assert_eq!(compaction_level, &CompactionLevel::Initial);
|
||||
|
||||
(object_store_id, file_size_bytes)
|
||||
}
|
||||
|
@ -438,18 +437,14 @@ mod tests {
|
|||
assert_eq!(files.len(), 2, "expected two uploaded files");
|
||||
|
||||
// Ensure the catalog record points at a valid file in object storage.
|
||||
let want_path = ParquetFilePath::new(
|
||||
namespace_id,
|
||||
table_id,
|
||||
&transition_partition_id,
|
||||
object_store_id,
|
||||
)
|
||||
.object_store_path();
|
||||
let want_path =
|
||||
ParquetFilePath::new(namespace_id, table_id, &partition_id, *object_store_id)
|
||||
.object_store_path();
|
||||
let file = files
|
||||
.into_iter()
|
||||
.find(|f| f.location == want_path)
|
||||
.expect("did not find final file in object storage");
|
||||
|
||||
assert_eq!(file.size, file_size_bytes as usize);
|
||||
assert_eq!(file.size, *file_size_bytes as usize);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,7 +55,8 @@ pub(crate) mod mock {
|
|||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use data_types::{
|
||||
ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionId, TableId, Timestamp,
|
||||
ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionHashId, PartitionKey,
|
||||
TableId, Timestamp, TransitionPartitionId,
|
||||
};
|
||||
use test_helpers::timeout::FutureTimeout;
|
||||
use tokio::task::JoinHandle;
|
||||
|
@ -155,13 +156,16 @@ pub(crate) mod mock {
|
|||
let wait_ms: u64 = rand::random::<u64>() % 100;
|
||||
tokio::time::sleep(Duration::from_millis(wait_ms)).await;
|
||||
let sequence_numbers = partition.lock().mark_persisted(data);
|
||||
let table_id = TableId::new(2);
|
||||
let partition_hash_id =
|
||||
PartitionHashId::new(table_id, &PartitionKey::from("arbitrary"));
|
||||
let partition_id = TransitionPartitionId::Deterministic(partition_hash_id);
|
||||
completion_observer
|
||||
.persist_complete(Arc::new(CompletedPersist::new(
|
||||
ParquetFileParams {
|
||||
namespace_id: NamespaceId::new(1),
|
||||
table_id: TableId::new(2),
|
||||
partition_id: PartitionId::new(3),
|
||||
partition_hash_id: None,
|
||||
table_id,
|
||||
partition_id,
|
||||
object_store_id: Default::default(),
|
||||
min_time: Timestamp::new(42),
|
||||
max_time: Timestamp::new(42),
|
||||
|
|
|
@ -394,8 +394,7 @@ where
|
|||
ParquetFileParams {
|
||||
namespace_id: NamespaceId::new(1),
|
||||
table_id: TableId::new(2),
|
||||
partition_id: PartitionId::new(3),
|
||||
partition_hash_id: None,
|
||||
partition_id: ARBITRARY_TRANSITION_PARTITION_ID.clone(),
|
||||
object_store_id: Default::default(),
|
||||
min_time: Timestamp::new(42),
|
||||
max_time: Timestamp::new(42),
|
||||
|
|
|
@@ -0,0 +1,24 @@
+DROP TRIGGER IF EXISTS update_partition ON parquet_file;
+
+ALTER TABLE parquet_file
+    ALTER COLUMN partition_id
+        DROP NOT NULL;
+
+CREATE OR REPLACE FUNCTION update_partition_on_new_file_at()
+    RETURNS TRIGGER
+    LANGUAGE PLPGSQL
+AS $$
+BEGIN
+    UPDATE partition
+    SET new_file_at = NEW.created_at
+    WHERE (NEW.partition_id IS NULL OR id = NEW.partition_id)
+      AND (NEW.partition_hash_id IS NULL OR hash_id = NEW.partition_hash_id);
+
+    RETURN NEW;
+END;
+$$;
+
+CREATE TRIGGER update_partition
+    AFTER INSERT ON parquet_file
+    FOR EACH ROW
+EXECUTE PROCEDURE update_partition_on_new_file_at();

@ -0,0 +1,98 @@
|
|||
CREATE TABLE parquet_file_temp
|
||||
AS SELECT * FROM parquet_file;
|
||||
|
||||
DROP TABLE parquet_file;
|
||||
|
||||
CREATE TABLE parquet_file
|
||||
(
|
||||
id INTEGER
|
||||
constraint parquet_file_pkey
|
||||
primary key autoincrement,
|
||||
shard_id numeric not null
|
||||
constraint parquet_file_sequencer_id_fkey
|
||||
references shard,
|
||||
table_id numeric not null
|
||||
references table_name,
|
||||
partition_id numeric
|
||||
references partition,
|
||||
partition_hash_id bytea
|
||||
references partition (hash_id),
|
||||
|
||||
object_store_id uuid not null
|
||||
constraint parquet_location_unique
|
||||
unique,
|
||||
max_sequence_number numeric,
|
||||
min_time numeric,
|
||||
max_time numeric,
|
||||
to_delete numeric,
|
||||
row_count numeric default 0 not null,
|
||||
file_size_bytes numeric default 0 not null,
|
||||
compaction_level smallint default 0 not null,
|
||||
created_at numeric,
|
||||
namespace_id numeric not null
|
||||
references namespace
|
||||
on delete cascade,
|
||||
column_set numeric[] not null,
|
||||
max_l0_created_at numeric default 0 not null
|
||||
);
|
||||
|
||||
create index if not exists parquet_file_deleted_at_idx
|
||||
on parquet_file (to_delete);
|
||||
|
||||
create index if not exists parquet_file_partition_idx
|
||||
on parquet_file (partition_id);
|
||||
|
||||
create index if not exists parquet_file_table_idx
|
||||
on parquet_file (table_id);
|
||||
|
||||
create index if not exists parquet_file_shard_compaction_delete_idx
|
||||
on parquet_file (shard_id, compaction_level, to_delete);
|
||||
|
||||
create index if not exists parquet_file_shard_compaction_delete_created_idx
|
||||
on parquet_file (shard_id, compaction_level, to_delete, created_at);
|
||||
|
||||
create index if not exists parquet_file_partition_created_idx
|
||||
on parquet_file (partition_id, created_at);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS parquet_file_partition_hash_id_idx
|
||||
ON parquet_file (partition_hash_id)
|
||||
WHERE partition_hash_id IS NOT NULL;
|
||||
|
||||
create trigger if not exists update_partition
|
||||
after insert
|
||||
on parquet_file
|
||||
for each row
|
||||
begin
|
||||
UPDATE partition
|
||||
SET new_file_at = NEW.created_at
|
||||
WHERE (NEW.partition_id IS NULL OR id = NEW.partition_id)
|
||||
AND (NEW.partition_hash_id IS NULL OR hash_id = NEW.partition_hash_id);
|
||||
end;
|
||||
|
||||
create trigger if not exists update_billing
|
||||
after insert
|
||||
on parquet_file
|
||||
for each row
|
||||
begin
|
||||
INSERT INTO billing_summary (namespace_id, total_file_size_bytes)
|
||||
VALUES (NEW.namespace_id, NEW.file_size_bytes)
|
||||
ON CONFLICT (namespace_id) DO UPDATE
|
||||
SET total_file_size_bytes = billing_summary.total_file_size_bytes + NEW.file_size_bytes
|
||||
WHERE billing_summary.namespace_id = NEW.namespace_id;
|
||||
end;
|
||||
|
||||
create trigger if not exists decrement_summary
|
||||
after update
|
||||
on parquet_file
|
||||
for each row
|
||||
when OLD.to_delete IS NULL AND NEW.to_delete IS NOT NULL
|
||||
begin
|
||||
UPDATE billing_summary
|
||||
SET total_file_size_bytes = billing_summary.total_file_size_bytes - OLD.file_size_bytes
|
||||
WHERE billing_summary.namespace_id = OLD.namespace_id;
|
||||
end;
|
||||
|
||||
INSERT INTO parquet_file
|
||||
SELECT * FROM parquet_file_temp;
|
||||
|
||||
DROP TABLE parquet_file_temp;
|
|
@ -1865,7 +1865,7 @@ pub(crate) mod test_helpers {
let other_params = ParquetFileParams {
table_id: other_partition.table_id,
partition_id: other_partition.id,
partition_id: other_partition.transition_partition_id(),
object_store_id: Uuid::new_v4(),
min_time: Timestamp::new(50),
max_time: Timestamp::new(60),

@ -1978,7 +1978,7 @@ pub(crate) mod test_helpers {
let f1_params = ParquetFileParams {
table_id: partition2.table_id,
partition_id: partition2.id,
partition_id: partition2.transition_partition_id(),
object_store_id: Uuid::new_v4(),
min_time: Timestamp::new(1),
max_time: Timestamp::new(10),

@ -2449,7 +2449,7 @@ pub(crate) mod test_helpers {
let l0_five_hour_ago_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_five_hour_ago,
partition_id: partition2.id,
partition_id: partition2.transition_partition_id(),
..parquet_file_params.clone()
};
repos

@ -2492,7 +2492,7 @@ pub(crate) mod test_helpers {
let l1_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_now,
partition_id: partition2.id,
partition_id: partition2.transition_partition_id(),
compaction_level: CompactionLevel::FileNonOverlapped,
..parquet_file_params.clone()
};

@ -2578,7 +2578,7 @@ pub(crate) mod test_helpers {
let l2_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_now,
partition_id: partition3.id,
partition_id: partition3.transition_partition_id(),
compaction_level: CompactionLevel::Final,
..parquet_file_params.clone()
};

@ -2619,7 +2619,7 @@ pub(crate) mod test_helpers {
let l0_one_hour_ago_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_one_hour_ago,
partition_id: partition3.id,
partition_id: partition3.transition_partition_id(),
..parquet_file_params.clone()
};
repos

@ -2720,8 +2720,7 @@ pub(crate) mod test_helpers {
level1_file.compaction_level = CompactionLevel::FileNonOverlapped;

let other_partition_params = ParquetFileParams {
partition_id: partition2.id,
partition_hash_id: partition2.hash_id().cloned(),
partition_id: partition2.transition_partition_id(),
object_store_id: Uuid::new_v4(),
..parquet_file_params.clone()
};

@ -2744,12 +2743,20 @@ pub(crate) mod test_helpers {
expected_ids.sort();
assert_eq!(file_ids, expected_ids);

// remove namespace to avoid it from affecting later tests
repos
.namespaces()
.soft_delete("namespace_parquet_file_test_list_by_partiton_not_to_delete")
// Using the catalog partition ID should return the same files, even if the Parquet file
// records don't have the partition ID on them (which is the default now)
let files = repos
.parquet_files()
.list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition.id))
.await
.expect("delete namespace should succeed");
.unwrap();
assert_eq!(files.len(), 2);

let mut file_ids: Vec<_> = files.into_iter().map(|f| f.id).collect();
file_ids.sort();
let mut expected_ids = vec![parquet_file.id, level1_file.id];
expected_ids.sort();
assert_eq!(file_ids, expected_ids);
}

async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
@ -396,8 +396,7 @@ pub mod test_helpers {
ParquetFileParams {
namespace_id: namespace.id,
table_id: table.id,
partition_id: partition.id,
partition_hash_id: partition.hash_id().cloned(),
partition_id: partition.transition_partition_id(),
object_store_id: Uuid::new_v4(),
min_time: Timestamp::new(1),
max_time: Timestamp::new(10),

@ -887,14 +887,28 @@ impl ParquetFileRepo for MemTxn {
) -> Result<Vec<ParquetFile>> {
let stage = self.stage();

let partition = stage
.partitions
.iter()
.find(|p| match partition_id {
TransitionPartitionId::Deterministic(hash_id) => p
.hash_id()
.map(|p_hash_id| p_hash_id == hash_id)
.unwrap_or(false),
TransitionPartitionId::Deprecated(id) => id == &p.id,
})
.unwrap()
.clone();

Ok(stage
.parquet_files
.iter()
.filter(|f| match partition_id {
TransitionPartitionId::Deterministic(hash_id) => {
f.partition_hash_id.as_ref().map_or(false, |h| h == hash_id)
}
TransitionPartitionId::Deprecated(id) => f.partition_id == *id,
.filter(|f| match &f.partition_id {
TransitionPartitionId::Deterministic(hash_id) => partition
.hash_id()
.map(|p_hash_id| p_hash_id == hash_id)
.unwrap_or(false),
TransitionPartitionId::Deprecated(id) => id == &partition.id,
})
.filter(|f| f.to_delete.is_none())
.cloned()

@ -996,17 +1010,15 @@ async fn create_parquet_file(
ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
);
let created_at = parquet_file.created_at;
let partition_id = parquet_file.partition_id;
let partition_id = parquet_file.partition_id.clone();
stage.parquet_files.push(parquet_file);

// Update the new_file_at field its partition to the time of created_at
let partition = stage
.partitions
.iter_mut()
.find(|p| p.id == partition_id)
.ok_or(Error::PartitionNotFound {
id: TransitionPartitionId::Deprecated(partition_id),
})?;
.find(|p| p.transition_partition_id() == partition_id)
.ok_or(Error::PartitionNotFound { id: partition_id })?;
partition.new_file_at = Some(created_at);

Ok(stage.parquet_files.last().unwrap().clone())
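The in-memory implementation above first resolves the partition record for the requested TransitionPartitionId and then keeps any file whose own identifier matches that partition on either axis. A minimal, self-contained sketch of that matching rule, using simplified stand-in types rather than the real `data_types` definitions:

// Simplified stand-ins for illustration only; the real types live in the `data_types` crate.
#[derive(Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    Deprecated(i64),       // old numeric catalog ID
    Deterministic(String), // partition hash ID (stand-in for `PartitionHashId`)
}

struct PartitionRecord {
    id: i64,
    hash_id: Option<String>,
}

struct FileRecord {
    // Whichever identifier the Parquet file row was stored with.
    partition_id: TransitionPartitionId,
}

// A file belongs to `partition` if its identifier matches either the partition's numeric ID
// or its hash ID, mirroring the `find`/`filter` pair in the hunk above.
fn file_belongs_to(file: &FileRecord, partition: &PartitionRecord) -> bool {
    match &file.partition_id {
        TransitionPartitionId::Deprecated(id) => *id == partition.id,
        TransitionPartitionId::Deterministic(hash) => {
            partition.hash_id.as_deref() == Some(hash.as_str())
        }
    }
}

fn main() {
    let partition = PartitionRecord { id: 3, hash_id: Some("abcdefg".to_string()) };
    // A file stored with only the hash ID still matches a lookup that started from PartitionId(3).
    let file = FileRecord {
        partition_id: TransitionPartitionId::Deterministic("abcdefg".to_string()),
    };
    assert!(file_belongs_to(&file, &partition));
}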
@ -1627,22 +1627,26 @@ RETURNING id;
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFile>(
r#"
SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
max_l0_created_at
SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id,
object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count,
compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.partition_hash_id = $1
INNER JOIN partition
ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id
WHERE partition.hash_id = $1
AND parquet_file.to_delete IS NULL;
"#,
)
.bind(hash_id), // $1
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFile>(
r#"
SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
max_l0_created_at
SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id,
object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count,
compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.partition_id = $1
INNER JOIN partition
ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id
WHERE partition.id = $1
AND parquet_file.to_delete IS NULL;
"#,
)

@ -1754,7 +1758,6 @@ where
namespace_id,
table_id,
partition_id,
partition_hash_id,
object_store_id,
min_time,
max_time,

@ -1766,6 +1769,11 @@ where
max_l0_created_at,
} = parquet_file_params;

let (partition_id, partition_hash_id) = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => (None, Some(hash_id)),
TransitionPartitionId::Deprecated(id) => (Some(id), None),
};

let partition_hash_id_ref = &partition_hash_id.as_ref();
let query = sqlx::query_scalar::<_, ParquetFileId>(
r#"

@ -2203,7 +2211,10 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
.create(parquet_file_params)
.await
.unwrap();
assert!(parquet_file.partition_hash_id.is_none());
assert_matches!(
parquet_file.partition_id,
TransitionPartitionId::Deprecated(_)
);
}

#[test]
@ -1221,8 +1221,8 @@ struct ParquetFilePod {
id: ParquetFileId,
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
partition_hash_id: Option<PartitionHashId>,
#[sqlx(flatten)]
partition_id: TransitionPartitionId,
object_store_id: Uuid,
min_time: Timestamp,
max_time: Timestamp,

@ -1242,7 +1242,6 @@ impl From<ParquetFilePod> for ParquetFile {
namespace_id: value.namespace_id,
table_id: value.table_id,
partition_id: value.partition_id,
partition_hash_id: value.partition_hash_id,
object_store_id: value.object_store_id,
min_time: value.min_time,
max_time: value.max_time,

@ -1395,22 +1394,26 @@ RETURNING id;
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFilePod>(
r#"
SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
max_l0_created_at
SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id,
object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count,
compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.partition_hash_id = $1
INNER JOIN partition
ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id
WHERE partition.hash_id = $1
AND parquet_file.to_delete IS NULL;
"#,
)
.bind(hash_id), // $1
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFilePod>(
r#"
SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
max_l0_created_at
SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id,
object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count,
compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.partition_id = $1
INNER JOIN partition
ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id
WHERE partition.id = $1
AND parquet_file.to_delete IS NULL;
"#,
)

@ -1533,7 +1536,6 @@ where
namespace_id,
table_id,
partition_id,
partition_hash_id,
object_store_id,
min_time,
max_time,

@ -1545,7 +1547,10 @@ where
max_l0_created_at,
} = parquet_file_params;

let partition_hash_id_ref = &partition_hash_id.as_ref();
let (partition_id, partition_hash_id) = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => (None, Some(hash_id)),
TransitionPartitionId::Deprecated(id) => (Some(id), None),
};
let res = sqlx::query_as::<_, ParquetFilePod>(
r#"
INSERT INTO parquet_file (

@ -1562,7 +1567,7 @@ RETURNING
.bind(TRANSITION_SHARD_ID) // $1
.bind(table_id) // $2
.bind(partition_id) // $3
.bind(partition_hash_id_ref) // $4
.bind(partition_hash_id.as_ref()) // $4
.bind(object_store_id) // $5
.bind(min_time) // $6
.bind(max_time) // $7

@ -1811,7 +1816,10 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
.create(parquet_file_params)
.await
.unwrap();
assert!(parquet_file.partition_hash_id.is_none());
assert_matches!(
parquet_file.partition_id,
TransitionPartitionId::Deprecated(_)
);
}

macro_rules! test_column_create_or_get_many_unchecked {
@ -1,6 +1,7 @@
use data_types::{
ColumnSet, CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, Partition,
PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
TransitionPartitionId,
};
use uuid::Uuid;

@ -20,8 +21,7 @@ impl ParquetFileBuilder {
id: ParquetFileId::new(id),
namespace_id: NamespaceId::new(0),
table_id,
partition_id: PartitionId::new(0),
partition_hash_id: Some(PartitionHashId::new(
partition_id: TransitionPartitionId::Deterministic(PartitionHashId::new(
table_id,
&PartitionKey::from("arbitrary"),
)),

@ -39,11 +39,11 @@ impl ParquetFileBuilder {
}
}

/// Set the partition id
pub fn with_partition(self, id: i64) -> Self {
/// Set the partition identifier
pub fn with_partition(self, partition_id: TransitionPartitionId) -> Self {
Self {
file: ParquetFile {
partition_id: PartitionId::new(id),
partition_id,
..self.file
},
}
@ -602,8 +602,7 @@ impl TestPartition {
let parquet_file_params = ParquetFileParams {
namespace_id: self.namespace.namespace.id,
table_id: self.table.table.id,
partition_id: self.partition.id,
partition_hash_id: self.partition.hash_id().cloned(),
partition_id: self.partition.transition_partition_id(),
object_store_id: object_store_id.unwrap_or_else(Uuid::new_v4),
min_time: Timestamp::new(min_time),
max_time: Timestamp::new(max_time),

@ -17,6 +17,8 @@
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;

use data_types::{PartitionHashId, PartitionKey, TableId, TransitionPartitionId};

mod catalog;
pub use catalog::{
TestCatalog, TestNamespace, TestParquetFile, TestParquetFileBuilder, TestPartition, TestTable,

@ -24,3 +26,14 @@ pub use catalog::{

mod builders;
pub use builders::{ParquetFileBuilder, PartitionBuilder, SkippedCompactionBuilder, TableBuilder};

/// Create a partition identifier from an int (which gets used as the table ID) and a partition key
/// with the string "arbitrary". Most useful in cases where there isn't any actual catalog
/// interaction (that is, in mocks) and when the important property of the partition identifiers is
/// that they're either the same or different than other partition identifiers.
pub fn partition_identifier(table_id: i64) -> TransitionPartitionId {
TransitionPartitionId::Deterministic(PartitionHashId::new(
TableId::new(table_id),
&PartitionKey::from("arbitrary"),
))
}
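An illustrative snippet (not part of the diff) of how test code might use the new helper; it relies only on the property described in the doc comment above, namely that identical inputs yield identical identifiers while different table IDs yield different ones:

// Hypothetical test code; assumes the `iox_tests::partition_identifier` helper shown above.
use iox_tests::partition_identifier;

#[test]
fn partition_identifier_is_deterministic() {
    let a = partition_identifier(1);
    let b = partition_identifier(1);
    let c = partition_identifier(2);
    // Same table ID (and the fixed "arbitrary" key) gives the same identifier...
    assert_eq!(a, b);
    // ...while a different table ID gives a different one.
    assert_ne!(a, c);
}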
@ -108,7 +108,7 @@ impl From<&ParquetFile> for ParquetFilePath {
Self {
namespace_id: f.namespace_id,
table_id: f.table_id,
partition_id: f.transition_partition_id(),
partition_id: f.partition_id.clone(),
object_store_id: f.object_store_id,
}
}

@ -119,7 +119,7 @@ impl From<&ParquetFileParams> for ParquetFilePath {
Self {
namespace_id: f.namespace_id,
table_id: f.table_id,
partition_id: f.transition_partition_id(),
partition_id: f.partition_id.clone(),
object_store_id: f.object_store_id,
}
}
@ -91,7 +91,7 @@ use bytes::Bytes;
use data_types::{
ColumnId, ColumnSet, ColumnSummary, CompactionLevel, InfluxDbType, NamespaceId,
ParquetFileParams, PartitionHashId, PartitionId, PartitionKey, StatValues, Statistics, TableId,
Timestamp,
Timestamp, TransitionPartitionId,
};
use generated_types::influxdata::iox::ingester::v1 as proto;
use iox_time::Time;

@ -443,6 +443,7 @@ impl IoxMetadata {
where
F: for<'a> Fn(&'a str) -> ColumnId,
{
let partition_id = TransitionPartitionId::from((partition_id, partition_hash_id.as_ref()));
let decoded = metadata.decode().expect("invalid IOx metadata");
trace!(
?partition_id,

@ -487,7 +488,6 @@ impl IoxMetadata {
namespace_id: self.namespace_id,
table_id: self.table_id,
partition_id,
partition_hash_id,
object_store_id: self.object_store_id,
min_time,
max_time,
@ -361,8 +361,8 @@ mod tests {
partition.create_parquet_file(builder).await;
let table_id = table.table.id;

let single_file_size = 240;
let two_file_size = 448;
let single_file_size = 256;
let two_file_size = 480;
assert!(single_file_size < two_file_size);

let cache = make_cache(&catalog);
@ -17,7 +17,7 @@ use cache_system::{
};
use data_types::{
partition_template::{build_column_values, ColumnValue},
ColumnId, Partition, PartitionId, TransitionPartitionId,
ColumnId, Partition, TransitionPartitionId,
};
use datafusion::scalar::ScalarValue;
use iox_catalog::{interface::Catalog, partition_lookup_batch};

@ -38,7 +38,7 @@ const CACHE_ID: &str = "partition";

type CacheT = Box<
dyn Cache<
K = PartitionId,
K = TransitionPartitionId,
V = Option<CachedPartition>,
GetExtra = (Arc<CachedTable>, Option<Span>),
PeekExtra = ((), Option<Span>),

@ -49,7 +49,7 @@ type CacheT = Box<
#[derive(Debug)]
pub struct PartitionCache {
cache: CacheT,
remove_if_handle: RemoveIfHandle<PartitionId, Option<CachedPartition>>,
remove_if_handle: RemoveIfHandle<TransitionPartitionId, Option<CachedPartition>>,
flusher: Arc<dyn BatchLoaderFlusher>,
}

@ -64,7 +64,8 @@ impl PartitionCache {
testing: bool,
) -> Self {
let loader = FunctionLoader::new(
move |partition_ids: Vec<PartitionId>, cached_tables: Vec<Arc<CachedTable>>| {
move |partition_ids: Vec<TransitionPartitionId>,
cached_tables: Vec<Arc<CachedTable>>| {
// sanity checks
assert_eq!(partition_ids.len(), cached_tables.len());

@ -75,23 +76,20 @@ impl PartitionCache {
// prepare output buffer
let mut out = (0..partition_ids.len()).map(|_| None).collect::<Vec<_>>();
let mut out_map =
HashMap::<PartitionId, usize>::with_capacity(partition_ids.len());
HashMap::<TransitionPartitionId, usize>::with_capacity(partition_ids.len());
for (idx, id) in partition_ids.iter().enumerate() {
match out_map.entry(*id) {
Entry::Occupied(_) => unreachable!("cache system requested same partition from loader concurrently, this should have been prevented by the CacheDriver"),
match out_map.entry(id.clone()) {
Entry::Occupied(_) => unreachable!(
"cache system requested same partition from loader concurrently, \
this should have been prevented by the CacheDriver"
),
Entry::Vacant(v) => {
v.insert(idx);
}
}
}

// build `&[&TransitionPartitionId]` for batch catalog request
let ids = partition_ids
.iter()
.copied()
.map(TransitionPartitionId::Deprecated)
.collect::<Vec<_>>();
let ids = ids.iter().collect::<Vec<_>>();
let ids: Vec<&TransitionPartitionId> = partition_ids.iter().collect();

// fetch catalog data
let partitions = Backoff::new(&backoff_config)

@ -104,7 +102,7 @@ impl PartitionCache {

// build output
for p in partitions {
let idx = out_map[&p.id];
let idx = out_map[&p.transition_partition_id()];
let cached_table = &cached_tables[idx];
let p = CachedPartition::new(p, cached_table);
out[idx] = Some(p);

@ -180,7 +178,7 @@ impl PartitionCache {

self.remove_if_handle.remove_if_and_get(
&self.cache,
partition_id,
partition_id.clone(),
move |cached_partition| {
let invalidates = if let Some(sort_key) =
&cached_partition.and_then(|p| p.sort_key)

@ -195,7 +193,7 @@ impl PartitionCache {

if invalidates {
debug!(
partition_id = partition_id.get(),
partition_id = %partition_id,
"invalidate partition cache",
);
}

@ -217,13 +215,13 @@ impl PartitionCache {
/// Request for [`PartitionCache::get`].
#[derive(Debug)]
pub struct PartitionRequest {
pub partition_id: PartitionId,
pub partition_id: TransitionPartitionId,
pub sort_key_should_cover: Vec<ColumnId>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedPartition {
pub id: PartitionId,
pub id: TransitionPartitionId,
pub sort_key: Option<Arc<PartitionSortKey>>,
pub column_ranges: ColumnRanges,
}

@ -299,7 +297,7 @@ impl CachedPartition {
column_ranges.shrink_to_fit();

Self {
id: partition.id,
id: partition.transition_partition_id(),
sort_key,
column_ranges: Arc::new(column_ranges),
}

@ -368,7 +366,10 @@ mod tests {
ram::test_util::test_ram_pool, test_util::assert_catalog_access_metric_count,
};
use async_trait::async_trait;
use data_types::{partition_template::TablePartitionTemplateOverride, ColumnType};
use data_types::{
partition_template::TablePartitionTemplateOverride, ColumnType, PartitionHashId,
PartitionId, PartitionKey, TableId,
};
use futures::StreamExt;
use generated_types::influxdata::iox::partition_template::v1::{
template_part::Part, PartitionTemplate, TemplatePart,
@ -419,8 +420,11 @@ mod tests {
true,
);

let p1_id = p1.transition_partition_id();
let p2_id = p2.transition_partition_id();

let sort_key1a = cache
.get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
.get_one(Arc::clone(&cached_table), &p1_id, &Vec::new(), None)
.await
.unwrap()
.sort_key;

@ -434,24 +438,24 @@ mod tests {
);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
1,
);

let sort_key2 = cache
.get_one(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
.get_one(Arc::clone(&cached_table), &p2_id, &Vec::new(), None)
.await
.unwrap()
.sort_key;
assert_eq!(sort_key2, None);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
2,
);

let sort_key1b = cache
.get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
.get_one(Arc::clone(&cached_table), &p1_id, &Vec::new(), None)
.await
.unwrap()
.sort_key;

@ -461,16 +465,37 @@ mod tests {
));
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
2,
);

// non-existing partition
for _ in 0..2 {
// Non-existing partition identified by partition hash ID
let res = cache
.get_one(
Arc::clone(&cached_table),
PartitionId::new(i64::MAX),
&TransitionPartitionId::Deterministic(PartitionHashId::new(
TableId::new(i64::MAX),
&PartitionKey::from("bananas_not_found"),
)),
&[],
None,
)
.await;
assert_eq!(res, None);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_hash_id_batch",
3,
);

// Non-existing partition identified by deprecated catalog IDs; this part can be
// removed when partition identification is fully transitioned to partition hash IDs
let res = cache
.get_one(
Arc::clone(&cached_table),
&TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)),
&Vec::new(),
None,
)

@ -479,7 +504,7 @@ mod tests {
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
3,
1,
);
}
}

@ -548,8 +573,14 @@ mod tests {
true,
);

let p1_id = p1.transition_partition_id();
let p2_id = p2.transition_partition_id();
let p3_id = p3.transition_partition_id();
let p4_id = p4.transition_partition_id();
let p5_id = p5.transition_partition_id();

let ranges1a = cache
.get_one(Arc::clone(&cached_table), p1.id, &[], None)
.get_one(Arc::clone(&cached_table), &p1_id, &[], None)
.await
.unwrap()
.column_ranges;

@ -578,12 +609,12 @@ mod tests {
));
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
1,
);

let ranges2 = cache
.get_one(Arc::clone(&cached_table), p2.id, &[], None)
.get_one(Arc::clone(&cached_table), &p2_id, &[], None)
.await
.unwrap()
.column_ranges;

@ -599,12 +630,12 @@ mod tests {
);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
2,
);

let ranges3 = cache
.get_one(Arc::clone(&cached_table), p3.id, &[], None)
.get_one(Arc::clone(&cached_table), &p3_id, &[], None)
.await
.unwrap()
.column_ranges;

@ -629,12 +660,12 @@ mod tests {
);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
3,
);

let ranges4 = cache
.get_one(Arc::clone(&cached_table), p4.id, &[], None)
.get_one(Arc::clone(&cached_table), &p4_id, &[], None)
.await
.unwrap()
.column_ranges;

@ -659,12 +690,12 @@ mod tests {
);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
4,
);

let ranges5 = cache
.get_one(Arc::clone(&cached_table), p5.id, &[], None)
.get_one(Arc::clone(&cached_table), &p5_id, &[], None)
.await
.unwrap()
.column_ranges;

@ -680,28 +711,48 @@ mod tests {
);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
5,
);

let ranges1b = cache
.get_one(Arc::clone(&cached_table), p1.id, &[], None)
.get_one(Arc::clone(&cached_table), &p1_id, &[], None)
.await
.unwrap()
.column_ranges;
assert!(Arc::ptr_eq(&ranges1a, &ranges1b));
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
5,
);

// non-existing partition
for _ in 0..2 {
// Non-existing partition identified by partition hash ID
let res = cache
.get_one(
Arc::clone(&cached_table),
PartitionId::new(i64::MAX),
&TransitionPartitionId::Deterministic(PartitionHashId::new(
TableId::new(i64::MAX),
&PartitionKey::from("bananas_not_found"),
)),
&[],
None,
)
.await;
assert_eq!(res, None);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_hash_id_batch",
6,
);

// Non-existing partition identified by deprecated catalog IDs; this part can be
// removed when partition identification is fully transitioned to partition hash IDs
let res = cache
.get_one(
Arc::clone(&cached_table),
&TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)),
&[],
None,
)

@ -710,7 +761,7 @@ mod tests {
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
6,
1,
);
}
}
@ -724,7 +775,7 @@ mod tests {
let c1 = t.create_column("foo", ColumnType::Tag).await;
let c2 = t.create_column("time", ColumnType::Time).await;
let p = t.create_partition("k1").await;
let p_id = p.partition.id;
let p_id = p.partition.transition_partition_id();
let p_sort_key = p.partition.sort_key();
let cached_table = Arc::new(CachedTable {
id: t.table.id,

@ -751,41 +802,41 @@ mod tests {
);

let sort_key = cache
.get_one(Arc::clone(&cached_table), p_id, &[], None)
.get_one(Arc::clone(&cached_table), &p_id, &[], None)
.await
.unwrap()
.sort_key;
assert_eq!(sort_key, None,);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
1,
);

// requesting nother will not expire
assert!(p_sort_key.is_none());
let sort_key = cache
.get_one(Arc::clone(&cached_table), p_id, &[], None)
.get_one(Arc::clone(&cached_table), &p_id, &[], None)
.await
.unwrap()
.sort_key;
assert_eq!(sort_key, None,);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
1,
);

// but requesting something will expire
let sort_key = cache
.get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
.get_one(Arc::clone(&cached_table), &p_id, &[c1.column.id], None)
.await
.unwrap()
.sort_key;
assert_eq!(sort_key, None,);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
2,
);

@ -801,7 +852,7 @@ mod tests {
// expire & fetch
let p_sort_key = p.partition.sort_key();
let sort_key = cache
.get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
.get_one(Arc::clone(&cached_table), &p_id, &[c1.column.id], None)
.await
.unwrap()
.sort_key;

@ -815,7 +866,7 @@ mod tests {
);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
3,
);

@ -827,7 +878,7 @@ mod tests {
vec![c1.column.id, c2.column.id],
] {
let sort_key_2 = cache
.get_one(Arc::clone(&cached_table), p_id, &should_cover, None)
.get_one(Arc::clone(&cached_table), &p_id, &should_cover, None)
.await
.unwrap()
.sort_key;

@ -837,7 +888,7 @@ mod tests {
));
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
3,
);
}

@ -847,7 +898,7 @@ mod tests {
let sort_key_2 = cache
.get_one(
Arc::clone(&cached_table),
p_id,
&p_id,
&[c1.column.id, c3.column.id],
None,
)

@ -861,7 +912,7 @@ mod tests {
assert_eq!(sort_key, sort_key_2);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
4,
);
}

@ -892,34 +943,45 @@ mod tests {
true,
);

let p1_id = p1.transition_partition_id();
let p2_id = p2.transition_partition_id();

let mut res = cache
.get(
Arc::clone(&cached_table),
vec![
PartitionRequest {
partition_id: p1.id,
partition_id: p1_id.clone(),
sort_key_should_cover: vec![],
},
PartitionRequest {
partition_id: p2.id,
partition_id: p2_id.clone(),
sort_key_should_cover: vec![],
},
PartitionRequest {
partition_id: p1.id,
partition_id: p1_id.clone(),
sort_key_should_cover: vec![],
},
// requesting non-existing partitions is fine, they just don't appear in
// the output
PartitionRequest {
partition_id: TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)),
sort_key_should_cover: vec![],
},
PartitionRequest {
// requesting non-existing partitions is fine, they just don't appear in the output
partition_id: PartitionId::new(i64::MAX),
partition_id: TransitionPartitionId::Deterministic(PartitionHashId::new(
TableId::new(i64::MAX),
&PartitionKey::from("bananas_not_found"),
)),
sort_key_should_cover: vec![],
},
],
None,
)
.await;
res.sort_by_key(|p| p.id);
let ids = res.iter().map(|p| p.id).collect::<Vec<_>>();
assert_eq!(ids, vec![p1.id, p1.id, p2.id]);
res.sort_by(|a, b| a.id.cmp(&b.id));
let ids = res.into_iter().map(|p| p.id).collect::<Vec<_>>();
assert_eq!(ids, vec![p1_id.clone(), p1_id, p2_id]);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",

@ -1008,7 +1070,7 @@ mod tests {
c_id: ColumnId,

/// Partitions within that table.
partitions: Vec<PartitionId>,
partitions: Vec<TransitionPartitionId>,
}

impl ConcurrencyTestState {

@ -1032,7 +1094,7 @@ mod tests {
t.create_partition_with_sort_key(&format!("p{i}"), &["time"])
.await
.partition
.id
.transition_partition_id()
}
})
.collect::<Vec<_>>()

@ -1046,7 +1108,8 @@ mod tests {
}
}

/// Perform the actual [`PartitionCache::get`] call and run some basic sanity checks on the result.
/// Perform the actual [`PartitionCache::get`] call and run some basic sanity checks on the
/// result.
async fn run(self, cache: Arc<PartitionCache>) {
let Self {
cached_table,

@ -1060,15 +1123,15 @@ mod tests {
partitions
.iter()
.map(|p| PartitionRequest {
partition_id: *p,
partition_id: p.clone(),
sort_key_should_cover: vec![],
})
.collect(),
None,
)
.await;
results.sort_by_key(|p| p.id);
let partitions_res = results.iter().map(|p| p.id).collect::<Vec<_>>();
results.sort_by(|a, b| a.id.cmp(&b.id));
let partitions_res = results.iter().map(|p| p.id.clone()).collect::<Vec<_>>();
assert_eq!(partitions, partitions_res);
assert!(results
.iter()

@ -1086,7 +1149,7 @@ mod tests {
async fn get_one(
&self,
cached_table: Arc<CachedTable>,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
sort_key_should_cover: &[ColumnId],
span: Option<Span>,
) -> Option<CachedPartition>;

@ -1097,14 +1160,14 @@ mod tests {
async fn get_one(
&self,
cached_table: Arc<CachedTable>,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
sort_key_should_cover: &[ColumnId],
span: Option<Span>,
) -> Option<CachedPartition> {
self.get(
cached_table,
vec![PartitionRequest {
partition_id,
partition_id: partition_id.clone(),
sort_key_should_cover: sort_key_should_cover.to_vec(),
}],
span,
@ -859,10 +859,6 @@ impl IngesterPartition {
}
}

pub(crate) fn partition_id(&self) -> PartitionId {
self.partition_id
}

pub(crate) fn transition_partition_id(&self) -> TransitionPartitionId {
TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref()))
}
@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, PartitionId, TransitionPartitionId};
use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, TransitionPartitionId};
use futures::StreamExt;
use hashbrown::HashSet;
use iox_catalog::interface::Catalog;

@ -56,7 +56,7 @@ impl ChunkAdapter {
&self,
cached_table: Arc<CachedTable>,
files: Arc<[Arc<ParquetFile>]>,
cached_partitions: &HashMap<PartitionId, CachedPartition>,
cached_partitions: &HashMap<TransitionPartitionId, CachedPartition>,
span: Option<Span>,
) -> Vec<QuerierParquetChunk> {
let span_recorder = SpanRecorder::new(span);

@ -170,18 +170,13 @@ impl ChunkAdapter {

let order = ChunkOrder::new(parquet_file.file.max_l0_created_at.get());

let partition_id = parquet_file.file.partition_id;
let transition_partition_id = TransitionPartitionId::from((
partition_id,
parquet_file.file.partition_hash_id.as_ref(),
));
let partition_id = parquet_file.file.partition_id.clone();

let meta = Arc::new(QuerierParquetChunkMeta {
chunk_id,
order,
sort_key: Some(sort_key),
partition_id,
transition_partition_id,
});

let parquet_chunk = Arc::new(ParquetChunk::new(
@ -1,6 +1,6 @@
//! Querier Chunks

use data_types::{ChunkId, ChunkOrder, PartitionId, TransitionPartitionId};
use data_types::{ChunkId, ChunkOrder, TransitionPartitionId};
use datafusion::physical_plan::Statistics;
use iox_query::chunk_statistics::{create_chunk_statistics, ColumnRanges};
use parquet_file::chunk::ParquetChunk;

@ -25,10 +25,7 @@ pub struct QuerierParquetChunkMeta {
sort_key: Option<SortKey>,

/// Partition ID.
partition_id: PartitionId,

/// Transition partition ID.
transition_partition_id: TransitionPartitionId,
partition_id: TransitionPartitionId,
}

impl QuerierParquetChunkMeta {

@ -43,13 +40,8 @@ impl QuerierParquetChunkMeta {
}

/// Partition ID.
pub fn partition_id(&self) -> PartitionId {
self.partition_id
}

/// Partition ID.
pub fn transition_partition_id(&self) -> &TransitionPartitionId {
&self.transition_partition_id
pub fn partition_id(&self) -> &TransitionPartitionId {
&self.partition_id
}
}

@ -251,7 +243,7 @@ pub mod tests {
.get(
Arc::clone(&self.cached_table),
vec![PartitionRequest {
partition_id: self.parquet_file.partition_id,
partition_id: self.parquet_file.partition_id.clone(),
sort_key_should_cover: vec![],
}],
None,

@ -261,7 +253,7 @@ pub mod tests {
.next()
.unwrap();
let cached_partitions =
HashMap::from([(self.parquet_file.partition_id, cached_partition)]);
HashMap::from([(self.parquet_file.partition_id.clone(), cached_partition)]);
self.adapter
.new_chunks(
Arc::clone(&self.cached_table),
@ -15,11 +15,11 @@ impl QueryChunk for QuerierParquetChunk {
}

fn partition_id(&self) -> PartitionId {
self.meta().partition_id()
unimplemented!()
}

fn transition_partition_id(&self) -> &TransitionPartitionId {
self.meta().transition_partition_id()
self.meta().partition_id()
}

fn sort_key(&self) -> Option<&SortKey> {
@ -8,7 +8,7 @@ use crate::{
parquet::ChunkAdapter,
IngesterConnection,
};
use data_types::{ColumnId, NamespaceId, ParquetFile, PartitionId, TableId};
use data_types::{ColumnId, NamespaceId, ParquetFile, TableId, TransitionPartitionId};
use datafusion::error::DataFusionError;
use futures::join;
use iox_query::{provider, provider::ChunkPruner, QueryChunk};

@ -282,7 +282,7 @@ impl QuerierTable {
let chunks = partitions
.into_iter()
.filter_map(|mut c| {
let cached_partition = cached_partitions.get(&c.partition_id())?;
let cached_partition = cached_partitions.get(&c.transition_partition_id())?;
c.set_partition_column_ranges(&cached_partition.column_ranges);
Some(c)
})

@ -322,16 +322,16 @@ impl QuerierTable {
ingester_partitions: &[IngesterPartition],
parquet_files: &[Arc<ParquetFile>],
span: Option<Span>,
) -> HashMap<PartitionId, CachedPartition> {
) -> HashMap<TransitionPartitionId, CachedPartition> {
let span_recorder = SpanRecorder::new(span);

let mut should_cover: HashMap<PartitionId, HashSet<ColumnId>> =
let mut should_cover: HashMap<TransitionPartitionId, HashSet<ColumnId>> =
HashMap::with_capacity(ingester_partitions.len());

// For ingester partitions we only need the column ranges -- which are static -- not the sort key. So it is
// sufficient to collect the partition IDs.
for p in ingester_partitions {
should_cover.entry(p.partition_id()).or_default();
should_cover.entry(p.transition_partition_id()).or_default();
}

// For parquet files we must ensure that the -- potentially evolving -- sort key coveres the primary key.

@ -342,7 +342,7 @@ impl QuerierTable {
.collect::<HashSet<_>>();
for f in parquet_files {
should_cover
.entry(f.partition_id)
.entry(f.partition_id.clone())
.or_default()
.extend(f.column_set.iter().copied().filter(|id| pk.contains(id)));
}

@ -366,7 +366,7 @@ impl QuerierTable {
)
.await;

partitions.into_iter().map(|p| (p.id, p)).collect()
partitions.into_iter().map(|p| (p.id.clone(), p)).collect()
}

/// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks)

@ -889,7 +889,7 @@ mod tests {
assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
1,
);
assert_cache_access_metric_count(&catalog.metric_registry, "partition", 2);

@ -899,7 +899,7 @@ mod tests {
assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
1,
);
assert_cache_access_metric_count(&catalog.metric_registry, "partition", 4);

@ -912,7 +912,7 @@ mod tests {
assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
1,
);

@ -922,7 +922,7 @@ mod tests {
assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
1,
);
assert_cache_access_metric_count(&catalog.metric_registry, "partition", 6);

@ -936,7 +936,7 @@ mod tests {
assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
assert_catalog_access_metric_count(
&catalog.metric_registry,
"partition_get_by_id_batch",
"partition_get_by_hash_id_batch",
2,
);
assert_cache_access_metric_count(&catalog.metric_registry, "partition", 8);
@ -18,7 +18,7 @@
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;

use data_types::{PartitionId, TableId, TransitionPartitionId};
use data_types::{PartitionHashId, PartitionId, TableId, TransitionPartitionId};
use generated_types::influxdata::iox::catalog::v1::*;
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use observability_deps::tracing::*;

@ -47,14 +47,14 @@ impl catalog_service_server::CatalogService for CatalogService {
) -> Result<Response<GetParquetFilesByPartitionIdResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let partition_id = TransitionPartitionId::Deprecated(PartitionId::new(req.partition_id));
let partition_id = to_partition_id(req.partition_identifier)?;

let parquet_files = repos
.parquet_files()
.list_by_partition_not_to_delete(&partition_id)
.await
.map_err(|e| {
warn!(error=%e, %req.partition_id, "failed to get parquet_files for partition");
warn!(error=%e, %partition_id, "failed to get parquet_files for partition");
Status::not_found(e.to_string())
})?;

@ -169,13 +169,52 @@ impl catalog_service_server::CatalogService for CatalogService {
}
}

fn to_partition_identifier(partition_id: &TransitionPartitionId) -> PartitionIdentifier {
match partition_id {
TransitionPartitionId::Deterministic(hash_id) => PartitionIdentifier {
id: Some(partition_identifier::Id::HashId(
hash_id.as_bytes().to_owned(),
)),
},
TransitionPartitionId::Deprecated(id) => PartitionIdentifier {
id: Some(partition_identifier::Id::CatalogId(id.get())),
},
}
}

fn to_partition_id(
partition_identifier: Option<PartitionIdentifier>,
) -> Result<TransitionPartitionId, Status> {
let partition_id =
match partition_identifier
.and_then(|pi| pi.id)
.ok_or(Status::invalid_argument(
"No partition identifier specified",
))? {
partition_identifier::Id::HashId(bytes) => TransitionPartitionId::Deterministic(
PartitionHashId::try_from(&bytes[..]).map_err(|e| {
Status::invalid_argument(format!(
"Could not parse bytes as a `PartitionHashId`: {e}"
))
})?,
),
partition_identifier::Id::CatalogId(id) => {
TransitionPartitionId::Deprecated(PartitionId::new(id))
}
};

Ok(partition_id)
}

// converts the catalog ParquetFile to protobuf
fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile {
let partition_identifier = to_partition_identifier(&p.partition_id);

ParquetFile {
id: p.id.get(),
namespace_id: p.namespace_id.get(),
table_id: p.table_id.get(),
partition_id: p.partition_id.get(),
partition_identifier: Some(partition_identifier),
object_store_id: p.object_store_id.to_string(),
min_time: p.min_time.get(),
max_time: p.max_time.get(),

@ -191,8 +230,10 @@ fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile {

// converts the catalog Partition to protobuf
fn to_partition(p: data_types::Partition) -> Partition {
let identifier = to_partition_identifier(&p.transition_partition_id());

Partition {
id: p.id.get(),
identifier: Some(identifier),
key: p.partition_key.to_string(),
table_id: p.table_id.get(),
array_sort_key: p.sort_key,

@ -230,8 +271,7 @@ mod tests {
let p1params = ParquetFileParams {
namespace_id: namespace.id,
table_id: table.id,
partition_id: partition.id,
partition_hash_id: partition.hash_id().cloned(),
partition_id: partition.transition_partition_id(),
object_store_id: Uuid::new_v4(),
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),

@ -248,13 +288,15 @@ mod tests {
};
p1 = repos.parquet_files().create(p1params).await.unwrap();
p2 = repos.parquet_files().create(p2params).await.unwrap();
partition_id = partition.id;
partition_id = partition.transition_partition_id();
Arc::clone(&catalog)
};

let partition_identifier = to_partition_identifier(&partition_id);

let grpc = super::CatalogService::new(catalog);
let request = GetParquetFilesByPartitionIdRequest {
partition_id: partition_id.get(),
partition_identifier: Some(partition_identifier),
};

let tonic_response = grpc
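For the deprecated catalog-ID variant, to_partition_identifier and to_partition_id round-trip cleanly; a hedged sketch of that round trip inside this module, assuming the private helpers and the generated PartitionIdentifier type from the hunk above:

// Illustrative only; relies on the conversion functions shown in the diff above.
let original = TransitionPartitionId::Deprecated(PartitionId::new(3));
let identifier = to_partition_identifier(&original);
// `to_partition_id` takes an optional identifier and rejects `None` with `invalid_argument`.
let round_tripped = to_partition_id(Some(identifier)).expect("identifier should parse");
assert_eq!(round_tripped, original);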
@ -75,7 +75,7 @@ impl object_store_service_server::ObjectStoreService for ObjectStoreService {
let path = ParquetFilePath::new(
parquet_file.namespace_id,
parquet_file.table_id,
&parquet_file.transition_partition_id(),
&parquet_file.partition_id.clone(),
parquet_file.object_store_id,
);
let path = path.object_store_path();

@ -128,8 +128,7 @@ mod tests {
let p1params = ParquetFileParams {
namespace_id: namespace.id,
table_id: table.id,
partition_id: partition.id,
partition_hash_id: partition.hash_id().cloned(),
partition_id: partition.transition_partition_id(),
object_store_id: Uuid::new_v4(),
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),

@ -150,7 +149,7 @@ mod tests {
let path = ParquetFilePath::new(
p1.namespace_id,
p1.table_id,
&p1.transition_partition_id(),
&p1.partition_id.clone(),
p1.object_store_id,
);
let path = path.object_store_path();