refactor: use `SerdeVecMap` in `PersistedSnapshot` (#25541)
* refactor: use SerdeVecMap in PersistedSnapshot This changes from the use of a HashMap to store the DB -> Table structure in the PersistedSnapshot files to using a SerdeVecMap, which will have the identifiers serialized as integers instead of strings. * test: add a snapshot test for persisted snapshotspull/25552/head
parent
814eb31309
commit
2ac3df1bca
|
@ -19,6 +19,7 @@ use datafusion::prelude::Expr;
|
|||
use influxdb3_catalog::catalog::Catalog;
|
||||
use influxdb3_catalog::catalog::CatalogSequenceNumber;
|
||||
use influxdb3_id::ParquetFileId;
|
||||
use influxdb3_id::SerdeVecMap;
|
||||
use influxdb3_id::TableId;
|
||||
use influxdb3_id::{ColumnId, DbId};
|
||||
use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber};
|
||||
|
@ -26,7 +27,6 @@ use iox_query::QueryChunk;
|
|||
use iox_time::Time;
|
||||
use last_cache::LastCacheProvider;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
@ -180,7 +180,7 @@ pub struct PersistedSnapshot {
|
|||
pub max_time: i64,
|
||||
/// The collection of databases that had tables persisted in this snapshot. The tables will then have their
|
||||
/// name and the parquet file.
|
||||
pub databases: HashMap<DbId, DatabaseTables>,
|
||||
pub databases: SerdeVecMap<DbId, DatabaseTables>,
|
||||
}
|
||||
|
||||
impl PersistedSnapshot {
|
||||
|
@ -203,7 +203,7 @@ impl PersistedSnapshot {
|
|||
row_count: 0,
|
||||
min_time: i64::MAX,
|
||||
max_time: i64::MIN,
|
||||
databases: HashMap::new(),
|
||||
databases: SerdeVecMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -232,7 +232,7 @@ impl PersistedSnapshot {
|
|||
|
||||
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
|
||||
pub struct DatabaseTables {
|
||||
pub tables: hashbrown::HashMap<TableId, Vec<ParquetFile>>,
|
||||
pub tables: SerdeVecMap<TableId, Vec<ParquetFile>>,
|
||||
}
|
||||
|
||||
/// The summary data for a persisted parquet file in a snapshot.
|
||||
|
@ -256,6 +256,21 @@ impl ParquetFile {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl ParquetFile {
|
||||
pub(crate) fn create_for_test(path: impl Into<String>) -> Self {
|
||||
Self {
|
||||
id: ParquetFileId::new(),
|
||||
path: path.into(),
|
||||
size_bytes: 1024,
|
||||
row_count: 1,
|
||||
chunk_time: 0,
|
||||
min_time: 0,
|
||||
max_time: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The precision of the timestamp
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
|
|
|
@ -396,9 +396,9 @@ impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::ParquetFileId;
|
||||
use crate::{DatabaseTables, ParquetFile, ParquetFileId};
|
||||
use influxdb3_catalog::catalog::CatalogSequenceNumber;
|
||||
use influxdb3_id::{ColumnId, DbId, TableId};
|
||||
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
|
||||
use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
|
||||
use object_store::memory::InMemory;
|
||||
use observability_deps::tracing::info;
|
||||
|
@ -407,7 +407,7 @@ mod tests {
|
|||
arrow::array::Int32Array, arrow::datatypes::DataType, arrow::datatypes::Field,
|
||||
arrow::datatypes::Schema, chrono::Utc,
|
||||
datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder,
|
||||
object_store::local::LocalFileSystem, std::collections::HashMap,
|
||||
object_store::local::LocalFileSystem,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -466,7 +466,7 @@ mod tests {
|
|||
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
|
||||
wal_file_sequence_number: WalFileSequenceNumber::new(0),
|
||||
catalog_sequence_number: CatalogSequenceNumber::new(0),
|
||||
databases: HashMap::new(),
|
||||
databases: SerdeVecMap::new(),
|
||||
min_time: 0,
|
||||
max_time: 1,
|
||||
row_count: 0,
|
||||
|
@ -490,7 +490,7 @@ mod tests {
|
|||
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
|
||||
wal_file_sequence_number: WalFileSequenceNumber::new(0),
|
||||
catalog_sequence_number: CatalogSequenceNumber::default(),
|
||||
databases: HashMap::new(),
|
||||
databases: SerdeVecMap::new(),
|
||||
min_time: 0,
|
||||
max_time: 1,
|
||||
row_count: 0,
|
||||
|
@ -505,7 +505,7 @@ mod tests {
|
|||
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
|
||||
wal_file_sequence_number: WalFileSequenceNumber::new(1),
|
||||
catalog_sequence_number: CatalogSequenceNumber::default(),
|
||||
databases: HashMap::new(),
|
||||
databases: SerdeVecMap::new(),
|
||||
max_time: 1,
|
||||
min_time: 0,
|
||||
row_count: 0,
|
||||
|
@ -520,7 +520,7 @@ mod tests {
|
|||
snapshot_sequence_number: SnapshotSequenceNumber::new(2),
|
||||
wal_file_sequence_number: WalFileSequenceNumber::new(2),
|
||||
catalog_sequence_number: CatalogSequenceNumber::default(),
|
||||
databases: HashMap::new(),
|
||||
databases: SerdeVecMap::new(),
|
||||
min_time: 0,
|
||||
max_time: 1,
|
||||
row_count: 0,
|
||||
|
@ -556,7 +556,7 @@ mod tests {
|
|||
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
|
||||
wal_file_sequence_number: WalFileSequenceNumber::new(0),
|
||||
catalog_sequence_number: CatalogSequenceNumber::default(),
|
||||
databases: HashMap::new(),
|
||||
databases: SerdeVecMap::new(),
|
||||
min_time: 0,
|
||||
max_time: 1,
|
||||
row_count: 0,
|
||||
|
@ -585,7 +585,7 @@ mod tests {
|
|||
snapshot_sequence_number: SnapshotSequenceNumber::new(id),
|
||||
wal_file_sequence_number: WalFileSequenceNumber::new(id),
|
||||
catalog_sequence_number: CatalogSequenceNumber::new(id as u32),
|
||||
databases: HashMap::new(),
|
||||
databases: SerdeVecMap::new(),
|
||||
min_time: 0,
|
||||
max_time: 1,
|
||||
row_count: 0,
|
||||
|
@ -654,6 +654,73 @@ mod tests {
|
|||
assert!(snapshots.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persisted_snapshot_structure() {
|
||||
let databases = [
|
||||
(
|
||||
DbId::new(),
|
||||
DatabaseTables {
|
||||
tables: [
|
||||
(
|
||||
TableId::new(),
|
||||
vec![
|
||||
ParquetFile::create_for_test("1.parquet"),
|
||||
ParquetFile::create_for_test("2.parquet"),
|
||||
],
|
||||
),
|
||||
(
|
||||
TableId::new(),
|
||||
vec![
|
||||
ParquetFile::create_for_test("3.parquet"),
|
||||
ParquetFile::create_for_test("4.parquet"),
|
||||
],
|
||||
),
|
||||
]
|
||||
.into(),
|
||||
},
|
||||
),
|
||||
(
|
||||
DbId::new(),
|
||||
DatabaseTables {
|
||||
tables: [
|
||||
(
|
||||
TableId::new(),
|
||||
vec![
|
||||
ParquetFile::create_for_test("5.parquet"),
|
||||
ParquetFile::create_for_test("6.parquet"),
|
||||
],
|
||||
),
|
||||
(
|
||||
TableId::new(),
|
||||
vec![
|
||||
ParquetFile::create_for_test("7.parquet"),
|
||||
ParquetFile::create_for_test("8.parquet"),
|
||||
],
|
||||
),
|
||||
]
|
||||
.into(),
|
||||
},
|
||||
),
|
||||
]
|
||||
.into();
|
||||
let snapshot = PersistedSnapshot {
|
||||
host_id: "host".to_string(),
|
||||
next_file_id: ParquetFileId::new(),
|
||||
next_db_id: DbId::new(),
|
||||
next_table_id: TableId::new(),
|
||||
next_column_id: ColumnId::new(),
|
||||
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
|
||||
wal_file_sequence_number: WalFileSequenceNumber::new(0),
|
||||
catalog_sequence_number: CatalogSequenceNumber::new(0),
|
||||
parquet_size_bytes: 1_024,
|
||||
row_count: 1,
|
||||
min_time: 0,
|
||||
max_time: 1,
|
||||
databases,
|
||||
};
|
||||
insta::assert_json_snapshot!(snapshot);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_parquet_bytes() {
|
||||
let local_disk =
|
||||
|
|
|
@ -0,0 +1,126 @@
|
|||
---
|
||||
source: influxdb3_write/src/persister.rs
|
||||
expression: snapshot
|
||||
---
|
||||
{
|
||||
"host_id": "host",
|
||||
"next_file_id": 8,
|
||||
"next_db_id": 2,
|
||||
"next_table_id": 4,
|
||||
"next_column_id": 0,
|
||||
"snapshot_sequence_number": 0,
|
||||
"wal_file_sequence_number": 0,
|
||||
"catalog_sequence_number": 0,
|
||||
"parquet_size_bytes": 1024,
|
||||
"row_count": 1,
|
||||
"min_time": 0,
|
||||
"max_time": 1,
|
||||
"databases": [
|
||||
[
|
||||
0,
|
||||
{
|
||||
"tables": [
|
||||
[
|
||||
0,
|
||||
[
|
||||
{
|
||||
"id": 0,
|
||||
"path": "1.parquet",
|
||||
"size_bytes": 1024,
|
||||
"row_count": 1,
|
||||
"chunk_time": 0,
|
||||
"min_time": 0,
|
||||
"max_time": 1
|
||||
},
|
||||
{
|
||||
"id": 1,
|
||||
"path": "2.parquet",
|
||||
"size_bytes": 1024,
|
||||
"row_count": 1,
|
||||
"chunk_time": 0,
|
||||
"min_time": 0,
|
||||
"max_time": 1
|
||||
}
|
||||
]
|
||||
],
|
||||
[
|
||||
1,
|
||||
[
|
||||
{
|
||||
"id": 2,
|
||||
"path": "3.parquet",
|
||||
"size_bytes": 1024,
|
||||
"row_count": 1,
|
||||
"chunk_time": 0,
|
||||
"min_time": 0,
|
||||
"max_time": 1
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"path": "4.parquet",
|
||||
"size_bytes": 1024,
|
||||
"row_count": 1,
|
||||
"chunk_time": 0,
|
||||
"min_time": 0,
|
||||
"max_time": 1
|
||||
}
|
||||
]
|
||||
]
|
||||
]
|
||||
}
|
||||
],
|
||||
[
|
||||
1,
|
||||
{
|
||||
"tables": [
|
||||
[
|
||||
2,
|
||||
[
|
||||
{
|
||||
"id": 4,
|
||||
"path": "5.parquet",
|
||||
"size_bytes": 1024,
|
||||
"row_count": 1,
|
||||
"chunk_time": 0,
|
||||
"min_time": 0,
|
||||
"max_time": 1
|
||||
},
|
||||
{
|
||||
"id": 5,
|
||||
"path": "6.parquet",
|
||||
"size_bytes": 1024,
|
||||
"row_count": 1,
|
||||
"chunk_time": 0,
|
||||
"min_time": 0,
|
||||
"max_time": 1
|
||||
}
|
||||
]
|
||||
],
|
||||
[
|
||||
3,
|
||||
[
|
||||
{
|
||||
"id": 6,
|
||||
"path": "7.parquet",
|
||||
"size_bytes": 1024,
|
||||
"row_count": 1,
|
||||
"chunk_time": 0,
|
||||
"min_time": 0,
|
||||
"max_time": 1
|
||||
},
|
||||
{
|
||||
"id": 7,
|
||||
"path": "8.parquet",
|
||||
"size_bytes": 1024,
|
||||
"row_count": 1,
|
||||
"chunk_time": 0,
|
||||
"min_time": 0,
|
||||
"max_time": 1
|
||||
}
|
||||
]
|
||||
]
|
||||
]
|
||||
}
|
||||
]
|
||||
]
|
||||
}
|
Loading…
Reference in New Issue