diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 32be1c0d53..88386dd05c 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -746,12 +746,8 @@ impl Compactor { ); let row_count: usize = output_batches.iter().map(|b| b.num_rows()).sum(); - let row_count = row_count.try_into().context(RowCountTypeConversionSnafu)?; debug!("got {} rows from stream {}", row_count, i); - if row_count == 0 { - continue; - } // Compute min and max of the `time` column let (min_time, max_time) = @@ -771,7 +767,6 @@ impl Compactor { time_of_last_write: Time::from_timestamp_nanos(max_time), min_sequence_number, max_sequence_number, - row_count, compaction_level: 1, // compacted result file always have level 1 sort_key: Some(sort_key.clone()), }; @@ -2410,7 +2405,6 @@ mod tests { time_of_last_write: compactor.time_provider.now(), min_sequence_number: SequenceNumber::new(5), max_sequence_number: SequenceNumber::new(6), - row_count: 3, compaction_level: 1, // level of compacted data is always 1 sort_key: Some(SortKey::from_columns(["tag1", "time"])), }; @@ -2558,7 +2552,6 @@ mod tests { time_of_last_write: compactor.time_provider.now(), min_sequence_number: SequenceNumber::new(5), max_sequence_number: SequenceNumber::new(6), - row_count: 3, compaction_level: 1, // file level of compacted file is always 1 sort_key: None, }; diff --git a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto index 45a0fa00c2..3fbeb4f493 100644 --- a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto +++ b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto @@ -7,6 +7,13 @@ import "google/protobuf/timestamp.proto"; // IOx-specific metadata that will be serialized into the file-level key-value Parquet metadata // under a single key. 
message IoxMetadata { + // Removed as the Parquet metadata itself contains the row count, and + // specifying it here creates a dependency that prevents streaming + // serialisation (needing to know the number of rows before you can serialise + // your parquet file with this metadata structure within it) + reserved 14; + reserved "row_count"; + // Object store ID. Used in the parquet filename. 16 bytes in big-endian order. bytes object_store_id = 1; @@ -46,9 +53,6 @@ message IoxMetadata { // The maximum sequence number from a sequencer in this parquet file. int64 max_sequence_number = 13; - // Number of rows of data in this file. - int64 row_count = 14; - // The sort key of this chunk SortKey sort_key = 15; diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index 2f43dce91a..77227c3037 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -105,9 +105,6 @@ pub async fn compact_persisting_batch( .filter(|b| b.num_rows() != 0) .collect(); - let row_count: usize = output_batches.iter().map(|b| b.num_rows()).sum(); - let row_count = row_count.try_into().context(RowCountTypeConversionSnafu)?; - // Compute min and max of the `time` column let (min_time, max_time) = compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?; @@ -129,7 +126,6 @@ pub async fn compact_persisting_batch( time_of_last_write: Time::from_timestamp_nanos(max_time), min_sequence_number: min_seq, max_sequence_number: max_seq, - row_count, compaction_level: INITIAL_COMPACTION_LEVEL, sort_key: Some(metadata_sort_key), }; @@ -337,7 +333,6 @@ mod tests { 20000, seq_num_start, seq_num_end, - 3, INITIAL_COMPACTION_LEVEL, Some(SortKey::from_columns(["tag1", "time"])), ); @@ -432,7 +427,6 @@ mod tests { 220000, seq_num_start, seq_num_end, - 4, INITIAL_COMPACTION_LEVEL, // Sort key should now be set Some(SortKey::from_columns(["tag1", "tag3", "time"])), @@ -530,7 +524,6 @@ mod tests { 220000, seq_num_start, seq_num_end, - 4, INITIAL_COMPACTION_LEVEL, // The sort key in the 
metadata should be the same as specified (that is, not // recomputed) @@ -628,7 +621,6 @@ mod tests { 220000, seq_num_start, seq_num_end, - 4, INITIAL_COMPACTION_LEVEL, // The sort key in the metadata should be updated to include the new column just before // the time column @@ -729,7 +721,6 @@ mod tests { 220000, seq_num_start, seq_num_end, - 4, INITIAL_COMPACTION_LEVEL, // The sort key in the metadata should only contain the columns in this file Some(SortKey::from_columns(["tag3", "tag1", "time"])), diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs index 1c4e458be7..f984efcabe 100644 --- a/ingester/src/persist.rs +++ b/ingester/src/persist.rs @@ -77,7 +77,6 @@ mod tests { time_of_last_write: now(), min_sequence_number: SequenceNumber::new(5), max_sequence_number: SequenceNumber::new(6), - row_count: 0, compaction_level: INITIAL_COMPACTION_LEVEL, sort_key: None, }; @@ -111,7 +110,6 @@ mod tests { time_of_last_write: now(), min_sequence_number: SequenceNumber::new(5), max_sequence_number: SequenceNumber::new(6), - row_count: 3, compaction_level: INITIAL_COMPACTION_LEVEL, sort_key: None, }; diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 36381c33dc..1ff68b94e6 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -64,8 +64,6 @@ pub async fn make_persisting_batch_with_meta() -> (Arc, Vec (Arc, Vec, ) -> IoxMetadata { @@ -157,7 +153,6 @@ pub fn make_meta( time_of_last_write: Time::from_timestamp_nanos(max_time), min_sequence_number: SequenceNumber::new(min_sequence_number), max_sequence_number: SequenceNumber::new(max_sequence_number), - row_count, compaction_level, sort_key, } diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 3a60ebbf4b..f33a925d78 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -521,7 +521,6 @@ impl TestPartition { time_of_last_write: Time::from_timestamp_nanos(max_time), min_sequence_number, max_sequence_number, - row_count: row_count as i64, 
compaction_level: INITIAL_COMPACTION_LEVEL, sort_key: Some(sort_key.clone()), }; diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index b35c506c70..7e8771b827 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -279,9 +279,6 @@ pub struct IoxMetadata { /// sequence number of the last write pub max_sequence_number: SequenceNumber, - /// number of rows of data - pub row_count: i64, - /// the compaction level of the file pub compaction_level: i16, @@ -317,7 +314,6 @@ impl IoxMetadata { time_of_last_write: Some(self.time_of_last_write.date_time().into()), min_sequence_number: self.min_sequence_number.get(), max_sequence_number: self.max_sequence_number.get(), - row_count: self.row_count, sort_key, compaction_level: self.compaction_level as i32, }; @@ -377,7 +373,6 @@ impl IoxMetadata { time_of_last_write, min_sequence_number: SequenceNumber::new(proto_msg.min_sequence_number), max_sequence_number: SequenceNumber::new(proto_msg.max_sequence_number), - row_count: proto_msg.row_count, sort_key, compaction_level: proto_msg.compaction_level as i16, }) @@ -394,6 +389,7 @@ impl IoxMetadata { file_size_bytes: usize, metadata: &IoxParquetMetaData, ) -> ParquetFileParams { + let row_count = metadata.decode().expect("invalid metadata").row_count(); ParquetFileParams { sequencer_id: self.sequencer_id, namespace_id: self.namespace_id, @@ -406,8 +402,8 @@ impl IoxMetadata { max_time: Timestamp::new(self.time_of_last_write.timestamp_nanos()), file_size_bytes: file_size_bytes as i64, parquet_metadata: metadata.thrift_bytes().to_vec(), - row_count: self.row_count, compaction_level: self.compaction_level, + row_count: row_count.try_into().expect("row count overflows i64"), created_at: Timestamp::new(self.creation_timestamp.timestamp_nanos()), } } @@ -878,7 +874,6 @@ mod tests { time_of_last_write: Time::from_timestamp(3234, 3456), min_sequence_number: SequenceNumber::new(5), max_sequence_number: SequenceNumber::new(6), - row_count: 3, 
compaction_level: 0, sort_key: Some(sort_key), }; @@ -906,7 +901,6 @@ mod tests { time_of_last_write: Time::from_timestamp_nanos(424242), min_sequence_number: SequenceNumber::new(10), max_sequence_number: SequenceNumber::new(11), - row_count: 1000, compaction_level: 1, sort_key: None, }; diff --git a/parquet_file/src/serialise.rs b/parquet_file/src/serialise.rs index 4d27e1b561..e8e8286534 100644 --- a/parquet_file/src/serialise.rs +++ b/parquet_file/src/serialise.rs @@ -180,7 +180,6 @@ mod tests { time_of_last_write: Time::from_timestamp_nanos(424242), min_sequence_number: SequenceNumber::new(10), max_sequence_number: SequenceNumber::new(11), - row_count: 1000, compaction_level: 1, sort_key: None, }; diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 56d22cc3a6..9b731bbee3 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -294,7 +294,6 @@ mod tests { time_of_last_write: Time::from_timestamp_nanos(424242), min_sequence_number: SequenceNumber::new(10), max_sequence_number: SequenceNumber::new(11), - row_count: 1000, compaction_level: 1, sort_key: None, }; diff --git a/parquet_file/tests/metadata.rs b/parquet_file/tests/metadata.rs index b507b17201..315e182423 100644 --- a/parquet_file/tests/metadata.rs +++ b/parquet_file/tests/metadata.rs @@ -47,7 +47,6 @@ async fn test_decoded_iox_metadata() { min_sequence_number: SequenceNumber::new(10), max_sequence_number: SequenceNumber::new(11), compaction_level: 1, - row_count: 12341234, sort_key: None, }; @@ -116,7 +115,6 @@ async fn test_derive_parquet_file_params() { min_sequence_number: SequenceNumber::new(10), max_sequence_number: SequenceNumber::new(11), compaction_level: 1, - row_count: 12341234, sort_key: None, }; @@ -157,11 +155,11 @@ async fn test_derive_parquet_file_params() { assert_eq!(catalog_data.file_size_bytes, file_size as i64); assert_eq!(catalog_data.compaction_level, meta.compaction_level); assert_eq!(catalog_data.created_at, Timestamp::new(1234)); + 
assert_eq!(catalog_data.row_count, 3); // NOTE: these DO NOT reflect the actual values! These values were not // derived from the actual data, but instead trusted from the input // IoxMetadata. - assert_eq!(catalog_data.row_count, meta.row_count); assert_eq!(catalog_data.min_time, Timestamp::new(4242)); assert_eq!(catalog_data.max_time, Timestamp::new(424242)); }