diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index f7a05d26b1..21649b8385 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -19,6 +19,7 @@ use datafusion::prelude::Expr; use influxdb3_catalog::catalog::Catalog; use influxdb3_catalog::catalog::CatalogSequenceNumber; use influxdb3_id::ParquetFileId; +use influxdb3_id::SerdeVecMap; use influxdb3_id::TableId; use influxdb3_id::{ColumnId, DbId}; use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber}; @@ -26,7 +27,6 @@ use iox_query::QueryChunk; use iox_time::Time; use last_cache::LastCacheProvider; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; @@ -180,7 +180,7 @@ pub struct PersistedSnapshot { pub max_time: i64, /// The collection of databases that had tables persisted in this snapshot. The tables will then have their /// name and the parquet file. - pub databases: HashMap, + pub databases: SerdeVecMap, } impl PersistedSnapshot { @@ -203,7 +203,7 @@ impl PersistedSnapshot { row_count: 0, min_time: i64::MAX, max_time: i64::MIN, - databases: HashMap::new(), + databases: SerdeVecMap::new(), } } @@ -232,7 +232,7 @@ impl PersistedSnapshot { #[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)] pub struct DatabaseTables { - pub tables: hashbrown::HashMap>, + pub tables: SerdeVecMap>, } /// The summary data for a persisted parquet file in a snapshot. @@ -256,6 +256,21 @@ impl ParquetFile { } } +#[cfg(test)] +impl ParquetFile { + pub(crate) fn create_for_test(path: impl Into) -> Self { + Self { + id: ParquetFileId::new(), + path: path.into(), + size_bytes: 1024, + row_count: 1, + chunk_time: 0, + min_time: 0, + max_time: 1, + } + } +} + /// The precision of the timestamp #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index c8cdee4801..dc714a60d8 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -396,9 +396,9 @@ impl TrackedMemoryArrowWriter { #[cfg(test)] mod tests { use super::*; - use crate::ParquetFileId; + use crate::{DatabaseTables, ParquetFile, ParquetFileId}; use influxdb3_catalog::catalog::CatalogSequenceNumber; - use influxdb3_id::{ColumnId, DbId, TableId}; + use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber}; use object_store::memory::InMemory; use observability_deps::tracing::info; @@ -407,7 +407,7 @@ mod tests { arrow::array::Int32Array, arrow::datatypes::DataType, arrow::datatypes::Field, arrow::datatypes::Schema, chrono::Utc, datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder, - object_store::local::LocalFileSystem, std::collections::HashMap, + object_store::local::LocalFileSystem, }; #[tokio::test] @@ -466,7 +466,7 @@ mod tests { snapshot_sequence_number: SnapshotSequenceNumber::new(0), wal_file_sequence_number: WalFileSequenceNumber::new(0), catalog_sequence_number: CatalogSequenceNumber::new(0), - databases: HashMap::new(), + databases: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -490,7 +490,7 @@ mod tests { snapshot_sequence_number: SnapshotSequenceNumber::new(0), wal_file_sequence_number: WalFileSequenceNumber::new(0), catalog_sequence_number: CatalogSequenceNumber::default(), - databases: HashMap::new(), + databases: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -505,7 +505,7 @@ mod tests { snapshot_sequence_number: SnapshotSequenceNumber::new(1), wal_file_sequence_number: WalFileSequenceNumber::new(1), catalog_sequence_number: CatalogSequenceNumber::default(), - databases: HashMap::new(), + databases: SerdeVecMap::new(), max_time: 1, min_time: 0, row_count: 0, @@ -520,7 +520,7 @@ mod tests { snapshot_sequence_number: SnapshotSequenceNumber::new(2), wal_file_sequence_number: WalFileSequenceNumber::new(2), catalog_sequence_number: CatalogSequenceNumber::default(), - databases: HashMap::new(), + databases: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -556,7 +556,7 @@ mod tests { snapshot_sequence_number: SnapshotSequenceNumber::new(0), wal_file_sequence_number: WalFileSequenceNumber::new(0), catalog_sequence_number: CatalogSequenceNumber::default(), - databases: HashMap::new(), + databases: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -585,7 +585,7 @@ mod tests { snapshot_sequence_number: SnapshotSequenceNumber::new(id), wal_file_sequence_number: WalFileSequenceNumber::new(id), catalog_sequence_number: CatalogSequenceNumber::new(id as u32), - databases: HashMap::new(), + databases: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -654,6 +654,73 @@ mod tests { assert!(snapshots.is_empty()); } + #[test] + fn persisted_snapshot_structure() { + let databases = [ + ( + DbId::new(), + DatabaseTables { + tables: [ + ( + TableId::new(), + vec![ + ParquetFile::create_for_test("1.parquet"), + ParquetFile::create_for_test("2.parquet"), + ], + ), + ( + TableId::new(), + vec![ + ParquetFile::create_for_test("3.parquet"), + ParquetFile::create_for_test("4.parquet"), + ], + ), + ] + .into(), + }, + ), + ( + DbId::new(), + DatabaseTables { + tables: [ + ( + TableId::new(), + vec![ + ParquetFile::create_for_test("5.parquet"), + ParquetFile::create_for_test("6.parquet"), + ], + ), + ( + TableId::new(), + vec![ + ParquetFile::create_for_test("7.parquet"), + ParquetFile::create_for_test("8.parquet"), + ], + ), + ] + .into(), + }, + ), + ] + .into(); + let snapshot = PersistedSnapshot { + host_id: "host".to_string(), + next_file_id: ParquetFileId::new(), + next_db_id: DbId::new(), + next_table_id: TableId::new(), + next_column_id: ColumnId::new(), + snapshot_sequence_number: SnapshotSequenceNumber::new(0), + wal_file_sequence_number: WalFileSequenceNumber::new(0), + catalog_sequence_number: CatalogSequenceNumber::new(0), + parquet_size_bytes: 1_024, + row_count: 1, + min_time: 0, + max_time: 1, + databases, + }; + insta::assert_json_snapshot!(snapshot); + } + #[tokio::test] async fn get_parquet_bytes() { let local_disk = diff --git a/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap b/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap new file mode 100644 index 0000000000..6df66c1e61 --- /dev/null +++ b/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap @@ -0,0 +1,126 @@ +--- +source: influxdb3_write/src/persister.rs +expression: snapshot +--- +{ + "host_id": "host", + "next_file_id": 8, + "next_db_id": 2, + "next_table_id": 4, + "next_column_id": 0, + "snapshot_sequence_number": 0, + "wal_file_sequence_number": 0, + "catalog_sequence_number": 0, + "parquet_size_bytes": 1024, + "row_count": 1, + "min_time": 0, + "max_time": 1, + "databases": [ + [ + 0, + { + "tables": [ + [ + 0, + [ + { + "id": 0, + "path": "1.parquet", + "size_bytes": 1024, + "row_count": 1, + "chunk_time": 0, + "min_time": 0, + "max_time": 1 + }, + { + "id": 1, + "path": "2.parquet", + "size_bytes": 1024, + "row_count": 1, + "chunk_time": 0, + "min_time": 0, + "max_time": 1 + } + ] + ], + [ + 1, + [ + { + "id": 2, + "path": "3.parquet", + "size_bytes": 1024, + "row_count": 1, + "chunk_time": 0, + "min_time": 0, + "max_time": 1 + }, + { + "id": 3, + "path": "4.parquet", + "size_bytes": 1024, + "row_count": 1, + "chunk_time": 0, + "min_time": 0, + "max_time": 1 + } + ] + ] + ] + } + ], + [ + 1, + { + "tables": [ + [ + 2, + [ + { + "id": 4, + "path": "5.parquet", + "size_bytes": 1024, + "row_count": 1, + "chunk_time": 0, + "min_time": 0, + "max_time": 1 + }, + { + "id": 5, + "path": "6.parquet", + "size_bytes": 1024, + "row_count": 1, + "chunk_time": 0, + "min_time": 0, + "max_time": 1 + } + ] + ], + [ + 3, + [ + { + "id": 6, + "path": "7.parquet", + "size_bytes": 1024, + "row_count": 1, + "chunk_time": 0, + "min_time": 0, + "max_time": 1 + }, + { + "id": 7, + "path": "8.parquet", + "size_bytes": 1024, + "row_count": 1, + "chunk_time": 0, + "min_time": 0, + "max_time": 1 + } + ] + ] + ] + } + ] + ] +}