feat: Add PersistedSnapshotVersion for snapshots (#26117)

Continuing our work of versioning persisted files before Beta, this commit
adds a PersistedSnapshotVersion enum that wraps PersistedSnapshot at the
serialization/deserialization boundary, so that we can move to a newer
snapshot format later and still handle old versions without breaking things
for users.
Michael Gattozzi, 2025-03-12 13:21:54 -04:00 (committed by GitHub)
parent b6cb6dd51e
commit 00aa90fe6f
5 changed files with 100 additions and 51 deletions
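
For context on the mechanism, the new type leans on serde's internally tagged enum representation: the "version" tag is emitted alongside the payload's own fields, so the snapshot file stays a single flat JSON object. Below is a minimal, self-contained sketch of that pattern; the trimmed-down SnapshotV1 struct is a stand-in for illustration, not the real PersistedSnapshot.

```rust
// Sketch of the tagged-enum versioning pattern used in this commit,
// with a trimmed stand-in struct instead of the real PersistedSnapshot.
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct SnapshotV1 {
    node_id: String,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "version")]
enum SnapshotVersion {
    #[serde(rename = "1")]
    V1(SnapshotV1),
}

fn main() {
    let snapshot = SnapshotVersion::V1(SnapshotV1 {
        node_id: "host".to_string(),
    });
    // The tag lands next to the payload's own fields in one flat object.
    let json = serde_json::to_string(&snapshot).unwrap();
    assert_eq!(json, r#"{"version":"1","node_id":"host"}"#);
    // Deserializing dispatches on the tag back to the right variant.
    let back: SnapshotVersion = serde_json::from_str(&json).unwrap();
    assert_eq!(back, snapshot);
}
```

The updated insta snapshot later in this diff shows the same "version": "1" key landing in the real persisted file.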


@@ -92,7 +92,9 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
     ) -> Vec<ParquetFile>;

     /// A channel to watch for when new persisted snapshots are created
-    fn watch_persisted_snapshots(&self) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>>;
+    fn watch_persisted_snapshots(
+        &self,
+    ) -> tokio::sync::watch::Receiver<Option<PersistedSnapshotVersion>>;
 }

 /// ChunkContainer is used by the query engine to get chunks for a given table. Chunks will generally be in the
@@ -142,6 +144,22 @@ pub struct BufferedWriteRequest {
     pub index_count: usize,
 }

+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[serde(tag = "version")]
+pub enum PersistedSnapshotVersion {
+    #[serde(rename = "1")]
+    V1(PersistedSnapshot),
+}
+
+impl PersistedSnapshotVersion {
+    #[cfg(test)]
+    fn v1_ref(&self) -> &PersistedSnapshot {
+        match self {
+            Self::V1(ps) => ps,
+        }
+    }
+}
+
 /// The collection of Parquet files that were persisted in a snapshot
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub struct PersistedSnapshot {
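
The enum has only one variant today, so v1_ref() can match irrefutably; the payoff comes when the format changes. A hypothetical sketch of a future bump, purely for illustration (PersistedSnapshotV2 and upgrade are invented names, not part of this commit):

```rust
// Hypothetical future state, for illustration only.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(tag = "version")]
pub enum PersistedSnapshotVersion {
    #[serde(rename = "1")]
    V1(PersistedSnapshot),
    #[serde(rename = "2")]
    V2(PersistedSnapshotV2), // invented type
}

impl PersistedSnapshotVersion {
    /// Normalize any stored version to the newest in-memory form.
    fn upgrade(self) -> PersistedSnapshotV2 {
        match self {
            Self::V1(ps) => PersistedSnapshotV2::from(ps), // invented From impl
            Self::V2(ps) => ps,
        }
    }
}
```

Old files tagged "version": "1" would keep deserializing into V1 and get converted at the same boundaries that already match on the version in this diff.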


@@ -1,7 +1,7 @@
 //! This is the implementation of the `Persister` used to write data from the buffer to object
 //! storage.

-use crate::PersistedSnapshot;
+use crate::PersistedSnapshotVersion;
 use crate::paths::ParquetFilePath;
 use crate::paths::SnapshotInfoFilePath;
 use arrow::datatypes::SchemaRef;
@@ -119,7 +119,10 @@ impl Persister {
     /// Loads the most recently persisted N snapshot parquet file lists from object storage.
     ///
     /// This is intended to be used on server start.
-    pub async fn load_snapshots(&self, mut most_recent_n: usize) -> Result<Vec<PersistedSnapshot>> {
+    pub async fn load_snapshots(
+        &self,
+        mut most_recent_n: usize,
+    ) -> Result<Vec<PersistedSnapshotVersion>> {
         let mut futures = FuturesOrdered::new();
         let mut offset: Option<ObjPath> = None;
@@ -165,7 +168,7 @@
 async fn get_snapshot(
     location: ObjPath,
     object_store: Arc<dyn ObjectStore>,
-) -> Result<PersistedSnapshot> {
+) -> Result<PersistedSnapshotVersion> {
     let bytes = object_store.get(&location).await?.bytes().await?;
     serde_json::from_slice(&bytes).map_err(Into::into)
 }
@@ -201,10 +204,15 @@
     }

     /// Persists the snapshot file
-    pub async fn persist_snapshot(&self, persisted_snapshot: &PersistedSnapshot) -> Result<()> {
+    pub async fn persist_snapshot(
+        &self,
+        persisted_snapshot: &PersistedSnapshotVersion,
+    ) -> Result<()> {
         let snapshot_file_path = SnapshotInfoFilePath::new(
             self.node_identifier_prefix.as_str(),
-            persisted_snapshot.snapshot_sequence_number,
+            match persisted_snapshot {
+                PersistedSnapshotVersion::V1(ps) => ps.snapshot_sequence_number,
+            },
         );
         let json = serde_json::to_vec_pretty(persisted_snapshot)?;
         self.object_store
@@ -337,7 +345,9 @@ impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{DatabaseTables, ParquetFile, ParquetFileId};
+    use crate::{
+        DatabaseTables, ParquetFile, ParquetFileId, PersistedSnapshot, PersistedSnapshotVersion,
+    };
     use influxdb3_catalog::catalog::CatalogSequenceNumber;
     use influxdb3_id::{DbId, SerdeVecMap, TableId};
     use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
@@ -357,7 +367,7 @@ mod tests {
         let local_disk =
             LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
         let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
-        let info_file = PersistedSnapshot {
+        let info_file = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(0),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
@@ -368,7 +378,7 @@
             max_time: 1,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
+        });

         persister.persist_snapshot(&info_file).await.unwrap();
     }
@@ -379,7 +389,7 @@
             LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
-        let info_file = PersistedSnapshot {
+        let info_file = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(0),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
@@ -390,8 +400,8 @@
             max_time: 1,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
-        let info_file_2 = PersistedSnapshot {
+        });
+        let info_file_2 = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(1),
             snapshot_sequence_number: SnapshotSequenceNumber::new(1),
@@ -402,8 +412,8 @@
             min_time: 0,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
-        let info_file_3 = PersistedSnapshot {
+        });
+        let info_file_3 = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(2),
             snapshot_sequence_number: SnapshotSequenceNumber::new(2),
@@ -414,7 +424,7 @@
             max_time: 1,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
+        });

         persister.persist_snapshot(&info_file).await.unwrap();
         persister.persist_snapshot(&info_file_2).await.unwrap();
@@ -423,12 +433,12 @@
         let snapshots = persister.load_snapshots(2).await.unwrap();
         assert_eq!(snapshots.len(), 2);
         // The most recent files are first
-        assert_eq!(snapshots[0].next_file_id.as_u64(), 2);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 2);
-        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 2);
-        assert_eq!(snapshots[1].next_file_id.as_u64(), 1);
-        assert_eq!(snapshots[1].wal_file_sequence_number.as_u64(), 1);
-        assert_eq!(snapshots[1].snapshot_sequence_number.as_u64(), 1);
+        assert_eq!(snapshots[0].v1_ref().next_file_id.as_u64(), 2);
+        assert_eq!(snapshots[0].v1_ref().wal_file_sequence_number.as_u64(), 2);
+        assert_eq!(snapshots[0].v1_ref().snapshot_sequence_number.as_u64(), 2);
+        assert_eq!(snapshots[1].v1_ref().next_file_id.as_u64(), 1);
+        assert_eq!(snapshots[1].v1_ref().wal_file_sequence_number.as_u64(), 1);
+        assert_eq!(snapshots[1].v1_ref().snapshot_sequence_number.as_u64(), 1);
     }

     #[tokio::test]
@@ -437,7 +447,7 @@
             LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
-        let info_file = PersistedSnapshot {
+        let info_file = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "test_host".to_string(),
             next_file_id: ParquetFileId::from(0),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
@@ -448,12 +458,12 @@
             max_time: 1,
             row_count: 0,
             parquet_size_bytes: 0,
-        };
+        });

         persister.persist_snapshot(&info_file).await.unwrap();

         let snapshots = persister.load_snapshots(2).await.unwrap();
         // We asked for the most recent 2 but there should only be 1
         assert_eq!(snapshots.len(), 1);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 0);
+        assert_eq!(snapshots[0].v1_ref().wal_file_sequence_number.as_u64(), 0);
     }

     #[tokio::test]
@@ -464,7 +474,7 @@
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
         for id in 0..1001 {
-            let info_file = PersistedSnapshot {
+            let info_file = PersistedSnapshotVersion::V1(PersistedSnapshot {
                 node_id: "test_host".to_string(),
                 next_file_id: ParquetFileId::from(id),
                 snapshot_sequence_number: SnapshotSequenceNumber::new(id),
@@ -475,16 +485,22 @@
                 max_time: 1,
                 row_count: 0,
                 parquet_size_bytes: 0,
-            };
+            });
             persister.persist_snapshot(&info_file).await.unwrap();
         }

         let snapshots = persister.load_snapshots(1500).await.unwrap();
         // We asked for the most recent 1500 so there should be 1001 of them
         assert_eq!(snapshots.len(), 1001);
-        assert_eq!(snapshots[0].next_file_id.as_u64(), 1000);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 1000);
-        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 1000);
-        assert_eq!(snapshots[0].catalog_sequence_number.get(), 1000);
+        assert_eq!(snapshots[0].v1_ref().next_file_id.as_u64(), 1000);
+        assert_eq!(
+            snapshots[0].v1_ref().wal_file_sequence_number.as_u64(),
+            1000
+        );
+        assert_eq!(
+            snapshots[0].v1_ref().snapshot_sequence_number.as_u64(),
+            1000
+        );
+        assert_eq!(snapshots[0].v1_ref().catalog_sequence_number.get(), 1000);
     }

     #[tokio::test]
@@ -521,14 +537,17 @@
                 max_time: 1,
             },
         );
-        persister.persist_snapshot(&info_file).await.unwrap();
+        persister
+            .persist_snapshot(&PersistedSnapshotVersion::V1(info_file))
+            .await
+            .unwrap();
         let snapshots = persister.load_snapshots(10).await.unwrap();
         assert_eq!(snapshots.len(), 1);
         // Should be the next available id after the largest number
-        assert_eq!(snapshots[0].next_file_id.as_u64(), 9877);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 0);
-        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 0);
-        assert_eq!(snapshots[0].catalog_sequence_number.get(), 0);
+        assert_eq!(snapshots[0].v1_ref().next_file_id.as_u64(), 9877);
+        assert_eq!(snapshots[0].v1_ref().wal_file_sequence_number.as_u64(), 0);
+        assert_eq!(snapshots[0].v1_ref().snapshot_sequence_number.as_u64(), 0);
+        assert_eq!(snapshots[0].v1_ref().catalog_sequence_number.get(), 0);
     }

     #[tokio::test]
@@ -590,7 +609,7 @@
             ),
         ]
         .into();
-        let snapshot = PersistedSnapshot {
+        let snapshot = PersistedSnapshotVersion::V1(PersistedSnapshot {
             node_id: "host".to_string(),
             next_file_id: ParquetFileId::new(),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
@@ -601,7 +620,7 @@
             min_time: 0,
             max_time: 1,
             databases,
-        };
+        });

         insta::assert_json_snapshot!(snapshot);
     }


@@ -3,6 +3,7 @@ source: influxdb3_write/src/persister.rs
 expression: snapshot
 ---
 {
+  "version": "1",
   "node_id": "host",
   "next_file_id": 8,
   "snapshot_sequence_number": 0,


@@ -10,7 +10,8 @@ pub mod validator;

 use crate::{
     BufferedWriteRequest, Bufferer, ChunkContainer, ChunkFilter, DistinctCacheManager,
-    LastCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
+    LastCacheManager, ParquetFile, PersistedSnapshot, PersistedSnapshotVersion, Precision,
+    WriteBuffer, WriteLineError,
     chunk::ParquetChunk,
     persister::{Persister, PersisterError},
     write_buffer::{
@@ -197,7 +198,13 @@ impl WriteBufferImpl {
         // load snapshots and replay the wal into the in memory buffer
         let persisted_snapshots = persister
             .load_snapshots(N_SNAPSHOTS_TO_LOAD_ON_START)
-            .await?;
+            .await?
+            .into_iter()
+            // map the persisted snapshots into the newest version
+            .map(|psv| match psv {
+                PersistedSnapshotVersion::V1(ps) => ps,
+            })
+            .collect::<Vec<PersistedSnapshot>>();
         let last_wal_sequence_number = persisted_snapshots
             .first()
             .map(|s| s.wal_file_sequence_number);
@@ -544,7 +551,7 @@ impl Bufferer for WriteBufferImpl {
         self.buffer.persisted_parquet_files(db_id, table_id, filter)
     }

-    fn watch_persisted_snapshots(&self) -> Receiver<Option<PersistedSnapshot>> {
+    fn watch_persisted_snapshots(&self) -> Receiver<Option<PersistedSnapshotVersion>> {
         self.buffer.persisted_snapshot_notify_rx()
     }
 }
@@ -1096,8 +1103,8 @@ mod tests {
             let persisted = write_buffer.persister.load_snapshots(1000).await.unwrap();
             if !persisted.is_empty() {
                 assert_eq!(persisted.len(), 1);
-                assert_eq!(persisted[0].min_time, 10000000000);
-                assert_eq!(persisted[0].row_count, 2);
+                assert_eq!(persisted[0].v1_ref().min_time, 10000000000);
+                assert_eq!(persisted[0].v1_ref().row_count, 2);
                 break;
             } else if ticks > 10 {
                 panic!("not persisting");
@@ -1349,7 +1356,8 @@
             WalFileSequenceNumber::new(0),
             CatalogSequenceNumber::new(0),
         );
-        let snapshot_json = serde_json::to_vec(&prev_snapshot).unwrap();
+        let snapshot_json =
+            serde_json::to_vec(&PersistedSnapshotVersion::V1(prev_snapshot)).unwrap();

         // set ParquetFileId to be 0 so that we can make sure when it's loaded from the
         // snapshot that it becomes the expected number
         ParquetFileId::from(0).set_next_id();
@@ -1485,7 +1493,8 @@
             assert_eq!(file.id.as_u64(), i as u64);
         }

-        let snapshot_json = serde_json::to_vec(&prev_snapshot).unwrap();
+        let snapshot_json =
+            serde_json::to_vec(&PersistedSnapshotVersion::V1(prev_snapshot)).unwrap();

         // put the snapshot file in object store:
         object_store


@@ -3,7 +3,7 @@ use crate::paths::ParquetFilePath;
 use crate::persister::Persister;
 use crate::write_buffer::persisted_files::PersistedFiles;
 use crate::write_buffer::table_buffer::TableBuffer;
-use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot};
+use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot, PersistedSnapshotVersion};
 use anyhow::Context;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -48,8 +48,8 @@ pub struct QueryableBuffer {
     buffer: Arc<RwLock<BufferState>>,
     parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
     /// Sends a notification to this watch channel whenever a snapshot info is persisted
-    persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
-    persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
+    persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshotVersion>>,
+    persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshotVersion>>,
 }

 #[derive(Debug)]
@@ -346,9 +346,11 @@ impl QueryableBuffer {
                 // force_snapshot) snapshot runs, snapshot_tracker will check if
                 // wal_periods are empty so it won't trigger a snapshot in the first
                 // place.
-                let persisted_snapshot = Arc::into_inner(persisted_snapshot)
-                    .expect("Should only have one strong reference")
-                    .into_inner();
+                let persisted_snapshot = PersistedSnapshotVersion::V1(
+                    Arc::into_inner(persisted_snapshot)
+                        .expect("Should only have one strong reference")
+                        .into_inner(),
+                );
                 if !persist_jobs_empty {
                     loop {
                         match persister.persist_snapshot(&persisted_snapshot).await {
@@ -385,7 +387,7 @@
     pub fn persisted_snapshot_notify_rx(
         &self,
-    ) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>> {
+    ) -> tokio::sync::watch::Receiver<Option<PersistedSnapshotVersion>> {
         self.persisted_snapshot_notify_rx.clone()
     }