refactor: resolve Arc-wrapped PartitionData
Changes the PartitionProvider trait to return a ref-counted PartitionData instance (Arc<Mutex<PartitionData>>) instead of a plain PartitionData, which every caller then wrapped in an Arc anyway. This allows resolver implementations to return multiple references to the same physical instance.
parent f46d06d56f
commit 435499e9d7
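
To make the intent concrete, here is a minimal, self-contained sketch of the ownership change. The trait and type names (OwnedProvider, SharedProvider, CachingResolver) are illustrative placeholders, and std::sync::Mutex stands in for the parking_lot::Mutex used by the real code:

use std::sync::{Arc, Mutex};

// Simplified placeholder for the real PartitionData struct.
struct PartitionData {
    partition_key: String,
}

// "Before": the resolver hands back an owned instance, and every caller
// wraps it for sharing: Arc::new(Mutex::new(provider.get_partition(..))).
#[allow(dead_code)] // shown only for contrast
trait OwnedProvider {
    fn get_partition(&self, key: &str) -> PartitionData;
}

// "After": the resolver returns the ref-counted handle itself, so an
// implementation may hand out many references to one physical instance.
trait SharedProvider {
    fn get_partition(&self, key: &str) -> Arc<Mutex<PartitionData>>;
}

struct CachingResolver {
    cached: Arc<Mutex<PartitionData>>,
}

impl SharedProvider for CachingResolver {
    fn get_partition(&self, _key: &str) -> Arc<Mutex<PartitionData>> {
        // A cheap pointer copy; no new PartitionData is allocated.
        Arc::clone(&self.cached)
    }
}

fn main() {
    let resolver = CachingResolver {
        cached: Arc::new(Mutex::new(PartitionData {
            partition_key: "2023-01-01".to_string(),
        })),
    };

    let a = resolver.get_partition("2023-01-01");
    let b = resolver.get_partition("2023-01-01");

    // Both callers observe the same physical instance.
    assert!(Arc::ptr_eq(&a, &b));
    assert_eq!(a.lock().unwrap().partition_key, "2023-01-01");
}

Under the old signature a caching resolver could only return a fresh PartitionData per call; returning the Arc lets it return clones of the one stored handle instead, as the diffs below do.
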
@@ -167,7 +167,7 @@ where
         table_id: TableId,
         table_name: Arc<DeferredLoad<TableName>>,
         transition_shard_id: ShardId,
-    ) -> PartitionData {
+    ) -> Arc<Mutex<PartitionData>> {
         // Use the cached PartitionKey instead of the caller's partition_key,
         // instead preferring to reuse the already-shared Arc<str> in the cache.
 
@@ -188,7 +188,7 @@ where
             // Use the returned partition key instead of the callers - this
            // allows the backing str memory to be reused across all partitions
             // using the same key!
-            return PartitionData::new(
+            return Arc::new(Mutex::new(PartitionData::new(
                 partition_id,
                 key,
                 namespace_id,
@@ -197,7 +197,7 @@ where
                 table_name,
                 SortKeyState::Deferred(Arc::new(sort_key_resolver)),
                 transition_shard_id,
-            );
+            )));
         }
 
         debug!(%table_id, %partition_key, "partition cache miss");
@@ -218,6 +218,9 @@ where
 
 #[cfg(test)]
 mod tests {
+    // Harmless in tests - saves a bunch of extra vars.
+    #![allow(clippy::await_holding_lock)]
+
     use data_types::ShardId;
     use iox_catalog::mem::MemCatalog;
 
@@ -282,10 +285,10 @@ mod tests {
         )
         .await;
 
-        assert_eq!(got.partition_id(), PARTITION_ID);
-        assert_eq!(got.table_id(), TABLE_ID);
-        assert_eq!(&**got.table_name().get().await, TABLE_NAME);
-        assert_eq!(&**got.namespace_name().get().await, NAMESPACE_NAME);
+        assert_eq!(got.lock().partition_id(), PARTITION_ID);
+        assert_eq!(got.lock().table_id(), TABLE_ID);
+        assert_eq!(&**got.lock().table_name().get().await, TABLE_NAME);
+        assert_eq!(&**got.lock().namespace_name().get().await, NAMESPACE_NAME);
         assert!(cache.inner.is_empty());
     }
 
@@ -322,11 +325,14 @@ mod tests {
         )
         .await;
 
-        assert_eq!(got.partition_id(), PARTITION_ID);
-        assert_eq!(got.table_id(), TABLE_ID);
-        assert_eq!(&**got.table_name().get().await, TABLE_NAME);
-        assert_eq!(&**got.namespace_name().get().await, NAMESPACE_NAME);
-        assert_eq!(*got.partition_key(), PartitionKey::from(PARTITION_KEY));
+        assert_eq!(got.lock().partition_id(), PARTITION_ID);
+        assert_eq!(got.lock().table_id(), TABLE_ID);
+        assert_eq!(&**got.lock().table_name().get().await, TABLE_NAME);
+        assert_eq!(&**got.lock().namespace_name().get().await, NAMESPACE_NAME);
+        assert_eq!(
+            *got.lock().partition_key(),
+            PartitionKey::from(PARTITION_KEY)
+        );
 
         // The cache should have been cleaned up as it was consumed.
         assert!(cache.entries.lock().is_empty());
@@ -334,10 +340,10 @@ mod tests {
         // Assert the partition key from the cache was used for the lifetime of
         // the partition, so that it is shared with the cache + other partitions
         // that share the same partition key across all tables.
-        assert!(got.partition_key().ptr_eq(&stored_partition_key));
+        assert!(got.lock().partition_key().ptr_eq(&stored_partition_key));
         // It does not use the short-lived caller's partition key (derived from
         // the DML op it is processing).
-        assert!(!got.partition_key().ptr_eq(&callers_partition_key));
+        assert!(!got.lock().partition_key().ptr_eq(&callers_partition_key));
     }
 
     #[tokio::test]
@@ -385,9 +391,9 @@ mod tests {
         )
         .await;
 
-        assert_eq!(got.partition_id(), other_key_id);
-        assert_eq!(got.table_id(), TABLE_ID);
-        assert_eq!(&**got.table_name().get().await, TABLE_NAME);
+        assert_eq!(got.lock().partition_id(), other_key_id);
+        assert_eq!(got.lock().table_id(), TABLE_ID);
+        assert_eq!(&**got.lock().table_name().get().await, TABLE_NAME);
     }
 
    #[tokio::test]
@@ -434,8 +440,8 @@ mod tests {
         )
         .await;
 
-        assert_eq!(got.partition_id(), PARTITION_ID);
-        assert_eq!(got.table_id(), other_table);
-        assert_eq!(&**got.table_name().get().await, TABLE_NAME);
+        assert_eq!(got.lock().partition_id(), PARTITION_ID);
+        assert_eq!(got.lock().table_id(), other_table);
+        assert_eq!(&**got.lock().table_name().get().await, TABLE_NAME);
     }
 }
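
A side effect visible throughout the tests above: every accessor now goes through the mutex (got.lock()), and the test module gains #![allow(clippy::await_holding_lock)] because expressions like got.lock().table_name().get().await keep the guard alive across an await point. The sketch below shows what the lint objects to and why it is tolerable in single-task test code; the Data type is a hypothetical stand-in, with tokio and parking_lot assumed as in the diff:

use std::sync::Arc;

use parking_lot::Mutex;

// Hypothetical stand-in for PartitionData with a deferred-loading accessor.
struct Data {
    name: String,
}

impl Data {
    // Imagine a DeferredLoad accessor that may await a catalog query.
    async fn name(&self) -> &str {
        &self.name
    }
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(Mutex::new(Data {
        name: "bananas".to_string(),
    }));

    // clippy::await_holding_lock fires here: the parking_lot guard stays
    // alive across the .await, which can deadlock if another task contends
    // for the same lock. In a single-task test there is no contention, so
    // the module-wide allow trades that risk for fewer temporary bindings.
    let guard = shared.lock();
    let name = guard.name().await.to_string();
    drop(guard);

    assert_eq!(name, "bananas");
}
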
@@ -8,6 +8,7 @@ use backoff::{Backoff, BackoffConfig};
 use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId};
 use iox_catalog::interface::Catalog;
 use observability_deps::tracing::debug;
+use parking_lot::Mutex;
 
 use super::r#trait::PartitionProvider;
 use crate::{
@@ -63,7 +64,7 @@ impl PartitionProvider for CatalogPartitionResolver {
         table_id: TableId,
         table_name: Arc<DeferredLoad<TableName>>,
         transition_shard_id: ShardId,
-    ) -> PartitionData {
+    ) -> Arc<Mutex<PartitionData>> {
         debug!(
             %partition_key,
             %table_id,
@@ -78,7 +79,7 @@ impl PartitionProvider for CatalogPartitionResolver {
             .await
             .expect("retry forever");
 
-        PartitionData::new(
+        Arc::new(Mutex::new(PartitionData::new(
             p.id,
             // Use the caller's partition key instance, as it MAY be shared with
             // other instance, but the instance returned from the catalog
@@ -90,12 +91,15 @@ impl PartitionProvider for CatalogPartitionResolver {
             table_name,
             SortKeyState::Provided(p.sort_key()),
             transition_shard_id,
-        )
+        )))
     }
 }
 
 #[cfg(test)]
 mod tests {
+    // Harmless in tests - saves a bunch of extra vars.
+    #![allow(clippy::await_holding_lock)]
+
     use std::{sync::Arc, time::Duration};
 
     use assert_matches::assert_matches;
@@ -157,18 +161,18 @@ mod tests {
         .await;
 
         // Ensure the table name is available.
-        let _ = got.table_name().get().await;
+        let _ = got.lock().table_name().get().await;
 
-        assert_eq!(got.namespace_id(), namespace_id);
-        assert_eq!(got.table_name().to_string(), table_name.to_string());
-        assert_matches!(got.sort_key(), SortKeyState::Provided(None));
-        assert!(got.partition_key.ptr_eq(&callers_partition_key));
+        assert_eq!(got.lock().namespace_id(), namespace_id);
+        assert_eq!(got.lock().table_name().to_string(), table_name.to_string());
+        assert_matches!(got.lock().sort_key(), SortKeyState::Provided(None));
+        assert!(got.lock().partition_key.ptr_eq(&callers_partition_key));
 
         let got = catalog
             .repositories()
             .await
             .partitions()
-            .get_by_id(got.partition_id)
+            .get_by_id(got.lock().partition_id)
             .await
             .unwrap()
             .expect("partition not created");
@@ -55,7 +55,7 @@ impl PartitionProvider for MockPartitionProvider {
         table_id: TableId,
         table_name: Arc<DeferredLoad<TableName>>,
         _transition_shard_id: ShardId,
-    ) -> PartitionData {
+    ) -> Arc<Mutex<PartitionData>> {
         let p = self
             .partitions
             .lock()
@@ -67,6 +67,6 @@ impl PartitionProvider for MockPartitionProvider {
         assert_eq!(p.namespace_id(), namespace_id);
         assert_eq!(p.namespace_name().to_string(), namespace_name.to_string());
         assert_eq!(p.table_name().to_string(), table_name.to_string());
-        p
+        Arc::new(Mutex::new(p))
     }
 }
@@ -2,6 +2,7 @@ use std::{fmt::Debug, sync::Arc};
 
 use async_trait::async_trait;
 use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
+use parking_lot::Mutex;
 
 use crate::{
     buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName},
@@ -25,7 +26,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug {
        table_id: TableId,
        table_name: Arc<DeferredLoad<TableName>>,
        transition_shard_id: ShardId,
-    ) -> PartitionData;
+    ) -> Arc<Mutex<PartitionData>>;
 }
 
 #[async_trait]
@@ -41,7 +42,7 @@ where
         table_id: TableId,
         table_name: Arc<DeferredLoad<TableName>>,
         transition_shard_id: ShardId,
-    ) -> PartitionData {
+    ) -> Arc<Mutex<PartitionData>> {
         (**self)
             .get_partition(
                 partition_key,
@@ -101,9 +102,12 @@ mod tests {
                TRANSITION_SHARD_ID,
            )
            .await;
-        assert_eq!(got.partition_id(), partition);
-        assert_eq!(got.namespace_id(), namespace_id);
-        assert_eq!(got.namespace_name().to_string(), namespace_name.to_string());
-        assert_eq!(got.table_name().to_string(), table_name.to_string());
+        assert_eq!(got.lock().partition_id(), partition);
+        assert_eq!(got.lock().namespace_id(), namespace_id);
+        assert_eq!(
+            got.lock().namespace_name().to_string(),
+            namespace_name.to_string()
+        );
+        assert_eq!(got.lock().table_name().to_string(), table_name.to_string());
     }
 }
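
Note the middle hunk above: the blanket implementation of PartitionProvider for Arc<T> delegates through the smart pointer via (**self), so a shared handle to a resolver is itself usable as a resolver. A reduced, synchronous sketch of that delegation pattern, with a hypothetical Provider trait and StaticProvider type (the real trait is async via async_trait):

use std::sync::Arc;

// Reduced stand-in for the PartitionProvider trait.
trait Provider {
    fn get(&self, key: &str) -> String;
}

// Blanket impl: any Arc<T> where T: Provider is itself a Provider, so call
// sites can hold either a concrete resolver or a shared handle to one.
impl<T> Provider for Arc<T>
where
    T: Provider,
{
    fn get(&self, key: &str) -> String {
        // (**self) derefs Arc<T> to T, then calls the inner implementation.
        (**self).get(key)
    }
}

struct StaticProvider;

impl Provider for StaticProvider {
    fn get(&self, key: &str) -> String {
        format!("partition for {key}")
    }
}

fn main() {
    let shared = Arc::new(StaticProvider);
    // The Arc itself satisfies the trait bound.
    assert_eq!(shared.get("2023-01-01"), "partition for 2023-01-01");
}
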
@@ -183,8 +183,7 @@ where
                //
                // This MAY return a different instance than `p` if another
                // thread has already initialised the partition.
-                self.partition_data
-                    .get_or_insert_with(&partition_key, || Arc::new(Mutex::new(p)))
+                self.partition_data.get_or_insert_with(&partition_key, || p)
            }
        };
 
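
With the resolver already returning Arc<Mutex<PartitionData>>, the buffer tree no longer wraps at the insertion site: the handle is passed straight through to get_or_insert_with, and if another thread won the race, `p` is simply dropped in favour of the existing entry. A sketch of that cache shape, where PartitionMap and its get_or_insert_with are hypothetical stand-ins (with String in place of PartitionData):

use std::{collections::HashMap, sync::Arc};

use parking_lot::Mutex;

// Hypothetical stand-in for the buffer tree's partition map.
#[derive(Default)]
struct PartitionMap {
    entries: Mutex<HashMap<String, Arc<Mutex<String>>>>,
}

impl PartitionMap {
    // Returns the existing shared handle if present, otherwise inserts the
    // one produced by `init` - mirroring how the resolver's already-wrapped
    // Arc<Mutex<PartitionData>> is now inserted directly rather than being
    // wrapped at the call site.
    fn get_or_insert_with(
        &self,
        key: &str,
        init: impl FnOnce() -> Arc<Mutex<String>>,
    ) -> Arc<Mutex<String>> {
        Arc::clone(
            self.entries
                .lock()
                .entry(key.to_string())
                .or_insert_with(init),
        )
    }
}

fn main() {
    let map = PartitionMap::default();
    let p = Arc::new(Mutex::new("partition data".to_string()));

    // The first call inserts the resolver-provided handle...
    let a = map.get_or_insert_with("2023-01-01", || Arc::clone(&p));
    // ...and later calls observe the same physical instance.
    let b = map.get_or_insert_with("2023-01-01", || unreachable!());
    assert!(Arc::ptr_eq(&a, &b));
}
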