Merge pull request #7559 from influxdata/dom/coalesce-partition-fetches

feat: coalesce partition catalog fetches
pull/24376/head
Dom 2023-04-14 17:02:21 +01:00 committed by GitHub
commit d55d41b174
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 484 additions and 41 deletions

View File

@ -167,7 +167,7 @@ where
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> PartitionData {
) -> Arc<Mutex<PartitionData>> {
// Use the cached PartitionKey instead of the caller's partition_key,
// instead preferring to reuse the already-shared Arc<str> in the cache.
@ -188,7 +188,7 @@ where
// Use the returned partition key instead of the callers - this
// allows the backing str memory to be reused across all partitions
// using the same key!
return PartitionData::new(
return Arc::new(Mutex::new(PartitionData::new(
partition_id,
key,
namespace_id,
@ -197,7 +197,7 @@ where
table_name,
SortKeyState::Deferred(Arc::new(sort_key_resolver)),
transition_shard_id,
);
)));
}
debug!(%table_id, %partition_key, "partition cache miss");
@ -218,6 +218,9 @@ where
#[cfg(test)]
mod tests {
// Harmless in tests - saves a bunch of extra vars.
#![allow(clippy::await_holding_lock)]
use data_types::ShardId;
use iox_catalog::mem::MemCatalog;
@ -282,10 +285,10 @@ mod tests {
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
assert_eq!(&**got.namespace_name().get().await, NAMESPACE_NAME);
assert_eq!(got.lock().partition_id(), PARTITION_ID);
assert_eq!(got.lock().table_id(), TABLE_ID);
assert_eq!(&**got.lock().table_name().get().await, TABLE_NAME);
assert_eq!(&**got.lock().namespace_name().get().await, NAMESPACE_NAME);
assert!(cache.inner.is_empty());
}
@ -322,11 +325,14 @@ mod tests {
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
assert_eq!(&**got.namespace_name().get().await, NAMESPACE_NAME);
assert_eq!(*got.partition_key(), PartitionKey::from(PARTITION_KEY));
assert_eq!(got.lock().partition_id(), PARTITION_ID);
assert_eq!(got.lock().table_id(), TABLE_ID);
assert_eq!(&**got.lock().table_name().get().await, TABLE_NAME);
assert_eq!(&**got.lock().namespace_name().get().await, NAMESPACE_NAME);
assert_eq!(
*got.lock().partition_key(),
PartitionKey::from(PARTITION_KEY)
);
// The cache should have been cleaned up as it was consumed.
assert!(cache.entries.lock().is_empty());
@ -334,10 +340,10 @@ mod tests {
// Assert the partition key from the cache was used for the lifetime of
// the partition, so that it is shared with the cache + other partitions
// that share the same partition key across all tables.
assert!(got.partition_key().ptr_eq(&stored_partition_key));
assert!(got.lock().partition_key().ptr_eq(&stored_partition_key));
// It does not use the short-lived caller's partition key (derived from
// the DML op it is processing).
assert!(!got.partition_key().ptr_eq(&callers_partition_key));
assert!(!got.lock().partition_key().ptr_eq(&callers_partition_key));
}
#[tokio::test]
@ -385,9 +391,9 @@ mod tests {
)
.await;
assert_eq!(got.partition_id(), other_key_id);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
assert_eq!(got.lock().partition_id(), other_key_id);
assert_eq!(got.lock().table_id(), TABLE_ID);
assert_eq!(&**got.lock().table_name().get().await, TABLE_NAME);
}
#[tokio::test]
@ -434,8 +440,8 @@ mod tests {
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.table_id(), other_table);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
assert_eq!(got.lock().partition_id(), PARTITION_ID);
assert_eq!(got.lock().table_id(), other_table);
assert_eq!(&**got.lock().table_name().get().await, TABLE_NAME);
}
}

View File

