From 71555ee55ccde8489b8981f600edd5c779262213 Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Mon, 23 May 2022 14:28:22 +0100
Subject: [PATCH 1/3] test: Parquet metadata integration test

Adds two integration tests covering validation of the embedded IOx
metadata within the Parquet file metadata, and validation of the derived
ParquetFileParams metadata used to populate the catalog.
---
 parquet_file/tests/metadata.rs | 183 +++++++++++++++++++++++++++++++++
 1 file changed, 183 insertions(+)
 create mode 100644 parquet_file/tests/metadata.rs

diff --git a/parquet_file/tests/metadata.rs b/parquet_file/tests/metadata.rs
new file mode 100644
index 0000000000..b507b17201
--- /dev/null
+++ b/parquet_file/tests/metadata.rs
@@ -0,0 +1,183 @@
+use std::sync::Arc;
+
+use arrow::{
+    array::{ArrayRef, StringBuilder, TimestampNanosecondBuilder},
+    record_batch::RecordBatch,
+};
+use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp};
+use iox_time::Time;
+use object_store::DynObjectStore;
+use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
+use schema::{builder::SchemaBuilder, InfluxFieldType, TIME_COLUMN_NAME};
+
+#[tokio::test]
+async fn test_decoded_iox_metadata() {
+    // A representative IOx data sample (with a time column, an invariant upheld
+    // in the IOx write path)
+    let data = [
+        (
+            TIME_COLUMN_NAME,
+            to_timestamp_array(&[
+                // NOTE: not ordered to ensure min/max is derived, not head/tail
+                1646917692000000000,
+                1653311292000000000,
+                1647695292000000000,
+            ]),
+        ),
+        (
+            "some_field",
+            to_string_array(&["bananas", "platanos", "manzana"]),
+        ),
+    ];
+
+    // And the metadata the batch would be encoded with if it came through the
+    // IOx write path.
+    let meta = IoxMetadata {
+        object_store_id: Default::default(),
+        creation_timestamp: Time::from_timestamp_nanos(42),
+        namespace_id: NamespaceId::new(1),
+        namespace_name: "bananas".into(),
+        sequencer_id: SequencerId::new(2),
+        table_id: TableId::new(3),
+        table_name: "platanos".into(),
+        partition_id: PartitionId::new(4),
+        partition_key: "potato".into(),
+        time_of_first_write: Time::from_timestamp_nanos(4242),
+        time_of_last_write: Time::from_timestamp_nanos(424242),
+        min_sequence_number: SequenceNumber::new(10),
+        max_sequence_number: SequenceNumber::new(11),
+        compaction_level: 1,
+        row_count: 12341234,
+        sort_key: None,
+    };
+
+    let batch = RecordBatch::try_from_iter(data).unwrap();
+    let stream = futures::stream::iter([Ok(batch.clone())]);
+
+    let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
+    let storage = ParquetStorage::new(object_store);
+
+    let (iox_parquet_meta, file_size) = storage
+        .upload(stream, &meta)
+        .await
+        .expect("failed to serialise & persist record batch");
+
+    // Sanity check - can't assert the actual value.
+    assert!(file_size > 0);
+
+    // Decode the IOx metadata embedded in the parquet file metadata.
+    let decoded = iox_parquet_meta
+        .decode()
+        .expect("failed to decode parquet file metadata");
+
+    // And verify the metadata matches the expected values.
+    assert_eq!(
+        decoded.row_count(),
+        3,
+        "row count statistic does not match input row count"
+    );
+
+    let got = decoded
+        .read_iox_metadata_new()
+        .expect("failed to deserialise embedded IOx metadata");
+    assert_eq!(
+        got, meta,
+        "embedded metadata does not match original metadata"
+    );
+}
+
+#[tokio::test]
+async fn test_derive_parquet_file_params() {
+    // A representative IOx data sample (with a time column, an invariant upheld
+    // in the IOx write path)
+    let data = vec![
+        to_string_array(&["bananas", "platanos", "manzana"]),
+        to_timestamp_array(&[
+            1646917692000000000,
+            1653311292000000000,
+            1647695292000000000,
+        ]),
+    ];
+
+    // And the metadata the batch would be encoded with if it came through the
+    // IOx write path.
+    let meta = IoxMetadata {
+        object_store_id: Default::default(),
+        creation_timestamp: Time::from_timestamp_nanos(1234),
+        namespace_id: NamespaceId::new(1),
+        namespace_name: "bananas".into(),
+        sequencer_id: SequencerId::new(2),
+        table_id: TableId::new(3),
+        table_name: "platanos".into(),
+        partition_id: PartitionId::new(4),
+        partition_key: "potato".into(),
+        time_of_first_write: Time::from_timestamp_nanos(4242),
+        time_of_last_write: Time::from_timestamp_nanos(424242),
+        min_sequence_number: SequenceNumber::new(10),
+        max_sequence_number: SequenceNumber::new(11),
+        compaction_level: 1,
+        row_count: 12341234,
+        sort_key: None,
+    };
+
+    // Build a schema that contains the IOx metadata, ensuring it is correctly
+    // populated in the final parquet file's metadata.
+    let schema = SchemaBuilder::new()
+        .influx_field("some_field", InfluxFieldType::String)
+        .timestamp()
+        .build()
+        .expect("could not create schema")
+        .as_arrow();
+
+    let batch = RecordBatch::try_new(schema, data).unwrap();
+    let stream = futures::stream::iter([Ok(batch.clone())]);
+
+    let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
+    let storage = ParquetStorage::new(object_store);
+
+    let (iox_parquet_meta, file_size) = storage
+        .upload(stream, &meta)
+        .await
+        .expect("failed to serialise & persist record batch");
+
+    // Use the IoxParquetMetaData and original IoxMetadata to derive a
+    // ParquetFileParams.
+    let catalog_data = meta.to_parquet_file(file_size, &iox_parquet_meta);
+
+    // And verify the resulting statistics used in the catalog.
+    //
+    // NOTE: thrift-encoded metadata not checked
+    assert_eq!(catalog_data.sequencer_id, meta.sequencer_id);
+    assert_eq!(catalog_data.namespace_id, meta.namespace_id);
+    assert_eq!(catalog_data.table_id, meta.table_id);
+    assert_eq!(catalog_data.partition_id, meta.partition_id);
+    assert_eq!(catalog_data.object_store_id, meta.object_store_id);
+    assert_eq!(catalog_data.min_sequence_number, meta.min_sequence_number);
+    assert_eq!(catalog_data.max_sequence_number, meta.max_sequence_number);
+    assert_eq!(catalog_data.file_size_bytes, file_size as i64);
+    assert_eq!(catalog_data.compaction_level, meta.compaction_level);
+    assert_eq!(catalog_data.created_at, Timestamp::new(1234));
+
+    // NOTE: these DO NOT reflect the actual values! These values were not
+    // derived from the actual data, but instead trusted from the input
+    // IoxMetadata.
+    assert_eq!(catalog_data.row_count, meta.row_count);
+    assert_eq!(catalog_data.min_time, Timestamp::new(4242));
+    assert_eq!(catalog_data.max_time, Timestamp::new(424242));
+}
+
+fn to_string_array(strs: &[&str]) -> ArrayRef {
+    let mut builder = StringBuilder::new(strs.len());
+    for s in strs {
+        builder.append_value(s).expect("appending string");
+    }
+    Arc::new(builder.finish())
+}
+
+fn to_timestamp_array(timestamps: &[i64]) -> ArrayRef {
+    let mut builder = TimestampNanosecondBuilder::new(timestamps.len());
+    builder
+        .append_slice(timestamps)
+        .expect("failed to append timestamp values");
+    Arc::new(builder.finish())
+}

From a142a9eb570f098d5b492744151e9f4d4e5c0f9c Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Fri, 20 May 2022 17:18:11 +0100
Subject: [PATCH 2/3] refactor: remove row_count from IoxMetadata

Remove the redundant row_count from the IoxMetadata structure that is
serialised into the Parquet file. The reasoning is twofold:

  * The Parquet file's native metadata already contains a row count
  * Needing to know the number of rows up-front precludes streaming
---
 compactor/src/compact.rs                              |  7 -------
 .../influxdata/iox/ingester/v1/parquet_metadata.proto | 10 +++++++---
 ingester/src/compact.rs                               |  9 ---------
 ingester/src/persist.rs                               |  2 --
 ingester/src/test_util.rs                             |  5 -----
 iox_tests/src/util.rs                                 |  1 -
 parquet_file/src/metadata.rs                          | 10 ++--------
 parquet_file/src/serialise.rs                         |  1 -
 parquet_file/src/storage.rs                           |  1 -
 parquet_file/tests/metadata.rs                        |  4 +---
 10 files changed, 10 insertions(+), 40 deletions(-)

diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 32be1c0d53..88386dd05c 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -746,12 +746,8 @@ impl Compactor {
             );
 
             let row_count: usize = output_batches.iter().map(|b| b.num_rows()).sum();
-            let row_count = row_count.try_into().context(RowCountTypeConversionSnafu)?;
 
             debug!("got {} rows from stream {}", row_count, i);
 
-            if row_count == 0 {
-                continue;
-            }
             // Compute min and max of the `time` column
             let (min_time, max_time) =
@@ -771,7 +767,6 @@ impl Compactor {
                 time_of_last_write: Time::from_timestamp_nanos(max_time),
                 min_sequence_number,
                 max_sequence_number,
-                row_count,
                 compaction_level: 1, // compacted result file always have level 1
                 sort_key: Some(sort_key.clone()),
             };
@@ -2410,7 +2405,6 @@ mod tests {
            time_of_last_write: compactor.time_provider.now(),
            min_sequence_number: SequenceNumber::new(5),
            max_sequence_number: SequenceNumber::new(6),
-           row_count: 3,
            compaction_level: 1, // level of compacted data is always 1
            sort_key: Some(SortKey::from_columns(["tag1", "time"])),
        };
@@ -2558,7 +2552,6 @@ mod tests {
            time_of_last_write: compactor.time_provider.now(),
            min_sequence_number: SequenceNumber::new(5),
            max_sequence_number: SequenceNumber::new(6),
-           row_count: 3,
            compaction_level: 1, // file level of compacted file is always 1
            sort_key: None,
        };
diff --git a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto
index 45a0fa00c2..3fbeb4f493 100644
--- a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto
+++ b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto
@@ -7,6 +7,13 @@ import "google/protobuf/timestamp.proto";
 
 // IOx-specific metadata that will be serialized into the file-level key-value Parquet metadata
 // under a single key.
 message IoxMetadata {
+  // Removed as the Parquet metadata itself contains the row count, and
+  // specifying it here creates a dependency that prevents streaming
+  // serialisation (needing to know the number of rows before you can
+  // serialise your parquet file with this metadata structure within it)
+  reserved 14;
+  reserved "row_count";
+
   // Object store ID. Used in the parquet filename. 16 bytes in big-endian order.
   bytes object_store_id = 1;
 
@@ -46,9 +53,6 @@ message IoxMetadata {
   // The maximum sequence number from a sequencer in this parquet file.
   int64 max_sequence_number = 13;
 
-  // Number of rows of data in this file.
-  int64 row_count = 14;
-
   // The sort key of this chunk
   SortKey sort_key = 15;
 
diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs
index 2f43dce91a..77227c3037 100644
--- a/ingester/src/compact.rs
+++ b/ingester/src/compact.rs
@@ -105,9 +105,6 @@ pub async fn compact_persisting_batch(
         .filter(|b| b.num_rows() != 0)
         .collect();
 
-    let row_count: usize = output_batches.iter().map(|b| b.num_rows()).sum();
-    let row_count = row_count.try_into().context(RowCountTypeConversionSnafu)?;
-
     // Compute min and max of the `time` column
     let (min_time, max_time) =
         compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?;
@@ -129,7 +126,6 @@ pub async fn compact_persisting_batch(
         time_of_last_write: Time::from_timestamp_nanos(max_time),
         min_sequence_number: min_seq,
        max_sequence_number: max_seq,
-        row_count,
         compaction_level: INITIAL_COMPACTION_LEVEL,
         sort_key: Some(metadata_sort_key),
     };
@@ -337,7 +333,6 @@ mod tests {
            20000,
            seq_num_start,
            seq_num_end,
-           3,
            INITIAL_COMPACTION_LEVEL,
            Some(SortKey::from_columns(["tag1", "time"])),
        );
@@ -432,7 +427,6 @@ mod tests {
            220000,
            seq_num_start,
            seq_num_end,
-           4,
            INITIAL_COMPACTION_LEVEL,
            // Sort key should now be set
            Some(SortKey::from_columns(["tag1", "tag3", "time"])),
@@ -530,7 +524,6 @@ mod tests {
            220000,
            seq_num_start,
            seq_num_end,
-           4,
            INITIAL_COMPACTION_LEVEL,
            // The sort key in the metadata should be the same as specified (that is, not
            // recomputed)
@@ -628,7 +621,6 @@ mod tests {
            220000,
            seq_num_start,
            seq_num_end,
-           4,
            INITIAL_COMPACTION_LEVEL,
            // The sort key in the metadata should be updated to include the new column just before
            // the time column
@@ -729,7 +721,6 @@ mod tests {
            220000,
            seq_num_start,
            seq_num_end,
-           4,
            INITIAL_COMPACTION_LEVEL,
            // The sort key in the metadata should only contain the columns in this file
            Some(SortKey::from_columns(["tag3", "tag1", "time"])),
diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs
index 1c4e458be7..f984efcabe 100644
--- a/ingester/src/persist.rs
+++ b/ingester/src/persist.rs
@@ -77,7 +77,6 @@ mod tests {
            time_of_last_write: now(),
            min_sequence_number: SequenceNumber::new(5),
            max_sequence_number: SequenceNumber::new(6),
-           row_count: 0,
            compaction_level: INITIAL_COMPACTION_LEVEL,
            sort_key: None,
        };
@@ -111,7 +110,6 @@ mod tests {
            time_of_last_write: now(),
            min_sequence_number: SequenceNumber::new(5),
            max_sequence_number: SequenceNumber::new(6),
-           row_count: 3,
            compaction_level: INITIAL_COMPACTION_LEVEL,
            sort_key: None,
        };
diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs
index 36381c33dc..1ff68b94e6 100644
--- a/ingester/src/test_util.rs
+++ b/ingester/src/test_util.rs
@@ -64,8 +64,6 @@ pub async fn make_persisting_batch_with_meta() -> (Arc, Vec (Arc, Vec,
 ) -> IoxMetadata {
@@ -157,7 +153,6 @@
        time_of_last_write: Time::from_timestamp_nanos(max_time),
        min_sequence_number: SequenceNumber::new(min_sequence_number),
        max_sequence_number: SequenceNumber::new(max_sequence_number),
-       row_count,
        compaction_level,
        sort_key,
    }
diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index 3a60ebbf4b..f33a925d78 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -521,7 +521,6 @@ impl TestPartition {
            time_of_last_write: Time::from_timestamp_nanos(max_time),
            min_sequence_number,
            max_sequence_number,
-           row_count: row_count as i64,
            compaction_level: INITIAL_COMPACTION_LEVEL,
            sort_key: Some(sort_key.clone()),
        };
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index b35c506c70..7e8771b827 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -279,9 +279,6 @@ pub struct IoxMetadata {
     /// sequence number of the last write
     pub max_sequence_number: SequenceNumber,
 
-    /// number of rows of data
-    pub row_count: i64,
-
     /// the compaction level of the file
     pub compaction_level: i16,
 
@@ -317,7 +314,6 @@ impl IoxMetadata {
             time_of_last_write: Some(self.time_of_last_write.date_time().into()),
             min_sequence_number: self.min_sequence_number.get(),
             max_sequence_number: self.max_sequence_number.get(),
-            row_count: self.row_count,
             sort_key,
             compaction_level: self.compaction_level as i32,
         };
@@ -377,7 +373,6 @@ impl IoxMetadata {
             time_of_last_write,
             min_sequence_number: SequenceNumber::new(proto_msg.min_sequence_number),
             max_sequence_number: SequenceNumber::new(proto_msg.max_sequence_number),
-            row_count: proto_msg.row_count,
             sort_key,
             compaction_level: proto_msg.compaction_level as i16,
         })
@@ -394,6 +389,7 @@ impl IoxMetadata {
         file_size_bytes: usize,
         metadata: &IoxParquetMetaData,
     ) -> ParquetFileParams {
+        let row_count = metadata.decode().expect("invalid metadata").row_count();
         ParquetFileParams {
             sequencer_id: self.sequencer_id,
             namespace_id: self.namespace_id,
@@ -406,8 +402,8 @@ impl IoxMetadata {
             max_time: Timestamp::new(self.time_of_last_write.timestamp_nanos()),
             file_size_bytes: file_size_bytes as i64,
             parquet_metadata: metadata.thrift_bytes().to_vec(),
-            row_count: self.row_count,
             compaction_level: self.compaction_level,
+            row_count: row_count.try_into().expect("row count overflows i64"),
             created_at: Timestamp::new(self.creation_timestamp.timestamp_nanos()),
         }
     }
@@ -878,7 +874,6 @@ mod tests {
             time_of_last_write: Time::from_timestamp(3234, 3456),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 3,
             compaction_level: 0,
             sort_key: Some(sort_key),
         };
@@ -906,7 +901,6 @@ mod tests {
             time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
-            row_count: 1000,
             compaction_level: 1,
             sort_key: None,
         };
diff --git a/parquet_file/src/serialise.rs b/parquet_file/src/serialise.rs
index 4d27e1b561..e8e8286534 100644
--- a/parquet_file/src/serialise.rs
+++ b/parquet_file/src/serialise.rs
@@ -180,7 +180,6 @@ mod tests {
             time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
-            row_count: 1000,
             compaction_level: 1,
             sort_key: None,
         };
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index 56d22cc3a6..9b731bbee3 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -294,7 +294,6 @@ mod tests {
             time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
-            row_count: 1000,
             compaction_level: 1,
             sort_key: None,
         };
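What this buys callers: an IoxMetadata can now be constructed before any
batch has been materialised, and the authoritative row count is recovered
from the Parquet metadata after the upload completes. A minimal sketch of
the resulting call pattern, not part of the patch itself - persist_batches
is a hypothetical helper, and it leans on the same ParquetStorage::upload()
and IoxParquetMetaData::decode() APIs exercised by the integration tests
above (the exact stream error type and row_count() return type are assumed):

    use arrow::record_batch::RecordBatch;
    use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};

    /// Hypothetical helper: persist a lazily produced set of batches
    /// without knowing their combined row count up-front.
    async fn persist_batches(
        storage: &ParquetStorage,
        meta: &IoxMetadata,
        batches: Vec<RecordBatch>,
    ) {
        // No row counting happens here: the IoxMetadata was built without a
        // row_count field, so serialisation can begin immediately.
        let stream = futures::stream::iter(batches.into_iter().map(Ok));

        let (iox_parquet_meta, _file_size) = storage
            .upload(stream, meta)
            .await
            .expect("failed to serialise & persist record batches");

        // The authoritative row count is recovered from the Parquet metadata
        // itself, exactly as IoxMetadata::to_parquet_file() now does.
        let row_count = iox_parquet_meta
            .decode()
            .expect("invalid metadata")
            .row_count();
        println!("persisted {} rows", row_count);
    }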
diff --git a/parquet_file/tests/metadata.rs b/parquet_file/tests/metadata.rs
index b507b17201..315e182423 100644
--- a/parquet_file/tests/metadata.rs
+++ b/parquet_file/tests/metadata.rs
@@ -47,7 +47,6 @@ async fn test_decoded_iox_metadata() {
         min_sequence_number: SequenceNumber::new(10),
         max_sequence_number: SequenceNumber::new(11),
         compaction_level: 1,
-        row_count: 12341234,
         sort_key: None,
     };
 
@@ -116,7 +115,6 @@ async fn test_derive_parquet_file_params() {
         min_sequence_number: SequenceNumber::new(10),
         max_sequence_number: SequenceNumber::new(11),
         compaction_level: 1,
-        row_count: 12341234,
         sort_key: None,
     };
 
@@ -157,11 +155,11 @@ async fn test_derive_parquet_file_params() {
     assert_eq!(catalog_data.file_size_bytes, file_size as i64);
     assert_eq!(catalog_data.compaction_level, meta.compaction_level);
     assert_eq!(catalog_data.created_at, Timestamp::new(1234));
+    assert_eq!(catalog_data.row_count, 3);
 
     // NOTE: these DO NOT reflect the actual values! These values were not
     // derived from the actual data, but instead trusted from the input
     // IoxMetadata.
-    assert_eq!(catalog_data.row_count, meta.row_count);
     assert_eq!(catalog_data.min_time, Timestamp::new(4242));
     assert_eq!(catalog_data.max_time, Timestamp::new(424242));
 }

From 2e6c49be83ec7529267d7685ed9e5b975dee9e38 Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Mon, 23 May 2022 11:06:47 +0100
Subject: [PATCH 3/3] refactor: remove IoxMetadata min & max timestamp

Removes the min/max timestamp fields from the IoxMetadata proto
structure embedded within a Parquet file's metadata. These values are
redundant, as they already exist within the Parquet column statistics,
and they precluded streaming serialisation because the min/max values
had to be known before the file could be serialised.
---
 compactor/src/compact.rs                          | 13 +---
 .../iox/ingester/v1/parquet_metadata.proto        | 18 ++---
 ingester/src/compact.rs                           | 19 +----
 ingester/src/persist.rs                           |  4 --
 ingester/src/test_util.rs                         |  6 --
 iox_tests/src/util.rs                             |  2 -
 parquet_file/src/metadata.rs                      | 70 ++++++++++++------
 parquet_file/src/serialise.rs                     |  2 -
 parquet_file/src/storage.rs                       |  2 -
 parquet_file/tests/metadata.rs                    | 13 +---
 10 files changed, 55 insertions(+), 94 deletions(-)

diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 88386dd05c..84b67676d0 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -21,10 +21,9 @@ use iox_query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
     provider::overlap::group_potential_duplicates,
-    util::compute_timenanosecond_min_max,
     QueryChunk,
 };
-use iox_time::{Time, TimeProvider};
+use iox_time::TimeProvider;
 use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
 use observability_deps::tracing::{debug, info, trace, warn};
 use parquet_file::{
@@ -749,10 +748,6 @@ impl Compactor {
 
             debug!("got {} rows from stream {}", row_count, i);
 
-            // Compute min and max of the `time` column
-            let (min_time, max_time) =
-                compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?;
-
             let meta = IoxMetadata {
                 object_store_id: Uuid::new_v4(),
                 creation_timestamp: self.time_provider.now(),
@@ -763,8 +758,6 @@ impl Compactor {
                 table_name: Arc::<str>::clone(&iox_metadata.table_name),
                 partition_id: iox_metadata.partition_id,
                 partition_key: Arc::<str>::clone(&iox_metadata.partition_key),
-                time_of_first_write: Time::from_timestamp_nanos(min_time),
-                time_of_last_write: Time::from_timestamp_nanos(max_time),
                 min_sequence_number,
                 max_sequence_number,
                 compaction_level: 1, // compacted result file always have level 1
                 sort_key: Some(sort_key.clone()),
             };
@@ -2401,8 +2394,6 @@ mod tests {
            table_name: "temperature".into(),
            partition_id: PartitionId::new(4),
            partition_key: "somehour".into(),
-           time_of_first_write: compactor.time_provider.now(),
-           time_of_last_write: compactor.time_provider.now(),
            min_sequence_number: SequenceNumber::new(5),
            max_sequence_number: SequenceNumber::new(6),
            compaction_level: 1, // level of compacted data is always 1
@@ -2548,8 +2539,6 @@ mod tests {
            table_name: "temperature".into(),
            partition_id: PartitionId::new(4),
            partition_key: "somehour".into(),
-           time_of_first_write: compactor.time_provider.now(),
-           time_of_last_write: compactor.time_provider.now(),
            min_sequence_number: SequenceNumber::new(5),
            max_sequence_number: SequenceNumber::new(6),
            compaction_level: 1, // file level of compacted file is always 1
diff --git a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto
index 3fbeb4f493..e9e4d85e75 100644
--- a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto
+++ b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto
@@ -7,12 +7,12 @@ import "google/protobuf/timestamp.proto";
 // IOx-specific metadata that will be serialized into the file-level key-value Parquet metadata
 // under a single key.
 message IoxMetadata {
-  // Removed as the Parquet metadata itself contains the row count, and
-  // specifying it here creates a dependency that prevents streaming
-  // serialisation (needing to know the number of rows before you can
-  // serialise your parquet file with this metadata structure within it)
-  reserved 14;
-  reserved "row_count";
+  // Removed as the Parquet metadata itself contains the row count & min/max
+  // timestamps, and specifying them here creates a dependency that prevents
+  // streaming serialisation (needing to know the number of rows before you
+  // can serialise your parquet file with this metadata structure within it)
+  reserved 10, 11, 14;
+  reserved "row_count", "time_of_first_write", "time_of_last_write";
 
   // Object store ID. Used in the parquet filename. 16 bytes in big-endian order.
   bytes object_store_id = 1;
@@ -41,12 +41,6 @@ message IoxMetadata {
   // Partition key of the partition that holds this parquet file.
   string partition_key = 9;
 
-  // Wallclock timestamp of when the first data in this file was received by IOx.
-  google.protobuf.Timestamp time_of_first_write = 10;
-
-  // Wallclock timestamp of when the last data in this file was received by IOx.
-  google.protobuf.Timestamp time_of_last_write = 11;
-
   // The minimum sequence number from a sequencer in this parquet file.
   int64 min_sequence_number = 12;
 
diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs
index 77227c3037..c37db0a960 100644
--- a/ingester/src/compact.rs
+++ b/ingester/src/compact.rs
@@ -8,10 +8,9 @@ use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
 use iox_query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
-    util::compute_timenanosecond_min_max,
     QueryChunk, QueryChunkMeta,
 };
-use iox_time::{Time, TimeProvider};
+use iox_time::TimeProvider;
 use parquet_file::metadata::IoxMetadata;
 use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
 use snafu::{ResultExt, Snafu};
@@ -105,10 +104,6 @@ pub async fn compact_persisting_batch(
         .filter(|b| b.num_rows() != 0)
         .collect();
 
-    // Compute min and max of the `time` column
-    let (min_time, max_time) =
-        compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?;
-
     // Compute min and max sequence numbers
     let (min_seq, max_seq) = batch.data.min_max_sequence_numbers();
 
@@ -122,8 +117,6 @@ pub async fn compact_persisting_batch(
         table_name: Arc::from(table_name.as_str()),
         partition_id: batch.partition_id,
         partition_key: Arc::from(partition_key.as_str()),
-        time_of_first_write: Time::from_timestamp_nanos(min_time),
-        time_of_last_write: Time::from_timestamp_nanos(max_time),
         min_sequence_number: min_seq,
         max_sequence_number: max_seq,
         compaction_level: INITIAL_COMPACTION_LEVEL,
@@ -329,8 +322,6 @@ mod tests {
            table_name,
            partition_id,
            partition_key,
-           8000,
-           20000,
            seq_num_start,
            seq_num_end,
            INITIAL_COMPACTION_LEVEL,
@@ -423,8 +414,6 @@ mod tests {
            table_name,
            partition_id,
            partition_key,
-           28000,
-           220000,
            seq_num_start,
            seq_num_end,
            INITIAL_COMPACTION_LEVEL,
@@ -520,8 +509,6 @@ mod tests {
            table_name,
            partition_id,
            partition_key,
-           28000,
-           220000,
            seq_num_start,
            seq_num_end,
            INITIAL_COMPACTION_LEVEL,
@@ -617,8 +604,6 @@ mod tests {
            table_name,
            partition_id,
            partition_key,
-           28000,
-           220000,
            seq_num_start,
            seq_num_end,
            INITIAL_COMPACTION_LEVEL,
@@ -717,8 +702,6 @@ mod tests {
            table_name,
            partition_id,
            partition_key,
-           28000,
-           220000,
            seq_num_start,
            seq_num_end,
            INITIAL_COMPACTION_LEVEL,
diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs
index f984efcabe..ec40724526 100644
--- a/ingester/src/persist.rs
+++ b/ingester/src/persist.rs
@@ -73,8 +73,6 @@ mod tests {
            table_name: "temperature".into(),
            partition_id: PartitionId::new(4),
            partition_key: "somehour".into(),
-           time_of_first_write: now(),
-           time_of_last_write: now(),
            min_sequence_number: SequenceNumber::new(5),
            max_sequence_number: SequenceNumber::new(6),
            compaction_level: INITIAL_COMPACTION_LEVEL,
@@ -106,8 +104,6 @@ mod tests {
            table_name: "temperature".into(),
            partition_id: PartitionId::new(4),
            partition_key: "somehour".into(),
-           time_of_first_write: now(),
-           time_of_last_write: now(),
            min_sequence_number: SequenceNumber::new(5),
            max_sequence_number: SequenceNumber::new(6),
            compaction_level: INITIAL_COMPACTION_LEVEL,
diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs
index 1ff68b94e6..09b6d52836 100644
--- a/ingester/src/test_util.rs
+++ b/ingester/src/test_util.rs
@@ -89,8 +89,6 @@ pub async fn make_persisting_batch_with_meta() -> (Arc, Vec,
-    /// Time of the first write of the data
-    /// This is also the min value of the column `time`
-    pub time_of_first_write: Time,
-
-    /// Time of the last write of the data
-    /// This is also the max value of the column `time`
-    pub time_of_last_write: Time,
-
     /// sequence number of the first write
     pub min_sequence_number: SequenceNumber,
 
@@ -310,8 +302,6 @@ impl IoxMetadata {
             table_name: self.table_name.to_string(),
             partition_id: self.partition_id.get(),
             partition_key: self.partition_key.to_string(),
-            time_of_first_write: Some(self.time_of_first_write.date_time().into()),
-            time_of_last_write: Some(self.time_of_last_write.date_time().into()),
             min_sequence_number: self.min_sequence_number.get(),
             max_sequence_number: self.max_sequence_number.get(),
             sort_key,
@@ -334,12 +324,6 @@ impl IoxMetadata {
         // extract creation timestamp
         let creation_timestamp =
             decode_timestamp_from_field(proto_msg.creation_timestamp, "creation_timestamp")?;
-        // extract time of first write
-        let time_of_first_write =
-            decode_timestamp_from_field(proto_msg.time_of_first_write, "time_of_first_write")?;
-        // extract time of last write
-        let time_of_last_write =
-            decode_timestamp_from_field(proto_msg.time_of_last_write, "time_of_last_write")?;
 
         // extract strings
         let namespace_name = Arc::from(proto_msg.namespace_name.as_ref());
@@ -369,8 +353,6 @@ impl IoxMetadata {
             table_name,
             partition_id: PartitionId::new(proto_msg.partition_id),
             partition_key,
-            time_of_first_write,
-            time_of_last_write,
             min_sequence_number: SequenceNumber::new(proto_msg.min_sequence_number),
             max_sequence_number: SequenceNumber::new(proto_msg.max_sequence_number),
             sort_key,
@@ -384,12 +366,52 @@ impl IoxMetadata {
     }
 
     /// Create a corresponding iox catalog's ParquetFile
+    ///
+    /// # Panics
+    ///
+    /// This method panics if the [`IoxParquetMetaData`] structure does not
+    /// contain valid metadata bytes, has no readable schema, or has no field
+    /// statistics.
+    ///
+    /// A [`RecordBatch`] serialised without the embedded metadata found in the
+    /// IOx [`Schema`] type will cause a statistic resolution failure due to
+    /// lack of the IOx field type metadata for the time column. Batches
+    /// produced through the IOx write path always include this metadata.
+    ///
+    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
     pub fn to_parquet_file(
         &self,
         file_size_bytes: usize,
         metadata: &IoxParquetMetaData,
     ) -> ParquetFileParams {
-        let row_count = metadata.decode().expect("invalid metadata").row_count();
+        let decoded = metadata.decode().expect("invalid IOx metadata");
+        let row_count = decoded.row_count();
+
+        // Derive the min/max timestamp from the Parquet column statistics.
+        let schema = decoded
+            .read_schema()
+            .expect("failed to read encoded schema");
+        let time_summary = decoded
+            .read_statistics(&*schema)
+            .expect("invalid statistics")
+            .into_iter()
+            .find(|v| v.name == TIME_COLUMN_NAME)
+            .expect("no time column in metadata statistics");
+
+        // Sanity check the type of this column before using the values.
+        assert_eq!(time_summary.influxdb_type, Some(InfluxDbType::Timestamp));
+
+        // Extract the min/max timestamps.
+        let (min_time, max_time) = match time_summary.stats {
+            Statistics::I64(stats) => {
+                let min = Timestamp::new(stats.min.expect("no min time statistic"));
+                let max = Timestamp::new(stats.max.expect("no max time statistic"));
+                (min, max)
+            }
+            _ => panic!("unexpected physical type for timestamp column"),
+        };
+
         ParquetFileParams {
             sequencer_id: self.sequencer_id,
             namespace_id: self.namespace_id,
@@ -398,8 +420,8 @@ impl IoxMetadata {
             object_store_id: self.object_store_id,
             min_sequence_number: self.min_sequence_number,
             max_sequence_number: self.max_sequence_number,
-            min_time: Timestamp::new(self.time_of_first_write.timestamp_nanos()),
-            max_time: Timestamp::new(self.time_of_last_write.timestamp_nanos()),
+            min_time,
+            max_time,
             file_size_bytes: file_size_bytes as i64,
             parquet_metadata: metadata.thrift_bytes().to_vec(),
             compaction_level: self.compaction_level,
@@ -870,8 +892,6 @@ mod tests {
             table_name: Arc::from("weather"),
             partition_id: PartitionId::new(4),
             partition_key: Arc::from("part"),
-            time_of_first_write: Time::from_timestamp(3234, 0),
-            time_of_last_write: Time::from_timestamp(3234, 3456),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
             compaction_level: 0,
@@ -897,8 +917,6 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
             compaction_level: 1,
diff --git a/parquet_file/src/serialise.rs b/parquet_file/src/serialise.rs
index e8e8286534..3d0b8f8838 100644
--- a/parquet_file/src/serialise.rs
+++ b/parquet_file/src/serialise.rs
@@ -176,8 +176,6 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
             compaction_level: 1,
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index 9b731bbee3..1ac5de4705 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -290,8 +290,6 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
             compaction_level: 1,
diff --git a/parquet_file/tests/metadata.rs b/parquet_file/tests/metadata.rs
index 315e182423..68acd09dd5 100644
--- a/parquet_file/tests/metadata.rs
+++ b/parquet_file/tests/metadata.rs
@@ -42,8 +42,6 @@ async fn test_decoded_iox_metadata() {
         table_name: "platanos".into(),
         partition_id: PartitionId::new(4),
         partition_key: "potato".into(),
-        time_of_first_write: Time::from_timestamp_nanos(4242),
-        time_of_last_write: Time::from_timestamp_nanos(424242),
         min_sequence_number: SequenceNumber::new(10),
         max_sequence_number: SequenceNumber::new(11),
         compaction_level: 1,
@@ -92,6 +90,7 @@ async fn test_derive_parquet_file_params() {
     let data = vec![
         to_string_array(&["bananas", "platanos", "manzana"]),
         to_timestamp_array(&[
+            // NOTE: not ordered to ensure min/max extracted, not head/tail
             1646917692000000000,
             1653311292000000000,
             1647695292000000000,
@@ -110,8 +109,6 @@ async fn test_derive_parquet_file_params() {
         table_name: "platanos".into(),
         partition_id: PartitionId::new(4),
         partition_key: "potato".into(),
-        time_of_first_write: Time::from_timestamp_nanos(4242),
-        time_of_last_write: Time::from_timestamp_nanos(424242),
         min_sequence_number: SequenceNumber::new(10),
         max_sequence_number: SequenceNumber::new(11),
         compaction_level: 1,
@@ -156,12 +153,8 @@ async fn test_derive_parquet_file_params() {
     assert_eq!(catalog_data.compaction_level, meta.compaction_level);
     assert_eq!(catalog_data.created_at, Timestamp::new(1234));
     assert_eq!(catalog_data.row_count, 3);
-
-    // NOTE: these DO NOT reflect the actual values! These values were not
-    // derived from the actual data, but instead trusted from the input
-    // IoxMetadata.
-    assert_eq!(catalog_data.min_time, Timestamp::new(4242));
-    assert_eq!(catalog_data.max_time, Timestamp::new(424242));
+    assert_eq!(catalog_data.min_time, Timestamp::new(1646917692000000000));
+    assert_eq!(catalog_data.max_time, Timestamp::new(1653311292000000000));
 }
 
 fn to_string_array(strs: &[&str]) -> ArrayRef {
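For reference, the statistics lookup that to_parquet_file() now performs can
be exercised on its own. The sketch below is illustrative rather than part of
the patch: time_range is a hypothetical helper, the import paths of
Statistics and InfluxDbType are assumed to be data_types, and error handling
is collapsed to panics just as in the hunk above.

    use data_types::{InfluxDbType, Statistics, Timestamp};
    use parquet_file::metadata::IoxParquetMetaData;
    use schema::TIME_COLUMN_NAME;

    /// Hypothetical helper: recover the (min, max) of the `time` column from
    /// the Parquet column statistics embedded in the file metadata.
    fn time_range(meta: &IoxParquetMetaData) -> (Timestamp, Timestamp) {
        let decoded = meta.decode().expect("invalid IOx metadata");

        // The IOx schema is needed to resolve column statistics by name.
        let schema = decoded
            .read_schema()
            .expect("failed to read encoded schema");

        // Locate the summary for the `time` column.
        let time_summary = decoded
            .read_statistics(&*schema)
            .expect("invalid statistics")
            .into_iter()
            .find(|v| v.name == TIME_COLUMN_NAME)
            .expect("no time column in metadata statistics");

        // The time column is always an i64 nanosecond timestamp in IOx.
        assert_eq!(time_summary.influxdb_type, Some(InfluxDbType::Timestamp));
        match time_summary.stats {
            Statistics::I64(stats) => (
                Timestamp::new(stats.min.expect("no min time statistic")),
                Timestamp::new(stats.max.expect("no max time statistic")),
            ),
            _ => panic!("unexpected physical type for timestamp column"),
        }
    }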