diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs
index fc51ca3cdd..44bc150660 100644
--- a/influxdb3_write/src/lib.rs
+++ b/influxdb3_write/src/lib.rs
@@ -92,7 +92,9 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
     ) -> Vec<ParquetFile>;
 
     /// A channel to watch for when new persisted snapshots are created
-    fn watch_persisted_snapshots(&self) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>>;
+    fn watch_persisted_snapshots(
+        &self,
+    ) -> tokio::sync::watch::Receiver<Option<PersistedSnapshotVersion>>;
 }
 
 /// ChunkContainer is used by the query engine to get chunks for a given table. Chunks will generally be in the
@@ -142,6 +144,22 @@ pub struct BufferedWriteRequest {
     pub index_count: usize,
 }
 
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[serde(tag = "version")]
+pub enum PersistedSnapshotVersion {
+    #[serde(rename = "1")]
+    V1(PersistedSnapshot),
+}
+
+impl PersistedSnapshotVersion {
+    #[cfg(test)]
+    fn v1_ref(&self) -> &PersistedSnapshot {
+        match self {
+            Self::V1(ps) => ps,
+        }
+    }
+}
+
 /// The collection of Parquet files that were persisted in a snapshot
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub struct PersistedSnapshot {
diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs
index a02574854d..be7d85fabd 100644
--- a/influxdb3_write/src/persister.rs
+++ b/influxdb3_write/src/persister.rs
@@ -1,7 +1,7 @@
 //! This is the implementation of the `Persister` used to write data from the buffer to object
 //! storage.
 
-use crate::PersistedSnapshot;
+use crate::PersistedSnapshotVersion;
 use crate::paths::ParquetFilePath;
 use crate::paths::SnapshotInfoFilePath;
 use arrow::datatypes::SchemaRef;
@@ -119,7 +119,10 @@ impl Persister {
     /// Loads the most recently persisted N snapshot parquet file lists from object storage.
     ///
     /// This is intended to be used on server start.
-    pub async fn load_snapshots(&self, mut most_recent_n: usize) -> Result<Vec<PersistedSnapshot>> {
+    pub async fn load_snapshots(
+        &self,
+        mut most_recent_n: usize,
+    ) -> Result<Vec<PersistedSnapshotVersion>> {
         let mut futures = FuturesOrdered::new();
         let mut offset: Option<ObjPath> = None;
 
@@ -165,7 +168,7 @@ impl Persister {
     async fn get_snapshot(
         location: ObjPath,
         object_store: Arc<dyn ObjectStore>,
-    ) -> Result<PersistedSnapshot> {
+    ) -> Result<PersistedSnapshotVersion> {
         let bytes = object_store.get(&location).await?.bytes().await?;
         serde_json::from_slice(&bytes).map_err(Into::into)
     }
@@ -201,10 +204,15 @@ impl Persister {
     }
 
     /// Persists the snapshot file
-    pub async fn persist_snapshot(&self, persisted_snapshot: &PersistedSnapshot) -> Result<()> {
+    pub async fn persist_snapshot(
+        &self,
+        persisted_snapshot: &PersistedSnapshotVersion,
+    ) -> Result<()> {
         let snapshot_file_path = SnapshotInfoFilePath::new(
             self.node_identifier_prefix.as_str(),
-            persisted_snapshot.snapshot_sequence_number,
+            match persisted_snapshot {
+                PersistedSnapshotVersion::V1(ps) => ps.snapshot_sequence_number,
+            },
         );
         let json = serde_json::to_vec_pretty(persisted_snapshot)?;
         self.object_store
@@ -337,7 +345,9 @@ impl TrackedMemoryArrowWriter {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{DatabaseTables, ParquetFile, ParquetFileId};
+    use crate::{
+        DatabaseTables, ParquetFile, ParquetFileId, PersistedSnapshot, PersistedSnapshotVersion,
+    };
     use influxdb3_catalog::catalog::CatalogSequenceNumber;
     use influxdb3_id::{DbId, SerdeVecMap, TableId};
     use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
@@ -357,7 +367,7 @@ mod tests {
         let local_disk =
             LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
         let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
-        let info_file = PersistedSnapshot {
+        let info_file = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(0),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
@@ -368,7 +378,7 @@
             max_time: 1,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
+        });
 
         persister.persist_snapshot(&info_file).await.unwrap();
     }
@@ -379,7 +389,7 @@
             LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
-        let info_file = PersistedSnapshot {
+        let info_file = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(0),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
@@ -390,8 +400,8 @@
             max_time: 1,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
-        let info_file_2 = PersistedSnapshot {
+        });
+        let info_file_2 = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(1),
             snapshot_sequence_number: SnapshotSequenceNumber::new(1),
@@ -402,8 +412,8 @@
             min_time: 0,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
-        let info_file_3 = PersistedSnapshot {
+        });
+        let info_file_3 = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(2),
             snapshot_sequence_number: SnapshotSequenceNumber::new(2),
@@ -414,7 +424,7 @@
             max_time: 1,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
+        });
 
         persister.persist_snapshot(&info_file).await.unwrap();
         persister.persist_snapshot(&info_file_2).await.unwrap();
@@ -423,12 +433,12 @@
         let snapshots = persister.load_snapshots(2).await.unwrap();
         assert_eq!(snapshots.len(), 2);
         // The most recent files are first
-        assert_eq!(snapshots[0].next_file_id.as_u64(), 2);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 2);
-        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 2);
-        assert_eq!(snapshots[1].next_file_id.as_u64(), 1);
-        assert_eq!(snapshots[1].wal_file_sequence_number.as_u64(), 1);
-        assert_eq!(snapshots[1].snapshot_sequence_number.as_u64(), 1);
+        assert_eq!(snapshots[0].v1_ref().next_file_id.as_u64(), 2);
+        assert_eq!(snapshots[0].v1_ref().wal_file_sequence_number.as_u64(), 2);
+        assert_eq!(snapshots[0].v1_ref().snapshot_sequence_number.as_u64(), 2);
+        assert_eq!(snapshots[1].v1_ref().next_file_id.as_u64(), 1);
+        assert_eq!(snapshots[1].v1_ref().wal_file_sequence_number.as_u64(), 1);
+        assert_eq!(snapshots[1].v1_ref().snapshot_sequence_number.as_u64(), 1);
     }
 
     #[tokio::test]
@@ -437,7 +447,7 @@
             LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
-        let info_file = PersistedSnapshot {
+        let info_file = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(0),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
@@ -448,12 +458,12 @@
             max_time: 1,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
+        });
         persister.persist_snapshot(&info_file).await.unwrap();
         let snapshots = persister.load_snapshots(2).await.unwrap();
         // We asked for the most recent 2 but there should only be 1
         assert_eq!(snapshots.len(), 1);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 0);
+        assert_eq!(snapshots[0].v1_ref().wal_file_sequence_number.as_u64(), 0);
     }
 
     #[tokio::test]
@@ -464,7 +474,7 @@
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
         for id in 0..1001 {
-            let info_file = PersistedSnapshot {
+            let info_file = PersistedSnapshotVersion::V1(PersistedSnapshot {
                 node_id: "test_host".to_string(),
                 next_file_id: ParquetFileId::from(id),
                 snapshot_sequence_number: SnapshotSequenceNumber::new(id),
@@ -475,16 +485,22 @@
                 max_time: 1,
                 row_count: 0,
                 parquet_size_bytes: 0,
-            };
+            });
             persister.persist_snapshot(&info_file).await.unwrap();
         }
         let snapshots = persister.load_snapshots(1500).await.unwrap();
         // We asked for the most recent 1500 so there should be 1001 of them
         assert_eq!(snapshots.len(), 1001);
-        assert_eq!(snapshots[0].next_file_id.as_u64(), 1000);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 1000);
-        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 1000);
-        assert_eq!(snapshots[0].catalog_sequence_number.get(), 1000);
+        assert_eq!(snapshots[0].v1_ref().next_file_id.as_u64(), 1000);
+        assert_eq!(
+            snapshots[0].v1_ref().wal_file_sequence_number.as_u64(),
+            1000
+        );
+        assert_eq!(
+            snapshots[0].v1_ref().snapshot_sequence_number.as_u64(),
+            1000
+        );
+        assert_eq!(snapshots[0].v1_ref().catalog_sequence_number.get(), 1000);
     }
 
     #[tokio::test]
@@ -521,14 +537,17 @@
                 max_time: 1,
             },
         );
-        persister.persist_snapshot(&info_file).await.unwrap();
+        persister
+            .persist_snapshot(&PersistedSnapshotVersion::V1(info_file))
+            .await
+            .unwrap();
         let snapshots = persister.load_snapshots(10).await.unwrap();
         assert_eq!(snapshots.len(), 1);
         // Should be the next available id after the largest number
-        assert_eq!(snapshots[0].next_file_id.as_u64(), 9877);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 0);
-        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 0);
-        assert_eq!(snapshots[0].catalog_sequence_number.get(), 0);
+        assert_eq!(snapshots[0].v1_ref().next_file_id.as_u64(), 9877);
+        assert_eq!(snapshots[0].v1_ref().wal_file_sequence_number.as_u64(), 0);
+        assert_eq!(snapshots[0].v1_ref().snapshot_sequence_number.as_u64(), 0);
+        assert_eq!(snapshots[0].v1_ref().catalog_sequence_number.get(), 0);
     }
 
     #[tokio::test]
@@ -590,7 +609,7 @@
             ),
         ]
         .into();
-        let snapshot = PersistedSnapshot {
+        let snapshot = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "host".to_string(),
             next_file_id: ParquetFileId::new(),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
@@ -601,7 +620,7 @@
             min_time: 0,
             max_time: 1,
             databases,
-        };
+        });
 
         insta::assert_json_snapshot!(snapshot);
     }
diff --git a/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap b/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap
index 599477f96f..6f88476fe0 100644
--- a/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap
+++ b/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap
@@ -3,6 +3,7 @@ source: influxdb3_write/src/persister.rs
 expression: snapshot
 ---
 {
+  "version": "1",
   "node_id": "host",
   "next_file_id": 8,
   "snapshot_sequence_number": 0,
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index 4d76035478..cd6324a2b1 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -10,7 +10,8 @@ pub mod validator;
 
 use crate::{
     BufferedWriteRequest, Bufferer, ChunkContainer, ChunkFilter, DistinctCacheManager,
-    LastCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
+    LastCacheManager, ParquetFile, PersistedSnapshot, PersistedSnapshotVersion, Precision,
+    WriteBuffer, WriteLineError,
     chunk::ParquetChunk,
     persister::{Persister, PersisterError},
     write_buffer::{
@@ -197,7 +198,13 @@ impl WriteBufferImpl {
         // load snapshots and replay the wal into the in memory buffer
         let persisted_snapshots = persister
             .load_snapshots(N_SNAPSHOTS_TO_LOAD_ON_START)
-            .await?;
+            .await?
+            .into_iter()
+            // map the persisted snapshots into the newest version
+            .map(|psv| match psv {
+                PersistedSnapshotVersion::V1(ps) => ps,
+            })
+            .collect::<Vec<PersistedSnapshot>>();
         let last_wal_sequence_number = persisted_snapshots
             .first()
             .map(|s| s.wal_file_sequence_number);
@@ -544,7 +551,7 @@ impl Bufferer for WriteBufferImpl {
         self.buffer.persisted_parquet_files(db_id, table_id, filter)
     }
 
-    fn watch_persisted_snapshots(&self) -> Receiver<Option<PersistedSnapshot>> {
+    fn watch_persisted_snapshots(&self) -> Receiver<Option<PersistedSnapshotVersion>> {
         self.buffer.persisted_snapshot_notify_rx()
     }
 }
@@ -1096,8 +1103,8 @@ mod tests {
             let persisted = write_buffer.persister.load_snapshots(1000).await.unwrap();
             if !persisted.is_empty() {
                 assert_eq!(persisted.len(), 1);
-                assert_eq!(persisted[0].min_time, 10000000000);
-                assert_eq!(persisted[0].row_count, 2);
+                assert_eq!(persisted[0].v1_ref().min_time, 10000000000);
+                assert_eq!(persisted[0].v1_ref().row_count, 2);
                 break;
             } else if ticks > 10 {
                 panic!("not persisting");
@@ -1349,7 +1356,8 @@ mod tests {
             WalFileSequenceNumber::new(0),
             CatalogSequenceNumber::new(0),
         );
-        let snapshot_json = serde_json::to_vec(&prev_snapshot).unwrap();
+        let snapshot_json =
+            serde_json::to_vec(&PersistedSnapshotVersion::V1(prev_snapshot)).unwrap();
         // set ParquetFileId to be 0 so that we can make sure when it's loaded from the
         // snapshot that it becomes the expected number
         ParquetFileId::from(0).set_next_id();
@@ -1485,7 +1493,8 @@
             assert_eq!(file.id.as_u64(), i as u64);
         }
 
-        let snapshot_json = serde_json::to_vec(&prev_snapshot).unwrap();
+        let snapshot_json =
+            serde_json::to_vec(&PersistedSnapshotVersion::V1(prev_snapshot)).unwrap();
 
         // put the snapshot file in object store:
         object_store
diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs
index 52a9f0cfe2..e0d7916c8e 100644
--- a/influxdb3_write/src/write_buffer/queryable_buffer.rs
+++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -3,7 +3,7 @@ use crate::paths::ParquetFilePath;
 use crate::persister::Persister;
 use crate::write_buffer::persisted_files::PersistedFiles;
 use crate::write_buffer::table_buffer::TableBuffer;
-use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot};
+use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot, PersistedSnapshotVersion};
 use anyhow::Context;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -48,8 +48,8 @@ pub struct QueryableBuffer {
     buffer: Arc<RwLock<BufferState>>,
     parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
     /// Sends a notification to this watch channel whenever a snapshot info is persisted
-    persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
-    persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
+    persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshotVersion>>,
+    persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshotVersion>>,
 }
 
 #[derive(Debug)]
@@ -346,9 +346,11 @@ impl QueryableBuffer {
             // force_snapshot) snapshot runs, snapshot_tracker will check if
             // wal_periods are empty so it won't trigger a snapshot in the first
             // place.
-            let persisted_snapshot = Arc::into_inner(persisted_snapshot)
-                .expect("Should only have one strong reference")
-                .into_inner();
+            let persisted_snapshot = PersistedSnapshotVersion::V1(
+                Arc::into_inner(persisted_snapshot)
+                    .expect("Should only have one strong reference")
+                    .into_inner(),
+            );
             if !persist_jobs_empty {
                 loop {
                     match persister.persist_snapshot(&persisted_snapshot).await {
@@ -385,7 +387,7 @@ impl QueryableBuffer {
 
     pub fn persisted_snapshot_notify_rx(
         &self,
-    ) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>> {
+    ) -> tokio::sync::watch::Receiver<Option<PersistedSnapshotVersion>> {
         self.persisted_snapshot_notify_rx.clone()
     }
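Note on the on-disk format (reviewer sketch, not part of the diff): the `#[serde(tag = "version")]` and `#[serde(rename = "1")]` attributes make serde emit the version tag inline next to the wrapped struct's own fields, which is why the updated insta snapshot gains a top-level `"version": "1"` entry while the rest of the JSON is unchanged, and why the `match` in `persist_snapshot` plus the test-only `v1_ref` helper are the only places that would grow a new arm for a future `V2`. Below is a minimal standalone illustration of that serde behavior; `SnapshotVersion` and `SnapshotV1` are illustrative stand-ins for `PersistedSnapshotVersion`/`PersistedSnapshot` (which have many more fields), assuming `serde` and `serde_json` are available as they are in this crate.

```rust
use serde::{Deserialize, Serialize};

// Illustrative stand-in for the inner snapshot payload; two fields are
// enough to show the shape of the serialized JSON.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct SnapshotV1 {
    node_id: String,
    row_count: u64,
}

// Internally tagged enum mirroring the PR's approach: the tag is emitted as
// a "version" field alongside the variant's own fields.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "version")]
enum SnapshotVersion {
    #[serde(rename = "1")]
    V1(SnapshotV1),
}

fn main() {
    let snap = SnapshotVersion::V1(SnapshotV1 {
        node_id: "test_host".to_string(),
        row_count: 2,
    });

    // Serializes to {"version":"1","node_id":"test_host","row_count":2},
    // i.e. the tag sits next to the flattened struct fields.
    let json = serde_json::to_string(&snap).unwrap();
    assert!(json.contains(r#""version":"1""#));

    // And it round-trips back through the tagged enum.
    let back: SnapshotVersion = serde_json::from_str(&json).unwrap();
    assert_eq!(back, snap);
}
```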