diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 758d8dc3c7..161c67a79d 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -2,10 +2,13 @@
 
 use crate::{
     query::QueryableParquetChunk,
-    utils::{CompactedData, ParquetFileWithTombstone, TablePartition},
+    utils::{CompactedData, ParquetFileWithTombstone},
 };
 use backoff::BackoffConfig;
-use data_types2::{ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, TombstoneId};
+use data_types2::{
+    ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, TablePartition, Timestamp,
+    TombstoneId,
+};
 use datafusion::error::DataFusionError;
 use iox_catalog::interface::Catalog;
 use object_store::ObjectStore;
@@ -98,6 +101,11 @@ pub enum Error {
         source: iox_catalog::interface::Error,
     },
 
+    #[snafu(display("Error while requesting level 1 parquet files {}", source))]
+    Level1 {
+        source: iox_catalog::interface::Error,
+    },
+
     #[snafu(display("Error updating catalog {}", source))]
     Update {
         source: iox_catalog::interface::Error,
     },
@@ -156,6 +164,21 @@ impl Compactor {
             .context(Level0Snafu)
     }
 
+    async fn level_1_parquet_files(
+        &self,
+        table_partition: TablePartition,
+        min_time: Timestamp,
+        max_time: Timestamp,
+    ) -> Result<Vec<ParquetFile>> {
+        let mut repos = self.catalog.repositories().await;
+
+        repos
+            .parquet_files()
+            .level_1(table_partition, min_time, max_time)
+            .await
+            .context(Level1Snafu)
+    }
+
     async fn update_to_level_1(&self, parquet_file_ids: &[ParquetFileId]) -> Result<()> {
         let mut repos = self.catalog.repositories().await;
 
@@ -189,12 +212,29 @@ impl Compactor {
         // Read level-0 parquet files
        let level_0_files = self.level_0_parquet_files(sequencer_id).await?;
 
+        // If there are no level-0 parquet files, return because there's nothing to do
+        if level_0_files.is_empty() {
+            return Ok(());
+        }
+
         // Group files into table partition
         let mut partitions = Self::group_parquet_files_into_partition(level_0_files);
 
-        // Get level-1 files overlapped with level-0
-        for (_key, val) in &mut partitions.iter_mut() {
-            let level_1_files: Vec<ParquetFile> = vec![]; // TODO: #3946
+        // Get level-1 files overlapped in time with level-0
+        for (key, val) in &mut partitions.iter_mut() {
+            let overall_min_time = val
+                .iter()
+                .map(|pf| pf.min_time)
+                .min()
+                .expect("The list of files was checked for emptiness above");
+            let overall_max_time = val
+                .iter()
+                .map(|pf| pf.max_time)
+                .max()
+                .expect("The list of files was checked for emptiness above");
+            let level_1_files = self
+                .level_1_parquet_files(*key, overall_min_time, overall_max_time)
+                .await?;
             val.extend(level_1_files);
         }
 
@@ -285,7 +325,8 @@ impl Compactor {
     }
 
     // Compact given files. Assume the given files are overlaped in time.
-    // If the assumption does not meet, we will spend time not to compact anything but put data together
+    // If this assumption is not met, we will spend time merging the data together without
+    // actually compacting anything
     async fn compact(
         &self,
         overlapped_files: Vec<ParquetFileWithTombstone>,
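A note on the new loop in compactor/src/compact.rs above: for each table partition it reduces the group of level-0 files to a single [overall_min_time, overall_max_time] window and hands that window to level_1(). A minimal self-contained sketch of that window computation, with a plain (min, max) tuple standing in for the ParquetFile catalog type (the tuple type and helper name are illustrative only, not part of the compactor API):

    // Sketch only: `(i64, i64)` stands in for a ParquetFile's (min_time, max_time).
    fn overall_time_window(files: &[(i64, i64)]) -> Option<(i64, i64)> {
        // Mirrors the compactor loop: the caller has already returned early on an
        // empty file list, so min()/max() over the per-file bounds are safe there.
        let min = files.iter().map(|(min_time, _)| *min_time).min()?;
        let max = files.iter().map(|(_, max_time)| *max_time).max()?;
        Some((min, max))
    }

    fn main() {
        // Three level-0 files with different time ranges collapse into one query window.
        let level_0 = [(5, 9), (2, 6), (7, 12)];
        assert_eq!(overall_time_window(&level_0), Some((2, 12)));
        // This single window is what would be passed to level_1() for the partition.
    }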
diff --git a/compactor/src/utils.rs b/compactor/src/utils.rs
index 79a2600690..f060c8679d 100644
--- a/compactor/src/utils.rs
+++ b/compactor/src/utils.rs
@@ -1,38 +1,15 @@
 //! Helpers of the Compactor
 
-use std::{collections::HashSet, sync::Arc};
-
+use crate::query::QueryableParquetChunk;
 use arrow::record_batch::RecordBatch;
-use data_types2::{
-    ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, Tombstone, TombstoneId,
-};
+use data_types2::{ParquetFile, ParquetFileId, Tombstone, TombstoneId};
 use iox_object_store::IoxObjectStore;
 use object_store::ObjectStore;
 use parquet_file::{
     chunk::{new_parquet_chunk, ChunkMetrics, DecodedParquetFile},
     metadata::IoxMetadata,
 };
-
-use crate::query::QueryableParquetChunk;
-
-/// Define table partition
-#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
-pub struct TablePartition {
-    sequencer_id: SequencerId,
-    table_id: TableId,
-    partition_id: PartitionId,
-}
-
-impl TablePartition {
-    /// Return a new table partition
-    pub fn new(sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId) -> Self {
-        Self {
-            sequencer_id,
-            table_id,
-            partition_id,
-        }
-    }
-}
+use std::{collections::HashSet, sync::Arc};
 
 /// Wrapper of a parquet file and its tombstones
 #[allow(missing_docs)]
diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs
index 548ff41067..a373205ac8 100644
--- a/data_types2/src/lib.rs
+++ b/data_types2/src/lib.rs
@@ -17,6 +17,7 @@ use std::{
     collections::BTreeMap,
     convert::TryFrom,
     fmt::{Debug, Formatter},
+    ops::{Add, Sub},
     sync::Arc,
 };
 use uuid::Uuid;
@@ -189,6 +190,29 @@ impl std::fmt::Display for PartitionId {
     }
 }
 
+/// Combination of Sequencer ID, Table ID, and Partition ID useful for identifying groups of
+/// Parquet files to be compacted together.
+#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
+pub struct TablePartition {
+    /// The sequencer ID
+    pub sequencer_id: SequencerId,
+    /// The table ID
+    pub table_id: TableId,
+    /// The partition ID
+    pub partition_id: PartitionId,
+}
+
+impl TablePartition {
+    /// Combine the relevant parts
+    pub fn new(sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId) -> Self {
+        Self {
+            sequencer_id,
+            table_id,
+            partition_id,
+        }
+    }
+}
+
 /// Unique ID for a `Tombstone`
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
 #[sqlx(transparent)]
@@ -234,6 +258,22 @@ impl Timestamp {
     }
 }
 
+impl Add<i64> for Timestamp {
+    type Output = Self;
+
+    fn add(self, other: i64) -> Self {
+        Self(self.0 + other)
+    }
+}
+
+impl Sub<i64> for Timestamp {
+    type Output = Self;
+
+    fn sub(self, other: i64) -> Self {
+        Self(self.0 - other)
+    }
+}
+
 /// Unique ID for a `ParquetFile`
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
 #[sqlx(transparent)]
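The Add and Sub impls on Timestamp let callers shift a timestamp by a plain integer amount (the catalog test below relies on this for expressions like query_min_time + 1), and TablePartition derives Ord so it can serve as a grouping key. A small self-contained sketch using local stand-in types rather than the real data_types2 definitions (the stand-ins and the plain i64 IDs are assumptions for illustration; only the trait impls mirror the diff above):

    use std::collections::BTreeMap;
    use std::ops::{Add, Sub};

    // Local stand-in mirroring data_types2::Timestamp from the diff above.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct Timestamp(i64);

    impl Add<i64> for Timestamp {
        type Output = Self;
        fn add(self, other: i64) -> Self {
            Self(self.0 + other)
        }
    }

    impl Sub<i64> for Timestamp {
        type Output = Self;
        fn sub(self, other: i64) -> Self {
            Self(self.0 - other)
        }
    }

    // Local stand-in for data_types2::TablePartition with plain i64 IDs.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct TablePartition {
        sequencer_id: i64,
        table_id: i64,
        partition_id: i64,
    }

    fn main() {
        let query_min_time = Timestamp(5);
        // Arithmetic in whole units, as used by the level-1 catalog test.
        assert_eq!(query_min_time + 1, Timestamp(6));
        assert_eq!(query_min_time - 2, Timestamp(3));

        // Because TablePartition is Ord, it can key a BTreeMap of files per partition.
        let mut groups: BTreeMap<TablePartition, Vec<&str>> = BTreeMap::new();
        let key = TablePartition { sequencer_id: 1, table_id: 2, partition_id: 3 };
        groups.entry(key).or_default().push("level-0 file");
        assert_eq!(groups[&key].len(), 1);
    }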
diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index 7e2fb8eb1b..77dfc20877 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -5,7 +5,8 @@ use data_types2::{
     Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace,
     NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition,
     PartitionId, PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
-    Sequencer, SequencerId, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId,
+    Sequencer, SequencerId, Table, TableId, TablePartition, TableSchema, Timestamp, Tombstone,
+    TombstoneId,
 };
 use snafu::{OptionExt, Snafu};
 use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc};
@@ -441,6 +442,16 @@ pub trait ParquetFileRepo: Send + Sync {
     /// define a file as a candidate for compaction
     async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
 
+    /// List parquet files for a given table partition, in a given time range, with compaction
+    /// level 1, and other criteria that define a file as a candidate for compaction with a level 0
+    /// file
+    async fn level_1(
+        &mut self,
+        table_partition: TablePartition,
+        min_time: Timestamp,
+        max_time: Timestamp,
+    ) -> Result<Vec<ParquetFile>>;
+
     /// Update the compaction level of the specified parquet files to level 1. Returns the IDs
     /// of the files that were successfully updated.
     async fn update_to_level_1(
         &mut self,
         parquet_file_ids: &[ParquetFileId],
@@ -551,6 +562,7 @@ pub(crate) mod test_helpers {
         test_tombstone(Arc::clone(&catalog)).await;
         test_parquet_file(Arc::clone(&catalog)).await;
         test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
+        test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
         test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
         test_add_parquet_file_with_tombstones(Arc::clone(&catalog)).await;
         test_txn_isolation(Arc::clone(&catalog)).await;
@@ -1541,6 +1553,230 @@ pub(crate) mod test_helpers {
         );
     }
 
+    async fn test_parquet_file_compaction_level_1(catalog: Arc<dyn Catalog>) {
+        let mut repos = catalog.repositories().await;
+        let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = repos.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = repos
+            .namespaces()
+            .create(
+                "namespace_parquet_file_compaction_level_1_test",
+                "inf",
+                kafka.id,
+                pool.id,
+            )
+            .await
+            .unwrap();
+        let table = repos
+            .tables()
+            .create_or_get("test_table", namespace.id)
+            .await
+            .unwrap();
+        let other_table = repos
+            .tables()
+            .create_or_get("test_table2", namespace.id)
+            .await
+            .unwrap();
+        let sequencer = repos
+            .sequencers()
+            .create_or_get(&kafka, KafkaPartition::new(100))
+            .await
+            .unwrap();
+        let other_sequencer = repos
+            .sequencers()
+            .create_or_get(&kafka, KafkaPartition::new(101))
+            .await
+            .unwrap();
+        let partition = repos
+            .partitions()
+            .create_or_get("one", sequencer.id, table.id)
+            .await
+            .unwrap();
+        let other_partition = repos
+            .partitions()
+            .create_or_get("two", sequencer.id, table.id)
+            .await
+            .unwrap();
+
+        // Set up the window of times we're interested in level 1 files for
+        let query_min_time = Timestamp::new(5);
+        let query_max_time = Timestamp::new(10);
+
+        // Create a file with times entirely within the window
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: sequencer.id,
+            table_id: partition.table_id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(10),
+            max_sequence_number: SequenceNumber::new(140),
+            min_time: query_min_time + 1,
+            max_time: query_max_time - 1,
+            file_size_bytes: 1337,
+            parquet_metadata: b"md1".to_vec(),
+            row_count: 0,
+            created_at: Timestamp::new(1),
+        };
+        let parquet_file = repos
+            .parquet_files()
+            .create(parquet_file_params.clone())
+            .await
+            .unwrap();
+
+        // Create a file that will remain as level 0
+        let level_0_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            ..parquet_file_params.clone()
+        };
+        let _level_0_file = repos.parquet_files().create(level_0_params).await.unwrap();
+
+        // Create a file completely before the window
+        let too_early_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            min_time: query_min_time - 2,
+            max_time: query_min_time - 1,
+            ..parquet_file_params.clone()
+        };
+        let too_early_file = repos
+            .parquet_files()
+            .create(too_early_params)
+            .await
+            .unwrap();
+
+        // Create a file overlapping the window on the lower end
+        let overlap_lower_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            min_time: query_min_time - 1,
+            max_time: query_min_time + 1,
+            ..parquet_file_params.clone()
+        };
+        let overlap_lower_file = repos
+            .parquet_files()
+            .create(overlap_lower_params)
+            .await
+            .unwrap();
+
+        // Create a file overlapping the window on the upper end
+        let overlap_upper_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            min_time: query_max_time - 1,
+            max_time: query_max_time + 1,
+            ..parquet_file_params.clone()
+        };
+        let overlap_upper_file = repos
+            .parquet_files()
+            .create(overlap_upper_params)
+            .await
+            .unwrap();
+
+        // Create a file completely after the window
+        let too_late_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            min_time: query_max_time + 1,
+            max_time: query_max_time + 2,
+            ..parquet_file_params.clone()
+        };
+        let too_late_file = repos.parquet_files().create(too_late_params).await.unwrap();
+
+        // Create a file for some other sequencer
+        let other_sequencer_params = ParquetFileParams {
+            sequencer_id: other_sequencer.id,
+            object_store_id: Uuid::new_v4(),
+            ..parquet_file_params.clone()
+        };
+        let other_sequencer_file = repos
+            .parquet_files()
+            .create(other_sequencer_params)
+            .await
+            .unwrap();
+
+        // Create a file for the same sequencer but a different table
+        let other_table_params = ParquetFileParams {
+            table_id: other_table.id,
+            object_store_id: Uuid::new_v4(),
+            ..parquet_file_params.clone()
+        };
+        let other_table_file = repos
+            .parquet_files()
+            .create(other_table_params)
+            .await
+            .unwrap();
+
+        // Create a file for the same sequencer and table but a different partition
+        let other_partition_params = ParquetFileParams {
+            partition_id: other_partition.id,
+            object_store_id: Uuid::new_v4(),
+            ..parquet_file_params.clone()
+        };
+        let other_partition_file = repos
+            .parquet_files()
+            .create(other_partition_params)
+            .await
+            .unwrap();
+
+        // Create a file too big to be considered
+        let too_big_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            file_size_bytes: MAX_COMPACT_SIZE + 1,
+            ..parquet_file_params.clone()
+        };
+        let too_big_file = repos.parquet_files().create(too_big_params).await.unwrap();
+
+        // Create a file marked to be deleted
+        let to_delete_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            ..parquet_file_params.clone()
+        };
+        let to_delete_file = repos
+            .parquet_files()
+            .create(to_delete_params)
+            .await
+            .unwrap();
+        repos
+            .parquet_files()
+            .flag_for_delete(to_delete_file.id)
+            .await
+            .unwrap();
+
+        // Make all but _level_0_file compaction level 1
+        repos
+            .parquet_files()
+            .update_to_level_1(&[
+                parquet_file.id,
+                too_early_file.id,
+                too_late_file.id,
+                overlap_lower_file.id,
+                overlap_upper_file.id,
+                other_sequencer_file.id,
+                other_table_file.id,
+                other_partition_file.id,
+                too_big_file.id,
+                to_delete_file.id,
+            ])
+            .await
+            .unwrap();
+
+        // Level 1 parquet files for a sequencer should contain only those that match the right
+        // criteria
+        let table_partition = TablePartition::new(sequencer.id, table.id, partition.id);
+        let level_1 = repos
+            .parquet_files()
+            .level_1(table_partition, query_min_time, query_max_time)
+            .await
+            .unwrap();
+        let mut level_1_ids: Vec<_> = level_1.iter().map(|pf| pf.id).collect();
+        level_1_ids.sort();
+        let expected = vec![parquet_file, overlap_lower_file, overlap_upper_file];
+        let mut expected_ids: Vec<_> = expected.iter().map(|pf| pf.id).collect();
+        expected_ids.sort();
+
+        assert_eq!(
+            level_1_ids, expected_ids,
+            "\nlevel 1: {:#?}\nexpected: {:#?}",
+            level_1, expected,
+        );
+    }
+
     async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
         let mut repos = catalog.repositories().await;
         let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index 02930e1b2f..5db4476d11 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -15,7 +15,7 @@ use data_types2::{
     Column, ColumnId, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
     ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
     ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
-    TableId, Timestamp, Tombstone, TombstoneId,
+    TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
 };
 use observability_deps::tracing::warn;
 use std::fmt::Formatter;
@@ -859,6 +859,31 @@ impl ParquetFileRepo for MemTxn {
             .collect())
     }
 
+    async fn level_1(
+        &mut self,
+        table_partition: TablePartition,
+        min_time: Timestamp,
+        max_time: Timestamp,
+    ) -> Result<Vec<ParquetFile>> {
+        let stage = self.stage();
+
+        Ok(stage
+            .parquet_files
+            .iter()
+            .filter(|f| {
+                f.sequencer_id == table_partition.sequencer_id
+                    && f.table_id == table_partition.table_id
+                    && f.partition_id == table_partition.partition_id
+                    && f.compaction_level == 1
+                    && !f.to_delete
+                    && f.file_size_bytes <= MAX_COMPACT_SIZE
+                    && ((f.min_time <= min_time && f.max_time >= min_time)
+                        || (f.min_time > min_time && f.min_time <= max_time))
+            })
+            .cloned()
+            .collect())
+    }
+
     async fn update_to_level_1(
         &mut self,
         parquet_file_ids: &[ParquetFileId],
diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs
index fd0c137615..123396a86c 100644
--- a/iox_catalog/src/metrics.rs
+++ b/iox_catalog/src/metrics.rs
@@ -10,7 +10,7 @@ use data_types2::{
     Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
     ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
     ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
-    TableId, Timestamp, Tombstone, TombstoneId,
+    TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
 };
 use metric::{Metric, U64Histogram, U64HistogramOptions};
 use std::{fmt::Debug, sync::Arc};
@@ -263,6 +263,7 @@ decorate!(
         "parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
         "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
         "parquet_level_0" = level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
+        "parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<ParquetFile>>;
         "parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result<Vec<ParquetFileId>>;
         "parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
         "parquet_count" = count(&mut self) -> Result<i64>;
diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs
index 38ae369bb9..cf465d91e7 100644
--- a/iox_catalog/src/postgres.rs
+++ b/iox_catalog/src/postgres.rs
@@ -14,7 +14,7 @@ use data_types2::{
     Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
     ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
     ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
-    TableId, Timestamp, Tombstone, TombstoneId,
+    TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
 };
 use observability_deps::tracing::{info, warn};
 use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row};
@@ -1300,6 +1300,52 @@ WHERE parquet_file.sequencer_id = $1
         .map_err(|e| Error::SqlxError { source: e })
     }
 
+    async fn level_1(
+        &mut self,
+        table_partition: TablePartition,
+        min_time: Timestamp,
+        max_time: Timestamp,
+    ) -> Result<Vec<ParquetFile>> {
+        sqlx::query_as::<_, ParquetFile>(
+            r#"
+SELECT
+    parquet_file.id as id,
+    parquet_file.sequencer_id as sequencer_id,
+    parquet_file.table_id as table_id,
+    parquet_file.partition_id as partition_id,
+    parquet_file.object_store_id as object_store_id,
+    parquet_file.min_sequence_number as min_sequence_number,
+    parquet_file.max_sequence_number as max_sequence_number,
+    parquet_file.min_time as min_time,
+    parquet_file.max_time as max_time,
+    parquet_file.to_delete as to_delete,
+    parquet_file.file_size_bytes as file_size_bytes,
+    parquet_file.parquet_metadata as parquet_metadata,
+    parquet_file.row_count as row_count,
+    parquet_file.compaction_level as compaction_level,
+    parquet_file.created_at as created_at
+FROM parquet_file
+WHERE parquet_file.sequencer_id = $1
+  AND parquet_file.table_id = $2
+  AND parquet_file.partition_id = $3
+  AND parquet_file.compaction_level = 1
+  AND parquet_file.to_delete = false
+  AND parquet_file.file_size_bytes <= $4
+  AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $5)
+       OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $6))
+  ;"#,
+        )
+        .bind(&table_partition.sequencer_id) // $1
+        .bind(&table_partition.table_id) // $2
+        .bind(&table_partition.partition_id) // $3
+        .bind(MAX_COMPACT_SIZE) // $4
+        .bind(min_time) // $5
+        .bind(max_time) // $6
+        .fetch_all(&mut self.inner)
+        .await
+        .map_err(|e| Error::SqlxError { source: e })
+    }
+
     async fn update_to_level_1(
         &mut self,
         parquet_file_ids: &[ParquetFileId],
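Both the mem.rs filter and the postgres WHERE clause above express the time check as "the file straddles the window's lower bound, or the file starts inside the window", which, for files whose min_time is at most their max_time, is the usual interval-overlap test against [min_time, max_time]. A small self-contained sketch of that predicate over plain i64 bounds (the helper name and signature are illustrative, not part of the catalog API):

    /// Illustrative helper: does a file spanning [file_min, file_max] overlap the
    /// query window [min_time, max_time]? Written the same way as the mem.rs filter
    /// and the postgres WHERE clause above.
    fn overlaps(file_min: i64, file_max: i64, min_time: i64, max_time: i64) -> bool {
        (file_min <= min_time && file_max >= min_time)
            || (file_min > min_time && file_min <= max_time)
    }

    fn main() {
        let (min_time, max_time) = (5, 10);
        assert!(overlaps(4, 6, min_time, max_time)); // straddles the lower bound
        assert!(overlaps(9, 11, min_time, max_time)); // straddles the upper bound
        assert!(overlaps(6, 9, min_time, max_time)); // entirely inside
        assert!(!overlaps(2, 4, min_time, max_time)); // entirely before
        assert!(!overlaps(11, 12, min_time, max_time)); // entirely after

        // Assuming file_min <= file_max, this matches the canonical overlap test.
        for file_min in 0..15 {
            for file_max in file_min..15 {
                let canonical = file_min <= max_time && file_max >= min_time;
                assert_eq!(overlaps(file_min, file_max, min_time, max_time), canonical);
            }
        }
    }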