fix: Have PartitionData use TransitionPartitionId

Branch: pull/24376/head
Author: Carol (Nichols || Goulding)
Date: 2023-07-28 17:13:03 -04:00
Parent: 7d7871e551
Commit: e491ec18bb
6 changed files with 27 additions and 48 deletions

File 1 of 6

@@ -3,7 +3,7 @@
 use std::sync::Arc;
 use data_types::{
-    sequence_number_set::SequenceNumberSet, NamespaceId, PartitionHashId, PartitionId,
+    sequence_number_set::SequenceNumberSet, NamespaceId,
     PartitionKey, SequenceNumber, TableId, TimestampMinMax, TransitionPartitionId,
 };
 use mutable_batch::MutableBatch;
@@ -48,12 +48,8 @@ impl SortKeyState {
 /// Data of an IOx Partition of a given Table of a Namespace
 #[derive(Debug)]
 pub struct PartitionData {
-    /// The catalog ID of the partition this buffer is for.
-    partition_id: PartitionId,
-    /// The deterministic hash identifier of the partition this buffer is for, if present in the
-    /// catalog.
-    partition_hash_id: Option<PartitionHashId>,
+    /// The partition this buffer is for.
+    partition_id: TransitionPartitionId,
     /// The string partition key for this partition.
     partition_key: PartitionKey,
@@ -106,8 +102,7 @@ impl PartitionData {
     /// Initialize a new partition data buffer
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
-        id: PartitionId,
-        partition_hash_id: Option<PartitionHashId>,
+        partition_id: TransitionPartitionId,
         partition_key: PartitionKey,
         namespace_id: NamespaceId,
         namespace_name: Arc<DeferredLoad<NamespaceName>>,
@@ -116,8 +111,7 @@ impl PartitionData {
         sort_key: SortKeyState,
     ) -> Self {
         Self {
-            partition_id: id,
-            partition_hash_id,
+            partition_id,
            partition_key,
            sort_key,
            namespace_id,
@@ -261,7 +255,7 @@ impl PartitionData {
         // is upheld by the FSM, which ensures only non-empty snapshots /
         // RecordBatch are generated. Because `data` contains at least one
         // RecordBatch, this invariant holds.
-        let q = QueryAdaptor::new(self.transition_partition_id(), data);
+        let q = QueryAdaptor::new(self.partition_id.clone(), data);
         // Invariant: the number of rows returned in a query MUST always match
         // the row count reported by the rows() method.
@@ -325,7 +319,7 @@ impl PartitionData {
         // Wrap the persisting data in the type wrapper
         let data = PersistingData::new(
             QueryAdaptor::new(
-                self.transition_partition_id(),
+                self.partition_id.clone(),
                 fsm.get_query_data(&OwnedProjection::default()),
             ),
             batch_ident,
@@ -367,16 +361,8 @@ impl PartitionData {
         fsm.into_sequence_number_set()
     }

-    pub(crate) fn partition_id(&self) -> PartitionId {
-        self.partition_id
-    }
-
-    pub(crate) fn partition_hash_id(&self) -> Option<&PartitionHashId> {
-        self.partition_hash_id.as_ref()
-    }
-
-    pub(crate) fn transition_partition_id(&self) -> TransitionPartitionId {
-        TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref()))
+    pub(crate) fn partition_id(&self) -> &TransitionPartitionId {
+        &self.partition_id
     }

     /// Return the count of persisted Parquet files for this [`PartitionData`] instance.
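Annotation: the removed `transition_partition_id()` accessor above folded the old `(PartitionId, Option<PartitionHashId>)` pair into a single `TransitionPartitionId` via a `From` impl on every call. Below is a minimal sketch of how such a conversion could look; the type and variant definitions are illustrative stand-ins, not the actual `data_types` items.

```rust
// Illustrative stand-ins only; the real definitions live in the `data_types` crate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionId(i64); // catalog row ID (assumed shape)

#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionHashId([u8; 32]); // deterministic hash (assumed shape)

/// One identifier that can address a partition either way during the migration.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    /// Partition addressed by its deterministic hash ID (assumed variant name).
    Deterministic(PartitionHashId),
    /// Older partition known only by its catalog row ID (assumed variant name).
    Deprecated(PartitionId),
}

// The removed accessor built the ID from the old pair via a conversion like this:
impl From<(PartitionId, Option<&PartitionHashId>)> for TransitionPartitionId {
    fn from((id, hash_id): (PartitionId, Option<&PartitionHashId>)) -> Self {
        match hash_id {
            Some(hash_id) => Self::Deterministic(hash_id.clone()),
            None => Self::Deprecated(id),
        }
    }
}

fn main() {
    // A partition without a hash ID falls back to its catalog row ID.
    let id = TransitionPartitionId::from((PartitionId(42), None));
    assert_eq!(id, TransitionPartitionId::Deprecated(PartitionId(42)));
}
```

With the struct now storing the transitional ID directly, call sites such as `QueryAdaptor::new` in the hunks above simply clone the field instead of rebuilding the ID each time.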

File 2 of 6

@@ -2,10 +2,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
 use async_trait::async_trait;
 use backoff::BackoffConfig;
-use data_types::{
-    NamespaceId, Partition, PartitionHashId, PartitionId, PartitionKey, TableId,
-    TransitionPartitionId,
-};
+use data_types::{NamespaceId, Partition, TransitionPartitionId, PartitionKey, TableId};
 use iox_catalog::interface::Catalog;
 use observability_deps::tracing::debug;
 use parking_lot::Mutex;
@@ -21,7 +18,7 @@ use crate::{
 };
 /// A read-through cache mapping `(table_id, partition_key)` tuples to
-/// `(partition_id, optional_partition_hash_id)`.
+/// `partition_id`.
 ///
 /// This data is safe to cache as only one ingester is ever responsible for a
 /// given table partition, and this amortises partition discovery
@@ -35,10 +32,9 @@ use crate::{
 ///
 /// - `PartitionKey`: String (8 len + 8 cap + 8 ptr + data len) = 34 bytes
 /// - `TableId`: 8 bytes
-/// - `PartitionId`: 8 bytes
-/// - `Option<PartitionHashId>`: 8 bytes
+/// - `TransitionPartitionId`: 8 bytes
 ///
-/// For a total of 58 bytes per entry - approx 18,078 entries can be held in
+/// For a total of 50 bytes per entry - approx 20,971 entries can be held in
 /// 1MiB of memory.
 ///
 /// Each cache hit _removes_ the entry from the cache - this eliminates the
@@ -71,7 +67,7 @@ pub(crate) struct PartitionCache<T> {
     /// It's also likely a smaller N (more tables than partition keys) making it
     /// a faster search for cache misses.
     #[allow(clippy::type_complexity)]
-    entries: Mutex<HashMap<PartitionKey, HashMap<TableId, (PartitionId, Option<PartitionHashId>)>>>,
+    entries: Mutex<HashMap<PartitionKey, HashMap<TableId, TransitionPartitionId>>>,
     /// Data needed to construct the [`SortKeyResolver`] for cached entries.
     catalog: Arc<dyn Catalog>,
@@ -104,14 +100,13 @@ impl<T> PartitionCache<T> {
         P: IntoIterator<Item = Partition>,
     {
         let mut entries =
-            HashMap::<PartitionKey, HashMap<TableId, (PartitionId, Option<PartitionHashId>)>>::new(
+            HashMap::<PartitionKey, HashMap<TableId, TransitionPartitionId>>::new(
             );
         for p in partitions.into_iter() {
-            let hash_id = p.hash_id().cloned();
             entries
-                .entry(p.partition_key)
+                .entry(p.partition_key.clone())
                 .or_default()
-                .insert(p.table_id, (p.id, hash_id));
+                .insert(p.table_id, p.transition_partition_id());
         }
         // Minimise the overhead of the maps.
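Annotation: the updated doc comment above puts each cache entry at roughly 50 bytes (34 for the `PartitionKey` string, 8 for `TableId`, 8 for `TransitionPartitionId`), which is where the figure of approx 20,971 entries per 1MiB comes from. A quick back-of-the-envelope check, using the comment's own estimates rather than measured sizes:

```rust
fn main() {
    // Per-entry estimates quoted in the doc comment (not `mem::size_of` results):
    // PartitionKey string header (8 len + 8 cap + 8 ptr) plus ~10 bytes of key data.
    let partition_key = 8 + 8 + 8 + 10;
    let table_id = 8;
    let transition_partition_id = 8;

    let per_entry: u64 = partition_key + table_id + transition_partition_id;
    assert_eq!(per_entry, 50);

    // 1 MiB divided by the per-entry estimate.
    let entries_per_mib = (1024_u64 * 1024) / per_entry;
    assert_eq!(entries_per_mib, 20_971);
}
```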
@@ -130,13 +125,13 @@ impl<T> PartitionCache<T> {
         }
     }
-    /// Search for an cached entry matching the `(partition_key, table_id)`
+    /// Search for a cached entry matching the `(partition_key, table_id)`
     /// tuple.
     fn find(
         &self,
         table_id: TableId,
         partition_key: &PartitionKey,
-    ) -> Option<(PartitionKey, PartitionId, Option<PartitionHashId>)> {
+    ) -> Option<(PartitionKey, TransitionPartitionId)> {
         let mut entries = self.entries.lock();
         // Look up the partition key provided by the caller.
@@ -149,7 +144,7 @@ impl<T> PartitionCache<T> {
         let key = entries.get_key_value(partition_key)?.0.clone();
         let partition = entries.get_mut(partition_key).unwrap();
-        let (partition_id, partition_hash_id) = partition.remove(&table_id)?;
+        let partition_id = partition.remove(&table_id)?;
         // As an entry was removed, check if it is now empty.
         if partition.is_empty() {
@@ -159,7 +154,7 @@ impl<T> PartitionCache<T> {
             partition.shrink_to_fit();
         }
-        Some((key, partition_id, partition_hash_id))
+        Some((key, partition_id))
     }
 }
@@ -179,14 +174,14 @@ where
         // Use the cached PartitionKey instead of the caller's partition_key,
         // instead preferring to reuse the already-shared Arc<str> in the cache.
-        if let Some((key, partition_id, partition_hash_id)) = self.find(table_id, &partition_key) {
+        if let Some((key, partition_id)) = self.find(table_id, &partition_key) {
             debug!(%table_id, %partition_key, "partition cache hit");
             // Initialise a deferred resolver for the sort key.
             let sort_key_resolver = DeferredLoad::new(
                 self.max_smear,
                 SortKeyResolver::new(
-                    TransitionPartitionId::from((partition_id, partition_hash_id.as_ref())),
+                    partition_id.clone(),
                     Arc::clone(&self.catalog),
                     self.backoff_config.clone(),
                 )
@@ -199,7 +194,6 @@ where
             // using the same key!
             return Arc::new(Mutex::new(PartitionData::new(
                 partition_id,
-                partition_hash_id,
                 key,
                 namespace_id,
                 namespace_name,

File 3 of 6

@@ -77,8 +77,7 @@ impl PartitionProvider for CatalogPartitionResolver {
             .expect("retry forever");
         Arc::new(Mutex::new(PartitionData::new(
-            p.id,
-            p.hash_id().cloned(),
+            p.transition_partition_id(),
             // Use the caller's partition key instance, as it MAY be shared with
             // other instances, but the instance returned from the catalog
             // definitely has no other refs.
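Annotation: the comment above (prefer the caller's `PartitionKey` over the catalog's copy) relies on the key being a cheaply clonable shared string. A minimal sketch of that reasoning, assuming `PartitionKey` wraps an `Arc<str>`; the wrapper below is illustrative, not the actual `data_types` definition.

```rust
use std::sync::Arc;

/// Illustrative stand-in; the real PartitionKey lives in `data_types`.
#[derive(Debug, Clone)]
struct PartitionKey(Arc<str>);

fn main() {
    let callers_key = PartitionKey(Arc::from("2023-07-28"));

    // Cloning the caller's key is just a reference-count bump; the string
    // allocation stays shared with every other holder of the key.
    let reused = callers_key.clone();
    assert!(Arc::ptr_eq(&callers_key.0, &reused.0));

    // A key freshly decoded from the catalog owns a separate allocation, so
    // preferring the caller's instance avoids keeping two copies alive.
    let from_catalog = PartitionKey(Arc::from("2023-07-28"));
    assert!(!Arc::ptr_eq(&callers_key.0, &from_catalog.0));
}
```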

File 4 of 6

@@ -276,7 +276,7 @@ where
         let (id, completed_persistence_count, data, partition_key) = {
             let mut p = p.lock();
             (
-                p.transition_partition_id(),
+                p.partition_id().clone(),
                 p.completed_persistence_count(),
                 p.get_query_data(&projection),
                 p.partition_key().clone(),

File 5 of 6

@@ -149,7 +149,7 @@ impl Context {
         let p = Arc::clone(&partition);
         let guard = p.lock();
-        assert_eq!(partition_id, guard.transition_partition_id());
+        assert_eq!(&partition_id, guard.partition_id());
         Self {
             partition,

File 6 of 6

@@ -49,7 +49,7 @@ where
         mut guard: MutexGuard<'_, PartitionData>,
     ) {
         info!(
-            partition_id = guard.partition_id().get(),
+            partition_id = %guard.partition_id(),
             cost_estimate, "marking hot partition for persistence"
         );
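Annotation: the last hunk swaps the logged field from a raw integer (`partition_id().get()`) to `%guard.partition_id()`. In `tracing` macros the `%` sigil records a field via its `Display` implementation, which is what lets the new `&TransitionPartitionId` return value be logged directly. A small self-contained sketch with a stand-in ID type (requires the `tracing` and `tracing-subscriber` crates; the type below is illustrative, not the real one):

```rust
use std::fmt;

use tracing::info;

#[derive(Debug, Clone)]
enum FakeTransitionPartitionId {
    Deprecated(i64),
    Deterministic(String),
}

// The `%` sigil needs a Display impl on whatever the accessor returns.
impl fmt::Display for FakeTransitionPartitionId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Deprecated(id) => write!(f, "{id}"),
            Self::Deterministic(hash) => write!(f, "{hash}"),
        }
    }
}

fn main() {
    tracing_subscriber::fmt().init();

    let partition_id = FakeTransitionPartitionId::Deterministic("abc123".into());

    // Equivalent in spirit to the new call site:
    //     info!(partition_id = %guard.partition_id(), ...);
    info!(partition_id = %partition_id, "marking hot partition for persistence");
}
```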