diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 32be1c0d53..84b67676d0 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -21,10 +21,9 @@ use iox_query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
     provider::overlap::group_potential_duplicates,
-    util::compute_timenanosecond_min_max,
     QueryChunk,
 };
-use iox_time::{Time, TimeProvider};
+use iox_time::TimeProvider;
 use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
 use observability_deps::tracing::{debug, info, trace, warn};
 use parquet_file::{
@@ -746,16 +745,8 @@ impl Compactor {
         );
 
         let row_count: usize = output_batches.iter().map(|b| b.num_rows()).sum();
-        let row_count = row_count.try_into().context(RowCountTypeConversionSnafu)?;
         debug!("got {} rows from stream {}", row_count, i);
 
-        if row_count == 0 {
-            continue;
-        }
-
-        // Compute min and max of the `time` column
-        let (min_time, max_time) =
-            compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?;
-
         let meta = IoxMetadata {
             object_store_id: Uuid::new_v4(),
@@ -767,11 +758,8 @@ impl Compactor {
             table_name: Arc::<str>::clone(&iox_metadata.table_name),
             partition_id: iox_metadata.partition_id,
             partition_key: Arc::<str>::clone(&iox_metadata.partition_key),
-            time_of_first_write: Time::from_timestamp_nanos(min_time),
-            time_of_last_write: Time::from_timestamp_nanos(max_time),
             min_sequence_number,
             max_sequence_number,
-            row_count,
             compaction_level: 1, // compacted result file always have level 1
             sort_key: Some(sort_key.clone()),
         };
@@ -2406,11 +2394,8 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: compactor.time_provider.now(),
-            time_of_last_write: compactor.time_provider.now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 3,
             compaction_level: 1, // level of compacted data is always 1
             sort_key: Some(SortKey::from_columns(["tag1", "time"])),
         };
@@ -2554,11 +2539,8 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: compactor.time_provider.now(),
-            time_of_last_write: compactor.time_provider.now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 3,
             compaction_level: 1, // file level of compacted file is always 1
             sort_key: None,
         };
diff --git a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto
index 45a0fa00c2..e9e4d85e75 100644
--- a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto
+++ b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto
@@ -7,6 +7,13 @@ import "google/protobuf/timestamp.proto";
 // IOx-specific metadata that will be serialized into the file-level key-value Parquet metadata
 // under a single key.
 message IoxMetadata {
+  // Removed as the Parquet metadata itself contains the row count & min/max
+  // timestamps, and specifying them here creates a dependency that prevents
+  // streaming serialisation (needing to know the number of rows before you
+  // can serialise your parquet file with this metadata structure within it).
+  reserved 10, 11, 14;
+  reserved "row_count", "time_of_first_write", "time_of_last_write";
+
   // Object store ID. Used in the parquet filename. 16 bytes in big-endian order.
   bytes object_store_id = 1;
 
@@ -34,21 +41,12 @@ message IoxMetadata {
   // Partition key of the partition that holds this parquet file.
   string partition_key = 9;
 
-  // Wallclock timestamp of when the first data in this file was received by IOx.
-  google.protobuf.Timestamp time_of_first_write = 10;
-
-  // Wallclock timestamp of when the last data in this file was received by IOx.
-  google.protobuf.Timestamp time_of_last_write = 11;
-
   // The minimum sequence number from a sequencer in this parquet file.
   int64 min_sequence_number = 12;
 
   // The maximum sequence number from a sequencer in this parquet file.
   int64 max_sequence_number = 13;
 
-  // Number of rows of data in this file.
-  int64 row_count = 14;
-
   // The sort key of this chunk
   SortKey sort_key = 15;
 
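(Annotation, not part of the patch: the values removed from the proto above are not lost; they are re-derived from the Parquet file's own footer at catalog-insert time. A condensed sketch of the replacement derivation, lifted from the to_parquet_file() change to parquet_file/src/metadata.rs further down this diff, with error handling collapsed into expect():

    // `metadata` is the IoxParquetMetaData captured when the file is uploaded.
    let decoded = metadata.decode().expect("invalid IOx metadata");
    let row_count = decoded.row_count();

    // The min/max `time` values come from the column statistics in the footer.
    let schema = decoded.read_schema().expect("failed to read encoded schema");
    let time_summary = decoded
        .read_statistics(&*schema)
        .expect("invalid statistics")
        .into_iter()
        .find(|v| v.name == TIME_COLUMN_NAME)
        .expect("no time column in metadata statistics");

Because the footer is only written after all row groups have been flushed, nothing about the data needs to be known up front, which is what unblocks the streaming serialisation mentioned in the comment above.)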
diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs
index 2f43dce91a..c37db0a960 100644
--- a/ingester/src/compact.rs
+++ b/ingester/src/compact.rs
@@ -8,10 +8,9 @@ use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
 use iox_query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
-    util::compute_timenanosecond_min_max,
     QueryChunk, QueryChunkMeta,
 };
-use iox_time::{Time, TimeProvider};
+use iox_time::TimeProvider;
 use parquet_file::metadata::IoxMetadata;
 use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
 use snafu::{ResultExt, Snafu};
@@ -105,13 +104,6 @@ pub async fn compact_persisting_batch(
         .filter(|b| b.num_rows() != 0)
         .collect();
 
-    let row_count: usize = output_batches.iter().map(|b| b.num_rows()).sum();
-    let row_count = row_count.try_into().context(RowCountTypeConversionSnafu)?;
-
-    // Compute min and max of the `time` column
-    let (min_time, max_time) =
-        compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?;
-
     // Compute min and max sequence numbers
     let (min_seq, max_seq) = batch.data.min_max_sequence_numbers();
 
@@ -125,11 +117,8 @@ pub async fn compact_persisting_batch(
         table_name: Arc::from(table_name.as_str()),
         partition_id: batch.partition_id,
         partition_key: Arc::from(partition_key.as_str()),
-        time_of_first_write: Time::from_timestamp_nanos(min_time),
-        time_of_last_write: Time::from_timestamp_nanos(max_time),
         min_sequence_number: min_seq,
         max_sequence_number: max_seq,
-        row_count,
         compaction_level: INITIAL_COMPACTION_LEVEL,
         sort_key: Some(metadata_sort_key),
     };
@@ -333,11 +322,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            8000,
-            20000,
             seq_num_start,
             seq_num_end,
-            3,
             INITIAL_COMPACTION_LEVEL,
             Some(SortKey::from_columns(["tag1", "time"])),
         );
@@ -428,11 +414,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
-            4,
             INITIAL_COMPACTION_LEVEL,
             // Sort key should now be set
             Some(SortKey::from_columns(["tag1", "tag3", "time"])),
@@ -526,11 +509,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
-            4,
             INITIAL_COMPACTION_LEVEL,
             // The sort key in the metadata should be the same as specified (that is, not
             // recomputed)
@@ -624,11 +604,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
-            4,
             INITIAL_COMPACTION_LEVEL,
             // The sort key in the metadata should be updated to include the new column just before
             // the time column
@@ -725,11 +702,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
-            4,
             INITIAL_COMPACTION_LEVEL,
             // The sort key in the metadata should only contain the columns in this file
             Some(SortKey::from_columns(["tag3", "tag1", "time"])),
diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs
index 1c4e458be7..ec40724526 100644
--- a/ingester/src/persist.rs
+++ b/ingester/src/persist.rs
@@ -73,11 +73,8 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: now(),
-            time_of_last_write: now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 0,
             compaction_level: INITIAL_COMPACTION_LEVEL,
             sort_key: None,
         };
@@ -107,11 +104,8 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: now(),
-            time_of_last_write: now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 3,
             compaction_level: INITIAL_COMPACTION_LEVEL,
             sort_key: None,
         };
diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs
index 36381c33dc..09b6d52836 100644
--- a/ingester/src/test_util.rs
+++ b/ingester/src/test_util.rs
@@ -64,8 +64,6 @@ pub async fn make_persisting_batch_with_meta() -> (Arc<PersistingBatch>, Vec<To
     let seq_num_start: i64 = 1;
     let seq_num_end: i64 = 2;
-    let min_time: i64 = 8000;
-    let max_time: i64 = 20000;
     let row_count = 3;
     let compaction_level = INITIAL_COMPACTION_LEVEL;
 
@@ -78,11 +76,8 @@ pub async fn make_persisting_batch_with_meta() -> (Arc<PersistingBatch>, Vec<To
         table_name,
         partition_id,
         partition_key,
-        min_time,
-        max_time,
         seq_num_start,
         seq_num_end,
-        row_count,
         compaction_level,
         sort_key.clone(),
     );
@@ -135,11 +130,8 @@ pub fn make_meta(
     table_name: &str,
     partition_id: i64,
     partition_key: &str,
-    min_time: i64,
-    max_time: i64,
     min_sequence_number: i64,
     max_sequence_number: i64,
-    row_count: i64,
     compaction_level: i16,
     sort_key: Option<SortKey>,
 ) -> IoxMetadata {
@@ -153,11 +145,8 @@ pub fn make_meta(
         table_name: Arc::from(table_name),
         partition_id: PartitionId::new(partition_id),
         partition_key: Arc::from(partition_key),
-        time_of_first_write: Time::from_timestamp_nanos(min_time),
-        time_of_last_write: Time::from_timestamp_nanos(max_time),
         min_sequence_number: SequenceNumber::new(min_sequence_number),
         max_sequence_number: SequenceNumber::new(max_sequence_number),
-        row_count,
         compaction_level,
         sort_key,
     }
diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index 3a60ebbf4b..2ce89c0fb9 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -517,11 +517,8 @@ impl TestPartition {
             table_name: self.table.table.name.clone().into(),
             partition_id: self.partition.id,
             partition_key: self.partition.partition_key.clone().into(),
-            time_of_first_write: Time::from_timestamp_nanos(min_time),
-            time_of_last_write: Time::from_timestamp_nanos(max_time),
             min_sequence_number,
             max_sequence_number,
-            row_count: row_count as i64,
             compaction_level: INITIAL_COMPACTION_LEVEL,
             sort_key: Some(sort_key.clone()),
         };
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index b35c506c70..074eac3e27 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -108,7 +108,7 @@ use parquet::{
 use prost::Message;
 use schema::{
     sort::{SortKey, SortKeyBuilder},
-    InfluxColumnType, InfluxFieldType, Schema,
+    InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME,
 };
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{convert::TryInto, sync::Arc};
@@ -265,23 +265,12 @@ pub struct IoxMetadata {
     /// parittion key of the data
     pub partition_key: Arc<str>,
 
-    /// Time of the first write of the data
-    /// This is also the min value of the column `time`
-    pub time_of_first_write: Time,
-
-    /// Time of the last write of the data
-    /// This is also the max value of the column `time`
-    pub time_of_last_write: Time,
-
     /// sequence number of the first write
     pub min_sequence_number: SequenceNumber,
 
     /// sequence number of the last write
     pub max_sequence_number: SequenceNumber,
 
-    /// number of rows of data
-    pub row_count: i64,
-
     /// the compaction level of the file
     pub compaction_level: i16,
 
@@ -313,11 +302,8 @@ impl IoxMetadata {
             table_name: self.table_name.to_string(),
             partition_id: self.partition_id.get(),
             partition_key: self.partition_key.to_string(),
-            time_of_first_write: Some(self.time_of_first_write.date_time().into()),
-            time_of_last_write: Some(self.time_of_last_write.date_time().into()),
             min_sequence_number: self.min_sequence_number.get(),
             max_sequence_number: self.max_sequence_number.get(),
-            row_count: self.row_count,
             sort_key,
             compaction_level: self.compaction_level as i32,
         };
@@ -338,12 +324,6 @@ impl IoxMetadata {
         // extract creation timestamp
         let creation_timestamp =
             decode_timestamp_from_field(proto_msg.creation_timestamp, "creation_timestamp")?;
-        // extract time of first write
-        let time_of_first_write =
-            decode_timestamp_from_field(proto_msg.time_of_first_write, "time_of_first_write")?;
-        // extract time of last write
-        let time_of_last_write =
-            decode_timestamp_from_field(proto_msg.time_of_last_write, "time_of_last_write")?;
 
         // extract strings
         let namespace_name = Arc::from(proto_msg.namespace_name.as_ref());
@@ -373,11 +353,8 @@ impl IoxMetadata {
             table_name,
             partition_id: PartitionId::new(proto_msg.partition_id),
             partition_key,
-            time_of_first_write,
-            time_of_last_write,
             min_sequence_number: SequenceNumber::new(proto_msg.min_sequence_number),
             max_sequence_number: SequenceNumber::new(proto_msg.max_sequence_number),
-            row_count: proto_msg.row_count,
             sort_key,
             compaction_level: proto_msg.compaction_level as i16,
         })
@@ -389,11 +366,52 @@ impl IoxMetadata {
     }
 
     /// Create a corresponding iox catalog's ParquetFile
+    ///
+    /// # Panics
+    ///
+    /// This method panics if the [`IoxParquetMetaData`] structure does not
+    /// contain valid metadata bytes, has no readable schema, or has no field
+    /// statistics.
+    ///
+    /// A [`RecordBatch`] serialised without the embedded metadata found in the
+    /// IOx [`Schema`] type will cause a statistic resolution failure due to
+    /// lack of the IOx field type metadata for the time column. Batches
+    /// produced through the IOx write path always include this metadata.
+    ///
+    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
     pub fn to_parquet_file(
         &self,
         file_size_bytes: usize,
         metadata: &IoxParquetMetaData,
     ) -> ParquetFileParams {
+        let decoded = metadata.decode().expect("invalid IOx metadata");
+        let row_count = decoded.row_count();
+
+        // Derive the min/max timestamp from the Parquet column statistics.
+        let schema = decoded
+            .read_schema()
+            .expect("failed to read encoded schema");
+        let time_summary = decoded
+            .read_statistics(&*schema)
+            .expect("invalid statistics")
+            .into_iter()
+            .find(|v| v.name == TIME_COLUMN_NAME)
+            .expect("no time column in metadata statistics");
+
+        // Sanity check the type of this column before using the values.
+        assert_eq!(time_summary.influxdb_type, Some(InfluxDbType::Timestamp));
+
+        // Extract the min/max timestamps.
+        let (min_time, max_time) = match time_summary.stats {
+            Statistics::I64(stats) => {
+                let min = Timestamp::new(stats.min.expect("no min time statistic"));
+                let max = Timestamp::new(stats.max.expect("no max time statistic"));
+                (min, max)
+            }
+            _ => panic!("unexpected physical type for timestamp column"),
+        };
+
         ParquetFileParams {
             sequencer_id: self.sequencer_id,
             namespace_id: self.namespace_id,
@@ -402,12 +420,12 @@ impl IoxMetadata {
             object_store_id: self.object_store_id,
             min_sequence_number: self.min_sequence_number,
             max_sequence_number: self.max_sequence_number,
-            min_time: Timestamp::new(self.time_of_first_write.timestamp_nanos()),
-            max_time: Timestamp::new(self.time_of_last_write.timestamp_nanos()),
+            min_time,
+            max_time,
             file_size_bytes: file_size_bytes as i64,
             parquet_metadata: metadata.thrift_bytes().to_vec(),
-            row_count: self.row_count,
             compaction_level: self.compaction_level,
+            row_count: row_count.try_into().expect("row count overflows i64"),
             created_at: Timestamp::new(self.creation_timestamp.timestamp_nanos()),
         }
     }
@@ -874,11 +892,8 @@ mod tests {
             table_name: Arc::from("weather"),
             partition_id: PartitionId::new(4),
             partition_key: Arc::from("part"),
-            time_of_first_write: Time::from_timestamp(3234, 0),
-            time_of_last_write: Time::from_timestamp(3234, 3456),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 3,
             compaction_level: 0,
             sort_key: Some(sort_key),
         };
@@ -902,11 +917,8 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
-            row_count: 1000,
             compaction_level: 1,
             sort_key: None,
         };
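(Annotation, not part of the patch: end to end, a persist now looks roughly like the sketch below. The names are the ones used elsewhere in this diff, and the upload/derive pairing is exercised by the new integration test at the bottom of the patch:

    // `storage` is a ParquetStorage; `stream` yields the RecordBatches being
    // persisted; `meta` is the writer's IoxMetadata.
    let (iox_parquet_meta, file_size) = storage
        .upload(stream, &meta)
        .await
        .expect("failed to serialise & persist record batch");

    // row_count, min_time and max_time in the returned ParquetFileParams are
    // derived from the file's own metadata rather than trusted from the writer.
    let params = meta.to_parquet_file(file_size, &iox_parquet_meta);
)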
diff --git a/parquet_file/src/serialise.rs b/parquet_file/src/serialise.rs
index 4d27e1b561..3d0b8f8838 100644
--- a/parquet_file/src/serialise.rs
+++ b/parquet_file/src/serialise.rs
@@ -176,11 +176,8 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
-            row_count: 1000,
             compaction_level: 1,
             sort_key: None,
         };
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index 56d22cc3a6..1ac5de4705 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -290,11 +290,8 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
-            row_count: 1000,
             compaction_level: 1,
             sort_key: None,
         };
diff --git a/parquet_file/tests/metadata.rs b/parquet_file/tests/metadata.rs
new file mode 100644
index 0000000000..68acd09dd5
--- /dev/null
+++ b/parquet_file/tests/metadata.rs
@@ -0,0 +1,174 @@
+use std::sync::Arc;
+
+use arrow::{
+    array::{ArrayRef, StringBuilder, TimestampNanosecondBuilder},
+    record_batch::RecordBatch,
+};
+use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp};
+use iox_time::Time;
+use object_store::DynObjectStore;
+use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
+use schema::{builder::SchemaBuilder, InfluxFieldType, TIME_COLUMN_NAME};
+
+#[tokio::test]
+async fn test_decoded_iox_metadata() {
+    // A representative IOx data sample (with a time column, an invariant upheld
+    // in the IOx write path)
+    let data = [
+        (
+            TIME_COLUMN_NAME,
+            to_timestamp_array(&[
+                // NOTE: not ordered to ensure min/max is derived, not head/tail
+                1646917692000000000,
+                1653311292000000000,
+                1647695292000000000,
+            ]),
+        ),
+        (
+            "some_field",
+            to_string_array(&["bananas", "platanos", "manzana"]),
+        ),
+    ];
+
+    // And the metadata the batch would be encoded with if it came through the
+    // IOx write path.
+    let meta = IoxMetadata {
+        object_store_id: Default::default(),
+        creation_timestamp: Time::from_timestamp_nanos(42),
+        namespace_id: NamespaceId::new(1),
+        namespace_name: "bananas".into(),
+        sequencer_id: SequencerId::new(2),
+        table_id: TableId::new(3),
+        table_name: "platanos".into(),
+        partition_id: PartitionId::new(4),
+        partition_key: "potato".into(),
+        min_sequence_number: SequenceNumber::new(10),
+        max_sequence_number: SequenceNumber::new(11),
+        compaction_level: 1,
+        sort_key: None,
+    };
+
+    let batch = RecordBatch::try_from_iter(data).unwrap();
+    let stream = futures::stream::iter([Ok(batch.clone())]);
+
+    let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
+    let storage = ParquetStorage::new(object_store);
+
+    let (iox_parquet_meta, file_size) = storage
+        .upload(stream, &meta)
+        .await
+        .expect("failed to serialise & persist record batch");
+
+    // Sanity check - can't assert the actual value.
+    assert!(file_size > 0);
+
+    // Decode the IOx metadata embedded in the parquet file metadata.
+    let decoded = iox_parquet_meta
+        .decode()
+        .expect("failed to decode parquet file metadata");
+
+    // And verify the metadata matches the expected values.
+    assert_eq!(
+        decoded.row_count(),
+        3,
+        "row count statistics does not match input row count"
+    );
+
+    let got = decoded
+        .read_iox_metadata_new()
+        .expect("failed to deserialise embedded IOx metadata");
+    assert_eq!(
+        got, meta,
+        "embedded metadata does not match original metadata"
+    );
+}
+
+#[tokio::test]
+async fn test_derive_parquet_file_params() {
+    // A representative IOx data sample (with a time column, an invariant upheld
+    // in the IOx write path)
+    let data = vec![
+        to_string_array(&["bananas", "platanos", "manzana"]),
+        to_timestamp_array(&[
+            // NOTE: not ordered to ensure min/max extracted, not head/tail
+            1646917692000000000,
+            1653311292000000000,
+            1647695292000000000,
+        ]),
+    ];
+
+    // And the metadata the batch would be encoded with if it came through the
+    // IOx write path.
+    let meta = IoxMetadata {
+        object_store_id: Default::default(),
+        creation_timestamp: Time::from_timestamp_nanos(1234),
+        namespace_id: NamespaceId::new(1),
+        namespace_name: "bananas".into(),
+        sequencer_id: SequencerId::new(2),
+        table_id: TableId::new(3),
+        table_name: "platanos".into(),
+        partition_id: PartitionId::new(4),
+        partition_key: "potato".into(),
+        min_sequence_number: SequenceNumber::new(10),
+        max_sequence_number: SequenceNumber::new(11),
+        compaction_level: 1,
+        sort_key: None,
+    };
+
+    // Build a schema that contains the IOx metadata, ensuring it is correctly
+    // populated in the final parquet file's metadata.
+    let schema = SchemaBuilder::new()
+        .influx_field("some_field", InfluxFieldType::String)
+        .timestamp()
+        .build()
+        .expect("could not create schema")
+        .as_arrow();
+
+    let batch = RecordBatch::try_new(schema, data).unwrap();
+    let stream = futures::stream::iter([Ok(batch.clone())]);
+
+    let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
+    let storage = ParquetStorage::new(object_store);
+
+    let (iox_parquet_meta, file_size) = storage
+        .upload(stream, &meta)
+        .await
+        .expect("failed to serialise & persist record batch");
+
+    // Use the IoxParquetMetaData and original IoxMetadata to derive a
+    // ParquetFileParams.
+    let catalog_data = meta.to_parquet_file(file_size, &iox_parquet_meta);
+
+    // And verify the resulting statistics used in the catalog.
+    //
+    // NOTE: thrift-encoded metadata not checked
+    assert_eq!(catalog_data.sequencer_id, meta.sequencer_id);
+    assert_eq!(catalog_data.namespace_id, meta.namespace_id);
+    assert_eq!(catalog_data.table_id, meta.table_id);
+    assert_eq!(catalog_data.partition_id, meta.partition_id);
+    assert_eq!(catalog_data.object_store_id, meta.object_store_id);
+    assert_eq!(catalog_data.min_sequence_number, meta.min_sequence_number);
+    assert_eq!(catalog_data.max_sequence_number, meta.max_sequence_number);
+    assert_eq!(catalog_data.file_size_bytes, file_size as i64);
+    assert_eq!(catalog_data.compaction_level, meta.compaction_level);
+    assert_eq!(catalog_data.created_at, Timestamp::new(1234));
+    assert_eq!(catalog_data.row_count, 3);
+    assert_eq!(catalog_data.min_time, Timestamp::new(1646917692000000000));
+    assert_eq!(catalog_data.max_time, Timestamp::new(1653311292000000000));
+}
+
+fn to_string_array(strs: &[&str]) -> ArrayRef {
+    let mut builder = StringBuilder::new(strs.len());
+    for s in strs {
+        builder.append_value(s).expect("appending string");
+    }
+    Arc::new(builder.finish())
+}
+
+fn to_timestamp_array(timestamps: &[i64]) -> ArrayRef {
+    let mut builder = TimestampNanosecondBuilder::new(timestamps.len());
+    builder
+        .append_slice(timestamps)
+        .expect("failed to append timestamp values");
+    Arc::new(builder.finish())
+}
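(Annotation, not part of the patch: the new integration tests should be runnable in isolation with "cargo test -p parquet_file --test metadata", covering both the decoded-metadata assertions and the catalog-parameter derivation.)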