From faba90d992c6f458077e95ce13a538e1e818cfdf Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Thu, 12 May 2022 15:45:10 -0400
Subject: [PATCH] fix: Remove ChunkAddr

---
 compactor/src/query.rs            |  9 +---
 data_types/src/lib.rs             | 29 ------------
 ingester/src/query.rs             |  7 +--
 querier/src/chunk/mod.rs          | 79 +++++++------------------------
 querier/src/chunk/query_access.rs | 10 ++--
 querier/src/ingester/mod.rs       | 14 ++----
 query/src/lib.rs                  |  6 +--
 query/src/test.rs                 | 13 +----
 8 files changed, 30 insertions(+), 137 deletions(-)

diff --git a/compactor/src/query.rs b/compactor/src/query.rs
index b37fa51c99..939e9fdf47 100644
--- a/compactor/src/query.rs
+++ b/compactor/src/query.rs
@@ -1,8 +1,8 @@
 //! Queryable Compactor Data
 
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
-    Timestamp, TimestampMinMax, Tombstone,
+    ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, Timestamp,
+    TimestampMinMax, Tombstone,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
 use observability_deps::tracing::trace;
@@ -161,11 +161,6 @@ impl QueryChunk for QueryableParquetChunk {
         ChunkId::new_id_for_ng(timestamp_nano_u128)
     }
 
-    // This function should not be used in this context
-    fn addr(&self) -> ChunkAddr {
-        unimplemented!()
-    }
-
     /// Returns the name of the table stored in this chunk
     fn table_name(&self) -> &str {
         &self.table_name
diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index e4cf94cf50..9b5877e152 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -893,35 +893,6 @@ pub struct ProcessedTombstone {
     pub parquet_file_id: ParquetFileId,
 }
 
-/// Address of the chunk within the catalog
-#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
-pub struct ChunkAddr {
-    /// Database name
-    pub db_name: Arc<str>,
-
-    /// What table does the chunk belong to?
-    pub table_name: Arc<str>,
-
-    /// What partition does the chunk belong to?
-    pub partition_key: Arc<str>,
-
-    /// The ID of the chunk
-    pub chunk_id: ChunkId,
-}
-
-impl std::fmt::Display for ChunkAddr {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "Chunk('{}':'{}':'{}':{})",
-            self.db_name,
-            self.table_name,
-            self.partition_key,
-            self.chunk_id.get()
-        )
-    }
-}
-
 /// ID of a chunk.
 ///
 /// This ID is unique within a single partition.
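[Annotation, not part of the patch: the Display impl deleted above was the only producer of the "Chunk('db':'table':'partition':id)" format that older log lines and test assertions relied on. For readers who need to reproduce that string after this change, here is a standalone sketch; format_chunk_addr is a hypothetical helper, not IOx API, and the uuid crate stands in for the Uuid that ChunkId::get() returns.]

use uuid::Uuid;

// Hypothetical helper (not IOx API): rebuilds the string that the removed
// `impl Display for ChunkAddr` produced, from loose parts.
fn format_chunk_addr(db_name: &str, table_name: &str, partition_key: &str, chunk_id: Uuid) -> String {
    format!(
        "Chunk('{}':'{}':'{}':{})",
        db_name, table_name, partition_key, chunk_id
    )
}

fn main() {
    // Matches the assertion removed from the querier test later in this patch.
    assert_eq!(
        format_chunk_addr("ns", "table", "1-part", Uuid::from_u128(1)),
        "Chunk('ns':'table':'1-part':00000000-0000-0000-0000-000000000001)"
    );
}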
diff --git a/ingester/src/query.rs b/ingester/src/query.rs
index fcf3fb1609..528d34a202 100644
--- a/ingester/src/query.rs
+++ b/ingester/src/query.rs
@@ -4,7 +4,7 @@ use crate::data::{QueryableBatch, SnapshotBatch};
 use arrow::record_batch::RecordBatch;
 use arrow_util::util::merge_record_batches;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
+    ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
     TimestampMinMax, Tombstone,
 };
 use datafusion::{
@@ -151,11 +151,6 @@ impl QueryChunk for QueryableBatch {
         ChunkId::new_test(0)
     }
 
-    // This function should not be used in PersistingBatch context
-    fn addr(&self) -> ChunkAddr {
-        unimplemented!()
-    }
-
     /// Returns the name of the table stored in this chunk
     fn table_name(&self) -> &str {
         &self.table_name
diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs
index b38b3178c7..33c3f95ab3 100644
--- a/querier/src/chunk/mod.rs
+++ b/querier/src/chunk/mod.rs
@@ -3,8 +3,8 @@
 use crate::cache::CatalogCache;
 use arrow::record_batch::RecordBatch;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId,
-    ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId, TimestampMinMax,
+    ChunkId, ChunkOrder, DeletePredicate, ParquetFileId, ParquetFileWithMetadata, PartitionId,
+    SequenceNumber, SequencerId, TimestampMinMax,
 };
 use futures::StreamExt;
 use iox_catalog::interface::Catalog;
@@ -23,8 +23,11 @@ mod query_access;
 /// Immutable metadata attached to a [`QuerierChunk`].
 #[derive(Debug)]
 pub struct ChunkMeta {
-    /// Chunk address.
-    addr: ChunkAddr,
+    /// The ID of the chunk
+    chunk_id: ChunkId,
+
+    /// Table name
+    table_name: Arc<str>,
 
     /// Chunk order.
     order: ChunkOrder,
@@ -49,11 +52,6 @@ pub struct ChunkMeta {
 }
 
 impl ChunkMeta {
-    /// Chunk address.
-    pub fn addr(&self) -> &ChunkAddr {
-        &self.addr
-    }
-
     /// Chunk order.
     pub fn order(&self) -> ChunkOrder {
         self.order
@@ -234,10 +232,13 @@ impl ParquetChunkAdapter {
         parquet_file_with_metadata: ParquetFileWithMetadata,
     ) -> Option<QuerierChunk> {
         let decoded_parquet_file = DecodedParquetFile::new(parquet_file_with_metadata);
+        let parquet_file = &decoded_parquet_file.parquet_file;
         let chunk = Arc::new(self.new_parquet_chunk(&decoded_parquet_file).await?);
-
-        let addr = self
-            .old_gen_chunk_addr(&decoded_parquet_file.parquet_file)
+        let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
+        let table_name = self
+            .catalog_cache
+            .table()
+            .name(parquet_file.table_id)
             .await?;
 
         let iox_metadata = &decoded_parquet_file.iox_metadata;
@@ -255,56 +256,18 @@ impl ParquetChunkAdapter {
             .await;
 
         let meta = Arc::new(ChunkMeta {
-            addr,
+            chunk_id,
+            table_name,
             order,
             sort_key: iox_metadata.sort_key.clone(),
             partition_sort_key,
             sequencer_id: iox_metadata.sequencer_id,
             partition_id: iox_metadata.partition_id,
-            min_sequence_number: decoded_parquet_file.parquet_file.min_sequence_number,
-            max_sequence_number: decoded_parquet_file.parquet_file.max_sequence_number,
+            min_sequence_number: parquet_file.min_sequence_number,
+            max_sequence_number: parquet_file.max_sequence_number,
         });
 
-        Some(QuerierChunk::new_parquet(
-            decoded_parquet_file.parquet_file.id,
-            chunk,
-            meta,
-        ))
-    }
-
-    /// Get chunk addr for old gen.
-    ///
-    /// Mapping of NG->old:
-    /// - `namespace.name -> db_name`
-    /// - `table.name -> table_name`
-    /// - `sequencer.id X partition.name -> partition_key`
-    /// - `parquet_file.id -> chunk_id`
-    ///
-    /// Returns `None` if some data required to create this chunk is already gone from the catalog.
-    pub async fn old_gen_chunk_addr(&self, parquet_file: &ParquetFile) -> Option<ChunkAddr> {
-        Some(ChunkAddr {
-            db_name: self
-                .catalog_cache
-                .namespace()
-                .name(
-                    self.catalog_cache
-                        .table()
-                        .namespace_id(parquet_file.table_id)
-                        .await?,
-                )
-                .await?,
-            table_name: self
-                .catalog_cache
-                .table()
-                .name(parquet_file.table_id)
-                .await?,
-            partition_key: self
-                .catalog_cache
-                .partition()
-                .old_gen_partition_key(parquet_file.partition_id)
-                .await,
-            chunk_id: ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _)),
-        })
+        Some(QuerierChunk::new_parquet(parquet_file.id, chunk, meta))
     }
 }
 
@@ -368,12 +331,6 @@ pub mod tests {
         // create chunk
         let chunk = adapter.new_querier_chunk(parquet_file).await.unwrap();
 
-        // check chunk addr
-        assert_eq!(
-            chunk.meta().addr().to_string(),
-            "Chunk('ns':'table':'1-part':00000000-0000-0000-0000-000000000001)",
-        );
-
         // check chunk schema
         let expected_schema = SchemaBuilder::new()
             .field("field_int", DataType::Int64)
diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs
index 99e6d1649a..c33a495dad 100644
--- a/querier/src/chunk/query_access.rs
+++ b/querier/src/chunk/query_access.rs
@@ -1,6 +1,6 @@
 use crate::chunk::{ChunkStorage, QuerierChunk};
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
+    ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
 };
 use observability_deps::tracing::debug;
 use predicate::PredicateMatch;
@@ -54,15 +54,11 @@ impl QueryChunkMeta for QuerierChunk {
 
 impl QueryChunk for QuerierChunk {
     fn id(&self) -> ChunkId {
-        self.meta().addr().chunk_id
-    }
-
-    fn addr(&self) -> ChunkAddr {
-        self.meta().addr().clone()
+        self.meta().chunk_id
     }
 
     fn table_name(&self) -> &str {
-        self.meta().addr().table_name.as_ref()
+        self.meta().table_name.as_ref()
     }
 
     fn may_contain_pk_duplicates(&self) -> bool {
diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs
index 972d4c3022..54fec8eb77 100644
--- a/querier/src/ingester/mod.rs
+++ b/querier/src/ingester/mod.rs
@@ -7,8 +7,8 @@ use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
 use async_trait::async_trait;
 use client_util::connection;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, PartitionId, SequenceNumber,
-    SequencerId, StatValues, Statistics, TableSummary, TimestampMinMax,
+    ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, PartitionId, SequenceNumber, SequencerId,
+    StatValues, Statistics, TableSummary, TimestampMinMax,
 };
 use datafusion_util::MemoryStream;
 use futures::{stream::FuturesUnordered, TryStreamExt};
@@ -414,6 +414,7 @@ async fn execute_get_write_infos(
 #[derive(Debug, Clone)]
 pub struct IngesterPartition {
     chunk_id: ChunkId,
+    #[allow(dead_code)]
     namespace_name: Arc<str>,
     table_name: Arc<str>,
     partition_id: PartitionId,
@@ -538,15 +539,6 @@ impl QueryChunk for IngesterPartition {
         self.chunk_id
     }
 
-    fn addr(&self) -> data_types::ChunkAddr {
-        ChunkAddr {
-            db_name: Arc::clone(&self.namespace_name),
-            table_name: Arc::clone(&self.table_name),
-            partition_key: Arc::clone(&self.old_gen_partition_key),
-            chunk_id: self.chunk_id,
-        }
-    }
-
     fn table_name(&self) -> &str {
         self.table_name.as_ref()
     }
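[Annotation, not part of the patch: the net effect of the querier hunks above is that chunk identity no longer takes a catalog round trip through old_gen_chunk_addr; the chunk ID is derived directly from the parquet file's catalog row ID. A minimal sketch of that mapping, using stand-in newtypes for the data_types ParquetFileId and ChunkId and the uuid crate:]

use uuid::Uuid;

// Stand-ins for the data_types newtypes; not the real definitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ParquetFileId(i64);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ChunkId(Uuid);

fn chunk_id_for(file_id: ParquetFileId) -> ChunkId {
    // Same construction as `new_querier_chunk` above: widen the i64 row ID
    // into the low bits of a u128 and treat it as a UUID, which keeps the
    // chunk ID deterministic and unique per parquet file.
    ChunkId(Uuid::from_u128(file_id.0 as u128))
}

fn main() {
    let id = chunk_id_for(ParquetFileId(1));
    assert_eq!(id.0.to_string(), "00000000-0000-0000-0000-000000000001");
}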
diff --git a/query/src/lib.rs b/query/src/lib.rs
index 4d7d80d46d..05cad81f05 100644
--- a/query/src/lib.rs
+++ b/query/src/lib.rs
@@ -10,8 +10,7 @@
 
 use async_trait::async_trait;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary,
-    TimestampMinMax,
+    ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary, TimestampMinMax,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
 use exec::{stringset::StringSet, IOxSessionContext};
@@ -181,9 +180,6 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
     /// particular partition.
     fn id(&self) -> ChunkId;
 
-    /// Returns the ChunkAddr of this chunk
-    fn addr(&self) -> ChunkAddr;
-
     /// Returns the name of the table stored in this chunk
     fn table_name(&self) -> &str;
 
diff --git a/query/src/test.rs b/query/src/test.rs
index 52cff881ae..c7a8ff03d4 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -20,8 +20,8 @@ use arrow::{
 };
 use async_trait::async_trait;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId,
-    StatValues, Statistics, TableSummary, TimestampMinMax,
+    ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId, StatValues,
+    Statistics, TableSummary, TimestampMinMax,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_util::stream_from_batches;
@@ -906,15 +906,6 @@ impl QueryChunk for TestChunk {
         self.id
     }
 
-    fn addr(&self) -> ChunkAddr {
-        ChunkAddr {
-            db_name: Arc::from("TestChunkDb"),
-            table_name: Arc::from(self.table_name.as_str()),
-            partition_key: Arc::from("TestChunkPartitionKey"),
-            chunk_id: self.id,
-        }
-    }
-
     fn table_name(&self) -> &str {
         &self.table_name
     }
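[Annotation, not part of the patch: with addr() removed from the QueryChunk trait, implementers no longer carry the unimplemented!() stubs the compactor and ingester had above. A simplified sketch of the post-patch shape, not the real trait, which has many more methods:]

use uuid::Uuid;

// Stand-in for the data_types ChunkId; not the real definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ChunkId(Uuid);

// Trimmed-down stand-in for the QueryChunk trait after this patch:
// chunk identity comes from `id()` and `table_name()` alone.
trait QueryChunkSlim {
    fn id(&self) -> ChunkId;
    fn table_name(&self) -> &str;
}

struct ExampleChunk {
    id: ChunkId,
    table_name: String,
}

impl QueryChunkSlim for ExampleChunk {
    fn id(&self) -> ChunkId {
        self.id
    }

    fn table_name(&self) -> &str {
        &self.table_name
    }
}

fn main() {
    let chunk = ExampleChunk {
        id: ChunkId(Uuid::from_u128(42)),
        table_name: "cpu".to_string(),
    };
    println!("chunk {} in table {}", chunk.id.0, chunk.table_name);
}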