From ec42aa0ba9f0cbb688fc6142f24d533f4a30aabe Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 6 Feb 2023 20:47:14 +0100 Subject: [PATCH] refactor: Consolidate end to end normalization code (#6848) --- .../src/snapshot_comparison.rs | 37 +-- .../src/snapshot_comparison/normalization.rs | 249 ++++++++++++------ .../src/snapshot_comparison/queries.rs | 169 +++++------- 3 files changed, 243 insertions(+), 212 deletions(-) diff --git a/test_helpers_end_to_end/src/snapshot_comparison.rs b/test_helpers_end_to_end/src/snapshot_comparison.rs index e2bf769db8..118b33db54 100644 --- a/test_helpers_end_to_end/src/snapshot_comparison.rs +++ b/test_helpers_end_to_end/src/snapshot_comparison.rs @@ -3,7 +3,6 @@ mod queries; use crate::snapshot_comparison::queries::TestQueries; use crate::{run_influxql, run_sql, MiniCluster}; -use arrow_util::{display::pretty_format_batches, test_util::sort_record_batch}; use snafu::{OptionExt, ResultExt, Snafu}; use std::fmt::{Display, Formatter}; use std::{ @@ -11,7 +10,6 @@ use std::{ path::{Path, PathBuf}, }; -use self::normalization::normalize_results; use self::queries::Query; #[derive(Debug, Snafu)] @@ -98,19 +96,7 @@ pub async fn run( for q in queries.iter() { output.push(format!("-- {}: {}", language, q.text())); - if q.sorted_compare() { - output.push("-- Results After Sorting".into()) - } - if q.normalized_uuids() { - output.push("-- Results After Normalizing UUIDs".into()) - } - if q.normalized_metrics() { - output.push("-- Results After Normalizing Metrics".into()) - } - if q.normalized_filters() { - output.push("-- Results After Normalizing Filters".into()) - } - + q.add_description(&mut output); let results = run_query(cluster, q, language).await?; output.extend(results); } @@ -233,7 +219,7 @@ async fn run_query( ) -> Result> { let query_text = query.text(); - let mut results = match language { + let results = match language { Language::Sql => { run_sql( query_text, @@ -252,22 +238,5 @@ async fn run_query( } }; - // compare against sorted results, if requested - if query.sorted_compare() && !results.is_empty() { - let schema = results[0].schema(); - let batch = - arrow::compute::concat_batches(&schema, &results).expect("concatenating batches"); - results = vec![sort_record_batch(batch)]; - } - - let current_results = pretty_format_batches(&results) - .unwrap() - .trim() - .lines() - .map(|s| s.to_string()) - .collect::>(); - - let current_results = normalize_results(query, current_results); - - Ok(current_results) + Ok(query.normalize_results(results)) } diff --git a/test_helpers_end_to_end/src/snapshot_comparison/normalization.rs b/test_helpers_end_to_end/src/snapshot_comparison/normalization.rs index 39ac696c58..8a01d777ab 100644 --- a/test_helpers_end_to_end/src/snapshot_comparison/normalization.rs +++ b/test_helpers_end_to_end/src/snapshot_comparison/normalization.rs @@ -1,9 +1,28 @@ -use crate::snapshot_comparison::queries::Query; +use arrow::record_batch::RecordBatch; +use arrow_util::{display::pretty_format_batches, test_util::sort_record_batch}; use once_cell::sync::Lazy; use regex::{Captures, Regex}; use std::{borrow::Cow, collections::HashMap}; use uuid::Uuid; +/// Match the parquet UUID +/// +/// For example, given +/// `32/51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet` +/// +/// matches `1d325760-2b20-48de-ab48-2267b034133d` +static REGEX_UUID: Lazy = Lazy::new(|| { + Regex::new("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}").expect("UUID regex") +}); + +/// Match the parquet directory names +/// For example, given +/// `32/51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet` +/// +/// matches `32/51/216/13452` +static REGEX_DIRS: Lazy = + Lazy::new(|| Regex::new(r#"[0-9]+/[0-9]+/[0-9]+/[0-9]+"#).expect("directory regex")); + /// Replace table row separators of flexible width with fixed with. This is required /// because the original timing values may differ in "printed width", so the table /// cells have different widths and hence the separators / borders. E.g.: @@ -22,93 +41,159 @@ static REGEX_LINESEP: Lazy = Lazy::new(|| Regex::new(r#"[+-]{6,}"#).expec /// ` |` -> ` |` static REGEX_COL: Lazy = Lazy::new(|| Regex::new(r#"\s+\|"#).expect("col regex")); +/// Matches line like `metrics=[foo=1, bar=2]` +static REGEX_METRICS: Lazy = + Lazy::new(|| Regex::new(r#"metrics=\[([^\]]*)\]"#).expect("metrics regex")); + +/// Matches things like `1s`, `1.2ms` and `10.2μs` +static REGEX_TIMING: Lazy = + Lazy::new(|| Regex::new(r#"[0-9]+(\.[0-9]+)?.s"#).expect("timing regex")); + +/// Matches things like `FilterExec: time@2 < -9223372036854775808 OR time@2 > 1640995204240217000` +static REGEX_FILTER: Lazy = + Lazy::new(|| Regex::new("FilterExec: .*").expect("filter regex")); + fn normalize_for_variable_width(s: Cow) -> String { let s = REGEX_LINESEP.replace_all(&s, "----------"); REGEX_COL.replace_all(&s, " |").to_string() } -pub(crate) fn normalize_results(query: &Query, mut current_results: Vec) -> Vec { - // normalize UUIDs, if requested - if query.normalized_uuids() { - let regex_uuid = Regex::new("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}") - .expect("UUID regex"); - let regex_dirs = Regex::new(r#"[0-9]+/[0-9]+/[0-9]+/[0-9]+"#).expect("directory regex"); +/// A query to run with optional annotations +#[derive(Debug, PartialEq, Eq, Default)] +pub struct Normalizer { + /// If true, results are sorted first + pub sorted_compare: bool, - let mut seen: HashMap = HashMap::new(); - current_results = current_results - .into_iter() - .map(|s| { - let s = regex_uuid.replace_all(&s, |s: &Captures| { - let next = seen.len() as u128; - Uuid::from_u128( - *seen - .entry(s.get(0).unwrap().as_str().to_owned()) - .or_insert(next), - ) - .to_string() - }); + /// If true, replace UUIDs with static placeholders. + pub normalized_uuids: bool, - let s = normalize_for_variable_width(s); + /// If true, normalize timings in queries by replacing them with + /// static placeholders, for example: + /// + /// `1s` -> `1.234ms` + pub normalized_metrics: bool, - regex_dirs.replace_all(&s, "1/1/1/1").to_string() - }) - .collect(); - } - - // normalize metrics, if requested - if query.normalized_metrics() { - // Parse regex once and apply to all rows. See description around the `replace...` calls on - // why/how the regexes are used. - let regex_metrics = Regex::new(r#"metrics=\[([^\]]*)\]"#).expect("metrics regex"); - let regex_timing = Regex::new(r#"[0-9]+(\.[0-9]+)?.s"#).expect("timing regex"); - - current_results = current_results - .into_iter() - .map(|s| { - // Replace timings with fixed value, e.g.: - // - // `1s` -> `1.234ms` - // `1.2ms` -> `1.234ms` - // `10.2μs` -> `1.234ms` - let s = regex_timing.replace_all(&s, "1.234ms"); - - let s = normalize_for_variable_width(s); - - // Metrics are currently ordered by value (not by key), so different timings may - // reorder them. We "parse" the list and normalize the sorting. E.g.: - // - // `metrics=[]` => `metrics=[]` - // `metrics=[foo=1, bar=2]` => `metrics=[bar=2, foo=1]` - // `metrics=[foo=2, bar=1]` => `metrics=[bar=1, foo=2]` - regex_metrics - .replace_all(&s, |c: &Captures| { - let mut metrics: Vec<_> = c[1].split(", ").collect(); - metrics.sort(); - format!("metrics=[{}]", metrics.join(", ")) - }) - .to_string() - }) - .collect(); - } - - // normalize Filters, if requested - // - // Converts: - // FilterExec: time@2 < -9223372036854775808 OR time@2 > 1640995204240217000 - // - // to - // FilterExec: - if query.normalized_filters() { - let filter_regex = Regex::new("FilterExec: .*").expect("filter regex"); - current_results = current_results - .into_iter() - .map(|s| { - filter_regex - .replace_all(&s, |_: &Captures| "FilterExec: ") - .to_string() - }) - .collect(); - } - - current_results + /// if true, normalize filter predicates for explain plans + /// `FilterExec: ` + pub normalized_filters: bool, +} + +impl Normalizer { + #[cfg(test)] + pub fn new() -> Self { + Default::default() + } + + /// Take the output of running the query and apply the specified normalizations to them + pub fn normalize_results(&self, mut results: Vec) -> Vec { + // compare against sorted results, if requested + if self.sorted_compare && !results.is_empty() { + let schema = results[0].schema(); + let batch = + arrow::compute::concat_batches(&schema, &results).expect("concatenating batches"); + results = vec![sort_record_batch(batch)]; + } + + let mut current_results = pretty_format_batches(&results) + .unwrap() + .trim() + .lines() + .map(|s| s.to_string()) + .collect::>(); + + // normalize UUIDs, if requested + if self.normalized_uuids { + let mut seen: HashMap = HashMap::new(); + current_results = current_results + .into_iter() + .map(|s| { + // Rewrite parquet directory names like + // `51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet` + // + // to: + // 1/1/1/1/00000000-0000-0000-0000-000000000000.parquet + + let s = REGEX_UUID.replace_all(&s, |s: &Captures| { + let next = seen.len() as u128; + Uuid::from_u128( + *seen + .entry(s.get(0).unwrap().as_str().to_owned()) + .or_insert(next), + ) + .to_string() + }); + + let s = normalize_for_variable_width(s); + REGEX_DIRS.replace_all(&s, "1/1/1/1").to_string() + }) + .collect(); + } + + // normalize metrics, if requested + if self.normalized_metrics { + current_results = current_results + .into_iter() + .map(|s| { + // Replace timings with fixed value, e.g.: + // + // `1s` -> `1.234ms` + // `1.2ms` -> `1.234ms` + // `10.2μs` -> `1.234ms` + let s = REGEX_TIMING.replace_all(&s, "1.234ms"); + + let s = normalize_for_variable_width(s); + + // Metrics are currently ordered by value (not by key), so different timings may + // reorder them. We "parse" the list and normalize the sorting. E.g.: + // + // `metrics=[]` => `metrics=[]` + // `metrics=[foo=1, bar=2]` => `metrics=[bar=2, foo=1]` + // `metrics=[foo=2, bar=1]` => `metrics=[bar=1, foo=2]` + REGEX_METRICS + .replace_all(&s, |c: &Captures| { + let mut metrics: Vec<_> = c[1].split(", ").collect(); + metrics.sort(); + format!("metrics=[{}]", metrics.join(", ")) + }) + .to_string() + }) + .collect(); + } + + // normalize Filters, if requested + // + // Converts: + // FilterExec: time@2 < -9223372036854775808 OR time@2 > 1640995204240217000 + // + // to + // FilterExec: + if self.normalized_filters { + current_results = current_results + .into_iter() + .map(|s| { + REGEX_FILTER + .replace_all(&s, |_: &Captures| "FilterExec: ") + .to_string() + }) + .collect(); + } + + current_results + } + + /// Adds information on what normalizations were applied to the input + pub fn add_description(&self, output: &mut Vec) { + if self.sorted_compare { + output.push("-- Results After Sorting".into()) + } + if self.normalized_uuids { + output.push("-- Results After Normalizing UUIDs".into()) + } + if self.normalized_metrics { + output.push("-- Results After Normalizing Metrics".into()) + } + if self.normalized_filters { + output.push("-- Results After Normalizing Filters".into()) + } + } } diff --git a/test_helpers_end_to_end/src/snapshot_comparison/queries.rs b/test_helpers_end_to_end/src/snapshot_comparison/queries.rs index 3c76195aba..70a5f8da4c 100644 --- a/test_helpers_end_to_end/src/snapshot_comparison/queries.rs +++ b/test_helpers_end_to_end/src/snapshot_comparison/queries.rs @@ -1,22 +1,12 @@ +use arrow::record_batch::RecordBatch; + +use super::normalization::Normalizer; + /// A query to run with optional annotations #[derive(Debug, PartialEq, Eq, Default)] pub struct Query { - /// If true, results are sorted first prior to comparison, meaning that differences in the - /// output order compared with expected order do not cause a diff - sorted_compare: bool, - - /// If true, replace UUIDs with static placeholders. - normalized_uuids: bool, - - /// If true, normalize timings in queries by replacing them with - /// static placeholders, for example: - /// - /// `1s` -> `1.234ms` - normalized_metrics: bool, - - /// if true, normalize filter predicates for explain plans - /// `FilterExec: ` - normalized_filters: bool, + /// Describes how query text should be normalized + normalizer: Normalizer, /// The query string text: String, @@ -27,49 +17,49 @@ impl Query { fn new(text: impl Into) -> Self { let text = text.into(); Self { - sorted_compare: false, - normalized_uuids: false, - normalized_metrics: false, - normalized_filters: false, + normalizer: Normalizer::new(), text, } } - #[cfg(test)] - fn with_sorted_compare(mut self) -> Self { - self.sorted_compare = true; + pub fn text(&self) -> &str { + &self.text + } + + pub fn with_sorted_compare(mut self) -> Self { + self.normalizer.sorted_compare = true; self } - /// Get a reference to the query text. - pub fn text(&self) -> &str { - self.text.as_ref() + pub fn with_normalized_uuids(mut self) -> Self { + self.normalizer.normalized_uuids = true; + self } - /// Get the query's sorted compare. - pub fn sorted_compare(&self) -> bool { - self.sorted_compare + pub fn with_normalize_metrics(mut self) -> Self { + self.normalizer.normalized_metrics = true; + self } - /// Get queries normalized UUID - pub fn normalized_uuids(&self) -> bool { - self.normalized_uuids + pub fn with_normalize_filters(mut self) -> Self { + self.normalizer.normalized_filters = true; + self } - /// Use normalized timing values - pub fn normalized_metrics(&self) -> bool { - self.normalized_metrics + /// Take the output of running the query and apply the specified normalizations to them + pub fn normalize_results(&self, results: Vec) -> Vec { + self.normalizer.normalize_results(results) } - /// Use normalized filter plans - pub fn normalized_filters(&self) -> bool { - self.normalized_filters + /// Adds information on what normalizations were applied to the input + pub fn add_description(&self, output: &mut Vec) { + self.normalizer.add_description(output) } } #[derive(Debug, Default)] struct QueryBuilder { - query: Query, + pub query: Query, } impl QueryBuilder { @@ -85,22 +75,6 @@ impl QueryBuilder { self.query.text.push(c) } - fn sorted_compare(&mut self) { - self.query.sorted_compare = true; - } - - fn normalized_uuids(&mut self) { - self.query.normalized_uuids = true; - } - - fn normalize_metrics(&mut self) { - self.query.normalized_metrics = true; - } - - fn normalize_filters(&mut self) { - self.query.normalized_filters = true; - } - fn is_empty(&self) -> bool { self.query.text.is_empty() } @@ -125,54 +99,57 @@ impl TestQueries { S: AsRef, { let mut queries = vec![]; - let mut builder = QueryBuilder::new(); - lines.into_iter().for_each(|line| { - let line = line.as_ref().trim(); - const COMPARE_STR: &str = "-- IOX_COMPARE: "; - if line.starts_with(COMPARE_STR) { - let (_, options) = line.split_at(COMPARE_STR.len()); - for option in options.split(',') { - let option = option.trim(); - match option { - "sorted" => { - builder.sorted_compare(); + let mut builder = lines + .into_iter() + .fold(QueryBuilder::new(), |mut builder, line| { + let line = line.as_ref().trim(); + const COMPARE_STR: &str = "-- IOX_COMPARE: "; + if line.starts_with(COMPARE_STR) { + let (_, options) = line.split_at(COMPARE_STR.len()); + for option in options.split(',') { + let option = option.trim(); + match option { + "sorted" => { + builder.query = builder.query.with_sorted_compare(); + } + "uuid" => { + builder.query = builder.query.with_normalized_uuids(); + } + "metrics" => { + builder.query = builder.query.with_normalize_metrics(); + } + "filters" => { + builder.query = builder.query.with_normalize_filters(); + } + _ => {} } - "uuid" => { - builder.normalized_uuids(); - } - "metrics" => { - builder.normalize_metrics(); - } - "filters" => { - builder.normalize_filters(); - } - _ => {} } } - } - if line.starts_with("--") { - return; - } - if line.is_empty() { - return; - } - - // replace newlines - if !builder.is_empty() { - builder.push(' '); - } - builder.push_str(line); - - // declare queries when we see a semicolon at the end of the line - if line.ends_with(';') { - if let Some(q) = builder.build_and_reset() { - queries.push(q); + if line.starts_with("--") { + return builder; + } + if line.is_empty() { + return builder; } - } - }); + // replace newlines + if !builder.is_empty() { + builder.push(' '); + } + builder.push_str(line); + + // declare queries when we see a semicolon at the end of the line + if line.ends_with(';') { + if let Some(q) = builder.build_and_reset() { + queries.push(q); + } + } + builder + }); + + // get last one, if any if let Some(q) = builder.build_and_reset() { queries.push(q); }