feat(ingester): WIP - WAL rotate task uses reference tracker for delete
This is the first in a series of commits connecting the WAL segment reference tracker actor to the rest of the ingester. It removes the segment file deletion and the hacky sleep from the rotate task, deferring to the actor for deletion tracking.
parent 7b2ef53c7b
commit fd8a89deea
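In short: rotation and persist completion now feed the reference tracker, and the tracker's actor owns WAL segment deletion. A minimal sketch of the wiring using the names introduced in this diff is below; the enclosing function and its return shape are illustrative only and are not part of the change.

    // Sketch only: the function name is invented for illustration; the calls
    // themselves mirror the diff below (crate-internal ingester types).
    fn wire_up_reference_tracker(
        wal: Arc<wal::Wal>,
        metrics: &metric::Registry,
    ) -> (Arc<WalReferenceHandle>, tokio::task::JoinHandle<()>) {
        // Spawn the reference tracker actor. The handle is shared with the
        // rotate task; the spawned task is aborted when the guard is dropped.
        let (wal_reference_handle, wal_reference_actor) =
            WalReferenceHandle::new(Arc::clone(&wal), metrics);
        let wal_reference_handle = Arc::new(wal_reference_handle);
        let wal_reference_actor_task = tokio::spawn(wal_reference_actor.run());

        // The rotate task no longer sleeps and deletes the closed segment
        // itself; it tells the tracker which sequence numbers the rotated
        // segment contains:
        //
        //     wal_reference_handle.enqueue_rotated_file(stats.id(), ids).await;
        //
        // and persist completions are reported as notifications:
        //
        //     wal_reference_handle.enqueue_persist_notification(note).await;
        //
        // Once every sequence number in a rotated segment has a matching
        // persist notification, the actor deletes the segment file.
        (wal_reference_handle, wal_reference_actor_task)
    }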

@@ -45,7 +45,9 @@ use crate::{
},
server::grpc::GrpcDelegate,
timestamp_oracle::TimestampOracle,
wal::{rotate_task::periodic_rotation, wal_sink::WalSink},
wal::{
reference_tracker::WalReferenceHandle, rotate_task::periodic_rotation, wal_sink::WalSink,
},
};

use self::graceful_shutdown::graceful_shutdown_handler;
@@ -98,6 +100,11 @@ pub struct IngesterGuard<T> {
/// Aborted on drop.
rotation_task: tokio::task::JoinHandle<()>,

/// Handle to the WAL reference actor's task, it
/// is aborted on drop of the guard, or the actor's
/// handle.
wal_reference_actor_task: tokio::task::JoinHandle<()>,

/// The task handle executing the graceful shutdown once triggered.
graceful_shutdown_handler: tokio::task::JoinHandle<()>,
shutdown_complete: Shared<oneshot::Receiver<()>>,
@@ -124,6 +131,7 @@ where
impl<T> Drop for IngesterGuard<T> {
fn drop(&mut self) {
self.rotation_task.abort();
self.wal_reference_actor_task.abort();
self.graceful_shutdown_handler.abort();
}
}
@@ -328,6 +336,12 @@ where
// Initialise the WAL
let wal = Wal::new(wal_directory).await.map_err(InitError::WalInit)?;

// Set up the WAL segment reference tracker
let (wal_reference_handle, wal_reference_actor) =
WalReferenceHandle::new(Arc::clone(&wal), &metrics);
let wal_reference_handle = Arc::new(wal_reference_handle);
let wal_reference_actor_task = tokio::spawn(wal_reference_actor.run());

// Replay the WAL log files, if any.
let max_sequence_number =
wal_replay::replay(&wal, &buffer, Arc::clone(&persist_handle), &metrics)
@@ -366,6 +380,7 @@ where
let rotation_task = tokio::spawn(periodic_rotation(
Arc::clone(&wal),
wal_rotation_period,
Arc::clone(&wal_reference_handle),
Arc::clone(&buffer),
Arc::clone(&persist_handle),
));
@@ -403,6 +418,7 @@ where
persist_handle,
),
rotation_task,
wal_reference_actor_task,
graceful_shutdown_handler: shutdown_task,
shutdown_complete: shutdown_rx.shared(),
})

@@ -105,6 +105,8 @@ pub(super) async fn graceful_shutdown_handler<F, T, P>(
//
// https://github.com/influxdata/influxdb_iox/issues/6566
//
// TODO(savage): Remove this once the WAL reference tracker is hooked up.
// Should this drop the reference to the handle?
wal.rotate().expect("failed to rotate wal");
for file in wal.closed_segments() {
if let Err(error) = wal.delete(file.id()).await {

@@ -1,8 +1,8 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use data_types::{
partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionId, PartitionKey,
SequenceNumber, TableId,
partition_template::TablePartitionTemplateOverride, ColumnId, ColumnSet, NamespaceId,
ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, TableId, Timestamp,
};
use hashbrown::HashSet;
use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace};
@@ -25,6 +25,7 @@ use crate::{
},
deferred_load::DeferredLoad,
dml_payload::write::{PartitionedData, TableData, WriteOperation},
persist::completion_observer::CompletedPersist,
};

pub(crate) const ARBITRARY_PARTITION_ID: PartitionId = PartitionId::new(1);
@@ -359,6 +360,35 @@ pub(crate) fn make_multi_table_write_op<
WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), None)
}

/// Return a persist completion notification for the given
/// sequence numbers.
pub(crate) fn new_persist_notification<T>(sequence_numbers: T) -> Arc<CompletedPersist>
where
T: IntoIterator<Item = u64>,
{
Arc::new(CompletedPersist::new(
ParquetFileParams {
namespace_id: NamespaceId::new(1),
table_id: TableId::new(2),
partition_id: PartitionId::new(3),
partition_hash_id: None,
object_store_id: Default::default(),
min_time: Timestamp::new(42),
max_time: Timestamp::new(42),
file_size_bytes: 42424242,
row_count: 24,
compaction_level: data_types::CompactionLevel::Initial,
created_at: Timestamp::new(1234),
column_set: ColumnSet::new([1, 2, 3, 4].into_iter().map(ColumnId::new)),
max_l0_created_at: Timestamp::new(42),
},
sequence_numbers
.into_iter()
.map(SequenceNumber::new)
.collect(),
))
}

pub(crate) async fn populate_catalog(
catalog: &dyn Catalog,
namespace: &str,

@@ -163,7 +163,7 @@ impl WalReferenceHandle {
/// A future that resolves when there are no partially persisted / inactive
/// WAL segment files known to the reference tracker.
///
/// NOTE: the active WAL segement file may contain unpersisted operations!
/// NOTE: the active WAL segment file may contain unpersisted operations!
/// The tracker is only aware of inactive/rotated-out WAL files.
///
/// NOTE: the number of references may reach 0 multiple times over the
@@ -204,11 +204,9 @@ impl WalReferenceHandle {

#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID};
use assert_matches::assert_matches;
use async_trait::async_trait;
use data_types::{ColumnId, ColumnSet, ParquetFileParams, SequenceNumber, Timestamp};
use data_types::SequenceNumber;
use futures::{task::Context, Future, FutureExt};
use metric::{assert_counter, U64Gauge};
use parking_lot::Mutex;
@@ -216,6 +214,9 @@ mod tests {
use test_helpers::timeout::FutureTimeout;
use tokio::sync::Notify;

use super::*;
use crate::test_util::new_persist_notification;

/// A mock file deleter that records the IDs it was asked to delete.
#[derive(Debug, Default)]
struct MockWalDeleter {
@@ -253,32 +254,6 @@ mod tests {
vals.into_iter().map(SequenceNumber::new).collect()
}

/// Return a persist completion notification with the given
/// [`SequenceNumberSet`] values.
fn new_note<T>(vals: T) -> Arc<CompletedPersist>
where
T: IntoIterator<Item = u64>,
{
Arc::new(CompletedPersist::new(
ParquetFileParams {
namespace_id: ARBITRARY_NAMESPACE_ID,
table_id: ARBITRARY_TABLE_ID,
partition_id: ARBITRARY_PARTITION_ID,
partition_hash_id: None,
object_store_id: Default::default(),
min_time: Timestamp::new(42),
max_time: Timestamp::new(42),
file_size_bytes: 42424242,
row_count: 24,
compaction_level: data_types::CompactionLevel::Initial,
created_at: Timestamp::new(1234),
column_set: ColumnSet::new([1, 2, 3, 4].into_iter().map(ColumnId::new)),
max_l0_created_at: Timestamp::new(42),
},
new_set(vals),
))
}

/// Test in-order notifications:
///
/// * WAL file is rotated and the tracker notified
@@ -305,7 +280,9 @@ mod tests {
.expect("did not receive file processed notification");

// Submit a persist notification that removes refs 1 & 2.
handle.enqueue_persist_notification(new_note([1, 2])).await;
handle
.enqueue_persist_notification(new_persist_notification([1, 2]))
.await;

// Ensure the file was not deleted
assert!(wal.calls().is_empty());
@@ -318,7 +295,9 @@ mod tests {

// Finally release the last IDs
let deleted_file_waker = wal.deleted_file_waker();
handle.enqueue_persist_notification(new_note([3, 4])).await;
handle
.enqueue_persist_notification(new_persist_notification([3, 4]))
.await;

// Wait for it to be processed
deleted_file_waker.await;
@@ -372,9 +351,15 @@ mod tests {

// Submit a persist notification for the entire set of IDs [1,2,3,4] in
// the upcoming first WAL, and partially the second WAL
handle.enqueue_persist_notification(new_note([2])).await;
handle.enqueue_persist_notification(new_note([1])).await;
handle.enqueue_persist_notification(new_note([3, 4])).await;
handle
.enqueue_persist_notification(new_persist_notification([2]))
.await;
handle
.enqueue_persist_notification(new_persist_notification([1]))
.await;
handle
.enqueue_persist_notification(new_persist_notification([3, 4]))
.await;

// Add a file with IDs 1, 2, 3
let deleted_file_waker = wal.deleted_file_waker();
@@ -404,7 +389,9 @@ mod tests {
assert_eq!(wal.calls().len(), 1);

// Release one of the remaining two refs
handle.enqueue_persist_notification(new_note([6])).await;
handle
.enqueue_persist_notification(new_persist_notification([6]))
.await;

// Still no deletion
assert_eq!(wal.calls().len(), 1);
@@ -519,7 +506,9 @@ mod tests {
.expect("did not receive file processed notification");

// Reduce the reference count for file 1 (leaving 3 references)
handle.enqueue_persist_notification(new_note([1, 2])).await;
handle
.enqueue_persist_notification(new_persist_notification([1, 2]))
.await;

// Enqueue the second file.
handle
@@ -541,7 +530,9 @@ mod tests {

// Release the references to file 2
let deleted_file_waker = wal.deleted_file_waker();
handle.enqueue_persist_notification(new_note([6])).await;
handle
.enqueue_persist_notification(new_persist_notification([6]))
.await;

deleted_file_waker.await;

@@ -612,13 +603,17 @@ mod tests {
.expect("did not receive file processed notification");

// Remove some file references, leaving 1 reference
handle.enqueue_persist_notification(new_note([1, 2])).await;
handle
.enqueue_persist_notification(new_persist_notification([1, 2]))
.await;

// The tracker is not empty, so the future must not resolve.
assert_matches!(Pin::new(&mut empty_waker).poll(&mut cx), Poll::Pending);

// Add a persist notification, populating the "persisted" set.
handle.enqueue_persist_notification(new_note([5])).await;
handle
.enqueue_persist_notification(new_persist_notification([5]))
.await;

// Release the file reference, leaving only the above reference
// remaining (id=5)

@@ -4,12 +4,15 @@ use std::{sync::Arc, time::Duration};
use crate::{
partition_iter::PartitionIter,
persist::{drain_buffer::persist_partitions, queue::PersistQueue},
wal::reference_tracker::WalReferenceHandle,
};

/// Rotate the `wal` segment file every `period` duration of time.
/// Rotate the `wal` segment file every `period` duration of time, notifying
/// the [`WalReferenceHandle`].
pub(crate) async fn periodic_rotation<T, P>(
wal: Arc<wal::Wal>,
period: Duration,
wal_reference_handle: Arc<WalReferenceHandle>,
buffer: T,
persist: P,
) where
@@ -33,51 +36,9 @@ pub(crate) async fn periodic_rotation<T, P>(
n_ops = ids.len(),
"rotated wal"
);

// TEMPORARY HACK: wait 5 seconds for in-flight writes to the old WAL
// segment to complete before draining the partitions.
//
// This can occur because writes to the WAL & buffer tree are not atomic
// (avoiding a serialising mutex in the write path).
//
// A flawed solution would be to have this code read the current
// SequenceNumber after rotation, and then wait until at least that
// sequence number has been buffered in the BufferTree. This may work in
// most cases, but is racy / not deterministic - writes are not ordered,
// so sequence number 5 might be buffered before sequence number 1.
//
// As a temporary hack, wait 5 seconds for in-flight writes to complete
// (which should be more than enough time) before proceeding under the
// assumption that they have indeed completed, and all writes from the
// previous WAL segment are now buffered. Because they're buffered, the
// persist operation performed next will persist all the writes that
// were in the previous WAL segment, and therefore at the end of the
// persist operation the WAL segment can be dropped.
//
// The potential downside of this hack is that in the very unlikely
// situation that an in-flight write has not completed before the
// persist operation starts (after the 5 second sleep) and the WAL entry
// for it is dropped - we then reduce the durability of that write until
// it is persisted next time, or it is lost after an ingester crash
// before the next rotation.
//
// In the future, a proper fix will be to keep the set of sequence
// numbers wrote to each partition buffer, and each WAL segment as a
// bitmap, and after persistence submit the partition's bitmap to the
// WAL for it to do a set difference to derive the remaining sequence
// IDs, and therefore number of references to the WAL segment. Once the
// set of remaining IDs is empty (all data is persisted), the segment is
// safe to delete. This content-addressed reference counting technique
// has the added advantage of working even with parallel / out-of-order
// / hot partition persists that span WAL segments, and means there's no
// special code path between "hot partition persist" and "wal rotation
// persist" - it all works the same way!
//
// https://github.com/influxdata/influxdb_iox/issues/6566
//
// TODO: this properly as described above.

tokio::time::sleep(Duration::from_secs(5)).await;
wal_reference_handle
.enqueue_rotated_file(stats.id(), ids)
.await;

// Do not block the ticker while partitions are persisted to ensure
// timely ticking.
@@ -97,7 +58,6 @@ pub(crate) async fn periodic_rotation<T, P>(
// the right WAL segment regardless of concurrent tasks.
tokio::spawn({
let persist = persist.clone();
let wal = Arc::clone(&wal);
let iter = buffer.partition_iter();
async move {
// Drain the BufferTree of partition data and persist each one.
@@ -121,17 +81,6 @@ pub(crate) async fn periodic_rotation<T, P>(
closed_id = %stats.id(),
"partitions persisted"
);

wal.delete(stats.id())
.await
.expect("failed to drop wal segment");

info!(
closed_id = %stats.id(),
file_bytes = stats.size(),
n_ops = ids.len(),
"dropped persisted wal segment"
);
}
});
}
@@ -143,40 +92,64 @@ mod tests {

use assert_matches::assert_matches;
use async_trait::async_trait;
use data_types::SequenceNumber;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use parking_lot::Mutex;
use tempfile::tempdir;
use test_helpers::timeout::FutureTimeout;
use tokio::sync::oneshot;
use wal::WriteResult;

use super::*;
use crate::{
buffer_tree::{partition::persisting::PersistingData, partition::PartitionData},
buffer_tree::partition::{persisting::PersistingData, PartitionData},
dml_payload::IngestOp,
persist::queue::mock::MockPersistQueue,
test_util::{PartitionDataBuilder, ARBITRARY_PARTITION_ID},
test_util::{
make_write_op, new_persist_notification, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID,
ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
ARBITRARY_TABLE_NAME,
},
wal::traits::WalAppender,
};

const TICK_INTERVAL: Duration = Duration::from_millis(10);

#[tokio::test]
async fn test_persist() {
async fn test_notify_rotate_persist() {
let metrics = metric::Registry::default();

// Create a write operation to stick in the WAL, and create a partition
// iter from the data within it to mock out the buffer tree.
let write_op = make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
1,
&format!(
r#"{},city=London people=2,pigeons="millions" 10"#,
&*ARBITRARY_TABLE_NAME
),
None,
);

let mut p = PartitionDataBuilder::new().build();

// Perform a single write to populate the partition.
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
p.buffer_write(mb, SequenceNumber::new(1))
for (_, table_data) in write_op.tables() {
let partitioned_data = table_data.partitioned_data();
p.buffer_write(
partitioned_data.data().clone(),
partitioned_data.sequence_number(),
)
.expect("write should succeed");

}
// Wrap the partition in the lock.
assert_eq!(p.completed_persistence_count(), 0);
let p = Arc::new(Mutex::new(p));

// Initialise a mock persist queue to inspect the calls made to the
// persist subsystem.
let persist = Arc::new(MockPersistQueue::default());
let persist_handle = Arc::new(MockPersistQueue::default());

// Initialise the WAL
// Initialise the WAL, write the operation to it
let tmp_dir = tempdir().expect("no temp dir available");
let wal = wal::Wal::new(tmp_dir.path())
.await
@@ -184,12 +157,34 @@ mod tests {

assert_eq!(wal.closed_segments().len(), 0);

let mut write_result = wal.append(&IngestOp::Write(write_op));

write_result
.changed()
.await
.expect("should be able to get WAL write result");

assert_matches!(
write_result
.borrow()
.as_ref()
.expect("WAL should always return result"),
WriteResult::Ok(_),
"test write should succeed"
);

let (wal_reference_handle, wal_reference_actor) =
WalReferenceHandle::new(Arc::clone(&wal), &metrics);
let wal_reference_handle = Arc::new(wal_reference_handle);
let wal_reference_actor_task = tokio::spawn(wal_reference_actor.run());

// Start the rotation task
let handle = tokio::spawn(periodic_rotation(
let rotate_task_handle = tokio::spawn(periodic_rotation(
Arc::clone(&wal),
TICK_INTERVAL,
Arc::clone(&wal_reference_handle),
vec![Arc::clone(&p)],
Arc::clone(&persist),
Arc::clone(&persist_handle),
));

tokio::time::pause();
@@ -209,39 +204,38 @@ mod tests {
.await;

// There should be exactly 1 segment.
let mut segment = wal.closed_segments();
assert_eq!(segment.len(), 1);
let segment = segment.pop().unwrap();
let mut segments = wal.closed_segments();
assert_eq!(segments.len(), 1);
let closed_segment = segments.pop().unwrap();

// Move past the hacky sleep.
tokio::time::pause();
tokio::time::advance(Duration::from_secs(10)).await;
tokio::time::resume();
// Send a persistence notification to allow the actor to delete
// the WAL file
wal_reference_handle
.enqueue_persist_notification(new_persist_notification([1]))
.await;

// Wait for the WAL segment to be deleted, indicating the end of
// processing.
// Wait for the closed segment to no longer appear in the WAL,
// indicating deletion
async {
loop {
match wal.closed_segments().pop() {
Some(p) if p.id() != segment.id() => {
// Rotation has occurred.
break;
}
// Rotation has not yet occurred.
Some(_) => tokio::task::yield_now().await,
// The old file was deleted and no new one has yet taken its
// place.
None => break,
if wal
.closed_segments()
.iter()
.all(|s| s.id() != closed_segment.id())
{
break;
}
tokio::task::yield_now().await;
}
}
.with_timeout_panic(Duration::from_secs(5))
.await;

// Stop the worker and assert the state of the persist queue.
handle.abort();
// Stop the tasks and assert the state of the persist queue
rotate_task_handle.abort();
wal_reference_actor_task.abort();

assert_matches!(persist.calls().as_slice(), [got] => {
assert_matches!(persist_handle.calls().as_slice(), [got] => {
let guard = got.lock();
assert_eq!(guard.partition_id(), ARBITRARY_PARTITION_ID);
})
@@ -277,19 +271,38 @@ mod tests {

#[tokio::test]
async fn test_persist_ticks_when_blocked() {
let metrics = metric::Registry::default();

// Create a write operation to stick in the WAL, and create a partition
// iter from the data within it to mock out the buffer tree.
let write_op = make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
1,
&format!(
r#"{},city=London people=2,pigeons="millions" 10"#,
&*ARBITRARY_TABLE_NAME
),
None,
);

let mut p = PartitionDataBuilder::new().build();

// Perform a single write to populate the partition.
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
p.buffer_write(mb.clone(), SequenceNumber::new(1))
for (_, table_data) in write_op.tables() {
let partitioned_data = table_data.partitioned_data();
p.buffer_write(
partitioned_data.data().clone(),
partitioned_data.sequence_number(),
)
.expect("write should succeed");

}
// Wrap the partition in the lock.
assert_eq!(p.completed_persistence_count(), 0);
let p = Arc::new(Mutex::new(p));

// Initialise a mock persist queue that never completes.
let persist = Arc::new(BlockedPersistQueue::default());
let persist_handle = Arc::new(BlockedPersistQueue::default());

// Initialise the WAL
let tmp_dir = tempdir().expect("no temp dir available");
@@ -299,12 +312,34 @@ mod tests {

assert_eq!(wal.closed_segments().len(), 0);

let mut write_result = wal.append(&IngestOp::Write(write_op.clone()));

write_result
.changed()
.await
.expect("should be able to get WAL write result");

assert_matches!(
write_result
.borrow()
.as_ref()
.expect("WAL should always return result"),
WriteResult::Ok(_),
"test write should succeed"
);

let (wal_reference_handle, wal_reference_actor) =
WalReferenceHandle::new(Arc::clone(&wal), &metrics);
let wal_reference_handle = Arc::new(wal_reference_handle);
let wal_reference_actor_task = tokio::spawn(wal_reference_actor.run());

// Start the rotation task
let handle = tokio::spawn(periodic_rotation(
let rotate_task_handle = tokio::spawn(periodic_rotation(
Arc::clone(&wal),
TICK_INTERVAL,
Arc::clone(&wal_reference_handle),
vec![Arc::clone(&p)],
Arc::clone(&persist),
Arc::clone(&persist_handle),
));

tokio::time::pause();
@@ -355,24 +390,25 @@ mod tests {
.await;

// Pause the ticker loop and buffer another write in the partition.
p.lock()
.buffer_write(mb, SequenceNumber::new(2))
.expect("write should succeed");
for (i, (_, table_data)) in write_op.tables().enumerate() {
let partitioned_data = table_data.partitioned_data();
p.lock()
.buffer_write(
partitioned_data.data().clone(),
partitioned_data.sequence_number() + i as u64 + 1,
)
.expect("write should succeed");
}

// Cause another tick to occur, driving the loop again.
tokio::time::pause();
tokio::time::advance(TICK_INTERVAL).await;
tokio::time::resume();

// Move past the sleep.
tokio::time::pause();
tokio::time::advance(Duration::from_secs(10)).await;
tokio::time::resume();

// Wait the second tick to complete.
async {
loop {
if persist.calls.lock().len() == 2 {
if persist_handle.calls.lock().len() == 2 {
break;
}
tokio::task::yield_now().await;
@@ -381,10 +417,11 @@ mod tests {
.with_timeout_panic(Duration::from_secs(5))
.await;

// Stop the worker and assert the state of the persist queue.
handle.abort();
// Stop the workers and assert the state of the persist queue.
rotate_task_handle.abort();
wal_reference_actor_task.abort();

let calls = persist.calls.lock().clone();
let calls = persist_handle.calls.lock().clone();
assert_matches!(calls.as_slice(), [got1, got2] => {
assert!(Arc::ptr_eq(got1, got2));
})