From 252ced7adff817f88eee61ba135464dff65ed876 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" <193874+carols10cents@users.noreply.github.com> Date: Thu, 24 Feb 2022 10:20:50 -0500 Subject: [PATCH] feat: Add row count to the parquet_file record in the catalog (#3847) Fixes #3842. --- .../iox/ingester/v1/parquet_metadata.proto | 3 +++ ingester/src/compact.rs | 8 +++++++ ingester/src/data.rs | 1 + ingester/src/persist.rs | 3 +++ ingester/src/test_util.rs | 9 ++++++-- ...23192306_add_row_count_to_parquet_file.sql | 4 ++++ iox_catalog/src/interface.rs | 21 +++++++++++++------ iox_catalog/src/lib.rs | 1 + iox_catalog/src/mem.rs | 2 ++ iox_catalog/src/metrics.rs | 2 +- iox_catalog/src/postgres.rs | 6 ++++-- parquet_file/src/metadata.rs | 7 +++++++ 12 files changed, 56 insertions(+), 11 deletions(-) create mode 100644 iox_catalog/migrations/20220223192306_add_row_count_to_parquet_file.sql diff --git a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto index 0083c942d4..0e15874659 100644 --- a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto +++ b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto @@ -44,4 +44,7 @@ message IoxMetadata { // The maximum sequence number from a sequencer in this parquet file. int64 max_sequence_number = 13; + + // Number of rows of data in this file. + int64 row_count = 14; } diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index 4e8245976a..a8d273dcb1 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -48,6 +48,9 @@ pub enum Error { #[snafu(display("Error while casting Timenanosecond on Time column"))] TimeCasting, + + #[snafu(display("Could not convert row count to i64"))] + RowCountTypeConversion { source: std::num::TryFromIntError }, } /// A specialized `Error` for Ingester's Compact errors @@ -131,6 +134,9 @@ pub async fn compact_persisting_batch( .filter(|b| b.num_rows() != 0) .collect(); + let row_count: usize = output_batches.iter().map(|b| b.num_rows()).sum(); + let row_count = row_count.try_into().context(RowCountTypeConversionSnafu)?; + // Compute min and max of the `time` column let (min_time, max_time) = compute_timenanosecond_min_max(&output_batches)?; @@ -151,6 +157,7 @@ pub async fn compact_persisting_batch( time_of_last_write: Time::from_timestamp_nanos(max_time), min_sequence_number: min_seq, max_sequence_number: max_seq, + row_count, }; Ok(Some((output_batches, meta))) @@ -342,6 +349,7 @@ mod tests { 20000, seq_num_start, seq_num_end, + 3, ); assert_eq!(expected_meta, meta); } diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 1a539850cf..a6b3775bb5 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -268,6 +268,7 @@ impl Persister for IngesterData { parquet_file.max_time, parquet_file.file_size_bytes, parquet_file.parquet_metadata.clone(), + parquet_file.row_count, ) .await }) diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs index 752d943fd9..9056b70bac 100644 --- a/ingester/src/persist.rs +++ b/ingester/src/persist.rs @@ -139,6 +139,7 @@ mod tests { time_of_last_write: now(), min_sequence_number: SequenceNumber::new(5), max_sequence_number: SequenceNumber::new(6), + row_count: 0, }; let object_store = object_store(); @@ -163,6 +164,7 @@ mod tests { time_of_last_write: now(), min_sequence_number: SequenceNumber::new(5), max_sequence_number: SequenceNumber::new(6), + row_count: 3, }; let chunk1 = Arc::new( @@ -201,6 +203,7 @@ mod tests { time_of_last_write: now(), min_sequence_number: SequenceNumber::new(5), max_sequence_number: SequenceNumber::new(6), + row_count: 0, }; let path = parquet_file_object_store_path(&metadata, &object_store); diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 863a4bd22f..a122391294 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -13,10 +13,10 @@ use std::sync::Arc; use time::{SystemProvider, Time, TimeProvider}; use uuid::Uuid; -/// Create a persting batch, some tombstones and corresponding metadata fot them after compaction +/// Create a persisting batch, some tombstones and corresponding metadata for them after compaction pub async fn make_persisting_batch_with_meta() -> (Arc, Vec, IoxMetadata) { - // record bacthes of input data + // record batches of input data let batches = create_batches_with_influxtype_different_columns_different_order().await; // tombstones @@ -49,6 +49,8 @@ pub async fn make_persisting_batch_with_meta() -> (Arc, Vec (Arc, Vec IoxMetadata { IoxMetadata { object_store_id, @@ -135,6 +139,7 @@ pub fn make_meta( time_of_last_write: Time::from_timestamp_nanos(max_time), min_sequence_number: SequenceNumber::new(min_sequence_number), max_sequence_number: SequenceNumber::new(max_sequence_number), + row_count, } } diff --git a/iox_catalog/migrations/20220223192306_add_row_count_to_parquet_file.sql b/iox_catalog/migrations/20220223192306_add_row_count_to_parquet_file.sql new file mode 100644 index 0000000000..c94ff815ee --- /dev/null +++ b/iox_catalog/migrations/20220223192306_add_row_count_to_parquet_file.sql @@ -0,0 +1,4 @@ +ALTER TABLE + IF EXISTS parquet_file +ADD + COLUMN row_count BIGINT NOT NULL DEFAULT 0; diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 26fa67d019..5cbd54e3c6 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -592,6 +592,7 @@ pub trait ParquetFileRepo: Send + Sync { max_time: Timestamp, file_size_bytes: i64, parquet_metadata: Vec, + row_count: i64, ) -> Result; /// Flag the parquet file for deletion @@ -1041,6 +1042,8 @@ pub struct ParquetFile { pub file_size_bytes: i64, /// thrift-encoded parquet metadata pub parquet_metadata: Vec, + /// the number of rows of data in this file + pub row_count: i64, } /// Data for a processed tombstone reference in the catalog. @@ -1539,9 +1542,9 @@ pub(crate) mod test_helpers { let min_time = Timestamp::new(1); let max_time = Timestamp::new(10); - // Must have no rows - let row_count = repos.parquet_files().count().await.unwrap(); - assert_eq!(row_count, 0); + // Must have no parquet file records + let num_parquet_files = repos.parquet_files().count().await.unwrap(); + assert_eq!(num_parquet_files, 0); let parquet_file = repos .parquet_files() @@ -1556,6 +1559,7 @@ pub(crate) mod test_helpers { max_time, 1337, b"md1".to_vec(), + 0, ) .await .unwrap(); @@ -1574,6 +1578,7 @@ pub(crate) mod test_helpers { max_time, 1338, b"md2".to_vec(), + 0, ) .await .unwrap_err(); @@ -1592,13 +1597,14 @@ pub(crate) mod test_helpers { max_time, 1339, b"md3".to_vec(), + 0, ) .await .unwrap(); - // Must have 2 rows - let row_count = repos.parquet_files().count().await.unwrap(); - assert_eq!(row_count, 2); + // Must have 2 parquet files + let num_parquet_files = repos.parquet_files().count().await.unwrap(); + assert_eq!(num_parquet_files, 2); let exist_id = parquet_file.id; let non_exist_id = ParquetFileId::new(other_file.id.get() + 10); @@ -1719,6 +1725,7 @@ pub(crate) mod test_helpers { to_delete: false, file_size_bytes: 1337, parquet_metadata: b"md1".to_vec(), + row_count: 0, }; let other_parquet = ParquetFile { id: ParquetFileId::new(0), //fake id that will never be used @@ -1733,6 +1740,7 @@ pub(crate) mod test_helpers { to_delete: false, file_size_bytes: 1338, parquet_metadata: b"md2".to_vec(), + row_count: 0, }; let another_parquet = ParquetFile { id: ParquetFileId::new(0), //fake id that will never be used @@ -1747,6 +1755,7 @@ pub(crate) mod test_helpers { to_delete: false, file_size_bytes: 1339, parquet_metadata: b"md3".to_vec(), + row_count: 0, }; let parquet_file_count_before = txn.parquet_files().count().await.unwrap(); diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index d3d19faa65..426c0ca3fb 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -204,6 +204,7 @@ pub async fn add_parquet_file_with_tombstones( parquet_file.max_time, parquet_file.file_size_bytes, parquet_file.parquet_metadata.clone(), + parquet_file.row_count, ) .await?; diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index b9b7afccaf..1a2cfd4374 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -649,6 +649,7 @@ impl ParquetFileRepo for MemTxn { max_time: Timestamp, file_size_bytes: i64, parquet_metadata: Vec, + row_count: i64, ) -> Result { let stage = self.stage(); @@ -670,6 +671,7 @@ impl ParquetFileRepo for MemTxn { max_sequence_number, min_time, max_time, + row_count, to_delete: false, file_size_bytes, parquet_metadata, diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index b42ca1d02f..f5d48511b0 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -233,7 +233,7 @@ decorate!( decorate!( impl_trait = ParquetFileRepo, methods = [ - "parquet_create" = create( &mut self, sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId, object_store_id: Uuid, min_sequence_number: SequenceNumber, max_sequence_number: SequenceNumber, min_time: Timestamp, max_time: Timestamp, file_size_bytes: i64, parquet_metadata: Vec) -> Result; + "parquet_create" = create( &mut self, sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId, object_store_id: Uuid, min_sequence_number: SequenceNumber, max_sequence_number: SequenceNumber, min_time: Timestamp, max_time: Timestamp, file_size_bytes: i64, parquet_metadata: Vec, row_count: i64) -> Result; "parquet_flag_for_delete" = flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>; "parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result>; "parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 35fb4a13a7..b303611ebe 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -962,11 +962,12 @@ impl ParquetFileRepo for PostgresTxn { max_time: Timestamp, file_size_bytes: i64, parquet_metadata: Vec, + row_count: i64, ) -> Result { let rec = sqlx::query_as::<_, ParquetFile>( r#" -INSERT INTO parquet_file ( sequencer_id, table_id, partition_id, object_store_id, min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, parquet_metadata ) -VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, false, $9, $10 ) +INSERT INTO parquet_file ( sequencer_id, table_id, partition_id, object_store_id, min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, parquet_metadata, row_count ) +VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, false, $9, $10, $11 ) RETURNING * "#, ) @@ -980,6 +981,7 @@ RETURNING * .bind(max_time) // $8 .bind(file_size_bytes) // $9 .bind(parquet_metadata) // $10 + .bind(row_count) // $11 .fetch_one(&mut self.inner) .await .map_err(|e| { diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index cc53f884ed..ff0ad4d6e4 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -510,6 +510,9 @@ pub struct IoxMetadata { /// sequence number of the last write pub max_sequence_number: SequenceNumber, + + /// number of rows of data + pub row_count: i64, } impl IoxMetadata { @@ -529,6 +532,7 @@ impl IoxMetadata { time_of_last_write: Some(self.time_of_last_write.date_time().into()), min_sequence_number: self.min_sequence_number.get(), max_sequence_number: self.max_sequence_number.get(), + row_count: self.row_count, }; let mut buf = Vec::new(); @@ -584,6 +588,7 @@ impl IoxMetadata { time_of_last_write, min_sequence_number: SequenceNumber::new(proto_msg.min_sequence_number), max_sequence_number: SequenceNumber::new(proto_msg.max_sequence_number), + row_count: proto_msg.row_count, }) } @@ -615,6 +620,7 @@ impl IoxMetadata { to_delete: false, file_size_bytes: file_size_bytes as i64, parquet_metadata: metadata.thrift_bytes().to_vec(), + row_count: self.row_count, } } } @@ -1223,6 +1229,7 @@ mod tests { time_of_last_write: Time::from_timestamp(3234, 3456), min_sequence_number: SequenceNumber::new(5), max_sequence_number: SequenceNumber::new(6), + row_count: 3, }; let proto = iox_metadata.to_protobuf().unwrap();