feat: Add u64 id field to ParquetFiles (#25258)

* feat: Add u64 id field to ParquetFiles

This commit does a few things:
1. It adds a u64 id field to ParquetFile
2. Ids are drawn from an AtomicU64, so every thread always gets the next
   available id
3. The next file id to be used is persisted as part of our snapshot
   process
4. When a snapshot is loaded, it sets the AtomicU64 back to that value

With this in place, the current work on the FileIndex in our Pro version
will be able to utilize these ids during compaction, and we can refer to
each parquet file by a unique u64.

Closes #25250
Michael Gattozzi 2024-08-21 14:14:33 -04:00 committed by GitHub
parent 3b174a2f98
commit 0fec72d243
5 changed files with 115 additions and 2 deletions
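
Before the diff itself, a minimal, self-contained sketch of the pattern this commit introduces. The names mirror the real code (NEXT_FILE_ID, ParquetFileId), but this is a simplified model of the mechanism, not the shipped implementation (no serde derives, no snapshot types):

use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide counter; every ParquetFileId::new() claims the next value.
static NEXT_FILE_ID: AtomicU64 = AtomicU64::new(0);

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct ParquetFileId(u64);

impl ParquetFileId {
    // Atomically claim the next id; safe to call from any thread.
    fn new() -> Self {
        Self(NEXT_FILE_ID.fetch_add(1, Ordering::SeqCst))
    }
    // Peek at the next id without claiming it (what a snapshot records).
    fn current() -> Self {
        Self(NEXT_FILE_ID.load(Ordering::SeqCst))
    }
}

fn main() {
    assert_eq!(ParquetFileId::new().0, 0);
    assert_eq!(ParquetFileId::new().0, 1);
    // A restart restores the counter from the snapshot's next_file_id,
    // so newly created ids keep increasing instead of colliding:
    NEXT_FILE_ID.store(ParquetFileId::current().0, Ordering::SeqCst);
    assert_eq!(ParquetFileId::new().0, 2);
}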

View File

@@ -1,6 +1,8 @@
use crate::persister::serialize_to_parquet;
use crate::persister::Error;
use crate::ParquetFile;
use crate::ParquetFileId;
use bytes::Bytes;
use datafusion::execution::memory_pool::MemoryPool;
use datafusion::physical_plan::SendableRecordBatchStream;
@@ -84,6 +86,7 @@ impl ParquetCache {
files.insert(
path.clone(),
ParquetFile {
id: ParquetFileId::new(),
chunk_time: min_time,
path: path.clone(),
size_bytes,
@@ -97,6 +100,7 @@ impl ParquetCache {
HashMap::from([(
path.clone(),
ParquetFile {
id: ParquetFileId::new(),
chunk_time: min_time,
path: path.clone(),
size_bytes,
@@ -113,6 +117,7 @@ impl ParquetCache {
HashMap::from([(
path.clone(),
ParquetFile {
id: ParquetFileId::new(),
chunk_time: min_time,
path: path.clone(),
size_bytes,

View File

@@ -30,6 +30,8 @@ use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
@@ -214,6 +216,8 @@ pub struct PersistedCatalog {
/// The collection of Parquet files that were persisted in a snapshot
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct PersistedSnapshot {
/// The next file id to be used with `ParquetFile`s when the snapshot is loaded
pub next_file_id: ParquetFileId,
/// The snapshot sequence number associated with this snapshot
pub snapshot_sequence_number: SnapshotSequenceNumber,
/// The wal file sequence number that triggered this snapshot
@@ -240,6 +244,7 @@ impl PersistedSnapshot {
catalog_sequence_number: SequenceNumber,
) -> Self {
Self {
next_file_id: ParquetFileId::current(),
snapshot_sequence_number,
wal_file_sequence_number,
catalog_sequence_number,
@@ -257,6 +262,9 @@ impl PersistedSnapshot {
table_name: Arc<str>,
parquet_file: ParquetFile,
) {
// Use <= so a file whose id equals the current next_file_id also bumps
// the counter past it; otherwise that id could be handed out a second
// time after the snapshot is reloaded.
if self.next_file_id <= parquet_file.id {
self.next_file_id = ParquetFileId::from(parquet_file.id.as_u64() + 1);
}
self.parquet_size_bytes += parquet_file.size_bytes;
self.row_count += parquet_file.row_count;
self.min_time = self.min_time.min(parquet_file.min_time);
@@ -277,9 +285,43 @@ pub struct DatabaseTables {
pub tables: hashbrown::HashMap<Arc<str>, Vec<ParquetFile>>,
}
/// The next file id to be used when persisting `ParquetFile`s
pub static NEXT_FILE_ID: AtomicU64 = AtomicU64::new(0);
/// A newtype wrapper for ids used with `ParquetFile`
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone, PartialOrd, Ord)]
pub struct ParquetFileId(u64);
impl ParquetFileId {
pub fn new() -> Self {
Self(NEXT_FILE_ID.fetch_add(1, Ordering::SeqCst))
}
pub fn as_u64(&self) -> u64 {
self.0
}
pub fn current() -> Self {
ParquetFileId(NEXT_FILE_ID.load(Ordering::SeqCst))
}
}
impl From<u64> for ParquetFileId {
fn from(value: u64) -> Self {
Self(value)
}
}
impl Default for ParquetFileId {
fn default() -> Self {
Self::new()
}
}
/// The summary data for a persisted parquet file in a snapshot.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct ParquetFile {
pub id: ParquetFileId,
pub path: String,
pub size_bytes: u64,
pub row_count: u64,
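
Because the allocator is a single static AtomicU64, id uniqueness holds across threads without any locking. A hedged usage sketch, not part of the commit: a hypothetical test that assumes the ParquetFileId type above is in scope and hammers the allocator from several threads:

#[test]
fn parquet_file_ids_are_unique_across_threads() {
    use std::collections::HashSet;
    use std::thread;

    // Allocate ids from four threads concurrently.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            thread::spawn(|| {
                (0..1_000)
                    .map(|_| ParquetFileId::new().as_u64())
                    .collect::<Vec<u64>>()
            })
        })
        .collect();

    let mut seen = HashSet::new();
    for handle in handles {
        for id in handle.join().unwrap() {
            // fetch_add hands every caller a distinct value, so this holds.
            assert!(seen.insert(id), "duplicate id {id}");
        }
    }
}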

View File

@@ -362,6 +362,7 @@ impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
#[cfg(test)]
mod tests {
use super::*;
use crate::ParquetFileId;
use influxdb3_catalog::catalog::SequenceNumber;
use influxdb3_wal::SnapshotSequenceNumber;
use object_store::memory::InMemory;
@@ -427,6 +428,7 @@ mod tests {
LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
let persister = PersisterImpl::new(Arc::new(local_disk), "test_host");
let info_file = PersistedSnapshot {
next_file_id: ParquetFileId::from(0),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::new(0),
@@ -446,6 +448,7 @@ mod tests {
LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
let persister = PersisterImpl::new(Arc::new(local_disk), "test_host");
let info_file = PersistedSnapshot {
next_file_id: ParquetFileId::from(0),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::default(),
@@ -456,6 +459,7 @@ mod tests {
parquet_size_bytes: 0,
};
let info_file_2 = PersistedSnapshot {
next_file_id: ParquetFileId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
wal_file_sequence_number: WalFileSequenceNumber::new(1),
catalog_sequence_number: SequenceNumber::default(),
@@ -466,6 +470,7 @@ mod tests {
parquet_size_bytes: 0,
};
let info_file_3 = PersistedSnapshot {
next_file_id: ParquetFileId::from(2),
snapshot_sequence_number: SnapshotSequenceNumber::new(2),
wal_file_sequence_number: WalFileSequenceNumber::new(2),
catalog_sequence_number: SequenceNumber::default(),
@@ -483,8 +488,10 @@ mod tests {
let snapshots = persister.load_snapshots(2).await.unwrap();
assert_eq!(snapshots.len(), 2);
// The most recent files are first
assert_eq!(snapshots[0].next_file_id.as_u64(), 2);
assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 2);
assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 2);
assert_eq!(snapshots[1].next_file_id.as_u64(), 1);
assert_eq!(snapshots[1].wal_file_sequence_number.as_u64(), 1);
assert_eq!(snapshots[1].snapshot_sequence_number.as_u64(), 1);
}
@@ -495,6 +502,7 @@ mod tests {
LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
let persister = PersisterImpl::new(Arc::new(local_disk), "test_host");
let info_file = PersistedSnapshot {
next_file_id: ParquetFileId::from(0),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::default(),
@@ -519,6 +527,7 @@ mod tests {
let persister = PersisterImpl::new(Arc::new(local_disk), "test_host");
for id in 0..9001 {
let info_file = PersistedSnapshot {
next_file_id: ParquetFileId::from(id),
snapshot_sequence_number: SnapshotSequenceNumber::new(id),
wal_file_sequence_number: WalFileSequenceNumber::new(id),
catalog_sequence_number: SequenceNumber::new(id as u32),
@@ -533,11 +542,50 @@ mod tests {
let snapshots = persister.load_snapshots(9500).await.unwrap();
// We asked for the most recent 9500 so there should be 9001 of them
assert_eq!(snapshots.len(), 9001);
assert_eq!(snapshots[0].next_file_id.as_u64(), 9000);
assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 9000);
assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 9000);
assert_eq!(snapshots[0].catalog_sequence_number.as_u32(), 9000);
}
#[tokio::test]
// This test makes sure that the proper next_file_id is persisted when a
// parquet file with a larger id is added
async fn persist_add_parquet_file_and_load_snapshot() {
let local_disk =
LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
let persister = PersisterImpl::new(Arc::new(local_disk), "test_host");
let mut info_file = PersistedSnapshot::new(
SnapshotSequenceNumber::new(0),
WalFileSequenceNumber::new(0),
SequenceNumber::new(0),
);
info_file.add_parquet_file(
"foo".into(),
"bar".into(),
crate::ParquetFile {
// Use an id bigger than the next_file_id the PersistedSnapshot
// captured automatically when it was created
id: ParquetFileId::from(9876),
path: "test".into(),
size_bytes: 5,
row_count: 5,
chunk_time: 5,
min_time: 0,
max_time: 1,
},
);
persister.persist_snapshot(&info_file).await.unwrap();
let snapshots = persister.load_snapshots(10).await.unwrap();
assert_eq!(snapshots.len(), 1);
// Should be the next available id after the largest number
assert_eq!(snapshots[0].next_file_id.as_u64(), 9877);
assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 0);
assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 0);
assert_eq!(snapshots[0].catalog_sequence_number.as_u32(), 0);
}
#[tokio::test]
async fn load_snapshot_works_with_no_exising_snapshots() {
let store = InMemory::new();

View File

@@ -14,7 +14,7 @@ use crate::write_buffer::queryable_buffer::QueryableBuffer;
use crate::write_buffer::validator::WriteValidator;
use crate::{
BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, ParquetFile, Persister,
Precision, WriteBuffer, WriteLineError,
Precision, WriteBuffer, WriteLineError, NEXT_FILE_ID,
};
use async_trait::async_trait;
use data_types::{ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError};
@@ -38,6 +38,7 @@ use object_store::{ObjectMeta, ObjectStore};
use observability_deps::tracing::{debug, error};
use parquet_file::storage::ParquetExecInput;
use schema::Schema;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
@@ -139,6 +140,14 @@ impl<T: TimeProvider> WriteBufferImpl<T> {
let last_snapshot_sequence_number = persisted_snapshots
.first()
.map(|s| s.snapshot_sequence_number);
// Set the next file id to use when persisting ParquetFiles
NEXT_FILE_ID.store(
persisted_snapshots
.first()
.map(|s| s.next_file_id.as_u64())
.unwrap_or(0),
Ordering::SeqCst,
);
let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
persisted_snapshots,
));
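
One detail worth noting: the .first() above relies on load_snapshots returning snapshots newest-first (the persister tests earlier assert exactly that ordering), so it picks up the latest counter value, while unwrap_or(0) covers a fresh server with no snapshots. The same decision in isolation, as a hedged sketch using a hypothetical helper name:

// Hypothetical helper, not in the codebase; assumes `snapshots` is
// sorted newest-first, as load_snapshots returns them.
fn restored_next_file_id(snapshots: &[PersistedSnapshot]) -> u64 {
    snapshots
        .first()
        .map(|s| s.next_file_id.as_u64())
        .unwrap_or(0) // no snapshots yet: start ids from zero
}
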
@@ -1171,6 +1180,8 @@ mod tests {
Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap());
// create a snapshot file that will be loaded on initialization of the write buffer:
// Set NEXT_FILE_ID to a non-zero number for the snapshot
NEXT_FILE_ID.store(500, Ordering::SeqCst);
let prev_snapshot_seq = SnapshotSequenceNumber::new(42);
let prev_snapshot = PersistedSnapshot::new(
prev_snapshot_seq,
@@ -1178,6 +1189,9 @@ mod tests {
SequenceNumber::new(0),
);
let snapshot_json = serde_json::to_vec(&prev_snapshot).unwrap();
// reset NEXT_FILE_ID to 0 so we can verify that loading the snapshot
// restores it to the expected value
NEXT_FILE_ID.store(0, Ordering::SeqCst);
// put the snapshot file in object store:
object_store
@@ -1201,6 +1215,9 @@ mod tests {
)
.await;
// Assert that loading the snapshots sets NEXT_FILE_ID to the correct id number
assert_eq!(NEXT_FILE_ID.load(Ordering::SeqCst), 500);
// there should be one snapshot already, i.e., the one we created above:
verify_snapshot_count(1, &wbuf.persister).await;
// there aren't any catalogs yet:

View File

@@ -4,7 +4,7 @@ use crate::paths::ParquetFilePath;
use crate::persister::PersisterImpl;
use crate::write_buffer::persisted_files::PersistedFiles;
use crate::write_buffer::table_buffer::TableBuffer;
use crate::{ParquetFile, PersistedSnapshot, Persister};
use crate::{ParquetFile, ParquetFileId, PersistedSnapshot, Persister};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use data_types::{
@@ -245,6 +245,7 @@ impl QueryableBuffer {
database_name,
table_name,
ParquetFile {
id: ParquetFileId::new(),
path,
size_bytes,
row_count: meta.num_rows as u64,