diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs
index 081e3a0a46..b3cdb6fd21 100644
--- a/data_types/src/partition_metadata.rs
+++ b/data_types/src/partition_metadata.rs
@@ -108,6 +108,23 @@ impl From<TableSummaryAndTimes> for TableSummary {
     }
 }

+impl TableSummaryAndTimes {
+    pub fn size(&self) -> usize {
+        // Total size of all ColumnSummaries that belong to this table which include
+        // column names and their stats
+        let size: usize = self.columns.iter().map(|c| c.size()).sum();
+        size
+            + self.name.len() // Add size of the table name
+            + mem::size_of::<Self>() // Add size of this struct that points to
+                                     // table, ColumnSummary, and times
+    }
+
+    /// Get the column summary by name.
+    pub fn column(&self, name: &str) -> Option<&ColumnSummary> {
+        self.columns.iter().find(|c| c.name == name)
+    }
+}
+
 /// Metadata and statistics information for a table. This can be
 /// either for the portion of a Table stored within a single chunk or
 /// aggregated across chunks.
diff --git a/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto
index a0f16328e3..b83b658f96 100644
--- a/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto
+++ b/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto
@@ -31,6 +31,12 @@ message IoxMetadata {

   // Database checkpoint created at the time of the write.
   DatabaseCheckpoint database_checkpoint = 7;
+
+  // Timestamp when this file was first written.
+  FixedSizeTimestamp time_of_first_write = 8;
+
+  // Timestamp when this file was last written.
+  FixedSizeTimestamp time_of_last_write = 9;
 }

 // Partition checkpoint.
diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index 7b6425ee8c..566dcdb332 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -1,6 +1,9 @@
-use crate::{metadata::IoxParquetMetaData, storage::Storage};
+use crate::{
+    metadata::{IoxMetadata, IoxParquetMetaData},
+    storage::Storage,
+};
 use data_types::{
-    partition_metadata::{Statistics, TableSummary},
+    partition_metadata::{Statistics, TableSummaryAndTimes},
     timestamp::TimestampRange,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -82,7 +85,7 @@ pub struct ParquetChunk {
     partition_key: Arc<str>,

     /// Meta data of the table
-    table_summary: Arc<TableSummary>,
+    table_summary: Arc<TableSummaryAndTimes>,

     /// Schema that goes with this table's parquet file
     schema: Arc<Schema>,
@@ -122,6 +125,15 @@ impl ParquetChunk {
             .context(IoxMetadataReadFailed {
                 path: &file_location,
             })?;
+
+        let IoxMetadata {
+            table_name,
+            time_of_first_write,
+            time_of_last_write,
+            partition_key,
+            ..
+        } = iox_md;
+
         let schema = parquet_metadata.read_schema().context(SchemaReadFailed {
             path: &file_location,
         })?;
@@ -130,14 +142,15 @@ impl ParquetChunk {
             .context(StatisticsReadFailed {
                 path: &file_location,
             })?;
-
-        let table_summary = TableSummary {
-            name: iox_md.table_name.to_string(),
+        let table_summary = TableSummaryAndTimes {
+            name: table_name.to_string(),
             columns,
+            time_of_first_write,
+            time_of_last_write,
         };

         Ok(Self::new_from_parts(
-            iox_md.partition_key,
+            partition_key,
             Arc::new(table_summary),
             schema,
             file_location,
@@ -148,11 +161,12 @@ impl ParquetChunk {
         ))
     }

-    /// Creates a new chunk from given parts w/o parsing anything from the provided parquet metadata.
+    /// Creates a new chunk from given parts w/o parsing anything from the provided parquet
+    /// metadata.
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new_from_parts(
         partition_key: Arc<str>,
-        table_summary: Arc<TableSummary>,
+        table_summary: Arc<TableSummaryAndTimes>,
         schema: Arc<Schema>,
         file_location: Path,
         store: Arc<ObjectStore>,
@@ -186,7 +200,7 @@ impl ParquetChunk {
     }

     /// Returns the summary statistics for this chunk
-    pub fn table_summary(&self) -> &Arc<TableSummary> {
+    pub fn table_summary(&self) -> &Arc<TableSummaryAndTimes> {
         &self.table_summary
     }

@@ -221,8 +235,7 @@ impl ParquetChunk {
         }
     }

-    // Return the columns names that belong to the given column
-    // selection
+    // Return the columns names that belong to the given column selection
     pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
         let fields = self.schema.inner().fields().iter();

@@ -273,7 +286,7 @@ impl ParquetChunk {
 }

 /// Extracts min/max values of the timestamp column, from the TableSummary, if possible
-fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
+fn extract_range(table_summary: &TableSummaryAndTimes) -> Option<TimestampRange> {
     table_summary
         .column(TIME_COLUMN_NAME)
         .map(|c| {
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index fbb087f2ff..9f271236aa 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -252,6 +252,9 @@ pub struct IoxMetadata {
     /// Timestamp when this file was created.
     pub creation_timestamp: DateTime<Utc>,

+    pub time_of_first_write: DateTime<Utc>, // TODO: METADATA_VERSION?
+    pub time_of_last_write: DateTime<Utc>,
+
     /// Table that holds this parquet file.
     pub table_name: Arc<str>,

@@ -285,13 +288,14 @@ impl IoxMetadata {
         }

         // extract creation timestamp
-        let creation_timestamp: DateTime<Utc> = decode_timestamp(
-            proto_msg
-                .creation_timestamp
-                .context(IoxMetadataFieldMissing {
-                    field: "creation_timestamp",
-                })?,
-        )?;
+        let creation_timestamp =
+            decode_timestamp_from_field(proto_msg.creation_timestamp, "creation_timestamp")?;
+        // extract time of first write
+        let time_of_first_write =
+            decode_timestamp_from_field(proto_msg.time_of_first_write, "time_of_first_write")?;
+        // extract time of last write
+        let time_of_last_write =
+            decode_timestamp_from_field(proto_msg.time_of_last_write, "time_of_last_write")?;

         // extract strings
         let table_name = Arc::from(proto_msg.table_name.as_ref());
@@ -317,12 +321,9 @@ impl IoxMetadata {
                 }
             })
             .collect::<Result<BTreeMap<_, _>>>()?;
-        let min_unpersisted_timestamp = decode_timestamp(
-            proto_partition_checkpoint
-                .min_unpersisted_timestamp
-                .context(IoxMetadataFieldMissing {
-                    field: "partition_checkpoint.min_unpersisted_timestamp",
-                })?,
+        let min_unpersisted_timestamp = decode_timestamp_from_field(
+            proto_partition_checkpoint.min_unpersisted_timestamp,
+            "partition_checkpoint.min_unpersisted_timestamp",
         )?;
         let partition_checkpoint = PartitionCheckpoint::new(
             Arc::clone(&table_name),
@@ -355,6 +356,8 @@ impl IoxMetadata {

         Ok(Self {
             creation_timestamp,
+            time_of_first_write,
+            time_of_last_write,
             table_name,
             partition_key,
             chunk_id: proto_msg.chunk_id,
@@ -407,6 +410,8 @@ impl IoxMetadata {
         let proto_msg = proto::IoxMetadata {
             version: METADATA_VERSION,
             creation_timestamp: Some(encode_timestamp(self.creation_timestamp)),
+            time_of_first_write: Some(encode_timestamp(self.time_of_first_write)),
+            time_of_last_write: Some(encode_timestamp(self.time_of_last_write)),
             table_name: self.table_name.to_string(),
             partition_key: self.partition_key.to_string(),
             chunk_id: self.chunk_id,
@@ -439,6 +444,13 @@ fn decode_timestamp(ts: proto::FixedSizeTimestamp) -> Result<DateTime<Utc>> {
     Ok(chrono::DateTime::<Utc>::from_utc(dt, Utc))
 }

+fn decode_timestamp_from_field(
+    value: Option<proto::FixedSizeTimestamp>,
+    field: &'static str,
+) -> Result<DateTime<Utc>> {
+    decode_timestamp(value.context(IoxMetadataFieldMissing { field })?)
+}
+
 /// Parquet metadata with IOx-specific wrapper.
 #[derive(Clone, Debug)]
 pub struct IoxParquetMetaData {
@@ -972,6 +984,8 @@ mod tests {
             chunk_id: 1337,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };

         let proto_bytes = metadata.to_protobuf().unwrap();
@@ -1020,10 +1034,12 @@ mod tests {
             chunk_id: 1337,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };

         let proto_bytes = metadata.to_protobuf().unwrap();
-        assert_eq!(proto_bytes.len(), 56);
+        assert_eq!(proto_bytes.len(), 88);
         }
     }
 }
diff --git a/parquet_file/src/rebuild.rs b/parquet_file/src/rebuild.rs
index b398d281fd..57cfe4703c 100644
--- a/parquet_file/src/rebuild.rs
+++ b/parquet_file/src/rebuild.rs
@@ -473,6 +473,8 @@ mod tests {
             chunk_id,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };
         let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
         let (path, file_size_bytes, metadata) = storage
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index ebe9052d18..3b5fb26d50 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -473,6 +473,8 @@ mod tests {
             chunk_id: 1337,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };

         // create parquet file
@@ -545,6 +547,8 @@ mod tests {
             chunk_id,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };

         let (path, _file_size_bytes, _metadata) = storage
diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs
index b0a4b74cda..9d040cb621 100644
--- a/parquet_file/src/test_utils.rs
+++ b/parquet_file/src/test_utils.rs
@@ -14,7 +14,9 @@ use arrow::{
 use chrono::{TimeZone, Utc};
 use data_types::{
     chunk_metadata::ChunkAddr,
-    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
+    partition_metadata::{
+        ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummaryAndTimes,
+    },
     server_id::ServerId,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -130,8 +132,12 @@ pub async fn make_chunk_given_record_batch(
     let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
     let storage = Storage::new(Arc::clone(&store), server_id);

-    let mut table_summary = TableSummary::new(addr.table_name.to_string());
-    table_summary.columns = column_summaries;
+    let table_summary = TableSummaryAndTimes {
+        name: addr.table_name.to_string(),
+        columns: column_summaries,
+        time_of_first_write: Utc.timestamp(30, 40),
+        time_of_last_write: Utc.timestamp(50, 60),
+    };
     let stream: SendableRecordBatchStream = if record_batches.is_empty() {
         Box::pin(MemoryStream::new_with_schema(
             record_batches,
@@ -151,6 +157,8 @@ pub async fn make_chunk_given_record_batch(
         chunk_id: addr.chunk_id,
         partition_checkpoint,
         database_checkpoint,
+        time_of_first_write: Utc.timestamp(30, 40),
+        time_of_last_write: Utc.timestamp(50, 60),
     };
     let (path, file_size_bytes, parquet_metadata) = storage
         .write_to_object_store(addr.clone(), stream, metadata)
diff --git a/server/src/db.rs b/server/src/db.rs
index fcc7dda4d4..02f2215d5b 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -2023,7 +2023,7 @@ mod tests {
             .eq(1.0)
             .unwrap();

-        let expected_parquet_size = 647;
+        let expected_parquet_size = 671;
         catalog_chunk_size_bytes_metric_eq(
             &test_db.metric_registry,
             "read_buffer",
@@ -2471,7 +2471,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2284.0)
+            .sample_sum_eq(2308.0)
             .unwrap();

         // while MB and RB chunk are identical, the PQ chunk is a new one (split off)
@@ -2591,7 +2591,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2284.0)
+            .sample_sum_eq(2308.0)
             .unwrap();

         // Unload RB chunk but keep it in OS
@@ -2621,7 +2621,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(647.0)
+            .sample_sum_eq(671.0)
             .unwrap();

         // Verify data written to the parquet file in object store
@@ -3219,8 +3219,8 @@ mod tests {
                 2,
                 ChunkStorage::ReadBufferAndObjectStore,
                 lifecycle_action,
-                3260, // size of RB and OS chunks
-                1479, // size of parquet file
+                3284, // size of RB and OS chunks
+                1523, // size of parquet file
                 2,
             ),
             ChunkSummary::new_without_timestamps(
@@ -3248,14 +3248,15 @@ mod tests {
         for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
             assert_eq!(
                 expected_summary, actual_summary,
-                "\n\nexpected:\n{:#?}\n\nactual:\n{:#?}\n\n",
-                expected_summary, actual_summary
+                "\n\nexpected item:\n{:#?}\n\nactual item:\n{:#?}\n\n\
+                all expected:\n{:#?}\n\nall actual:\n{:#?}",
+                expected_summary, actual_summary, expected, chunk_summaries
             );
         }

         assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2446 + 87);
         assert_eq!(db.catalog.metrics().memory().read_buffer(), 2434);
-        assert_eq!(db.catalog.metrics().memory().object_store(), 826);
+        assert_eq!(db.catalog.metrics().memory().object_store(), 850);
     }

     #[tokio::test]
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index b1b54ae7a0..c9628a127d 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -336,9 +336,19 @@ impl CatalogChunk {
     ) -> Self {
         assert_eq!(chunk.table_name(), addr.table_name.as_ref());

+        let summary = chunk.table_summary();
+        let first_write = summary.time_of_first_write;
+        let last_write = summary.time_of_last_write;
+
+        // this is temporary
+        let table_summary = TableSummary {
+            name: summary.name.clone(),
+            columns: summary.columns.clone(),
+        };
+
         // Cache table summary + schema
         let meta = Arc::new(ChunkMetadata {
-            table_summary: Arc::clone(chunk.table_summary()),
+            table_summary: Arc::new(table_summary),
             schema: chunk.schema(),
         });

@@ -354,8 +364,8 @@ impl CatalogChunk {
             lifecycle_action: None,
             metrics,
             access_recorder: Default::default(),
-            time_of_first_write: None,
-            time_of_last_write: None,
+            time_of_first_write: Some(first_write),
+            time_of_last_write: Some(last_write),
             time_closed: None,
         };
         chunk.update_metrics();
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index 58ed37f7da..3a7f3b70d7 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -207,7 +207,7 @@ impl DbChunk {
         match &self.state {
             State::MutableBuffer { chunk } => chunk.table_summary().time_of_first_write,
             State::ReadBuffer { chunk, .. } => chunk.table_summary().time_of_first_write,
-            _ => unimplemented!(),
+            State::ParquetFile { chunk } => chunk.table_summary().time_of_first_write,
         }
     }

@@ -215,7 +215,7 @@ impl DbChunk {
         match &self.state {
             State::MutableBuffer { chunk } => chunk.table_summary().time_of_last_write,
             State::ReadBuffer { chunk, .. } => chunk.table_summary().time_of_last_write,
-            _ => unimplemented!(),
+            State::ParquetFile { chunk } => chunk.table_summary().time_of_last_write,
         }
     }
 }
@@ -495,7 +495,10 @@ impl QueryChunkMeta for DbChunk {
 mod tests {
     use super::*;
     use crate::{
-        db::{catalog::chunk::CatalogChunk, test_helpers::write_lp},
+        db::{
+            catalog::chunk::{CatalogChunk, ChunkStage},
+            test_helpers::write_lp,
+        },
         utils::make_db,
     };
     use data_types::chunk_metadata::ChunkStorage;
@@ -588,7 +591,10 @@ mod tests {
     async fn parquet_records_access() {
         let db = make_db().await.db;

+        let before_creation = Utc::now();
         write_lp(&db, "cpu,tag=1 bar=1 1").await;
+        let after_creation = Utc::now();
+
         let id = db
             .persist_partition(
                 "cpu",
@@ -598,6 +604,7 @@ mod tests {
             .await
             .unwrap()
             .id;
+
         db.unload_read_buffer("cpu", "1970-01-01T00", id).unwrap();

         let chunks = db.catalog.chunks();
@@ -605,7 +612,45 @@ mod tests {
         let chunk = chunks.into_iter().next().unwrap();
         let chunk = chunk.read();
         assert_eq!(chunk.storage().1, ChunkStorage::ObjectStoreOnly);
+        let first_write = chunk.time_of_first_write().unwrap();
+        let last_write = chunk.time_of_last_write().unwrap();
+        assert_eq!(first_write, last_write);
+        assert!(before_creation < first_write);
+        assert!(last_write < after_creation);

         test_chunk_access(&chunk).await
     }
+
+    #[tokio::test]
+    async fn parquet_snapshot() {
+        let db = make_db().await.db;
+
+        let before_creation = Utc::now();
+        write_lp(&db, "cpu,tag=1 bar=1 1").await;
+        let after_creation = Utc::now();
+        write_lp(&db, "cpu,tag=2 bar=2 2").await;
+        let after_write = Utc::now();
+
+        db.persist_partition(
+            "cpu",
+            "1970-01-01T00",
+            Instant::now() + Duration::from_secs(10000),
+        )
+        .await
+        .unwrap();
+
+        let chunks = db.catalog.chunks();
+        assert_eq!(chunks.len(), 1);
+        let chunk = chunks.into_iter().next().unwrap();
+        let chunk = chunk.read();
+        assert!(matches!(chunk.stage(), ChunkStage::Persisted { .. }));
+        let snapshot = DbChunk::parquet_file_snapshot(&chunk);
+
+        let first_write = snapshot.time_of_first_write();
+        let last_write = snapshot.time_of_last_write();
+        assert!(before_creation < first_write);
+        assert!(first_write < after_creation);
+        assert!(first_write < last_write);
+        assert!(last_write < after_write);
+    }
 }
diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs
index 1076adce47..fe3bad1f1e 100644
--- a/server/src/db/lifecycle/write.rs
+++ b/server/src/db/lifecycle/write.rs
@@ -62,6 +62,9 @@ pub(super) fn write_chunk_to_object_store(
     chunk.set_writing_to_object_store(&registration)?;
     let db_chunk = DbChunk::snapshot(&*chunk);

+    let time_of_first_write = db_chunk.time_of_first_write();
+    let time_of_last_write = db_chunk.time_of_last_write();
+
     debug!(chunk=%chunk.addr(), "chunk marked WRITING , loading tables into object store");

     // Drop locks
@@ -118,6 +121,8 @@ pub(super) fn write_chunk_to_object_store(
                 chunk_id: addr.chunk_id,
                 partition_checkpoint,
                 database_checkpoint,
+                time_of_first_write,
+                time_of_last_write,
             };
             let (path, file_size_bytes, parquet_metadata) = storage
                 .write_to_object_store(addr, stream, metadata)
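
Illustration (not part of the patch): a minimal, self-contained sketch of the decoding pattern that decode_timestamp_from_field consolidates in parquet_file/src/metadata.rs, i.e. turning an optional fixed-size protobuf timestamp into a chrono DateTime<Utc> and naming the missing IoxMetadata field in the error. FixedSizeTimestamp and Error below are simplified stand-ins, not the generated prost types or the snafu errors used in IOx.

use chrono::{DateTime, NaiveDateTime, Utc};

// Stand-in for the generated proto::FixedSizeTimestamp message (assumed shape).
#[derive(Clone, Copy, Debug)]
struct FixedSizeTimestamp {
    seconds: i64,
    nanos: i32,
}

// Stand-in for the error variants used in metadata.rs (assumed, simplified).
#[derive(Debug)]
enum Error {
    IoxMetadataFieldMissing { field: &'static str },
    TimestampOutOfRange,
}

// Decode a required timestamp field, reporting which field was absent,
// mirroring what decode_timestamp_from_field does for each required timestamp.
fn decode_timestamp_from_field(
    value: Option<FixedSizeTimestamp>,
    field: &'static str,
) -> Result<DateTime<Utc>, Error> {
    let ts = value.ok_or(Error::IoxMetadataFieldMissing { field })?;
    // Nanos are expected to be non-negative in the proto encoding.
    let naive = NaiveDateTime::from_timestamp_opt(ts.seconds, ts.nanos as u32)
        .ok_or(Error::TimestampOutOfRange)?;
    Ok(DateTime::<Utc>::from_utc(naive, Utc))
}

fn main() {
    let ts = FixedSizeTimestamp { seconds: 30, nanos: 40 };
    // A present field decodes to a DateTime<Utc>.
    println!("{:?}", decode_timestamp_from_field(Some(ts), "time_of_first_write"));
    // A missing field reports which metadata field was absent.
    println!("{:?}", decode_timestamp_from_field(None, "time_of_last_write"));
}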