fix: port fix for serialization from enterprise

- fixes v3 catalog files are written with v3 version headers
- uses associated constants to tie version headers with their respective
  types

enterprise PR: https://github.com/influxdata/influxdb_pro/pull/873
feat/port-v3-serialization-fix
Praveen Kumar 2025-05-28 10:03:28 +01:00
parent cbf5e3a806
commit c503e66a43
No known key found for this signature in database
GPG Key ID: CB9E05780A79EA5A
9 changed files with 178 additions and 146 deletions

View File

@ -2137,7 +2137,7 @@ mod tests {
use crate::{ use crate::{
log::{FieldDataType, LastCacheSize, LastCacheTtl, MaxAge, MaxCardinality, create}, log::{FieldDataType, LastCacheSize, LastCacheTtl, MaxAge, MaxCardinality, create},
object_store::CatalogFilePath, object_store::CatalogFilePath,
serialize::{serialize_catalog_snapshot, verify_and_deserialize_catalog_checkpoint_file}, serialize::{serialize_catalog_file, verify_and_deserialize_catalog_checkpoint_file},
}; };
use super::*; use super::*;
@ -2192,7 +2192,7 @@ mod tests {
".catalog_uuid" => "[uuid]" ".catalog_uuid" => "[uuid]"
}); });
// Serialize/deserialize to ensure roundtrip // Serialize/deserialize to ensure roundtrip
let serialized = serialize_catalog_snapshot(&snapshot).unwrap(); let serialized = serialize_catalog_file(&snapshot).unwrap();
let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ; let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ;
insta::assert_json_snapshot!(snapshot, { insta::assert_json_snapshot!(snapshot, {
".catalog_uuid" => "[uuid]" ".catalog_uuid" => "[uuid]"
@ -2322,7 +2322,7 @@ mod tests {
".catalog_uuid" => "[uuid]" ".catalog_uuid" => "[uuid]"
}); });
// Serialize/deserialize to ensure roundtrip // Serialize/deserialize to ensure roundtrip
let serialized = serialize_catalog_snapshot(&snapshot).unwrap(); let serialized = serialize_catalog_file(&snapshot).unwrap();
let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ; let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ;
insta::assert_json_snapshot!(snapshot, { insta::assert_json_snapshot!(snapshot, {
".catalog_uuid" => "[uuid]" ".catalog_uuid" => "[uuid]"
@ -2369,7 +2369,7 @@ mod tests {
".catalog_uuid" => "[uuid]" ".catalog_uuid" => "[uuid]"
}); });
// Serialize/deserialize to ensure roundtrip // Serialize/deserialize to ensure roundtrip
let serialized = serialize_catalog_snapshot(&snapshot).unwrap(); let serialized = serialize_catalog_file(&snapshot).unwrap();
let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ; let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ;
insta::assert_json_snapshot!(snapshot, { insta::assert_json_snapshot!(snapshot, {
".catalog_uuid" => "[uuid]" ".catalog_uuid" => "[uuid]"
@ -2415,7 +2415,7 @@ mod tests {
".catalog_uuid" => "[uuid]" ".catalog_uuid" => "[uuid]"
}); });
// Serialize/deserialize to ensure roundtrip // Serialize/deserialize to ensure roundtrip
let serialized = serialize_catalog_snapshot(&snapshot).unwrap(); let serialized = serialize_catalog_file(&snapshot).unwrap();
let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ; let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ;
insta::assert_json_snapshot!(snapshot, { insta::assert_json_snapshot!(snapshot, {
".catalog_uuid" => "[uuid]" ".catalog_uuid" => "[uuid]"

View File

@ -25,7 +25,7 @@ use hashbrown::HashMap;
use influxdb3_id::{ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, TableId, TriggerId}; use influxdb3_id::{ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, TableId, TriggerId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{CatalogError, Result, catalog::CatalogSequenceNumber}; use crate::{CatalogError, Result, catalog::CatalogSequenceNumber, serialize::VersionedFileType};
mod conversion; mod conversion;
@ -58,6 +58,10 @@ pub(crate) struct OrderedCatalogBatch {
pub(crate) sequence_number: CatalogSequenceNumber, pub(crate) sequence_number: CatalogSequenceNumber,
} }
impl VersionedFileType for OrderedCatalogBatch {
const VERSION_ID: [u8; 10] = *b"idb3.001.l";
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub(crate) enum NodeCatalogOp { pub(crate) enum NodeCatalogOp {
RegisterNode(RegisterNodeLog), RegisterNode(RegisterNodeLog),

View File

@ -21,7 +21,7 @@ use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use crate::{CatalogError, Result, catalog::CatalogSequenceNumber}; use crate::{CatalogError, Result, catalog::CatalogSequenceNumber, serialize::VersionedFileType};
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum CatalogBatch { pub enum CatalogBatch {
@ -115,6 +115,10 @@ pub struct OrderedCatalogBatch {
pub(crate) sequence_number: CatalogSequenceNumber, pub(crate) sequence_number: CatalogSequenceNumber,
} }
impl VersionedFileType for OrderedCatalogBatch {
const VERSION_ID: [u8; 10] = *b"idb3.002.l";
}
impl OrderedCatalogBatch { impl OrderedCatalogBatch {
pub fn new(catalog: CatalogBatch, sequence_number: CatalogSequenceNumber) -> Self { pub fn new(catalog: CatalogBatch, sequence_number: CatalogSequenceNumber) -> Self {
Self { Self {

View File

@ -22,7 +22,7 @@ use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use crate::{CatalogError, Result, catalog::CatalogSequenceNumber}; use crate::{CatalogError, Result, catalog::CatalogSequenceNumber, serialize::VersionedFileType};
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum CatalogBatch { pub enum CatalogBatch {
@ -116,6 +116,10 @@ pub struct OrderedCatalogBatch {
pub(crate) sequence_number: CatalogSequenceNumber, pub(crate) sequence_number: CatalogSequenceNumber,
} }
impl VersionedFileType for OrderedCatalogBatch {
const VERSION_ID: [u8; 10] = *b"idb3.003.l";
}
impl OrderedCatalogBatch { impl OrderedCatalogBatch {
pub fn new(catalog: CatalogBatch, sequence_number: CatalogSequenceNumber) -> Self { pub fn new(catalog: CatalogBatch, sequence_number: CatalogSequenceNumber) -> Self {
Self { Self {

View File

@ -16,9 +16,7 @@ use crate::snapshot::versions::Snapshot;
use crate::{ use crate::{
catalog::CatalogSequenceNumber, catalog::CatalogSequenceNumber,
log::OrderedCatalogBatch, log::OrderedCatalogBatch,
serialize::{ serialize::{serialize_catalog_file, verify_and_deserialize_catalog_file},
serialize_catalog_log, serialize_catalog_snapshot, verify_and_deserialize_catalog_file,
},
}; };
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -211,7 +209,7 @@ impl ObjectStoreCatalog {
) -> Result<PersistCatalogResult> { ) -> Result<PersistCatalogResult> {
let catalog_path = CatalogFilePath::log(&self.prefix, batch.sequence_number()); let catalog_path = CatalogFilePath::log(&self.prefix, batch.sequence_number());
let content = serialize_catalog_log(batch).context("failed to serialize catalog batch")?; let content = serialize_catalog_file(batch).context("failed to serialize catalog batch")?;
self.catalog_update_if_not_exists(catalog_path, content) self.catalog_update_if_not_exists(catalog_path, content)
.await .await
@ -228,7 +226,7 @@ impl ObjectStoreCatalog {
let catalog_path = CatalogFilePath::checkpoint(&self.prefix); let catalog_path = CatalogFilePath::checkpoint(&self.prefix);
let content = let content =
serialize_catalog_snapshot(snapshot).context("failed to serialize catalog snapshot")?; serialize_catalog_file(snapshot).context("failed to serialize catalog snapshot")?;
// NOTE: not sure if this should be done in a loop, i.e., what error variants from // NOTE: not sure if this should be done in a loop, i.e., what error variants from
// the object store would warrant a retry. // the object store would warrant a retry.
@ -270,7 +268,7 @@ impl ObjectStoreCatalog {
let catalog_path = CatalogFilePath::checkpoint(&self.prefix); let catalog_path = CatalogFilePath::checkpoint(&self.prefix);
let content = let content =
serialize_catalog_snapshot(snapshot).context("failed to serialize catalog snapshot")?; serialize_catalog_file(snapshot).context("failed to serialize catalog snapshot")?;
let store = Arc::clone(&self.store); let store = Arc::clone(&self.store);

View File

@ -3,10 +3,11 @@ use std::io::Cursor;
use anyhow::Context; use anyhow::Context;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use serde::Serialize;
use crate::{ use crate::{
CatalogError, Result, CatalogError, Result,
log::{self, OrderedCatalogBatch}, log::{self},
snapshot::{self, CatalogSnapshot}, snapshot::{self, CatalogSnapshot},
}; };
@ -15,11 +16,16 @@ const CHECKSUM_LEN: usize = size_of::<u32>();
pub fn verify_and_deserialize_catalog_file( pub fn verify_and_deserialize_catalog_file(
bytes: Bytes, bytes: Bytes,
) -> Result<log::versions::v3::OrderedCatalogBatch> { ) -> Result<log::versions::v3::OrderedCatalogBatch> {
if bytes.starts_with(LOG_FILE_TYPE_IDENTIFIER_V1) { let version_id: &[u8; 10] = bytes.first_chunk().ok_or(CatalogError::unexpected(
"file must contain at least 10 bytes",
))?;
match *version_id {
// Version 1 uses the `bitcode` crate for serialization/deserialization
log::versions::v1::OrderedCatalogBatch::VERSION_ID => {
// V1 Deserialization: // V1 Deserialization:
let id_len = LOG_FILE_TYPE_IDENTIFIER_V1.len(); let checksum = bytes.slice(10..10 + CHECKSUM_LEN);
let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); let data = bytes.slice(10 + CHECKSUM_LEN..);
let data = bytes.slice(id_len + CHECKSUM_LEN..);
verify_checksum(&checksum, &data)?; verify_checksum(&checksum, &data)?;
let log = bitcode::deserialize::<log::versions::v1::OrderedCatalogBatch>(&data) let log = bitcode::deserialize::<log::versions::v1::OrderedCatalogBatch>(&data)
.context("failed to deserialize v1 catalog log file contents")?; .context("failed to deserialize v1 catalog log file contents")?;
@ -30,60 +36,76 @@ pub fn verify_and_deserialize_catalog_file(
let log_v2: log::versions::v2::OrderedCatalogBatch = log.into(); let log_v2: log::versions::v2::OrderedCatalogBatch = log.into();
let log_v3: log::versions::v3::OrderedCatalogBatch = log_v2.into(); let log_v3: log::versions::v3::OrderedCatalogBatch = log_v2.into();
Ok(log_v3) Ok(log_v3)
} else if bytes.starts_with(LOG_FILE_TYPE_IDENTIFIER_V2) { }
// Version 2 uses the `serde_json` crate for serialization/deserialization
log::versions::v2::OrderedCatalogBatch::VERSION_ID => {
// V2 Deserialization: // V2 Deserialization:
let id_len = LOG_FILE_TYPE_IDENTIFIER_V2.len(); let checksum = bytes.slice(10..10 + CHECKSUM_LEN);
let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); let data = bytes.slice(10 + CHECKSUM_LEN..);
let data = bytes.slice(id_len + CHECKSUM_LEN..);
verify_checksum(&checksum, &data)?; verify_checksum(&checksum, &data)?;
let log = serde_json::from_slice::<log::versions::v2::OrderedCatalogBatch>(&data) let log = serde_json::from_slice::<log::versions::v2::OrderedCatalogBatch>(&data)
.context("failed to deserialize v2 catalog log file contents")?; .context("failed to deserialize v2 catalog log file contents")?;
Ok(log.into()) Ok(log.into())
} else if bytes.starts_with(LOG_FILE_TYPE_IDENTIFIER_V3) { }
// Version 3 added a conversion function to map db:*:write tokens to be db:*:create,write
// tokens, and because it's a one time migration it relies on the version in file the
// version has to be updated.
log::versions::v3::OrderedCatalogBatch::VERSION_ID => {
// V3 Deserialization: // V3 Deserialization:
let id_len = LOG_FILE_TYPE_IDENTIFIER_V3.len(); let checksum = bytes.slice(10..10 + CHECKSUM_LEN);
let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); let data = bytes.slice(10 + CHECKSUM_LEN..);
let data = bytes.slice(id_len + CHECKSUM_LEN..);
verify_checksum(&checksum, &data)?; verify_checksum(&checksum, &data)?;
let log = serde_json::from_slice::<log::versions::v3::OrderedCatalogBatch>(&data) let log = serde_json::from_slice::<log::versions::v3::OrderedCatalogBatch>(&data)
.context("failed to deserialize v3 catalog log file contents")?; .context("failed to deserialize v3 catalog log file contents")?;
Ok(log) Ok(log)
} else { }
Err(CatalogError::unexpected("unrecognized catalog file format")) _ => Err(CatalogError::unexpected("unrecognized catalog file format")),
} }
} }
pub fn verify_and_deserialize_catalog_checkpoint_file(bytes: Bytes) -> Result<CatalogSnapshot> { pub fn verify_and_deserialize_catalog_checkpoint_file(bytes: Bytes) -> Result<CatalogSnapshot> {
if bytes.starts_with(SNAPSHOT_FILE_TYPE_IDENTIFIER_V1) { let version_id: &[u8; 10] = bytes.first_chunk().ok_or(CatalogError::unexpected(
let id_len = SNAPSHOT_FILE_TYPE_IDENTIFIER_V1.len(); "file must contain at least 10 bytes",
let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); ))?;
let data = bytes.slice(id_len + CHECKSUM_LEN..);
match *version_id {
// Version 1 uses the `bitcode` crate for serialization/deserialization
snapshot::versions::v1::CatalogSnapshot::VERSION_ID => {
let checksum = bytes.slice(10..10 + CHECKSUM_LEN);
let data = bytes.slice(10 + CHECKSUM_LEN..);
verify_checksum(&checksum, &data)?; verify_checksum(&checksum, &data)?;
let snapshot = bitcode::deserialize::<snapshot::versions::v1::CatalogSnapshot>(&data) let snapshot = bitcode::deserialize::<snapshot::versions::v1::CatalogSnapshot>(&data);
.context("failed to deserialize catalog snapshot file contents")?; let snapshot =
snapshot.context("failed to deserialize v1 catalog snapshot file contents")?;
let snapshot_v2: snapshot::versions::v2::CatalogSnapshot = snapshot.into(); let snapshot_v2: snapshot::versions::v2::CatalogSnapshot = snapshot.into();
let snapshot_v3: snapshot::versions::v3::CatalogSnapshot = snapshot_v2.into(); let snapshot_v3: snapshot::versions::v3::CatalogSnapshot = snapshot_v2.into();
Ok(snapshot_v3) Ok(snapshot_v3)
} else if bytes.starts_with(SNAPSHOT_FILE_TYPE_IDENTIFIER_V2) { }
let id_len = SNAPSHOT_FILE_TYPE_IDENTIFIER_V2.len(); // Version 2 uses the `serde_json` crate for serialization/deserialization
let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); snapshot::versions::v2::CatalogSnapshot::VERSION_ID => {
let data = bytes.slice(id_len + CHECKSUM_LEN..); let checksum = bytes.slice(10..10 + CHECKSUM_LEN);
let data = bytes.slice(10 + CHECKSUM_LEN..);
verify_checksum(&checksum, &data)?; verify_checksum(&checksum, &data)?;
let snapshot: snapshot::versions::v2::CatalogSnapshot = serde_json::from_slice(&data) let snapshot: snapshot::versions::v2::CatalogSnapshot =
.context("failed to deserialize catalog snapshot file contents")?; serde_json::from_slice(&data)
.context("failed to deserialize v2 catalog snapshot file contents")?;
Ok(snapshot.into()) Ok(snapshot.into())
} else if bytes.starts_with(SNAPSHOT_FILE_TYPE_IDENTIFIER_V3) { }
let id_len = SNAPSHOT_FILE_TYPE_IDENTIFIER_V3.len(); // Version 3 added a conversion function to map db:*:write tokens to be db:*:create,write
let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); // tokens, and because it's a one time migration it relies on the version in file the
let data = bytes.slice(id_len + CHECKSUM_LEN..); // version has to be updated.
snapshot::versions::v3::CatalogSnapshot::VERSION_ID => {
let checksum = bytes.slice(10..10 + CHECKSUM_LEN);
let data = bytes.slice(10 + CHECKSUM_LEN..);
verify_checksum(&checksum, &data)?; verify_checksum(&checksum, &data)?;
let snapshot: snapshot::versions::v3::CatalogSnapshot = serde_json::from_slice(&data) let snapshot: snapshot::versions::v3::CatalogSnapshot =
.context("failed to deserialize catalog snapshot file contents")?; serde_json::from_slice(&data)
.context("failed to deserialize v3 catalog snapshot file contents")?;
Ok(snapshot) Ok(snapshot)
} else { }
Err(CatalogError::unexpected( _ => Err(CatalogError::unexpected(
"unrecognized catalog checkpoint file format", "unrecognized catalog checkpoint file format",
)) )),
} }
} }
@ -103,34 +125,21 @@ fn verify_checksum(checksum: &[u8], data: &[u8]) -> Result<()> {
Ok(()) Ok(())
} }
/// Version 1 uses the `bitcode` crate for serialization/deserialization pub trait VersionedFileType {
const LOG_FILE_TYPE_IDENTIFIER_V1: &[u8] = b"idb3.001.l"; const VERSION_ID: [u8; 10];
/// Version 2 uses the `serde_json` crate for serialization/deserialization
const LOG_FILE_TYPE_IDENTIFIER_V2: &[u8] = b"idb3.002.l";
/// Version 3 introduced to migration db write permission in pro
const LOG_FILE_TYPE_IDENTIFIER_V3: &[u8] = b"idb3.003.l";
pub fn serialize_catalog_log(log: &OrderedCatalogBatch) -> Result<Bytes> {
let mut buf = BytesMut::new();
buf.extend_from_slice(LOG_FILE_TYPE_IDENTIFIER_V2);
let data = serde_json::to_vec(log).context("failed to serialize catalog log file")?;
Ok(hash_and_freeze(buf, data))
} }
/// Version 1 uses the `bitcode` crate for serialization/deserialization pub fn serialize_catalog_file<T: Serialize + VersionedFileType>(file: &T) -> Result<Bytes> {
const SNAPSHOT_FILE_TYPE_IDENTIFIER_V1: &[u8] = b"idb3.001.s";
/// Version 2 uses the `serde_json` crate for serialization/deserialization
const SNAPSHOT_FILE_TYPE_IDENTIFIER_V2: &[u8] = b"idb3.002.s";
/// Version 3 introduced to migration db write permission in pro
const SNAPSHOT_FILE_TYPE_IDENTIFIER_V3: &[u8] = b"idb3.003.s";
pub fn serialize_catalog_snapshot(snapshot: &CatalogSnapshot) -> Result<Bytes> {
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
buf.extend_from_slice(SNAPSHOT_FILE_TYPE_IDENTIFIER_V2); buf.extend_from_slice(&T::VERSION_ID);
let data = serde_json::to_vec(snapshot).context("failed to serialize catalog snapshot file")?; let data = match T::VERSION_ID {
snapshot::versions::v1::CatalogSnapshot::VERSION_ID
| log::versions::v1::OrderedCatalogBatch::VERSION_ID => {
bitcode::serialize(file).context("failed to serialize catalog file")?
}
_ => serde_json::to_vec(file).context("failed to serialize catalog file")?,
};
Ok(hash_and_freeze(buf, data)) Ok(hash_and_freeze(buf, data))
} }
@ -151,7 +160,6 @@ fn hash_and_freeze(mut buf: BytesMut, data: Vec<u8>) -> Bytes {
mod v1_tests { mod v1_tests {
use std::time::Duration; use std::time::Duration;
use bytes::{Bytes, BytesMut};
use influxdb3_id::{ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, TableId, TriggerId}; use influxdb3_id::{ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, TableId, TriggerId};
use crate::{ use crate::{
@ -167,27 +175,15 @@ mod v1_tests {
TriggerDefinition, TriggerIdentifier, TriggerSettings, TriggerDefinition, TriggerIdentifier, TriggerSettings,
}, },
}, },
snapshot::{ serialize::VersionedFileType,
self, snapshot::versions::v1::{CatalogSnapshot, test_util::Generate},
versions::v1::{CatalogSnapshot, test_util::Generate},
},
}; };
use super::{ use super::{
LOG_FILE_TYPE_IDENTIFIER_V1, SNAPSHOT_FILE_TYPE_IDENTIFIER_V1, hash_and_freeze, serialize_catalog_file, verify_and_deserialize_catalog_checkpoint_file,
verify_and_deserialize_catalog_checkpoint_file, verify_and_deserialize_catalog_file, verify_and_deserialize_catalog_file,
}; };
/// Method that uses v1 serialization logic for catalog files
fn serialize_catalog_log_v1(log: &log::versions::v1::OrderedCatalogBatch) -> Bytes {
let mut buf = BytesMut::new();
buf.extend_from_slice(LOG_FILE_TYPE_IDENTIFIER_V1);
let data = bitcode::serialize(log).expect("failed to serialize catalog log file");
hash_and_freeze(buf, data)
}
/// Test that uses the main `verify_and_deserialize_catalog_file` method which deseriales a /// Test that uses the main `verify_and_deserialize_catalog_file` method which deseriales a
/// versioned catalog file into the latest version, to test round-trip serialize/deserialize /// versioned catalog file into the latest version, to test round-trip serialize/deserialize
/// v1 catalog log files. /// v1 catalog log files.
@ -196,7 +192,8 @@ mod v1_tests {
#[test] #[test]
fn test_deserialize_catalog_file_from_v1() { fn test_deserialize_catalog_file_from_v1() {
// test a node log file: // test a node log file:
verify_and_deserialize_catalog_file(serialize_catalog_log_v1(&OrderedCatalogBatch { verify_and_deserialize_catalog_file(
serialize_catalog_file(&OrderedCatalogBatch {
catalog_batch: log::versions::v1::CatalogBatch::Node(NodeBatch { catalog_batch: log::versions::v1::CatalogBatch::Node(NodeBatch {
time_ns: 0, time_ns: 0,
node_catalog_id: NodeId::new(0), node_catalog_id: NodeId::new(0),
@ -210,11 +207,13 @@ mod v1_tests {
})], })],
}), }),
sequence_number: CatalogSequenceNumber::new(0), sequence_number: CatalogSequenceNumber::new(0),
})) })
.expect("must be able to serialize"),
)
.expect("deserialize from v1"); .expect("deserialize from v1");
// test a database log file: // test a database log file:
verify_and_deserialize_catalog_file(serialize_catalog_log_v1(&OrderedCatalogBatch { let log_v1 = OrderedCatalogBatch {
catalog_batch: log::versions::v1::CatalogBatch::Database(DatabaseBatch { catalog_batch: log::versions::v1::CatalogBatch::Database(DatabaseBatch {
time_ns: 0, time_ns: 0,
database_id: DbId::new(0), database_id: DbId::new(0),
@ -487,25 +486,33 @@ mod v1_tests {
], ],
}), }),
sequence_number: CatalogSequenceNumber::new(0), sequence_number: CatalogSequenceNumber::new(0),
})) };
verify_and_deserialize_catalog_file(
serialize_catalog_file(&log_v1).expect("must be able to serialize"),
)
.expect("deserialize from v1 to latest"); .expect("deserialize from v1 to latest");
} }
/// Method that uses v1 serialization logic for catalog checkpoint/snapshot files #[test]
fn serialize_catalog_snapshot_v1(snapshot: &snapshot::versions::v1::CatalogSnapshot) -> Bytes { fn test_serialize_catalog_snapshot_with_latest_identifier() {
let mut buf = BytesMut::new(); let snapshot = crate::snapshot::versions::v1::CatalogSnapshot::generate();
buf.extend_from_slice(SNAPSHOT_FILE_TYPE_IDENTIFIER_V1); let snapshot: crate::snapshot::versions::v2::CatalogSnapshot = snapshot.into();
let snapshot: crate::snapshot::CatalogSnapshot = snapshot.into();
let serialized = super::serialize_catalog_file(&snapshot)
.expect("must be able to serialize generated snapshot");
let data = bitcode::serialize(snapshot).expect("failed to serialize catalog snapshot file"); assert!(
serialized.starts_with(&crate::snapshot::CatalogSnapshot::VERSION_ID),
hash_and_freeze(buf, data) "serialized catalog log file must always start with latest verison identifier"
);
} }
#[test] #[test]
fn test_deserialize_catalog_checkpoint_file_from_v1() { fn test_deserialize_catalog_checkpoint_file_from_v1() {
verify_and_deserialize_catalog_checkpoint_file(serialize_catalog_snapshot_v1( let result = verify_and_deserialize_catalog_checkpoint_file(
&CatalogSnapshot::generate(), serialize_catalog_file(&CatalogSnapshot::generate())
)) .expect("must be able to serialize"),
.expect("deserialize from v1"); );
result.expect("deserialize from v1");
} }
} }

View File

@ -16,6 +16,7 @@ use crate::{
log::versions::v1::{ log::versions::v1::{
MaxAge, MaxCardinality, NodeMode, TriggerSettings, TriggerSpecificationDefinition, MaxAge, MaxCardinality, NodeMode, TriggerSettings, TriggerSpecificationDefinition,
}, },
serialize::VersionedFileType,
}; };
use arrow::datatypes::DataType as ArrowDataType; use arrow::datatypes::DataType as ArrowDataType;
use hashbrown::HashMap; use hashbrown::HashMap;
@ -39,6 +40,10 @@ pub(crate) struct CatalogSnapshot {
catalog_uuid: Uuid, catalog_uuid: Uuid,
} }
impl VersionedFileType for CatalogSnapshot {
const VERSION_ID: [u8; 10] = *b"idb3.001.s";
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub(crate) struct NodeSnapshot { pub(crate) struct NodeSnapshot {
node_id: Arc<str>, node_id: Arc<str>,

View File

@ -4,6 +4,7 @@ use crate::catalog::CatalogSequenceNumber;
use crate::log::versions::v2::{ use crate::log::versions::v2::{
MaxAge, MaxCardinality, NodeMode, TriggerSettings, TriggerSpecificationDefinition, MaxAge, MaxCardinality, NodeMode, TriggerSettings, TriggerSpecificationDefinition,
}; };
use crate::serialize::VersionedFileType;
use arrow::datatypes::DataType as ArrowDataType; use arrow::datatypes::DataType as ArrowDataType;
use hashbrown::HashMap; use hashbrown::HashMap;
use influxdb3_id::{ use influxdb3_id::{
@ -26,6 +27,10 @@ pub struct CatalogSnapshot {
pub(crate) catalog_uuid: Uuid, pub(crate) catalog_uuid: Uuid,
} }
impl VersionedFileType for CatalogSnapshot {
const VERSION_ID: [u8; 10] = *b"idb3.002.s";
}
#[derive(Debug, Serialize, Deserialize, Default)] #[derive(Debug, Serialize, Deserialize, Default)]
pub(crate) struct TokenInfoSnapshot { pub(crate) struct TokenInfoSnapshot {
id: TokenId, id: TokenId,

View File

@ -5,6 +5,7 @@ use crate::catalog::CatalogSequenceNumber;
use crate::log::{ use crate::log::{
MaxAge, MaxCardinality, NodeMode, TriggerSettings, TriggerSpecificationDefinition, MaxAge, MaxCardinality, NodeMode, TriggerSettings, TriggerSpecificationDefinition,
}; };
use crate::serialize::VersionedFileType;
use arrow::datatypes::DataType as ArrowDataType; use arrow::datatypes::DataType as ArrowDataType;
use hashbrown::HashMap; use hashbrown::HashMap;
use influxdb3_id::{ use influxdb3_id::{
@ -27,6 +28,10 @@ pub struct CatalogSnapshot {
pub(crate) catalog_uuid: Uuid, pub(crate) catalog_uuid: Uuid,
} }
impl VersionedFileType for CatalogSnapshot {
const VERSION_ID: [u8; 10] = *b"idb3.003.s";
}
impl CatalogSnapshot { impl CatalogSnapshot {
pub(crate) fn sequence_number(&self) -> CatalogSequenceNumber { pub(crate) fn sequence_number(&self) -> CatalogSequenceNumber {
self.sequence self.sequence