feat: snapshots track their own sequence number (#25255)

pull/25258/head
Trevor Hilton 2024-08-20 15:55:47 -07:00 committed by GitHub
parent d0720a4fe4
commit 3b174a2f98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 295 additions and 60 deletions

View File

@ -86,6 +86,10 @@ impl SequenceNumber {
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
pub fn as_u32(&self) -> u32 {
self.0
}
}
#[derive(Debug)]

View File

@ -81,7 +81,10 @@ pub trait Wal: Debug + Send + Sync + 'static {
);
/// Returns the last persisted wal file sequence number
async fn last_sequence_number(&self) -> WalFileSequenceNumber;
async fn last_wal_sequence_number(&self) -> WalFileSequenceNumber;
/// Returns the last persisted wal file sequence number
async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber;
/// Stop all writes to the WAL and flush the buffer to a WAL file.
async fn shutdown(&self);
@ -600,7 +603,7 @@ impl WalFileSequenceNumber {
Self(self.0 + 1)
}
pub fn get(&self) -> u64 {
pub fn as_u64(&self) -> u64 {
self.0
}
}
@ -611,13 +614,40 @@ impl std::fmt::Display for WalFileSequenceNumber {
}
}
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
)]
pub struct SnapshotSequenceNumber(u64);
impl SnapshotSequenceNumber {
pub fn new(number: u64) -> Self {
Self(number)
}
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
pub fn as_u64(&self) -> u64 {
self.0
}
}
impl std::fmt::Display for SnapshotSequenceNumber {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Details about a snapshot of the WAL
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub struct SnapshotDetails {
/// The sequence number for this snapshot
pub snapshot_sequence_number: SnapshotSequenceNumber,
/// All chunks with data before this time can be snapshot and persisted
pub end_time_marker: i64,
/// All wal files with a sequence number <= to this can be deleted once snapshotting is complete
pub last_sequence_number: WalFileSequenceNumber,
pub last_wal_sequence_number: WalFileSequenceNumber,
}
pub fn background_wal_flush<W: Wal>(

View File

@ -1,8 +1,8 @@
use crate::serialize::verify_file_type_and_deserialize;
use crate::snapshot_tracker::{SnapshotInfo, SnapshotTracker, WalPeriod};
use crate::{
background_wal_flush, CatalogBatch, SnapshotDetails, Wal, WalConfig, WalContents,
WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch,
background_wal_flush, CatalogBatch, SnapshotDetails, SnapshotSequenceNumber, Wal, WalConfig,
WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch,
};
use bytes::Bytes;
use data_types::Timestamp;
@ -33,7 +33,8 @@ impl WalObjectStore {
host_identifier_prefix: impl Into<String> + Send,
file_notifier: Arc<dyn WalFileNotifier>,
config: WalConfig,
last_snapshot_wal_sequence: Option<WalFileSequenceNumber>,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
) -> Result<Arc<Self>, crate::Error> {
let flush_interval = config.flush_interval;
let wal = Self::new_without_replay(
@ -41,7 +42,8 @@ impl WalObjectStore {
host_identifier_prefix,
file_notifier,
config,
last_snapshot_wal_sequence,
last_wal_sequence_number,
last_snapshot_sequence_number,
);
wal.replay().await?;
@ -56,9 +58,10 @@ impl WalObjectStore {
host_identifier_prefix: impl Into<String>,
file_notifier: Arc<dyn WalFileNotifier>,
config: WalConfig,
last_snapshot_wal_sequence: Option<WalFileSequenceNumber>,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
) -> Self {
let wal_file_sequence_number = last_snapshot_wal_sequence.unwrap_or_default().next();
let wal_file_sequence_number = last_wal_sequence_number.unwrap_or_default().next();
Self {
object_store,
host_identifier_prefix: host_identifier_prefix.into(),
@ -73,7 +76,11 @@ impl WalObjectStore {
catalog_batches: vec![],
write_op_responses: vec![],
},
SnapshotTracker::new(config.snapshot_size, config.level_0_duration),
SnapshotTracker::new(
config.snapshot_size,
config.level_0_duration,
last_snapshot_sequence_number,
),
)),
}
}
@ -256,7 +263,7 @@ impl WalObjectStore {
None => {
debug!(
"notify sent to buffer for wal file {}",
wal_contents.wal_file_number.get()
wal_contents.wal_file_number.as_u64()
);
self.file_notifier.notify(wal_contents);
None
@ -359,12 +366,20 @@ impl Wal for WalObjectStore {
.await
}
async fn last_sequence_number(&self) -> WalFileSequenceNumber {
async fn last_wal_sequence_number(&self) -> WalFileSequenceNumber {
self.flush_buffer
.lock()
.await
.snapshot_tracker
.last_sequence_number()
.last_wal_sequence_number()
}
async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber {
self.flush_buffer
.lock()
.await
.snapshot_tracker
.last_snapshot_sequence_number()
}
async fn shutdown(&self) {
@ -577,7 +592,9 @@ fn wal_path(host_identifier_prefix: &str, wal_file_number: WalFileSequenceNumber
#[cfg(test)]
mod tests {
use super::*;
use crate::{Field, FieldData, Level0Duration, Row, TableChunk, TableChunks};
use crate::{
Field, FieldData, Level0Duration, Row, SnapshotSequenceNumber, TableChunk, TableChunks,
};
use async_trait::async_trait;
use object_store::memory::InMemory;
use std::any::Any;
@ -599,6 +616,7 @@ mod tests {
Arc::clone(&notifier),
wal_config,
None,
None,
);
let db_name: Arc<str> = "db1".into();
@ -805,6 +823,7 @@ mod tests {
snapshot_size: 2,
},
None,
None,
);
assert_eq!(
replay_wal.load_existing_wal_file_paths().await.unwrap(),
@ -864,8 +883,9 @@ mod tests {
let (snapshot_done, snapshot_info, snapshot_permit) = wal.flush_buffer().await.unwrap();
let expected_info = SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
end_time_marker: 120000000000,
last_sequence_number: WalFileSequenceNumber(2),
last_wal_sequence_number: WalFileSequenceNumber(2),
},
wal_periods: vec![
WalPeriod {
@ -918,8 +938,9 @@ mod tests {
max_time_ns: 128_000000000,
})],
snapshot: Some(SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
end_time_marker: 120_000000000,
last_sequence_number: WalFileSequenceNumber(2),
last_wal_sequence_number: WalFileSequenceNumber(2),
}),
};
@ -944,6 +965,7 @@ mod tests {
Arc::clone(&replay_notifier),
wal_config,
None,
None,
);
assert_eq!(
replay_wal.load_existing_wal_file_paths().await.unwrap(),
@ -976,6 +998,7 @@ mod tests {
Arc::clone(&notifier),
wal_config,
None,
None,
);
assert!(wal.flush_buffer().await.is_none());

View File

@ -4,13 +4,14 @@
//! configured as it can be used to ensure that data in the write buffer is persisted in blocks
//! that are not too large and unlikely to overlap.
use crate::{Level0Duration, SnapshotDetails, WalFileSequenceNumber};
use crate::{Level0Duration, SnapshotDetails, SnapshotSequenceNumber, WalFileSequenceNumber};
use data_types::Timestamp;
/// A struct that tracks the WAL periods (files if using object store) and decides when to snapshot the WAL.
#[derive(Debug)]
pub(crate) struct SnapshotTracker {
last_sequence_number: WalFileSequenceNumber,
last_snapshot_sequence_number: SnapshotSequenceNumber,
last_wal_sequence_number: WalFileSequenceNumber,
wal_periods: Vec<WalPeriod>,
snapshot_size: usize,
level_0_duration: Level0Duration,
@ -20,9 +21,14 @@ impl SnapshotTracker {
/// Create a new `SnapshotTracker` with the given snapshot size and level 0 duration. The
/// level 0 duration is the size of chunks in the write buffer that will be persisted as
/// parquet files.
pub(crate) fn new(snapshot_size: usize, level_0_duration: Level0Duration) -> Self {
pub(crate) fn new(
snapshot_size: usize,
level_0_duration: Level0Duration,
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
) -> Self {
Self {
last_sequence_number: WalFileSequenceNumber::default(),
last_snapshot_sequence_number: last_snapshot_sequence_number.unwrap_or_default(),
last_wal_sequence_number: WalFileSequenceNumber::default(),
wal_periods: Vec::new(),
snapshot_size,
level_0_duration,
@ -38,7 +44,7 @@ impl SnapshotTracker {
assert!(last_period.wal_file_number < wal_period.wal_file_number);
}
self.last_sequence_number = wal_period.wal_file_number;
self.last_wal_sequence_number = wal_period.wal_file_number;
self.wal_periods.push(wal_period);
}
@ -59,7 +65,7 @@ impl SnapshotTracker {
// if the number of wal periods is > 3x the snapshot size, snapshot everything up to the last period
if self.wal_periods.len() >= 3 * self.snapshot_size {
let last_sequence_number = self.wal_periods.last().unwrap().wal_file_number;
let last_wal_sequence_number = self.wal_periods.last().unwrap().wal_file_number;
let max_time = self
.wal_periods
.iter()
@ -72,8 +78,9 @@ impl SnapshotTracker {
// remove the wal periods and return the snapshot details
let wal_periods = std::mem::take(&mut self.wal_periods);
let snapshot_details = SnapshotDetails {
snapshot_sequence_number: self.increment_snapshot_sequence_number(),
end_time_marker: t.get(),
last_sequence_number,
last_wal_sequence_number,
};
return Some(SnapshotInfo {
@ -101,8 +108,9 @@ impl SnapshotTracker {
SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: self.increment_snapshot_sequence_number(),
end_time_marker: t.get(),
last_sequence_number: period.wal_file_number,
last_wal_sequence_number: period.wal_file_number,
},
wal_periods: periods_to_snapshot,
}
@ -115,9 +123,19 @@ impl SnapshotTracker {
self.snapshot_size + self.snapshot_size / 2
}
/// Returns the last `WalFileSequenceNumber` that was added to the tracker.
pub(crate) fn last_sequence_number(&self) -> WalFileSequenceNumber {
self.last_sequence_number
/// Returns the last [`WalFileSequenceNumber`] that was added to the tracker.
pub(crate) fn last_wal_sequence_number(&self) -> WalFileSequenceNumber {
self.last_wal_sequence_number
}
/// Returns the last [`SnapshotSequenceNumber`] that was added to the tracker.
pub(crate) fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber {
self.last_snapshot_sequence_number
}
fn increment_snapshot_sequence_number(&mut self) -> SnapshotSequenceNumber {
self.last_snapshot_sequence_number = self.last_snapshot_sequence_number.next();
self.last_snapshot_sequence_number
}
}
@ -158,7 +176,7 @@ mod tests {
#[test]
fn snapshot() {
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m());
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m(), None);
let p1 = WalPeriod::new(
WalFileSequenceNumber::new(1),
Timestamp::new(0),
@ -200,8 +218,9 @@ mod tests {
tracker.snapshot(),
Some(SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
end_time_marker: 120_000000000,
last_sequence_number: WalFileSequenceNumber::new(2)
last_wal_sequence_number: WalFileSequenceNumber::new(2)
},
wal_periods: vec![p1, p2],
})
@ -213,8 +232,9 @@ mod tests {
tracker.snapshot(),
Some(SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(2),
end_time_marker: 240_000000000,
last_sequence_number: WalFileSequenceNumber::new(3)
last_wal_sequence_number: WalFileSequenceNumber::new(3)
},
wal_periods: vec![p3]
})
@ -227,8 +247,9 @@ mod tests {
tracker.snapshot(),
Some(SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(3),
end_time_marker: 360_000000000,
last_sequence_number: WalFileSequenceNumber::new(5)
last_wal_sequence_number: WalFileSequenceNumber::new(5)
},
wal_periods: vec![p4, p5]
})
@ -239,7 +260,7 @@ mod tests {
#[test]
fn snapshot_future_data_forces_snapshot() {
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m());
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m(), None);
let p1 = WalPeriod::new(
WalFileSequenceNumber::new(1),
Timestamp::new(0),
@ -285,8 +306,9 @@ mod tests {
tracker.snapshot(),
Some(SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
end_time_marker: 360000000000,
last_sequence_number: WalFileSequenceNumber::new(6)
last_wal_sequence_number: WalFileSequenceNumber::new(6)
},
wal_periods: vec![p1, p2, p3, p4, p5, p6]
})

View File

@ -20,8 +20,8 @@ use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::Expr;
use influxdb3_catalog::catalog;
use influxdb3_wal::{LastCacheDefinition, WalFileSequenceNumber};
use influxdb3_catalog::catalog::{self, SequenceNumber};
use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber};
use iox_query::QueryChunk;
use iox_time::Time;
use last_cache::LastCacheProvider;
@ -214,8 +214,12 @@ pub struct PersistedCatalog {
/// The collection of Parquet files that were persisted in a snapshot
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct PersistedSnapshot {
/// The snapshot sequence number associated with this snapshot
pub snapshot_sequence_number: SnapshotSequenceNumber,
/// The wal file sequence number that triggered this snapshot
pub wal_file_sequence_number: WalFileSequenceNumber,
/// The catalog sequence number associated with this snapshot
pub catalog_sequence_number: SequenceNumber,
/// The size of the snapshot parquet files in bytes.
pub parquet_size_bytes: u64,
/// The number of rows across all parquet files in the snapshot.
@ -230,9 +234,15 @@ pub struct PersistedSnapshot {
}
impl PersistedSnapshot {
pub fn new(wal_file_sequence_number: WalFileSequenceNumber) -> Self {
pub fn new(
snapshot_sequence_number: SnapshotSequenceNumber,
wal_file_sequence_number: WalFileSequenceNumber,
catalog_sequence_number: SequenceNumber,
) -> Self {
Self {
snapshot_sequence_number,
wal_file_sequence_number,
catalog_sequence_number,
parquet_size_bytes: 0,
row_count: 0,
min_time: i64::MAX,

View File

@ -1,5 +1,5 @@
use chrono::prelude::*;
use influxdb3_wal::WalFileSequenceNumber;
use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
use object_store::path::Path as ObjPath;
use std::ops::Deref;
@ -23,7 +23,7 @@ impl CatalogFilePath {
pub fn new(host_prefix: &str, wal_file_sequence_number: WalFileSequenceNumber) -> Self {
let path = ObjPath::from(format!(
"{host_prefix}/catalogs/{:020}.{}",
object_store_file_stem(wal_file_sequence_number.get()),
object_store_file_stem(wal_file_sequence_number.as_u64()),
CATALOG_FILE_EXTENSION
));
Self(path)
@ -62,7 +62,7 @@ impl ParquetFilePath {
let path = ObjPath::from(format!(
"{host_prefix}/dbs/{db_name}/{table_name}/{}/{}.{}",
date.format("%Y-%m-%d/%H-%M"),
wal_file_sequence_number.get(),
wal_file_sequence_number.as_u64(),
PARQUET_FILE_EXTENSION
));
Self(path)
@ -79,7 +79,7 @@ impl ParquetFilePath {
let path = ObjPath::from(format!(
"dbs/{db_name}/{table_name}/{}/{:010}.{}",
date_time.format("%Y-%m-%d/%H-%M"),
wal_file_sequence_number.get(),
wal_file_sequence_number.as_u64(),
PARQUET_FILE_EXTENSION
));
Self(path)
@ -104,10 +104,10 @@ impl AsRef<ObjPath> for ParquetFilePath {
pub struct SnapshotInfoFilePath(ObjPath);
impl SnapshotInfoFilePath {
pub fn new(host_prefix: &str, wal_file_sequence_number: WalFileSequenceNumber) -> Self {
pub fn new(host_prefix: &str, snapshot_sequence_number: SnapshotSequenceNumber) -> Self {
let path = ObjPath::from(format!(
"{host_prefix}/snapshots/{:020}.{}",
object_store_file_stem(wal_file_sequence_number.get()),
object_store_file_stem(snapshot_sequence_number.as_u64()),
SNAPSHOT_INFO_FILE_EXTENSION
));
Self(path)
@ -173,7 +173,7 @@ fn parquet_file_percent_encoded() {
#[test]
fn snapshot_info_file_path_new() {
assert_eq!(
*SnapshotInfoFilePath::new("my_host", WalFileSequenceNumber::new(0)),
*SnapshotInfoFilePath::new("my_host", SnapshotSequenceNumber::new(0)),
ObjPath::from("my_host/snapshots/18446744073709551615.info.json")
);
}

View File

@ -270,7 +270,7 @@ impl Persister for PersisterImpl {
async fn persist_snapshot(&self, persisted_snapshot: &PersistedSnapshot) -> Result<()> {
let snapshot_file_path = SnapshotInfoFilePath::new(
self.host_identifier_prefix.as_str(),
persisted_snapshot.wal_file_sequence_number,
persisted_snapshot.snapshot_sequence_number,
);
let json = serde_json::to_vec_pretty(persisted_snapshot)?;
self.object_store
@ -362,6 +362,8 @@ impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
#[cfg(test)]
mod tests {
use super::*;
use influxdb3_catalog::catalog::SequenceNumber;
use influxdb3_wal::SnapshotSequenceNumber;
use object_store::memory::InMemory;
use {
arrow::array::Int32Array, arrow::datatypes::DataType, arrow::datatypes::Field,
@ -425,7 +427,9 @@ mod tests {
LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
let persister = PersisterImpl::new(Arc::new(local_disk), "test_host");
let info_file = PersistedSnapshot {
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::new(0),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
@ -442,7 +446,9 @@ mod tests {
LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
let persister = PersisterImpl::new(Arc::new(local_disk), "test_host");
let info_file = PersistedSnapshot {
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::default(),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
@ -450,7 +456,9 @@ mod tests {
parquet_size_bytes: 0,
};
let info_file_2 = PersistedSnapshot {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
wal_file_sequence_number: WalFileSequenceNumber::new(1),
catalog_sequence_number: SequenceNumber::default(),
databases: HashMap::new(),
max_time: 1,
min_time: 0,
@ -458,7 +466,9 @@ mod tests {
parquet_size_bytes: 0,
};
let info_file_3 = PersistedSnapshot {
snapshot_sequence_number: SnapshotSequenceNumber::new(2),
wal_file_sequence_number: WalFileSequenceNumber::new(2),
catalog_sequence_number: SequenceNumber::default(),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
@ -472,9 +482,11 @@ mod tests {
let snapshots = persister.load_snapshots(2).await.unwrap();
assert_eq!(snapshots.len(), 2);
// The most recent one is first
assert_eq!(snapshots[0].wal_file_sequence_number.get(), 2);
assert_eq!(snapshots[1].wal_file_sequence_number.get(), 1);
// The most recent files are first
assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 2);
assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 2);
assert_eq!(snapshots[1].wal_file_sequence_number.as_u64(), 1);
assert_eq!(snapshots[1].snapshot_sequence_number.as_u64(), 1);
}
#[tokio::test]
@ -483,7 +495,9 @@ mod tests {
LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
let persister = PersisterImpl::new(Arc::new(local_disk), "test_host");
let info_file = PersistedSnapshot {
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::default(),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
@ -494,7 +508,7 @@ mod tests {
let snapshots = persister.load_snapshots(2).await.unwrap();
// We asked for the most recent 2 but there should only be 1
assert_eq!(snapshots.len(), 1);
assert_eq!(snapshots[0].wal_file_sequence_number.get(), 0);
assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 0);
}
#[tokio::test]
@ -505,7 +519,9 @@ mod tests {
let persister = PersisterImpl::new(Arc::new(local_disk), "test_host");
for id in 0..9001 {
let info_file = PersistedSnapshot {
snapshot_sequence_number: SnapshotSequenceNumber::new(id),
wal_file_sequence_number: WalFileSequenceNumber::new(id),
catalog_sequence_number: SequenceNumber::new(id as u32),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
@ -517,7 +533,9 @@ mod tests {
let snapshots = persister.load_snapshots(9500).await.unwrap();
// We asked for the most recent 9500 so there should be 9001 of them
assert_eq!(snapshots.len(), 9001);
assert_eq!(snapshots[0].wal_file_sequence_number.get(), 9000);
assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 9000);
assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 9000);
assert_eq!(snapshots[0].catalog_sequence_number.as_u32(), 9000);
}
#[tokio::test]

View File

@ -111,6 +111,8 @@ pub struct WriteBufferImpl<T> {
last_cache: Arc<LastCacheProvider>,
}
const N_SNAPSHOTS_TO_LOAD_ON_START: usize = 1_000;
impl<T: TimeProvider> WriteBufferImpl<T> {
pub async fn new(
persister: Arc<PersisterImpl>,
@ -128,10 +130,15 @@ impl<T: TimeProvider> WriteBufferImpl<T> {
let last_cache = Arc::new(LastCacheProvider::new_from_catalog(&catalog.clone_inner())?);
let persisted_snapshots = persister.load_snapshots(1000).await?;
let last_snapshot_wal_sequence = persisted_snapshots
let persisted_snapshots = persister
.load_snapshots(N_SNAPSHOTS_TO_LOAD_ON_START)
.await?;
let last_wal_sequence_number = persisted_snapshots
.first()
.map(|s| s.wal_file_sequence_number);
let last_snapshot_sequence_number = persisted_snapshots
.first()
.map(|s| s.snapshot_sequence_number);
let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
persisted_snapshots,
));
@ -150,7 +157,8 @@ impl<T: TimeProvider> WriteBufferImpl<T> {
persister.host_identifier_prefix(),
Arc::clone(&queryable_buffer) as Arc<dyn WalFileNotifier>,
wal_config,
last_snapshot_wal_sequence,
last_wal_sequence_number,
last_snapshot_sequence_number,
)
.await?;
@ -581,18 +589,22 @@ impl<T: TimeProvider> WriteBuffer for WriteBufferImpl<T> {}
#[cfg(test)]
mod tests {
use super::*;
use crate::paths::CatalogFilePath;
use crate::paths::{CatalogFilePath, SnapshotInfoFilePath};
use crate::persister::PersisterImpl;
use crate::PersistedSnapshot;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bytes::Bytes;
use datafusion::assert_batches_sorted_eq;
use datafusion_util::config::register_iox_object_store;
use futures_util::StreamExt;
use influxdb3_wal::Level0Duration;
use influxdb3_catalog::catalog::SequenceNumber;
use influxdb3_wal::{Level0Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
use iox_query::exec::IOxSessionContext;
use iox_time::{MockProvider, Time};
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::ObjectStore;
use object_store::{ObjectStore, PutPayload};
#[test]
fn parse_lp_into_buffer() {
@ -712,6 +724,7 @@ mod tests {
async fn last_cache_create_and_delete_is_durable() {
let (wbuf, _ctx) = setup(
Time::from_timestamp_nanos(0),
Arc::new(InMemory::new()),
WalConfig {
level_0_duration: Level0Duration::new_1m(),
max_write_buffer_size: 100,
@ -841,6 +854,7 @@ mod tests {
async fn returns_chunks_across_parquet_and_buffered_data() {
let (write_buffer, session_context) = setup(
Time::from_timestamp_nanos(0),
Arc::new(InMemory::new()),
WalConfig {
level_0_duration: Level0Duration::new_1m(),
max_write_buffer_size: 100,
@ -1031,6 +1045,7 @@ mod tests {
async fn catalog_snapshots_only_if_updated() {
let (write_buffer, _ctx) = setup(
Time::from_timestamp_nanos(0),
Arc::new(InMemory::new()),
WalConfig {
level_0_duration: Level0Duration::new_1m(),
max_write_buffer_size: 100,
@ -1147,6 +1162,112 @@ mod tests {
verify_snapshot_count(3, &write_buffer.persister).await;
}
/// Check that when a WriteBuffer is initialized with existing snapshot files, that newly
/// generated snapshot files use the next sequence number.
#[tokio::test]
async fn new_snapshots_use_correct_sequence() {
// set up a local file system object store:
let object_store: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap());
// create a snapshot file that will be loaded on initialization of the write buffer:
let prev_snapshot_seq = SnapshotSequenceNumber::new(42);
let prev_snapshot = PersistedSnapshot::new(
prev_snapshot_seq,
WalFileSequenceNumber::new(0),
SequenceNumber::new(0),
);
let snapshot_json = serde_json::to_vec(&prev_snapshot).unwrap();
// put the snapshot file in object store:
object_store
.put(
&SnapshotInfoFilePath::new("test_host", prev_snapshot_seq),
PutPayload::from_bytes(Bytes::from(snapshot_json)),
)
.await
.unwrap();
// setup the write buffer:
let (wbuf, _ctx) = setup(
Time::from_timestamp_nanos(0),
Arc::clone(&object_store),
WalConfig {
level_0_duration: Level0Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(5),
snapshot_size: 1,
},
)
.await;
// there should be one snapshot already, i.e., the one we created above:
verify_snapshot_count(1, &wbuf.persister).await;
// there aren't any catalogs yet:
verify_catalog_count(0, object_store.clone()).await;
// do three writes to force a new snapshot
wbuf.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu bar=1",
Time::from_timestamp(10, 0).unwrap(),
false,
Precision::Nanosecond,
)
.await
.unwrap();
wbuf.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu bar=2",
Time::from_timestamp(20, 0).unwrap(),
false,
Precision::Nanosecond,
)
.await
.unwrap();
wbuf.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu bar=3",
Time::from_timestamp(30, 0).unwrap(),
false,
Precision::Nanosecond,
)
.await
.unwrap();
// Check that there are now 2 snapshots:
verify_snapshot_count(2, &wbuf.persister).await;
// Check that the next sequence number is used for the new snapshot:
assert_eq!(
prev_snapshot_seq.next(),
wbuf.wal.last_snapshot_sequence_number().await
);
// There should be a catalog now, since the above writes updated the catalog
verify_catalog_count(1, object_store.clone()).await;
// Check the catalog sequence number in the latest snapshot is correct:
let persisted_snapshot_bytes = object_store
.get(&SnapshotInfoFilePath::new(
"test_host",
prev_snapshot_seq.next(),
))
.await
.unwrap()
.bytes()
.await
.unwrap();
let persisted_snapshot =
serde_json::from_slice::<PersistedSnapshot>(&persisted_snapshot_bytes).unwrap();
// NOTE: it appears that writes which create a new db increment the catalog sequence twice.
// This is likely due to the catalog sequence being incremented first for the db creation and
// then again for the updates to the table written to. Hence the sequence number is 2 here.
// If we manage to make it so that scenario only increments the catalog sequence once, then
// this assertion may fail:
assert_eq!(
SequenceNumber::new(2),
persisted_snapshot.catalog_sequence_number
);
}
async fn verify_catalog_count(n: usize, object_store: Arc<dyn ObjectStore>) {
let mut checks = 0;
loop {
@ -1159,7 +1280,7 @@ mod tests {
if catalogs.len() > n {
panic!("checking for {} catalogs but found {}", n, catalogs.len());
} else if catalogs.len() == n && checks > 5 {
// let enough checks happen to ensure extra catalog persists aren't running ion the background
// let enough checks happen to ensure extra catalog persists aren't running in the background
break;
} else {
checks += 1;
@ -1201,9 +1322,9 @@ mod tests {
async fn setup(
start: Time,
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
) -> (WriteBufferImpl<MockProvider>, IOxSessionContext) {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store), "test_host"));
let time_provider = Arc::new(MockProvider::new(start));
let wbuf = WriteBufferImpl::new(

View File

@ -197,7 +197,10 @@ impl QueryableBuffer {
if !catalog.is_updated() {
break;
}
info!("persisting catalog for wal file {}", wal_file_number.get());
info!(
"persisting catalog for wal file {}",
wal_file_number.as_u64()
);
let inner_catalog = catalog.clone_inner();
let sequence_number = inner_catalog.sequence_number();
@ -219,10 +222,14 @@ impl QueryableBuffer {
info!(
"persisting {} chunks for wal number {}",
persist_jobs.len(),
wal_file_number.get(),
wal_file_number.as_u64(),
);
// persist the individual files, building the snapshot as we go
let mut persisted_snapshot = PersistedSnapshot::new(wal_file_number);
let mut persisted_snapshot = PersistedSnapshot::new(
snapshot_details.snapshot_sequence_number,
wal_file_number,
catalog.sequence_number(),
);
for persist_job in persist_jobs {
let path = persist_job.path.to_string();
let database_name = Arc::clone(&persist_job.database_name);