fix: Remove ChunkAddr
parent 975dd288d4
commit faba90d992
@@ -1,8 +1,8 @@
 //! Queryable Compactor Data
 
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
-    Timestamp, TimestampMinMax, Tombstone,
+    ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, Timestamp,
+    TimestampMinMax, Tombstone,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
 use observability_deps::tracing::trace;

@@ -161,11 +161,6 @@ impl QueryChunk for QueryableParquetChunk {
         ChunkId::new_id_for_ng(timestamp_nano_u128)
     }
 
-    // This function should not be used in this context
-    fn addr(&self) -> ChunkAddr {
-        unimplemented!()
-    }
-
     /// Returns the name of the table stored in this chunk
     fn table_name(&self) -> &str {
         &self.table_name
@@ -893,35 +893,6 @@ pub struct ProcessedTombstone {
     pub parquet_file_id: ParquetFileId,
 }
 
-/// Address of the chunk within the catalog
-#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
-pub struct ChunkAddr {
-    /// Database name
-    pub db_name: Arc<str>,
-
-    /// What table does the chunk belong to?
-    pub table_name: Arc<str>,
-
-    /// What partition does the chunk belong to?
-    pub partition_key: Arc<str>,
-
-    /// The ID of the chunk
-    pub chunk_id: ChunkId,
-}
-
-impl std::fmt::Display for ChunkAddr {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "Chunk('{}':'{}':'{}':{})",
-            self.db_name,
-            self.table_name,
-            self.partition_key,
-            self.chunk_id.get()
-        )
-    }
-}
-
 /// ID of a chunk.
 ///
 /// This ID is unique within a single partition.
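With the bundled address type gone, the guarantee in the doc comment kept above ("This ID is unique within a single partition") is what callers lean on instead. A minimal sketch of that idea with stand-in types (`PartitionId`, `ChunkId`, and `ChunkIndex` here are illustrative, not from this commit): a chunk stays globally addressable by pairing its partition with its id.

    use std::collections::HashMap;

    // Stand-ins for data_types::PartitionId and the UUID-backed data_types::ChunkId.
    type PartitionId = i64;
    type ChunkId = u128;

    /// Chunks keyed by (partition, chunk id): the pair is globally unique
    /// because a ChunkId is only guaranteed unique within its partition.
    struct ChunkIndex<T> {
        chunks: HashMap<(PartitionId, ChunkId), T>,
    }

    impl<T> ChunkIndex<T> {
        fn new() -> Self {
            Self { chunks: HashMap::new() }
        }

        fn insert(&mut self, partition: PartitionId, chunk: ChunkId, value: T) {
            self.chunks.insert((partition, chunk), value);
        }

        fn get(&self, partition: PartitionId, chunk: ChunkId) -> Option<&T> {
            self.chunks.get(&(partition, chunk))
        }
    }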
@@ -4,7 +4,7 @@ use crate::data::{QueryableBatch, SnapshotBatch};
 use arrow::record_batch::RecordBatch;
 use arrow_util::util::merge_record_batches;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
+    ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
     TimestampMinMax, Tombstone,
 };
 use datafusion::{

@@ -151,11 +151,6 @@ impl QueryChunk for QueryableBatch {
         ChunkId::new_test(0)
     }
 
-    // This function should not be used in PersistingBatch context
-    fn addr(&self) -> ChunkAddr {
-        unimplemented!()
-    }
-
     /// Returns the name of the table stored in this chunk
     fn table_name(&self) -> &str {
         &self.table_name
@@ -3,8 +3,8 @@
 use crate::cache::CatalogCache;
 use arrow::record_batch::RecordBatch;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId,
-    ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId, TimestampMinMax,
+    ChunkId, ChunkOrder, DeletePredicate, ParquetFileId, ParquetFileWithMetadata, PartitionId,
+    SequenceNumber, SequencerId, TimestampMinMax,
 };
 use futures::StreamExt;
 use iox_catalog::interface::Catalog;

@@ -23,8 +23,11 @@ mod query_access;
 /// Immutable metadata attached to a [`QuerierChunk`].
 #[derive(Debug)]
 pub struct ChunkMeta {
-    /// Chunk address.
-    addr: ChunkAddr,
+    /// The ID of the chunk
+    chunk_id: ChunkId,
+
+    /// Table name
+    table_name: Arc<str>,
 
     /// Chunk order.
     order: ChunkOrder,

@@ -49,11 +52,6 @@ pub struct ChunkMeta {
 }
 
 impl ChunkMeta {
-    /// Chunk address.
-    pub fn addr(&self) -> &ChunkAddr {
-        &self.addr
-    }
-
     /// Chunk order.
     pub fn order(&self) -> ChunkOrder {
         self.order

@@ -234,10 +232,13 @@ impl ParquetChunkAdapter {
         parquet_file_with_metadata: ParquetFileWithMetadata,
     ) -> Option<QuerierChunk> {
         let decoded_parquet_file = DecodedParquetFile::new(parquet_file_with_metadata);
+        let parquet_file = decoded_parquet_file.parquet_file;
         let chunk = Arc::new(self.new_parquet_chunk(&decoded_parquet_file).await?);
-        let addr = self
-            .old_gen_chunk_addr(&decoded_parquet_file.parquet_file)
+        let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
+        let table_name = self
+            .catalog_cache
+            .table()
+            .name(parquet_file.table_id)
             .await?;
 
         let iox_metadata = &decoded_parquet_file.iox_metadata;
 

@@ -255,56 +256,18 @@ impl ParquetChunkAdapter {
             .await;
 
         let meta = Arc::new(ChunkMeta {
-            addr,
+            chunk_id,
+            table_name,
             order,
             sort_key: iox_metadata.sort_key.clone(),
             partition_sort_key,
             sequencer_id: iox_metadata.sequencer_id,
             partition_id: iox_metadata.partition_id,
-            min_sequence_number: decoded_parquet_file.parquet_file.min_sequence_number,
-            max_sequence_number: decoded_parquet_file.parquet_file.max_sequence_number,
+            min_sequence_number: parquet_file.min_sequence_number,
+            max_sequence_number: parquet_file.max_sequence_number,
         });
 
-        Some(QuerierChunk::new_parquet(
-            decoded_parquet_file.parquet_file.id,
-            chunk,
-            meta,
-        ))
-    }
-
-    /// Get chunk addr for old gen.
-    ///
-    /// Mapping of NG->old:
-    /// - `namespace.name -> db_name`
-    /// - `table.name -> table_name`
-    /// - `sequencer.id X partition.name -> partition_key`
-    /// - `parquet_file.id -> chunk_id`
-    ///
-    /// Returns `None` if some data required to create this chunk is already gone from the catalog.
-    pub async fn old_gen_chunk_addr(&self, parquet_file: &ParquetFile) -> Option<ChunkAddr> {
-        Some(ChunkAddr {
-            db_name: self
-                .catalog_cache
-                .namespace()
-                .name(
-                    self.catalog_cache
-                        .table()
-                        .namespace_id(parquet_file.table_id)
-                        .await?,
-                )
-                .await?,
-            table_name: self
-                .catalog_cache
-                .table()
-                .name(parquet_file.table_id)
-                .await?,
-            partition_key: self
-                .catalog_cache
-                .partition()
-                .old_gen_partition_key(parquet_file.partition_id)
-                .await,
-            chunk_id: ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _)),
-        })
+        Some(QuerierChunk::new_parquet(parquet_file.id, chunk, meta))
     }
 }
 
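The one identity that survives in the hunk above is `ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _))`: the catalog row id of the parquet file is embedded in the low bits of a UUID, so each file maps to a stable, deterministic chunk id. A runnable sketch of that mapping using the `uuid` crate (`chunk_uuid_for` is our name for illustration, not from the codebase):

    use uuid::Uuid;

    // Embed an i64 catalog row id into the low bits of a u128-backed UUID,
    // mirroring `ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _))`.
    fn chunk_uuid_for(parquet_file_id: i64) -> Uuid {
        Uuid::from_u128(parquet_file_id as u128)
    }

    fn main() {
        // Parquet file id 1 yields the UUID the removed test below asserted on.
        assert_eq!(
            chunk_uuid_for(1).to_string(),
            "00000000-0000-0000-0000-000000000001"
        );
    }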
@@ -368,12 +331,6 @@ pub mod tests {
         // create chunk
         let chunk = adapter.new_querier_chunk(parquet_file).await.unwrap();
 
-        // check chunk addr
-        assert_eq!(
-            chunk.meta().addr().to_string(),
-            "Chunk('ns':'table':'1-part':00000000-0000-0000-0000-000000000001)",
-        );
-
         // check chunk schema
         let expected_schema = SchemaBuilder::new()
             .field("field_int", DataType::Int64)
@@ -1,6 +1,6 @@
 use crate::chunk::{ChunkStorage, QuerierChunk};
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
+    ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
 };
 use observability_deps::tracing::debug;
 use predicate::PredicateMatch;

@@ -54,15 +54,11 @@ impl QueryChunkMeta for QuerierChunk {
 
 impl QueryChunk for QuerierChunk {
     fn id(&self) -> ChunkId {
-        self.meta().addr().chunk_id
-    }
-
-    fn addr(&self) -> ChunkAddr {
-        self.meta().addr().clone()
+        self.meta().chunk_id
     }
 
     fn table_name(&self) -> &str {
-        self.meta().addr().table_name.as_ref()
+        self.meta().table_name.as_ref()
     }
 
     fn may_contain_pk_duplicates(&self) -> bool {
@@ -7,8 +7,8 @@ use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
 use async_trait::async_trait;
 use client_util::connection;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, PartitionId, SequenceNumber,
-    SequencerId, StatValues, Statistics, TableSummary, TimestampMinMax,
+    ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, PartitionId, SequenceNumber, SequencerId,
+    StatValues, Statistics, TableSummary, TimestampMinMax,
 };
 use datafusion_util::MemoryStream;
 use futures::{stream::FuturesUnordered, TryStreamExt};

@@ -414,6 +414,7 @@ async fn execute_get_write_infos(
 #[derive(Debug, Clone)]
 pub struct IngesterPartition {
     chunk_id: ChunkId,
+    #[allow(dead_code)]
     namespace_name: Arc<str>,
     table_name: Arc<str>,
     partition_id: PartitionId,

@@ -538,15 +539,6 @@ impl QueryChunk for IngesterPartition {
         self.chunk_id
     }
 
-    fn addr(&self) -> data_types::ChunkAddr {
-        ChunkAddr {
-            db_name: Arc::clone(&self.namespace_name),
-            table_name: Arc::clone(&self.table_name),
-            partition_key: Arc::clone(&self.old_gen_partition_key),
-            chunk_id: self.chunk_id,
-        }
-    }
-
     fn table_name(&self) -> &str {
         self.table_name.as_ref()
     }
@@ -10,8 +10,7 @@
 
 use async_trait::async_trait;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary,
-    TimestampMinMax,
+    ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary, TimestampMinMax,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
 use exec::{stringset::StringSet, IOxSessionContext};

@@ -181,9 +180,6 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
     /// particular partition.
     fn id(&self) -> ChunkId;
 
-    /// Returns the ChunkAddr of this chunk
-    fn addr(&self) -> ChunkAddr;
-
     /// Returns the name of the table stored in this chunk
     fn table_name(&self) -> &str;
 
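With `addr()` removed from the `QueryChunk` trait, callers that want a printable identity combine the two accessors that remain, `id()` and `table_name()`. A hedged sketch against a stand-in trait (the real `QueryChunk` has many more methods; `Chunk` and `chunk_label` are illustrative only):

    // Stand-in for the slimmed-down QueryChunk surface; only the accessors
    // relevant to this change are modeled.
    trait Chunk {
        fn id(&self) -> u128; // stand-in for the UUID-backed ChunkId
        fn table_name(&self) -> &str;
    }

    // Before this commit, `chunk.addr()` gave a Display of the form
    // "Chunk('db':'table':'partition':id)"; db name and partition key are no
    // longer part of a chunk's identity, so a label shrinks to what remains.
    fn chunk_label(chunk: &dyn Chunk) -> String {
        format!("Chunk('{}':{})", chunk.table_name(), chunk.id())
    }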
@@ -20,8 +20,8 @@ use arrow::{
 };
 use async_trait::async_trait;
 use data_types::{
-    ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId,
-    StatValues, Statistics, TableSummary, TimestampMinMax,
+    ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId, StatValues,
+    Statistics, TableSummary, TimestampMinMax,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_util::stream_from_batches;

@@ -906,15 +906,6 @@ impl QueryChunk for TestChunk {
         self.id
     }
 
-    fn addr(&self) -> ChunkAddr {
-        ChunkAddr {
-            db_name: Arc::from("TestChunkDb"),
-            table_name: Arc::from(self.table_name.as_str()),
-            partition_key: Arc::from("TestChunkPartitionKey"),
-            chunk_id: self.id,
-        }
-    }
-
     fn table_name(&self) -> &str {
         &self.table_name
     }
 