diff --git a/influxdb3_wal/src/create.rs b/influxdb3_wal/src/create.rs index 7649b8bb61..d1fb4e3312 100644 --- a/influxdb3_wal/src/create.rs +++ b/influxdb3_wal/src/create.rs @@ -6,11 +6,14 @@ use influxdb3_id::{ColumnId, DbId}; use crate::*; +/// Create a new [`WalContents`] with the provided arguments that will have a `persist_timestamp_ms` +/// of `0`. pub fn wal_contents( (min_timestamp_ns, max_timestamp_ns, wal_file_number): (i64, i64, u64), ops: impl IntoIterator, ) -> WalContents { WalContents { + persist_timestamp_ms: 0, min_timestamp_ns, max_timestamp_ns, wal_file_number: WalFileSequenceNumber::new(wal_file_number), @@ -19,6 +22,23 @@ pub fn wal_contents( } } +/// Create a new [`WalContents`] with the provided arguments that will have a `persist_timestamp_ms` +/// of `0`. +pub fn wal_contents_with_snapshot( + (min_timestamp_ns, max_timestamp_ns, wal_file_number): (i64, i64, u64), + ops: impl IntoIterator, + snapshot: SnapshotDetails, +) -> WalContents { + WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns, + max_timestamp_ns, + wal_file_number: WalFileSequenceNumber::new(wal_file_number), + ops: ops.into_iter().collect(), + snapshot: Some(snapshot), + } +} + pub fn write_batch_op(write_batch: WriteBatch) -> WalOp { WalOp::Write(write_batch) } diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index c29d13304c..898ff2f674 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -701,11 +701,15 @@ impl<'a> From<&FieldValue<'a>> for FieldData { #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct WalContents { + /// The time at which this WAL file is being persisted + pub persist_timestamp_ms: i64, /// The min timestamp from any writes in the WAL file pub min_timestamp_ns: i64, /// The max timestamp from any writes in the WAL file pub max_timestamp_ns: i64, + /// A number that increments for every generated WAL file pub wal_file_number: WalFileSequenceNumber, + /// The operations contained in the 
WAL file pub ops: Vec, /// If present, the buffer should be snapshot after the contents of this file are loaded. pub snapshot: Option, diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index e96525e8d1..65d080e83a 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use data_types::Timestamp; use futures_util::stream::StreamExt; use hashbrown::HashMap; +use iox_time::TimeProvider; use object_store::path::{Path, PathPart}; use object_store::{ObjectStore, PutPayload}; use observability_deps::tracing::{debug, error, info}; @@ -29,6 +30,7 @@ impl WalObjectStore { /// Creates a new WAL. This will replay files into the notifier and trigger any snapshots that /// exist in the WAL files that haven't been cleaned up yet. pub async fn new( + time_provider: Arc, object_store: Arc, host_identifier_prefix: impl Into + Send, file_notifier: Arc, @@ -38,6 +40,7 @@ impl WalObjectStore { ) -> Result, crate::Error> { let flush_interval = config.flush_interval; let wal = Self::new_without_replay( + time_provider, object_store, host_identifier_prefix, file_notifier, @@ -54,6 +57,7 @@ impl WalObjectStore { } fn new_without_replay( + time_provider: Arc, object_store: Arc, host_identifier_prefix: impl Into, file_notifier: Arc, @@ -67,7 +71,9 @@ impl WalObjectStore { host_identifier_prefix: host_identifier_prefix.into(), file_notifier, flush_buffer: Mutex::new(FlushBuffer::new( + Arc::clone(&time_provider), WalBuffer { + time_provider, is_shutdown: false, wal_file_sequence_number, op_limit: config.max_write_buffer_size, @@ -401,14 +407,20 @@ impl Wal for WalObjectStore { #[derive(Debug)] struct FlushBuffer { + time_provider: Arc, wal_buffer: WalBuffer, snapshot_tracker: SnapshotTracker, snapshot_semaphore: Arc, } impl FlushBuffer { - fn new(wal_buffer: WalBuffer, snapshot_tracker: SnapshotTracker) -> Self { + fn new( + time_provider: Arc, + wal_buffer: WalBuffer, + snapshot_tracker: 
SnapshotTracker, + ) -> Self { Self { + time_provider, wal_buffer, snapshot_tracker, snapshot_semaphore: Arc::new(Semaphore::new(1)), @@ -460,6 +472,7 @@ impl FlushBuffer { fn flush_buffer_with_responses(&mut self) -> (WalContents, Vec>) { // swap out the filled buffer with a new one let mut new_buffer = WalBuffer { + time_provider: Arc::clone(&self.time_provider), is_shutdown: false, wal_file_sequence_number: self.wal_buffer.wal_file_sequence_number.next(), op_limit: self.wal_buffer.op_limit, @@ -481,8 +494,9 @@ impl FlushBuffer { } } -#[derive(Debug, Default)] +#[derive(Debug)] struct WalBuffer { + time_provider: Arc, is_shutdown: bool, wal_file_sequence_number: WalFileSequenceNumber, op_limit: usize, @@ -584,6 +598,7 @@ impl WalBuffer { ( WalContents { + persist_timestamp_ms: self.time_provider.now().timestamp_millis(), min_timestamp_ns, max_timestamp_ns, wal_file_number: self.wal_file_sequence_number, @@ -622,17 +637,21 @@ impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber { mod tests { use super::*; use crate::{ - Field, FieldData, Gen1Duration, Row, SnapshotSequenceNumber, TableChunk, TableChunks, + create, Field, FieldData, Gen1Duration, Row, SnapshotSequenceNumber, TableChunk, + TableChunks, }; use async_trait::async_trait; use indexmap::IndexMap; use influxdb3_id::{ColumnId, DbId, TableId}; + use iox_time::{MockProvider, Time}; use object_store::memory::InMemory; use std::any::Any; use tokio::sync::oneshot::Receiver; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_flush_delete_and_load() { + let time_provider: Arc = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let object_store: Arc = Arc::new(InMemory::new()); let notifier: Arc = Arc::new(TestNotfiier::default()); let wal_config = WalConfig { @@ -642,6 +661,7 @@ mod tests { gen1_duration: Gen1Duration::new_1m(), }; let wal = WalObjectStore::new_without_replay( + Arc::clone(&time_provider), Arc::clone(&object_store), "my_host", Arc::clone(¬ifier), @@ -738,11 
+758,9 @@ mod tests { // create wal file 1 let ret = wal.flush_buffer().await; assert!(ret.is_none()); - let file_1_contents = WalContents { - min_timestamp_ns: 1, - max_timestamp_ns: 62_000000000, - wal_file_number: WalFileSequenceNumber(1), - ops: vec![WalOp::Write(WriteBatch { + let file_1_contents = create::wal_contents( + (1, 62_000_000_000, 1), + [WalOp::Write(WriteBatch { database_id: DbId::from(0), database_name: "db1".into(), table_chunks: IndexMap::from([( @@ -802,18 +820,15 @@ mod tests { min_time_ns: 1, max_time_ns: 62_000000000, })], - snapshot: None, - }; + ); // create wal file 2 wal.buffer_op_unconfirmed(op2.clone()).await.unwrap(); assert!(wal.flush_buffer().await.is_none()); - let file_2_contents = WalContents { - min_timestamp_ns: 62000000000, - max_timestamp_ns: 62000000000, - wal_file_number: WalFileSequenceNumber(2), - ops: vec![WalOp::Write(WriteBatch { + let file_2_contents = create::wal_contents( + (62_000_000_000, 62_000_000_000, 2), + [WalOp::Write(WriteBatch { database_id: DbId::from(0), database_name: "db1".into(), table_chunks: IndexMap::from([( @@ -845,12 +860,12 @@ mod tests { min_time_ns: 62_000000000, max_time_ns: 62_000000000, })], - snapshot: None, - }; + ); // before we trigger a snapshot, test replay with a new wal and notifier let replay_notifier: Arc = Arc::new(TestNotfiier::default()); let replay_wal = WalObjectStore::new_without_replay( + Arc::clone(&time_provider), Arc::clone(&object_store), "my_host", Arc::clone(&replay_notifier), @@ -938,11 +953,9 @@ mod tests { assert_eq!(expected_info, snapshot_info); snapshot_done.await.unwrap(); - let file_3_contents = WalContents { - min_timestamp_ns: 128_000000000, - max_timestamp_ns: 128_000000000, - wal_file_number: WalFileSequenceNumber(3), - ops: vec![WalOp::Write(WriteBatch { + let file_3_contents = create::wal_contents_with_snapshot( + (128_000_000_000, 128_000_000_000, 3), + [WalOp::Write(WriteBatch { database_id: DbId::from(0), database_name: "db1".into(), table_chunks: 
IndexMap::from([( @@ -974,12 +987,12 @@ mod tests { min_time_ns: 128_000000000, max_time_ns: 128_000000000, })], - snapshot: Some(SnapshotDetails { + SnapshotDetails { snapshot_sequence_number: SnapshotSequenceNumber::new(1), end_time_marker: 120_000000000, last_wal_sequence_number: WalFileSequenceNumber(2), - }), - }; + }, + ); let notifier = notifier.as_any().downcast_ref::().unwrap(); @@ -997,6 +1010,7 @@ mod tests { // test that replay now only has file 3 let replay_notifier: Arc = Arc::new(TestNotfiier::default()); let replay_wal = WalObjectStore::new_without_replay( + Arc::clone(&time_provider), object_store, "my_host", Arc::clone(&replay_notifier), @@ -1021,6 +1035,8 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn flush_for_empty_buffer_skips_notify() { + let time_provider: Arc = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let object_store: Arc = Arc::new(InMemory::new()); let notifier: Arc = Arc::new(TestNotfiier::default()); let wal_config = WalConfig { @@ -1030,6 +1046,7 @@ mod tests { gen1_duration: Gen1Duration::new_1m(), }; let wal = WalObjectStore::new_without_replay( + time_provider, Arc::clone(&object_store), "my_host", Arc::clone(¬ifier), @@ -1046,6 +1063,24 @@ mod tests { assert!(object_store.list(None).next().await.is_none()); } + #[test] + fn persist_time_is_tracked() { + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let wal_buffer = WalBuffer { + time_provider: Arc::clone(&time_provider) as _, + is_shutdown: false, + wal_file_sequence_number: WalFileSequenceNumber(0), + op_limit: 10, + op_count: 0, + database_to_write_batch: Default::default(), + catalog_batches: vec![], + write_op_responses: vec![], + }; + time_provider.set(Time::from_timestamp_millis(1234).unwrap()); + let (wal_contents, _) = wal_buffer.into_wal_contents_and_responses(); + assert_eq!(1234, wal_contents.persist_timestamp_ms); + } + #[derive(Debug, Default)] struct TestNotfiier { 
notified_writes: parking_lot::Mutex>, diff --git a/influxdb3_wal/src/serialize.rs b/influxdb3_wal/src/serialize.rs index 9b3a603f11..71ef0b077c 100644 --- a/influxdb3_wal/src/serialize.rs +++ b/influxdb3_wal/src/serialize.rs @@ -120,6 +120,7 @@ mod tests { table_chunks.insert(table_id, chunks); let contents = WalContents { + persist_timestamp_ms: 10, min_timestamp_ns: 0, max_timestamp_ns: 10, wal_file_number: WalFileSequenceNumber::new(1), diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 5ddd931659..369ca0c103 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -197,6 +197,7 @@ impl WriteBufferImpl { // create the wal instance, which will replay into the queryable buffer and start // the background flush task. let wal = WalObjectStore::new( + Arc::clone(&time_provider), persister.object_store(), persister.host_identifier_prefix(), Arc::clone(&queryable_buffer) as Arc,