diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 29020ce19d..5c14d24073 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -2137,7 +2137,7 @@ mod tests { use crate::{ log::{FieldDataType, LastCacheSize, LastCacheTtl, MaxAge, MaxCardinality, create}, object_store::CatalogFilePath, - serialize::{serialize_catalog_snapshot, verify_and_deserialize_catalog_checkpoint_file}, + serialize::{serialize_catalog_file, verify_and_deserialize_catalog_checkpoint_file}, }; use super::*; @@ -2192,7 +2192,7 @@ mod tests { ".catalog_uuid" => "[uuid]" }); // Serialize/deserialize to ensure roundtrip - let serialized = serialize_catalog_snapshot(&snapshot).unwrap(); + let serialized = serialize_catalog_file(&snapshot).unwrap(); let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ; insta::assert_json_snapshot!(snapshot, { ".catalog_uuid" => "[uuid]" @@ -2322,7 +2322,7 @@ mod tests { ".catalog_uuid" => "[uuid]" }); // Serialize/deserialize to ensure roundtrip - let serialized = serialize_catalog_snapshot(&snapshot).unwrap(); + let serialized = serialize_catalog_file(&snapshot).unwrap(); let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ; insta::assert_json_snapshot!(snapshot, { ".catalog_uuid" => "[uuid]" @@ -2369,7 +2369,7 @@ mod tests { ".catalog_uuid" => "[uuid]" }); // Serialize/deserialize to ensure roundtrip - let serialized = serialize_catalog_snapshot(&snapshot).unwrap(); + let serialized = serialize_catalog_file(&snapshot).unwrap(); let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ; insta::assert_json_snapshot!(snapshot, { ".catalog_uuid" => "[uuid]" @@ -2415,7 +2415,7 @@ mod tests { ".catalog_uuid" => "[uuid]" }); // Serialize/deserialize to ensure roundtrip - let serialized = serialize_catalog_snapshot(&snapshot).unwrap(); + let serialized = serialize_catalog_file(&snapshot).unwrap(); let snapshot = verify_and_deserialize_catalog_checkpoint_file(serialized).unwrap() ; insta::assert_json_snapshot!(snapshot, { ".catalog_uuid" => "[uuid]" diff --git a/influxdb3_catalog/src/log/versions/v1.rs b/influxdb3_catalog/src/log/versions/v1.rs index a3b26885c8..7345efc44c 100644 --- a/influxdb3_catalog/src/log/versions/v1.rs +++ b/influxdb3_catalog/src/log/versions/v1.rs @@ -25,7 +25,7 @@ use hashbrown::HashMap; use influxdb3_id::{ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, TableId, TriggerId}; use serde::{Deserialize, Serialize}; -use crate::{CatalogError, Result, catalog::CatalogSequenceNumber}; +use crate::{CatalogError, Result, catalog::CatalogSequenceNumber, serialize::VersionedFileType}; mod conversion; @@ -58,6 +58,10 @@ pub(crate) struct OrderedCatalogBatch { pub(crate) sequence_number: CatalogSequenceNumber, } +impl VersionedFileType for OrderedCatalogBatch { + const VERSION_ID: [u8; 10] = *b"idb3.001.l"; +} + #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub(crate) enum NodeCatalogOp { RegisterNode(RegisterNodeLog), diff --git a/influxdb3_catalog/src/log/versions/v2.rs b/influxdb3_catalog/src/log/versions/v2.rs index 0702257933..6839991f2e 100644 --- a/influxdb3_catalog/src/log/versions/v2.rs +++ b/influxdb3_catalog/src/log/versions/v2.rs @@ -21,7 +21,7 @@ use schema::{InfluxColumnType, InfluxFieldType}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::{CatalogError, Result, catalog::CatalogSequenceNumber}; +use crate::{CatalogError, Result, catalog::CatalogSequenceNumber, serialize::VersionedFileType}; #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum CatalogBatch { @@ -115,6 +115,10 @@ pub struct OrderedCatalogBatch { pub(crate) sequence_number: CatalogSequenceNumber, } +impl VersionedFileType for OrderedCatalogBatch { + const VERSION_ID: [u8; 10] = *b"idb3.002.l"; +} + impl OrderedCatalogBatch { pub fn new(catalog: CatalogBatch, sequence_number: CatalogSequenceNumber) -> Self { Self { diff --git a/influxdb3_catalog/src/log/versions/v3.rs b/influxdb3_catalog/src/log/versions/v3.rs index b698bdbd5c..c6f9204988 100644 --- a/influxdb3_catalog/src/log/versions/v3.rs +++ b/influxdb3_catalog/src/log/versions/v3.rs @@ -22,7 +22,7 @@ use schema::{InfluxColumnType, InfluxFieldType}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::{CatalogError, Result, catalog::CatalogSequenceNumber}; +use crate::{CatalogError, Result, catalog::CatalogSequenceNumber, serialize::VersionedFileType}; #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum CatalogBatch { @@ -116,6 +116,10 @@ pub struct OrderedCatalogBatch { pub(crate) sequence_number: CatalogSequenceNumber, } +impl VersionedFileType for OrderedCatalogBatch { + const VERSION_ID: [u8; 10] = *b"idb3.003.l"; +} + impl OrderedCatalogBatch { pub fn new(catalog: CatalogBatch, sequence_number: CatalogSequenceNumber) -> Self { Self { diff --git a/influxdb3_catalog/src/object_store.rs b/influxdb3_catalog/src/object_store.rs index caebecf3c8..e9952cc3bd 100644 --- a/influxdb3_catalog/src/object_store.rs +++ b/influxdb3_catalog/src/object_store.rs @@ -16,9 +16,7 @@ use crate::snapshot::versions::Snapshot; use crate::{ catalog::CatalogSequenceNumber, log::OrderedCatalogBatch, - serialize::{ - serialize_catalog_log, serialize_catalog_snapshot, verify_and_deserialize_catalog_file, - }, + serialize::{serialize_catalog_file, verify_and_deserialize_catalog_file}, }; #[derive(Debug, thiserror::Error)] @@ -211,7 +209,7 @@ impl ObjectStoreCatalog { ) -> Result { let catalog_path = CatalogFilePath::log(&self.prefix, batch.sequence_number()); - let content = serialize_catalog_log(batch).context("failed to serialize catalog batch")?; + let content = serialize_catalog_file(batch).context("failed to serialize catalog batch")?; self.catalog_update_if_not_exists(catalog_path, content) .await @@ -228,7 +226,7 @@ impl ObjectStoreCatalog { let catalog_path = CatalogFilePath::checkpoint(&self.prefix); let content = - serialize_catalog_snapshot(snapshot).context("failed to serialize catalog snapshot")?; + serialize_catalog_file(snapshot).context("failed to serialize catalog snapshot")?; // NOTE: not sure if this should be done in a loop, i.e., what error variants from // the object store would warrant a retry. @@ -270,7 +268,7 @@ impl ObjectStoreCatalog { let catalog_path = CatalogFilePath::checkpoint(&self.prefix); let content = - serialize_catalog_snapshot(snapshot).context("failed to serialize catalog snapshot")?; + serialize_catalog_file(snapshot).context("failed to serialize catalog snapshot")?; let store = Arc::clone(&self.store); diff --git a/influxdb3_catalog/src/serialize.rs b/influxdb3_catalog/src/serialize.rs index 435136f4a5..a55a5f5c45 100644 --- a/influxdb3_catalog/src/serialize.rs +++ b/influxdb3_catalog/src/serialize.rs @@ -3,10 +3,11 @@ use std::io::Cursor; use anyhow::Context; use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Bytes, BytesMut}; +use serde::Serialize; use crate::{ CatalogError, Result, - log::{self, OrderedCatalogBatch}, + log::{self}, snapshot::{self, CatalogSnapshot}, }; @@ -15,75 +16,96 @@ const CHECKSUM_LEN: usize = size_of::(); pub fn verify_and_deserialize_catalog_file( bytes: Bytes, ) -> Result { - if bytes.starts_with(LOG_FILE_TYPE_IDENTIFIER_V1) { - // V1 Deserialization: - let id_len = LOG_FILE_TYPE_IDENTIFIER_V1.len(); - let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); - let data = bytes.slice(id_len + CHECKSUM_LEN..); - verify_checksum(&checksum, &data)?; - let log = bitcode::deserialize::(&data) - .context("failed to deserialize v1 catalog log file contents")?; + let version_id: &[u8; 10] = bytes.first_chunk().ok_or(CatalogError::unexpected( + "file must contain at least 10 bytes", + ))?; - // explicit type annotations are needed once you start chaining `.into` (something to check - // later to see if that could be avoided, then this can just be a loop with starting and - // end point based on current log file's version) - let log_v2: log::versions::v2::OrderedCatalogBatch = log.into(); - let log_v3: log::versions::v3::OrderedCatalogBatch = log_v2.into(); - Ok(log_v3) - } else if bytes.starts_with(LOG_FILE_TYPE_IDENTIFIER_V2) { - // V2 Deserialization: - let id_len = LOG_FILE_TYPE_IDENTIFIER_V2.len(); - let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); - let data = bytes.slice(id_len + CHECKSUM_LEN..); - verify_checksum(&checksum, &data)?; - let log = serde_json::from_slice::(&data) - .context("failed to deserialize v2 catalog log file contents")?; - Ok(log.into()) - } else if bytes.starts_with(LOG_FILE_TYPE_IDENTIFIER_V3) { - // V3 Deserialization: - let id_len = LOG_FILE_TYPE_IDENTIFIER_V3.len(); - let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); - let data = bytes.slice(id_len + CHECKSUM_LEN..); - verify_checksum(&checksum, &data)?; - let log = serde_json::from_slice::(&data) - .context("failed to deserialize v3 catalog log file contents")?; - Ok(log) - } else { - Err(CatalogError::unexpected("unrecognized catalog file format")) + match *version_id { + // Version 1 uses the `bitcode` crate for serialization/deserialization + log::versions::v1::OrderedCatalogBatch::VERSION_ID => { + // V1 Deserialization: + let checksum = bytes.slice(10..10 + CHECKSUM_LEN); + let data = bytes.slice(10 + CHECKSUM_LEN..); + verify_checksum(&checksum, &data)?; + let log = bitcode::deserialize::(&data) + .context("failed to deserialize v1 catalog log file contents")?; + + // explicit type annotations are needed once you start chaining `.into` (something to check + // later to see if that could be avoided, then this can just be a loop with starting and + // end point based on current log file's version) + let log_v2: log::versions::v2::OrderedCatalogBatch = log.into(); + let log_v3: log::versions::v3::OrderedCatalogBatch = log_v2.into(); + Ok(log_v3) + } + // Version 2 uses the `serde_json` crate for serialization/deserialization + log::versions::v2::OrderedCatalogBatch::VERSION_ID => { + // V2 Deserialization: + let checksum = bytes.slice(10..10 + CHECKSUM_LEN); + let data = bytes.slice(10 + CHECKSUM_LEN..); + verify_checksum(&checksum, &data)?; + let log = serde_json::from_slice::(&data) + .context("failed to deserialize v2 catalog log file contents")?; + Ok(log.into()) + } + // Version 3 added a conversion function to map db:*:write tokens to be db:*:create,write + // tokens, and because it's a one time migration it relies on the version in file the + // version has to be updated. + log::versions::v3::OrderedCatalogBatch::VERSION_ID => { + // V3 Deserialization: + let checksum = bytes.slice(10..10 + CHECKSUM_LEN); + let data = bytes.slice(10 + CHECKSUM_LEN..); + verify_checksum(&checksum, &data)?; + let log = serde_json::from_slice::(&data) + .context("failed to deserialize v3 catalog log file contents")?; + Ok(log) + } + _ => Err(CatalogError::unexpected("unrecognized catalog file format")), } } pub fn verify_and_deserialize_catalog_checkpoint_file(bytes: Bytes) -> Result { - if bytes.starts_with(SNAPSHOT_FILE_TYPE_IDENTIFIER_V1) { - let id_len = SNAPSHOT_FILE_TYPE_IDENTIFIER_V1.len(); - let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); - let data = bytes.slice(id_len + CHECKSUM_LEN..); - verify_checksum(&checksum, &data)?; - let snapshot = bitcode::deserialize::(&data) - .context("failed to deserialize catalog snapshot file contents")?; - let snapshot_v2: snapshot::versions::v2::CatalogSnapshot = snapshot.into(); - let snapshot_v3: snapshot::versions::v3::CatalogSnapshot = snapshot_v2.into(); - Ok(snapshot_v3) - } else if bytes.starts_with(SNAPSHOT_FILE_TYPE_IDENTIFIER_V2) { - let id_len = SNAPSHOT_FILE_TYPE_IDENTIFIER_V2.len(); - let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); - let data = bytes.slice(id_len + CHECKSUM_LEN..); - verify_checksum(&checksum, &data)?; - let snapshot: snapshot::versions::v2::CatalogSnapshot = serde_json::from_slice(&data) - .context("failed to deserialize catalog snapshot file contents")?; - Ok(snapshot.into()) - } else if bytes.starts_with(SNAPSHOT_FILE_TYPE_IDENTIFIER_V3) { - let id_len = SNAPSHOT_FILE_TYPE_IDENTIFIER_V3.len(); - let checksum = bytes.slice(id_len..id_len + CHECKSUM_LEN); - let data = bytes.slice(id_len + CHECKSUM_LEN..); - verify_checksum(&checksum, &data)?; - let snapshot: snapshot::versions::v3::CatalogSnapshot = serde_json::from_slice(&data) - .context("failed to deserialize catalog snapshot file contents")?; - Ok(snapshot) - } else { - Err(CatalogError::unexpected( + let version_id: &[u8; 10] = bytes.first_chunk().ok_or(CatalogError::unexpected( + "file must contain at least 10 bytes", + ))?; + + match *version_id { + // Version 1 uses the `bitcode` crate for serialization/deserialization + snapshot::versions::v1::CatalogSnapshot::VERSION_ID => { + let checksum = bytes.slice(10..10 + CHECKSUM_LEN); + let data = bytes.slice(10 + CHECKSUM_LEN..); + verify_checksum(&checksum, &data)?; + let snapshot = bitcode::deserialize::(&data); + let snapshot = + snapshot.context("failed to deserialize v1 catalog snapshot file contents")?; + let snapshot_v2: snapshot::versions::v2::CatalogSnapshot = snapshot.into(); + let snapshot_v3: snapshot::versions::v3::CatalogSnapshot = snapshot_v2.into(); + Ok(snapshot_v3) + } + // Version 2 uses the `serde_json` crate for serialization/deserialization + snapshot::versions::v2::CatalogSnapshot::VERSION_ID => { + let checksum = bytes.slice(10..10 + CHECKSUM_LEN); + let data = bytes.slice(10 + CHECKSUM_LEN..); + verify_checksum(&checksum, &data)?; + let snapshot: snapshot::versions::v2::CatalogSnapshot = + serde_json::from_slice(&data) + .context("failed to deserialize v2 catalog snapshot file contents")?; + Ok(snapshot.into()) + } + // Version 3 added a conversion function to map db:*:write tokens to be db:*:create,write + // tokens, and because it's a one time migration it relies on the version in file the + // version has to be updated. + snapshot::versions::v3::CatalogSnapshot::VERSION_ID => { + let checksum = bytes.slice(10..10 + CHECKSUM_LEN); + let data = bytes.slice(10 + CHECKSUM_LEN..); + verify_checksum(&checksum, &data)?; + let snapshot: snapshot::versions::v3::CatalogSnapshot = + serde_json::from_slice(&data) + .context("failed to deserialize v3 catalog snapshot file contents")?; + Ok(snapshot) + } + _ => Err(CatalogError::unexpected( "unrecognized catalog checkpoint file format", - )) + )), } } @@ -103,34 +125,21 @@ fn verify_checksum(checksum: &[u8], data: &[u8]) -> Result<()> { Ok(()) } -/// Version 1 uses the `bitcode` crate for serialization/deserialization -const LOG_FILE_TYPE_IDENTIFIER_V1: &[u8] = b"idb3.001.l"; -/// Version 2 uses the `serde_json` crate for serialization/deserialization -const LOG_FILE_TYPE_IDENTIFIER_V2: &[u8] = b"idb3.002.l"; -/// Version 3 introduced to migration db write permission in pro -const LOG_FILE_TYPE_IDENTIFIER_V3: &[u8] = b"idb3.003.l"; - -pub fn serialize_catalog_log(log: &OrderedCatalogBatch) -> Result { - let mut buf = BytesMut::new(); - buf.extend_from_slice(LOG_FILE_TYPE_IDENTIFIER_V2); - - let data = serde_json::to_vec(log).context("failed to serialize catalog log file")?; - - Ok(hash_and_freeze(buf, data)) +pub trait VersionedFileType { + const VERSION_ID: [u8; 10]; } -/// Version 1 uses the `bitcode` crate for serialization/deserialization -const SNAPSHOT_FILE_TYPE_IDENTIFIER_V1: &[u8] = b"idb3.001.s"; -/// Version 2 uses the `serde_json` crate for serialization/deserialization -const SNAPSHOT_FILE_TYPE_IDENTIFIER_V2: &[u8] = b"idb3.002.s"; -/// Version 3 introduced to migration db write permission in pro -const SNAPSHOT_FILE_TYPE_IDENTIFIER_V3: &[u8] = b"idb3.003.s"; - -pub fn serialize_catalog_snapshot(snapshot: &CatalogSnapshot) -> Result { +pub fn serialize_catalog_file(file: &T) -> Result { let mut buf = BytesMut::new(); - buf.extend_from_slice(SNAPSHOT_FILE_TYPE_IDENTIFIER_V2); + buf.extend_from_slice(&T::VERSION_ID); - let data = serde_json::to_vec(snapshot).context("failed to serialize catalog snapshot file")?; + let data = match T::VERSION_ID { + snapshot::versions::v1::CatalogSnapshot::VERSION_ID + | log::versions::v1::OrderedCatalogBatch::VERSION_ID => { + bitcode::serialize(file).context("failed to serialize catalog file")? + } + _ => serde_json::to_vec(file).context("failed to serialize catalog file")?, + }; Ok(hash_and_freeze(buf, data)) } @@ -151,7 +160,6 @@ fn hash_and_freeze(mut buf: BytesMut, data: Vec) -> Bytes { mod v1_tests { use std::time::Duration; - use bytes::{Bytes, BytesMut}; use influxdb3_id::{ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, TableId, TriggerId}; use crate::{ @@ -167,27 +175,15 @@ mod v1_tests { TriggerDefinition, TriggerIdentifier, TriggerSettings, }, }, - snapshot::{ - self, - versions::v1::{CatalogSnapshot, test_util::Generate}, - }, + serialize::VersionedFileType, + snapshot::versions::v1::{CatalogSnapshot, test_util::Generate}, }; use super::{ - LOG_FILE_TYPE_IDENTIFIER_V1, SNAPSHOT_FILE_TYPE_IDENTIFIER_V1, hash_and_freeze, - verify_and_deserialize_catalog_checkpoint_file, verify_and_deserialize_catalog_file, + serialize_catalog_file, verify_and_deserialize_catalog_checkpoint_file, + verify_and_deserialize_catalog_file, }; - /// Method that uses v1 serialization logic for catalog files - fn serialize_catalog_log_v1(log: &log::versions::v1::OrderedCatalogBatch) -> Bytes { - let mut buf = BytesMut::new(); - buf.extend_from_slice(LOG_FILE_TYPE_IDENTIFIER_V1); - - let data = bitcode::serialize(log).expect("failed to serialize catalog log file"); - - hash_and_freeze(buf, data) - } - /// Test that uses the main `verify_and_deserialize_catalog_file` method which deseriales a /// versioned catalog file into the latest version, to test round-trip serialize/deserialize /// v1 catalog log files. @@ -196,25 +192,28 @@ mod v1_tests { #[test] fn test_deserialize_catalog_file_from_v1() { // test a node log file: - verify_and_deserialize_catalog_file(serialize_catalog_log_v1(&OrderedCatalogBatch { - catalog_batch: log::versions::v1::CatalogBatch::Node(NodeBatch { - time_ns: 0, - node_catalog_id: NodeId::new(0), - node_id: "test-node".into(), - ops: vec![NodeCatalogOp::RegisterNode(RegisterNodeLog { + verify_and_deserialize_catalog_file( + serialize_catalog_file(&OrderedCatalogBatch { + catalog_batch: log::versions::v1::CatalogBatch::Node(NodeBatch { + time_ns: 0, + node_catalog_id: NodeId::new(0), node_id: "test-node".into(), - instance_id: "uuid".into(), - registered_time_ns: 0, - core_count: 2, - mode: vec![NodeMode::Core], - })], - }), - sequence_number: CatalogSequenceNumber::new(0), - })) + ops: vec![NodeCatalogOp::RegisterNode(RegisterNodeLog { + node_id: "test-node".into(), + instance_id: "uuid".into(), + registered_time_ns: 0, + core_count: 2, + mode: vec![NodeMode::Core], + })], + }), + sequence_number: CatalogSequenceNumber::new(0), + }) + .expect("must be able to serialize"), + ) .expect("deserialize from v1"); // test a database log file: - verify_and_deserialize_catalog_file(serialize_catalog_log_v1(&OrderedCatalogBatch { + let log_v1 = OrderedCatalogBatch { catalog_batch: log::versions::v1::CatalogBatch::Database(DatabaseBatch { time_ns: 0, database_id: DbId::new(0), @@ -487,25 +486,33 @@ mod v1_tests { ], }), sequence_number: CatalogSequenceNumber::new(0), - })) + }; + verify_and_deserialize_catalog_file( + serialize_catalog_file(&log_v1).expect("must be able to serialize"), + ) .expect("deserialize from v1 to latest"); } - /// Method that uses v1 serialization logic for catalog checkpoint/snapshot files - fn serialize_catalog_snapshot_v1(snapshot: &snapshot::versions::v1::CatalogSnapshot) -> Bytes { - let mut buf = BytesMut::new(); - buf.extend_from_slice(SNAPSHOT_FILE_TYPE_IDENTIFIER_V1); + #[test] + fn test_serialize_catalog_snapshot_with_latest_identifier() { + let snapshot = crate::snapshot::versions::v1::CatalogSnapshot::generate(); + let snapshot: crate::snapshot::versions::v2::CatalogSnapshot = snapshot.into(); + let snapshot: crate::snapshot::CatalogSnapshot = snapshot.into(); + let serialized = super::serialize_catalog_file(&snapshot) + .expect("must be able to serialize generated snapshot"); - let data = bitcode::serialize(snapshot).expect("failed to serialize catalog snapshot file"); - - hash_and_freeze(buf, data) + assert!( + serialized.starts_with(&crate::snapshot::CatalogSnapshot::VERSION_ID), + "serialized catalog log file must always start with latest verison identifier" + ); } #[test] fn test_deserialize_catalog_checkpoint_file_from_v1() { - verify_and_deserialize_catalog_checkpoint_file(serialize_catalog_snapshot_v1( - &CatalogSnapshot::generate(), - )) - .expect("deserialize from v1"); + let result = verify_and_deserialize_catalog_checkpoint_file( + serialize_catalog_file(&CatalogSnapshot::generate()) + .expect("must be able to serialize"), + ); + result.expect("deserialize from v1"); } } diff --git a/influxdb3_catalog/src/snapshot/versions/v1.rs b/influxdb3_catalog/src/snapshot/versions/v1.rs index ca4291cbd9..f79af06c6a 100644 --- a/influxdb3_catalog/src/snapshot/versions/v1.rs +++ b/influxdb3_catalog/src/snapshot/versions/v1.rs @@ -16,6 +16,7 @@ use crate::{ log::versions::v1::{ MaxAge, MaxCardinality, NodeMode, TriggerSettings, TriggerSpecificationDefinition, }, + serialize::VersionedFileType, }; use arrow::datatypes::DataType as ArrowDataType; use hashbrown::HashMap; @@ -39,6 +40,10 @@ pub(crate) struct CatalogSnapshot { catalog_uuid: Uuid, } +impl VersionedFileType for CatalogSnapshot { + const VERSION_ID: [u8; 10] = *b"idb3.001.s"; +} + #[derive(Debug, Serialize, Deserialize)] pub(crate) struct NodeSnapshot { node_id: Arc, diff --git a/influxdb3_catalog/src/snapshot/versions/v2.rs b/influxdb3_catalog/src/snapshot/versions/v2.rs index d50652fc3a..bff4ae1d50 100644 --- a/influxdb3_catalog/src/snapshot/versions/v2.rs +++ b/influxdb3_catalog/src/snapshot/versions/v2.rs @@ -4,6 +4,7 @@ use crate::catalog::CatalogSequenceNumber; use crate::log::versions::v2::{ MaxAge, MaxCardinality, NodeMode, TriggerSettings, TriggerSpecificationDefinition, }; +use crate::serialize::VersionedFileType; use arrow::datatypes::DataType as ArrowDataType; use hashbrown::HashMap; use influxdb3_id::{ @@ -26,6 +27,10 @@ pub struct CatalogSnapshot { pub(crate) catalog_uuid: Uuid, } +impl VersionedFileType for CatalogSnapshot { + const VERSION_ID: [u8; 10] = *b"idb3.002.s"; +} + #[derive(Debug, Serialize, Deserialize, Default)] pub(crate) struct TokenInfoSnapshot { id: TokenId, diff --git a/influxdb3_catalog/src/snapshot/versions/v3.rs b/influxdb3_catalog/src/snapshot/versions/v3.rs index 4d7677a8f3..2570bba0e5 100644 --- a/influxdb3_catalog/src/snapshot/versions/v3.rs +++ b/influxdb3_catalog/src/snapshot/versions/v3.rs @@ -5,6 +5,7 @@ use crate::catalog::CatalogSequenceNumber; use crate::log::{ MaxAge, MaxCardinality, NodeMode, TriggerSettings, TriggerSpecificationDefinition, }; +use crate::serialize::VersionedFileType; use arrow::datatypes::DataType as ArrowDataType; use hashbrown::HashMap; use influxdb3_id::{ @@ -27,6 +28,10 @@ pub struct CatalogSnapshot { pub(crate) catalog_uuid: Uuid, } +impl VersionedFileType for CatalogSnapshot { + const VERSION_ID: [u8; 10] = *b"idb3.003.s"; +} + impl CatalogSnapshot { pub(crate) fn sequence_number(&self) -> CatalogSequenceNumber { self.sequence