refactor: ref PartitionData by key & ID

Changes the TableData to hold a map of partition key -> PartitionData,
plus an index of partition ID -> partition key, so that a PartitionData can
be resolved by either. This allows for cheap lookups when the caller holds
an ID.

This commit also manages to internalise the partition map within the
TableData - one less pub / peeking!

This commit also switches from a BTreeMap to a HashMap as the backing
collection, as maintaining key ordering doesn't appear to be necessary.
pull/24376/head
Dom Dwyer 2022-09-30 15:14:33 +02:00
parent 0847cc5458
commit f9bf86927d
3 changed files with 144 additions and 22 deletions

View File

@ -810,7 +810,9 @@ mod tests {
let mem_table = n.table_data("mem").unwrap();
assert!(n.table_data("mem").is_some());
let mem_table = mem_table.write().await;
let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap();
let p = mem_table
.get_partition_by_key(&"1970-01-01".into())
.unwrap();
p.partition_id()
};
@ -952,7 +954,9 @@ mod tests {
let mem_table = n.table_data("mem").unwrap();
assert!(n.table_data("cpu").is_some());
let mem_table = mem_table.write().await;
let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap();
let p = mem_table
.get_partition_by_key(&"1970-01-01".into())
.unwrap();
table_id = mem_table.table_id();
partition_id = p.partition_id();
@ -1352,7 +1356,7 @@ mod tests {
{
let table_data = data.table_data("mem").unwrap();
let table = table_data.read().await;
let p = table.partition_data.get(&"1970-01-01".into()).unwrap();
let p = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
assert_eq!(
p.max_persisted_sequence_number(),
Some(SequenceNumber::new(1))
@ -1368,7 +1372,7 @@ mod tests {
let table_data = data.table_data("mem").unwrap();
let table = table_data.read().await;
let partition = table.partition_data.get(&"1970-01-01".into()).unwrap();
let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
assert_eq!(
partition.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(2)

View File

@ -226,7 +226,7 @@ impl NamespaceData {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
return t.partition_data.get_mut(partition_key).map(|p| {
return t.get_partition_by_key_mut(partition_key).map(|p| {
p.data
.generate_snapshot()
.expect("snapshot on mutable batch should never fail");
@ -249,8 +249,7 @@ impl NamespaceData {
let mut table_data = table_data.write().await;
return table_data
.partition_data
.get_mut(partition_key)
.get_partition_by_key_mut(partition_key)
.and_then(|partition_data| partition_data.snapshot_to_persisting_batch());
}
@ -321,7 +320,7 @@ impl NamespaceData {
) {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
let partition = t.partition_data.get_mut(partition_key);
let partition = t.get_partition_by_key_mut(partition_key);
if let Some(p) = partition {
p.mark_persisted(sequence_number);

View File

@ -1,9 +1,10 @@
//! Table level data buffer structures.
use std::{collections::BTreeMap, sync::Arc};
use std::{collections::HashMap, sync::Arc};
use data_types::{
DeletePredicate, NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp,
DeletePredicate, NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId,
Timestamp,
};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -16,6 +17,39 @@ use super::partition::{
};
use crate::lifecycle::LifecycleHandle;
/// A double-referenced map where [`PartitionData`] can be looked up by
/// [`PartitionKey`], or ID.
///
/// The [`PartitionData`] values are owned by the key-indexed map. The
/// ID-indexed map stores only the [`PartitionKey`] of each partition, so a
/// lookup by ID is resolved through the key-indexed map.
#[derive(Debug, Default)]
struct DoubleRef {
    // TODO(4880): this can be removed when IDs are sent over the wire.
    by_key: HashMap<PartitionKey, PartitionData>,
    by_id: HashMap<PartitionId, PartitionKey>,
}

impl DoubleRef {
    /// Add `partition` to the map, indexing it by both its key and its ID.
    ///
    /// # Panics
    ///
    /// Panics if an entry already exists for the partition's key or for its
    /// ID.
    fn insert(&mut self, partition: PartitionData) {
        let id = partition.partition_id();
        let key = partition.partition_key().clone();

        // Both inserts must hit an empty slot, otherwise the two indexes
        // would disagree about which partition they refer to.
        assert!(
            self.by_key.insert(key.clone(), partition).is_none(),
            "partition key already present in map"
        );
        assert!(
            self.by_id.insert(id, key).is_none(),
            "partition ID already present in map"
        );
    }

    /// Return a shared reference to the partition stored under `key`, if any.
    #[cfg(test)]
    fn by_key(&self, key: &PartitionKey) -> Option<&PartitionData> {
        self.by_key.get(key)
    }

    /// Return a mutable reference to the partition stored under `key`, if
    /// any.
    fn by_key_mut(&mut self, key: &PartitionKey) -> Option<&mut PartitionData> {
        self.by_key.get_mut(key)
    }

    /// Return a mutable reference to the partition with the given `id`, if
    /// any.
    ///
    /// The ID is first translated to its partition key, which is then used to
    /// look up the partition itself.
    fn by_id_mut(&mut self, id: PartitionId) -> Option<&mut PartitionData> {
        // The two maps are disjoint fields, so the key can be borrowed from
        // `by_id` while `by_key` is borrowed mutably - no clone is needed.
        let key = self.by_id.get(&id)?;
        self.by_key.get_mut(key)
    }
}
/// Data of a Table in a given Namespace that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct TableData {
@ -34,8 +68,8 @@ pub(crate) struct TableData {
/// `(key, shard, table)` triplet.
partition_provider: Arc<dyn PartitionProvider>,
// Map pf partition key to its data
pub(super) partition_data: BTreeMap<PartitionKey, PartitionData>,
// Map of partition key (and partition ID) to its data
partition_data: DoubleRef,
}
impl TableData {
@ -71,6 +105,7 @@ impl TableData {
/// Return parquet_max_sequence_number
pub(super) fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.partition_data
.by_key
.values()
.map(|p| p.max_persisted_sequence_number())
.max()
@ -92,7 +127,7 @@ impl TableData {
partition_key: PartitionKey,
lifecycle_handle: &dyn LifecycleHandle,
) -> Result<bool, super::Error> {
let partition_data = match self.partition_data.get_mut(&partition_key) {
let partition_data = match self.partition_data.by_key.get_mut(&partition_key) {
Some(p) => p,
None => {
let p = self
@ -105,12 +140,9 @@ impl TableData {
Arc::clone(&self.table_name),
)
.await;
// Add the partition to the map.
assert!(self
.partition_data
.insert(partition_key.clone(), p)
.is_none());
self.partition_data.get_mut(&partition_key).unwrap()
// Add the double-referenced partition to the map.
self.partition_data.insert(p);
self.partition_data.by_key_mut(&partition_key).unwrap()
}
};
@ -171,15 +203,42 @@ impl TableData {
self.tombstone_max_sequence_number = Some(sequence_number);
// modify one partition at a time
for data in self.partition_data.values_mut() {
for data in self.partition_data.by_key.values_mut() {
data.buffer_tombstone(executor, tombstone.clone()).await;
}
Ok(())
}
/// Return the [`PartitionData`] for the specified ID.
///
/// Yields a mutable reference to the partition, or `None` when the lookup
/// finds no partition for `partition_id`.
// NOTE(review): no caller of this method is visible in this change, which is
// presumably why the unused lint is suppressed below - confirm, and drop the
// `allow` once a caller exists.
#[allow(unused)]
pub(crate) fn get_partition(
&mut self,
partition_id: PartitionId,
) -> Option<&mut PartitionData> {
self.partition_data.by_id_mut(partition_id)
}
/// Return the [`PartitionData`] for the specified partition key.
///
/// Yields a shared reference to the partition, or `None` when the lookup
/// finds no partition for `partition_key`.
///
/// Only compiled in test builds.
#[cfg(test)]
pub(crate) fn get_partition_by_key(
&self,
partition_key: &PartitionKey,
) -> Option<&PartitionData> {
self.partition_data.by_key(partition_key)
}
/// Return the [`PartitionData`] for the specified partition key.
///
/// Yields a mutable reference to the partition, or `None` when the lookup
/// finds no partition for `partition_key`.
pub(crate) fn get_partition_by_key_mut(
&mut self,
partition_key: &PartitionKey,
) -> Option<&mut PartitionData> {
self.partition_data.by_key_mut(partition_key)
}
pub(crate) fn unpersisted_partition_data(&self) -> Vec<UnpersistedPartitionData> {
self.partition_data
.by_key
.values()
.map(|p| UnpersistedPartitionData {
partition_id: p.partition_id(),
@ -204,6 +263,7 @@ impl TableData {
};
self.partition_data
.by_key
.values()
.fold(progress, |progress, partition_data| {
progress.combine(partition_data.progress())
@ -248,6 +308,65 @@ mod tests {
const PARTITION_KEY: &str = "platanos";
const PARTITION_ID: PartitionId = PartitionId::new(0);
/// Checks that a partition is absent from both the key index and the ID index
/// of the table's partition map before a write, and present in both after
/// a write for that partition key has been buffered.
#[tokio::test]
async fn test_partition_double_ref() {
    let registry = Arc::new(metric::Registry::default());
    let catalog: Arc<dyn Catalog> =
        Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&registry)));

    // Seed the catalog with the shard / namespace / table.
    let (shard_id, ns_id, table_id) =
        populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;

    // The single write used to exercise the table.
    let write = lines_to_batches(r#"bananas,bat=man value=24 42"#, 0)
        .unwrap()
        .remove(TABLE_NAME)
        .unwrap();

    // The partition the mock provider is configured to return for this
    // table ID.
    let partition = PartitionData::new(
        PARTITION_ID,
        PARTITION_KEY.into(),
        shard_id,
        ns_id,
        table_id,
        TABLE_NAME.into(),
        None,
    );
    let partition_provider =
        Arc::new(MockPartitionProvider::default().with_partition(partition));

    let mut table = TableData::new(
        table_id,
        TABLE_NAME,
        shard_id,
        ns_id,
        None,
        partition_provider,
    );

    // Before any write, neither index knows about the test partition.
    assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none());
    assert!(table.partition_data.by_id_mut(PARTITION_ID).is_none());

    // Buffer the write.
    let handle = MockLifecycleHandle::default();
    let pause = table
        .buffer_table_write(
            SequenceNumber::new(42),
            write,
            PARTITION_KEY.into(),
            &handle,
        )
        .await
        .expect("buffer op should succeed");
    assert!(!pause);

    // After the write, the partition is reachable through both indexes.
    assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some());
    assert!(table.partition_data.by_id_mut(PARTITION_ID).is_some());
}
#[tokio::test]
async fn test_bad_write_memory_counting() {
let metrics = Arc::new(metric::Registry::default());
@ -291,7 +410,7 @@ mod tests {
let handle = MockLifecycleHandle::default();
// Assert the table does not contain the test partition
assert!(table.partition_data.get(&PARTITION_KEY.into()).is_none());
assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none());
// Write some test data
let pause = table
@ -306,7 +425,7 @@ mod tests {
assert!(!pause);
// Referencing the partition should succeed
assert!(table.partition_data.get(&PARTITION_KEY.into()).is_some());
assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some());
// And the lifecycle handle was called with the expected values
assert_eq!(