diff --git a/db/src/lifecycle/compact_object_store.rs b/db/src/lifecycle/compact_object_store.rs index aa5feeb2f1..a795a9dad4 100644 --- a/db/src/lifecycle/compact_object_store.rs +++ b/db/src/lifecycle/compact_object_store.rs @@ -27,7 +27,7 @@ use observability_deps::tracing::info; use parquet_catalog::interface::CatalogParquetInfo; use parquet_file::{ chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk}, - metadata::IoxMetadata, + metadata::IoxMetadataOld, storage::Storage, }; use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint}; @@ -105,7 +105,7 @@ pub(crate) fn compact_object_store_chunks( let _guard = db.cleanup_lock.read().await; // Step 4.1: Write the chunk as a parquet file into the object store - let iox_metadata = IoxMetadata { + let iox_metadata = IoxMetadataOld { creation_timestamp: db.time_provider.now(), table_name: Arc::clone(&partition_addr.table_name), partition_key: Arc::clone(&partition_addr.partition_key), @@ -241,7 +241,7 @@ fn mark_chunks_to_compact( max_order = max_order.max(chunk.order()); chunk_ids.insert(chunk.id()); - // read IoxMetadata from the parquet chunk's metadata + // read IoxMetadataOld from the parquet chunk's metadata if let Some(parquet_chunk) = chunk.parquet_chunk() { let iox_parquet_metadata = parquet_chunk.parquet_metadata(); let iox_metadata = iox_parquet_metadata @@ -373,7 +373,7 @@ async fn persist_stream_to_chunk<'a>( db: &'a Db, partition_addr: &'a PartitionAddr, stream: SendableRecordBatchStream, - iox_metadata: IoxMetadata, + iox_metadata: IoxMetadataOld, ) -> Result>> { // Create a storage to save data of this chunk let storage = Storage::new(Arc::clone(&db.iox_object_store)); @@ -438,7 +438,7 @@ async fn update_preserved_catalog( async fn update_in_memory_catalog( chunk_ids: &[ChunkId], - iox_metadata: IoxMetadata, + iox_metadata: IoxMetadataOld, parquet_chunk: Option>, partition: Arc>, delete_predicates_before: HashSet>, diff --git a/db/src/lifecycle/write.rs b/db/src/lifecycle/write.rs index 071772ed86..82360f6f78 100644 --- a/db/src/lifecycle/write.rs +++ b/db/src/lifecycle/write.rs @@ -19,7 +19,7 @@ use observability_deps::tracing::{debug, warn}; use parquet_catalog::interface::CatalogParquetInfo; use parquet_file::{ chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk}, - metadata::IoxMetadata, + metadata::IoxMetadataOld, storage::Storage, }; use persistence_windows::{ @@ -117,7 +117,7 @@ pub(super) fn write_chunk_to_object_store( // // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted // between creation and the transaction commit. - let metadata = IoxMetadata { + let metadata = IoxMetadataOld { creation_timestamp: db.time_provider.now(), table_name: Arc::clone(&table_name), partition_key: Arc::clone(&partition_key), diff --git a/parquet_catalog/src/dump.rs b/parquet_catalog/src/dump.rs index 0198153677..11ea2c0479 100644 --- a/parquet_catalog/src/dump.rs +++ b/parquet_catalog/src/dump.rs @@ -441,7 +441,7 @@ File { Ok( Metadata { iox_metadata: Ok( - IoxMetadata { + IoxMetadataOld { creation_timestamp: 1970-01-01T00:00:10.000000020+00:00, time_of_first_write: 1970-01-01T00:00:30.000000040+00:00, time_of_last_write: 1970-01-01T00:00:50.000000060+00:00, diff --git a/parquet_catalog/src/rebuild.rs b/parquet_catalog/src/rebuild.rs index efae6014bb..7c968227f3 100644 --- a/parquet_catalog/src/rebuild.rs +++ b/parquet_catalog/src/rebuild.rs @@ -175,7 +175,7 @@ mod tests { use datafusion_util::MemoryStream; use parquet::arrow::ArrowWriter; use parquet_file::{ - metadata::IoxMetadata, + metadata::IoxMetadataOld, storage::{MemWriter, Storage}, test_utils::{create_partition_and_database_checkpoint, make_record_batch, TestSize}, }; @@ -370,7 +370,7 @@ mod tests { Arc::clone(&table_name), Arc::clone(&partition_key), ); - let metadata = IoxMetadata { + let metadata = IoxMetadataOld { creation_timestamp: Time::from_timestamp_nanos(0), table_name: Arc::clone(&table_name), partition_key: Arc::clone(&partition_key), diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index f84c015732..6ff9c0c93b 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -125,7 +125,7 @@ pub const METADATA_VERSION: u32 = 10; /// File-level metadata key to store the IOx-specific data. /// -/// This will contain [`IoxMetadata`] serialized as base64-encoded [Protocol Buffers 3]. +/// This will contain [`IoxMetadataOld`] serialized as base64-encoded [Protocol Buffers 3]. /// /// [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3 pub const METADATA_KEY: &str = "IOX:metadata"; @@ -260,7 +260,7 @@ pub type Result = std::result::Result; /// [Catalog Properties]: https://github.com/influxdata/influxdb_iox/blob/main/docs/catalog_persistence.md#13-properties /// [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3 #[derive(Debug, Clone, Eq, PartialEq)] -pub struct IoxMetadata { +pub struct IoxMetadataOld { /// Timestamp when this file was created. pub creation_timestamp: Time, @@ -287,7 +287,7 @@ pub struct IoxMetadata { pub chunk_order: ChunkOrder, } -impl IoxMetadata { +impl IoxMetadataOld { /// Read from protobuf message fn from_protobuf(data: &[u8]) -> Result { // extract protobuf message from bytes @@ -637,7 +637,7 @@ impl DecodedIoxParquetMetaData { } /// Read IOx metadata from file-level key-value parquet metadata. - pub fn read_iox_metadata(&self) -> Result { + pub fn read_iox_metadata(&self) -> Result { // find file-level key-value metadata entry let kv = self .md @@ -656,7 +656,7 @@ impl DecodedIoxParquetMetaData { .context(IoxMetadataBrokenSnafu)?; // convert to Rust object - IoxMetadata::from_protobuf(proto_bytes.as_slice()) + IoxMetadataOld::from_protobuf(proto_bytes.as_slice()) } /// Read IOx schema from parquet metadata. @@ -987,7 +987,7 @@ mod tests { Arc::clone(&table_name), Arc::clone(&partition_key), ); - let metadata = IoxMetadata { + let metadata = IoxMetadataOld { creation_timestamp: Time::from_timestamp(3234, 0), table_name, partition_key, @@ -1009,7 +1009,7 @@ mod tests { // decoding should fail now assert_eq!( - IoxMetadata::from_protobuf(&proto_bytes) + IoxMetadataOld::from_protobuf(&proto_bytes) .unwrap_err() .to_string(), format!( diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 39a21d6cf8..6d4f10d15c 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -30,7 +30,7 @@ use std::{ sync::Arc, }; -use crate::metadata::{IoxMetadata, IoxParquetMetaData, METADATA_KEY}; +use crate::metadata::{IoxMetadataOld, IoxParquetMetaData, METADATA_KEY}; #[derive(Debug, Snafu)] pub enum Error { @@ -105,7 +105,7 @@ impl Storage { &self, chunk_addr: ChunkAddr, stream: SendableRecordBatchStream, - metadata: IoxMetadata, + metadata: IoxMetadataOld, ) -> Result> { // Create full path location of this file in object store let path = ParquetFilePath::new(&chunk_addr); @@ -141,7 +141,7 @@ impl Storage { async fn parquet_stream_to_bytes( mut stream: SendableRecordBatchStream, schema: SchemaRef, - metadata: IoxMetadata, + metadata: IoxMetadataOld, ) -> Result> { let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailureSnafu)?; @@ -358,7 +358,7 @@ mod tests { Arc::clone(&table_name), Arc::clone(&partition_key), ); - let metadata = IoxMetadata { + let metadata = IoxMetadataOld { creation_timestamp: Time::from_timestamp_nanos(3453), table_name, partition_key, @@ -431,7 +431,7 @@ mod tests { Arc::clone(&table_name), Arc::clone(&partition_key), ); - let metadata = IoxMetadata { + let metadata = IoxMetadataOld { creation_timestamp: Time::from_timestamp_nanos(43069346), table_name: Arc::clone(&table_name), partition_key: Arc::clone(&partition_key), diff --git a/parquet_file/src/test_utils/generator.rs b/parquet_file/src/test_utils/generator.rs index ade2faea67..365ae708ff 100644 --- a/parquet_file/src/test_utils/generator.rs +++ b/parquet_file/src/test_utils/generator.rs @@ -1,5 +1,5 @@ use crate::chunk::{ChunkMetrics, ParquetChunk}; -use crate::metadata::IoxMetadata; +use crate::metadata::IoxMetadataOld; use crate::storage::Storage; use crate::test_utils::{ create_partition_and_database_checkpoint, make_iox_object_store, make_record_batch, TestSize, @@ -66,13 +66,13 @@ impl ChunkGenerator { &self.partition } - pub async fn generate(&mut self) -> Option<(ParquetChunk, IoxMetadata)> { + pub async fn generate(&mut self) -> Option<(ParquetChunk, IoxMetadataOld)> { let id = self.next_chunk; self.next_chunk += 1; self.generate_id(id).await } - pub async fn generate_id(&mut self, id: u32) -> Option<(ParquetChunk, IoxMetadata)> { + pub async fn generate_id(&mut self, id: u32) -> Option<(ParquetChunk, IoxMetadataOld)> { let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint( Arc::clone(&self.partition.table_name), Arc::clone(&self.partition.partition_key), @@ -82,7 +82,7 @@ impl ChunkGenerator { let chunk_order = ChunkOrder::new(id).unwrap(); let chunk_addr = ChunkAddr::new(&self.partition, chunk_id); - let metadata = IoxMetadata { + let metadata = IoxMetadataOld { creation_timestamp: Time::from_timestamp(10, 20), table_name: Arc::clone(&self.partition.table_name), partition_key: Arc::clone(&self.partition.partition_key),