feat: track persist time in wal file content (#25614)
parent
d2fbd65a44
commit
0daa3f2f1d
|
@ -6,11 +6,14 @@ use influxdb3_id::{ColumnId, DbId};
|
|||
|
||||
use crate::*;
|
||||
|
||||
/// Create a new [`WalContents`] with the provided arguments that will have a `persisted_timestamp_ns`
|
||||
/// of `0`.
|
||||
pub fn wal_contents(
|
||||
(min_timestamp_ns, max_timestamp_ns, wal_file_number): (i64, i64, u64),
|
||||
ops: impl IntoIterator<Item = WalOp>,
|
||||
) -> WalContents {
|
||||
WalContents {
|
||||
persist_timestamp_ms: 0,
|
||||
min_timestamp_ns,
|
||||
max_timestamp_ns,
|
||||
wal_file_number: WalFileSequenceNumber::new(wal_file_number),
|
||||
|
@ -19,6 +22,23 @@ pub fn wal_contents(
|
|||
}
|
||||
}
|
||||
|
||||
/// Create a new [`WalContents`] with the provided arguments that will have a `persisted_timestamp_ns`
|
||||
/// of `0`.
|
||||
pub fn wal_contents_with_snapshot(
|
||||
(min_timestamp_ns, max_timestamp_ns, wal_file_number): (i64, i64, u64),
|
||||
ops: impl IntoIterator<Item = WalOp>,
|
||||
snapshot: SnapshotDetails,
|
||||
) -> WalContents {
|
||||
WalContents {
|
||||
persist_timestamp_ms: 0,
|
||||
min_timestamp_ns,
|
||||
max_timestamp_ns,
|
||||
wal_file_number: WalFileSequenceNumber::new(wal_file_number),
|
||||
ops: ops.into_iter().collect(),
|
||||
snapshot: Some(snapshot),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_batch_op(write_batch: WriteBatch) -> WalOp {
|
||||
WalOp::Write(write_batch)
|
||||
}
|
||||
|
|
|
@ -701,11 +701,15 @@ impl<'a> From<&FieldValue<'a>> for FieldData {
|
|||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct WalContents {
|
||||
/// The time at which this WAL file is being persisted
|
||||
pub persist_timestamp_ms: i64,
|
||||
/// The min timestamp from any writes in the WAL file
|
||||
pub min_timestamp_ns: i64,
|
||||
/// The max timestamp from any writes in the WAL file
|
||||
pub max_timestamp_ns: i64,
|
||||
/// A number that increments for every generated WAL file
|
||||
pub wal_file_number: WalFileSequenceNumber,
|
||||
/// The operations contained in the WAL file
|
||||
pub ops: Vec<WalOp>,
|
||||
/// If present, the buffer should be snapshot after the contents of this file are loaded.
|
||||
pub snapshot: Option<SnapshotDetails>,
|
||||
|
|
|
@ -8,6 +8,7 @@ use bytes::Bytes;
|
|||
use data_types::Timestamp;
|
||||
use futures_util::stream::StreamExt;
|
||||
use hashbrown::HashMap;
|
||||
use iox_time::TimeProvider;
|
||||
use object_store::path::{Path, PathPart};
|
||||
use object_store::{ObjectStore, PutPayload};
|
||||
use observability_deps::tracing::{debug, error, info};
|
||||
|
@ -29,6 +30,7 @@ impl WalObjectStore {
|
|||
/// Creates a new WAL. This will replay files into the notifier and trigger any snapshots that
|
||||
/// exist in the WAL files that haven't been cleaned up yet.
|
||||
pub async fn new(
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
object_store: Arc<dyn ObjectStore>,
|
||||
host_identifier_prefix: impl Into<String> + Send,
|
||||
file_notifier: Arc<dyn WalFileNotifier>,
|
||||
|
@ -38,6 +40,7 @@ impl WalObjectStore {
|
|||
) -> Result<Arc<Self>, crate::Error> {
|
||||
let flush_interval = config.flush_interval;
|
||||
let wal = Self::new_without_replay(
|
||||
time_provider,
|
||||
object_store,
|
||||
host_identifier_prefix,
|
||||
file_notifier,
|
||||
|
@ -54,6 +57,7 @@ impl WalObjectStore {
|
|||
}
|
||||
|
||||
fn new_without_replay(
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
object_store: Arc<dyn ObjectStore>,
|
||||
host_identifier_prefix: impl Into<String>,
|
||||
file_notifier: Arc<dyn WalFileNotifier>,
|
||||
|
@ -67,7 +71,9 @@ impl WalObjectStore {
|
|||
host_identifier_prefix: host_identifier_prefix.into(),
|
||||
file_notifier,
|
||||
flush_buffer: Mutex::new(FlushBuffer::new(
|
||||
Arc::clone(&time_provider),
|
||||
WalBuffer {
|
||||
time_provider,
|
||||
is_shutdown: false,
|
||||
wal_file_sequence_number,
|
||||
op_limit: config.max_write_buffer_size,
|
||||
|
@ -401,14 +407,20 @@ impl Wal for WalObjectStore {
|
|||
|
||||
#[derive(Debug)]
|
||||
struct FlushBuffer {
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
wal_buffer: WalBuffer,
|
||||
snapshot_tracker: SnapshotTracker,
|
||||
snapshot_semaphore: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
impl FlushBuffer {
|
||||
fn new(wal_buffer: WalBuffer, snapshot_tracker: SnapshotTracker) -> Self {
|
||||
fn new(
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
wal_buffer: WalBuffer,
|
||||
snapshot_tracker: SnapshotTracker,
|
||||
) -> Self {
|
||||
Self {
|
||||
time_provider,
|
||||
wal_buffer,
|
||||
snapshot_tracker,
|
||||
snapshot_semaphore: Arc::new(Semaphore::new(1)),
|
||||
|
@ -460,6 +472,7 @@ impl FlushBuffer {
|
|||
fn flush_buffer_with_responses(&mut self) -> (WalContents, Vec<oneshot::Sender<WriteResult>>) {
|
||||
// swap out the filled buffer with a new one
|
||||
let mut new_buffer = WalBuffer {
|
||||
time_provider: Arc::clone(&self.time_provider),
|
||||
is_shutdown: false,
|
||||
wal_file_sequence_number: self.wal_buffer.wal_file_sequence_number.next(),
|
||||
op_limit: self.wal_buffer.op_limit,
|
||||
|
@ -481,8 +494,9 @@ impl FlushBuffer {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug)]
|
||||
struct WalBuffer {
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
is_shutdown: bool,
|
||||
wal_file_sequence_number: WalFileSequenceNumber,
|
||||
op_limit: usize,
|
||||
|
@ -584,6 +598,7 @@ impl WalBuffer {
|
|||
|
||||
(
|
||||
WalContents {
|
||||
persist_timestamp_ms: self.time_provider.now().timestamp_millis(),
|
||||
min_timestamp_ns,
|
||||
max_timestamp_ns,
|
||||
wal_file_number: self.wal_file_sequence_number,
|
||||
|
@ -622,17 +637,21 @@ impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
Field, FieldData, Gen1Duration, Row, SnapshotSequenceNumber, TableChunk, TableChunks,
|
||||
create, Field, FieldData, Gen1Duration, Row, SnapshotSequenceNumber, TableChunk,
|
||||
TableChunks,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use indexmap::IndexMap;
|
||||
use influxdb3_id::{ColumnId, DbId, TableId};
|
||||
use iox_time::{MockProvider, Time};
|
||||
use object_store::memory::InMemory;
|
||||
use std::any::Any;
|
||||
use tokio::sync::oneshot::Receiver;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn write_flush_delete_and_load() {
|
||||
let time_provider: Arc<dyn TimeProvider> =
|
||||
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
|
||||
let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
|
||||
let wal_config = WalConfig {
|
||||
|
@ -642,6 +661,7 @@ mod tests {
|
|||
gen1_duration: Gen1Duration::new_1m(),
|
||||
};
|
||||
let wal = WalObjectStore::new_without_replay(
|
||||
Arc::clone(&time_provider),
|
||||
Arc::clone(&object_store),
|
||||
"my_host",
|
||||
Arc::clone(¬ifier),
|
||||
|
@ -738,11 +758,9 @@ mod tests {
|
|||
// create wal file 1
|
||||
let ret = wal.flush_buffer().await;
|
||||
assert!(ret.is_none());
|
||||
let file_1_contents = WalContents {
|
||||
min_timestamp_ns: 1,
|
||||
max_timestamp_ns: 62_000000000,
|
||||
wal_file_number: WalFileSequenceNumber(1),
|
||||
ops: vec![WalOp::Write(WriteBatch {
|
||||
let file_1_contents = create::wal_contents(
|
||||
(1, 62_000_000_000, 1),
|
||||
[WalOp::Write(WriteBatch {
|
||||
database_id: DbId::from(0),
|
||||
database_name: "db1".into(),
|
||||
table_chunks: IndexMap::from([(
|
||||
|
@ -802,18 +820,15 @@ mod tests {
|
|||
min_time_ns: 1,
|
||||
max_time_ns: 62_000000000,
|
||||
})],
|
||||
snapshot: None,
|
||||
};
|
||||
);
|
||||
|
||||
// create wal file 2
|
||||
wal.buffer_op_unconfirmed(op2.clone()).await.unwrap();
|
||||
assert!(wal.flush_buffer().await.is_none());
|
||||
|
||||
let file_2_contents = WalContents {
|
||||
min_timestamp_ns: 62000000000,
|
||||
max_timestamp_ns: 62000000000,
|
||||
wal_file_number: WalFileSequenceNumber(2),
|
||||
ops: vec![WalOp::Write(WriteBatch {
|
||||
let file_2_contents = create::wal_contents(
|
||||
(62_000_000_000, 62_000_000_000, 2),
|
||||
[WalOp::Write(WriteBatch {
|
||||
database_id: DbId::from(0),
|
||||
database_name: "db1".into(),
|
||||
table_chunks: IndexMap::from([(
|
||||
|
@ -845,12 +860,12 @@ mod tests {
|
|||
min_time_ns: 62_000000000,
|
||||
max_time_ns: 62_000000000,
|
||||
})],
|
||||
snapshot: None,
|
||||
};
|
||||
);
|
||||
|
||||
// before we trigger a snapshot, test replay with a new wal and notifier
|
||||
let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
|
||||
let replay_wal = WalObjectStore::new_without_replay(
|
||||
Arc::clone(&time_provider),
|
||||
Arc::clone(&object_store),
|
||||
"my_host",
|
||||
Arc::clone(&replay_notifier),
|
||||
|
@ -938,11 +953,9 @@ mod tests {
|
|||
assert_eq!(expected_info, snapshot_info);
|
||||
snapshot_done.await.unwrap();
|
||||
|
||||
let file_3_contents = WalContents {
|
||||
min_timestamp_ns: 128_000000000,
|
||||
max_timestamp_ns: 128_000000000,
|
||||
wal_file_number: WalFileSequenceNumber(3),
|
||||
ops: vec![WalOp::Write(WriteBatch {
|
||||
let file_3_contents = create::wal_contents_with_snapshot(
|
||||
(128_000_000_000, 128_000_000_000, 3),
|
||||
[WalOp::Write(WriteBatch {
|
||||
database_id: DbId::from(0),
|
||||
database_name: "db1".into(),
|
||||
table_chunks: IndexMap::from([(
|
||||
|
@ -974,12 +987,12 @@ mod tests {
|
|||
min_time_ns: 128_000000000,
|
||||
max_time_ns: 128_000000000,
|
||||
})],
|
||||
snapshot: Some(SnapshotDetails {
|
||||
SnapshotDetails {
|
||||
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
|
||||
end_time_marker: 120_000000000,
|
||||
last_wal_sequence_number: WalFileSequenceNumber(2),
|
||||
}),
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
let notifier = notifier.as_any().downcast_ref::<TestNotfiier>().unwrap();
|
||||
|
||||
|
@ -997,6 +1010,7 @@ mod tests {
|
|||
// test that replay now only has file 3
|
||||
let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
|
||||
let replay_wal = WalObjectStore::new_without_replay(
|
||||
Arc::clone(&time_provider),
|
||||
object_store,
|
||||
"my_host",
|
||||
Arc::clone(&replay_notifier),
|
||||
|
@ -1021,6 +1035,8 @@ mod tests {
|
|||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn flush_for_empty_buffer_skips_notify() {
|
||||
let time_provider: Arc<dyn TimeProvider> =
|
||||
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
|
||||
let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
|
||||
let wal_config = WalConfig {
|
||||
|
@ -1030,6 +1046,7 @@ mod tests {
|
|||
gen1_duration: Gen1Duration::new_1m(),
|
||||
};
|
||||
let wal = WalObjectStore::new_without_replay(
|
||||
time_provider,
|
||||
Arc::clone(&object_store),
|
||||
"my_host",
|
||||
Arc::clone(¬ifier),
|
||||
|
@ -1046,6 +1063,24 @@ mod tests {
|
|||
assert!(object_store.list(None).next().await.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persist_time_is_tracked() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let wal_buffer = WalBuffer {
|
||||
time_provider: Arc::clone(&time_provider) as _,
|
||||
is_shutdown: false,
|
||||
wal_file_sequence_number: WalFileSequenceNumber(0),
|
||||
op_limit: 10,
|
||||
op_count: 0,
|
||||
database_to_write_batch: Default::default(),
|
||||
catalog_batches: vec![],
|
||||
write_op_responses: vec![],
|
||||
};
|
||||
time_provider.set(Time::from_timestamp_millis(1234).unwrap());
|
||||
let (wal_contents, _) = wal_buffer.into_wal_contents_and_responses();
|
||||
assert_eq!(1234, wal_contents.persist_timestamp_ms);
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct TestNotfiier {
|
||||
notified_writes: parking_lot::Mutex<Vec<WalContents>>,
|
||||
|
|
|
@ -120,6 +120,7 @@ mod tests {
|
|||
table_chunks.insert(table_id, chunks);
|
||||
|
||||
let contents = WalContents {
|
||||
persist_timestamp_ms: 10,
|
||||
min_timestamp_ns: 0,
|
||||
max_timestamp_ns: 10,
|
||||
wal_file_number: WalFileSequenceNumber::new(1),
|
||||
|
|
|
@ -197,6 +197,7 @@ impl WriteBufferImpl {
|
|||
// create the wal instance, which will replay into the queryable buffer and start
|
||||
// the background flush task.
|
||||
let wal = WalObjectStore::new(
|
||||
Arc::clone(&time_provider),
|
||||
persister.object_store(),
|
||||
persister.host_identifier_prefix(),
|
||||
Arc::clone(&queryable_buffer) as Arc<dyn WalFileNotifier>,
|
||||
|
|
Loading…
Reference in New Issue