refactor: remove `QueryChunk::partition_sort_key` (#7680)

As of #7250 / #7449 the partition sort key is no longer required for
query planning. Instead, we use a combination of
`QueryChunk::partition_id` and `QueryChunk::sort_key`, which is more
robust and easier to reason about.

Removing it simplifies the querier code considerably: we no longer need
a sort key for the ingester chunks, and we no longer have to "sync" the
sort key between chunks for consistency.
Marco Neumann 2023-04-27 12:54:41 +02:00 committed by GitHub
parent f87f4c1869
commit 0556fdae53
9 changed files with 26 additions and 315 deletions
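For illustration, here is a minimal sketch of the per-chunk metadata the planner relies on after this change: `partition_id` and `sort_key` on each chunk, with no `partition_sort_key` accessor. The types below are simplified stand-ins (not the real `data_types::PartitionId` / `schema::sort::SortKey`), and the helper function is a hypothetical usage example, not actual planner code.

// Simplified stand-ins for the real IOx types; illustration only.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct PartitionId(i64);

#[derive(Clone, Debug)]
pub struct SortKey(pub Vec<String>);

/// Sketch of the per-chunk metadata that query planning now consumes.
pub trait QueryChunkMeta {
    /// Partition this chunk belongs to.
    fn partition_id(&self) -> PartitionId;

    /// Sort key of this chunk's data, if it is sorted.
    fn sort_key(&self) -> Option<&SortKey>;

    // Removed by this commit:
    // fn partition_sort_key(&self) -> Option<&SortKey>;
}

/// Hypothetical example: group chunks by partition and report whether every
/// chunk in each partition carries a sort key, using only the remaining
/// accessors (no partition-level sort key required).
pub fn all_chunks_sorted_per_partition<C: QueryChunkMeta>(chunks: &[C]) -> bool {
    use std::collections::HashMap;

    let mut sorted_by_partition: HashMap<PartitionId, bool> = HashMap::new();
    for chunk in chunks {
        let all_sorted = sorted_by_partition
            .entry(chunk.partition_id())
            .or_insert(true);
        *all_sorted &= chunk.sort_key().is_some();
    }
    sorted_by_partition.values().all(|sorted| *sorted)
}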


@ -25,19 +25,16 @@ pub struct QueryableParquetChunk {
delete_predicates: Vec<Arc<DeletePredicate>>,
partition_id: PartitionId,
sort_key: Option<SortKey>,
partition_sort_key: Option<SortKey>,
order: ChunkOrder,
summary: Arc<TableSummary>,
}
impl QueryableParquetChunk {
/// Initialize a QueryableParquetChunk
#[allow(clippy::too_many_arguments)]
pub fn new(
partition_id: PartitionId,
data: Arc<ParquetChunk>,
sort_key: Option<SortKey>,
partition_sort_key: Option<SortKey>,
order: ChunkOrder,
) -> Self {
let summary = Arc::new(create_basic_summary(
@ -50,7 +47,6 @@ impl QueryableParquetChunk {
delete_predicates: vec![],
partition_id,
sort_key,
partition_sort_key,
order,
summary,
}
@ -80,10 +76,6 @@ impl QueryChunkMeta for QueryableParquetChunk {
self.data.schema()
}
fn partition_sort_key(&self) -> Option<&SortKey> {
self.partition_sort_key.as_ref()
}
fn partition_id(&self) -> PartitionId {
self.partition_id
}
@ -218,11 +210,5 @@ fn to_queryable_parquet_chunk(
);
let parquet_chunk = ParquetChunk::new(Arc::new(file.file.clone()), schema, store);
QueryableParquetChunk::new(
partition_id,
Arc::new(parquet_chunk),
sort_key,
partition_info.sort_key.clone(),
file.order,
)
QueryableParquetChunk::new(partition_id, Arc::new(parquet_chunk), sort_key, file.order)
}


@ -127,10 +127,6 @@ impl QueryChunkMeta for QueryAdaptor {
&self.schema
}
fn partition_sort_key(&self) -> Option<&SortKey> {
None // Ingester data has not persisted yet and should not be attached to any partition
}
fn partition_id(&self) -> PartitionId {
self.partition_id
}


@ -63,10 +63,6 @@ pub trait QueryChunkMeta {
/// return a reference to the summary of the data held in this chunk
fn schema(&self) -> &Schema;
/// Return a reference to the chunk's partition sort key if any.
/// Only persisted chunk has its partition sort key
fn partition_sort_key(&self) -> Option<&SortKey>;
/// Return partition id for this chunk
fn partition_id(&self) -> PartitionId;
@ -317,10 +313,6 @@ where
self.as_ref().sort_key()
}
fn partition_sort_key(&self) -> Option<&SortKey> {
self.as_ref().partition_sort_key()
}
fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
let pred = self.as_ref().delete_predicates();
debug!(?pred, "Delete predicate in QueryChunkMeta");
@ -346,10 +338,6 @@ impl QueryChunkMeta for Arc<dyn QueryChunk> {
self.as_ref().sort_key()
}
fn partition_sort_key(&self) -> Option<&SortKey> {
self.as_ref().partition_sort_key()
}
fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
let pred = self.as_ref().delete_predicates();
debug!(?pred, "Delete predicate in QueryChunkMeta");


@ -343,9 +343,6 @@ pub struct TestChunk {
/// The sort key of this chunk
sort_key: Option<SortKey>,
/// The partition sort key of this chunk
partition_sort_key: Option<SortKey>,
/// Suppress output
quiet: bool,
}
@ -427,7 +424,6 @@ impl TestChunk {
delete_predicates: Default::default(),
order: ChunkOrder::MIN,
sort_key: None,
partition_sort_key: None,
partition_id: PartitionId::new(0),
quiet: false,
}
@ -1133,14 +1129,6 @@ impl TestChunk {
}
}
/// Set the partition sort key for this chunk
pub fn with_partition_sort_key(self, sort_key: SortKey) -> Self {
Self {
partition_sort_key: Some(sort_key),
..self
}
}
/// Returns all columns of the table
pub fn all_column_names(&self) -> StringSet {
self.schema
@ -1252,10 +1240,6 @@ impl QueryChunkMeta for TestChunk {
&self.schema
}
fn partition_sort_key(&self) -> Option<&SortKey> {
self.partition_sort_key.as_ref()
}
fn partition_id(&self) -> PartitionId {
self.partition_id
}


@ -236,7 +236,6 @@ struct IngesterResponseOk {
struct ObserveIngesterRequest<'a> {
res: Option<Result<IngesterResponseOk, ()>>,
t_start: Time,
time_provider: Arc<dyn TimeProvider>,
metrics: Arc<IngesterConnectionMetrics>,
request: GetPartitionForIngester<'a>,
span_recorder: SpanRecorder,
@ -248,14 +247,12 @@ impl<'a> ObserveIngesterRequest<'a> {
metrics: Arc<IngesterConnectionMetrics>,
span_recorder: &SpanRecorder,
) -> Self {
let time_provider = request.catalog_cache.time_provider();
let t_start = time_provider.now();
let t_start = request.time_provider.now();
let span_recorder = span_recorder.child("flight request");
Self {
res: None,
t_start,
time_provider,
metrics,
request,
span_recorder,
@ -279,7 +276,7 @@ impl<'a> ObserveIngesterRequest<'a> {
impl<'a> Drop for ObserveIngesterRequest<'a> {
fn drop(&mut self) {
let t_end = self.time_provider.now();
let t_end = self.request.time_provider.now();
if let Some(ingester_duration) = t_end.checked_duration_since(self.t_start) {
let (metric, status, ok_status) = match self.res {
@ -314,7 +311,7 @@ impl<'a> Drop for ObserveIngesterRequest<'a> {
pub struct IngesterConnectionImpl {
unique_ingester_addresses: HashSet<Arc<str>>,
flight_client: Arc<dyn IngesterFlightClient>,
catalog_cache: Arc<CatalogCache>,
time_provider: Arc<dyn TimeProvider>,
metrics: Arc<IngesterConnectionMetrics>,
backoff_config: BackoffConfig,
}
@ -362,7 +359,7 @@ impl IngesterConnectionImpl {
Self {
unique_ingester_addresses: ingester_addresses.into_iter().collect(),
flight_client,
catalog_cache,
time_provider: catalog_cache.time_provider(),
metrics,
backoff_config,
}
@ -373,7 +370,7 @@ impl IngesterConnectionImpl {
#[derive(Debug, Clone)]
struct GetPartitionForIngester<'a> {
flight_client: Arc<dyn IngesterFlightClient>,
catalog_cache: Arc<CatalogCache>,
time_provider: Arc<dyn TimeProvider>,
ingester_address: Arc<str>,
namespace_id: NamespaceId,
columns: Vec<String>,
@ -388,7 +385,7 @@ async fn execute(
) -> Result<Vec<IngesterPartition>> {
let GetPartitionForIngester {
flight_client,
catalog_cache,
time_provider: _,
ingester_address,
namespace_id,
columns,
@ -477,15 +474,14 @@ async fn execute(
// reconstruct partitions
let mut decoder = IngesterStreamDecoder::new(
ingester_address,
catalog_cache,
cached_table,
span_recorder.child_span("IngesterStreamDecoder"),
);
for (msg, md) in messages {
decoder.register(msg, md).await?;
decoder.register(msg, md)?;
}
decoder.finalize().await
decoder.finalize()
}
/// Helper to disassemble the data from the ingester Apache Flight arrow stream.
@ -497,25 +493,18 @@ struct IngesterStreamDecoder {
current_partition: Option<IngesterPartition>,
current_chunk: Option<(Schema, Vec<RecordBatch>)>,
ingester_address: Arc<str>,
catalog_cache: Arc<CatalogCache>,
cached_table: Arc<CachedTable>,
span_recorder: SpanRecorder,
}
impl IngesterStreamDecoder {
/// Create empty decoder.
fn new(
ingester_address: Arc<str>,
catalog_cache: Arc<CatalogCache>,
cached_table: Arc<CachedTable>,
span: Option<Span>,
) -> Self {
fn new(ingester_address: Arc<str>, cached_table: Arc<CachedTable>, span: Option<Span>) -> Self {
Self {
finished_partitions: HashMap::new(),
current_partition: None,
current_chunk: None,
ingester_address,
catalog_cache,
cached_table,
span_recorder: SpanRecorder::new(span),
}
@ -538,40 +527,10 @@ impl IngesterStreamDecoder {
/// Flush current partition, if any.
///
/// This will also flush the current chunk.
async fn flush_partition(&mut self) -> Result<()> {
fn flush_partition(&mut self) -> Result<()> {
self.flush_chunk()?;
if let Some(current_partition) = self.current_partition.take() {
let schemas: Vec<_> = current_partition
.chunks()
.iter()
.map(|c| c.schema())
.collect();
let primary_keys: Vec<_> = schemas.iter().map(|s| s.primary_key()).collect();
let primary_key: Vec<_> = primary_keys
.iter()
.flat_map(|pk| pk.iter())
// cache may be older then the ingester response status, so some entries might be missing
.filter_map(|name| {
self.cached_table
.column_id_map_rev
.get(&Arc::from(name.to_owned()))
})
.copied()
.collect();
let partition_sort_key = self
.catalog_cache
.partition()
.sort_key(
Arc::clone(&self.cached_table),
current_partition.partition_id(),
&primary_key,
self.span_recorder
.child_span("cache GET partition sort key"),
)
.await
.map(|sort_key| Arc::clone(&sort_key.sort_key));
let current_partition = current_partition.with_partition_sort_key(partition_sort_key);
self.finished_partitions
.insert(current_partition.partition_id, current_partition);
}
@ -580,7 +539,7 @@ impl IngesterStreamDecoder {
}
/// Register a new message and its metadata from the Flight stream.
async fn register(
fn register(
&mut self,
msg: DecodedPayload,
md: IngesterQueryResponseMetadata,
@ -588,7 +547,7 @@ impl IngesterStreamDecoder {
match msg {
DecodedPayload::None => {
// new partition announced
self.flush_partition().await?;
self.flush_partition()?;
let partition_id = PartitionId::new(md.partition_id);
let status = md.status.context(PartitionStatusMissingSnafu {
@ -664,8 +623,8 @@ impl IngesterStreamDecoder {
}
/// Flush internal state and return sorted set of partitions.
async fn finalize(mut self) -> Result<Vec<IngesterPartition>> {
self.flush_partition().await?;
fn finalize(mut self) -> Result<Vec<IngesterPartition>> {
self.flush_partition()?;
let mut ids: Vec<_> = self.finished_partitions.keys().copied().collect();
ids.sort();
@ -718,7 +677,7 @@ impl IngesterConnection for IngesterConnectionImpl {
let metrics = Arc::clone(&metrics);
let request = GetPartitionForIngester {
flight_client: Arc::clone(&self.flight_client),
catalog_cache: Arc::clone(&self.catalog_cache),
time_provider: Arc::clone(&self.time_provider),
ingester_address: Arc::clone(&ingester_address),
namespace_id,
cached_table: Arc::clone(&cached_table),
@ -901,19 +860,6 @@ impl IngesterPartition {
Ok(self)
}
/// Update partition sort key
pub(crate) fn with_partition_sort_key(self, partition_sort_key: Option<Arc<SortKey>>) -> Self {
Self {
partition_sort_key: partition_sort_key.clone(),
chunks: self
.chunks
.into_iter()
.map(|c| c.with_partition_sort_key(partition_sort_key.clone()))
.collect(),
..self
}
}
pub(crate) fn ingester_uuid(&self) -> Option<Uuid> {
self.ingester_uuid
}
@ -978,21 +924,6 @@ pub struct IngesterChunk {
}
impl IngesterChunk {
/// [`Arc`]ed version of the partition sort key.
///
/// Note that this might NOT be the up-to-date sort key of the partition but the one that existed when the chunk was
/// created. You must sync the keys to use the chunks.
pub(crate) fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>> {
self.partition_sort_key.clone()
}
pub(crate) fn with_partition_sort_key(self, partition_sort_key: Option<Arc<SortKey>>) -> Self {
Self {
partition_sort_key,
..self
}
}
pub(crate) fn estimate_size(&self) -> usize {
self.batches
.iter()
@ -1024,10 +955,6 @@ impl QueryChunkMeta for IngesterChunk {
&self.schema
}
fn partition_sort_key(&self) -> Option<&SortKey> {
self.partition_sort_key.as_ref().map(|sk| sk.as_ref())
}
fn partition_id(&self) -> PartitionId {
self.partition_id
}


@ -236,10 +236,6 @@ impl ChunkAdapter {
self.catalog_cache.parquet_store(),
));
Some(QuerierParquetChunk::new(
parquet_chunk,
meta,
Some(Arc::clone(&partition_sort_key.sort_key)),
))
Some(QuerierParquetChunk::new(parquet_chunk, meta))
}
}


@ -71,9 +71,6 @@ pub struct QuerierParquetChunk {
/// Delete predicates to be combined with the chunk
delete_predicates: Vec<Arc<DeletePredicate>>,
/// Partition sort key (how does the read buffer use this?)
partition_sort_key: Option<Arc<SortKey>>,
/// Chunk of the Parquet file
parquet_chunk: Arc<ParquetChunk>,
@ -83,11 +80,7 @@ pub struct QuerierParquetChunk {
impl QuerierParquetChunk {
/// Create new parquet-backed chunk (object store data).
pub fn new(
parquet_chunk: Arc<ParquetChunk>,
meta: Arc<QuerierParquetChunkMeta>,
partition_sort_key: Option<Arc<SortKey>>,
) -> Self {
pub fn new(parquet_chunk: Arc<ParquetChunk>, meta: Arc<QuerierParquetChunkMeta>) -> Self {
let table_summary = Arc::new(create_basic_summary(
parquet_chunk.rows() as u64,
parquet_chunk.schema(),
@ -97,7 +90,6 @@ impl QuerierParquetChunk {
Self {
meta,
delete_predicates: Vec::new(),
partition_sort_key,
parquet_chunk,
table_summary,
}
@ -116,22 +108,6 @@ impl QuerierParquetChunk {
self.meta.as_ref()
}
/// [`Arc`]ed version of the partition sort key.
///
/// Note that this might NOT be the up-to-date sort key of the partition but the one that existed when the chunk was
/// created. You must sync the keys to use the chunks.
pub fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>> {
self.partition_sort_key.clone()
}
/// Set partition sort key
pub fn with_partition_sort_key(self, partition_sort_key: Option<Arc<SortKey>>) -> Self {
Self {
partition_sort_key,
..self
}
}
pub fn estimate_size(&self) -> usize {
self.parquet_chunk.parquet_file().file_size_bytes as usize
}


@ -18,10 +18,6 @@ impl QueryChunkMeta for QuerierParquetChunk {
self.parquet_chunk.schema()
}
fn partition_sort_key(&self) -> Option<&SortKey> {
self.partition_sort_key.as_ref().map(|sk| sk.as_ref())
}
fn partition_id(&self) -> PartitionId {
self.meta().partition_id()
}


@ -2,18 +2,14 @@
mod interface;
use data_types::{DeletePredicate, PartitionId};
use data_types::DeletePredicate;
use iox_query::QueryChunk;
use observability_deps::tracing::debug;
use schema::sort::SortKey;
use snafu::Snafu;
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use std::sync::Arc;
use trace::span::{Span, SpanRecorder};
use crate::{ingester::IngesterChunk, parquet::QuerierParquetChunk, IngesterPartition};
use crate::{parquet::QuerierParquetChunk, IngesterPartition};
#[derive(Snafu, Debug)]
#[allow(missing_copy_implementations)]
@ -58,13 +54,6 @@ impl Reconciler {
chunks.extend(self.build_ingester_chunks(ingester_partitions, retention_delete_pred));
debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation");
let chunks = self.sync_partition_sort_keys(chunks);
let chunks: Vec<Arc<dyn QueryChunk>> = chunks
.into_iter()
.map(|c| c.upcast_to_querier_chunk().into())
.collect();
Ok(chunks)
}
@ -74,7 +63,7 @@ impl Reconciler {
retention_delete_pred: Option<Arc<DeletePredicate>>,
parquet_files: Vec<QuerierParquetChunk>,
_span: Option<Span>,
) -> Result<Vec<Box<dyn UpdatableQuerierChunk>>, ReconcileError> {
) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
debug!(
namespace=%self.namespace_name(),
table_name=%self.table_name(),
@ -84,7 +73,7 @@ impl Reconciler {
debug!(num_chunks=%parquet_files.len(), "Created chunks from parquet files");
let mut chunks: Vec<Box<dyn UpdatableQuerierChunk>> =
let mut chunks: Vec<Arc<dyn QueryChunk>> =
Vec::with_capacity(parquet_files.len() + ingester_partitions.len());
let retention_expr_len = usize::from(retention_delete_pred.is_some());
@ -97,7 +86,7 @@ impl Reconciler {
let chunk = chunk.with_delete_predicates(delete_predicates);
chunks.push(Box::new(chunk) as Box<dyn UpdatableQuerierChunk>);
chunks.push(Arc::new(chunk));
}
Ok(chunks)
@ -107,7 +96,7 @@ impl Reconciler {
&self,
ingester_partitions: Vec<IngesterPartition>,
retention_delete_pred: Option<Arc<DeletePredicate>>,
) -> impl Iterator<Item = Box<dyn UpdatableQuerierChunk>> {
) -> impl Iterator<Item = Arc<dyn QueryChunk>> {
// Add ingester chunks to the overall chunk list.
// - filter out chunks that don't have any record batches
ingester_partitions
@ -119,43 +108,7 @@ impl Reconciler {
};
c.into_chunks().into_iter()
})
.map(|c| Box::new(c) as Box<dyn UpdatableQuerierChunk>)
}
fn sync_partition_sort_keys(
&self,
chunks: Vec<Box<dyn UpdatableQuerierChunk>>,
) -> Vec<Box<dyn UpdatableQuerierChunk>> {
// collect latest (= longest) sort key
// Note that the partition sort key may stale (only a subset of the most recent partition
// sort key) because newer chunks have new columns.
// However, since the querier doesn't (yet) know about these chunks in the `chunks` list above
// using the most up to date sort key from the chunks it does know about is sufficient.
let mut sort_keys = HashMap::<PartitionId, Arc<SortKey>>::new();
for c in &chunks {
if let Some(sort_key) = c.partition_sort_key_arc() {
match sort_keys.entry(c.partition_id()) {
Entry::Occupied(mut o) => {
if sort_key.len() > o.get().len() {
*o.get_mut() = sort_key;
}
}
Entry::Vacant(v) => {
v.insert(sort_key);
}
}
}
}
// write partition sort keys to chunks
chunks
.into_iter()
.map(|chunk| {
let partition_id = chunk.partition_id();
let sort_key = sort_keys.get(&partition_id);
chunk.update_partition_sort_key(sort_key.cloned())
})
.collect()
.map(|c| Arc::new(c) as Arc<dyn QueryChunk>)
}
#[must_use]
@ -168,94 +121,3 @@ impl Reconciler {
self.namespace_name.as_ref()
}
}
trait UpdatableQuerierChunk: QueryChunk {
fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>>;
fn update_partition_sort_key(
self: Box<Self>,
sort_key: Option<Arc<SortKey>>,
) -> Box<dyn UpdatableQuerierChunk>;
fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk>;
}
impl UpdatableQuerierChunk for QuerierParquetChunk {
fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>> {
self.partition_sort_key_arc()
}
fn update_partition_sort_key(
self: Box<Self>,
sort_key: Option<Arc<SortKey>>,
) -> Box<dyn UpdatableQuerierChunk> {
Box::new(self.with_partition_sort_key(sort_key))
}
fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk> {
self as _
}
}
impl UpdatableQuerierChunk for IngesterChunk {
fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>> {
self.partition_sort_key_arc()
}
fn update_partition_sort_key(
self: Box<Self>,
sort_key: Option<Arc<SortKey>>,
) -> Box<dyn UpdatableQuerierChunk> {
Box::new(self.with_partition_sort_key(sort_key))
}
fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk> {
self as _
}
}
#[cfg(test)]
mod tests {
use super::{
interface::{IngesterPartitionInfo, ParquetFileInfo},
*,
};
use data_types::{CompactionLevel, SequenceNumber};
#[derive(Debug)]
struct MockIngesterPartitionInfo {
partition_id: PartitionId,
parquet_max_sequence_number: Option<SequenceNumber>,
}
impl IngesterPartitionInfo for MockIngesterPartitionInfo {
fn partition_id(&self) -> PartitionId {
self.partition_id
}
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.parquet_max_sequence_number
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct MockParquetFileInfo {
partition_id: PartitionId,
max_sequence_number: SequenceNumber,
compaction_level: CompactionLevel,
}
impl ParquetFileInfo for MockParquetFileInfo {
fn partition_id(&self) -> PartitionId {
self.partition_id
}
fn max_sequence_number(&self) -> SequenceNumber {
self.max_sequence_number
}
fn compaction_level(&self) -> CompactionLevel {
self.compaction_level
}
}
}