@ -8,6 +8,7 @@ use backoff::{Backoff, BackoffConfig};
use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use super::r#trait::PartitionProvider;
use crate::{
@ -63,7 +64,7 @@ impl PartitionProvider for CatalogPartitionResolver {
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> PartitionData {
) -> Arc<Mutex<PartitionData>> {
debug!(
%partition_key,
%table_id,
@ -78,7 +79,7 @@ impl PartitionProvider for CatalogPartitionResolver {
.await
.expect("retry forever");
PartitionData::new(
Arc::new(Mutex::new(PartitionData::new(
p.id,
// Use the caller's partition key instance, as it MAY be shared with
// other instance, but the instance returned from the catalog
@ -90,12 +91,15 @@ impl PartitionProvider for CatalogPartitionResolver {
table_name,
SortKeyState::Provided(p.sort_key()),
transition_shard_id,
)
)))
}
}
#[cfg(test)]
mod tests {
// Harmless in tests - saves a bunch of extra vars.
#![allow(clippy::await_holding_lock)]
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
@ -157,18 +161,18 @@ mod tests {
.await;
// Ensure the table name is available.
let _ = got.table_name().get().await;
let _ = got.lock().table_name().get().await;
assert_eq!(got.namespace_id(), namespace_id);
assert_eq!(got.table_name().to_string(), table_name.to_string());
assert_matches!(got.sort_key(), SortKeyState::Provided(None));
assert!(got.partition_key.ptr_eq(&callers_partition_key));
assert_eq!(got.lock().namespace_id(), namespace_id);
assert_eq!(got.lock().table_name().to_string(), table_name.to_string());
assert_matches!(got.lock().sort_key(), SortKeyState::Provided(None));
assert!(got.lock().partition_key.ptr_eq(&callers_partition_key));
let got = catalog
.repositories()
.await
.partitions()
.get_by_id(got.partition_id)
.get_by_id(got.lock().partition_id)
.await
.unwrap()
.expect("partition not created");

View File

@ -0,0 +1,423 @@
use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use arrow::compute::kernels::partition;
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use futures::{future::Shared, FutureExt};
use hashbrown::{hash_map::Entry, HashMap};
use parking_lot::Mutex;
use crate::{
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName},
deferred_load::DeferredLoad,
};
use super::PartitionProvider;
/// A helper alias for a boxed, dynamically dispatched future that resolves to a
/// arc/mutex wrapped [`PartitionData`].
type BoxedResolveFuture =
Pin<Box<dyn std::future::Future<Output = Arc<Mutex<PartitionData>>> + Send>>;
/// A compound key of `(namespace, table, partition_key)` which uniquely
/// identifies a single partition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Key {
namespace_id: NamespaceId,
table_id: TableId,
partition_key: PartitionKey,
}
/// The state of the resolver.
///
/// The [`Shared`] requires more space than the simple ref-pointer to the
/// [`PartitionData`], so resolving callers replace the shared handle with the
/// resolved result where possible.
#[derive(Debug)]
enum State {
/// A resolve task is ongoing, and the caller can await the [`Shared`]
/// future to obtain the result.
///
/// If the atomic bool is false, no thread is changing this [`State`] to
/// [`State::Resolved`] for the resolved partition. If true, a thread is in
/// the process of setting (or already has set) the state to
/// [`State::Resolved`].
Resolving(Shared<BoxedResolveFuture>, Arc<AtomicBool>),
/// A prior call resolved this partition.
Resolved(Arc<Mutex<PartitionData>>),
}
/// A coalescing [`PartitionProvider`] reducing N partition fetch requests into
/// a single call to `T` on a per-partition basis.
///
/// This type solves a concurrency problem, where a series of concurrent cache
/// misses "above" this type causes a series of concurrent lookups against the
/// inner resolver "below" this type for a single partition. This is wasteful,
/// as only one result is retained by the callers (a single [`PartitionData`] is
/// used to reference a partition of data).
///
/// This type is typically used to coalesce requests against the
/// [`CatalogPartitionResolver`]:
///
/// ```text
/// ┌─────────────────────────────┐
/// │ Cache │
/// └─────────────────────────────┘
/// │ │ │
/// ▼ ▼ ▼
/// ┌─────────────────────────────┐
/// │ CoalescePartitionResolver │
/// └─────────────────────────────┘
/// │
/// ▼
/// ┌─────────────────────────────┐
/// │ CatalogPartitionResolver │
/// └─────────────────────────────┘
/// ```
///
/// Imagine the following concurrent requests without this type:
///
/// * T1: check cache for partition A, miss
/// * T2: check cache for partition A, miss
/// * T1: inner.get_partition(A)
/// * T2: inner.get_partition(A)
/// * T1: cache put partition A
/// * T2: cache put partition A
///
/// With this type, the concurrent requests for a single partition (A) are
/// coalesced into a single request against the inner resolver:
///
/// * T1: check cache for partition A, miss
/// * T2: check cache for partition A, miss
/// * T1: CoalescePartitionResolver::get_partition(A)
/// * T2: CoalescePartitionResolver::get_partition(A)
/// * inner.get_partition() **(a single call to inner is made)**
/// * T1: cache put partition A
/// * T2: cache put partition A
///
/// # Memory Overhead
///
/// This type makes a best effort attempt to minimise the memory overhead of
/// memorising partition fetches. Callers drop the intermediate resolving state
/// upon success, leaving only a ref-counted pointer to the shared
/// [`PartitionData`] (a single [`Arc`] ref overhead).
///
/// # Cancellation Safety
///
/// This type is cancellation safe - calls to
/// [`CoalescePartitionResolver::get_partition()`] are safe to abort at any
/// point.
///
/// [`CatalogPartitionResolver`]: super::CatalogPartitionResolver
#[derive(Debug)]
pub struct CoalescePartitionResolver<T> {
/// The inner resolver the actual partition fetch is delegated to.
inner: Arc<T>,
/// A map of handles to ongoing resolve futures.
ongoing: Mutex<HashMap<Key, State>>,
}
impl<T> CoalescePartitionResolver<T> {
pub fn new(inner: Arc<T>) -> Self {
Self {
inner,
ongoing: Mutex::new(HashMap::default()),
}
}
}
#[async_trait]
impl<T> PartitionProvider for CoalescePartitionResolver<T>
where
T: PartitionProvider + 'static,
{
async fn get_partition(
&self,
partition_key: PartitionKey,
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> Arc<Mutex<PartitionData>> {
let key = Key {
namespace_id,
table_id,
partition_key: partition_key.clone(), // Ref-counted anyway!
};
// Check if there's an ongoing (or recently completed) resolve.
let (shared, done) = match self.ongoing.lock().entry(key.clone()) {
Entry::Occupied(v) => match v.get() {
State::Resolving(fut, done) => (fut.clone(), Arc::clone(done)),
State::Resolved(v) => return Arc::clone(v),
},
Entry::Vacant(v) => {
// Spawn a future to resolve the partition, and retain a handle
// to it.
let inner = Arc::clone(&self.inner);
let fut: BoxedResolveFuture = Box::pin(async move {
inner
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table_name,
transition_shard_id,
)
.await
});
// Make the future poll-able by many callers, all of which
// resolve to the same output PartitionData instance.
let fut = fut.shared();
let done = Arc::new(AtomicBool::new(false));
// Allow future callers to obtain this shared handle, instead of
// resolving the partition themselves.
v.insert(State::Resolving(fut.clone(), Arc::clone(&done)));
(fut, done)
}
};
// Wait for the resolve to complete.
//
// If this caller future is dropped before this resolve future
// completes, then it remains unpolled until the next caller obtains a
// shared handle and continues the process.
let res = shared.await;
// As an optimisation, select exactly one thread to acquire the lock and
// change the state instead of every caller trying to set the state to
// "resolved", which involves contending on the lock for all concurrent
// callers for all concurrent partition fetches.
//
// Any caller that has been awaiting the shared future above is a
// candidate to perform this state change, but only one thread will
// attempt to. If the presence of aborted callers waiting on the shared
// future, each completed await caller will attempt to change state
// (cancellation safe).
if done
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
// This task should drop the Shared, swapping it for the resolved
// state.
//
// This thread SHOULD NOT fail to perform this action as no other
// thread will attempt it now the bool has been toggled.
let old = self
.ongoing
.lock()
.insert(key, State::Resolved(Arc::clone(&res)));
// Invariant: the resolve future must exist in the map, and the
// state may only be changed by the thread that won the CAS.
assert!(matches!(old, Some(State::Resolving(..))));
}
res
}
}
#[cfg(test)]
mod tests {
use std::{
future,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use assert_matches::assert_matches;
use data_types::{PartitionId, TRANSITION_SHARD_ID};
use futures::{stream::FuturesUnordered, StreamExt};
use test_helpers::timeout::FutureTimeout;
use crate::buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState};
use super::*;
const PARTITION_KEY: &str = "bananas";
#[tokio::test]
async fn test_coalesce() {
const MAX_TASKS: usize = 50;
let namespace_id = NamespaceId::new(1234);
let namespace_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
NamespaceName::from("ns-platanos")
}));
let table_id = TableId::new(24);
let table_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from("platanos")
}));
let partition = PartitionId::new(4242);
let data = PartitionData::new(
partition,
PartitionKey::from(PARTITION_KEY),
namespace_id,
Arc::clone(&namespace_name),
table_id,
Arc::clone(&table_name),
SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
);
// Add a single instance of the partition - if more than one call is
// made, this will cause a panic.
let inner = Arc::new(MockPartitionProvider::default().with_partition(data));
let layer = Arc::new(CoalescePartitionResolver::new(Arc::clone(&inner)));
let results = (0..MAX_TASKS)
.map(|_| {
let namespace_name = Arc::clone(&namespace_name);
let table_name = Arc::clone(&table_name);
layer.get_partition(
PartitionKey::from(PARTITION_KEY),
namespace_id,
namespace_name,
table_id,
table_name,
TRANSITION_SHARD_ID,
)
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await;
// All the resulting instances of PartitionData MUST be the same
// ref-counted instance.
results.as_slice().windows(2).for_each(|v| {
assert!(Arc::ptr_eq(&v[0], &v[1]));
});
// The state should have been set to "resolved" to reclaim memory
assert_matches!(
layer.ongoing.lock().values().next(),
Some(State::Resolved(..))
);
}
// A resolver that blocks forever when resolving PARTITION_KEY but instantly
// finishes all others.
#[derive(Debug)]
struct BlockingResolver {
p: Arc<Mutex<PartitionData>>,
}
impl PartitionProvider for BlockingResolver {
fn get_partition<'life0, 'async_trait>(
&'life0 self,
partition_key: PartitionKey,
_namespace_id: NamespaceId,
_namespace_name: Arc<DeferredLoad<NamespaceName>>,
_table_id: TableId,
_table_name: Arc<DeferredLoad<TableName>>,
_transition_shard_id: ShardId,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Arc<Mutex<PartitionData>>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
if partition_key == PartitionKey::from(PARTITION_KEY) {
return future::pending().boxed();
}
future::ready(Arc::clone(&self.p)).boxed()
}
}
#[tokio::test]
async fn test_disjoint_parallelised() {
use futures::Future;
let namespace_id = NamespaceId::new(1234);
let namespace_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
NamespaceName::from("ns-platanos")
}));
let table_id = TableId::new(24);
let table_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from("platanos")
}));
let partition = PartitionId::new(4242);
let data = PartitionData::new(
partition,
PartitionKey::from(PARTITION_KEY),
namespace_id,
Arc::clone(&namespace_name),
table_id,
Arc::clone(&table_name),
SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
);
// Add a single instance of the partition - if more than one call is
// made to the mock, it will panic.
let inner = Arc::new(BlockingResolver {
p: Arc::new(Mutex::new(data)),
});
let layer = Arc::new(CoalescePartitionResolver::new(inner));
// The following two partitions are for the same (blocked) partition and
// neither resolve.
let pa_1 = layer.get_partition(
PartitionKey::from(PARTITION_KEY),
namespace_id,
Arc::clone(&namespace_name),
table_id,
Arc::clone(&table_name),
TRANSITION_SHARD_ID,
);
let pa_2 = layer.get_partition(
PartitionKey::from(PARTITION_KEY),
namespace_id,
Arc::clone(&namespace_name),
table_id,
Arc::clone(&table_name),
TRANSITION_SHARD_ID,
);
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
futures::pin_mut!(pa_1);
futures::pin_mut!(pa_2);
// Neither make progress
assert_matches!(Pin::new(&mut pa_1).poll(&mut cx), Poll::Pending);
assert_matches!(Pin::new(&mut pa_2).poll(&mut cx), Poll::Pending);
// But a non-blocked partition is resolved without issue.
let _ = layer
.get_partition(
PartitionKey::from("platanos"),
namespace_id,
namespace_name,
table_id,
table_name,
TRANSITION_SHARD_ID,
)
.with_timeout_panic(Duration::from_secs(5))
.await;
// While the original requests are still blocked.
assert_matches!(Pin::new(&mut pa_1).poll(&mut cx), Poll::Pending);
assert_matches!(Pin::new(&mut pa_2).poll(&mut cx), Poll::Pending);
}
}

