Merge pull request #7593 from influxdata/dom/coalesce-drive-completion

fix(ingester): always drive partition resolve futures to completion
pull/24376/head
Dom 2023-04-18 18:16:43 +01:00 committed by GitHub
commit 711b69a23f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 202 additions and 62 deletions

View File

@ -113,7 +113,8 @@ enum State {
///
/// This type is cancellation safe - calls to
/// [`CoalescePartitionResolver::get_partition()`] are safe to abort at any
/// point.
/// point - the underlying resolve future is always driven to completion in the
/// background once started.
///
/// [`CatalogPartitionResolver`]: super::CatalogPartitionResolver
#[derive(Debug)]
@ -164,18 +165,15 @@ where
// Spawn a future to resolve the partition, and retain a handle
// to it.
let inner = Arc::clone(&self.inner);
let fut: BoxedResolveFuture = Box::pin(async move {
inner
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table_name,
transition_shard_id,
)
.await
});
let fut: BoxedResolveFuture = Box::pin(do_fetch(
inner,
partition_key,
namespace_id,
namespace_name,
table_id,
table_name,
transition_shard_id,
));
// Make the future poll-able by many callers, all of which
// resolve to the same output PartitionData instance.
@ -199,17 +197,18 @@ where
// As an optimisation, select exactly one thread to acquire the lock and
// change the state instead of every caller trying to set the state to
// "resolved", which involves contending on the lock for all concurrent
// "resolved", which involves contending on the lock with all concurrent
// callers for all concurrent partition fetches.
//
// Any caller that has been awaiting the shared future above is a
// candidate to perform this state change, but only one thread will
// attempt to. If the presence of aborted callers waiting on the shared
// attempt to. In the presence of aborted callers waiting on the shared
// future, each completed await caller will attempt to change state
// (cancellation safe).
if done
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
if !done.load(Ordering::Relaxed)
&& done
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
// This task should drop the Shared, swapping it for the resolved
// state.
@ -230,6 +229,42 @@ where
}
}
async fn do_fetch<T>(
inner: T,
partition_key: PartitionKey,
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> Arc<Mutex<PartitionData>>
where
T: PartitionProvider + 'static,
{
// Spawn a task, ensuring the resolve future is always driven to completion
// independently of callers polling the shared result future.
//
// This prevents the resolve future from being abandoned by all callers and
// left allocated (referenced by the ongoing map) but not polled - this
// could result in a connection being taken from the connection pool and
// never returned as the resolve future is neither completed, nor dropped
// (which would cause the connection to be returned).
tokio::spawn(async move {
inner
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table_name,
transition_shard_id,
)
.await
})
.await
.expect("coalesced partition resolve task panic")
}
#[cfg(test)]
mod tests {
use std::{
@ -241,35 +276,46 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{PartitionId, TRANSITION_SHARD_ID};
use futures::Future;
use futures::{stream::FuturesUnordered, StreamExt};
use lazy_static::lazy_static;
use test_helpers::timeout::FutureTimeout;
use tokio::sync::{Notify, Semaphore};
use crate::buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState};
use super::*;
const PARTITION_ID: PartitionId = PartitionId::new(4242);
const PARTITION_KEY: &str = "bananas";
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
const TABLE_ID: TableId = TableId::new(42);
lazy_static! {
static ref NAMESPACE_NAME: Arc<DeferredLoad<NamespaceName>> =
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
NamespaceName::from("ns-platanos")
}));
static ref TABLE_NAME: Arc<DeferredLoad<TableName>> =
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from("platanos")
}));
}
/// This test proves that parallel queries for the same partition are
/// coalesced, returning the same [`PartitionData`] instance and submitting
/// a single query to the inner resolver.
#[tokio::test]
async fn test_coalesce() {
const MAX_TASKS: usize = 50;
let namespace_id = NamespaceId::new(1234);
let namespace_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
NamespaceName::from("ns-platanos")
}));
let table_id = TableId::new(24);
let table_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from("platanos")
}));
let partition = PartitionId::new(4242);
let data = PartitionData::new(
partition,
PARTITION_ID,
PartitionKey::from(PARTITION_KEY),
namespace_id,
Arc::clone(&namespace_name),
table_id,
Arc::clone(&table_name),
NAMESPACE_ID,
Arc::clone(&*NAMESPACE_NAME),
TABLE_ID,
Arc::clone(&*TABLE_NAME),
SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
);
@ -281,13 +327,13 @@ mod tests {
let results = (0..MAX_TASKS)
.map(|_| {
let namespace_name = Arc::clone(&namespace_name);
let table_name = Arc::clone(&table_name);
let namespace_name = Arc::clone(&*NAMESPACE_NAME);
let table_name = Arc::clone(&*TABLE_NAME);
layer.get_partition(
PartitionKey::from(PARTITION_KEY),
namespace_id,
NAMESPACE_ID,
namespace_name,
table_id,
TABLE_ID,
table_name,
TRANSITION_SHARD_ID,
)
@ -343,26 +389,18 @@ mod tests {
}
}
/// This test proves queries for different partitions proceed in parallel.
#[tokio::test]
async fn test_disjoint_parallelised() {
use futures::Future;
let namespace_id = NamespaceId::new(1234);
let namespace_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
NamespaceName::from("ns-platanos")
}));
let table_id = TableId::new(24);
let table_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from("platanos")
}));
let partition = PartitionId::new(4242);
let data = PartitionData::new(
partition,
PARTITION_ID,
PartitionKey::from(PARTITION_KEY),
namespace_id,
Arc::clone(&namespace_name),
table_id,
Arc::clone(&table_name),
NAMESPACE_ID,
Arc::clone(&*NAMESPACE_NAME),
TABLE_ID,
Arc::clone(&*TABLE_NAME),
SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
);
@ -378,18 +416,18 @@ mod tests {
// neither resolve.
let pa_1 = layer.get_partition(
PartitionKey::from(PARTITION_KEY),
namespace_id,
Arc::clone(&namespace_name),
table_id,
Arc::clone(&table_name),
NAMESPACE_ID,
Arc::clone(&*NAMESPACE_NAME),
TABLE_ID,
Arc::clone(&*TABLE_NAME),
TRANSITION_SHARD_ID,
);
let pa_2 = layer.get_partition(
PartitionKey::from(PARTITION_KEY),
namespace_id,
Arc::clone(&namespace_name),
table_id,
Arc::clone(&table_name),
NAMESPACE_ID,
Arc::clone(&*NAMESPACE_NAME),
TABLE_ID,
Arc::clone(&*TABLE_NAME),
TRANSITION_SHARD_ID,
);
@ -407,10 +445,10 @@ mod tests {
let _ = layer
.get_partition(
PartitionKey::from("platanos"),
namespace_id,
namespace_name,
table_id,
table_name,
NAMESPACE_ID,
Arc::clone(&*NAMESPACE_NAME),
TABLE_ID,
Arc::clone(&*TABLE_NAME),
TRANSITION_SHARD_ID,
)
.with_timeout_panic(Duration::from_secs(5))
@ -420,4 +458,106 @@ mod tests {
assert_matches!(Pin::new(&mut pa_1).poll(&mut cx), Poll::Pending);
assert_matches!(Pin::new(&mut pa_2).poll(&mut cx), Poll::Pending);
}
/// A resolver that obtains a semaphore (simulating a connection pool
/// semaphore) during a resolve and then blocks until signalled.
#[derive(Debug)]
struct SemaphoreResolver {
/// The resolver call acquires a permit from here.
sem: Arc<Semaphore>,
/// An then waits for this notify to be unblocked before "completing".
wait: Arc<Notify>,
/// And returning this data
p: Arc<Mutex<PartitionData>>,
}
#[async_trait]
impl PartitionProvider for SemaphoreResolver {
async fn get_partition(
&self,
_partition_key: PartitionKey,
_namespace_id: NamespaceId,
_namespace_name: Arc<DeferredLoad<NamespaceName>>,
_table_id: TableId,
_table_name: Arc<DeferredLoad<TableName>>,
_transition_shard_id: ShardId,
) -> Arc<Mutex<PartitionData>> {
let waker = self.wait.notified();
let permit = self.sem.acquire().await.unwrap();
waker.await;
drop(permit); // explicit permit drop for clarity
Arc::clone(&self.p)
}
}
/// This test asserts a resolve future that is started, is always driven to
/// completion.
#[tokio::test]
async fn test_inner_future_always_resolves() {
// Create a fake "connection pool" to highlight the importance of
// resolve future completion.
//
// In this case, the "pool" only has one permit/connection, and the
// resolve future will obtain it. If the future does not get driven
// forwards (because the caller aborted and is no longer polling it) the
// permit will be stuck in the future that is not making progress, but
// still allocated, effectively deadlocking the pool.
let fake_conn_pool = Arc::new(Semaphore::new(1));
// A waker to unblock the resolver semaphore for completion.
let notify = Arc::new(Notify::default());
let inner = Arc::new(SemaphoreResolver {
sem: Arc::clone(&fake_conn_pool),
wait: Arc::clone(&notify),
p: Arc::new(Mutex::new(PartitionData::new(
PARTITION_ID,
PartitionKey::from(PARTITION_KEY),
NAMESPACE_ID,
Arc::clone(&*NAMESPACE_NAME),
TABLE_ID,
Arc::clone(&*TABLE_NAME),
SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
))),
});
let layer = Arc::new(CoalescePartitionResolver::new(inner));
let fut = layer.get_partition(
PartitionKey::from(PARTITION_KEY),
NAMESPACE_ID,
Arc::clone(&*NAMESPACE_NAME),
TABLE_ID,
Arc::clone(&*TABLE_NAME),
TRANSITION_SHARD_ID,
);
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
futures::pin_mut!(fut);
// Poll the future until it blocks waiting on notify.
assert_matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);
// Drop the caller's poll future to prove nothing else polls it.
#[allow(clippy::drop_non_drop)]
drop(fut);
// Allow the resolve future to unblock and complete, if it is being
// polled.
notify.notify_waiters();
// And attempt to acquire the only semaphore permit / "connection" from
// the pool. If this succeeds, the dropped resolve future was driven to
// completion in the background, without an explicit poll by this
// thread / the only caller.
let _conn = fake_conn_pool
.acquire()
// If a "connection" is not available within 5 seconds, panic.
.with_timeout_panic(Duration::from_secs(5))
.await
.unwrap();
}
}