diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index 566dcdb332..61b37edfe9 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -3,7 +3,7 @@ use crate::{
     storage::Storage,
 };
 use data_types::{
-    partition_metadata::{Statistics, TableSummaryAndTimes},
+    partition_metadata::{Statistics, TableSummary},
     timestamp::TimestampRange,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -85,7 +85,7 @@ pub struct ParquetChunk {
     partition_key: Arc<str>,
 
     /// Meta data of the table
-    table_summary: Arc<TableSummaryAndTimes>,
+    table_summary: Arc<TableSummary>,
 
     /// Schema that goes with this table's parquet file
     schema: Arc<Schema>,
@@ -128,8 +128,6 @@ impl ParquetChunk {
         let IoxMetadata {
             table_name,
-            time_of_first_write,
-            time_of_last_write,
             partition_key,
             ..
         } = iox_md;
 
@@ -142,11 +140,9 @@
             .context(StatisticsReadFailed {
                 path: &file_location,
             })?;
-        let table_summary = TableSummaryAndTimes {
+        let table_summary = TableSummary {
             name: table_name.to_string(),
             columns,
-            time_of_first_write,
-            time_of_last_write,
         };
 
         Ok(Self::new_from_parts(
@@ -166,7 +162,7 @@ impl ParquetChunk {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new_from_parts(
         partition_key: Arc<str>,
-        table_summary: Arc<TableSummaryAndTimes>,
+        table_summary: Arc<TableSummary>,
         schema: Arc<Schema>,
         file_location: Path,
         store: Arc<ObjectStore>,
@@ -200,7 +196,7 @@ impl ParquetChunk {
     }
 
     /// Returns the summary statistics for this chunk
-    pub fn table_summary(&self) -> &Arc<TableSummaryAndTimes> {
+    pub fn table_summary(&self) -> &Arc<TableSummary> {
         &self.table_summary
     }
 
@@ -286,7 +282,7 @@ impl ParquetChunk {
 }
 
 /// Extracts min/max values of the timestamp column, from the TableSummary, if possible
-fn extract_range(table_summary: &TableSummaryAndTimes) -> Option<TimestampRange> {
+fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
     table_summary
         .column(TIME_COLUMN_NAME)
         .map(|c| {
diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs
index b86a035299..a981512bf8 100644
--- a/parquet_file/src/test_utils.rs
+++ b/parquet_file/src/test_utils.rs
@@ -14,9 +14,7 @@ use arrow::{
 use chrono::{TimeZone, Utc};
 use data_types::{
     chunk_metadata::ChunkAddr,
-    partition_metadata::{
-        ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummaryAndTimes,
-    },
+    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
     server_id::ServerId,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -132,11 +130,9 @@ pub async fn make_chunk_given_record_batch(
     let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
     let storage = Storage::new(Arc::clone(&store), server_id);
 
-    let table_summary = TableSummaryAndTimes {
+    let table_summary = TableSummary {
         name: addr.table_name.to_string(),
         columns: column_summaries,
-        time_of_first_write: Utc.timestamp(30, 40),
-        time_of_last_write: Utc.timestamp(50, 60),
     };
     let stream: SendableRecordBatchStream = if record_batches.is_empty() {
         Box::pin(MemoryStream::new_with_schema(
diff --git a/server/src/db.rs b/server/src/db.rs
index 997d1d44c6..cfbed3c834 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -1939,7 +1939,7 @@ mod tests {
         .eq(1.0)
         .unwrap();
 
-        let expected_parquet_size = 703;
+        let expected_parquet_size = 679;
         catalog_chunk_size_bytes_metric_eq(
             &test_db.metric_registry,
             "read_buffer",
@@ -2421,7 +2421,7 @@
             ("svr_id", "10"),
         ])
         .histogram()
-        .sample_sum_eq(2316.0)
+        .sample_sum_eq(2292.0)
         .unwrap();
 
         // while MB and RB chunk are identical, the PQ chunk is a new one (split off)
@@ -2541,7 +2541,7 @@
             ("svr_id", "10"),
         ])
         .histogram()
-        .sample_sum_eq(2316.0)
+        .sample_sum_eq(2292.0)
         .unwrap();
 
         // Unload RB chunk but keep it in OS
@@ -2571,7 +2571,7 @@
             ("svr_id", "10"),
         ])
         .histogram()
-        .sample_sum_eq(703.0)
+        .sample_sum_eq(679.0)
         .unwrap();
 
         // Verify data written to the parquet file in object store
@@ -3179,7 +3179,7 @@
             id: 2,
             storage: ChunkStorage::ReadBufferAndObjectStore,
             lifecycle_action,
-            memory_bytes: 3308,       // size of RB and OS chunks
+            memory_bytes: 3284,       // size of RB and OS chunks
             object_store_bytes: 1523, // size of parquet file
             row_count: 2,
             time_of_last_access: None,
@@ -3231,7 +3231,7 @@
 
         assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87);
         assert_eq!(db.catalog.metrics().memory().read_buffer(), 2410);
-        assert_eq!(db.catalog.metrics().memory().object_store(), 898);
+        assert_eq!(db.catalog.metrics().memory().object_store(), 874);
     }
 
     #[tokio::test]
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index d48b3cd8a5..bea2efbd83 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -343,23 +343,15 @@ impl CatalogChunk {
     pub(super) fn new_object_store_only(
         addr: ChunkAddr,
         chunk: Arc<parquet_file::chunk::ParquetChunk>,
+        time_of_first_write: DateTime<Utc>,
+        time_of_last_write: DateTime<Utc>,
         metrics: ChunkMetrics,
     ) -> Self {
         assert_eq!(chunk.table_name(), addr.table_name.as_ref());
-        let summary = chunk.table_summary();
-        let time_of_first_write = summary.time_of_first_write;
-        let time_of_last_write = summary.time_of_last_write;
-
-        // this is temporary
-        let table_summary = TableSummary {
-            name: summary.name.clone(),
-            columns: summary.columns.clone(),
-        };
-
         // Cache table summary + schema
         let meta = Arc::new(ChunkMetadata {
-            table_summary: Arc::new(table_summary),
+            table_summary: Arc::clone(chunk.table_summary()),
             schema: chunk.schema(),
         });
@@ -1163,12 +1155,16 @@ mod tests {
 
     async fn make_persisted_chunk() -> CatalogChunk {
         let addr = chunk_addr();
+        let now = Utc::now();
+
         // assemble ParquetChunk
         let parquet_chunk = make_parquet_chunk(addr.clone()).await;
 
         CatalogChunk::new_object_store_only(
             addr,
             Arc::new(parquet_chunk),
+            now,
+            now,
             ChunkMetrics::new_unregistered(),
         )
     }
diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs
index 39a9390bb8..30b101209d 100644
--- a/server/src/db/catalog/partition.rs
+++ b/server/src/db/catalog/partition.rs
@@ -201,6 +201,8 @@ impl Partition {
         &mut self,
         chunk_id: u32,
         chunk: Arc<parquet_file::chunk::ParquetChunk>,
+        time_of_first_write: DateTime<Utc>,
+        time_of_last_write: DateTime<Utc>,
     ) -> Arc<RwLock<CatalogChunk>> {
         assert_eq!(chunk.table_name(), self.table_name());
@@ -211,6 +213,8 @@
             .new_chunk_lock(CatalogChunk::new_object_store_only(
                 addr,
                 chunk,
+                time_of_first_write,
+                time_of_last_write,
                 self.metrics.new_chunk_metrics(),
             )),
         );
diff --git a/server/src/db/load.rs b/server/src/db/load.rs
index dd1c583654..1c5add8ad5 100644
--- a/server/src/db/load.rs
+++ b/server/src/db/load.rs
@@ -229,7 +229,12 @@
         let schema_handle = TableSchemaUpsertHandle::new(&table_schema, &parquet_chunk.schema())
            .map_err(|e| Box::new(e) as _)
            .context(SchemaError { path: info.path })?;
-        partition.insert_object_store_only_chunk(iox_md.chunk_id, parquet_chunk);
+        partition.insert_object_store_only_chunk(
+            iox_md.chunk_id,
+            parquet_chunk,
+            iox_md.time_of_first_write,
+            iox_md.time_of_last_write,
+        );
         schema_handle.commit();
 
         Ok(())
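Taken together, the change moves `time_of_first_write`/`time_of_last_write` off the table summary and onto the catalog chunk, with the caller (ultimately `IoxMetadata` during catalog load) supplying the timestamps explicitly. A minimal sketch of the resulting calling convention, using simplified hypothetical stand-ins rather than IOx's real types:

```rust
// Hypothetical, simplified stand-ins for the IOx types touched above;
// only the shape of the API is meant to mirror the diff.
use chrono::{DateTime, Utc};

/// Stand-in for partition_metadata::TableSummary:
/// name/column data only, no write times.
struct TableSummary {
    name: String,
}

/// Stand-in for CatalogChunk: the write times now live here.
struct CatalogChunk {
    table_summary: TableSummary,
    time_of_first_write: DateTime<Utc>,
    time_of_last_write: DateTime<Utc>,
}

impl CatalogChunk {
    /// Mirrors new_object_store_only: the caller passes the times
    /// (e.g. read from IoxMetadata) instead of the chunk pulling
    /// them out of its summary.
    fn new_object_store_only(
        table_summary: TableSummary,
        time_of_first_write: DateTime<Utc>,
        time_of_last_write: DateTime<Utc>,
    ) -> Self {
        Self {
            table_summary,
            time_of_first_write,
            time_of_last_write,
        }
    }
}

fn main() {
    let now = Utc::now();
    let chunk = CatalogChunk::new_object_store_only(
        TableSummary { name: "cpu".into() },
        now,
        now,
    );
    println!(
        "{}: first write {}, last write {}",
        chunk.table_summary.name, chunk.time_of_first_write, chunk.time_of_last_write
    );
}
```

One side effect visible in the test updates: because the summary no longer serializes two timestamps, the parquet-related size expectations shrink by 24 bytes (703 to 679, 2316.0 to 2292.0, and so on).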