View File

@ -55,7 +55,7 @@ impl PartitionProvider for MockPartitionProvider {
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
_transition_shard_id: ShardId,
) -> PartitionData {
) -> Arc<Mutex<PartitionData>> {
let p = self
.partitions
.lock()
@ -67,6 +67,6 @@ impl PartitionProvider for MockPartitionProvider {
assert_eq!(p.namespace_id(), namespace_id);
assert_eq!(p.namespace_name().to_string(), namespace_name.to_string());
assert_eq!(p.table_name().to_string(), table_name.to_string());
p
Arc::new(Mutex::new(p))
}
}

View File

@ -16,5 +16,8 @@ pub(crate) use catalog::*;
mod sort_key;
pub(crate) use sort_key::*;
mod coalesce;
pub(crate) use coalesce::*;
#[cfg(test)]
pub(crate) mod mock;

View File

@ -2,6 +2,7 @@ use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use parking_lot::Mutex;
use crate::{
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName},
@ -25,7 +26,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug {
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> PartitionData;
) -> Arc<Mutex<PartitionData>>;
}
#[async_trait]
@ -41,7 +42,7 @@ where
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> PartitionData {
) -> Arc<Mutex<PartitionData>> {
(**self)
.get_partition(
partition_key,
@ -101,9 +102,12 @@ mod tests {
TRANSITION_SHARD_ID,
)
.await;
assert_eq!(got.partition_id(), partition);
assert_eq!(got.namespace_id(), namespace_id);
assert_eq!(got.namespace_name().to_string(), namespace_name.to_string());
assert_eq!(got.table_name().to_string(), table_name.to_string());
assert_eq!(got.lock().partition_id(), partition);
assert_eq!(got.lock().namespace_id(), namespace_id);
assert_eq!(
got.lock().namespace_name().to_string(),
namespace_name.to_string()
);
assert_eq!(got.lock().table_name().to_string(), table_name.to_string());
}
}

