From ff31407dceaca2db86269d7777f887f64a3a394a Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Wed, 9 Mar 2022 16:50:52 -0500
Subject: [PATCH] refactor: Extract a ParquetFileParams type for create

This has the advantages of:

- Not needing to invent fake parquet file IDs or fake deleted_at values
  that create never uses before insertion
- Not needing too many arguments for create
- Naming the arguments, so it's easier to see which value is which,
  especially in tests
- Making it easier to reuse arguments, or parts of arguments, by copying
  params, which makes differences stand out, especially in tests
---
 data_types2/src/lib.rs       |  33 ++++-
 ingester/src/data.rs         |  52 +++----
 iox_catalog/src/interface.rs | 253 ++++++++++++-----------------
 iox_catalog/src/lib.rs       |  25 +---
 iox_catalog/src/mem.rs       |  42 +++---
 iox_catalog/src/metrics.rs   |   9 +-
 iox_catalog/src/postgres.rs  |  42 +++---
 iox_tests/src/util.rs        |  36 ++---
 parquet_file/src/metadata.rs |  10 +-
 9 files changed, 206 insertions(+), 296 deletions(-)

diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs
index 001907b63e..2d8458fd57 100644
--- a/data_types2/src/lib.rs
+++ b/data_types2/src/lib.rs
@@ -619,7 +619,7 @@ pub struct Tombstone {
     pub serialized_predicate: String,
 }
 
-/// Data for a parquet file reference in the catalog.
+/// Data for a parquet file reference that has been inserted in the catalog.
 #[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
 pub struct ParquetFile {
     /// the id of the file in the catalog
@@ -654,6 +654,37 @@ pub struct ParquetFile {
     pub created_at: Timestamp,
 }
 
+/// Data for a parquet file to be inserted into the catalog.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ParquetFileParams {
+    /// the sequencer that sequenced writes that went into this file
+    pub sequencer_id: SequencerId,
+    /// the table
+    pub table_id: TableId,
+    /// the partition
+    pub partition_id: PartitionId,
+    /// the uuid used in the object store path for this file
+    pub object_store_id: Uuid,
+    /// the minimum sequence number from a record in this file
+    pub min_sequence_number: SequenceNumber,
+    /// the maximum sequence number from a record in this file
+    pub max_sequence_number: SequenceNumber,
+    /// the min timestamp of data in this file
+    pub min_time: Timestamp,
+    /// the max timestamp of data in this file
+    pub max_time: Timestamp,
+    /// file size in bytes
+    pub file_size_bytes: i64,
+    /// thrift-encoded parquet metadata
+    pub parquet_metadata: Vec<u8>,
+    /// the number of rows of data in this file
+    pub row_count: i64,
+    /// the compaction level of the file
+    pub compaction_level: i16,
+    /// the creation time of the parquet file
+    pub created_at: Timestamp,
+}
+
 /// The starting compaction level for parquet files is zero.
 pub const INITIAL_COMPACTION_LEVEL: i16 = 0;
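Most of the test-readability win described in the commit message comes from Rust's struct update syntax: once one `ParquetFileParams` value exists, later files can be described by just the fields that differ. A minimal, self-contained sketch of that pattern with a toy stand-in type (not the real catalog types, which appear in the hunks below):

    #[derive(Debug, Clone, PartialEq)]
    struct Params {
        object_store_id: u64, // toy stand-in for the real Uuid field
        min_sequence_number: i64,
        max_sequence_number: i64,
        file_size_bytes: i64,
    }

    fn main() {
        let base = Params {
            object_store_id: 1,
            min_sequence_number: 10,
            max_sequence_number: 513,
            file_size_bytes: 0,
        };
        // Spell out only the fields that differ; `..base.clone()` copies the
        // rest. This is the `..parquet_file_params.clone()` idiom the updated
        // tests use to make differences between files easy to see.
        let next = Params {
            object_store_id: 2,
            min_sequence_number: 514,
            max_sequence_number: 1008,
            ..base.clone()
        };
        assert_eq!(next.file_size_bytes, base.file_size_bytes);
    }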
diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index ae36d19af9..55a90d1ab6 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -242,24 +242,7 @@ impl Persister for IngesterData {
         Backoff::new(&self.backoff_config)
             .retry_all_errors("add parquet file to catalog", || async {
                 let mut repos = self.catalog.repositories().await;
-                repos
-                    .parquet_files()
-                    .create(
-                        parquet_file.sequencer_id,
-                        parquet_file.table_id,
-                        parquet_file.partition_id,
-                        parquet_file.object_store_id,
-                        parquet_file.min_sequence_number,
-                        parquet_file.max_sequence_number,
-                        parquet_file.min_time,
-                        parquet_file.max_time,
-                        parquet_file.file_size_bytes,
-                        parquet_file.parquet_metadata.clone(),
-                        parquet_file.row_count,
-                        parquet_file.compaction_level,
-                        parquet_file.created_at,
-                    )
-                    .await
+                repos.parquet_files().create(parquet_file.clone()).await
             })
             .await
             .expect("retry forever");
@@ -1262,7 +1245,7 @@ mod tests {
     use super::*;
     use crate::{lifecycle::LifecycleConfig, test_util::create_tombstone};
     use arrow_util::assert_batches_sorted_eq;
-    use data_types2::{NamespaceSchema, Sequence};
+    use data_types2::{NamespaceSchema, ParquetFileParams, Sequence};
     use dml::{DmlMeta, DmlWrite};
     use futures::TryStreamExt;
     use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
@@ -1909,23 +1892,24 @@ mod tests {
             .create_or_get("1970-01-01", sequencer.id, table.id)
             .await
             .unwrap();
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: sequencer.id,
+            table_id: table.id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(0),
+            max_sequence_number: SequenceNumber::new(1),
+            min_time: Timestamp::new(1),
+            max_time: Timestamp::new(1),
+            file_size_bytes: 0,
+            parquet_metadata: vec![],
+            row_count: 0,
+            compaction_level: 0,
+            created_at: Timestamp::new(1),
+        };
         repos
             .parquet_files()
-            .create(
-                sequencer.id,
-                table.id,
-                partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(0),
-                SequenceNumber::new(1),
-                Timestamp::new(1),
-                Timestamp::new(1),
-                0,
-                vec![],
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params)
             .await
             .unwrap();
         std::mem::drop(repos);
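In the persister above, the retried closure now calls `create(parquet_file.clone())`: `create` consumes its params, and the backoff loop may run the closure more than once, so each attempt needs its own owned copy. A rough sketch of that ownership constraint, with a hypothetical `try_insert` standing in for the catalog call:

    #[derive(Clone, Debug)]
    struct Params {
        object_store_id: u64, // toy stand-in, not the real catalog type
    }

    // Hypothetical fallible insert that consumes its argument, as the
    // reworked ParquetFileRepo::create does.
    fn try_insert(params: Params, attempt: u32) -> Result<Params, String> {
        if attempt < 2 {
            Err(format!("transient error inserting {:?}", params))
        } else {
            Ok(params)
        }
    }

    fn main() {
        let params = Params { object_store_id: 42 };
        let mut attempt = 0;
        // Each attempt gets its own owned copy, mirroring the
        // `parquet_file.clone()` inside the Backoff closure.
        let inserted = loop {
            match try_insert(params.clone(), attempt) {
                Ok(p) => break p,
                Err(_) => attempt += 1,
            }
        };
        assert_eq!(inserted.object_store_id, 42);
    }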
diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index 39c3d2c1e2..868f95fb9e 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -3,9 +3,9 @@
 use async_trait::async_trait;
 use data_types2::{
     Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace,
-    NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, Partition, PartitionId,
-    PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
-    SequencerId, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId,
+    NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition,
+    PartitionId, PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
+    Sequencer, SequencerId, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId,
 };
 use snafu::{OptionExt, Snafu};
 use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc};
@@ -410,23 +410,7 @@ pub trait TombstoneRepo: Send + Sync {
 #[async_trait]
 pub trait ParquetFileRepo: Send + Sync {
     /// create the parquet file
-    #[allow(clippy::too_many_arguments)]
-    async fn create(
-        &mut self,
-        sequencer_id: SequencerId,
-        table_id: TableId,
-        partition_id: PartitionId,
-        object_store_id: Uuid,
-        min_sequence_number: SequenceNumber,
-        max_sequence_number: SequenceNumber,
-        min_time: Timestamp,
-        max_time: Timestamp,
-        file_size_bytes: i64,
-        parquet_metadata: Vec<u8>,
-        row_count: i64,
-        level: i16,
-        created_at: Timestamp,
-    ) -> Result<ParquetFile>;
+    async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile>;
 
     /// Flag the parquet file for deletion
     async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>;
@@ -759,23 +743,24 @@ pub(crate) mod test_helpers {
             .create_or_get("1970-01-01", seq.id, t.id)
             .await
             .unwrap();
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: seq.id,
+            table_id: t.id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(10),
+            max_sequence_number: SequenceNumber::new(513),
+            min_time: Timestamp::new(1),
+            max_time: Timestamp::new(2),
+            file_size_bytes: 0,
+            parquet_metadata: vec![],
+            row_count: 0,
+            compaction_level: 0,
+            created_at: Timestamp::new(1),
+        };
         let p1 = repos
             .parquet_files()
-            .create(
-                seq.id,
-                t.id,
-                partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(10),
-                SequenceNumber::new(513),
-                Timestamp::new(1),
-                Timestamp::new(2),
-                0,
-                vec![],
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params.clone())
             .await
             .unwrap();
         let ti = repos
@@ -795,23 +780,15 @@ pub(crate) mod test_helpers {
         );
 
         // and with another parquet file persisted
+        let parquet_file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(514),
+            max_sequence_number: SequenceNumber::new(1008),
+            ..parquet_file_params
+        };
         let p1 = repos
             .parquet_files()
-            .create(
-                seq.id,
-                t.id,
-                partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(514),
-                SequenceNumber::new(1008),
-                Timestamp::new(1),
-                Timestamp::new(2),
-                0,
-                vec![],
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params.clone())
             .await
             .unwrap();
         let ti = repos
@@ -1286,67 +1263,44 @@ pub(crate) mod test_helpers {
         let min_time = Timestamp::new(1);
         let max_time = Timestamp::new(10);
 
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: sequencer.id,
+            table_id: partition.table_id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(10),
+            max_sequence_number: SequenceNumber::new(140),
+            min_time,
+            max_time,
+            file_size_bytes: 1337,
+            parquet_metadata: b"md1".to_vec(),
+            row_count: 0,
+            compaction_level: 0,
+            created_at: Timestamp::new(1),
+        };
         let parquet_file = repos
             .parquet_files()
-            .create(
-                sequencer.id,
-                partition.table_id,
-                partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(10),
-                SequenceNumber::new(140),
-                min_time,
-                max_time,
-                1337,
-                b"md1".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params.clone())
             .await
             .unwrap();
 
         // verify that trying to create a file with the same UUID throws an error
         let err = repos
             .parquet_files()
-            .create(
-                sequencer.id,
-                partition.table_id,
-                partition.id,
-                parquet_file.object_store_id,
-                SequenceNumber::new(10),
-                SequenceNumber::new(140),
-                min_time,
-                max_time,
-                1338,
-                b"md2".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params.clone())
             .await
             .unwrap_err();
         assert!(matches!(err, Error::FileExists { object_store_id: _ }));
 
-        let other_file = repos
-            .parquet_files()
-            .create(
-                sequencer.id,
-                other_partition.table_id,
-                other_partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(45),
-                SequenceNumber::new(200),
-                min_time,
-                max_time,
-                1339,
-                b"md3".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
-            .await
-            .unwrap();
+        let other_params = ParquetFileParams {
+            table_id: other_partition.table_id,
+            partition_id: other_partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(45),
+            max_sequence_number: SequenceNumber::new(200),
+            ..parquet_file_params.clone()
+        };
+        let other_file = repos.parquet_files().create(other_params).await.unwrap();
 
         let exist_id = parquet_file.id;
         let non_exist_id = ParquetFileId::new(other_file.id.get() + 10);
@@ -1404,44 +1358,24 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();
         assert_eq!(Vec::<ParquetFile>::new(), files);
+
+        let f1_params = ParquetFileParams {
+            table_id: partition2.table_id,
+            partition_id: partition2.id,
+            object_store_id: Uuid::new_v4(),
+            ..parquet_file_params
+        };
         let f1 = repos
             .parquet_files()
-            .create(
-                sequencer.id,
-                partition2.table_id,
-                partition2.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(10),
-                SequenceNumber::new(140),
-                min_time,
-                max_time,
-                1337,
-                b"md4".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
-            .await
-            .unwrap();
-        let f2 = repos
-            .parquet_files()
-            .create(
-                sequencer.id,
-                partition2.table_id,
-                partition2.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(10),
-                SequenceNumber::new(140),
-                min_time,
-                max_time,
-                1337,
-                b"md4".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(f1_params.clone())
             .await
             .unwrap();
+
+        let f2_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            ..f1_params
+        };
+        let f2 = repos.parquet_files().create(f2_params).await.unwrap();
         let files = repos
             .parquet_files()
             .list_by_namespace_not_to_delete(namespace2.id)
@@ -1533,9 +1467,8 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();
 
-        // Prepare metadata in form of ParquetFile to get added with tombstone
-        let parquet = ParquetFile {
-            id: ParquetFileId::new(0), //fake id that will never be used
+        // Prepare metadata in form of ParquetFileParams to get added with tombstone
+        let parquet = ParquetFileParams {
             sequencer_id: sequencer.id,
             table_id: table.id,
             partition_id: partition.id,
@@ -1544,46 +1477,23 @@ pub(crate) mod test_helpers {
             max_sequence_number: SequenceNumber::new(10),
             min_time,
             max_time,
-            to_delete: false,
             file_size_bytes: 1337,
             parquet_metadata: b"md1".to_vec(),
             row_count: 0,
             compaction_level: 0,
             created_at: Timestamp::new(1),
         };
-        let other_parquet = ParquetFile {
-            id: ParquetFileId::new(0), //fake id that will never be used
-            sequencer_id: sequencer.id,
-            table_id: table.id,
-            partition_id: partition.id,
+        let other_parquet = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
             min_sequence_number: SequenceNumber::new(11),
             max_sequence_number: SequenceNumber::new(20),
-            min_time,
-            max_time,
-            to_delete: false,
-            file_size_bytes: 1338,
-            parquet_metadata: b"md2".to_vec(),
-            row_count: 0,
-            compaction_level: 0,
-            created_at: Timestamp::new(1),
+            ..parquet.clone()
         };
-        let another_parquet = ParquetFile {
-            id: ParquetFileId::new(0), //fake id that will never be used
-            sequencer_id: sequencer.id,
-            table_id: table.id,
-            partition_id: partition.id,
+        let another_parquet = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
             min_sequence_number: SequenceNumber::new(21),
             max_sequence_number: SequenceNumber::new(30),
-            min_time,
-            max_time,
-            to_delete: false,
-            file_size_bytes: 1339,
-            parquet_metadata: b"md3".to_vec(),
-            row_count: 0,
-            compaction_level: 0,
-            created_at: Timestamp::new(1),
+            ..parquet.clone()
         };
 
         let parquet_file_count_before = txn.parquet_files().count().await.unwrap();
@@ -1592,10 +1502,13 @@ pub(crate) mod test_helpers {
 
         // Add parquet and processed tombstone in one transaction
        let mut txn = catalog.start_transaction().await.unwrap();
-        let (parquet_file, p_tombstones) =
-            add_parquet_file_with_tombstones(&parquet, &[t1.clone(), t2.clone()], txn.deref_mut())
-                .await
-                .unwrap();
+        let (parquet_file, p_tombstones) = add_parquet_file_with_tombstones(
+            parquet.clone(),
+            &[t1.clone(), t2.clone()],
+            txn.deref_mut(),
+        )
+        .await
+        .unwrap();
         txn.commit().await.unwrap();
 
         assert_eq!(p_tombstones.len(), 2);
         assert_eq!(t1.id, p_tombstones[0].tombstone_id);
@@ -1625,7 +1538,7 @@ pub(crate) mod test_helpers {
 
         // Error due to duplicate parquet file
         let mut txn = catalog.start_transaction().await.unwrap();
-        add_parquet_file_with_tombstones(&parquet, &[t3.clone(), t1.clone()], txn.deref_mut())
+        add_parquet_file_with_tombstones(parquet, &[t3.clone(), t1.clone()], txn.deref_mut())
             .await
             .unwrap_err();
         txn.abort().await.unwrap();
@@ -1640,7 +1553,7 @@ pub(crate) mod test_helpers {
 
         // Add new parquet and new tombstone. Should go through
         let (parquet_file, p_tombstones) =
-            add_parquet_file_with_tombstones(&other_parquet, &[t3.clone()], txn.deref_mut())
+            add_parquet_file_with_tombstones(other_parquet, &[t3.clone()], txn.deref_mut())
                 .await
                 .unwrap();
         assert_eq!(p_tombstones.len(), 1);
@@ -1664,7 +1577,7 @@ pub(crate) mod test_helpers {
         let mut txn = catalog.start_transaction().await.unwrap();
         let mut t4 = t3.clone();
         t4.id = TombstoneId::new(t4.id.get() + 10);
-        add_parquet_file_with_tombstones(&another_parquet, &[t4], txn.deref_mut())
+        add_parquet_file_with_tombstones(another_parquet, &[t4], txn.deref_mut())
             .await
             .unwrap_err();
         txn.abort().await.unwrap();
diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs
index 142decef44..a410f9f0b9 100644
--- a/iox_catalog/src/lib.rs
+++ b/iox_catalog/src/lib.rs
@@ -13,8 +13,8 @@
 use crate::interface::{Error, Result, Transaction};
 use data_types2::{
-    ColumnType, KafkaPartition, KafkaTopic, NamespaceSchema, ParquetFile, ProcessedTombstone,
-    QueryPool, Sequencer, SequencerId, TableSchema, Tombstone,
+    ColumnType, KafkaPartition, KafkaTopic, NamespaceSchema, ParquetFile, ParquetFileParams,
+    ProcessedTombstone, QueryPool, Sequencer, SequencerId, TableSchema, Tombstone,
 };
 use interface::{ColumnUpsertRequest, RepoCollection};
 use mutable_batch::MutableBatch;
@@ -196,29 +196,12 @@ pub async fn create_or_get_default_records(
 // TODO: this function is no longer needed in the ingester. It might be needed by the Compactor.
 /// Insert the compacted parquet file and its tombstones
 pub async fn add_parquet_file_with_tombstones(
-    parquet_file: &ParquetFile,
+    parquet_file: ParquetFileParams,
     tombstones: &[Tombstone],
     txn: &mut dyn Transaction,
 ) -> Result<(ParquetFile, Vec<ProcessedTombstone>), Error> {
     // create a parquet file in the catalog first
-    let parquet = txn
-        .parquet_files()
-        .create(
-            parquet_file.sequencer_id,
-            parquet_file.table_id,
-            parquet_file.partition_id,
-            parquet_file.object_store_id,
-            parquet_file.min_sequence_number,
-            parquet_file.max_sequence_number,
-            parquet_file.min_time,
-            parquet_file.max_time,
-            parquet_file.file_size_bytes,
-            parquet_file.parquet_metadata.clone(),
-            parquet_file.row_count,
-            parquet_file.compaction_level,
-            parquet_file.created_at,
-        )
-        .await?;
+    let parquet = txn.parquet_files().create(parquet_file).await?;
 
     // Now that the parquet file is available, create its processed tombstones
     let mut processed_tombstones = Vec::with_capacity(tombstones.len());
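Taking `ParquetFileParams` by value (rather than `&ParquetFile`) means callers clone only while they still need the params and can move them on the last use, which is what the updated tests above do with `parquet.clone()` versus `parquet`. A small sketch of that ownership pattern, with a hypothetical `consume` standing in for the function:

    #[derive(Clone, Debug, PartialEq)]
    struct Params {
        row_count: i64, // toy stand-in, not the real catalog type
    }

    // Hypothetical sink that takes ownership, like the reworked
    // add_parquet_file_with_tombstones.
    fn consume(params: Params) -> i64 {
        params.row_count
    }

    fn main() {
        let params = Params { row_count: 7 };
        let first = consume(params.clone()); // still needed below, so clone
        let second = consume(params); // last use: move, no clone required
        assert_eq!(first, second);
    }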
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index 647ce99a8e..8cb4cd5b21 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -13,16 +13,15 @@ use crate::{
 use async_trait::async_trait;
 use data_types2::{
     Column, ColumnId, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
-    ParquetFile, ParquetFileId, Partition, PartitionId, PartitionInfo, ProcessedTombstone,
-    QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp,
-    Tombstone, TombstoneId,
+    ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
+    ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
+    TableId, Timestamp, Tombstone, TombstoneId,
 };
 use observability_deps::tracing::warn;
 use std::fmt::Formatter;
 use std::sync::Arc;
 use std::{collections::HashSet, convert::TryFrom};
 use tokio::sync::{Mutex, OwnedMutexGuard};
-use uuid::Uuid;
 
 /// In-memory catalog that implements the `RepoCollection` and individual repo traits from
 /// the catalog interface.
@@ -750,24 +749,25 @@ impl TombstoneRepo for MemTxn {
 
 #[async_trait]
 impl ParquetFileRepo for MemTxn {
-    async fn create(
-        &mut self,
-        sequencer_id: SequencerId,
-        table_id: TableId,
-        partition_id: PartitionId,
-        object_store_id: Uuid,
-        min_sequence_number: SequenceNumber,
-        max_sequence_number: SequenceNumber,
-        min_time: Timestamp,
-        max_time: Timestamp,
-        file_size_bytes: i64,
-        parquet_metadata: Vec<u8>,
-        row_count: i64,
-        level: i16,
-        created_at: Timestamp,
-    ) -> Result<ParquetFile> {
+    async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
         let stage = self.stage();
 
+        let ParquetFileParams {
+            sequencer_id,
+            table_id,
+            partition_id,
+            object_store_id,
+            min_sequence_number,
+            max_sequence_number,
+            min_time,
+            max_time,
+            file_size_bytes,
+            parquet_metadata,
+            row_count,
+            compaction_level,
+            created_at,
+        } = parquet_file_params;
+
         if stage
             .parquet_files
             .iter()
@@ -790,7 +790,7 @@ impl ParquetFileRepo for MemTxn {
             to_delete: false,
             file_size_bytes,
             parquet_metadata,
-            compaction_level: level,
+            compaction_level,
             created_at,
         };
         stage.parquet_files.push(parquet_file);
diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs
index 4b08800506..987130f54b 100644
--- a/iox_catalog/src/metrics.rs
+++ b/iox_catalog/src/metrics.rs
@@ -8,14 +8,13 @@ use crate::interface::{
 use async_trait::async_trait;
 use data_types2::{
     Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
-    ParquetFile, ParquetFileId, Partition, PartitionId, PartitionInfo, ProcessedTombstone,
-    QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp,
-    Tombstone, TombstoneId,
+    ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
+    ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
+    TableId, Timestamp, Tombstone, TombstoneId,
 };
 use metric::{Metric, U64Histogram, U64HistogramOptions};
 use std::{fmt::Debug, sync::Arc};
 use time::{SystemProvider, TimeProvider};
-use uuid::Uuid;
 
 /// Decorates an implementation of the catalog's [`RepoCollection`] (and the
 /// transactional variant) with instrumentation that emits latency histograms
@@ -259,7 +258,7 @@ decorate!(
 decorate!(
     impl_trait = ParquetFileRepo,
     methods = [
-        "parquet_create" = create( &mut self, sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId, object_store_id: Uuid, min_sequence_number: SequenceNumber, max_sequence_number: SequenceNumber, min_time: Timestamp, max_time: Timestamp, file_size_bytes: i64, parquet_metadata: Vec<u8>, row_count: i64, level: i16, created_at: Timestamp) -> Result<ParquetFile>;
+        "parquet_create" = create( &mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile>;
         "parquet_flag_for_delete" = flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>;
         "parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
         "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
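Both backends destructure the params struct rather than reading fields ad hoc; because the pattern has no `..` rest, any field later added to `ParquetFileParams` becomes a compile error at each `create` until it is explicitly handled. A minimal illustration of that safety net with a toy struct:

    #[derive(Debug)]
    struct Params {
        row_count: i64, // toy stand-in, not the real catalog type
        compaction_level: i16,
    }

    fn create(params: Params) -> (i64, i16) {
        // Exhaustive pattern: if Params grows a field, this binding stops
        // compiling, which is exactly the safety net the backends get.
        let Params {
            row_count,
            compaction_level,
        } = params;
        (row_count, compaction_level)
    }

    fn main() {
        let (rows, level) = create(Params { row_count: 0, compaction_level: 0 });
        assert_eq!((rows, level), (0, 0));
    }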
diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs
index 2c9e431d63..faca9ec183 100644
--- a/iox_catalog/src/postgres.rs
+++ b/iox_catalog/src/postgres.rs
@@ -12,15 +12,14 @@ use crate::{
 use async_trait::async_trait;
 use data_types2::{
     Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
-    ParquetFile, ParquetFileId, Partition, PartitionId, PartitionInfo, ProcessedTombstone,
-    QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp,
-    Tombstone, TombstoneId,
+    ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
+    ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
+    TableId, Timestamp, Tombstone, TombstoneId,
 };
 use observability_deps::tracing::{info, warn};
 use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row};
 use sqlx_hotswap_pool::HotSwapPool;
 use std::{sync::Arc, time::Duration};
-use uuid::Uuid;
 
 const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
 const IDLE_TIMEOUT: Duration = Duration::from_secs(500);
@@ -1159,22 +1158,23 @@ WHERE table_name.namespace_id = $1;
 
 #[async_trait]
 impl ParquetFileRepo for PostgresTxn {
-    async fn create(
-        &mut self,
-        sequencer_id: SequencerId,
-        table_id: TableId,
-        partition_id: PartitionId,
-        object_store_id: Uuid,
-        min_sequence_number: SequenceNumber,
-        max_sequence_number: SequenceNumber,
-        min_time: Timestamp,
-        max_time: Timestamp,
-        file_size_bytes: i64,
-        parquet_metadata: Vec<u8>,
-        row_count: i64,
-        level: i16,
-        created_at: Timestamp,
-    ) -> Result<ParquetFile> {
+    async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
+        let ParquetFileParams {
+            sequencer_id,
+            table_id,
+            partition_id,
+            object_store_id,
+            min_sequence_number,
+            max_sequence_number,
+            min_time,
+            max_time,
+            file_size_bytes,
+            parquet_metadata,
+            row_count,
+            compaction_level,
+            created_at,
+        } = parquet_file_params;
+
         let rec = sqlx::query_as::<_, ParquetFile>(
             r#"
 INSERT INTO parquet_file (
     sequencer_id, table_id, partition_id, object_store_id, min_sequence_number,
     max_sequence_number, min_time, max_time, to_delete, file_size_bytes, parquet_metadata,
     row_count, compaction_level, created_at )
@@ -1193,7 +1193,7 @@ RETURNING *
         .bind(file_size_bytes) // $9
         .bind(parquet_metadata) // $10
         .bind(row_count) // $11
-        .bind(level) // $12
+        .bind(compaction_level) // $12
         .bind(created_at) // $13
         .fetch_one(&mut self.inner)
         .await
diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index ee03d4a7bd..aa7da7d746 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -3,8 +3,9 @@
 use arrow::record_batch::RecordBatch;
 use bytes::Bytes;
 use data_types2::{
-    ColumnType, InfluxDbType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, Partition,
-    QueryPool, SequenceNumber, Sequencer, Table, Timestamp, Tombstone,
+    ColumnType, InfluxDbType, KafkaPartition, KafkaTopic, Namespace, ParquetFile,
+    ParquetFileParams, Partition, QueryPool, SequenceNumber, Sequencer, Table, Timestamp,
+    Tombstone,
 };
 use iox_catalog::{interface::Catalog, mem::MemCatalog};
 use iox_object_store::{IoxObjectStore, ParquetFilePath};
@@ -302,23 +303,24 @@ impl TestPartition {
             })
             .unwrap();
 
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: self.sequencer.sequencer.id,
+            table_id: self.table.table.id,
+            partition_id: self.partition.id,
+            object_store_id,
+            min_sequence_number,
+            max_sequence_number,
+            min_time: Timestamp::new(ts_min_max.min),
+            max_time: Timestamp::new(ts_min_max.max),
+            file_size_bytes: file_size_bytes as i64,
+            parquet_metadata: parquet_metadata_bin,
+            row_count: row_count as i64,
+            compaction_level: 0,
+            created_at: Timestamp::new(1),
+        };
         let parquet_file = repos
             .parquet_files()
-            .create(
-                self.sequencer.sequencer.id,
-                self.table.table.id,
-                self.partition.id,
-                object_store_id,
-                min_sequence_number,
-                max_sequence_number,
-                Timestamp::new(ts_min_max.min),
-                Timestamp::new(ts_min_max.max),
-                file_size_bytes as i64,
-                parquet_metadata_bin,
-                row_count as i64,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params)
             .await
             .unwrap();
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index b1bbb247e4..954f1e0d95 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -91,8 +91,8 @@ use data_types::{
     partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
 };
 use data_types2::{
-    NamespaceId, ParquetFile, ParquetFileId, PartitionId, SequenceNumber, SequencerId, TableId,
-    Timestamp, INITIAL_COMPACTION_LEVEL,
+    NamespaceId, ParquetFileParams, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp,
+    INITIAL_COMPACTION_LEVEL,
 };
 use generated_types::influxdata::iox::{
     ingester::v1 as proto, preserved_catalog::v1 as preserved_catalog,
@@ -635,9 +635,8 @@ impl IoxMetadata {
         &self,
         file_size_bytes: usize,
         metadata: &IoxParquetMetaData,
-    ) -> ParquetFile {
-        ParquetFile {
-            id: ParquetFileId::new(0), // this will be created in the DB. This 0 won't be used anywhere
+    ) -> ParquetFileParams {
+        ParquetFileParams {
             sequencer_id: self.sequencer_id,
             table_id: self.table_id,
             partition_id: self.partition_id,
@@ -646,7 +645,6 @@ impl IoxMetadata {
             max_sequence_number: self.max_sequence_number,
             min_time: Timestamp::new(self.time_of_first_write.timestamp_nanos()),
             max_time: Timestamp::new(self.time_of_last_write.timestamp_nanos()),
-            to_delete: false,
             file_size_bytes: file_size_bytes as i64,
             parquet_metadata: metadata.thrift_bytes().to_vec(),
             row_count: self.row_count,