refactor: PartitionData carries PartitionKey

Changes PartitionData to carry the derived PartitionKey for which it is
buffering ops. The key is used at persist time.
pull/24376/head
Dom Dwyer 2022-09-28 17:53:17 +02:00
parent 56a1c579a1
commit f5a7fbf8e2
5 changed files with 107 additions and 17 deletions
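
As a sketch of the "used at persist time" motivation (PartitionKey and PartitionData below are reduced stand-ins for the crate types, and persist() is a hypothetical caller, not code from this change): because the buffer now carries its own key, persist-time code can read it directly rather than re-deriving it from the DML op.

use std::sync::Arc;

// Reduced stand-ins for the crate types touched by this commit; the real
// types carry many more fields and derives.
#[derive(Debug, Clone)]
struct PartitionKey(Arc<str>);

struct PartitionData {
    id: i64,
    partition_key: PartitionKey,
}

impl PartitionData {
    // Mirrors the accessor added by this commit.
    fn partition_key(&self) -> &PartitionKey {
        &self.partition_key
    }
}

// Hypothetical persist-time caller: the key travels with the buffer.
fn persist(p: &PartitionData) {
    println!("persisting partition {} with key {:?}", p.id, p.partition_key());
}

fn main() {
    let p = PartitionData {
        id: 1,
        partition_key: PartitionKey(Arc::from("bananas")),
    };
    persist(&p);
}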

@@ -772,6 +772,14 @@ pub struct Shard {
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PartitionKey(Arc<str>);

+impl PartitionKey {
+    /// Returns true if this instance of [`PartitionKey`] is backed by the same
+    /// string storage as other.
+    pub fn ptr_eq(&self, other: &Self) -> bool {
+        Arc::ptr_eq(&self.0, &other.0)
+    }
+}
+
impl Display for PartitionKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)

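PartitionKey::ptr_eq delegates to Arc::ptr_eq, so its semantics can be shown with a bare Arc<str> (a standalone sketch, not code from this change): clones of one key share storage and compare true, while an equal but separately allocated key compares false.

use std::sync::Arc;

fn main() {
    let a: Arc<str> = Arc::from("bananas");
    let b = Arc::clone(&a); // shares a's allocation
    let c: Arc<str> = Arc::from("bananas"); // equal bytes, separate allocation

    assert!(Arc::ptr_eq(&a, &b)); // same backing storage -> true
    assert!(!Arc::ptr_eq(&a, &c)); // same content, different storage -> false
    assert_eq!(a, c); // content equality still holds
}
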
@@ -3,7 +3,7 @@
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
-use data_types::{PartitionId, SequenceNumber, ShardId, TableId, Tombstone};
+use data_types::{PartitionId, PartitionKey, SequenceNumber, ShardId, TableId, Tombstone};
use iox_query::exec::Executor;
use mutable_batch::MutableBatch;
use schema::selection::Selection;
@@ -135,6 +135,9 @@ impl SnapshotBatch {
pub struct PartitionData {
    /// The catalog ID of the partition this buffer is for.
    id: PartitionId,
+    /// The string partition key for this partition.
+    partition_key: PartitionKey,
+
    /// The shard and table IDs for this partition.
    shard_id: ShardId,
    table_id: TableId,
@@ -152,6 +155,7 @@ impl PartitionData {
    /// Initialize a new partition data buffer
    pub(crate) fn new(
        id: PartitionId,
+        partition_key: PartitionKey,
        shard_id: ShardId,
        table_id: TableId,
        table_name: Arc<str>,
@@ -159,6 +163,7 @@
    ) -> Self {
        Self {
            id,
+            partition_key,
            shard_id,
            table_id,
            table_name,
@@ -327,6 +332,11 @@ impl PartitionData {
    pub(crate) fn table_id(&self) -> TableId {
        self.table_id
    }
+
+    /// Return the partition key for this partition.
+    pub fn partition_key(&self) -> &PartitionKey {
+        &self.partition_key
+    }
}

#[cfg(test)]
@@ -341,6 +351,7 @@ mod tests {
    fn snapshot_buffer_different_but_compatible_schemas() {
        let mut partition_data = PartitionData::new(
            PartitionId::new(1),
+            "bananas".into(),
            ShardId::new(1),
            TableId::new(1),
            "foo".into(),
@@ -386,6 +397,7 @@
        let p_id = 1;
        let mut p = PartitionData::new(
            PartitionId::new(p_id),
+            "bananas".into(),
            ShardId::new(s_id),
            TableId::new(t_id),
            "restaurant".into(),

@@ -56,7 +56,7 @@ pub(crate) struct PartitionCache<T> {
    /// It's also likely a smaller N (more tables than partition keys) making it
    /// a faster search for cache misses.
    #[allow(clippy::type_complexity)]
-    entries: Mutex<HashMap<String, HashMap<ShardId, HashMap<TableId, Entry>>>>,
+    entries: Mutex<HashMap<PartitionKey, HashMap<ShardId, HashMap<TableId, Entry>>>>,
}

impl<T> PartitionCache<T> {
@@ -67,10 +67,10 @@ impl<T> PartitionCache<T> {
    where
        P: IntoIterator<Item = Partition>,
    {
-        let mut entries = HashMap::<String, HashMap<ShardId, HashMap<TableId, Entry>>>::new();
+        let mut entries = HashMap::<PartitionKey, HashMap<ShardId, HashMap<TableId, Entry>>>::new();
        for p in partitions.into_iter() {
            entries
-                .entry(p.partition_key.to_string())
+                .entry(p.partition_key)
                .or_default()
                .entry(p.shard_id)
                .or_default()
@@ -105,10 +105,18 @@
        shard_id: ShardId,
        table_id: TableId,
        partition_key: &PartitionKey,
-    ) -> Option<Entry> {
+    ) -> Option<(PartitionKey, Entry)> {
        let mut entries = self.entries.lock();
+
+        // Look up the partition key provided by the caller.
+        //
+        // If the partition key is a hit, clone the key from the map and return
+        // it instead of using the caller-provided partition key - this allows
+        // effective reuse of the same partition key str across all hits for it
+        // and is more memory efficient than using the caller-provided partition
+        // key in the PartitionData.
+        let key = entries.get_key_value(partition_key)?.0.clone();
+        let partition = entries.get_mut(partition_key).unwrap();

        let shard = partition.get_mut(&shard_id)?;
        let e = shard.remove(&table_id)?;
@@ -122,14 +130,14 @@
            // As a shard was removed, likewise the partition may now be empty!
            if partition.is_empty() {
-                entries.remove(&partition_key.to_string());
+                entries.remove(partition_key);
                entries.shrink_to_fit();
            } else {
                partition.shrink_to_fit();
            }
        }

-        Some(e)
+        Some((key, e))
    }
}
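
The get_key_value() call above is a small key-interning pattern: on a hit, hand back a clone of the map's own Arc-backed key (a cheap refcount bump) together with the removed value, so every hit for a given key shares one string allocation. A standalone sketch with a plain HashMap (take_with_interned_key is a made-up name, not part of this change):

use std::collections::HashMap;
use std::sync::Arc;

fn take_with_interned_key<V>(
    map: &mut HashMap<Arc<str>, V>,
    key: &str,
) -> Option<(Arc<str>, V)> {
    // Clone the stored key first: an Arc clone, not a string copy.
    let stored = Arc::clone(map.get_key_value(key)?.0);
    let value = map.remove(key)?;
    Some((stored, value))
}

fn main() {
    let mut map: HashMap<Arc<str>, u32> = HashMap::new();
    map.insert(Arc::from("bananas"), 1);

    let (stored, value) = take_with_interned_key(&mut map, "bananas").unwrap();
    assert_eq!(value, 1);
    println!("reused key {stored} for value {value}");
}
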
@@ -148,10 +156,14 @@ where
        // Use the cached PartitionKey instead of the caller's partition_key,
        // preferring to reuse the already-shared Arc<str> in the cache.
-        if let Some(cached) = self.find(shard_id, table_id, &partition_key) {
+        if let Some((key, cached)) = self.find(shard_id, table_id, &partition_key) {
            debug!(%table_id, %partition_key, "partition cache hit");
+            // Use the returned partition key instead of the caller's - this
+            // allows the backing str memory to be reused across all partitions
+            // using the same key!
            return PartitionData::new(
                cached.partition_id,
+                key,
                shard_id,
                table_id,
                table_name,
@@ -182,7 +194,14 @@ mod tests {
    #[tokio::test]
    async fn test_miss() {
-        let data = PartitionData::new(PARTITION_ID, SHARD_ID, TABLE_ID, TABLE_NAME.into(), None);
+        let data = PartitionData::new(
+            PARTITION_ID,
+            PARTITION_KEY.into(),
+            SHARD_ID,
+            TABLE_ID,
+            TABLE_NAME.into(),
+            None,
+        );
        let inner = MockPartitionProvider::default().with_partition(PARTITION_KEY.into(), data);

        let cache = PartitionCache::new(inner, []);
@@ -201,27 +220,44 @@
    async fn test_hit() {
        let inner = MockPartitionProvider::default();

+        let stored_partition_key = PartitionKey::from(PARTITION_KEY);
        let partition = Partition {
            id: PARTITION_ID,
            shard_id: SHARD_ID,
            table_id: TABLE_ID,
-            partition_key: PARTITION_KEY.into(),
+            partition_key: stored_partition_key.clone(),
            sort_key: Default::default(),
            persisted_sequence_number: Default::default(),
        };
        let cache = PartitionCache::new(inner, [partition]);

+        let callers_partition_key = PartitionKey::from(PARTITION_KEY);
        let got = cache
-            .get_partition(PARTITION_KEY.into(), SHARD_ID, TABLE_ID, TABLE_NAME.into())
+            .get_partition(
+                callers_partition_key.clone(),
+                SHARD_ID,
+                TABLE_ID,
+                TABLE_NAME.into(),
+            )
            .await;

        assert_eq!(got.id(), PARTITION_ID);
        assert_eq!(got.shard_id(), SHARD_ID);
        assert_eq!(got.table_id(), TABLE_ID);
        assert_eq!(got.table_name(), TABLE_NAME);
+        assert_eq!(*got.partition_key(), PartitionKey::from(PARTITION_KEY));

        // The cache should have been cleaned up as it was consumed.
        assert!(cache.entries.lock().is_empty());

+        // Assert the partition key from the cache was used for the lifetime of
+        // the partition, so that it is shared with the cache + other partitions
+        // that share the same partition key across all tables.
+        assert!(got.partition_key().ptr_eq(&stored_partition_key));
+
+        // It does not use the caller's short-lived partition key (derived from
+        // the DML op it is processing).
+        assert!(!got.partition_key().ptr_eq(&callers_partition_key));
    }

    #[tokio::test]
@@ -230,7 +266,14 @@
        let other_key_id = PartitionId::new(99);
        let inner = MockPartitionProvider::default().with_partition(
            other_key.clone(),
-            PartitionData::new(other_key_id, SHARD_ID, TABLE_ID, TABLE_NAME.into(), None),
+            PartitionData::new(
+                other_key_id,
+                PARTITION_KEY.into(),
+                SHARD_ID,
+                TABLE_ID,
+                TABLE_NAME.into(),
+                None,
+            ),
        );

        let partition = Partition {
@@ -258,7 +301,14 @@
        let other_table = TableId::new(1234);
        let inner = MockPartitionProvider::default().with_partition(
            PARTITION_KEY.into(),
-            PartitionData::new(PARTITION_ID, SHARD_ID, other_table, TABLE_NAME.into(), None),
+            PartitionData::new(
+                PARTITION_ID,
+                PARTITION_KEY.into(),
+                SHARD_ID,
+                other_table,
+                TABLE_NAME.into(),
+                None,
+            ),
        );

        let partition = Partition {
@@ -291,7 +341,14 @@
        let other_shard = ShardId::new(1234);
        let inner = MockPartitionProvider::default().with_partition(
            PARTITION_KEY.into(),
-            PartitionData::new(PARTITION_ID, other_shard, TABLE_ID, TABLE_NAME.into(), None),
+            PartitionData::new(
+                PARTITION_ID,
+                PARTITION_KEY.into(),
+                other_shard,
+                TABLE_ID,
+                TABLE_NAME.into(),
+                None,
+            ),
        );

        let partition = Partition {

@@ -69,6 +69,10 @@ impl PartitionProvider for CatalogPartitionResolver {
        PartitionData::new(
            p.id,
+            // Use the caller's partition key instance, as it MAY be shared with
+            // other instances, but the instance returned from the catalog
+            // definitely has no other refs.
+            partition_key,
            shard_id,
            table_id,
            table_name,
@@ -124,11 +128,12 @@ mod tests {
            (shard.id, table.id)
        };

+        let callers_partition_key = PartitionKey::from(PARTITION_KEY);
        let table_name = TABLE_NAME.into();
        let resolver = CatalogPartitionResolver::new(Arc::clone(&catalog));
        let got = resolver
            .get_partition(
-                PartitionKey::from(PARTITION_KEY),
+                callers_partition_key.clone(),
                shard_id,
                table_id,
                Arc::clone(&table_name),
@@ -136,6 +141,7 @@
            .await;

        assert_eq!(*got.table_name(), *table_name);
        assert_eq!(got.max_persisted_sequence_number(), None);
+        assert!(got.partition_key.ptr_eq(&callers_partition_key));

        let got = catalog
            .repositories()

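Taken together, the cache and the catalog resolver implement a simple key-selection policy: on a cache hit, prefer the cache's long-lived key; on a miss, keep the caller's key, since the freshly resolved partition has no other references to share with. A standalone sketch of that policy (resolve_key is a made-up name, and the u64 value is a stand-in for the cache entry):

use std::collections::HashMap;
use std::sync::Arc;

fn resolve_key(cache: &HashMap<Arc<str>, u64>, callers: Arc<str>) -> Arc<str> {
    match cache.get_key_value(callers.as_ref()) {
        // Hit: reuse the cache's long-lived allocation.
        Some((stored, _)) => Arc::clone(stored),
        // Miss: the caller's key is the only instance; keep it.
        None => callers,
    }
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(Arc::<str>::from("bananas"), 42_u64);

    let hit = resolve_key(&cache, Arc::from("bananas"));
    assert!(Arc::ptr_eq(&hit, cache.get_key_value("bananas").unwrap().0));

    let miss = resolve_key(&cache, Arc::from("platanos"));
    assert_eq!(&*miss, "platanos");
}
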
@@ -58,7 +58,14 @@ mod tests {
        let table_id = TableId::new(24);
        let table_name = "platanos".into();
        let partition = PartitionId::new(4242);
-        let data = PartitionData::new(partition, shard_id, table_id, Arc::clone(&table_name), None);
+        let data = PartitionData::new(
+            partition,
+            "bananas".into(),
+            shard_id,
+            table_id,
+            Arc::clone(&table_name),
+            None,
+        );
        let mock = Arc::new(MockPartitionProvider::default().with_partition(key.clone(), data));