View File

@ -183,8 +183,7 @@ where
//
// This MAY return a different instance than `p` if another
// thread has already initialised the partition.
self.partition_data
.get_or_insert_with(&partition_key, || Arc::new(Mutex::new(p)))
self.partition_data.get_or_insert_with(&partition_key, || p)
}
};

View File

@ -26,7 +26,9 @@ use wal::Wal;
use crate::{
buffer_tree::{
namespace::name_resolver::{NamespaceNameProvider, NamespaceNameResolver},
partition::resolver::{CatalogPartitionResolver, PartitionCache, PartitionProvider},
partition::resolver::{
CatalogPartitionResolver, CoalescePartitionResolver, PartitionCache, PartitionProvider,
},
table::name_resolver::{TableNameProvider, TableNameResolver},
BufferTree,
},
@ -281,8 +283,10 @@ where
.await
.map_err(InitError::PreWarmPartitions)?;
// Build the partition provider, wrapped in the partition cache.
// Build the partition provider, wrapped in the partition cache and request
// coalescer.
let partition_provider = CatalogPartitionResolver::new(Arc::clone(&catalog));
let partition_provider = CoalescePartitionResolver::new(Arc::new(partition_provider));
let partition_provider = PartitionCache::new(
partition_provider,
recent_partitions,