From e16306d21c47ec76e99f53a5c04445236a88ca72 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 14 Sep 2022 11:14:17 -0400 Subject: [PATCH 1/4] refactor: Move fetching of extra partition info into the method because it's always needed --- compactor/src/cold.rs | 54 ++-------- compactor/src/compact.rs | 210 ++++++++++++++++++++++++--------------- compactor/src/hot.rs | 67 ++----------- compactor/src/lib.rs | 9 +- 4 files changed, 149 insertions(+), 191 deletions(-) diff --git a/compactor/src/cold.rs b/compactor/src/cold.rs index d8c4f0a2f4..69a1e3db07 100644 --- a/compactor/src/cold.rs +++ b/compactor/src/cold.rs @@ -22,7 +22,7 @@ pub async fn compact(compactor: Arc) -> usize { debug!(compaction_type, "start collecting partitions to compact"); let attributes = Attributes::from(&[("partition_type", compaction_type)]); let start_time = compactor.time_provider.now(); - let candidates = Backoff::new(&compactor.backoff_config) + let (candidates, table_columns) = Backoff::new(&compactor.backoff_config) .retry_all_errors("cold_partitions_to_compact", || async { compactor .cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard) @@ -41,46 +41,6 @@ pub async fn compact(compactor: Arc) -> usize { duration.record(delta); } - // Get extra needed information for selected partitions - let start_time = compactor.time_provider.now(); - - // Column types and their counts of the tables of the partition candidates - debug!( - num_candidates=?candidates.len(), - compaction_type, - "start getting column types for the partition candidates" - ); - let table_columns = Backoff::new(&compactor.backoff_config) - .retry_all_errors("table_columns", || async { - compactor.table_columns(&candidates).await - }) - .await - .expect("retry forever"); - - // Add other compaction-needed info into selected partitions - debug!( - num_candidates=?candidates.len(), - compaction_type, - "start getting additional info for the partition candidates" - ); - let 
candidates = Backoff::new(&compactor.backoff_config) - .retry_all_errors("add_info_to_partitions", || async { - compactor.add_info_to_partitions(&candidates).await - }) - .await - .expect("retry forever"); - - if let Some(delta) = compactor - .time_provider - .now() - .checked_duration_since(start_time) - { - let duration = compactor - .partitions_extra_info_reading_duration - .recorder(attributes.clone()); - duration.record(delta); - } - let n_candidates = candidates.len(); if n_candidates == 0 { debug!(compaction_type, "no compaction candidates found"); @@ -96,7 +56,7 @@ pub async fn compact(compactor: Arc) -> usize { Arc::clone(&compactor), compaction_type, compact_in_parallel, - candidates.clone(), + candidates.into(), table_columns, ) .await; @@ -436,14 +396,13 @@ mod tests { // ------------------------------------------------ // Compact - let candidates = compactor + let (mut candidates, _table_columns) = compactor .cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard) .await .unwrap(); - let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap(); assert_eq!(candidates.len(), 1); - let c = candidates.pop_front().unwrap(); + let c = candidates.pop().unwrap(); let parquet_files_for_compaction = parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides( @@ -633,14 +592,13 @@ mod tests { // ------------------------------------------------ // Compact - let candidates = compactor + let (mut candidates, _table_columns) = compactor .cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard) .await .unwrap(); - let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap(); assert_eq!(candidates.len(), 1); - let c = candidates.pop_front().unwrap(); + let c = candidates.pop().unwrap(); let parquet_files_for_compaction = parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides( diff --git a/compactor/src/compact.rs 
b/compactor/src/compact.rs index ac61720930..6a9eb3a61c 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -18,7 +18,7 @@ use parquet_file::storage::ParquetStorage; use schema::sort::SortKey; use snafu::{OptionExt, ResultExt, Snafu}; use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet}, sync::Arc, time::Duration, }; @@ -276,15 +276,19 @@ impl Compactor { // Minimum number of the most recent writes per partition we want to count // to prioritize partitions min_recent_ingested_files: usize, - ) -> Result> { + ) -> Result<( + Vec>, + HashMap>, + )> { + let compaction_type = "hot"; let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard); - let mut repos = self.catalog.repositories().await; for shard_id in &self.shards { let attributes = Attributes::from([ ("shard_id", format!("{}", *shard_id).into()), - ("partition_type", "hot".into()), + ("partition_type", compaction_type.into()), ]); + let mut repos = self.catalog.repositories().await; // Get the most recent highest ingested throughput partitions within // the last 4 hours. 
If not, increase to 24 hours @@ -326,13 +330,41 @@ impl Compactor { debug!( shard_id = shard_id.get(), n = num_partitions, - "hot compaction candidates", + compaction_type, + "compaction candidates", ); let number_gauge = self.compaction_candidate_gauge.recorder(attributes); number_gauge.set(num_partitions as u64); } - Ok(candidates) + // Get extra needed information for selected partitions + let start_time = self.time_provider.now(); + + // Column types and their counts of the tables of the partition candidates + debug!( + num_candidates=?candidates.len(), + compaction_type, + "start getting column types for the partition candidates" + ); + let table_columns = self.table_columns(&candidates).await?; + + // Add other compaction-needed info into selected partitions + debug!( + num_candidates=?candidates.len(), + compaction_type, + "start getting additional info for the partition candidates" + ); + let candidates = self.add_info_to_partitions(&candidates).await?; + + if let Some(delta) = self.time_provider.now().checked_duration_since(start_time) { + let attributes = Attributes::from(&[("partition_type", compaction_type)]); + let duration = self + .partitions_extra_info_reading_duration + .recorder(attributes); + duration.record(delta); + } + + Ok((candidates, table_columns)) } /// Return a list of partitions that: @@ -345,20 +377,24 @@ impl Compactor { &self, // Max number of cold partitions per shard we want to compact max_num_partitions_per_shard: usize, - ) -> Result> { + ) -> Result<( + Vec>, + HashMap>, + )> { + let compaction_type = "cold"; let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard); - let mut repos = self.catalog.repositories().await; for shard_id in &self.shards { let attributes = Attributes::from([ ("shard_id", format!("{}", *shard_id).into()), - ("partition_type", "cold".into()), + ("partition_type", compaction_type.into()), ]); let time_8_hours_ago = Timestamp::new( (self.time_provider.now() - 
Duration::from_secs(60 * 60 * 8)).timestamp_nanos(), ); + let mut repos = self.catalog.repositories().await; let mut partitions = repos .parquet_files() .most_cold_files_partitions( @@ -378,17 +414,45 @@ impl Compactor { debug!( shard_id = shard_id.get(), n = num_partitions, - "cold compaction candidates", + compaction_type, + "compaction candidates", ); let number_gauge = self.compaction_candidate_gauge.recorder(attributes); number_gauge.set(num_partitions as u64); } - Ok(candidates) + // Get extra needed information for selected partitions + let start_time = self.time_provider.now(); + + // Column types and their counts of the tables of the partition candidates + debug!( + num_candidates=?candidates.len(), + compaction_type, + "start getting column types for the partition candidates" + ); + let table_columns = self.table_columns(&candidates).await?; + + // Add other compaction-needed info into selected partitions + debug!( + num_candidates=?candidates.len(), + compaction_type, + "start getting additional info for the partition candidates" + ); + let candidates = self.add_info_to_partitions(&candidates).await?; + + if let Some(delta) = self.time_provider.now().checked_duration_since(start_time) { + let attributes = Attributes::from(&[("partition_type", compaction_type)]); + let duration = self + .partitions_extra_info_reading_duration + .recorder(attributes); + duration.record(delta); + } + + Ok((candidates, table_columns)) } /// Get column types for tables of given partitions - pub async fn table_columns( + async fn table_columns( &self, partitions: &[PartitionParam], ) -> Result>> { @@ -409,10 +473,10 @@ impl Compactor { } /// Add namespace and table information to partition candidates. 
- pub async fn add_info_to_partitions( + async fn add_info_to_partitions( &self, partitions: &[PartitionParam], - ) -> Result>> { + ) -> Result>> { let mut repos = self.catalog.repositories().await; let table_ids: HashSet<_> = partitions.iter().map(|p| p.table_id).collect(); @@ -481,7 +545,7 @@ impl Compactor { partition_key: part.partition_key.clone(), }) }) - .collect::>()) + .collect()) } } @@ -672,7 +736,7 @@ mod tests { // -------------------------------------- // Case 1: no files yet --> no partition candidates // - let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); + let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); assert!(candidates.is_empty()); // -------------------------------------- @@ -696,7 +760,7 @@ mod tests { .unwrap(); txn.commit().await.unwrap(); // No non-deleted level 0 files yet --> no candidates - let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); + let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); assert!(candidates.is_empty()); // -------------------------------------- @@ -715,7 +779,7 @@ mod tests { txn.commit().await.unwrap(); // No hot candidates - let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); + let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); assert!(candidates.is_empty()); // -------------------------------------- @@ -733,9 +797,9 @@ mod tests { txn.commit().await.unwrap(); // // Has at least one partition with a recent write --> make it a candidate - let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); + let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); assert_eq!(candidates.len(), 1); - assert_eq!(candidates[0].partition_id, partition4.id); + assert_eq!(candidates[0].id(), partition4.id); // -------------------------------------- // Case 5: has 2 partitions with 2 
different groups of recent writes: @@ -755,9 +819,9 @@ mod tests { txn.commit().await.unwrap(); // // make partitions in the most recent group candidates - let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); + let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); assert_eq!(candidates.len(), 1); - assert_eq!(candidates[0].partition_id, partition3.id); + assert_eq!(candidates[0].id(), partition3.id); // -------------------------------------- // Case 6: has partition candidates for 2 shards @@ -776,36 +840,24 @@ mod tests { txn.commit().await.unwrap(); // // Will have 2 candidates, one for each shard - let mut candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap(); - candidates.sort(); + let (mut candidates, _table_columns) = + compactor.hot_partitions_to_compact(1, 1).await.unwrap(); + candidates.sort_by_key(|c| c.candidate); assert_eq!(candidates.len(), 2); - assert_eq!(candidates[0].partition_id, partition3.id); - assert_eq!(candidates[0].shard_id, shard.id); - assert_eq!(candidates[1].partition_id, another_partition.id); - assert_eq!(candidates[1].shard_id, another_shard.id); - // Add info to partition - let partitions_with_info = compactor.add_info_to_partitions(&candidates).await.unwrap(); - assert_eq!(partitions_with_info.len(), 2); - // - assert_eq!(*partitions_with_info[0].namespace, namespace); - assert_eq!(*partitions_with_info[0].table, table); - assert_eq!( - partitions_with_info[0].partition_key, - partition3.partition_key - ); - assert_eq!(partitions_with_info[0].sort_key, partition3.sort_key()); // this sort key is None - // - assert_eq!(*partitions_with_info[1].namespace, namespace); - assert_eq!(*partitions_with_info[1].table, another_table); - assert_eq!( - partitions_with_info[1].partition_key, - another_partition.partition_key - ); - assert_eq!( - partitions_with_info[1].sort_key, - another_partition.sort_key() - ); // this sort key is Some(tag1, time) + 
assert_eq!(candidates[0].id(), partition3.id); + assert_eq!(candidates[0].shard_id(), shard.id); + assert_eq!(*candidates[0].namespace, namespace); + assert_eq!(*candidates[0].table, table); + assert_eq!(candidates[0].partition_key, partition3.partition_key); + assert_eq!(candidates[0].sort_key, partition3.sort_key()); // this sort key is None + + assert_eq!(candidates[1].id(), another_partition.id); + assert_eq!(candidates[1].shard_id(), another_shard.id); + assert_eq!(*candidates[1].namespace, namespace); + assert_eq!(*candidates[1].table, another_table); + assert_eq!(candidates[1].partition_key, another_partition.partition_key); + assert_eq!(candidates[1].sort_key, another_partition.sort_key()); // this sort key is Some(tag1, time) } fn make_compactor_config() -> CompactorConfig { @@ -952,7 +1004,7 @@ mod tests { // -------------------------------------- // Case 1: no files yet --> no partition candidates // - let candidates = compactor.cold_partitions_to_compact(1).await.unwrap(); + let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap(); assert!(candidates.is_empty()); // -------------------------------------- @@ -973,7 +1025,7 @@ mod tests { let _pf2 = txn.parquet_files().create(p2).await.unwrap(); txn.commit().await.unwrap(); // No non-deleted level 0 files yet --> no candidates - let candidates = compactor.cold_partitions_to_compact(1).await.unwrap(); + let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap(); assert!(candidates.is_empty()); // -------------------------------------- @@ -990,9 +1042,9 @@ mod tests { txn.commit().await.unwrap(); // // Has at least one partition with a L0 file --> make it a candidate - let candidates = compactor.cold_partitions_to_compact(1).await.unwrap(); + let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap(); assert_eq!(candidates.len(), 1); - assert_eq!(candidates[0].partition_id, partition2.id); + 
assert_eq!(candidates[0].id(), partition2.id); // -------------------------------------- // Case 4: has two cold partitions --> return the candidate with the most L0 @@ -1013,9 +1065,9 @@ mod tests { let _pf5 = txn.parquet_files().create(p5).await.unwrap(); txn.commit().await.unwrap(); // Partition with the most l0 files is the candidate - let candidates = compactor.cold_partitions_to_compact(1).await.unwrap(); + let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap(); assert_eq!(candidates.len(), 1); - assert_eq!(candidates[0].partition_id, partition4.id); + assert_eq!(candidates[0].id(), partition4.id); // -------------------------------------- // Case 5: "warm" and "hot" partitions aren't returned @@ -1045,21 +1097,21 @@ mod tests { let _pf5_hot = txn.parquet_files().create(p5_hot).await.unwrap(); txn.commit().await.unwrap(); // Partition4 is still the only candidate - let candidates = compactor.cold_partitions_to_compact(1).await.unwrap(); + let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap(); assert_eq!(candidates.len(), 1); - assert_eq!(candidates[0].partition_id, partition4.id); + assert_eq!(candidates[0].id(), partition4.id); // Ask for 2 partitions per shard; get partition4 and partition2 - let candidates = compactor.cold_partitions_to_compact(2).await.unwrap(); + let (candidates, _table_columns) = compactor.cold_partitions_to_compact(2).await.unwrap(); assert_eq!(candidates.len(), 2); - assert_eq!(candidates[0].partition_id, partition4.id); - assert_eq!(candidates[1].partition_id, partition2.id); + assert_eq!(candidates[0].id(), partition4.id); + assert_eq!(candidates[1].id(), partition2.id); // Ask for 3 partitions per shard; still get only partition4 and partition2 - let candidates = compactor.cold_partitions_to_compact(3).await.unwrap(); + let (candidates, _table_columns) = compactor.cold_partitions_to_compact(3).await.unwrap(); assert_eq!(candidates.len(), 2); - 
assert_eq!(candidates[0].partition_id, partition4.id); - assert_eq!(candidates[1].partition_id, partition2.id); + assert_eq!(candidates[0].id(), partition4.id); + assert_eq!(candidates[1].id(), partition2.id); // -------------------------------------- // Case 6: has partition candidates for 2 shards @@ -1078,24 +1130,26 @@ mod tests { txn.commit().await.unwrap(); // Will have 2 candidates, one for each shard - let mut candidates = compactor.cold_partitions_to_compact(1).await.unwrap(); - candidates.sort(); + let (mut candidates, _table_columns) = + compactor.cold_partitions_to_compact(1).await.unwrap(); + candidates.sort_by_key(|c| c.candidate); assert_eq!(candidates.len(), 2); - assert_eq!(candidates[0].partition_id, partition4.id); - assert_eq!(candidates[0].shard_id, shard.id); - assert_eq!(candidates[1].partition_id, another_partition.id); - assert_eq!(candidates[1].shard_id, another_shard.id); + assert_eq!(candidates[0].id(), partition4.id); + assert_eq!(candidates[0].shard_id(), shard.id); + assert_eq!(candidates[1].id(), another_partition.id); + assert_eq!(candidates[1].shard_id(), another_shard.id); // Ask for 2 candidates per shard; get back 3: 2 from shard and 1 from // another_shard - let mut candidates = compactor.cold_partitions_to_compact(2).await.unwrap(); - candidates.sort(); + let (mut candidates, _table_columns) = + compactor.cold_partitions_to_compact(2).await.unwrap(); + candidates.sort_by_key(|c| c.candidate); assert_eq!(candidates.len(), 3); - assert_eq!(candidates[0].partition_id, partition2.id); - assert_eq!(candidates[0].shard_id, shard.id); - assert_eq!(candidates[1].partition_id, partition4.id); - assert_eq!(candidates[1].shard_id, shard.id); - assert_eq!(candidates[2].partition_id, another_partition.id); - assert_eq!(candidates[2].shard_id, another_shard.id); + assert_eq!(candidates[0].id(), partition2.id); + assert_eq!(candidates[0].shard_id(), shard.id); + assert_eq!(candidates[1].id(), partition4.id); + 
assert_eq!(candidates[1].shard_id(), shard.id); + assert_eq!(candidates[2].id(), another_partition.id); + assert_eq!(candidates[2].shard_id(), another_shard.id); } } diff --git a/compactor/src/hot.rs b/compactor/src/hot.rs index ede3fa5487..f59628bf55 100644 --- a/compactor/src/hot.rs +++ b/compactor/src/hot.rs @@ -13,7 +13,7 @@ pub async fn compact(compactor: Arc) -> usize { debug!(compaction_type, "start collecting partitions to compact"); let attributes = Attributes::from(&[("partition_type", compaction_type)]); let start_time = compactor.time_provider.now(); - let candidates = Backoff::new(&compactor.backoff_config) + let (candidates, table_columns) = Backoff::new(&compactor.backoff_config) .retry_all_errors("hot_partitions_to_compact", || async { compactor .hot_partitions_to_compact( @@ -37,46 +37,6 @@ pub async fn compact(compactor: Arc) -> usize { duration.record(delta); } - // Get extra needed information for selected partitions - let start_time = compactor.time_provider.now(); - - // Column types and their counts of the tables of the partition candidates - debug!( - num_candidates=?candidates.len(), - compaction_type, - "start getting column types for the partition candidates" - ); - let table_columns = Backoff::new(&compactor.backoff_config) - .retry_all_errors("table_columns", || async { - compactor.table_columns(&candidates).await - }) - .await - .expect("retry forever"); - - // Add other compaction-needed info into selected partitions - debug!( - num_candidates=?candidates.len(), - compaction_type, - "start getting additional info for the partition candidates" - ); - let candidates = Backoff::new(&compactor.backoff_config) - .retry_all_errors("add_info_to_partitions", || async { - compactor.add_info_to_partitions(&candidates).await - }) - .await - .expect("retry forever"); - - if let Some(delta) = compactor - .time_provider - .now() - .checked_duration_since(start_time) - { - let duration = compactor - .partitions_extra_info_reading_duration - 
.recorder(attributes.clone()); - duration.record(delta); - } - let n_candidates = candidates.len(); if n_candidates == 0 { debug!(compaction_type, "no compaction candidates found"); @@ -91,7 +51,7 @@ pub async fn compact(compactor: Arc) -> usize { Arc::clone(&compactor), compaction_type, compact_in_parallel, - candidates, + candidates.into(), table_columns, ) .await; @@ -126,11 +86,7 @@ mod tests { use iox_tests::util::{TestCatalog, TestParquetFileBuilder}; use iox_time::{SystemProvider, TimeProvider}; use parquet_file::storage::ParquetStorage; - use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, - time::Duration, - }; + use std::{collections::HashMap, sync::Arc, time::Duration}; #[tokio::test] async fn test_compact_hot_partition_candidates() { @@ -277,7 +233,7 @@ mod tests { partition6.create_parquet_file_catalog_record(pf6_2).await; // partition candidates: partitions with L0 and overlapped L1 - let candidates = compactor + let (mut candidates, table_columns) = compactor .hot_partitions_to_compact( compactor.config.max_number_partitions_per_shard, compactor @@ -288,8 +244,6 @@ mod tests { .unwrap(); assert_eq!(candidates.len(), 6); - // column types of the partitions - let table_columns = compactor.table_columns(&candidates).await.unwrap(); assert_eq!(table_columns.len(), 1); let mut cols = table_columns.get(&table.table.id).unwrap().clone(); assert_eq!(cols.len(), 5); @@ -304,11 +258,7 @@ mod tests { expected_cols.sort_by_key(|c| c.col_type); assert_eq!(cols, expected_cols); - // Add other compaction-needed info into selected partitions - let candidates = compactor.add_info_to_partitions(&candidates).await.unwrap(); - let mut sorted_candidates = candidates.into_iter().collect::>(); - sorted_candidates.sort_by_key(|c| c.candidate.partition_id); - let sorted_candidates = sorted_candidates.into_iter().collect::>(); + candidates.sort_by_key(|c| c.candidate.partition_id); { let mut repos = compactor.catalog.repositories().await; @@ -330,7 +280,7 @@ mod tests 
{ Arc::clone(&compactor), "hot", mock_compactor.compaction_function(), - sorted_candidates, + candidates.into(), table_columns, ) .await; @@ -568,7 +518,7 @@ mod tests { // ------------------------------------------------ // Compact - let candidates = compactor + let (mut candidates, _table_columns) = compactor .hot_partitions_to_compact( compactor.config.max_number_partitions_per_shard, compactor @@ -577,10 +527,9 @@ mod tests { ) .await .unwrap(); - let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap(); assert_eq!(candidates.len(), 1); - let c = candidates.pop_front().unwrap(); + let c = candidates.pop().unwrap(); let parquet_files_for_compaction = parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides( diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index db0388446b..c42785f43c 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -430,7 +430,7 @@ pub mod tests { .with_creation_time(hot_time_one_hour_ago); partition1.create_parquet_file_catalog_record(pf1_1).await; - let candidates = compactor + let (mut candidates, _table_columns) = compactor .hot_partitions_to_compact( compactor.config.max_number_partitions_per_shard, compactor @@ -441,17 +441,14 @@ pub mod tests { .unwrap(); assert_eq!(candidates.len(), 1); - let candidates = compactor.add_info_to_partitions(&candidates).await.unwrap(); - let mut sorted_candidates = candidates.into_iter().collect::>(); - sorted_candidates.sort_by_key(|c| c.candidate.partition_id); - let sorted_candidates = sorted_candidates.into_iter().collect::>(); + candidates.sort_by_key(|c| c.candidate.partition_id); let table_columns = HashMap::new(); compact_candidates_with_memory_budget( Arc::clone(&compactor), "hot", mock_compactor.compaction_function(), - sorted_candidates, + candidates.into(), table_columns, ) .await; From e6f2a105e56d55464cb6abe8d1fe94068b27dd95 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Thu, 15 Sep 2022 10:19:03 +1000 Subject: [PATCH 
2/4] feat: Improved InfluxQL error messages (#5632) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: Drive by to improve tests and coverage * chore: Make Error generic, so we can change it * chore: change visibility pub(crate) is superfluous, as we are yet to specify which APIs are public outside the crate in lib.rs * chore: Introduce crate IResult type In preparation of adding custom error type * feat: Initial implementation of custom error type * chore: Add module docs * chore: Rename IResult → ParseResult; syntax and expect errors * chore: ParserResult and error refactoring * chore: Drive by simplification * feat: Add custom errors to string parsing * feat: Added public API to parse a set of statements * chore: Errors are dyn Display to convey their intent Errors from the parser are only displayable messages. * chore: Separate SHOW for improved error handling By moving SHOW to a separate parser, we can display clearer error messages when consuming SHOW followed by an unexpected token. * chore: Docs and cleanup * chore: Add tests and a specific `ParseError` type The fields are intentionally not public yet, as we would like clients of the package to display the message only. 
* chore: PR feedback to improve the `ORDER BY` error message --- influxdb_influxql_parser/src/common.rs | 118 +++++++++++----- influxdb_influxql_parser/src/expression.rs | 26 ++-- influxdb_influxql_parser/src/identifier.rs | 6 +- influxdb_influxql_parser/src/internal.rs | 89 ++++++++++++ influxdb_influxql_parser/src/keywords.rs | 16 +-- influxdb_influxql_parser/src/lib.rs | 123 +++++++++++++++++ influxdb_influxql_parser/src/literal.rs | 29 ++-- influxdb_influxql_parser/src/parameter.rs | 9 +- influxdb_influxql_parser/src/show.rs | 41 ++++++ .../src/show_measurements.rs | 130 ++++++++++++------ influxdb_influxql_parser/src/statement.rs | 27 ++++ influxdb_influxql_parser/src/string.rs | 97 +++++++++---- influxdb_influxql_parser/src/test_util.rs | 16 +++ 13 files changed, 581 insertions(+), 146 deletions(-) create mode 100644 influxdb_influxql_parser/src/internal.rs create mode 100644 influxdb_influxql_parser/src/show.rs create mode 100644 influxdb_influxql_parser/src/statement.rs diff --git a/influxdb_influxql_parser/src/common.rs b/influxdb_influxql_parser/src/common.rs index 8eb052af7e..555f83db96 100644 --- a/influxdb_influxql_parser/src/common.rs +++ b/influxdb_influxql_parser/src/common.rs @@ -2,13 +2,13 @@ use crate::expression::{conditional_expression, Expr}; use crate::identifier::{identifier, Identifier}; +use crate::internal::{expect, map_fail, ParseResult}; use core::fmt; use nom::branch::alt; use nom::bytes::complete::{tag, tag_no_case}; -use nom::character::complete::{digit1, line_ending, multispace0, multispace1}; -use nom::combinator::{cut, eof, map, map_res, opt, value}; -use nom::sequence::{delimited, pair, preceded, terminated}; -use nom::IResult; +use nom::character::complete::{char, digit1, multispace1}; +use nom::combinator::{map, opt, value}; +use nom::sequence::{pair, preceded, terminated}; use std::fmt::Formatter; /// Represents a fully-qualified measurement name. 
@@ -48,7 +48,7 @@ impl fmt::Display for MeasurementNameExpression { } /// Match a 3-part measurement name expression. -pub fn measurement_name_expression(i: &str) -> IResult<&str, MeasurementNameExpression> { +pub fn measurement_name_expression(i: &str) -> ParseResult<&str, MeasurementNameExpression> { let (remaining_input, (opt_db_rp, name)) = pair( opt(alt(( // database "." retention_policy "." @@ -84,30 +84,39 @@ pub fn measurement_name_expression(i: &str) -> IResult<&str, MeasurementNameExpr } /// Parse an unsigned integer. -fn unsigned_number(i: &str) -> IResult<&str, u64> { - map_res(digit1, |s: &str| s.parse())(i) +fn unsigned_number(i: &str) -> ParseResult<&str, u64> { + map_fail("unable to parse unsigned integer", digit1, &str::parse)(i) } /// Parse a LIMIT clause. -pub fn limit_clause(i: &str) -> IResult<&str, u64> { - preceded(pair(tag_no_case("LIMIT"), multispace1), unsigned_number)(i) +pub fn limit_clause(i: &str) -> ParseResult<&str, u64> { + preceded( + pair(tag_no_case("LIMIT"), multispace1), + expect( + "invalid LIMIT clause, expected unsigned integer", + unsigned_number, + ), + )(i) } /// Parse an OFFSET clause. -pub fn offset_clause(i: &str) -> IResult<&str, u64> { - preceded(pair(tag_no_case("OFFSET"), multispace1), unsigned_number)(i) +pub fn offset_clause(i: &str) -> ParseResult<&str, u64> { + preceded( + pair(tag_no_case("OFFSET"), multispace1), + expect( + "invalid OFFSET clause, expected unsigned integer", + unsigned_number, + ), + )(i) } /// Parse a terminator that ends a SQL statement. -pub fn statement_terminator(i: &str) -> IResult<&str, ()> { - let (remaining_input, _) = - delimited(multispace0, alt((tag(";"), line_ending, eof)), multispace0)(i)?; - - Ok((remaining_input, ())) +pub fn statement_terminator(i: &str) -> ParseResult<&str, ()> { + value((), char(';'))(i) } /// Parse a `WHERE` clause. 
-pub(crate) fn where_clause(i: &str) -> IResult<&str, Expr> { +pub fn where_clause(i: &str) -> ParseResult<&str, Expr> { preceded( pair(tag_no_case("WHERE"), multispace1), conditional_expression, @@ -144,7 +153,7 @@ pub enum OrderByClause { /// ``` /// /// [EBNF]: https://www.w3.org/TR/2010/REC-xquery-20101214/#EBNFNotation -pub(crate) fn order_by_clause(i: &str) -> IResult<&str, OrderByClause> { +pub fn order_by_clause(i: &str) -> ParseResult<&str, OrderByClause> { let order = || { preceded( multispace1, @@ -161,23 +170,25 @@ pub(crate) fn order_by_clause(i: &str) -> IResult<&str, OrderByClause> { tag_no_case("ORDER"), preceded(multispace1, tag_no_case("BY")), ), - // cut to force failure, as `ORDER BY` must be followed by one of the following - cut(alt(( - // "ASC" | "DESC" - order(), - // "TIME" ( "ASC" | "DESC" )? - map( - preceded(preceded(multispace1, tag_no_case("TIME")), opt(order())), - Option::<_>::unwrap_or_default, - ), - ))), + expect( + "invalid ORDER BY, expected ASC, DESC or TIME", + alt(( + // "ASC" | "DESC" + order(), + // "TIME" ( "ASC" | "DESC" )? 
+ map( + preceded(preceded(multispace1, tag_no_case("TIME")), opt(order())), + Option::<_>::unwrap_or_default, + ), + )), + ), )(i) } #[cfg(test)] mod tests { use super::*; - use crate::assert_failure; + use crate::assert_expect_error; #[test] fn test_measurement_name_expression() { @@ -226,10 +237,22 @@ mod tests { assert_eq!(got, 123); // not digits - limit_clause("LIMIT sdf").unwrap_err(); + assert_expect_error!( + limit_clause("LIMIT from"), + "invalid LIMIT clause, expected unsigned integer" + ); + + // incomplete input + assert_expect_error!( + limit_clause("LIMIT "), + "invalid LIMIT clause, expected unsigned integer" + ); // overflow - limit_clause("LIMIT 34593745733489743985734857394").unwrap_err(); + assert_expect_error!( + limit_clause("LIMIT 34593745733489743985734857394"), + "unable to parse unsigned integer" + ); } #[test] @@ -246,10 +269,22 @@ mod tests { assert_eq!(got, 123); // not digits - offset_clause("OFFSET sdf").unwrap_err(); + assert_expect_error!( + offset_clause("OFFSET from"), + "invalid OFFSET clause, expected unsigned integer" + ); + + // incomplete input + assert_expect_error!( + offset_clause("OFFSET "), + "invalid OFFSET clause, expected unsigned integer" + ); // overflow - offset_clause("OFFSET 34593745733489743985734857394").unwrap_err(); + assert_expect_error!( + offset_clause("OFFSET 34593745733489743985734857394"), + "unable to parse unsigned integer" + ); } #[test] @@ -280,7 +315,10 @@ mod tests { // Fallible cases // Must be "time" identifier - assert_failure!(order_by_clause("ORDER by foo")); + assert_expect_error!( + order_by_clause("ORDER by foo"), + "invalid ORDER BY, expected ASC, DESC or TIME" + ); } #[test] @@ -296,4 +334,16 @@ mod tests { where_clause("WHERE foo = LIMIT 10").unwrap_err(); where_clause("WHERE").unwrap_err(); } + + #[test] + fn test_statement_terminator() { + let (i, _) = statement_terminator(";foo").unwrap(); + assert_eq!(i, "foo"); + + let (i, _) = statement_terminator("; foo").unwrap(); + 
assert_eq!(i, " foo"); + + // Fallible cases + statement_terminator("foo").unwrap_err(); + } } diff --git a/influxdb_influxql_parser/src/expression.rs b/influxdb_influxql_parser/src/expression.rs index 241ec1b4cf..f17626d1c5 100644 --- a/influxdb_influxql_parser/src/expression.rs +++ b/influxdb_influxql_parser/src/expression.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] use crate::identifier::unquoted_identifier; +use crate::internal::ParseResult; use crate::literal::literal_regex; use crate::{ identifier::{identifier, Identifier}, @@ -13,7 +14,6 @@ use nom::character::complete::{char, multispace0}; use nom::combinator::{cut, map, value}; use nom::multi::{many0, separated_list0}; use nom::sequence::{delimited, preceded, separated_pair, tuple}; -use nom::IResult; use std::fmt::{Display, Formatter, Write}; /// An InfluxQL expression of any type. @@ -173,7 +173,7 @@ impl Display for BinaryOperator { } /// Parse a unary expression. -fn unary(i: &str) -> IResult<&str, Expr> { +fn unary(i: &str) -> ParseResult<&str, Expr> { let (i, op) = preceded( multispace0, alt(( @@ -188,7 +188,7 @@ fn unary(i: &str) -> IResult<&str, Expr> { } /// Parse a parenthesis expression. -fn parens(i: &str) -> IResult<&str, Expr> { +fn parens(i: &str) -> ParseResult<&str, Expr> { delimited( preceded(multispace0, char('(')), map(conditional_expression, |e| Expr::Nested(e.into())), @@ -197,7 +197,7 @@ fn parens(i: &str) -> IResult<&str, Expr> { } /// Parse a function call expression -fn call(i: &str) -> IResult<&str, Expr> { +fn call(i: &str) -> ParseResult<&str, Expr> { map( separated_pair( unquoted_identifier, @@ -224,7 +224,7 @@ fn call(i: &str) -> IResult<&str, Expr> { } /// Parse an operand expression, such as a literal, identifier or bind parameter. -fn operand(i: &str) -> IResult<&str, Expr> { +fn operand(i: &str) -> ParseResult<&str, Expr> { preceded( multispace0, alt(( @@ -238,14 +238,14 @@ fn operand(i: &str) -> IResult<&str, Expr> { /// Parse precedence priority 1 operators. 
/// /// These are the highest precedence operators, and include parenthesis and the unary operators. -fn factor(i: &str) -> IResult<&str, Expr> { +fn factor(i: &str) -> ParseResult<&str, Expr> { alt((unary, parens, operand))(i) } /// Parse arithmetic, precedence priority 2 operators. /// /// This includes the multiplication, division, bitwise and, and modulus operators. -fn term(i: &str) -> IResult<&str, Expr> { +fn term(i: &str) -> ParseResult<&str, Expr> { let (input, left) = factor(i)?; let (input, remaining) = many0(tuple(( preceded( @@ -265,7 +265,7 @@ fn term(i: &str) -> IResult<&str, Expr> { /// Parse arithmetic, precedence priority 3 operators. /// /// This includes the addition, subtraction, bitwise or, and bitwise xor operators. -fn arithmetic(i: &str) -> IResult<&str, Expr> { +fn arithmetic(i: &str) -> ParseResult<&str, Expr> { let (input, left) = term(i)?; let (input, remaining) = many0(tuple(( preceded( @@ -283,7 +283,7 @@ fn arithmetic(i: &str) -> IResult<&str, Expr> { } /// Parse the conditional regular expression operators `=~` and `!~`. -fn conditional_regex(i: &str) -> IResult<&str, Expr> { +fn conditional_regex(i: &str) -> ParseResult<&str, Expr> { let (input, f1) = arithmetic(i)?; let (input, exprs) = many0(tuple(( preceded( @@ -299,7 +299,7 @@ fn conditional_regex(i: &str) -> IResult<&str, Expr> { } /// Parse conditional operators. -fn conditional(i: &str) -> IResult<&str, Expr> { +fn conditional(i: &str) -> ParseResult<&str, Expr> { let (input, f1) = conditional_regex(i)?; let (input, exprs) = many0(tuple(( preceded( @@ -320,7 +320,7 @@ fn conditional(i: &str) -> IResult<&str, Expr> { } /// Parse conjunction operators, such as `AND`. -fn conjunction(i: &str) -> IResult<&str, Expr> { +fn conjunction(i: &str) -> ParseResult<&str, Expr> { let (input, f1) = conditional(i)?; let (input, exprs) = many0(tuple(( value( @@ -333,7 +333,7 @@ fn conjunction(i: &str) -> IResult<&str, Expr> { } /// Parse disjunction operator, such as `OR`. 
-fn disjunction(i: &str) -> IResult<&str, Expr> { +fn disjunction(i: &str) -> ParseResult<&str, Expr> { let (input, f1) = conjunction(i)?; let (input, exprs) = many0(tuple(( value(BinaryOperator::Or, preceded(multispace0, tag_no_case("or"))), @@ -343,7 +343,7 @@ fn disjunction(i: &str) -> IResult<&str, Expr> { } /// Parse an InfluxQL conditional expression. -pub fn conditional_expression(i: &str) -> IResult<&str, Expr> { +pub fn conditional_expression(i: &str) -> ParseResult<&str, Expr> { disjunction(i) } diff --git a/influxdb_influxql_parser/src/identifier.rs b/influxdb_influxql_parser/src/identifier.rs index 0ece1f27a6..e7a8f9696b 100644 --- a/influxdb_influxql_parser/src/identifier.rs +++ b/influxdb_influxql_parser/src/identifier.rs @@ -13,6 +13,7 @@ #![allow(dead_code)] +use crate::internal::ParseResult; use crate::keywords::sql_keyword; use crate::string::double_quoted_string; use crate::write_escaped; @@ -22,12 +23,11 @@ use nom::character::complete::{alpha1, alphanumeric1}; use nom::combinator::{map, not, recognize}; use nom::multi::many0_count; use nom::sequence::{pair, preceded}; -use nom::IResult; use std::fmt; use std::fmt::{Display, Formatter, Write}; /// Parse an unquoted InfluxQL identifier. -pub(crate) fn unquoted_identifier(i: &str) -> IResult<&str, String> { +pub fn unquoted_identifier(i: &str) -> ParseResult<&str, String> { map( preceded( not(sql_keyword), @@ -68,7 +68,7 @@ impl Display for Identifier { } /// Parses an InfluxQL [Identifier]. 
-pub fn identifier(i: &str) -> IResult<&str, Identifier> { +pub fn identifier(i: &str) -> ParseResult<&str, Identifier> { // See: https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L358-L362 alt(( map(unquoted_identifier, Identifier::Unquoted), diff --git a/influxdb_influxql_parser/src/internal.rs b/influxdb_influxql_parser/src/internal.rs new file mode 100644 index 0000000000..0603e6baf1 --- /dev/null +++ b/influxdb_influxql_parser/src/internal.rs @@ -0,0 +1,89 @@ +//! Internal result and error types used to build InfluxQL parsers +//! +use nom::error::{ErrorKind as NomErrorKind, ParseError as NomParseError}; +use nom::Parser; +use std::fmt::{Display, Formatter}; + +/// This trait must be implemented in order to use the [`map_fail`] and +/// [`expect`] functions for generating user-friendly error messages. +pub trait ParseError<'a>: NomParseError<&'a str> + Sized { + fn from_message(input: &'a str, message: &'static str) -> Self; +} + +/// An internal error type used to build InfluxQL parsers. +#[derive(Debug, PartialEq, Eq)] +pub enum Error { + Syntax { input: I, message: &'static str }, + Nom(I, NomErrorKind), +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Syntax { input: _, message } => { + write!(f, "Syntax error: {}", message)?; + } + Self::Nom(_, kind) => write!(f, "nom error: {:?}", kind)?, + } + + Ok(()) + } +} + +impl<'a> ParseError<'a> for Error<&'a str> { + fn from_message(input: &'a str, message: &'static str) -> Self { + Self::Syntax { input, message } + } +} + +/// Applies a function returning a [`ParseResult`] over the result of the `parser`. +/// If the parser returns an error, the result will be mapped to a [`nom::Err::Failure`] +/// with the specified `message` for additional context. 
+pub fn map_fail<'a, O1, O2, E: ParseError<'a>, E2, F, G>( + message: &'static str, + mut parser: F, + mut f: G, +) -> impl FnMut(&'a str) -> ParseResult<&'a str, O2, E> +where + F: Parser<&'a str, O1, E>, + G: FnMut(O1) -> Result, +{ + move |input| { + let (input, o1) = parser.parse(input)?; + match f(o1) { + Ok(o2) => Ok((input, o2)), + Err(_) => Err(nom::Err::Failure(E::from_message(input, message))), + } + } +} + +/// Transforms a [`nom::Err::Error`] to a [`nom::Err::Failure`] using `message` for additional +/// context. +pub fn expect<'a, E: ParseError<'a>, F, O>( + message: &'static str, + mut f: F, +) -> impl FnMut(&'a str) -> ParseResult<&'a str, O, E> +where + F: Parser<&'a str, O, E>, +{ + move |i| match f.parse(i) { + Ok(o) => Ok(o), + Err(nom::Err::Incomplete(i)) => Err(nom::Err::Incomplete(i)), + Err(nom::Err::Error(_)) => Err(nom::Err::Failure(E::from_message(i, message))), + Err(nom::Err::Failure(e)) => Err(nom::Err::Failure(e)), + } +} + +impl NomParseError for Error { + fn from_error_kind(input: I, kind: NomErrorKind) -> Self { + Self::Nom(input, kind) + } + + fn append(_: I, _: NomErrorKind, other: Self) -> Self { + other + } +} + +/// ParseResult is a type alias for [`nom::IResult`] used by nom combinator +/// functions for parsing InfluxQL. +pub type ParseResult> = nom::IResult; diff --git a/influxdb_influxql_parser/src/keywords.rs b/influxdb_influxql_parser/src/keywords.rs index e009f02bfd..9a2f09009b 100644 --- a/influxdb_influxql_parser/src/keywords.rs +++ b/influxdb_influxql_parser/src/keywords.rs @@ -4,14 +4,14 @@ #![allow(dead_code)] +use crate::internal::ParseResult; use nom::branch::alt; use nom::bytes::complete::{tag, tag_no_case}; use nom::combinator::{eof, peek}; use nom::sequence::terminated; -use nom::IResult; /// Peeks at the input for acceptable characters following a keyword. 
-fn keyword_follow_char(i: &str) -> IResult<&str, &str> { +fn keyword_follow_char(i: &str) -> ParseResult<&str, &str> { peek(alt(( tag(" "), tag("\n"), @@ -26,7 +26,7 @@ fn keyword_follow_char(i: &str) -> IResult<&str, &str> { } /// Parses the input for matching InfluxQL keywords from ALL to DROP. -fn keyword_all_to_drop(i: &str) -> IResult<&str, &str> { +fn keyword_all_to_drop(i: &str) -> ParseResult<&str, &str> { alt(( terminated(tag_no_case("ALL"), keyword_follow_char), terminated(tag_no_case("ALTER"), keyword_follow_char), @@ -53,7 +53,7 @@ fn keyword_all_to_drop(i: &str) -> IResult<&str, &str> { } /// Parses the input for matching InfluxQL keywords from DURATION to LIMIT. -fn keyword_duration_to_limit(i: &str) -> IResult<&str, &str> { +fn keyword_duration_to_limit(i: &str) -> ParseResult<&str, &str> { alt(( terminated(tag_no_case("DURATION"), keyword_follow_char), terminated(tag_no_case("END"), keyword_follow_char), @@ -79,7 +79,7 @@ fn keyword_duration_to_limit(i: &str) -> IResult<&str, &str> { } /// Parses the input for matching InfluxQL keywords from MEASUREMENT to SET. -fn keyword_measurement_to_set(i: &str) -> IResult<&str, &str> { +fn keyword_measurement_to_set(i: &str) -> ParseResult<&str, &str> { alt(( terminated(tag_no_case("MEASUREMENT"), keyword_follow_char), terminated(tag_no_case("MEASUREMENTS"), keyword_follow_char), @@ -106,7 +106,7 @@ fn keyword_measurement_to_set(i: &str) -> IResult<&str, &str> { } /// Parses the input for matching InfluxQL keywords from SHOW to WRITE. -fn keyword_show_to_write(i: &str) -> IResult<&str, &str> { +fn keyword_show_to_write(i: &str) -> ParseResult<&str, &str> { alt(( terminated(tag_no_case("SHOW"), keyword_follow_char), terminated(tag_no_case("SHARD"), keyword_follow_char), @@ -127,8 +127,8 @@ fn keyword_show_to_write(i: &str) -> IResult<&str, &str> { ))(i) } -// Matches any InfluxQL reserved keyword. -pub fn sql_keyword(i: &str) -> IResult<&str, &str> { +/// Matches any InfluxQL reserved keyword. 
+pub fn sql_keyword(i: &str) -> ParseResult<&str, &str> { // NOTE that the alt function takes a tuple with a maximum arity of 21, hence // the reason these are broken into groups alt(( diff --git a/influxdb_influxql_parser/src/lib.rs b/influxdb_influxql_parser/src/lib.rs index 6fb07d3ec4..80b3a203d5 100644 --- a/influxdb_influxql_parser/src/lib.rs +++ b/influxdb_influxql_parser/src/lib.rs @@ -11,14 +11,137 @@ clippy::use_self, clippy::clone_on_ref_ptr )] + +use crate::common::statement_terminator; +use crate::internal::Error as InternalError; +use crate::statement::statement; +pub use crate::statement::Statement; +use nom::character::complete::multispace0; +use nom::combinator::eof; +use nom::Offset; +use std::fmt::{Debug, Display, Formatter}; + mod common; mod expression; mod identifier; +mod internal; mod keywords; mod literal; mod parameter; +mod show; mod show_measurements; +mod statement; mod string; #[cfg(test)] mod test_util; + +/// A error returned when parsing an InfluxQL query using +/// [`parse_statements`] fails. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParseError { + message: String, + pos: usize, +} + +impl Display for ParseError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{} at pos {}", self.message, self.pos)?; + Ok(()) + } +} + +/// ParseResult is type that represents the success or failure of parsing +/// a given input into a set of InfluxQL statements. +/// +/// Errors are human-readable messages indicating the cause of the parse failure. +pub type ParseResult = Result, ParseError>; + +/// Parse the input into a set of InfluxQL statements. 
+pub fn parse_statements(input: &str) -> ParseResult { + let mut res = Vec::new(); + let mut i: &str = input; + + loop { + // Consume whitespace from the input + i = match multispace0::<_, nom::error::Error<_>>(i) { + Ok((i1, _)) => i1, + _ => unreachable!("multispace0 is infallible"), + }; + + if eof::<_, nom::error::Error<_>>(i).is_ok() { + return Ok(res); + } + + if let Ok((i1, _)) = statement_terminator(i) { + i = i1; + continue; + } + + match statement(i) { + Ok((i1, o)) => { + res.push(o); + i = i1; + } + Err(nom::Err::Failure(InternalError::Syntax { + input: pos, + message, + })) => { + return Err(ParseError { + message: message.into(), + pos: input.offset(pos), + }) + } + // any other error indicates an invalid statement + Err(_) => { + return Err(ParseError { + message: "invalid SQL statement".into(), + pos: input.offset(i), + }) + } + } + } +} + +#[cfg(test)] +mod test { + use crate::parse_statements; + + /// Validates that the [`parse_statements`] function + /// handles statement terminators and errors. 
+ #[test] + fn test_parse_statements() { + // Parse a single statement, without a terminator + let got = parse_statements("SHOW MEASUREMENTS").unwrap(); + assert_eq!(format!("{}", got.first().unwrap()), "SHOW MEASUREMENTS"); + + // Parse a single statement, with a terminator + let got = parse_statements("SHOW MEASUREMENTS;").unwrap(); + assert_eq!(format!("{}", got[0]), "SHOW MEASUREMENTS"); + + // Parse multiple statements with whitespace + let got = parse_statements("SHOW MEASUREMENTS;\nSHOW MEASUREMENTS LIMIT 1").unwrap(); + assert_eq!(format!("{}", got[0]), "SHOW MEASUREMENTS"); + assert_eq!(format!("{}", got[1]), "SHOW MEASUREMENTS LIMIT 1"); + + // Parse multiple statements with a terminator in quotes, ensuring it is not interpreted as + // a terminator + let got = parse_statements( + "SHOW MEASUREMENTS WITH MEASUREMENT = \";\";SHOW MEASUREMENTS LIMIT 1", + ) + .unwrap(); + assert_eq!( + format!("{}", got[0]), + "SHOW MEASUREMENTS WITH MEASUREMENT = \";\"" + ); + assert_eq!(format!("{}", got[1]), "SHOW MEASUREMENTS LIMIT 1"); + + // Returns error for invalid statement + let got = parse_statements("BAD SQL").unwrap_err(); + assert_eq!(format!("{}", got), "invalid SQL statement at pos 0"); + + // Returns error for invalid statement after first + let got = parse_statements("SHOW MEASUREMENTS;BAD SQL").unwrap_err(); + assert_eq!(format!("{}", got), "invalid SQL statement at pos 18"); + } +} diff --git a/influxdb_influxql_parser/src/literal.rs b/influxdb_influxql_parser/src/literal.rs index d58dbedea0..352d3436ba 100644 --- a/influxdb_influxql_parser/src/literal.rs +++ b/influxdb_influxql_parser/src/literal.rs @@ -1,14 +1,14 @@ #![allow(dead_code)] +use crate::internal::{map_fail, ParseResult}; use crate::string::{regex, single_quoted_string, Regex}; use crate::write_escaped; use nom::branch::alt; use nom::bytes::complete::{tag, tag_no_case}; use nom::character::complete::digit1; -use nom::combinator::{map, map_res, recognize, value}; +use nom::combinator::{map, 
recognize, value}; use nom::multi::fold_many1; use nom::sequence::{pair, separated_pair}; -use nom::IResult; use std::fmt::{Display, Formatter, Write}; /// Number of nanoseconds in a microsecond. @@ -110,8 +110,8 @@ impl Display for Literal { /// ```text /// INTEGER ::= [0-9]+ /// ``` -fn integer(i: &str) -> IResult<&str, i64> { - map_res(digit1, |s: &str| s.parse())(i) +fn integer(i: &str) -> ParseResult<&str, i64> { + map_fail("unable to parse integer", digit1, &str::parse)(i) } /// Parse an unsigned InfluxQL integer. @@ -121,8 +121,8 @@ fn integer(i: &str) -> IResult<&str, i64> { /// ```text /// INTEGER ::= [0-9]+ /// ``` -fn unsigned_integer(i: &str) -> IResult<&str, u64> { - map_res(digit1, |s: &str| s.parse())(i) +fn unsigned_integer(i: &str) -> ParseResult<&str, u64> { + map_fail("unable to parse unsigned integer", digit1, &str::parse)(i) } /// Parse an unsigned InfluxQL floating point number. @@ -133,15 +133,16 @@ fn unsigned_integer(i: &str) -> IResult<&str, u64> { /// float ::= INTEGER "." INTEGER /// INTEGER ::= [0-9]+ /// ``` -fn float(i: &str) -> IResult<&str, f64> { - map_res( +fn float(i: &str) -> ParseResult<&str, f64> { + map_fail( + "unable to parse float", recognize(separated_pair(digit1, tag("."), digit1)), - |s: &str| s.parse(), + &str::parse, )(i) } /// Parse the input for an InfluxQL boolean, which must be the value `true` or `false`. -fn boolean(i: &str) -> IResult<&str, bool> { +fn boolean(i: &str) -> ParseResult<&str, bool> { alt(( value(true, tag_no_case("true")), value(false, tag_no_case("false")), @@ -202,7 +203,7 @@ impl Display for Duration { } /// Parse the input for a InfluxQL duration fragment and returns the value in nanoseconds. -fn single_duration(i: &str) -> IResult<&str, i64> { +fn single_duration(i: &str) -> ParseResult<&str, i64> { use DurationUnit::*; map( @@ -234,7 +235,7 @@ fn single_duration(i: &str) -> IResult<&str, i64> { } /// Parse the input for an InfluxQL duration and returns the value in nanoseconds. 
-fn duration(i: &str) -> IResult<&str, Duration> { +fn duration(i: &str) -> ParseResult<&str, Duration> { map( fold_many1(single_duration, || 0, |acc, fragment| acc + fragment), Duration, @@ -244,7 +245,7 @@ fn duration(i: &str) -> IResult<&str, Duration> { /// Parse an InfluxQL literal, except a [`Regex`]. /// /// See [`literal_regex`] for parsing literal regular expressions. -pub fn literal(i: &str) -> IResult<&str, Literal> { +pub fn literal(i: &str) -> ParseResult<&str, Literal> { alt(( // NOTE: order is important, as floats should be tested before durations and integers. map(float, Literal::Float), @@ -256,7 +257,7 @@ pub fn literal(i: &str) -> IResult<&str, Literal> { } /// Parse an InfluxQL literal regular expression. -pub fn literal_regex(i: &str) -> IResult<&str, Literal> { +pub fn literal_regex(i: &str) -> ParseResult<&str, Literal> { map(regex, Literal::Regex)(i) } diff --git a/influxdb_influxql_parser/src/parameter.rs b/influxdb_influxql_parser/src/parameter.rs index bd5376f71d..a673ca532f 100644 --- a/influxdb_influxql_parser/src/parameter.rs +++ b/influxdb_influxql_parser/src/parameter.rs @@ -9,6 +9,7 @@ #![allow(dead_code)] +use crate::internal::ParseResult; use crate::string::double_quoted_string; use crate::write_escaped; use nom::branch::alt; @@ -17,12 +18,11 @@ use nom::character::complete::{alphanumeric1, char}; use nom::combinator::{map, recognize}; use nom::multi::many1_count; use nom::sequence::preceded; -use nom::IResult; use std::fmt; use std::fmt::{Display, Formatter, Write}; /// Parse an unquoted InfluxQL bind parameter. -fn unquoted_parameter(i: &str) -> IResult<&str, String> { +fn unquoted_parameter(i: &str) -> ParseResult<&str, String> { map( recognize(many1_count(alt((alphanumeric1, tag("_"))))), str::to_string, @@ -57,7 +57,7 @@ impl Display for BindParameter { } /// Parses an InfluxQL [BindParameter]. 
-pub fn parameter(i: &str) -> IResult<&str, BindParameter> { +pub fn parameter(i: &str) -> ParseResult<&str, BindParameter> { // See: https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L358-L362 preceded( char('$'), @@ -99,8 +99,7 @@ mod test { // └─────────────────────────────┘ // missing `$` prefix - let res = parameter("cpu"); - assert!(res.is_err()); + parameter("cpu").unwrap_err(); } #[test] diff --git a/influxdb_influxql_parser/src/show.rs b/influxdb_influxql_parser/src/show.rs new file mode 100644 index 0000000000..fcc84d80f6 --- /dev/null +++ b/influxdb_influxql_parser/src/show.rs @@ -0,0 +1,41 @@ +use crate::internal::{expect, ParseResult}; +use crate::show_measurements::show_measurements; +use crate::Statement; +use nom::bytes::complete::tag_no_case; +use nom::character::complete::multispace1; +use nom::combinator::map; +use nom::sequence::{pair, preceded}; + +/// Parse a SHOW statement. +pub fn show_statement(i: &str) -> ParseResult<&str, Statement> { + preceded( + pair(tag_no_case("SHOW"), multispace1), + expect( + "invalid SHOW statement, expected MEASUREMENTS", + // NOTE: This will become an alt(()) once more statements are added + map(show_measurements, |v| { + Statement::ShowMeasurements(Box::new(v)) + }), + ), + )(i) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::assert_expect_error; + + #[test] + fn test_show_statement() { + let (_, got) = show_statement("SHOW MEASUREMENTS").unwrap(); + assert_eq!(format!("{}", got), "SHOW MEASUREMENTS"); + + // Fallible case + + // Unsupported SHOW + assert_expect_error!( + show_statement("SHOW TAG KEYS"), + "invalid SHOW statement, expected MEASUREMENTS" + ); + } +} diff --git a/influxdb_influxql_parser/src/show_measurements.rs b/influxdb_influxql_parser/src/show_measurements.rs index 9bff7806a4..f2714c6e54 100644 --- a/influxdb_influxql_parser/src/show_measurements.rs +++ b/influxdb_influxql_parser/src/show_measurements.rs @@ -4,17 +4,18 @@ 
#![allow(dead_code)] +use crate::internal::{expect, ParseResult}; use nom::branch::alt; use nom::bytes::complete::{tag, tag_no_case}; use nom::character::complete::{char, multispace1}; use nom::combinator::{map, opt, value}; +use nom::sequence::tuple; use nom::sequence::{pair, preceded, terminated}; -use nom::{sequence::tuple, IResult}; use std::fmt; use std::fmt::Formatter; use crate::common::{ - limit_clause, measurement_name_expression, offset_clause, statement_terminator, where_clause, + limit_clause, measurement_name_expression, offset_clause, where_clause, MeasurementNameExpression, }; use crate::expression::Expr; @@ -43,20 +44,23 @@ impl fmt::Display for OnExpression { } /// Parse the `ON` expression of the `SHOW MEASUREMENTS` statement. -fn on_expression(i: &str) -> IResult<&str, OnExpression> { +fn on_expression(i: &str) -> ParseResult<&str, OnExpression> { preceded( pair(tag_no_case("ON"), multispace1), - alt(( - value(OnExpression::AllDatabasesAndRetentionPolicies, tag("*.*")), - value(OnExpression::AllDatabases, tag("*")), - map( - pair(opt(terminated(identifier, tag("."))), identifier), - |tup| match tup { - (None, db) => OnExpression::Database(db), - (Some(db), rp) => OnExpression::DatabaseRetentionPolicy(db, rp), - }, - ), - )), + expect( + "invalid ON clause, expected wildcard or identifier", + alt(( + value(OnExpression::AllDatabasesAndRetentionPolicies, tag("*.*")), + value(OnExpression::AllDatabases, tag("*")), + map( + pair(opt(terminated(identifier, tag("."))), identifier), + |tup| match tup { + (None, db) => OnExpression::Database(db), + (Some(db), rp) => OnExpression::DatabaseRetentionPolicy(db, rp), + }, + ), + )), + ), )(i) } @@ -114,50 +118,63 @@ impl fmt::Display for MeasurementExpression { } } -fn with_measurement_expression(i: &str) -> IResult<&str, MeasurementExpression> { +fn with_measurement_expression(i: &str) -> ParseResult<&str, MeasurementExpression> { preceded( tuple(( - tag_no_case("with"), + tag_no_case("WITH"), multispace1, - 
tag_no_case("measurement"), - multispace1, - )), - alt(( - map( - tuple((char('='), multispace1, measurement_name_expression)), - |(_, _, name)| MeasurementExpression::Equals(name), + expect( + "invalid WITH clause, expected MEASUREMENT", + tag_no_case("measurement"), ), - map(tuple((tag("=~"), multispace1, regex)), |(_, _, regex)| { - MeasurementExpression::Regex(regex) - }), + multispace1, )), + expect( + "expected = or =~", + alt(( + map( + tuple(( + char('='), + multispace1, + expect( + "expected measurement name or wildcard", + measurement_name_expression, + ), + )), + |(_, _, name)| MeasurementExpression::Equals(name), + ), + map( + tuple(( + tag("=~"), + multispace1, + expect("expected regex literal", regex), + )), + |(_, _, regex)| MeasurementExpression::Regex(regex), + ), + )), + ), )(i) } -pub fn show_measurements(i: &str) -> IResult<&str, ShowMeasurementsStatement> { +/// Parse a `SHOW MEASUREMENTS` statement after `SHOW` and any whitespace has been consumed. +pub fn show_measurements(i: &str) -> ParseResult<&str, ShowMeasurementsStatement> { let ( remaining_input, ( - _, // "SHOW" - _, // _, // "MEASUREMENTS" on_expression, measurement_expression, condition, limit, offset, - _, // ";" ), ) = tuple(( - tag_no_case("show"), - multispace1, - tag_no_case("measurements"), + tag_no_case("MEASUREMENTS"), opt(preceded(multispace1, on_expression)), opt(preceded(multispace1, with_measurement_expression)), opt(preceded(multispace1, where_clause)), opt(preceded(multispace1, limit_clause)), opt(preceded(multispace1, offset_clause)), - statement_terminator, ))(i)?; Ok(( @@ -175,10 +192,11 @@ pub fn show_measurements(i: &str) -> IResult<&str, ShowMeasurementsStatement> { #[cfg(test)] mod test { use super::*; + use crate::assert_expect_error; #[test] fn test_show_measurements() { - let (_, got) = show_measurements("SHOW measurements;").unwrap(); + let (_, got) = show_measurements("measurements").unwrap(); assert_eq!( got, ShowMeasurementsStatement { @@ -187,7 +205,7 @@ 
mod test { }, ); - let (_, got) = show_measurements("SHOW measurements ON foo;").unwrap(); + let (_, got) = show_measurements("measurements ON foo").unwrap(); assert_eq!( got, ShowMeasurementsStatement { @@ -197,7 +215,7 @@ mod test { ); let (_, got) = show_measurements( - "SHOW\nMEASUREMENTS\tON foo WITH MEASUREMENT\n= bar WHERE\ntrue LIMIT 10 OFFSET 20;", + "MEASUREMENTS\tON foo WITH MEASUREMENT\n= bar WHERE\ntrue LIMIT 10 OFFSET 20", ) .unwrap(); assert_eq!( @@ -221,10 +239,9 @@ mod test { "SHOW MEASUREMENTS ON foo WITH MEASUREMENT = bar WHERE true LIMIT 10 OFFSET 20" ); - let (_, got) = show_measurements( - "SHOW\nMEASUREMENTS\tON foo WITH MEASUREMENT\n=~ /bar/ WHERE\ntrue;", - ) - .unwrap(); + let (_, got) = + show_measurements("MEASUREMENTS\tON foo WITH MEASUREMENT\n=~ /bar/ WHERE\ntrue") + .unwrap(); assert_eq!( got, ShowMeasurementsStatement { @@ -310,6 +327,11 @@ mod test { got, OnExpression::AllDatabasesAndRetentionPolicies )); + + assert_expect_error!( + on_expression("ON WHERE cpu = 'test'"), + "invalid ON clause, expected wildcard or identifier" + ) } #[test] @@ -329,16 +351,34 @@ mod test { // Fallible cases + // Missing MEASUREMENT token + assert_expect_error!( + with_measurement_expression("WITH =~ foo"), + "invalid WITH clause, expected MEASUREMENT" + ); + // Must have a regex for equal regex operator - with_measurement_expression("WITH measurement =~ foo").unwrap_err(); + assert_expect_error!( + with_measurement_expression("WITH measurement =~ foo"), + "expected regex literal" + ); // Unsupported regex not equal operator - with_measurement_expression("WITH measurement !~ foo").unwrap_err(); + assert_expect_error!( + with_measurement_expression("WITH measurement !~ foo"), + "expected = or =~" + ); // Must have an identifier for equal operator - with_measurement_expression("WITH measurement = /foo/").unwrap_err(); + assert_expect_error!( + with_measurement_expression("WITH measurement = /foo/"), + "expected measurement name or wildcard" + ); // Must 
have an identifier - with_measurement_expression("WITH measurement = 1").unwrap_err(); + assert_expect_error!( + with_measurement_expression("WITH measurement = 1"), + "expected measurement name or wildcard" + ); } } diff --git a/influxdb_influxql_parser/src/statement.rs b/influxdb_influxql_parser/src/statement.rs new file mode 100644 index 0000000000..798f42cf4c --- /dev/null +++ b/influxdb_influxql_parser/src/statement.rs @@ -0,0 +1,27 @@ +use crate::internal::ParseResult; +use crate::show::show_statement; +use crate::show_measurements::ShowMeasurementsStatement; +use std::fmt::{Display, Formatter}; + +/// An InfluxQL statement. +#[derive(Debug, Clone, PartialEq)] +pub enum Statement { + /// Represents a `SHOW MEASUREMENTS` statement. + ShowMeasurements(Box), +} + +impl Display for Statement { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::ShowMeasurements(s) => write!(f, "{}", s)?, + }; + + Ok(()) + } +} + +/// Parse a single InfluxQL statement. +pub fn statement(i: &str) -> ParseResult<&str, Statement> { + // NOTE: This will become an alt(()) once more statements are added + show_statement(i) +} diff --git a/influxdb_influxql_parser/src/string.rs b/influxdb_influxql_parser/src/string.rs index 56dc697b36..918fb337fb 100644 --- a/influxdb_influxql_parser/src/string.rs +++ b/influxdb_influxql_parser/src/string.rs @@ -6,6 +6,7 @@ // Taken liberally from https://github.com/Geal/nom/blob/main/examples/string.rs and // amended for InfluxQL. +use crate::internal::{expect, ParseError, ParseResult}; use nom::branch::alt; use nom::bytes::complete::{is_not, tag}; use nom::character::complete::char; @@ -13,7 +14,7 @@ use nom::combinator::{map, value, verify}; use nom::error::Error; use nom::multi::fold_many0; use nom::sequence::{delimited, preceded}; -use nom::{IResult, Parser}; +use nom::Parser; use std::fmt::{Display, Formatter, Write}; /// Writes `s` to `f`, mapping any characters from => to their escaped equivalents. 
@@ -40,41 +41,51 @@ enum StringFragment<'a> { } /// Parse a single-quoted literal string. -pub fn single_quoted_string(i: &str) -> IResult<&str, String> { +pub fn single_quoted_string(i: &str) -> ParseResult<&str, String> { let escaped = preceded( char('\\'), - alt((char('\\'), char('\''), value('\n', char('n')))), + expect( + r#"invalid escape sequence, expected \\, \' or \n"#, + alt((char('\\'), char('\''), value('\n', char('n')))), + ), ); string( '\'', + "unterminated string literal", verify(is_not("'\\\n"), |s: &str| !s.is_empty()), escaped, )(i) } /// Parse a double-quoted identifier string. -pub fn double_quoted_string(i: &str) -> IResult<&str, String> { +pub fn double_quoted_string(i: &str) -> ParseResult<&str, String> { let escaped = preceded( char('\\'), - alt((char('\\'), char('"'), value('\n', char('n')))), + expect( + r#"invalid escape sequence, expected \\, \" or \n"#, + alt((char('\\'), char('"'), value('\n', char('n')))), + ), ); string( '"', + "unterminated string literal", verify(is_not("\"\\\n"), |s: &str| !s.is_empty()), escaped, )(i) } -fn string<'a, T, U>( +fn string<'a, T, U, E>( delimiter: char, + unterminated_message: &'static str, literal: T, escaped: U, -) -> impl FnMut(&'a str) -> IResult<&'a str, String> +) -> impl FnMut(&'a str) -> ParseResult<&'a str, String, E> where - T: Parser<&'a str, &'a str, Error<&'a str>>, - U: Parser<&'a str, char, Error<&'a str>>, + T: Parser<&'a str, &'a str, E>, + U: Parser<&'a str, char, E>, + E: ParseError<'a>, { let fragment = alt(( map(literal, StringFragment::Literal), @@ -89,13 +100,17 @@ where string }); - delimited(char(delimiter), build_string, char(delimiter)) + delimited( + char(delimiter), + build_string, + expect(unterminated_message, char(delimiter)), + ) } /// Parse regular expression literal characters. /// /// Consumes i until reaching and escaped delimiter ("\/"), newline or eof. 
-fn regex_literal(i: &str) -> IResult<&str, &str> { +fn regex_literal(i: &str) -> ParseResult<&str, &str> { let mut remaining = &i[..i.len()]; let mut consumed = &i[..0]; @@ -123,7 +138,7 @@ fn regex_literal(i: &str) -> IResult<&str, &str> { /// An unescaped regular expression. #[derive(Clone, Debug, PartialEq, Eq)] -pub struct Regex(pub(crate) String); +pub struct Regex(pub String); impl Display for Regex { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -147,13 +162,22 @@ impl From<&str> for Regex { } /// Parse a regular expression, delimited by `/`. -pub fn regex(i: &str) -> IResult<&str, Regex> { - map(string('/', regex_literal, map(tag("\\/"), |_| '/')), Regex)(i) +pub fn regex(i: &str) -> ParseResult<&str, Regex> { + map( + string( + '/', + "unterminated regex literal", + regex_literal, + map(tag("\\/"), |_| '/'), + ), + Regex, + )(i) } #[cfg(test)] mod test { use super::*; + use crate::assert_expect_error; #[test] fn test_double_quoted_string() { @@ -180,18 +204,32 @@ mod test { let (_, got) = double_quoted_string("\"quick\rdraw\"").unwrap(); assert_eq!(got, "quick\rdraw"); + // Empty string + let (i, got) = double_quoted_string("\"\"").unwrap(); + assert_eq!(i, ""); + assert_eq!(got, ""); + // ┌─────────────────────────────┐ // │ Fallible tests │ // └─────────────────────────────┘ // Not terminated - double_quoted_string(r#""quick draw"#).unwrap_err(); + assert_expect_error!( + double_quoted_string(r#""quick draw"#), + "unterminated string literal" + ); // Literal newline - double_quoted_string("\"quick\ndraw\"").unwrap_err(); + assert_expect_error!( + double_quoted_string("\"quick\ndraw\""), + "unterminated string literal" + ); // Invalid escape - double_quoted_string(r#""quick\idraw""#).unwrap_err(); + assert_expect_error!( + double_quoted_string(r#""quick\idraw""#), + r#"invalid escape sequence, expected \\, \" or \n"# + ); } #[test] @@ -219,15 +257,25 @@ mod test { let (_, got) = single_quoted_string("'quick\rdraw'").unwrap(); 
assert_eq!(got, "quick\rdraw"); + // Empty string + let (i, got) = single_quoted_string("''").unwrap(); + assert_eq!(i, ""); + assert_eq!(got, ""); + // ┌─────────────────────────────┐ // │ Fallible tests │ // └─────────────────────────────┘ - // Not terminated - single_quoted_string(r#"'quick draw"#).unwrap_err(); + assert_expect_error!( + single_quoted_string(r#"'quick draw"#), + "unterminated string literal" + ); // Invalid escape - single_quoted_string(r#"'quick\idraw'"#).unwrap_err(); + assert_expect_error!( + single_quoted_string(r#"'quick\idraw'"#), + r#"invalid escape sequence, expected \\, \' or \n"# + ); } #[test] @@ -244,19 +292,20 @@ mod test { assert_eq!(got, "hello\\n".into()); // Empty regex - let (_, got) = regex("//").unwrap(); + let (i, got) = regex("//").unwrap(); + assert_eq!(i, ""); assert_eq!(got, "".into()); // Fallible cases // Missing trailing delimiter - regex(r#"/hello"#).unwrap_err(); + assert_expect_error!(regex(r#"/hello"#), "unterminated regex literal"); // Embedded newline - regex("/hello\nworld").unwrap_err(); + assert_expect_error!(regex("/hello\nworld/"), "unterminated regex literal"); // Single backslash fails, which matches Go implementation // See: https://go.dev/play/p/_8J1v5-382G - regex(r#"/\/"#).unwrap_err(); + assert_expect_error!(regex(r#"/\/"#), "unterminated regex literal"); } } diff --git a/influxdb_influxql_parser/src/test_util.rs b/influxdb_influxql_parser/src/test_util.rs index 14216fc301..6f290baf86 100644 --- a/influxdb_influxql_parser/src/test_util.rs +++ b/influxdb_influxql_parser/src/test_util.rs @@ -7,3 +7,19 @@ macro_rules! assert_failure { assert!(matches!($RESULT.unwrap_err(), nom::Err::Failure(_))); }; } + +/// Asserts that the result of a nom parser is an [`crate::internal::Error::Syntax`] and a [`nom::Err::Failure`]. +#[macro_export] +macro_rules! 
assert_expect_error { + ($RESULT:expr, $MSG:expr) => { + match $RESULT.unwrap_err() { + nom::Err::Failure($crate::internal::Error::Syntax { + input: _, + message: got, + }) => { + assert_eq!(format!("{}", got), $MSG) + } + e => panic!("Expected Failure(Syntax(_, msg), got {:?}", e), + } + }; +} From 7c4c9186361574ed74ffb706474422339439c020 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 14 Sep 2022 22:21:13 -0400 Subject: [PATCH 3/4] chore: add parttion id into panic message (#5641) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- compactor/src/parquet_file_combining.rs | 8 +++++--- compactor/src/utils.rs | 4 +++- querier/src/chunk/mod.rs | 2 +- schema/src/sort.rs | 14 +++++++------- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index 25b72d463b..4013b35f93 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -189,7 +189,7 @@ pub(crate) async fn compact_parquet_files( .sort_key .as_ref() .expect("no partition sort key in catalog") - .filter_to(&merged_schema.primary_key()); + .filter_to(&merged_schema.primary_key(), partition_id.get()); let (small_cutoff_bytes, large_cutoff_bytes) = cutoff_bytes(max_desired_file_size_bytes, percentage_max_file_size); @@ -355,7 +355,7 @@ pub(crate) async fn compact_final_no_splits( .sort_key .as_ref() .expect("no partition sort key in catalog") - .filter_to(&merged_schema.primary_key()); + .filter_to(&merged_schema.primary_key(), partition_id.get()); let ctx = exec.new_context(ExecutorType::Reorg); // Compact everything into one file @@ -538,7 +538,9 @@ fn to_queryable_parquet_chunk( .select_by_names(&selection) .expect("schema in-sync"); let pk = schema.primary_key(); - let sort_key = partition_sort_key.as_ref().map(|sk| sk.filter_to(&pk)); + let sort_key = partition_sort_key + .as_ref() + .map(|sk| sk.filter_to(&pk, file.partition_id().get())); 
let file = Arc::new(ParquetFile::from(file)); let parquet_chunk = ParquetChunk::new(Arc::clone(&file), Arc::new(schema), store); diff --git a/compactor/src/utils.rs b/compactor/src/utils.rs index a83ce7375b..166d8dbc6a 100644 --- a/compactor/src/utils.rs +++ b/compactor/src/utils.rs @@ -129,7 +129,9 @@ impl ParquetFileWithTombstone { .select_by_names(&selection) .expect("schema in-sync"); let pk = schema.primary_key(); - let sort_key = partition_sort_key.as_ref().map(|sk| sk.filter_to(&pk)); + let sort_key = partition_sort_key + .as_ref() + .map(|sk| sk.filter_to(&pk, self.partition_id.get())); let parquet_chunk = ParquetChunk::new(Arc::clone(&self.data), Arc::new(schema), store); diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index 1af323b90c..ce432f36a0 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -504,7 +504,7 @@ impl ChunkAdapter { // calculate sort key let pk_cols = schema.primary_key(); - let sort_key = partition_sort_key_ref.filter_to(&pk_cols); + let sort_key = partition_sort_key_ref.filter_to(&pk_cols, parquet_file.partition_id.get()); assert!( !sort_key.is_empty(), "Sort key can never be empty because there should at least be a time column", diff --git a/schema/src/sort.rs b/schema/src/sort.rs index 626133ba91..4aa12b130e 100644 --- a/schema/src/sort.rs +++ b/schema/src/sort.rs @@ -209,15 +209,15 @@ impl SortKey { /// # Panics /// /// Panics if any columns in the primary key are NOT present in this sort key. - pub fn filter_to(&self, primary_key: &[&str]) -> SortKey { + pub fn filter_to(&self, primary_key: &[&str], partition_id: i64) -> SortKey { let missing_from_catalog_key: Vec<_> = primary_key .iter() .filter(|col| !self.contains(col)) .collect(); if !missing_from_catalog_key.is_empty() { panic!( - "Primary key column(s) found that don't appear in the catalog sort key: [{:?}]", - missing_from_catalog_key + "Primary key column(s) found that don't appear in the catalog sort key [{:?}] of partition: {}. 
Sort key: {:?}", + missing_from_catalog_key, partition_id, self ) } @@ -914,7 +914,7 @@ mod tests { let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); let data_primary_key = ["host", "env", "time"]; - let filtered = catalog_sort_key.filter_to(&data_primary_key); + let filtered = catalog_sort_key.filter_to(&data_primary_key, 1); assert_eq!(catalog_sort_key, filtered); // If the catalog sort key contains more columns than the primary key, the filtered key @@ -922,7 +922,7 @@ mod tests { let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); let data_primary_key = ["host", "time"]; - let filtered = catalog_sort_key.filter_to(&data_primary_key); + let filtered = catalog_sort_key.filter_to(&data_primary_key, 1); let expected = SortKey::from_columns(["host", "time"]); assert_eq!(expected, filtered); @@ -931,7 +931,7 @@ mod tests { let catalog_sort_key = SortKey::from_columns(["host", "env", "zone", "time"]); let data_primary_key = ["env", "host", "time"]; - let filtered = catalog_sort_key.filter_to(&data_primary_key); + let filtered = catalog_sort_key.filter_to(&data_primary_key, 1); let expected = SortKey::from_columns(["host", "env", "time"]); assert_eq!(expected, filtered); } @@ -946,7 +946,7 @@ mod tests { let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); let data_primary_key = ["host", "env", "zone", "time"]; - catalog_sort_key.filter_to(&data_primary_key); + catalog_sort_key.filter_to(&data_primary_key, 1); } #[test] From e5d8f23fcdf24e7aa8cf7ec06f5ac4101123b846 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Thu, 15 Sep 2022 16:52:31 +1000 Subject: [PATCH 4/4] chore: Remove variants from Identifier and BindParameter types (#5642) * chore: Remove variants from Identifier and BindParameter types This simplifies usage of these types. Display traits have been updated to properly quote and escape the output, when necessary. 
* chore: Fix docs --- influxdb_influxql_parser/src/common.rs | 12 ++-- influxdb_influxql_parser/src/expression.rs | 8 +-- influxdb_influxql_parser/src/identifier.rs | 68 ++++++++----------- influxdb_influxql_parser/src/parameter.rs | 68 +++++++++---------- .../src/show_measurements.rs | 23 ++++--- influxdb_influxql_parser/src/string.rs | 32 +++++++-- 6 files changed, 111 insertions(+), 100 deletions(-) diff --git a/influxdb_influxql_parser/src/common.rs b/influxdb_influxql_parser/src/common.rs index 555f83db96..38cd350350 100644 --- a/influxdb_influxql_parser/src/common.rs +++ b/influxdb_influxql_parser/src/common.rs @@ -198,7 +198,7 @@ mod tests { MeasurementNameExpression { database: None, retention_policy: None, - name: Identifier::Unquoted("diskio".into()), + name: "diskio".into(), } ); @@ -206,9 +206,9 @@ mod tests { assert_eq!( got, MeasurementNameExpression { - database: Some(Identifier::Unquoted("telegraf".into())), - retention_policy: Some(Identifier::Unquoted("autogen".into())), - name: Identifier::Unquoted("diskio".into()), + database: Some("telegraf".into()), + retention_policy: Some("autogen".into()), + name: "diskio".into(), } ); @@ -216,9 +216,9 @@ mod tests { assert_eq!( got, MeasurementNameExpression { - database: Some(Identifier::Unquoted("telegraf".into())), + database: Some("telegraf".into()), retention_policy: None, - name: Identifier::Unquoted("diskio".into()), + name: "diskio".into(), } ); } diff --git a/influxdb_influxql_parser/src/expression.rs b/influxdb_influxql_parser/src/expression.rs index f17626d1c5..f8286bedd6 100644 --- a/influxdb_influxql_parser/src/expression.rs +++ b/influxdb_influxql_parser/src/expression.rs @@ -200,7 +200,7 @@ fn parens(i: &str) -> ParseResult<&str, Expr> { fn call(i: &str) -> ParseResult<&str, Expr> { map( separated_pair( - unquoted_identifier, + map(unquoted_identifier, &str::to_string), multispace0, delimited( char('('), @@ -364,7 +364,7 @@ mod test { /// Constructs an [Expr::Identifier] expression. 
macro_rules! ident { ($EXPR: expr) => { - Expr::Identifier(crate::identifier::Identifier::Unquoted($EXPR.into())) + Expr::Identifier($EXPR.into()) }; } @@ -378,7 +378,7 @@ mod test { /// Constructs a [Expr::BindParameter] expression. macro_rules! param { ($EXPR: expr) => { - Expr::BindParameter(crate::parameter::BindParameter::Unquoted($EXPR.into()).into()) + Expr::BindParameter(crate::parameter::BindParameter($EXPR.into()).into()) }; } @@ -603,7 +603,7 @@ mod test { // quoted identifier let (_, e) = conditional_expression(r#""foo" + 'bar'"#).unwrap(); let got = format!("{}", e); - assert_eq!(got, r#""foo" + 'bar'"#); + assert_eq!(got, r#"foo + 'bar'"#); // Duration let (_, e) = conditional_expression("- 6h30m").unwrap(); diff --git a/influxdb_influxql_parser/src/identifier.rs b/influxdb_influxql_parser/src/identifier.rs index e7a8f9696b..0b20d7e101 100644 --- a/influxdb_influxql_parser/src/identifier.rs +++ b/influxdb_influxql_parser/src/identifier.rs @@ -16,7 +16,7 @@ use crate::internal::ParseResult; use crate::keywords::sql_keyword; use crate::string::double_quoted_string; -use crate::write_escaped; +use crate::write_quoted_string; use nom::branch::alt; use nom::bytes::complete::tag; use nom::character::complete::{alpha1, alphanumeric1}; @@ -27,42 +27,35 @@ use std::fmt; use std::fmt::{Display, Formatter, Write}; /// Parse an unquoted InfluxQL identifier. -pub fn unquoted_identifier(i: &str) -> ParseResult<&str, String> { - map( - preceded( - not(sql_keyword), - recognize(pair( - alt((alpha1, tag("_"))), - many0_count(alt((alphanumeric1, tag("_")))), - )), - ), - str::to_string, +pub fn unquoted_identifier(i: &str) -> ParseResult<&str, &str> { + preceded( + not(sql_keyword), + recognize(pair( + alt((alpha1, tag("_"))), + many0_count(alt((alphanumeric1, tag("_")))), + )), )(i) } -/// `Identifier` is a type that represents either a quoted ([`Identifier::Quoted`]) or unquoted ([`Identifier::Unquoted`]) -/// InfluxQL identifier. 
+/// A type that represents an InfluxQL identifier.
 #[derive(Clone, Debug, Eq, Hash, PartialEq)]
-pub enum Identifier {
-    /// Contains an unquoted identifier
-    Unquoted(String),
+pub struct Identifier(pub String);
 
-    /// Contains an unescaped quoted identifier
-    Quoted(String),
+impl From<String> for Identifier {
+    fn from(s: String) -> Self {
+        Self(s)
+    }
+}
+
+impl From<&str> for Identifier {
+    fn from(s: &str) -> Self {
+        Self(s.to_string())
+    }
 }
 
 impl Display for Identifier {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        match self {
-            Self::Unquoted(s) => write!(f, "{}", s)?,
-            Self::Quoted(s) => {
-                f.write_char('"')?;
-                // escape characters per https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L576-L583
-                write_escaped!(f, s, '\n' => "\\n", '\\' => "\\\\", '"' => "\\\"");
-                f.write_char('"')?;
-            }
-        };
-
+        write_quoted_string!(f, '"', self.0.as_str(), unquoted_identifier, '\n' => "\\n", '\\' => "\\\\", '"' => "\\\"");
         Ok(())
     }
 }
@@ -71,8 +64,8 @@ impl Display for Identifier {
 pub fn identifier(i: &str) -> ParseResult<&str, Identifier> {
     // See: https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L358-L362
     alt((
-        map(unquoted_identifier, Identifier::Unquoted),
-        map(double_quoted_string, Identifier::Quoted),
+        map(unquoted_identifier, Into::into),
+        map(double_quoted_string, Into::into),
     ))(i)
 }
@@ -109,24 +102,21 @@ mod test {
     fn test_identifier() {
         // quoted
         let (_, got) = identifier("\"quick draw\"").unwrap();
-        assert!(matches!(got, Identifier::Quoted(s) if s == "quick draw"));
+        assert_eq!(got, "quick draw".into());
 
         // unquoted
         let (_, got) = identifier("quick_draw").unwrap();
-        assert!(matches!(got, Identifier::Unquoted(s) if s == "quick_draw"));
+        assert_eq!(got, "quick_draw".into());
     }
 
     #[test]
     fn test_identifier_display() {
-        // test quoted identifier properly escapes specific characters
-        let got = format!(
-            "{}",
-            Identifier::Quoted("quick\n\t\\\"'draw 
\u{1f47d}".to_string()) - ); + // Identifier properly escapes specific characters and quotes output + let got = format!("{}", Identifier("quick\n\t\\\"'draw \u{1f47d}".into())); assert_eq!(got, r#""quick\n \\\"'draw 👽""#); - // test unquoted identifier - let got = format!("{}", Identifier::Unquoted("quick_draw".to_string())); + // Identifier displays unquoted output + let got = format!("{}", Identifier("quick_draw".into())); assert_eq!(got, "quick_draw"); } } diff --git a/influxdb_influxql_parser/src/parameter.rs b/influxdb_influxql_parser/src/parameter.rs index a673ca532f..7cfaaf2c50 100644 --- a/influxdb_influxql_parser/src/parameter.rs +++ b/influxdb_influxql_parser/src/parameter.rs @@ -11,7 +11,7 @@ use crate::internal::ParseResult; use crate::string::double_quoted_string; -use crate::write_escaped; +use crate::write_quoted_string; use nom::branch::alt; use nom::bytes::complete::tag; use nom::character::complete::{alphanumeric1, char}; @@ -22,36 +22,30 @@ use std::fmt; use std::fmt::{Display, Formatter, Write}; /// Parse an unquoted InfluxQL bind parameter. -fn unquoted_parameter(i: &str) -> ParseResult<&str, String> { - map( - recognize(many1_count(alt((alphanumeric1, tag("_"))))), - str::to_string, - )(i) +fn unquoted_parameter(i: &str) -> ParseResult<&str, &str> { + recognize(many1_count(alt((alphanumeric1, tag("_")))))(i) } -/// `BindParameter` is a type that represents either a quoted ([`BindParameter::Quoted`]) or unquoted ([`BindParameter::Unquoted`]) -/// InfluxQL bind parameter. +/// A type that represents an InfluxQL bind parameter. 
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
-pub enum BindParameter {
-    /// Contains an unquoted bind parameter
-    Unquoted(String),
+pub struct BindParameter(pub String);
 
-    /// Contains an unescaped quoted identifier
-    Quoted(String),
+impl From<String> for BindParameter {
+    fn from(s: String) -> Self {
+        Self(s)
+    }
+}
+
+impl From<&str> for BindParameter {
+    fn from(s: &str) -> Self {
+        Self(s.to_string())
+    }
 }
 
 impl Display for BindParameter {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        match self {
-            Self::Unquoted(s) => write!(f, "${}", s)?,
-            Self::Quoted(s) => {
-                f.write_str("$\"")?;
-                // escape characters per https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L576-L583
-                write_escaped!(f, s, '\n' => "\\n", '\\' => "\\\\", '"' => "\\\"");
-                f.write_char('"')?;
-            }
-        };
-
+        f.write_char('$')?;
+        write_quoted_string!(f, '"', self.0.as_str(), unquoted_parameter, '\n' => "\\n", '\\' => "\\\\", '"' => "\\\"");
         Ok(())
     }
 }
@@ -62,8 +56,8 @@
 pub fn parameter(i: &str) -> ParseResult<&str, BindParameter> {
     preceded(
         char('$'),
         alt((
-            map(unquoted_parameter, BindParameter::Unquoted),
-            map(double_quoted_string, BindParameter::Quoted),
+            map(unquoted_parameter, Into::into),
+            map(double_quoted_string, Into::into),
         )),
     )(i)
 }
@@ -76,23 +70,23 @@ mod test {
     fn test_parameter() {
         // all ascii
         let (_, got) = parameter("$cpu").unwrap();
-        assert_eq!(got, BindParameter::Unquoted("cpu".into()));
+        assert_eq!(got, "cpu".into());
 
         // digits
         let (_, got) = parameter("$01").unwrap();
-        assert_eq!(got, BindParameter::Unquoted("01".into()));
+        assert_eq!(got, "01".into());
 
         // all valid chars
         let (_, got) = parameter("$cpu_0").unwrap();
-        assert_eq!(got, BindParameter::Unquoted("cpu_0".into()));
+        assert_eq!(got, "cpu_0".into());
 
         // keyword
        let (_, got) = parameter("$from").unwrap();
-        assert_eq!(got, BindParameter::Unquoted("from".into()));
+        assert_eq!(got, "from".into());
 
         // quoted
        let (_, got) = parameter("$\"quick 
draw\"").unwrap(); - assert!(matches!(got, BindParameter::Quoted(s) if s == "quick draw")); + assert_eq!(got, "quick draw".into()); // ┌─────────────────────────────┐ // │ Fallible tests │ @@ -104,12 +98,16 @@ mod test { #[test] fn test_bind_parameter_display() { - // test quoted identifier properly escapes specific characters - let got = format!("{}", BindParameter::Quoted("from".to_string())); - assert_eq!(got, r#"$"from""#); + // BindParameter displays quoted output + let got = format!("{}", BindParameter("from foo".into())); + assert_eq!(got, r#"$"from foo""#); - // test unquoted identifier - let got = format!("{}", BindParameter::Unquoted("quick_draw".to_string())); + // BindParameter displays quoted and escaped output + let got = format!("{}", BindParameter("from\nfoo".into())); + assert_eq!(got, r#"$"from\nfoo""#); + + // BindParameter displays unquoted output + let got = format!("{}", BindParameter("quick_draw".into())); assert_eq!(got, "$quick_draw"); } } diff --git a/influxdb_influxql_parser/src/show_measurements.rs b/influxdb_influxql_parser/src/show_measurements.rs index f2714c6e54..92851e968c 100644 --- a/influxdb_influxql_parser/src/show_measurements.rs +++ b/influxdb_influxql_parser/src/show_measurements.rs @@ -209,7 +209,7 @@ mod test { assert_eq!( got, ShowMeasurementsStatement { - on_expression: Some(OnExpression::Database(Identifier::Unquoted("foo".into()))), + on_expression: Some(OnExpression::Database("foo".into())), ..Default::default() }, ); @@ -221,12 +221,12 @@ mod test { assert_eq!( got, ShowMeasurementsStatement { - on_expression: Some(OnExpression::Database(Identifier::Unquoted("foo".into()))), + on_expression: Some(OnExpression::Database("foo".into())), measurement_expression: Some(MeasurementExpression::Equals( MeasurementNameExpression { database: None, retention_policy: None, - name: Identifier::Unquoted("bar".into()), + name: "bar".into(), } )), condition: Some(Expr::Literal(true.into())), @@ -245,7 +245,7 @@ mod test { assert_eq!( 
got, ShowMeasurementsStatement { - on_expression: Some(OnExpression::Database(Identifier::Unquoted("foo".into()))), + on_expression: Some(OnExpression::Database("foo".into())), measurement_expression: Some(MeasurementExpression::Regex(Regex("bar".into()))), condition: Some(Expr::Literal(true.into())), limit: None, @@ -272,7 +272,7 @@ mod test { let got = format!( "{}", ShowMeasurementsStatement { - on_expression: Some(OnExpression::Database(Identifier::Unquoted("foo".into()))), + on_expression: Some(OnExpression::Database("foo".into())), ..Default::default() } ); @@ -282,8 +282,8 @@ mod test { "{}", ShowMeasurementsStatement { on_expression: Some(OnExpression::DatabaseRetentionPolicy( - Identifier::Unquoted("foo".into()), - Identifier::Unquoted("bar".into()) + "foo".into(), + "bar".into() )), ..Default::default() } @@ -312,11 +312,12 @@ mod test { #[test] fn test_on_expression() { let (_, got) = on_expression("ON cpu").unwrap(); - assert!(matches!(got, OnExpression::Database(Identifier::Unquoted(db)) if db == "cpu")); + assert_eq!(got, OnExpression::Database("cpu".into())); let (_, got) = on_expression("ON cpu.autogen").unwrap(); - assert!( - matches!(got, OnExpression::DatabaseRetentionPolicy(Identifier::Unquoted(db), Identifier::Unquoted(rp)) if db == "cpu" && rp == "autogen") + assert_eq!( + got, + OnExpression::DatabaseRetentionPolicy("cpu".into(), "autogen".into()) ); let (_, got) = on_expression("ON *").unwrap(); @@ -342,7 +343,7 @@ mod test { MeasurementExpression::Equals(MeasurementNameExpression { database: None, retention_policy: None, - name: Identifier::Unquoted("foo".into()) + name: "foo".into() }) ); diff --git a/influxdb_influxql_parser/src/string.rs b/influxdb_influxql_parser/src/string.rs index 918fb337fb..f19194a9d1 100644 --- a/influxdb_influxql_parser/src/string.rs +++ b/influxdb_influxql_parser/src/string.rs @@ -17,20 +17,42 @@ use nom::sequence::{delimited, preceded}; use nom::Parser; use std::fmt::{Display, Formatter, Write}; -/// Writes `s` 
to `f`, mapping any characters from => to their escaped equivalents. +/// Writes `S` to `F`, mapping any characters `FROM` => `TO` their escaped equivalents. #[macro_export] macro_rules! write_escaped { - ($f: expr, $s: expr $(, $from:expr => $to:expr)+) => { - for c in $s.chars() { + ($F: expr, $STRING: expr $(, $FROM:expr => $TO:expr)+) => { + for c in $STRING.chars() { match c { $( - $from => $f.write_str($to)?, + $FROM => $F.write_str($TO)?, )+ - _ => $f.write_char(c)?, + _ => $F.write_char(c)?, } } }; } +/// Writes `S` to `F`, optionally surrounding in `QUOTE`s, if FN(S) fails, +/// and mapping any characters `FROM` => `TO` their escaped equivalents. +#[macro_export] +macro_rules! write_quoted_string { + ($F: expr, $QUOTE: literal, $STRING: expr, $FN: expr $(, $FROM:expr => $TO:expr)+) => { + if nom::sequence::terminated($FN, nom::combinator::eof)($STRING).is_ok() { + $F.write_str($STRING)?; + } else { + // must be escaped + $F.write_char($QUOTE)?; + for c in $STRING.chars() { + match c { + $( + $FROM => $F.write_str($TO)?, + )+ + _ => $F.write_char(c)?, + } + } + $F.write_char($QUOTE)?; + } + }; +} /// A string fragment contains a fragment of a string being parsed: either /// a non-empty Literal (a series of non-escaped characters) or a single.