From 054ac7e8a305defd6765bba7cd9489ec803bdf0f Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Mon, 16 Sep 2024 09:49:11 -0400 Subject: [PATCH] feat: add host_id to PersistedSnapshot (#25335) --- influxdb3_write/src/lib.rs | 4 ++++ influxdb3_write/src/persister.rs | 7 +++++++ influxdb3_write/src/write_buffer/mod.rs | 1 + influxdb3_write/src/write_buffer/queryable_buffer.rs | 1 + 4 files changed, 13 insertions(+) diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index c4fd330246..9b3de60ce7 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -161,6 +161,8 @@ pub struct PersistedCatalog { /// The collection of Parquet files that were persisted in a snapshot #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] pub struct PersistedSnapshot { + /// The host identifier that persisted this snapshot + pub host_id: String, /// The next file id to be used with `ParquetFile`s when the snapshot is loaded pub next_file_id: ParquetFileId, /// The snapshot sequence number associated with this snapshot @@ -184,11 +186,13 @@ pub struct PersistedSnapshot { impl PersistedSnapshot { pub fn new( + host_id: String, snapshot_sequence_number: SnapshotSequenceNumber, wal_file_sequence_number: WalFileSequenceNumber, catalog_sequence_number: SequenceNumber, ) -> Self { Self { + host_id, next_file_id: ParquetFileId::current(), snapshot_sequence_number, wal_file_sequence_number, diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index dbe5962f49..f86427b7b3 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -474,6 +474,7 @@ mod tests { LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap(); let persister = Persister::new(Arc::new(local_disk), "test_host"); let info_file = PersistedSnapshot { + host_id: "test_host".to_string(), next_file_id: ParquetFileId::from(0), snapshot_sequence_number: SnapshotSequenceNumber::new(0), wal_file_sequence_number: WalFileSequenceNumber::new(0), @@ -494,6 +495,7 @@ mod tests { LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap(); let persister = Persister::new(Arc::new(local_disk), "test_host"); let info_file = PersistedSnapshot { + host_id: "test_host".to_string(), next_file_id: ParquetFileId::from(0), snapshot_sequence_number: SnapshotSequenceNumber::new(0), wal_file_sequence_number: WalFileSequenceNumber::new(0), @@ -505,6 +507,7 @@ mod tests { parquet_size_bytes: 0, }; let info_file_2 = PersistedSnapshot { + host_id: "test_host".to_string(), next_file_id: ParquetFileId::from(1), snapshot_sequence_number: SnapshotSequenceNumber::new(1), wal_file_sequence_number: WalFileSequenceNumber::new(1), @@ -516,6 +519,7 @@ mod tests { parquet_size_bytes: 0, }; let info_file_3 = PersistedSnapshot { + host_id: "test_host".to_string(), next_file_id: ParquetFileId::from(2), snapshot_sequence_number: SnapshotSequenceNumber::new(2), wal_file_sequence_number: WalFileSequenceNumber::new(2), @@ -548,6 +552,7 @@ mod tests { LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap(); let persister = Persister::new(Arc::new(local_disk), "test_host"); let info_file = PersistedSnapshot { + host_id: "test_host".to_string(), next_file_id: ParquetFileId::from(0), snapshot_sequence_number: SnapshotSequenceNumber::new(0), wal_file_sequence_number: WalFileSequenceNumber::new(0), @@ -573,6 +578,7 @@ mod tests { let persister = Persister::new(Arc::new(local_disk), "test_host"); for id in 0..9001 { let info_file = PersistedSnapshot { + host_id: "test_host".to_string(), next_file_id: ParquetFileId::from(id), snapshot_sequence_number: SnapshotSequenceNumber::new(id), wal_file_sequence_number: WalFileSequenceNumber::new(id), @@ -602,6 +608,7 @@ mod tests { LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap(); let persister = Persister::new(Arc::new(local_disk), "test_host"); let mut info_file = PersistedSnapshot::new( + "test_host".to_string(), SnapshotSequenceNumber::new(0), WalFileSequenceNumber::new(0), SequenceNumber::new(0), diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 0fe9d28747..77d4ced19a 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -1177,6 +1177,7 @@ mod tests { NEXT_FILE_ID.store(500, Ordering::SeqCst); let prev_snapshot_seq = SnapshotSequenceNumber::new(42); let prev_snapshot = PersistedSnapshot::new( + "test_host".to_string(), prev_snapshot_seq, WalFileSequenceNumber::new(0), SequenceNumber::new(0), diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index d202090dd4..57f3a604f3 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -222,6 +222,7 @@ impl QueryableBuffer { ); // persist the individual files, building the snapshot as we go let mut persisted_snapshot = PersistedSnapshot::new( + persister.host_identifier_prefix().to_string(), snapshot_details.snapshot_sequence_number, wal_file_number, catalog.sequence_number(),