refactor: Rename Rust struct parquet_file::IoxMetadata to be IoxMetadataOld
parent 1b298bb5bd
commit 0f72a881ef
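Before the diff: a minimal, self-contained Rust sketch of what this rename means for call sites. Only the type name and the table_name/partition_key fields are taken from the hunks below; the simplified field types, the main function, and the #[deprecated] alias are illustrative assumptions, not part of this commit (which renames the type directly and updates every reference).

use std::sync::Arc;

/// The renamed struct; this field subset mirrors the struct literals in the diff below.
#[derive(Debug, Clone, PartialEq)]
pub struct IoxMetadataOld {
    pub table_name: Arc<str>,
    pub partition_key: Arc<str>,
}

/// Hypothetical compatibility alias, NOT added by this commit; shown only as
/// one way a rename like this could be staged for downstream code.
#[deprecated(note = "renamed to `IoxMetadataOld`")]
pub type IoxMetadata = IoxMetadataOld;

fn main() {
    // After the rename, construction sites name the new type, as the hunks below do.
    let metadata = IoxMetadataOld {
        table_name: Arc::from("cpu"),
        partition_key: Arc::from("2021-11-26"),
    };
    println!("{metadata:?}");
}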
@@ -27,7 +27,7 @@ use observability_deps::tracing::info;
 use parquet_catalog::interface::CatalogParquetInfo;
 use parquet_file::{
     chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
-    metadata::IoxMetadata,
+    metadata::IoxMetadataOld,
     storage::Storage,
 };
 use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint};
@@ -105,7 +105,7 @@ pub(crate) fn compact_object_store_chunks(
     let _guard = db.cleanup_lock.read().await;

     // Step 4.1: Write the chunk as a parquet file into the object store
-    let iox_metadata = IoxMetadata {
+    let iox_metadata = IoxMetadataOld {
         creation_timestamp: db.time_provider.now(),
         table_name: Arc::clone(&partition_addr.table_name),
         partition_key: Arc::clone(&partition_addr.partition_key),
@@ -241,7 +241,7 @@ fn mark_chunks_to_compact(
         max_order = max_order.max(chunk.order());
         chunk_ids.insert(chunk.id());

-        // read IoxMetadata from the parquet chunk's metadata
+        // read IoxMetadataOld from the parquet chunk's metadata
         if let Some(parquet_chunk) = chunk.parquet_chunk() {
             let iox_parquet_metadata = parquet_chunk.parquet_metadata();
             let iox_metadata = iox_parquet_metadata
@@ -373,7 +373,7 @@ async fn persist_stream_to_chunk<'a>(
     db: &'a Db,
     partition_addr: &'a PartitionAddr,
     stream: SendableRecordBatchStream,
-    iox_metadata: IoxMetadata,
+    iox_metadata: IoxMetadataOld,
 ) -> Result<Option<Arc<ParquetChunk>>> {
     // Create a storage to save data of this chunk
     let storage = Storage::new(Arc::clone(&db.iox_object_store));
@@ -438,7 +438,7 @@ async fn update_preserved_catalog(

 async fn update_in_memory_catalog(
     chunk_ids: &[ChunkId],
-    iox_metadata: IoxMetadata,
+    iox_metadata: IoxMetadataOld,
     parquet_chunk: Option<Arc<ParquetChunk>>,
     partition: Arc<RwLock<Partition>>,
     delete_predicates_before: HashSet<Arc<DeletePredicate>>,

@@ -19,7 +19,7 @@ use observability_deps::tracing::{debug, warn};
 use parquet_catalog::interface::CatalogParquetInfo;
 use parquet_file::{
     chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
-    metadata::IoxMetadata,
+    metadata::IoxMetadataOld,
     storage::Storage,
 };
 use persistence_windows::{
@@ -117,7 +117,7 @@ pub(super) fn write_chunk_to_object_store(
     //
     // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
     // between creation and the transaction commit.
-    let metadata = IoxMetadata {
+    let metadata = IoxMetadataOld {
         creation_timestamp: db.time_provider.now(),
         table_name: Arc::clone(&table_name),
         partition_key: Arc::clone(&partition_key),

@@ -441,7 +441,7 @@ File {
     Ok(
         Metadata {
             iox_metadata: Ok(
-                IoxMetadata {
+                IoxMetadataOld {
                     creation_timestamp: 1970-01-01T00:00:10.000000020+00:00,
                     time_of_first_write: 1970-01-01T00:00:30.000000040+00:00,
                     time_of_last_write: 1970-01-01T00:00:50.000000060+00:00,

@@ -175,7 +175,7 @@ mod tests {
     use datafusion_util::MemoryStream;
     use parquet::arrow::ArrowWriter;
     use parquet_file::{
-        metadata::IoxMetadata,
+        metadata::IoxMetadataOld,
         storage::{MemWriter, Storage},
         test_utils::{create_partition_and_database_checkpoint, make_record_batch, TestSize},
     };
@@ -370,7 +370,7 @@ mod tests {
             Arc::clone(&table_name),
             Arc::clone(&partition_key),
         );
-        let metadata = IoxMetadata {
+        let metadata = IoxMetadataOld {
             creation_timestamp: Time::from_timestamp_nanos(0),
             table_name: Arc::clone(&table_name),
             partition_key: Arc::clone(&partition_key),

@@ -125,7 +125,7 @@ pub const METADATA_VERSION: u32 = 10;

 /// File-level metadata key to store the IOx-specific data.
 ///
-/// This will contain [`IoxMetadata`] serialized as base64-encoded [Protocol Buffers 3].
+/// This will contain [`IoxMetadataOld`] serialized as base64-encoded [Protocol Buffers 3].
 ///
 /// [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3
 pub const METADATA_KEY: &str = "IOX:metadata";
@@ -260,7 +260,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// [Catalog Properties]: https://github.com/influxdata/influxdb_iox/blob/main/docs/catalog_persistence.md#13-properties
 /// [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3
 #[derive(Debug, Clone, Eq, PartialEq)]
-pub struct IoxMetadata {
+pub struct IoxMetadataOld {
     /// Timestamp when this file was created.
     pub creation_timestamp: Time,

@@ -287,7 +287,7 @@ pub struct IoxMetadata {
     pub chunk_order: ChunkOrder,
 }

-impl IoxMetadata {
+impl IoxMetadataOld {
     /// Read from protobuf message
     fn from_protobuf(data: &[u8]) -> Result<Self> {
         // extract protobuf message from bytes
@@ -637,7 +637,7 @@ impl DecodedIoxParquetMetaData {
     }

     /// Read IOx metadata from file-level key-value parquet metadata.
-    pub fn read_iox_metadata(&self) -> Result<IoxMetadata> {
+    pub fn read_iox_metadata(&self) -> Result<IoxMetadataOld> {
         // find file-level key-value metadata entry
         let kv = self
             .md
@@ -656,7 +656,7 @@ impl DecodedIoxParquetMetaData {
             .context(IoxMetadataBrokenSnafu)?;

         // convert to Rust object
-        IoxMetadata::from_protobuf(proto_bytes.as_slice())
+        IoxMetadataOld::from_protobuf(proto_bytes.as_slice())
     }

     /// Read IOx schema from parquet metadata.
@@ -987,7 +987,7 @@ mod tests {
             Arc::clone(&table_name),
             Arc::clone(&partition_key),
         );
-        let metadata = IoxMetadata {
+        let metadata = IoxMetadataOld {
             creation_timestamp: Time::from_timestamp(3234, 0),
             table_name,
             partition_key,
@@ -1009,7 +1009,7 @@ mod tests {

         // decoding should fail now
         assert_eq!(
-            IoxMetadata::from_protobuf(&proto_bytes)
+            IoxMetadataOld::from_protobuf(&proto_bytes)
                 .unwrap_err()
                 .to_string(),
             format!(

@@ -30,7 +30,7 @@ use std::{
     sync::Arc,
 };

-use crate::metadata::{IoxMetadata, IoxParquetMetaData, METADATA_KEY};
+use crate::metadata::{IoxMetadataOld, IoxParquetMetaData, METADATA_KEY};

 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -105,7 +105,7 @@ impl Storage {
         &self,
         chunk_addr: ChunkAddr,
         stream: SendableRecordBatchStream,
-        metadata: IoxMetadata,
+        metadata: IoxMetadataOld,
     ) -> Result<Option<(ParquetFilePath, usize, IoxParquetMetaData)>> {
         // Create full path location of this file in object store
         let path = ParquetFilePath::new(&chunk_addr);
@@ -141,7 +141,7 @@ impl Storage {
     async fn parquet_stream_to_bytes(
         mut stream: SendableRecordBatchStream,
         schema: SchemaRef,
-        metadata: IoxMetadata,
+        metadata: IoxMetadataOld,
     ) -> Result<Vec<u8>> {
         let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailureSnafu)?;

@@ -358,7 +358,7 @@ mod tests {
             Arc::clone(&table_name),
             Arc::clone(&partition_key),
         );
-        let metadata = IoxMetadata {
+        let metadata = IoxMetadataOld {
             creation_timestamp: Time::from_timestamp_nanos(3453),
             table_name,
             partition_key,
@@ -431,7 +431,7 @@ mod tests {
             Arc::clone(&table_name),
             Arc::clone(&partition_key),
         );
-        let metadata = IoxMetadata {
+        let metadata = IoxMetadataOld {
             creation_timestamp: Time::from_timestamp_nanos(43069346),
             table_name: Arc::clone(&table_name),
             partition_key: Arc::clone(&partition_key),

@@ -1,5 +1,5 @@
 use crate::chunk::{ChunkMetrics, ParquetChunk};
-use crate::metadata::IoxMetadata;
+use crate::metadata::IoxMetadataOld;
 use crate::storage::Storage;
 use crate::test_utils::{
     create_partition_and_database_checkpoint, make_iox_object_store, make_record_batch, TestSize,
@@ -66,13 +66,13 @@ impl ChunkGenerator {
         &self.partition
     }

-    pub async fn generate(&mut self) -> Option<(ParquetChunk, IoxMetadata)> {
+    pub async fn generate(&mut self) -> Option<(ParquetChunk, IoxMetadataOld)> {
         let id = self.next_chunk;
         self.next_chunk += 1;
         self.generate_id(id).await
     }

-    pub async fn generate_id(&mut self, id: u32) -> Option<(ParquetChunk, IoxMetadata)> {
+    pub async fn generate_id(&mut self, id: u32) -> Option<(ParquetChunk, IoxMetadataOld)> {
         let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
             Arc::clone(&self.partition.table_name),
             Arc::clone(&self.partition.partition_key),
@@ -82,7 +82,7 @@ impl ChunkGenerator {
         let chunk_order = ChunkOrder::new(id).unwrap();
         let chunk_addr = ChunkAddr::new(&self.partition, chunk_id);

-        let metadata = IoxMetadata {
+        let metadata = IoxMetadataOld {
             creation_timestamp: Time::from_timestamp(10, 20),
             table_name: Arc::clone(&self.partition.table_name),
             partition_key: Arc::clone(&self.partition.partition_key),
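The read path in the metadata.rs hunks (@@ -637,7 and @@ -656,7 above) keeps the same three steps after the rename: find the file-level key-value entry under METADATA_KEY, decode it, and convert it into the Rust struct. A toy, self-contained sketch of that flow follows; the HashMap stands in for parquet file-level key-value metadata, a plain "table/partition" string stands in for base64-encoded protobuf, and everything except the METADATA_KEY value and the IoxMetadataOld name is a hypothetical simplification.

use std::collections::HashMap;

/// Same key the real code stores its metadata under (see the @@ -125,7 hunk).
const METADATA_KEY: &str = "IOX:metadata";

#[derive(Debug, PartialEq)]
struct IoxMetadataOld {
    table_name: String,
    partition_key: String,
}

/// Mimics the shape of DecodedIoxParquetMetaData::read_iox_metadata: look up
/// the entry, then convert it to a Rust object (the real code base64- and
/// protobuf-decodes at that step).
fn read_iox_metadata(kv: &HashMap<String, String>) -> Result<IoxMetadataOld, String> {
    // find file-level key-value metadata entry
    let value = kv
        .get(METADATA_KEY)
        .ok_or_else(|| format!("key {METADATA_KEY:?} not found"))?;
    // convert to Rust object
    let (table_name, partition_key) = value
        .split_once('/')
        .ok_or_else(|| "malformed metadata entry".to_string())?;
    Ok(IoxMetadataOld {
        table_name: table_name.to_string(),
        partition_key: partition_key.to_string(),
    })
}

fn main() {
    let mut kv = HashMap::new();
    kv.insert(METADATA_KEY.to_string(), "cpu/2021-11-26".to_string());
    assert_eq!(
        read_iox_metadata(&kv).unwrap(),
        IoxMetadataOld {
            table_name: "cpu".to_string(),
            partition_key: "2021-11-26".to_string(),
        }
    );
}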