refactor(ingester): Simplify test code and reference actor handle passing

As pointed out, use of the turbofish for the `MockPersistQueue` default
constructor can be avoided by a specialised `Default` implementation
on the type. The WAL reference actor handle is internally refcounted,
so this commit also stops wrapping it in an `Arc`.

Co-authored-by: Dom <dom@itsallbroken.com>
pull/24376/head
Fraser Savage 2023-07-13 12:20:58 +01:00
parent 516880eeb8
commit 4c54d10098
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
8 changed files with 46 additions and 41 deletions

View File

@ -8,7 +8,7 @@ use generated_types::influxdata::{
};
use ingester::internal_implementation_details::{
encode::encode_write_op,
queue::{MockPersistQueue, NopObserver},
queue::MockPersistQueue,
write::{
PartitionedData as PayloadPartitionedData, TableData as PayloadTableData, WriteOperation,
},
@ -63,7 +63,7 @@ fn wal_replay_bench(c: &mut Criterion) {
// overhead.
let sink = NopSink::default();
let persist = MockPersistQueue::<NopObserver>::default();
let persist = MockPersistQueue::default();
// Replay the wal into the NOP.
ingester::replay(&wal, &sink, Arc::new(persist), &metric::Registry::default())

View File

@ -294,7 +294,6 @@ where
// Prepare the WAL segment reference tracker
let (wal_reference_handle, wal_reference_actor) =
WalReferenceHandle::new(Arc::clone(&wal), &metrics);
let wal_reference_handle = Arc::new(wal_reference_handle);
// Spawn the persist workers to compact partition data, convert it into
// Parquet files, and upload them to object storage.
@ -308,7 +307,7 @@ where
// Register a post-persistence observer that emits Parquet file
// attributes as metrics, and notifies the WAL segment reference tracker of
// completed persist actions.
ParquetFileInstrumentation::new(Arc::clone(&wal_reference_handle), &metrics),
ParquetFileInstrumentation::new(wal_reference_handle.clone(), &metrics),
&metrics,
);
let persist_handle = Arc::new(persist_handle);
@ -337,6 +336,8 @@ where
));
// Start the WAL reference actor and then replay the WAL log files, if any.
// The tokio handle does not need retained here as the actor handle is
// responsible for aborting the actor's run loop when dropped.
tokio::spawn(wal_reference_actor.run());
let max_sequence_number =
wal_replay::replay(&wal, &buffer, Arc::clone(&persist_handle), &metrics)
@ -355,7 +356,7 @@ where
&metrics,
),
Arc::clone(&wal),
Arc::clone(&wal_reference_handle),
wal_reference_handle.clone(),
),
"wal",
),
@ -376,7 +377,7 @@ where
let rotation_task = tokio::spawn(periodic_rotation(
Arc::clone(&wal),
wal_rotation_period,
Arc::clone(&wal_reference_handle),
wal_reference_handle.clone(),
Arc::clone(&buffer),
Arc::clone(&persist_handle),
));
@ -399,7 +400,7 @@ where
Arc::clone(&buffer),
Arc::clone(&persist_handle),
Arc::clone(&wal),
Arc::clone(&wal_reference_handle),
wal_reference_handle,
));
Ok(IngesterGuard {

View File

@ -41,7 +41,7 @@ pub(super) async fn graceful_shutdown_handler<F, T, P>(
buffer: T,
persist: P,
wal: Arc<wal::Wal>,
wal_reference_handle: Arc<WalReferenceHandle>,
wal_reference_handle: WalReferenceHandle,
) where
F: Future<Output = CancellationToken> + Send,
T: PartitionIter + Sync,
@ -170,10 +170,9 @@ mod tests {
let (_tempdir, wal) = new_wal().await;
let (wal_reference_handle, wal_reference_actor) =
WalReferenceHandle::new(Arc::clone(&wal), &metric::Registry::default());
let wal_reference_handle = Arc::new(wal_reference_handle);
let persist = Arc::new(MockPersistQueue::new_with_observer(Arc::clone(
&wal_reference_handle,
)));
let persist = Arc::new(MockPersistQueue::new_with_observer(
wal_reference_handle.clone(),
));
tokio::spawn(wal_reference_actor.run());
// Ensure there is always more than 1 segment in the test, but notify the ref tracker.
@ -193,7 +192,7 @@ mod tests {
vec![Arc::clone(&partition)],
Arc::clone(&persist),
Arc::clone(&wal),
Arc::clone(&wal_reference_handle),
wal_reference_handle,
)
.await;
@ -220,10 +219,9 @@ mod tests {
let (_tempdir, wal) = new_wal().await;
let (wal_reference_handle, wal_reference_actor) =
WalReferenceHandle::new(Arc::clone(&wal), &metric::Registry::default());
let wal_reference_handle = Arc::new(wal_reference_handle);
let persist = Arc::new(MockPersistQueue::new_with_observer(Arc::clone(
&wal_reference_handle,
)));
let persist = Arc::new(MockPersistQueue::new_with_observer(
wal_reference_handle.clone(),
));
tokio::spawn(wal_reference_actor.run());
// Ensure there is always more than 1 segment in the test, but notify the ref tracker.
@ -251,7 +249,7 @@ mod tests {
vec![Arc::clone(&partition)],
Arc::clone(&persist),
Arc::clone(&wal),
Arc::clone(&wal_reference_handle),
wal_reference_handle,
));
// Wait a small duration of time for the first buffer emptiness check to
@ -336,10 +334,9 @@ mod tests {
let (_tempdir, wal) = new_wal().await;
let (wal_reference_handle, wal_reference_actor) =
WalReferenceHandle::new(Arc::clone(&wal), &metric::Registry::default());
let wal_reference_handle = Arc::new(wal_reference_handle);
let persist = Arc::new(MockPersistQueue::new_with_observer(Arc::clone(
&wal_reference_handle,
)));
let persist = Arc::new(MockPersistQueue::new_with_observer(
wal_reference_handle.clone(),
));
tokio::spawn(wal_reference_actor.run());
// Ensure there is always more than 1 segment in the test, but notify the ref tracker.
@ -364,7 +361,7 @@ mod tests {
Arc::clone(&buffer),
Arc::clone(&persist),
Arc::clone(&wal),
Arc::clone(&wal_reference_handle),
wal_reference_handle.clone(),
));
// Wait for the shutdown to complete.

View File

@ -279,7 +279,7 @@ mod tests {
buffer_tree::partition::PartitionData,
dml_payload::IngestOp,
dml_sink::mock_sink::MockDmlSink,
persist::{completion_observer::NopObserver, queue::mock::MockPersistQueue},
persist::queue::mock::MockPersistQueue,
test_util::{
assert_write_ops_eq, make_multi_table_write_op, make_write_op, PartitionDataBuilder,
ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY,
@ -417,7 +417,7 @@ mod tests {
assert_eq!(wal.closed_segments().len(), 2);
// Initialise the mock persist system
let persist = Arc::new(MockPersistQueue::<NopObserver>::default());
let persist = Arc::new(MockPersistQueue::default());
// Replay the results into a mock to capture the DmlWrites and returns
// some dummy partitions when iterated over.

View File

@ -109,7 +109,7 @@ mod tests {
use parking_lot::Mutex;
use crate::{
persist::{completion_observer::NopObserver, queue::mock::MockPersistQueue},
persist::queue::mock::MockPersistQueue,
test_util::{PartitionDataBuilder, ARBITRARY_TABLE_NAME},
};
@ -131,7 +131,7 @@ mod tests {
let p = Arc::new(Mutex::new(p));
let metrics = metric::Registry::default();
let persist_handle = Arc::new(MockPersistQueue::<NopObserver>::default());
let persist_handle = Arc::new(MockPersistQueue::default());
let hot_partition_persister =
HotPartitionPersister::new(Arc::clone(&persist_handle), max_cost, &metrics);

View File

@ -83,10 +83,19 @@ pub(crate) mod mock {
}
/// A mock [`PersistQueue`] implementation.
#[derive(Debug, Default)]
pub struct MockPersistQueue<O = NopObserver> {
#[derive(Debug)]
pub struct MockPersistQueue<O> {
state: Mutex<State>,
completion_observer: Arc<O>,
completion_observer: O,
}
impl Default for MockPersistQueue<NopObserver> {
fn default() -> Self {
Self {
state: Default::default(),
completion_observer: Default::default(),
}
}
}
impl<O> MockPersistQueue<O>
@ -95,7 +104,7 @@ pub(crate) mod mock {
{
/// Creates a queue that notifies the [`PersistCompletionObserver`]
/// on persist enqueue completion.
pub fn new_with_observer(completion_observer: Arc<O>) -> Self {
pub fn new_with_observer(completion_observer: O) -> Self {
Self {
state: Default::default(),
completion_observer,
@ -122,7 +131,7 @@ pub(crate) mod mock {
#[async_trait]
impl<O> PersistQueue for MockPersistQueue<O>
where
O: PersistCompletionObserver + 'static,
O: PersistCompletionObserver + Clone + 'static,
{
#[allow(clippy::async_yields_async)]
async fn enqueue(
@ -135,7 +144,7 @@ pub(crate) mod mock {
let mut guard = self.state.lock();
guard.calls.push(Arc::clone(&partition));
let completion_observer = Arc::clone(&self.completion_observer);
let completion_observer = self.completion_observer.clone();
// Spawn a persist task that randomly completes (soon) in the
// future.
//

View File

@ -12,7 +12,7 @@ use crate::{
pub(crate) async fn periodic_rotation<T, P>(
wal: Arc<wal::Wal>,
period: Duration,
wal_reference_handle: Arc<WalReferenceHandle>,
wal_reference_handle: WalReferenceHandle,
buffer: T,
persist: P,
) where
@ -102,7 +102,7 @@ mod tests {
use crate::{
buffer_tree::partition::{persisting::PersistingData, PartitionData},
dml_payload::IngestOp,
persist::{completion_observer::NopObserver, queue::mock::MockPersistQueue},
persist::queue::mock::MockPersistQueue,
test_util::{
make_write_op, new_persist_notification, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID,
ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
@ -147,7 +147,7 @@ mod tests {
// Initialise a mock persist queue to inspect the calls made to the
// persist subsystem.
let persist_handle = Arc::new(MockPersistQueue::<NopObserver>::default());
let persist_handle = Arc::new(MockPersistQueue::default());
// Initialise the WAL, write the operation to it
let tmp_dir = tempdir().expect("no temp dir available");
@ -175,14 +175,13 @@ mod tests {
let (wal_reference_handle, wal_reference_actor) =
WalReferenceHandle::new(Arc::clone(&wal), &metrics);
let wal_reference_handle = Arc::new(wal_reference_handle);
tokio::spawn(wal_reference_actor.run());
// Start the rotation task
let rotate_task_handle = tokio::spawn(periodic_rotation(
Arc::clone(&wal),
TICK_INTERVAL,
Arc::clone(&wal_reference_handle),
wal_reference_handle.clone(),
vec![Arc::clone(&p)],
Arc::clone(&persist_handle),
));
@ -329,14 +328,13 @@ mod tests {
let (wal_reference_handle, wal_reference_actor) =
WalReferenceHandle::new(Arc::clone(&wal), &metrics);
let wal_reference_handle = Arc::new(wal_reference_handle);
tokio::spawn(wal_reference_actor.run());
// Start the rotation task
let rotate_task_handle = tokio::spawn(periodic_rotation(
Arc::clone(&wal),
TICK_INTERVAL,
Arc::clone(&wal_reference_handle),
wal_reference_handle,
vec![Arc::clone(&p)],
Arc::clone(&persist_handle),
));

View File

@ -142,7 +142,7 @@ impl WalAppender for Arc<wal::Wal> {
}
#[async_trait]
impl UnbufferedWriteNotifier for Arc<WalReferenceHandle> {
impl UnbufferedWriteNotifier for WalReferenceHandle {
async fn notify_failed_write_buffer(&self, set: SequenceNumberSet) {
self.enqueue_unbuffered_write(set).await;
}